| author | xiubuzhe <xiubuzhe@sina.com> | 2023-10-08 20:59:00 +0800 |
|---|---|---|
| committer | xiubuzhe <xiubuzhe@sina.com> | 2023-10-08 20:59:00 +0800 |
| commit | 1dac2263372df2b85db5d029a45721fa158a5c9d (patch) | |
| tree | 0365f9c57df04178a726d7584ca6a6b955a7ce6a /lib/sqlalchemy/dialects/postgresql/pg8000.py | |
| parent | b494be364bb39e1de128ada7dc576a729d99907e (diff) | |
first add files
Diffstat (limited to 'lib/sqlalchemy/dialects/postgresql/pg8000.py')
-rw-r--r-- | lib/sqlalchemy/dialects/postgresql/pg8000.py | 594 |
1 file changed, 594 insertions, 0 deletions
diff --git a/lib/sqlalchemy/dialects/postgresql/pg8000.py b/lib/sqlalchemy/dialects/postgresql/pg8000.py
new file mode 100644
index 0000000..98561a9
--- /dev/null
+++ b/lib/sqlalchemy/dialects/postgresql/pg8000.py
@@ -0,0 +1,594 @@
+# postgresql/pg8000.py
+# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors <see AUTHORS
+# file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+r"""
+.. dialect:: postgresql+pg8000
+    :name: pg8000
+    :dbapi: pg8000
+    :connectstring: postgresql+pg8000://user:password@host:port/dbname[?key=value&key=value...]
+    :url: https://pypi.org/project/pg8000/
+
+.. versionchanged:: 1.4 The pg8000 dialect has been updated for version
+   1.16.6 and higher, and is again part of SQLAlchemy's continuous integration
+   with full feature support.
+
+.. _pg8000_unicode:
+
+Unicode
+-------
+
+pg8000 will encode / decode string values between it and the server using the
+PostgreSQL ``client_encoding`` parameter; by default this is the value in
+the ``postgresql.conf`` file, which often defaults to ``SQL_ASCII``.
+Typically, this can be changed to ``utf-8``, as a more useful default::
+
+    #client_encoding = sql_ascii # actually, defaults to database
+                                 # encoding
+    client_encoding = utf8
+
+The ``client_encoding`` can be overridden for a session by executing the SQL:
+
+SET CLIENT_ENCODING TO 'utf8';
+
+SQLAlchemy will execute this SQL on all new connections based on the value
+passed to :func:`_sa.create_engine` using the ``client_encoding`` parameter::
+
+    engine = create_engine(
+        "postgresql+pg8000://user:pass@host/dbname", client_encoding='utf8')
+
+.. _pg8000_ssl:
+
+SSL Connections
+---------------
+
+pg8000 accepts a Python ``SSLContext`` object which may be specified using the
+:paramref:`_sa.create_engine.connect_args` dictionary::
+
+    import ssl
+    ssl_context = ssl.create_default_context()
+    engine = sa.create_engine(
+        "postgresql+pg8000://scott:tiger@192.168.0.199/test",
+        connect_args={"ssl_context": ssl_context},
+    )
+
+If the server uses an automatically-generated certificate that is self-signed
+or does not match the host name (as seen from the client), it may also be
+necessary to disable hostname checking::
+
+    import ssl
+    ssl_context = ssl.create_default_context()
+    ssl_context.check_hostname = False
+    ssl_context.verify_mode = ssl.CERT_NONE
+    engine = sa.create_engine(
+        "postgresql+pg8000://scott:tiger@192.168.0.199/test",
+        connect_args={"ssl_context": ssl_context},
+    )
+
+.. _pg8000_isolation_level:
+
+pg8000 Transaction Isolation Level
+-------------------------------------
+
+The pg8000 dialect offers the same isolation level settings as that
+of the :ref:`psycopg2 <psycopg2_isolation_level>` dialect:
+
+* ``READ COMMITTED``
+* ``READ UNCOMMITTED``
+* ``REPEATABLE READ``
+* ``SERIALIZABLE``
+* ``AUTOCOMMIT``
+
+.. seealso::
+
+    :ref:`postgresql_isolation_level`
+
+    :ref:`psycopg2_isolation_level`
+
+
+"""  # noqa
+import decimal
+import re
+from uuid import UUID as _python_UUID
+
+from .array import ARRAY as PGARRAY
+from .base import _ColonCast
+from .base import _DECIMAL_TYPES
+from .base import _FLOAT_TYPES
+from .base import _INT_TYPES
+from .base import ENUM
+from .base import INTERVAL
+from .base import PGCompiler
+from .base import PGDialect
+from .base import PGExecutionContext
+from .base import PGIdentifierPreparer
+from .base import UUID
+from .json import JSON
+from .json import JSONB
+from .json import JSONPathType
+from ... import exc
+from ... import processors
+from ... import types as sqltypes
+from ... import util
+from ...sql.elements import quoted_name
+
+
+class _PGNumeric(sqltypes.Numeric):
+    def result_processor(self, dialect, coltype):
+        if self.asdecimal:
+            if coltype in _FLOAT_TYPES:
+                return processors.to_decimal_processor_factory(
+                    decimal.Decimal, self._effective_decimal_return_scale
+                )
+            elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES:
+                # pg8000 returns Decimal natively for 1700
+                return None
+            else:
+                raise exc.InvalidRequestError(
+                    "Unknown PG numeric type: %d" % coltype
+                )
+        else:
+            if coltype in _FLOAT_TYPES:
+                # pg8000 returns float natively for 701
+                return None
+            elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES:
+                return processors.to_float
+            else:
+                raise exc.InvalidRequestError(
+                    "Unknown PG numeric type: %d" % coltype
+                )
+
+
+class _PGNumericNoBind(_PGNumeric):
+    def bind_processor(self, dialect):
+        return None
+
+
+class _PGJSON(JSON):
+    def result_processor(self, dialect, coltype):
+        return None
+
+    def get_dbapi_type(self, dbapi):
+        return dbapi.JSON
+
+
+class _PGJSONB(JSONB):
+    def result_processor(self, dialect, coltype):
+        return None
+
+    def get_dbapi_type(self, dbapi):
+        return dbapi.JSONB
+
+
+class _PGJSONIndexType(sqltypes.JSON.JSONIndexType):
+    def get_dbapi_type(self, dbapi):
+        raise NotImplementedError("should not be here")
+
+
+class _PGJSONIntIndexType(sqltypes.JSON.JSONIntIndexType):
+    def get_dbapi_type(self, dbapi):
+        return dbapi.INTEGER
+
+
+class _PGJSONStrIndexType(sqltypes.JSON.JSONStrIndexType):
+    def get_dbapi_type(self, dbapi):
+        return dbapi.STRING
+
+
+class _PGJSONPathType(JSONPathType):
+    def get_dbapi_type(self, dbapi):
+        return 1009
+
+
+class _PGUUID(UUID):
+    def bind_processor(self, dialect):
+        if not self.as_uuid:
+
+            def process(value):
+                if value is not None:
+                    value = _python_UUID(value)
+                return value
+
+            return process
+
+    def result_processor(self, dialect, coltype):
+        if not self.as_uuid:
+
+            def process(value):
+                if value is not None:
+                    value = str(value)
+                return value
+
+            return process
+
+
+class _PGEnum(ENUM):
+    def get_dbapi_type(self, dbapi):
+        return dbapi.UNKNOWN
+
+
+class _PGInterval(INTERVAL):
+    def get_dbapi_type(self, dbapi):
+        return dbapi.INTERVAL
+
+    @classmethod
+    def adapt_emulated_to_native(cls, interval, **kw):
+        return _PGInterval(precision=interval.second_precision)
+
+
+class _PGTimeStamp(sqltypes.DateTime):
+    def get_dbapi_type(self, dbapi):
+        if self.timezone:
+            # TIMESTAMPTZOID
+            return 1184
+        else:
+            # TIMESTAMPOID
+            return 1114
+
+
+class _PGTime(sqltypes.Time):
+    def get_dbapi_type(self, dbapi):
+        return dbapi.TIME
+
+
+class _PGInteger(sqltypes.Integer):
+    def get_dbapi_type(self, dbapi):
+        return dbapi.INTEGER
+
+
+class _PGSmallInteger(sqltypes.SmallInteger):
+    def get_dbapi_type(self, dbapi):
+        return dbapi.INTEGER
+
+
+class _PGNullType(sqltypes.NullType):
+    def get_dbapi_type(self, dbapi):
+        return dbapi.NULLTYPE
+
+
+class _PGBigInteger(sqltypes.BigInteger):
+    def get_dbapi_type(self, dbapi):
+        return dbapi.BIGINTEGER
+
+
+class _PGBoolean(sqltypes.Boolean):
+    def get_dbapi_type(self, dbapi):
+        return dbapi.BOOLEAN
+
+
+class _PGARRAY(PGARRAY):
+    def bind_expression(self, bindvalue):
+        return _ColonCast(bindvalue, self)
+
+
+_server_side_id = util.counter()
+
+
+class PGExecutionContext_pg8000(PGExecutionContext):
+    def create_server_side_cursor(self):
+        ident = "c_%s_%s" % (hex(id(self))[2:], hex(_server_side_id())[2:])
+        return ServerSideCursor(self._dbapi_connection.cursor(), ident)
+
+    def pre_exec(self):
+        if not self.compiled:
+            return
+
+
+class ServerSideCursor:
+    server_side = True
+
+    def __init__(self, cursor, ident):
+        self.ident = ident
+        self.cursor = cursor
+
+    @property
+    def connection(self):
+        return self.cursor.connection
+
+    @property
+    def rowcount(self):
+        return self.cursor.rowcount
+
+    @property
+    def description(self):
+        return self.cursor.description
+
+    def execute(self, operation, args=(), stream=None):
+        op = "DECLARE " + self.ident + " NO SCROLL CURSOR FOR " + operation
+        self.cursor.execute(op, args, stream=stream)
+        return self
+
+    def executemany(self, operation, param_sets):
+        self.cursor.executemany(operation, param_sets)
+        return self
+
+    def fetchone(self):
+        self.cursor.execute("FETCH FORWARD 1 FROM " + self.ident)
+        return self.cursor.fetchone()
+
+    def fetchmany(self, num=None):
+        if num is None:
+            return self.fetchall()
+        else:
+            self.cursor.execute(
+                "FETCH FORWARD " + str(int(num)) + " FROM " + self.ident
+            )
+            return self.cursor.fetchall()
+
+    def fetchall(self):
+        self.cursor.execute("FETCH FORWARD ALL FROM " + self.ident)
+        return self.cursor.fetchall()
+
+    def close(self):
+        self.cursor.execute("CLOSE " + self.ident)
+        self.cursor.close()
+
+    def setinputsizes(self, *sizes):
+        self.cursor.setinputsizes(*sizes)
+
+    def setoutputsize(self, size, column=None):
+        pass
+
+
+class PGCompiler_pg8000(PGCompiler):
+    def visit_mod_binary(self, binary, operator, **kw):
+        return (
+            self.process(binary.left, **kw)
+            + " %% "
+            + self.process(binary.right, **kw)
+        )
+
+
+class PGIdentifierPreparer_pg8000(PGIdentifierPreparer):
+    def __init__(self, *args, **kwargs):
+        PGIdentifierPreparer.__init__(self, *args, **kwargs)
+        self._double_percents = False
+
+
+class PGDialect_pg8000(PGDialect):
+    driver = "pg8000"
+    supports_statement_cache = True
+
+    supports_unicode_statements = True
+
+    supports_unicode_binds = True
+
+    default_paramstyle = "format"
+    supports_sane_multi_rowcount = True
+    execution_ctx_cls = PGExecutionContext_pg8000
+    statement_compiler = PGCompiler_pg8000
+    preparer = PGIdentifierPreparer_pg8000
+    supports_server_side_cursors = True
+
+    use_setinputsizes = True
+
+    # reversed as of pg8000 1.16.6.  1.16.5 and lower
+    # are no longer compatible
+    description_encoding = None
+    # description_encoding = "use_encoding"
+
+    colspecs = util.update_copy(
+        PGDialect.colspecs,
+        {
+            sqltypes.Numeric: _PGNumericNoBind,
+            sqltypes.Float: _PGNumeric,
+            sqltypes.JSON: _PGJSON,
+            sqltypes.Boolean: _PGBoolean,
+            sqltypes.NullType: _PGNullType,
+            JSONB: _PGJSONB,
+            sqltypes.JSON.JSONPathType: _PGJSONPathType,
+            sqltypes.JSON.JSONIndexType: _PGJSONIndexType,
+            sqltypes.JSON.JSONIntIndexType: _PGJSONIntIndexType,
+            sqltypes.JSON.JSONStrIndexType: _PGJSONStrIndexType,
+            UUID: _PGUUID,
+            sqltypes.Interval: _PGInterval,
+            INTERVAL: _PGInterval,
+            sqltypes.DateTime: _PGTimeStamp,
+            sqltypes.Time: _PGTime,
+            sqltypes.Integer: _PGInteger,
+            sqltypes.SmallInteger: _PGSmallInteger,
+            sqltypes.BigInteger: _PGBigInteger,
+            sqltypes.Enum: _PGEnum,
+            sqltypes.ARRAY: _PGARRAY,
+        },
+    )
+
+    def __init__(self, client_encoding=None, **kwargs):
+        PGDialect.__init__(self, **kwargs)
+        self.client_encoding = client_encoding
+
+        if self._dbapi_version < (1, 16, 6):
+            raise NotImplementedError("pg8000 1.16.6 or greater is required")
+
+    @util.memoized_property
+    def _dbapi_version(self):
+        if self.dbapi and hasattr(self.dbapi, "__version__"):
+            return tuple(
+                [
+                    int(x)
+                    for x in re.findall(
+                        r"(\d+)(?:[-\.]?|$)", self.dbapi.__version__
+                    )
+                ]
+            )
+        else:
+            return (99, 99, 99)
+
+    @classmethod
+    def dbapi(cls):
+        return __import__("pg8000")
+
+    def create_connect_args(self, url):
+        opts = url.translate_connect_args(username="user")
+        if "port" in opts:
+            opts["port"] = int(opts["port"])
+        opts.update(url.query)
+        return ([], opts)
+
+    def is_disconnect(self, e, connection, cursor):
+        if isinstance(e, self.dbapi.InterfaceError) and "network error" in str(
+            e
+        ):
+            # new as of pg8000 1.19.0 for broken connections
+            return True
+
+        # connection was closed normally
+        return "connection is closed" in str(e)
+
+    def set_isolation_level(self, connection, level):
+        level = level.replace("_", " ")
+
+        # adjust for ConnectionFairy possibly being present
+        if hasattr(connection, "dbapi_connection"):
+            connection = connection.dbapi_connection
+
+        if level == "AUTOCOMMIT":
+            connection.autocommit = True
+        elif level in self._isolation_lookup:
+            connection.autocommit = False
+            cursor = connection.cursor()
+            cursor.execute(
+                "SET SESSION CHARACTERISTICS AS TRANSACTION "
+                "ISOLATION LEVEL %s" % level
+            )
+            cursor.execute("COMMIT")
+            cursor.close()
+        else:
+            raise exc.ArgumentError(
+                "Invalid value '%s' for isolation_level. "
+                "Valid isolation levels for %s are %s or AUTOCOMMIT"
+                % (level, self.name, ", ".join(self._isolation_lookup))
+            )
+
+    def set_readonly(self, connection, value):
+        cursor = connection.cursor()
+        try:
+            cursor.execute(
+                "SET SESSION CHARACTERISTICS AS TRANSACTION %s"
+                % ("READ ONLY" if value else "READ WRITE")
+            )
+            cursor.execute("COMMIT")
+        finally:
+            cursor.close()
+
+    def get_readonly(self, connection):
+        cursor = connection.cursor()
+        try:
+            cursor.execute("show transaction_read_only")
+            val = cursor.fetchone()[0]
+        finally:
+            cursor.close()
+
+        return val == "on"
+
+    def set_deferrable(self, connection, value):
+        cursor = connection.cursor()
+        try:
+            cursor.execute(
+                "SET SESSION CHARACTERISTICS AS TRANSACTION %s"
+                % ("DEFERRABLE" if value else "NOT DEFERRABLE")
+            )
+            cursor.execute("COMMIT")
+        finally:
+            cursor.close()
+
+    def get_deferrable(self, connection):
+        cursor = connection.cursor()
+        try:
+            cursor.execute("show transaction_deferrable")
+            val = cursor.fetchone()[0]
+        finally:
+            cursor.close()
+
+        return val == "on"
+
+    def set_client_encoding(self, connection, client_encoding):
+        # adjust for ConnectionFairy possibly being present
+        if hasattr(connection, "dbapi_connection"):
+            connection = connection.dbapi_connection
+
+        cursor = connection.cursor()
+        cursor.execute("SET CLIENT_ENCODING TO '" + client_encoding + "'")
+        cursor.execute("COMMIT")
+        cursor.close()
+
+    def do_set_input_sizes(self, cursor, list_of_tuples, context):
+        if self.positional:
+            cursor.setinputsizes(
+                *[dbtype for key, dbtype, sqltype in list_of_tuples]
+            )
+        else:
+            cursor.setinputsizes(
+                **{
+                    key: dbtype
+                    for key, dbtype, sqltype in list_of_tuples
+                    if dbtype
+                }
+            )
+
+    def do_begin_twophase(self, connection, xid):
+        connection.connection.tpc_begin((0, xid, ""))
+
+    def do_prepare_twophase(self, connection, xid):
+        connection.connection.tpc_prepare()
+
+    def do_rollback_twophase(
+        self, connection, xid, is_prepared=True, recover=False
+    ):
+        connection.connection.tpc_rollback((0, xid, ""))
+
+    def do_commit_twophase(
+        self, connection, xid, is_prepared=True, recover=False
+    ):
+        connection.connection.tpc_commit((0, xid, ""))
+
+    def do_recover_twophase(self, connection):
+        return [row[1] for row in connection.connection.tpc_recover()]
+
+    def on_connect(self):
+        fns = []
+
+        def on_connect(conn):
+            conn.py_types[quoted_name] = conn.py_types[util.text_type]
+
+        fns.append(on_connect)
+
+        if self.client_encoding is not None:
+
+            def on_connect(conn):
+                self.set_client_encoding(conn, self.client_encoding)
+
+            fns.append(on_connect)
+
+        if self.isolation_level is not None:
+
+            def on_connect(conn):
+                self.set_isolation_level(conn, self.isolation_level)
+
+            fns.append(on_connect)
+
+        if self._json_deserializer:
+
+            def on_connect(conn):
+                # json
+                conn.register_in_adapter(114, self._json_deserializer)
+
+                # jsonb
+                conn.register_in_adapter(3802, self._json_deserializer)
+
+            fns.append(on_connect)
+
+        if len(fns) > 0:
+
+            def on_connect(conn):
+                for fn in fns:
+                    fn(conn)
+
+            return on_connect
+        else:
+            return None
+
+
+dialect = PGDialect_pg8000