diff options
author | xiubuzhe <xiubuzhe@sina.com> | 2023-10-08 20:59:00 +0800 |
---|---|---|
committer | xiubuzhe <xiubuzhe@sina.com> | 2023-10-08 20:59:00 +0800 |
commit | 1dac2263372df2b85db5d029a45721fa158a5c9d (patch) | |
tree | 0365f9c57df04178a726d7584ca6a6b955a7ce6a /lib/sqlalchemy/dialects/postgresql/pygresql.py | |
parent | b494be364bb39e1de128ada7dc576a729d99907e (diff) | |
download | sunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.tar.gz sunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.tar.bz2 sunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.zip |
first add files
Diffstat (limited to 'lib/sqlalchemy/dialects/postgresql/pygresql.py')
-rw-r--r-- | lib/sqlalchemy/dialects/postgresql/pygresql.py | 278 |
1 files changed, 278 insertions, 0 deletions
diff --git a/lib/sqlalchemy/dialects/postgresql/pygresql.py b/lib/sqlalchemy/dialects/postgresql/pygresql.py new file mode 100644 index 0000000..d273b8c --- /dev/null +++ b/lib/sqlalchemy/dialects/postgresql/pygresql.py @@ -0,0 +1,278 @@ +# postgresql/pygresql.py +# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +""" +.. dialect:: postgresql+pygresql + :name: pygresql + :dbapi: pgdb + :connectstring: postgresql+pygresql://user:password@host:port/dbname[?key=value&key=value...] + :url: https://www.pygresql.org/ + +.. note:: + + The pygresql dialect is **not tested as part of SQLAlchemy's continuous + integration** and may have unresolved issues. The recommended PostgreSQL + dialect is psycopg2. + +.. deprecated:: 1.4 The pygresql DBAPI is deprecated and will be removed + in a future version. Please use one of the supported DBAPIs to + connect to PostgreSQL. + +""" # noqa + +import decimal +import re + +from .base import _DECIMAL_TYPES +from .base import _FLOAT_TYPES +from .base import _INT_TYPES +from .base import PGCompiler +from .base import PGDialect +from .base import PGIdentifierPreparer +from .base import UUID +from .hstore import HSTORE +from .json import JSON +from .json import JSONB +from ... import exc +from ... import processors +from ... import util +from ...sql.elements import Null +from ...types import JSON as Json +from ...types import Numeric + + +class _PGNumeric(Numeric): + def bind_processor(self, dialect): + return None + + def result_processor(self, dialect, coltype): + if not isinstance(coltype, int): + coltype = coltype.oid + if self.asdecimal: + if coltype in _FLOAT_TYPES: + return processors.to_decimal_processor_factory( + decimal.Decimal, self._effective_decimal_return_scale + ) + elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES: + # PyGreSQL returns Decimal natively for 1700 (numeric) + return None + else: + raise exc.InvalidRequestError( + "Unknown PG numeric type: %d" % coltype + ) + else: + if coltype in _FLOAT_TYPES: + # PyGreSQL returns float natively for 701 (float8) + return None + elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES: + return processors.to_float + else: + raise exc.InvalidRequestError( + "Unknown PG numeric type: %d" % coltype + ) + + +class _PGHStore(HSTORE): + def bind_processor(self, dialect): + if not dialect.has_native_hstore: + return super(_PGHStore, self).bind_processor(dialect) + hstore = dialect.dbapi.Hstore + + def process(value): + if isinstance(value, dict): + return hstore(value) + return value + + return process + + def result_processor(self, dialect, coltype): + if not dialect.has_native_hstore: + return super(_PGHStore, self).result_processor(dialect, coltype) + + +class _PGJSON(JSON): + def bind_processor(self, dialect): + if not dialect.has_native_json: + return super(_PGJSON, self).bind_processor(dialect) + json = dialect.dbapi.Json + + def process(value): + if value is self.NULL: + value = None + elif isinstance(value, Null) or ( + value is None and self.none_as_null + ): + return None + if value is None or isinstance(value, (dict, list)): + return json(value) + return value + + return process + + def result_processor(self, dialect, coltype): + if not dialect.has_native_json: + return super(_PGJSON, self).result_processor(dialect, coltype) + + +class _PGJSONB(JSONB): + def bind_processor(self, dialect): + if not dialect.has_native_json: + return super(_PGJSONB, self).bind_processor(dialect) + json = dialect.dbapi.Json + + def process(value): + if value is self.NULL: + value = None + elif isinstance(value, Null) or ( + value is None and self.none_as_null + ): + return None + if value is None or isinstance(value, (dict, list)): + return json(value) + return value + + return process + + def result_processor(self, dialect, coltype): + if not dialect.has_native_json: + return super(_PGJSONB, self).result_processor(dialect, coltype) + + +class _PGUUID(UUID): + def bind_processor(self, dialect): + if not dialect.has_native_uuid: + return super(_PGUUID, self).bind_processor(dialect) + uuid = dialect.dbapi.Uuid + + def process(value): + if value is None: + return None + if isinstance(value, (str, bytes)): + if len(value) == 16: + return uuid(bytes=value) + return uuid(value) + if isinstance(value, int): + return uuid(int=value) + return value + + return process + + def result_processor(self, dialect, coltype): + if not dialect.has_native_uuid: + return super(_PGUUID, self).result_processor(dialect, coltype) + if not self.as_uuid: + + def process(value): + if value is not None: + return str(value) + + return process + + +class _PGCompiler(PGCompiler): + def visit_mod_binary(self, binary, operator, **kw): + return ( + self.process(binary.left, **kw) + + " %% " + + self.process(binary.right, **kw) + ) + + def post_process_text(self, text): + return text.replace("%", "%%") + + +class _PGIdentifierPreparer(PGIdentifierPreparer): + def _escape_identifier(self, value): + value = value.replace(self.escape_quote, self.escape_to_quote) + return value.replace("%", "%%") + + +class PGDialect_pygresql(PGDialect): + + driver = "pygresql" + supports_statement_cache = True + + statement_compiler = _PGCompiler + preparer = _PGIdentifierPreparer + + @classmethod + def dbapi(cls): + import pgdb + + util.warn_deprecated( + "The pygresql DBAPI is deprecated and will be removed " + "in a future version. Please use one of the supported DBAPIs to " + "connect to PostgreSQL.", + version="1.4", + ) + + return pgdb + + colspecs = util.update_copy( + PGDialect.colspecs, + { + Numeric: _PGNumeric, + HSTORE: _PGHStore, + Json: _PGJSON, + JSON: _PGJSON, + JSONB: _PGJSONB, + UUID: _PGUUID, + }, + ) + + def __init__(self, **kwargs): + super(PGDialect_pygresql, self).__init__(**kwargs) + try: + version = self.dbapi.version + m = re.match(r"(\d+)\.(\d+)", version) + version = (int(m.group(1)), int(m.group(2))) + except (AttributeError, ValueError, TypeError): + version = (0, 0) + self.dbapi_version = version + if version < (5, 0): + has_native_hstore = has_native_json = has_native_uuid = False + if version != (0, 0): + util.warn( + "PyGreSQL is only fully supported by SQLAlchemy" + " since version 5.0." + ) + else: + self.supports_unicode_statements = True + self.supports_unicode_binds = True + has_native_hstore = has_native_json = has_native_uuid = True + self.has_native_hstore = has_native_hstore + self.has_native_json = has_native_json + self.has_native_uuid = has_native_uuid + + def create_connect_args(self, url): + opts = url.translate_connect_args(username="user") + if "port" in opts: + opts["host"] = "%s:%s" % ( + opts.get("host", "").rsplit(":", 1)[0], + opts.pop("port"), + ) + opts.update(url.query) + return [], opts + + def is_disconnect(self, e, connection, cursor): + if isinstance(e, self.dbapi.Error): + if not connection: + return False + try: + connection = connection.connection + except AttributeError: + pass + else: + if not connection: + return False + try: + return connection.closed + except AttributeError: # PyGreSQL < 5.0 + return connection._cnx is None + return False + + +dialect = PGDialect_pygresql |