From 1dac2263372df2b85db5d029a45721fa158a5c9d Mon Sep 17 00:00:00 2001 From: xiubuzhe Date: Sun, 8 Oct 2023 20:59:00 +0800 Subject: first add files --- lib/sqlalchemy/dialects/postgresql/__init__.py | 117 + lib/sqlalchemy/dialects/postgresql/array.py | 413 ++ lib/sqlalchemy/dialects/postgresql/asyncpg.py | 1112 +++++ lib/sqlalchemy/dialects/postgresql/base.py | 4651 ++++++++++++++++++++ lib/sqlalchemy/dialects/postgresql/dml.py | 274 ++ lib/sqlalchemy/dialects/postgresql/ext.py | 277 ++ lib/sqlalchemy/dialects/postgresql/hstore.py | 455 ++ lib/sqlalchemy/dialects/postgresql/json.py | 327 ++ lib/sqlalchemy/dialects/postgresql/pg8000.py | 594 +++ lib/sqlalchemy/dialects/postgresql/provision.py | 124 + lib/sqlalchemy/dialects/postgresql/psycopg2.py | 1088 +++++ lib/sqlalchemy/dialects/postgresql/psycopg2cffi.py | 60 + lib/sqlalchemy/dialects/postgresql/pygresql.py | 278 ++ lib/sqlalchemy/dialects/postgresql/pypostgresql.py | 126 + lib/sqlalchemy/dialects/postgresql/ranges.py | 138 + 15 files changed, 10034 insertions(+) create mode 100644 lib/sqlalchemy/dialects/postgresql/__init__.py create mode 100644 lib/sqlalchemy/dialects/postgresql/array.py create mode 100644 lib/sqlalchemy/dialects/postgresql/asyncpg.py create mode 100644 lib/sqlalchemy/dialects/postgresql/base.py create mode 100644 lib/sqlalchemy/dialects/postgresql/dml.py create mode 100644 lib/sqlalchemy/dialects/postgresql/ext.py create mode 100644 lib/sqlalchemy/dialects/postgresql/hstore.py create mode 100644 lib/sqlalchemy/dialects/postgresql/json.py create mode 100644 lib/sqlalchemy/dialects/postgresql/pg8000.py create mode 100644 lib/sqlalchemy/dialects/postgresql/provision.py create mode 100644 lib/sqlalchemy/dialects/postgresql/psycopg2.py create mode 100644 lib/sqlalchemy/dialects/postgresql/psycopg2cffi.py create mode 100644 lib/sqlalchemy/dialects/postgresql/pygresql.py create mode 100644 lib/sqlalchemy/dialects/postgresql/pypostgresql.py create mode 100644 lib/sqlalchemy/dialects/postgresql/ranges.py (limited to 'lib/sqlalchemy/dialects/postgresql') diff --git a/lib/sqlalchemy/dialects/postgresql/__init__.py b/lib/sqlalchemy/dialects/postgresql/__init__.py new file mode 100644 index 0000000..12d9e94 --- /dev/null +++ b/lib/sqlalchemy/dialects/postgresql/__init__.py @@ -0,0 +1,117 @@ +# postgresql/__init__.py +# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors +# +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +from . import base +from . import pg8000 # noqa +from . import psycopg2 # noqa +from . import psycopg2cffi # noqa +from . import pygresql # noqa +from . import pypostgresql # noqa +from .array import All +from .array import Any +from .array import ARRAY +from .array import array +from .base import BIGINT +from .base import BIT +from .base import BOOLEAN +from .base import BYTEA +from .base import CHAR +from .base import CIDR +from .base import CreateEnumType +from .base import DATE +from .base import DOUBLE_PRECISION +from .base import DropEnumType +from .base import ENUM +from .base import FLOAT +from .base import INET +from .base import INTEGER +from .base import INTERVAL +from .base import MACADDR +from .base import MONEY +from .base import NUMERIC +from .base import OID +from .base import REAL +from .base import REGCLASS +from .base import SMALLINT +from .base import TEXT +from .base import TIME +from .base import TIMESTAMP +from .base import TSVECTOR +from .base import UUID +from .base import VARCHAR +from .dml import Insert +from .dml import insert +from .ext import aggregate_order_by +from .ext import array_agg +from .ext import ExcludeConstraint +from .hstore import HSTORE +from .hstore import hstore +from .json import JSON +from .json import JSONB +from .ranges import DATERANGE +from .ranges import INT4RANGE +from .ranges import INT8RANGE +from .ranges import NUMRANGE +from .ranges import TSRANGE +from .ranges import TSTZRANGE +from ...util import compat + +if compat.py3k: + from . import asyncpg # noqa + +base.dialect = dialect = psycopg2.dialect + + +__all__ = ( + "INTEGER", + "BIGINT", + "SMALLINT", + "VARCHAR", + "CHAR", + "TEXT", + "NUMERIC", + "FLOAT", + "REAL", + "INET", + "CIDR", + "UUID", + "BIT", + "MACADDR", + "MONEY", + "OID", + "REGCLASS", + "DOUBLE_PRECISION", + "TIMESTAMP", + "TIME", + "DATE", + "BYTEA", + "BOOLEAN", + "INTERVAL", + "ARRAY", + "ENUM", + "dialect", + "array", + "HSTORE", + "hstore", + "INT4RANGE", + "INT8RANGE", + "NUMRANGE", + "DATERANGE", + "TSVECTOR", + "TSRANGE", + "TSTZRANGE", + "JSON", + "JSONB", + "Any", + "All", + "DropEnumType", + "CreateEnumType", + "ExcludeConstraint", + "aggregate_order_by", + "array_agg", + "insert", + "Insert", +) diff --git a/lib/sqlalchemy/dialects/postgresql/array.py b/lib/sqlalchemy/dialects/postgresql/array.py new file mode 100644 index 0000000..daf7c5d --- /dev/null +++ b/lib/sqlalchemy/dialects/postgresql/array.py @@ -0,0 +1,413 @@ +# postgresql/array.py +# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors +# +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php + +import re + +from ... import types as sqltypes +from ... import util +from ...sql import coercions +from ...sql import expression +from ...sql import operators +from ...sql import roles + + +def Any(other, arrexpr, operator=operators.eq): + """A synonym for the ARRAY-level :meth:`.ARRAY.Comparator.any` method. + See that method for details. + + """ + + return arrexpr.any(other, operator) + + +def All(other, arrexpr, operator=operators.eq): + """A synonym for the ARRAY-level :meth:`.ARRAY.Comparator.all` method. + See that method for details. + + """ + + return arrexpr.all(other, operator) + + +class array(expression.ClauseList, expression.ColumnElement): + + """A PostgreSQL ARRAY literal. + + This is used to produce ARRAY literals in SQL expressions, e.g.:: + + from sqlalchemy.dialects.postgresql import array + from sqlalchemy.dialects import postgresql + from sqlalchemy import select, func + + stmt = select(array([1,2]) + array([3,4,5])) + + print(stmt.compile(dialect=postgresql.dialect())) + + Produces the SQL:: + + SELECT ARRAY[%(param_1)s, %(param_2)s] || + ARRAY[%(param_3)s, %(param_4)s, %(param_5)s]) AS anon_1 + + An instance of :class:`.array` will always have the datatype + :class:`_types.ARRAY`. The "inner" type of the array is inferred from + the values present, unless the ``type_`` keyword argument is passed:: + + array(['foo', 'bar'], type_=CHAR) + + Multidimensional arrays are produced by nesting :class:`.array` constructs. + The dimensionality of the final :class:`_types.ARRAY` + type is calculated by + recursively adding the dimensions of the inner :class:`_types.ARRAY` + type:: + + stmt = select( + array([ + array([1, 2]), array([3, 4]), array([column('q'), column('x')]) + ]) + ) + print(stmt.compile(dialect=postgresql.dialect())) + + Produces:: + + SELECT ARRAY[ARRAY[%(param_1)s, %(param_2)s], + ARRAY[%(param_3)s, %(param_4)s], ARRAY[q, x]] AS anon_1 + + .. versionadded:: 1.3.6 added support for multidimensional array literals + + .. seealso:: + + :class:`_postgresql.ARRAY` + + """ + + __visit_name__ = "array" + + stringify_dialect = "postgresql" + inherit_cache = True + + def __init__(self, clauses, **kw): + clauses = [ + coercions.expect(roles.ExpressionElementRole, c) for c in clauses + ] + + super(array, self).__init__(*clauses, **kw) + + self._type_tuple = [arg.type for arg in clauses] + main_type = kw.pop( + "type_", + self._type_tuple[0] if self._type_tuple else sqltypes.NULLTYPE, + ) + + if isinstance(main_type, ARRAY): + self.type = ARRAY( + main_type.item_type, + dimensions=main_type.dimensions + 1 + if main_type.dimensions is not None + else 2, + ) + else: + self.type = ARRAY(main_type) + + @property + def _select_iterable(self): + return (self,) + + def _bind_param(self, operator, obj, _assume_scalar=False, type_=None): + if _assume_scalar or operator is operators.getitem: + return expression.BindParameter( + None, + obj, + _compared_to_operator=operator, + type_=type_, + _compared_to_type=self.type, + unique=True, + ) + + else: + return array( + [ + self._bind_param( + operator, o, _assume_scalar=True, type_=type_ + ) + for o in obj + ] + ) + + def self_group(self, against=None): + if against in (operators.any_op, operators.all_op, operators.getitem): + return expression.Grouping(self) + else: + return self + + +CONTAINS = operators.custom_op("@>", precedence=5, is_comparison=True) + +CONTAINED_BY = operators.custom_op("<@", precedence=5, is_comparison=True) + +OVERLAP = operators.custom_op("&&", precedence=5, is_comparison=True) + + +class ARRAY(sqltypes.ARRAY): + + """PostgreSQL ARRAY type. + + .. versionchanged:: 1.1 The :class:`_postgresql.ARRAY` type is now + a subclass of the core :class:`_types.ARRAY` type. + + The :class:`_postgresql.ARRAY` type is constructed in the same way + as the core :class:`_types.ARRAY` type; a member type is required, and a + number of dimensions is recommended if the type is to be used for more + than one dimension:: + + from sqlalchemy.dialects import postgresql + + mytable = Table("mytable", metadata, + Column("data", postgresql.ARRAY(Integer, dimensions=2)) + ) + + The :class:`_postgresql.ARRAY` type provides all operations defined on the + core :class:`_types.ARRAY` type, including support for "dimensions", + indexed access, and simple matching such as + :meth:`.types.ARRAY.Comparator.any` and + :meth:`.types.ARRAY.Comparator.all`. :class:`_postgresql.ARRAY` + class also + provides PostgreSQL-specific methods for containment operations, including + :meth:`.postgresql.ARRAY.Comparator.contains` + :meth:`.postgresql.ARRAY.Comparator.contained_by`, and + :meth:`.postgresql.ARRAY.Comparator.overlap`, e.g.:: + + mytable.c.data.contains([1, 2]) + + The :class:`_postgresql.ARRAY` type may not be supported on all + PostgreSQL DBAPIs; it is currently known to work on psycopg2 only. + + Additionally, the :class:`_postgresql.ARRAY` + type does not work directly in + conjunction with the :class:`.ENUM` type. For a workaround, see the + special type at :ref:`postgresql_array_of_enum`. + + .. seealso:: + + :class:`_types.ARRAY` - base array type + + :class:`_postgresql.array` - produces a literal array value. + + """ + + class Comparator(sqltypes.ARRAY.Comparator): + + """Define comparison operations for :class:`_types.ARRAY`. + + Note that these operations are in addition to those provided + by the base :class:`.types.ARRAY.Comparator` class, including + :meth:`.types.ARRAY.Comparator.any` and + :meth:`.types.ARRAY.Comparator.all`. + + """ + + def contains(self, other, **kwargs): + """Boolean expression. Test if elements are a superset of the + elements of the argument array expression. + + kwargs may be ignored by this operator but are required for API + conformance. + """ + return self.operate(CONTAINS, other, result_type=sqltypes.Boolean) + + def contained_by(self, other): + """Boolean expression. Test if elements are a proper subset of the + elements of the argument array expression. + """ + return self.operate( + CONTAINED_BY, other, result_type=sqltypes.Boolean + ) + + def overlap(self, other): + """Boolean expression. Test if array has elements in common with + an argument array expression. + """ + return self.operate(OVERLAP, other, result_type=sqltypes.Boolean) + + comparator_factory = Comparator + + def __init__( + self, item_type, as_tuple=False, dimensions=None, zero_indexes=False + ): + """Construct an ARRAY. + + E.g.:: + + Column('myarray', ARRAY(Integer)) + + Arguments are: + + :param item_type: The data type of items of this array. Note that + dimensionality is irrelevant here, so multi-dimensional arrays like + ``INTEGER[][]``, are constructed as ``ARRAY(Integer)``, not as + ``ARRAY(ARRAY(Integer))`` or such. + + :param as_tuple=False: Specify whether return results + should be converted to tuples from lists. DBAPIs such + as psycopg2 return lists by default. When tuples are + returned, the results are hashable. + + :param dimensions: if non-None, the ARRAY will assume a fixed + number of dimensions. This will cause the DDL emitted for this + ARRAY to include the exact number of bracket clauses ``[]``, + and will also optimize the performance of the type overall. + Note that PG arrays are always implicitly "non-dimensioned", + meaning they can store any number of dimensions no matter how + they were declared. + + :param zero_indexes=False: when True, index values will be converted + between Python zero-based and PostgreSQL one-based indexes, e.g. + a value of one will be added to all index values before passing + to the database. + + .. versionadded:: 0.9.5 + + + """ + if isinstance(item_type, ARRAY): + raise ValueError( + "Do not nest ARRAY types; ARRAY(basetype) " + "handles multi-dimensional arrays of basetype" + ) + if isinstance(item_type, type): + item_type = item_type() + self.item_type = item_type + self.as_tuple = as_tuple + self.dimensions = dimensions + self.zero_indexes = zero_indexes + + @property + def hashable(self): + return self.as_tuple + + @property + def python_type(self): + return list + + def compare_values(self, x, y): + return x == y + + def _proc_array(self, arr, itemproc, dim, collection): + if dim is None: + arr = list(arr) + if ( + dim == 1 + or dim is None + and ( + # this has to be (list, tuple), or at least + # not hasattr('__iter__'), since Py3K strings + # etc. have __iter__ + not arr + or not isinstance(arr[0], (list, tuple)) + ) + ): + if itemproc: + return collection(itemproc(x) for x in arr) + else: + return collection(arr) + else: + return collection( + self._proc_array( + x, + itemproc, + dim - 1 if dim is not None else None, + collection, + ) + for x in arr + ) + + @util.memoized_property + def _against_native_enum(self): + return ( + isinstance(self.item_type, sqltypes.Enum) + and self.item_type.native_enum + ) + + def bind_expression(self, bindvalue): + return bindvalue + + def bind_processor(self, dialect): + item_proc = self.item_type.dialect_impl(dialect).bind_processor( + dialect + ) + + def process(value): + if value is None: + return value + else: + return self._proc_array( + value, item_proc, self.dimensions, list + ) + + return process + + def result_processor(self, dialect, coltype): + item_proc = self.item_type.dialect_impl(dialect).result_processor( + dialect, coltype + ) + + def process(value): + if value is None: + return value + else: + return self._proc_array( + value, + item_proc, + self.dimensions, + tuple if self.as_tuple else list, + ) + + if self._against_native_enum: + super_rp = process + pattern = re.compile(r"^{(.*)}$") + + def handle_raw_string(value): + inner = pattern.match(value).group(1) + return _split_enum_values(inner) + + def process(value): + if value is None: + return value + # isinstance(value, util.string_types) is required to handle + # the case where a TypeDecorator for and Array of Enum is + # used like was required in sa < 1.3.17 + return super_rp( + handle_raw_string(value) + if isinstance(value, util.string_types) + else value + ) + + return process + + +def _split_enum_values(array_string): + + if '"' not in array_string: + # no escape char is present so it can just split on the comma + return array_string.split(",") if array_string else [] + + # handles quoted strings from: + # r'abc,"quoted","also\\\\quoted", "quoted, comma", "esc \" quot", qpr' + # returns + # ['abc', 'quoted', 'also\\quoted', 'quoted, comma', 'esc " quot', 'qpr'] + text = array_string.replace(r"\"", "_$ESC_QUOTE$_") + text = text.replace(r"\\", "\\") + result = [] + on_quotes = re.split(r'(")', text) + in_quotes = False + for tok in on_quotes: + if tok == '"': + in_quotes = not in_quotes + elif in_quotes: + result.append(tok.replace("_$ESC_QUOTE$_", '"')) + else: + result.extend(re.findall(r"([^\s,]+),?", tok)) + return result diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py new file mode 100644 index 0000000..305ad46 --- /dev/null +++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py @@ -0,0 +1,1112 @@ +# postgresql/asyncpg.py +# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +r""" +.. dialect:: postgresql+asyncpg + :name: asyncpg + :dbapi: asyncpg + :connectstring: postgresql+asyncpg://user:password@host:port/dbname[?key=value&key=value...] + :url: https://magicstack.github.io/asyncpg/ + +The asyncpg dialect is SQLAlchemy's first Python asyncio dialect. + +Using a special asyncio mediation layer, the asyncpg dialect is usable +as the backend for the :ref:`SQLAlchemy asyncio ` +extension package. + +This dialect should normally be used only with the +:func:`_asyncio.create_async_engine` engine creation function:: + + from sqlalchemy.ext.asyncio import create_async_engine + engine = create_async_engine("postgresql+asyncpg://user:pass@hostname/dbname") + +The dialect can also be run as a "synchronous" dialect within the +:func:`_sa.create_engine` function, which will pass "await" calls into +an ad-hoc event loop. This mode of operation is of **limited use** +and is for special testing scenarios only. The mode can be enabled by +adding the SQLAlchemy-specific flag ``async_fallback`` to the URL +in conjunction with :func:`_sa.create_engine`:: + + # for testing purposes only; do not use in production! + engine = create_engine("postgresql+asyncpg://user:pass@hostname/dbname?async_fallback=true") + + +.. versionadded:: 1.4 + +.. note:: + + By default asyncpg does not decode the ``json`` and ``jsonb`` types and + returns them as strings. SQLAlchemy sets default type decoder for ``json`` + and ``jsonb`` types using the python builtin ``json.loads`` function. + The json implementation used can be changed by setting the attribute + ``json_deserializer`` when creating the engine with + :func:`create_engine` or :func:`create_async_engine`. + + +.. _asyncpg_prepared_statement_cache: + +Prepared Statement Cache +-------------------------- + +The asyncpg SQLAlchemy dialect makes use of ``asyncpg.connection.prepare()`` +for all statements. The prepared statement objects are cached after +construction which appears to grant a 10% or more performance improvement for +statement invocation. The cache is on a per-DBAPI connection basis, which +means that the primary storage for prepared statements is within DBAPI +connections pooled within the connection pool. The size of this cache +defaults to 100 statements per DBAPI connection and may be adjusted using the +``prepared_statement_cache_size`` DBAPI argument (note that while this argument +is implemented by SQLAlchemy, it is part of the DBAPI emulation portion of the +asyncpg dialect, therefore is handled as a DBAPI argument, not a dialect +argument):: + + + engine = create_async_engine("postgresql+asyncpg://user:pass@hostname/dbname?prepared_statement_cache_size=500") + +To disable the prepared statement cache, use a value of zero:: + + engine = create_async_engine("postgresql+asyncpg://user:pass@hostname/dbname?prepared_statement_cache_size=0") + +.. versionadded:: 1.4.0b2 Added ``prepared_statement_cache_size`` for asyncpg. + + +.. warning:: The ``asyncpg`` database driver necessarily uses caches for + PostgreSQL type OIDs, which become stale when custom PostgreSQL datatypes + such as ``ENUM`` objects are changed via DDL operations. Additionally, + prepared statements themselves which are optionally cached by SQLAlchemy's + driver as described above may also become "stale" when DDL has been emitted + to the PostgreSQL database which modifies the tables or other objects + involved in a particular prepared statement. + + The SQLAlchemy asyncpg dialect will invalidate these caches within its local + process when statements that represent DDL are emitted on a local + connection, but this is only controllable within a single Python process / + database engine. If DDL changes are made from other database engines + and/or processes, a running application may encounter asyncpg exceptions + ``InvalidCachedStatementError`` and/or ``InternalServerError("cache lookup + failed for type ")`` if it refers to pooled database connections which + operated upon the previous structures. The SQLAlchemy asyncpg dialect will + recover from these error cases when the driver raises these exceptions by + clearing its internal caches as well as those of the asyncpg driver in + response to them, but cannot prevent them from being raised in the first + place if the cached prepared statement or asyncpg type caches have gone + stale, nor can it retry the statement as the PostgreSQL transaction is + invalidated when these errors occur. + +Disabling the PostgreSQL JIT to improve ENUM datatype handling +--------------------------------------------------------------- + +Asyncpg has an `issue `_ when +using PostgreSQL ENUM datatypes, where upon the creation of new database +connections, an expensive query may be emitted in order to retrieve metadata +regarding custom types which has been shown to negatively affect performance. +To mitigate this issue, the PostgreSQL "jit" setting may be disabled from the +client using this setting passed to :func:`_asyncio.create_async_engine`:: + + engine = create_async_engine( + "postgresql+asyncpg://user:password@localhost/tmp", + connect_args={"server_settings": {"jit": "off"}}, + ) + +.. seealso:: + + https://github.com/MagicStack/asyncpg/issues/727 + +""" # noqa + +import collections +import decimal +import json as _py_json +import re +import time + +from . import json +from .base import _DECIMAL_TYPES +from .base import _FLOAT_TYPES +from .base import _INT_TYPES +from .base import ENUM +from .base import INTERVAL +from .base import OID +from .base import PGCompiler +from .base import PGDialect +from .base import PGExecutionContext +from .base import PGIdentifierPreparer +from .base import REGCLASS +from .base import UUID +from ... import exc +from ... import pool +from ... import processors +from ... import util +from ...engine import AdaptedConnection +from ...sql import sqltypes +from ...util.concurrency import asyncio +from ...util.concurrency import await_fallback +from ...util.concurrency import await_only + + +try: + from uuid import UUID as _python_UUID # noqa +except ImportError: + _python_UUID = None + + +class AsyncpgTime(sqltypes.Time): + def get_dbapi_type(self, dbapi): + if self.timezone: + return dbapi.TIME_W_TZ + else: + return dbapi.TIME + + +class AsyncpgDate(sqltypes.Date): + def get_dbapi_type(self, dbapi): + return dbapi.DATE + + +class AsyncpgDateTime(sqltypes.DateTime): + def get_dbapi_type(self, dbapi): + if self.timezone: + return dbapi.TIMESTAMP_W_TZ + else: + return dbapi.TIMESTAMP + + +class AsyncpgBoolean(sqltypes.Boolean): + def get_dbapi_type(self, dbapi): + return dbapi.BOOLEAN + + +class AsyncPgInterval(INTERVAL): + def get_dbapi_type(self, dbapi): + return dbapi.INTERVAL + + @classmethod + def adapt_emulated_to_native(cls, interval, **kw): + + return AsyncPgInterval(precision=interval.second_precision) + + +class AsyncPgEnum(ENUM): + def get_dbapi_type(self, dbapi): + return dbapi.ENUM + + +class AsyncpgInteger(sqltypes.Integer): + def get_dbapi_type(self, dbapi): + return dbapi.INTEGER + + +class AsyncpgBigInteger(sqltypes.BigInteger): + def get_dbapi_type(self, dbapi): + return dbapi.BIGINTEGER + + +class AsyncpgJSON(json.JSON): + def get_dbapi_type(self, dbapi): + return dbapi.JSON + + def result_processor(self, dialect, coltype): + return None + + +class AsyncpgJSONB(json.JSONB): + def get_dbapi_type(self, dbapi): + return dbapi.JSONB + + def result_processor(self, dialect, coltype): + return None + + +class AsyncpgJSONIndexType(sqltypes.JSON.JSONIndexType): + def get_dbapi_type(self, dbapi): + raise NotImplementedError("should not be here") + + +class AsyncpgJSONIntIndexType(sqltypes.JSON.JSONIntIndexType): + def get_dbapi_type(self, dbapi): + return dbapi.INTEGER + + +class AsyncpgJSONStrIndexType(sqltypes.JSON.JSONStrIndexType): + def get_dbapi_type(self, dbapi): + return dbapi.STRING + + +class AsyncpgJSONPathType(json.JSONPathType): + def bind_processor(self, dialect): + def process(value): + assert isinstance(value, util.collections_abc.Sequence) + tokens = [util.text_type(elem) for elem in value] + return tokens + + return process + + +class AsyncpgUUID(UUID): + def get_dbapi_type(self, dbapi): + return dbapi.UUID + + def bind_processor(self, dialect): + if not self.as_uuid and dialect.use_native_uuid: + + def process(value): + if value is not None: + value = _python_UUID(value) + return value + + return process + + def result_processor(self, dialect, coltype): + if not self.as_uuid and dialect.use_native_uuid: + + def process(value): + if value is not None: + value = str(value) + return value + + return process + + +class AsyncpgNumeric(sqltypes.Numeric): + def get_dbapi_type(self, dbapi): + return dbapi.NUMBER + + def bind_processor(self, dialect): + return None + + def result_processor(self, dialect, coltype): + if self.asdecimal: + if coltype in _FLOAT_TYPES: + return processors.to_decimal_processor_factory( + decimal.Decimal, self._effective_decimal_return_scale + ) + elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES: + # pg8000 returns Decimal natively for 1700 + return None + else: + raise exc.InvalidRequestError( + "Unknown PG numeric type: %d" % coltype + ) + else: + if coltype in _FLOAT_TYPES: + # pg8000 returns float natively for 701 + return None + elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES: + return processors.to_float + else: + raise exc.InvalidRequestError( + "Unknown PG numeric type: %d" % coltype + ) + + +class AsyncpgFloat(AsyncpgNumeric): + def get_dbapi_type(self, dbapi): + return dbapi.FLOAT + + +class AsyncpgREGCLASS(REGCLASS): + def get_dbapi_type(self, dbapi): + return dbapi.STRING + + +class AsyncpgOID(OID): + def get_dbapi_type(self, dbapi): + return dbapi.INTEGER + + +class PGExecutionContext_asyncpg(PGExecutionContext): + def handle_dbapi_exception(self, e): + if isinstance( + e, + ( + self.dialect.dbapi.InvalidCachedStatementError, + self.dialect.dbapi.InternalServerError, + ), + ): + self.dialect._invalidate_schema_cache() + + def pre_exec(self): + if self.isddl: + self.dialect._invalidate_schema_cache() + + self.cursor._invalidate_schema_cache_asof = ( + self.dialect._invalidate_schema_cache_asof + ) + + if not self.compiled: + return + + # we have to exclude ENUM because "enum" not really a "type" + # we can cast to, it has to be the name of the type itself. + # for now we just omit it from casting + self.exclude_set_input_sizes = {AsyncAdapt_asyncpg_dbapi.ENUM} + + def create_server_side_cursor(self): + return self._dbapi_connection.cursor(server_side=True) + + +class PGCompiler_asyncpg(PGCompiler): + pass + + +class PGIdentifierPreparer_asyncpg(PGIdentifierPreparer): + pass + + +class AsyncAdapt_asyncpg_cursor: + __slots__ = ( + "_adapt_connection", + "_connection", + "_rows", + "description", + "arraysize", + "rowcount", + "_inputsizes", + "_cursor", + "_invalidate_schema_cache_asof", + ) + + server_side = False + + def __init__(self, adapt_connection): + self._adapt_connection = adapt_connection + self._connection = adapt_connection._connection + self._rows = [] + self._cursor = None + self.description = None + self.arraysize = 1 + self.rowcount = -1 + self._inputsizes = None + self._invalidate_schema_cache_asof = 0 + + def close(self): + self._rows[:] = [] + + def _handle_exception(self, error): + self._adapt_connection._handle_exception(error) + + def _parameter_placeholders(self, params): + if not self._inputsizes: + return tuple("$%d" % idx for idx, _ in enumerate(params, 1)) + else: + return tuple( + "$%d::%s" % (idx, typ) if typ else "$%d" % idx + for idx, typ in enumerate( + (_pg_types.get(typ) for typ in self._inputsizes), 1 + ) + ) + + async def _prepare_and_execute(self, operation, parameters): + adapt_connection = self._adapt_connection + + async with adapt_connection._execute_mutex: + + if not adapt_connection._started: + await adapt_connection._start_transaction() + + if parameters is not None: + operation = operation % self._parameter_placeholders( + parameters + ) + else: + parameters = () + + try: + prepared_stmt, attributes = await adapt_connection._prepare( + operation, self._invalidate_schema_cache_asof + ) + + if attributes: + self.description = [ + ( + attr.name, + attr.type.oid, + None, + None, + None, + None, + None, + ) + for attr in attributes + ] + else: + self.description = None + + if self.server_side: + self._cursor = await prepared_stmt.cursor(*parameters) + self.rowcount = -1 + else: + self._rows = await prepared_stmt.fetch(*parameters) + status = prepared_stmt.get_statusmsg() + + reg = re.match( + r"(?:UPDATE|DELETE|INSERT \d+) (\d+)", status + ) + if reg: + self.rowcount = int(reg.group(1)) + else: + self.rowcount = -1 + + except Exception as error: + self._handle_exception(error) + + async def _executemany(self, operation, seq_of_parameters): + adapt_connection = self._adapt_connection + + async with adapt_connection._execute_mutex: + await adapt_connection._check_type_cache_invalidation( + self._invalidate_schema_cache_asof + ) + + if not adapt_connection._started: + await adapt_connection._start_transaction() + + operation = operation % self._parameter_placeholders( + seq_of_parameters[0] + ) + + try: + return await self._connection.executemany( + operation, seq_of_parameters + ) + except Exception as error: + self._handle_exception(error) + + def execute(self, operation, parameters=None): + self._adapt_connection.await_( + self._prepare_and_execute(operation, parameters) + ) + + def executemany(self, operation, seq_of_parameters): + return self._adapt_connection.await_( + self._executemany(operation, seq_of_parameters) + ) + + def setinputsizes(self, *inputsizes): + self._inputsizes = inputsizes + + def __iter__(self): + while self._rows: + yield self._rows.pop(0) + + def fetchone(self): + if self._rows: + return self._rows.pop(0) + else: + return None + + def fetchmany(self, size=None): + if size is None: + size = self.arraysize + + retval = self._rows[0:size] + self._rows[:] = self._rows[size:] + return retval + + def fetchall(self): + retval = self._rows[:] + self._rows[:] = [] + return retval + + +class AsyncAdapt_asyncpg_ss_cursor(AsyncAdapt_asyncpg_cursor): + + server_side = True + __slots__ = ("_rowbuffer",) + + def __init__(self, adapt_connection): + super(AsyncAdapt_asyncpg_ss_cursor, self).__init__(adapt_connection) + self._rowbuffer = None + + def close(self): + self._cursor = None + self._rowbuffer = None + + def _buffer_rows(self): + new_rows = self._adapt_connection.await_(self._cursor.fetch(50)) + self._rowbuffer = collections.deque(new_rows) + + def __aiter__(self): + return self + + async def __anext__(self): + if not self._rowbuffer: + self._buffer_rows() + + while True: + while self._rowbuffer: + yield self._rowbuffer.popleft() + + self._buffer_rows() + if not self._rowbuffer: + break + + def fetchone(self): + if not self._rowbuffer: + self._buffer_rows() + if not self._rowbuffer: + return None + return self._rowbuffer.popleft() + + def fetchmany(self, size=None): + if size is None: + return self.fetchall() + + if not self._rowbuffer: + self._buffer_rows() + + buf = list(self._rowbuffer) + lb = len(buf) + if size > lb: + buf.extend( + self._adapt_connection.await_(self._cursor.fetch(size - lb)) + ) + + result = buf[0:size] + self._rowbuffer = collections.deque(buf[size:]) + return result + + def fetchall(self): + ret = list(self._rowbuffer) + list( + self._adapt_connection.await_(self._all()) + ) + self._rowbuffer.clear() + return ret + + async def _all(self): + rows = [] + + # TODO: looks like we have to hand-roll some kind of batching here. + # hardcoding for the moment but this should be improved. + while True: + batch = await self._cursor.fetch(1000) + if batch: + rows.extend(batch) + continue + else: + break + return rows + + def executemany(self, operation, seq_of_parameters): + raise NotImplementedError( + "server side cursor doesn't support executemany yet" + ) + + +class AsyncAdapt_asyncpg_connection(AdaptedConnection): + __slots__ = ( + "dbapi", + "_connection", + "isolation_level", + "_isolation_setting", + "readonly", + "deferrable", + "_transaction", + "_started", + "_prepared_statement_cache", + "_invalidate_schema_cache_asof", + "_execute_mutex", + ) + + await_ = staticmethod(await_only) + + def __init__(self, dbapi, connection, prepared_statement_cache_size=100): + self.dbapi = dbapi + self._connection = connection + self.isolation_level = self._isolation_setting = "read_committed" + self.readonly = False + self.deferrable = False + self._transaction = None + self._started = False + self._invalidate_schema_cache_asof = time.time() + self._execute_mutex = asyncio.Lock() + + if prepared_statement_cache_size: + self._prepared_statement_cache = util.LRUCache( + prepared_statement_cache_size + ) + else: + self._prepared_statement_cache = None + + async def _check_type_cache_invalidation(self, invalidate_timestamp): + if invalidate_timestamp > self._invalidate_schema_cache_asof: + await self._connection.reload_schema_state() + self._invalidate_schema_cache_asof = invalidate_timestamp + + async def _prepare(self, operation, invalidate_timestamp): + await self._check_type_cache_invalidation(invalidate_timestamp) + + cache = self._prepared_statement_cache + if cache is None: + prepared_stmt = await self._connection.prepare(operation) + attributes = prepared_stmt.get_attributes() + return prepared_stmt, attributes + + # asyncpg uses a type cache for the "attributes" which seems to go + # stale independently of the PreparedStatement itself, so place that + # collection in the cache as well. + if operation in cache: + prepared_stmt, attributes, cached_timestamp = cache[operation] + + # preparedstatements themselves also go stale for certain DDL + # changes such as size of a VARCHAR changing, so there is also + # a cross-connection invalidation timestamp + if cached_timestamp > invalidate_timestamp: + return prepared_stmt, attributes + + prepared_stmt = await self._connection.prepare(operation) + attributes = prepared_stmt.get_attributes() + cache[operation] = (prepared_stmt, attributes, time.time()) + + return prepared_stmt, attributes + + def _handle_exception(self, error): + if self._connection.is_closed(): + self._transaction = None + self._started = False + + if not isinstance(error, AsyncAdapt_asyncpg_dbapi.Error): + exception_mapping = self.dbapi._asyncpg_error_translate + + for super_ in type(error).__mro__: + if super_ in exception_mapping: + translated_error = exception_mapping[super_]( + "%s: %s" % (type(error), error) + ) + translated_error.pgcode = ( + translated_error.sqlstate + ) = getattr(error, "sqlstate", None) + raise translated_error from error + else: + raise error + else: + raise error + + @property + def autocommit(self): + return self.isolation_level == "autocommit" + + @autocommit.setter + def autocommit(self, value): + if value: + self.isolation_level = "autocommit" + else: + self.isolation_level = self._isolation_setting + + def set_isolation_level(self, level): + if self._started: + self.rollback() + self.isolation_level = self._isolation_setting = level + + async def _start_transaction(self): + if self.isolation_level == "autocommit": + return + + try: + self._transaction = self._connection.transaction( + isolation=self.isolation_level, + readonly=self.readonly, + deferrable=self.deferrable, + ) + await self._transaction.start() + except Exception as error: + self._handle_exception(error) + else: + self._started = True + + def cursor(self, server_side=False): + if server_side: + return AsyncAdapt_asyncpg_ss_cursor(self) + else: + return AsyncAdapt_asyncpg_cursor(self) + + def rollback(self): + if self._started: + try: + self.await_(self._transaction.rollback()) + except Exception as error: + self._handle_exception(error) + finally: + self._transaction = None + self._started = False + + def commit(self): + if self._started: + try: + self.await_(self._transaction.commit()) + except Exception as error: + self._handle_exception(error) + finally: + self._transaction = None + self._started = False + + def close(self): + self.rollback() + + self.await_(self._connection.close()) + + +class AsyncAdaptFallback_asyncpg_connection(AsyncAdapt_asyncpg_connection): + __slots__ = () + + await_ = staticmethod(await_fallback) + + +class AsyncAdapt_asyncpg_dbapi: + def __init__(self, asyncpg): + self.asyncpg = asyncpg + self.paramstyle = "format" + + def connect(self, *arg, **kw): + async_fallback = kw.pop("async_fallback", False) + prepared_statement_cache_size = kw.pop( + "prepared_statement_cache_size", 100 + ) + if util.asbool(async_fallback): + return AsyncAdaptFallback_asyncpg_connection( + self, + await_fallback(self.asyncpg.connect(*arg, **kw)), + prepared_statement_cache_size=prepared_statement_cache_size, + ) + else: + return AsyncAdapt_asyncpg_connection( + self, + await_only(self.asyncpg.connect(*arg, **kw)), + prepared_statement_cache_size=prepared_statement_cache_size, + ) + + class Error(Exception): + pass + + class Warning(Exception): # noqa + pass + + class InterfaceError(Error): + pass + + class DatabaseError(Error): + pass + + class InternalError(DatabaseError): + pass + + class OperationalError(DatabaseError): + pass + + class ProgrammingError(DatabaseError): + pass + + class IntegrityError(DatabaseError): + pass + + class DataError(DatabaseError): + pass + + class NotSupportedError(DatabaseError): + pass + + class InternalServerError(InternalError): + pass + + class InvalidCachedStatementError(NotSupportedError): + def __init__(self, message): + super( + AsyncAdapt_asyncpg_dbapi.InvalidCachedStatementError, self + ).__init__( + message + " (SQLAlchemy asyncpg dialect will now invalidate " + "all prepared caches in response to this exception)", + ) + + @util.memoized_property + def _asyncpg_error_translate(self): + import asyncpg + + return { + asyncpg.exceptions.IntegrityConstraintViolationError: self.IntegrityError, # noqa: E501 + asyncpg.exceptions.PostgresError: self.Error, + asyncpg.exceptions.SyntaxOrAccessError: self.ProgrammingError, + asyncpg.exceptions.InterfaceError: self.InterfaceError, + asyncpg.exceptions.InvalidCachedStatementError: self.InvalidCachedStatementError, # noqa: E501 + asyncpg.exceptions.InternalServerError: self.InternalServerError, + } + + def Binary(self, value): + return value + + STRING = util.symbol("STRING") + TIMESTAMP = util.symbol("TIMESTAMP") + TIMESTAMP_W_TZ = util.symbol("TIMESTAMP_W_TZ") + TIME = util.symbol("TIME") + TIME_W_TZ = util.symbol("TIME_W_TZ") + DATE = util.symbol("DATE") + INTERVAL = util.symbol("INTERVAL") + NUMBER = util.symbol("NUMBER") + FLOAT = util.symbol("FLOAT") + BOOLEAN = util.symbol("BOOLEAN") + INTEGER = util.symbol("INTEGER") + BIGINTEGER = util.symbol("BIGINTEGER") + BYTES = util.symbol("BYTES") + DECIMAL = util.symbol("DECIMAL") + JSON = util.symbol("JSON") + JSONB = util.symbol("JSONB") + ENUM = util.symbol("ENUM") + UUID = util.symbol("UUID") + BYTEA = util.symbol("BYTEA") + + DATETIME = TIMESTAMP + BINARY = BYTEA + + +_pg_types = { + AsyncAdapt_asyncpg_dbapi.STRING: "varchar", + AsyncAdapt_asyncpg_dbapi.TIMESTAMP: "timestamp", + AsyncAdapt_asyncpg_dbapi.TIMESTAMP_W_TZ: "timestamp with time zone", + AsyncAdapt_asyncpg_dbapi.DATE: "date", + AsyncAdapt_asyncpg_dbapi.TIME: "time", + AsyncAdapt_asyncpg_dbapi.TIME_W_TZ: "time with time zone", + AsyncAdapt_asyncpg_dbapi.INTERVAL: "interval", + AsyncAdapt_asyncpg_dbapi.NUMBER: "numeric", + AsyncAdapt_asyncpg_dbapi.FLOAT: "float", + AsyncAdapt_asyncpg_dbapi.BOOLEAN: "bool", + AsyncAdapt_asyncpg_dbapi.INTEGER: "integer", + AsyncAdapt_asyncpg_dbapi.BIGINTEGER: "bigint", + AsyncAdapt_asyncpg_dbapi.BYTES: "bytes", + AsyncAdapt_asyncpg_dbapi.DECIMAL: "decimal", + AsyncAdapt_asyncpg_dbapi.JSON: "json", + AsyncAdapt_asyncpg_dbapi.JSONB: "jsonb", + AsyncAdapt_asyncpg_dbapi.ENUM: "enum", + AsyncAdapt_asyncpg_dbapi.UUID: "uuid", + AsyncAdapt_asyncpg_dbapi.BYTEA: "bytea", +} + + +class PGDialect_asyncpg(PGDialect): + driver = "asyncpg" + supports_statement_cache = True + + supports_unicode_statements = True + supports_server_side_cursors = True + + supports_unicode_binds = True + + default_paramstyle = "format" + supports_sane_multi_rowcount = False + execution_ctx_cls = PGExecutionContext_asyncpg + statement_compiler = PGCompiler_asyncpg + preparer = PGIdentifierPreparer_asyncpg + + use_setinputsizes = True + + use_native_uuid = True + + colspecs = util.update_copy( + PGDialect.colspecs, + { + sqltypes.Time: AsyncpgTime, + sqltypes.Date: AsyncpgDate, + sqltypes.DateTime: AsyncpgDateTime, + sqltypes.Interval: AsyncPgInterval, + INTERVAL: AsyncPgInterval, + UUID: AsyncpgUUID, + sqltypes.Boolean: AsyncpgBoolean, + sqltypes.Integer: AsyncpgInteger, + sqltypes.BigInteger: AsyncpgBigInteger, + sqltypes.Numeric: AsyncpgNumeric, + sqltypes.Float: AsyncpgFloat, + sqltypes.JSON: AsyncpgJSON, + json.JSONB: AsyncpgJSONB, + sqltypes.JSON.JSONPathType: AsyncpgJSONPathType, + sqltypes.JSON.JSONIndexType: AsyncpgJSONIndexType, + sqltypes.JSON.JSONIntIndexType: AsyncpgJSONIntIndexType, + sqltypes.JSON.JSONStrIndexType: AsyncpgJSONStrIndexType, + sqltypes.Enum: AsyncPgEnum, + OID: AsyncpgOID, + REGCLASS: AsyncpgREGCLASS, + }, + ) + is_async = True + _invalidate_schema_cache_asof = 0 + + def _invalidate_schema_cache(self): + self._invalidate_schema_cache_asof = time.time() + + @util.memoized_property + def _dbapi_version(self): + if self.dbapi and hasattr(self.dbapi, "__version__"): + return tuple( + [ + int(x) + for x in re.findall( + r"(\d+)(?:[-\.]?|$)", self.dbapi.__version__ + ) + ] + ) + else: + return (99, 99, 99) + + @classmethod + def dbapi(cls): + return AsyncAdapt_asyncpg_dbapi(__import__("asyncpg")) + + @util.memoized_property + def _isolation_lookup(self): + return { + "AUTOCOMMIT": "autocommit", + "READ COMMITTED": "read_committed", + "REPEATABLE READ": "repeatable_read", + "SERIALIZABLE": "serializable", + } + + def set_isolation_level(self, connection, level): + try: + level = self._isolation_lookup[level.replace("_", " ")] + except KeyError as err: + util.raise_( + exc.ArgumentError( + "Invalid value '%s' for isolation_level. " + "Valid isolation levels for %s are %s" + % (level, self.name, ", ".join(self._isolation_lookup)) + ), + replace_context=err, + ) + + connection.set_isolation_level(level) + + def set_readonly(self, connection, value): + connection.readonly = value + + def get_readonly(self, connection): + return connection.readonly + + def set_deferrable(self, connection, value): + connection.deferrable = value + + def get_deferrable(self, connection): + return connection.deferrable + + def create_connect_args(self, url): + opts = url.translate_connect_args(username="user") + + opts.update(url.query) + util.coerce_kw_type(opts, "prepared_statement_cache_size", int) + util.coerce_kw_type(opts, "port", int) + return ([], opts) + + @classmethod + def get_pool_class(cls, url): + + async_fallback = url.query.get("async_fallback", False) + + if util.asbool(async_fallback): + return pool.FallbackAsyncAdaptedQueuePool + else: + return pool.AsyncAdaptedQueuePool + + def is_disconnect(self, e, connection, cursor): + if connection: + return connection._connection.is_closed() + else: + return isinstance( + e, self.dbapi.InterfaceError + ) and "connection is closed" in str(e) + + def do_set_input_sizes(self, cursor, list_of_tuples, context): + if self.positional: + cursor.setinputsizes( + *[dbtype for key, dbtype, sqltype in list_of_tuples] + ) + else: + cursor.setinputsizes( + **{ + key: dbtype + for key, dbtype, sqltype in list_of_tuples + if dbtype + } + ) + + async def setup_asyncpg_json_codec(self, conn): + """set up JSON codec for asyncpg. + + This occurs for all new connections and + can be overridden by third party dialects. + + .. versionadded:: 1.4.27 + + """ + + asyncpg_connection = conn._connection + deserializer = self._json_deserializer or _py_json.loads + + def _json_decoder(bin_value): + return deserializer(bin_value.decode()) + + await asyncpg_connection.set_type_codec( + "json", + encoder=str.encode, + decoder=_json_decoder, + schema="pg_catalog", + format="binary", + ) + + async def setup_asyncpg_jsonb_codec(self, conn): + """set up JSONB codec for asyncpg. + + This occurs for all new connections and + can be overridden by third party dialects. + + .. versionadded:: 1.4.27 + + """ + + asyncpg_connection = conn._connection + deserializer = self._json_deserializer or _py_json.loads + + def _jsonb_encoder(str_value): + # \x01 is the prefix for jsonb used by PostgreSQL. + # asyncpg requires it when format='binary' + return b"\x01" + str_value.encode() + + deserializer = self._json_deserializer or _py_json.loads + + def _jsonb_decoder(bin_value): + # the byte is the \x01 prefix for jsonb used by PostgreSQL. + # asyncpg returns it when format='binary' + return deserializer(bin_value[1:].decode()) + + await asyncpg_connection.set_type_codec( + "jsonb", + encoder=_jsonb_encoder, + decoder=_jsonb_decoder, + schema="pg_catalog", + format="binary", + ) + + def on_connect(self): + """on_connect for asyncpg + + A major component of this for asyncpg is to set up type decoders at the + asyncpg level. + + See https://github.com/MagicStack/asyncpg/issues/623 for + notes on JSON/JSONB implementation. + + """ + + super_connect = super(PGDialect_asyncpg, self).on_connect() + + def connect(conn): + conn.await_(self.setup_asyncpg_json_codec(conn)) + conn.await_(self.setup_asyncpg_jsonb_codec(conn)) + if super_connect is not None: + super_connect(conn) + + return connect + + def get_driver_connection(self, connection): + return connection._connection + + +dialect = PGDialect_asyncpg diff --git a/lib/sqlalchemy/dialects/postgresql/base.py b/lib/sqlalchemy/dialects/postgresql/base.py new file mode 100644 index 0000000..eb84170 --- /dev/null +++ b/lib/sqlalchemy/dialects/postgresql/base.py @@ -0,0 +1,4651 @@ +# postgresql/base.py +# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors +# +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php + +r""" +.. dialect:: postgresql + :name: PostgreSQL + :full_support: 9.6, 10, 11, 12, 13, 14 + :normal_support: 9.6+ + :best_effort: 8+ + +.. _postgresql_sequences: + +Sequences/SERIAL/IDENTITY +------------------------- + +PostgreSQL supports sequences, and SQLAlchemy uses these as the default means +of creating new primary key values for integer-based primary key columns. When +creating tables, SQLAlchemy will issue the ``SERIAL`` datatype for +integer-based primary key columns, which generates a sequence and server side +default corresponding to the column. + +To specify a specific named sequence to be used for primary key generation, +use the :func:`~sqlalchemy.schema.Sequence` construct:: + + Table('sometable', metadata, + Column('id', Integer, Sequence('some_id_seq'), primary_key=True) + ) + +When SQLAlchemy issues a single INSERT statement, to fulfill the contract of +having the "last insert identifier" available, a RETURNING clause is added to +the INSERT statement which specifies the primary key columns should be +returned after the statement completes. The RETURNING functionality only takes +place if PostgreSQL 8.2 or later is in use. As a fallback approach, the +sequence, whether specified explicitly or implicitly via ``SERIAL``, is +executed independently beforehand, the returned value to be used in the +subsequent insert. Note that when an +:func:`~sqlalchemy.sql.expression.insert()` construct is executed using +"executemany" semantics, the "last inserted identifier" functionality does not +apply; no RETURNING clause is emitted nor is the sequence pre-executed in this +case. + +To force the usage of RETURNING by default off, specify the flag +``implicit_returning=False`` to :func:`_sa.create_engine`. + +PostgreSQL 10 and above IDENTITY columns +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +PostgreSQL 10 and above have a new IDENTITY feature that supersedes the use +of SERIAL. The :class:`_schema.Identity` construct in a +:class:`_schema.Column` can be used to control its behavior:: + + from sqlalchemy import Table, Column, MetaData, Integer, Computed + + metadata = MetaData() + + data = Table( + "data", + metadata, + Column( + 'id', Integer, Identity(start=42, cycle=True), primary_key=True + ), + Column('data', String) + ) + +The CREATE TABLE for the above :class:`_schema.Table` object would be: + +.. sourcecode:: sql + + CREATE TABLE data ( + id INTEGER GENERATED BY DEFAULT AS IDENTITY (START WITH 42 CYCLE), + data VARCHAR, + PRIMARY KEY (id) + ) + +.. versionchanged:: 1.4 Added :class:`_schema.Identity` construct + in a :class:`_schema.Column` to specify the option of an autoincrementing + column. + +.. note:: + + Previous versions of SQLAlchemy did not have built-in support for rendering + of IDENTITY, and could use the following compilation hook to replace + occurrences of SERIAL with IDENTITY:: + + from sqlalchemy.schema import CreateColumn + from sqlalchemy.ext.compiler import compiles + + + @compiles(CreateColumn, 'postgresql') + def use_identity(element, compiler, **kw): + text = compiler.visit_create_column(element, **kw) + text = text.replace( + "SERIAL", "INT GENERATED BY DEFAULT AS IDENTITY" + ) + return text + + Using the above, a table such as:: + + t = Table( + 't', m, + Column('id', Integer, primary_key=True), + Column('data', String) + ) + + Will generate on the backing database as:: + + CREATE TABLE t ( + id INT GENERATED BY DEFAULT AS IDENTITY, + data VARCHAR, + PRIMARY KEY (id) + ) + +.. _postgresql_ss_cursors: + +Server Side Cursors +------------------- + +Server-side cursor support is available for the psycopg2, asyncpg +dialects and may also be available in others. + +Server side cursors are enabled on a per-statement basis by using the +:paramref:`.Connection.execution_options.stream_results` connection execution +option:: + + with engine.connect() as conn: + result = conn.execution_options(stream_results=True).execute(text("select * from table")) + +Note that some kinds of SQL statements may not be supported with +server side cursors; generally, only SQL statements that return rows should be +used with this option. + +.. deprecated:: 1.4 The dialect-level server_side_cursors flag is deprecated + and will be removed in a future release. Please use the + :paramref:`_engine.Connection.stream_results` execution option for + unbuffered cursor support. + +.. seealso:: + + :ref:`engine_stream_results` + +.. _postgresql_isolation_level: + +Transaction Isolation Level +--------------------------- + +Most SQLAlchemy dialects support setting of transaction isolation level +using the :paramref:`_sa.create_engine.isolation_level` parameter +at the :func:`_sa.create_engine` level, and at the :class:`_engine.Connection` +level via the :paramref:`.Connection.execution_options.isolation_level` +parameter. + +For PostgreSQL dialects, this feature works either by making use of the +DBAPI-specific features, such as psycopg2's isolation level flags which will +embed the isolation level setting inline with the ``"BEGIN"`` statement, or for +DBAPIs with no direct support by emitting ``SET SESSION CHARACTERISTICS AS +TRANSACTION ISOLATION LEVEL `` ahead of the ``"BEGIN"`` statement +emitted by the DBAPI. For the special AUTOCOMMIT isolation level, +DBAPI-specific techniques are used which is typically an ``.autocommit`` +flag on the DBAPI connection object. + +To set isolation level using :func:`_sa.create_engine`:: + + engine = create_engine( + "postgresql+pg8000://scott:tiger@localhost/test", + isolation_level = "REPEATABLE READ" + ) + +To set using per-connection execution options:: + + with engine.connect() as conn: + conn = conn.execution_options( + isolation_level="REPEATABLE READ" + ) + with conn.begin(): + # ... work with transaction + +There are also more options for isolation level configurations, such as +"sub-engine" objects linked to a main :class:`_engine.Engine` which each apply +different isolation level settings. See the discussion at +:ref:`dbapi_autocommit` for background. + +Valid values for ``isolation_level`` on most PostgreSQL dialects include: + +* ``READ COMMITTED`` +* ``READ UNCOMMITTED`` +* ``REPEATABLE READ`` +* ``SERIALIZABLE`` +* ``AUTOCOMMIT`` + +.. seealso:: + + :ref:`dbapi_autocommit` + + :ref:`postgresql_readonly_deferrable` + + :ref:`psycopg2_isolation_level` + + :ref:`pg8000_isolation_level` + +.. _postgresql_readonly_deferrable: + +Setting READ ONLY / DEFERRABLE +------------------------------ + +Most PostgreSQL dialects support setting the "READ ONLY" and "DEFERRABLE" +characteristics of the transaction, which is in addition to the isolation level +setting. These two attributes can be established either in conjunction with or +independently of the isolation level by passing the ``postgresql_readonly`` and +``postgresql_deferrable`` flags with +:meth:`_engine.Connection.execution_options`. The example below illustrates +passing the ``"SERIALIZABLE"`` isolation level at the same time as setting +"READ ONLY" and "DEFERRABLE":: + + with engine.connect() as conn: + conn = conn.execution_options( + isolation_level="SERIALIZABLE", + postgresql_readonly=True, + postgresql_deferrable=True + ) + with conn.begin(): + # ... work with transaction + +Note that some DBAPIs such as asyncpg only support "readonly" with +SERIALIZABLE isolation. + +.. versionadded:: 1.4 added support for the ``postgresql_readonly`` + and ``postgresql_deferrable`` execution options. + +.. _postgresql_alternate_search_path: + +Setting Alternate Search Paths on Connect +------------------------------------------ + +The PostgreSQL ``search_path`` variable refers to the list of schema names +that will be implicitly referred towards when a particular table or other +object is referenced in a SQL statement. As detailed in the next section +:ref:`postgresql_schema_reflection`, SQLAlchemy is generally organized around +the concept of keeping this variable at its default value of ``public``, +however, in order to have it set to any arbitrary name or names when connections +are used automatically, the "SET SESSION search_path" command may be invoked +for all connections in a pool using the following event handler, as discussed +at :ref:`schema_set_default_connections`:: + + from sqlalchemy import event + from sqlalchemy import create_engine + + engine = create_engine("postgresql+psycopg2://scott:tiger@host/dbname") + + @event.listens_for(engine, "connect", insert=True) + def set_search_path(dbapi_connection, connection_record): + existing_autocommit = dbapi_connection.autocommit + dbapi_connection.autocommit = True + cursor = dbapi_connection.cursor() + cursor.execute("SET SESSION search_path='%s'" % schema_name) + cursor.close() + dbapi_connection.autocommit = existing_autocommit + +The reason the recipe is complicated by use of the ``.autocommit`` DBAPI +attribute is so that when the ``SET SESSION search_path`` directive is invoked, +it is invoked outside of the scope of any transaction and therefore will not +be reverted when the DBAPI connection has a rollback. + +.. seealso:: + + :ref:`schema_set_default_connections` - in the :ref:`metadata_toplevel` documentation + + + + +.. _postgresql_schema_reflection: + +Remote-Schema Table Introspection and PostgreSQL search_path +------------------------------------------------------------ + +.. admonition:: Section Best Practices Summarized + + keep the ``search_path`` variable set to its default of ``public``, without + any other schema names. For other schema names, name these explicitly + within :class:`_schema.Table` definitions. Alternatively, the + ``postgresql_ignore_search_path`` option will cause all reflected + :class:`_schema.Table` objects to have a :attr:`_schema.Table.schema` + attribute set up. + +The PostgreSQL dialect can reflect tables from any schema, as outlined in +:ref:`metadata_reflection_schemas`. + +With regards to tables which these :class:`_schema.Table` +objects refer to via foreign key constraint, a decision must be made as to how +the ``.schema`` is represented in those remote tables, in the case where that +remote schema name is also a member of the current +`PostgreSQL search path +`_. + +By default, the PostgreSQL dialect mimics the behavior encouraged by +PostgreSQL's own ``pg_get_constraintdef()`` builtin procedure. This function +returns a sample definition for a particular foreign key constraint, +omitting the referenced schema name from that definition when the name is +also in the PostgreSQL schema search path. The interaction below +illustrates this behavior:: + + test=> CREATE TABLE test_schema.referred(id INTEGER PRIMARY KEY); + CREATE TABLE + test=> CREATE TABLE referring( + test(> id INTEGER PRIMARY KEY, + test(> referred_id INTEGER REFERENCES test_schema.referred(id)); + CREATE TABLE + test=> SET search_path TO public, test_schema; + test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM + test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n + test-> ON n.oid = c.relnamespace + test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid + test-> WHERE c.relname='referring' AND r.contype = 'f' + test-> ; + pg_get_constraintdef + --------------------------------------------------- + FOREIGN KEY (referred_id) REFERENCES referred(id) + (1 row) + +Above, we created a table ``referred`` as a member of the remote schema +``test_schema``, however when we added ``test_schema`` to the +PG ``search_path`` and then asked ``pg_get_constraintdef()`` for the +``FOREIGN KEY`` syntax, ``test_schema`` was not included in the output of +the function. + +On the other hand, if we set the search path back to the typical default +of ``public``:: + + test=> SET search_path TO public; + SET + +The same query against ``pg_get_constraintdef()`` now returns the fully +schema-qualified name for us:: + + test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM + test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n + test-> ON n.oid = c.relnamespace + test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid + test-> WHERE c.relname='referring' AND r.contype = 'f'; + pg_get_constraintdef + --------------------------------------------------------------- + FOREIGN KEY (referred_id) REFERENCES test_schema.referred(id) + (1 row) + +SQLAlchemy will by default use the return value of ``pg_get_constraintdef()`` +in order to determine the remote schema name. That is, if our ``search_path`` +were set to include ``test_schema``, and we invoked a table +reflection process as follows:: + + >>> from sqlalchemy import Table, MetaData, create_engine, text + >>> engine = create_engine("postgresql://scott:tiger@localhost/test") + >>> with engine.connect() as conn: + ... conn.execute(text("SET search_path TO test_schema, public")) + ... metadata_obj = MetaData() + ... referring = Table('referring', metadata_obj, + ... autoload_with=conn) + ... + + +The above process would deliver to the :attr:`_schema.MetaData.tables` +collection +``referred`` table named **without** the schema:: + + >>> metadata_obj.tables['referred'].schema is None + True + +To alter the behavior of reflection such that the referred schema is +maintained regardless of the ``search_path`` setting, use the +``postgresql_ignore_search_path`` option, which can be specified as a +dialect-specific argument to both :class:`_schema.Table` as well as +:meth:`_schema.MetaData.reflect`:: + + >>> with engine.connect() as conn: + ... conn.execute(text("SET search_path TO test_schema, public")) + ... metadata_obj = MetaData() + ... referring = Table('referring', metadata_obj, + ... autoload_with=conn, + ... postgresql_ignore_search_path=True) + ... + + +We will now have ``test_schema.referred`` stored as schema-qualified:: + + >>> metadata_obj.tables['test_schema.referred'].schema + 'test_schema' + +.. sidebar:: Best Practices for PostgreSQL Schema reflection + + The description of PostgreSQL schema reflection behavior is complex, and + is the product of many years of dealing with widely varied use cases and + user preferences. But in fact, there's no need to understand any of it if + you just stick to the simplest use pattern: leave the ``search_path`` set + to its default of ``public`` only, never refer to the name ``public`` as + an explicit schema name otherwise, and refer to all other schema names + explicitly when building up a :class:`_schema.Table` object. The options + described here are only for those users who can't, or prefer not to, stay + within these guidelines. + +Note that **in all cases**, the "default" schema is always reflected as +``None``. The "default" schema on PostgreSQL is that which is returned by the +PostgreSQL ``current_schema()`` function. On a typical PostgreSQL +installation, this is the name ``public``. So a table that refers to another +which is in the ``public`` (i.e. default) schema will always have the +``.schema`` attribute set to ``None``. + +.. seealso:: + + :ref:`reflection_schema_qualified_interaction` - discussion of the issue + from a backend-agnostic perspective + + `The Schema Search Path + `_ + - on the PostgreSQL website. + +INSERT/UPDATE...RETURNING +------------------------- + +The dialect supports PG 8.2's ``INSERT..RETURNING``, ``UPDATE..RETURNING`` and +``DELETE..RETURNING`` syntaxes. ``INSERT..RETURNING`` is used by default +for single-row INSERT statements in order to fetch newly generated +primary key identifiers. To specify an explicit ``RETURNING`` clause, +use the :meth:`._UpdateBase.returning` method on a per-statement basis:: + + # INSERT..RETURNING + result = table.insert().returning(table.c.col1, table.c.col2).\ + values(name='foo') + print(result.fetchall()) + + # UPDATE..RETURNING + result = table.update().returning(table.c.col1, table.c.col2).\ + where(table.c.name=='foo').values(name='bar') + print(result.fetchall()) + + # DELETE..RETURNING + result = table.delete().returning(table.c.col1, table.c.col2).\ + where(table.c.name=='foo') + print(result.fetchall()) + +.. _postgresql_insert_on_conflict: + +INSERT...ON CONFLICT (Upsert) +------------------------------ + +Starting with version 9.5, PostgreSQL allows "upserts" (update or insert) of +rows into a table via the ``ON CONFLICT`` clause of the ``INSERT`` statement. A +candidate row will only be inserted if that row does not violate any unique +constraints. In the case of a unique constraint violation, a secondary action +can occur which can be either "DO UPDATE", indicating that the data in the +target row should be updated, or "DO NOTHING", which indicates to silently skip +this row. + +Conflicts are determined using existing unique constraints and indexes. These +constraints may be identified either using their name as stated in DDL, +or they may be inferred by stating the columns and conditions that comprise +the indexes. + +SQLAlchemy provides ``ON CONFLICT`` support via the PostgreSQL-specific +:func:`_postgresql.insert()` function, which provides +the generative methods :meth:`_postgresql.Insert.on_conflict_do_update` +and :meth:`~.postgresql.Insert.on_conflict_do_nothing`: + +.. sourcecode:: pycon+sql + + >>> from sqlalchemy.dialects.postgresql import insert + >>> insert_stmt = insert(my_table).values( + ... id='some_existing_id', + ... data='inserted value') + >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing( + ... index_elements=['id'] + ... ) + >>> print(do_nothing_stmt) + {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) + ON CONFLICT (id) DO NOTHING + {stop} + + >>> do_update_stmt = insert_stmt.on_conflict_do_update( + ... constraint='pk_my_table', + ... set_=dict(data='updated value') + ... ) + >>> print(do_update_stmt) + {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) + ON CONFLICT ON CONSTRAINT pk_my_table DO UPDATE SET data = %(param_1)s + +.. versionadded:: 1.1 + +.. seealso:: + + `INSERT .. ON CONFLICT + `_ + - in the PostgreSQL documentation. + +Specifying the Target +^^^^^^^^^^^^^^^^^^^^^ + +Both methods supply the "target" of the conflict using either the +named constraint or by column inference: + +* The :paramref:`_postgresql.Insert.on_conflict_do_update.index_elements` argument + specifies a sequence containing string column names, :class:`_schema.Column` + objects, and/or SQL expression elements, which would identify a unique + index: + + .. sourcecode:: pycon+sql + + >>> do_update_stmt = insert_stmt.on_conflict_do_update( + ... index_elements=['id'], + ... set_=dict(data='updated value') + ... ) + >>> print(do_update_stmt) + {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) + ON CONFLICT (id) DO UPDATE SET data = %(param_1)s + {stop} + + >>> do_update_stmt = insert_stmt.on_conflict_do_update( + ... index_elements=[my_table.c.id], + ... set_=dict(data='updated value') + ... ) + >>> print(do_update_stmt) + {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) + ON CONFLICT (id) DO UPDATE SET data = %(param_1)s + +* When using :paramref:`_postgresql.Insert.on_conflict_do_update.index_elements` to + infer an index, a partial index can be inferred by also specifying the + use the :paramref:`_postgresql.Insert.on_conflict_do_update.index_where` parameter: + + .. sourcecode:: pycon+sql + + >>> stmt = insert(my_table).values(user_email='a@b.com', data='inserted data') + >>> stmt = stmt.on_conflict_do_update( + ... index_elements=[my_table.c.user_email], + ... index_where=my_table.c.user_email.like('%@gmail.com'), + ... set_=dict(data=stmt.excluded.data) + ... ) + >>> print(stmt) + {opensql}INSERT INTO my_table (data, user_email) + VALUES (%(data)s, %(user_email)s) ON CONFLICT (user_email) + WHERE user_email LIKE %(user_email_1)s DO UPDATE SET data = excluded.data + +* The :paramref:`_postgresql.Insert.on_conflict_do_update.constraint` argument is + used to specify an index directly rather than inferring it. This can be + the name of a UNIQUE constraint, a PRIMARY KEY constraint, or an INDEX: + + .. sourcecode:: pycon+sql + + >>> do_update_stmt = insert_stmt.on_conflict_do_update( + ... constraint='my_table_idx_1', + ... set_=dict(data='updated value') + ... ) + >>> print(do_update_stmt) + {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) + ON CONFLICT ON CONSTRAINT my_table_idx_1 DO UPDATE SET data = %(param_1)s + {stop} + + >>> do_update_stmt = insert_stmt.on_conflict_do_update( + ... constraint='my_table_pk', + ... set_=dict(data='updated value') + ... ) + >>> print(do_update_stmt) + {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) + ON CONFLICT ON CONSTRAINT my_table_pk DO UPDATE SET data = %(param_1)s + {stop} + +* The :paramref:`_postgresql.Insert.on_conflict_do_update.constraint` argument may + also refer to a SQLAlchemy construct representing a constraint, + e.g. :class:`.UniqueConstraint`, :class:`.PrimaryKeyConstraint`, + :class:`.Index`, or :class:`.ExcludeConstraint`. In this use, + if the constraint has a name, it is used directly. Otherwise, if the + constraint is unnamed, then inference will be used, where the expressions + and optional WHERE clause of the constraint will be spelled out in the + construct. This use is especially convenient + to refer to the named or unnamed primary key of a :class:`_schema.Table` + using the + :attr:`_schema.Table.primary_key` attribute: + + .. sourcecode:: pycon+sql + + >>> do_update_stmt = insert_stmt.on_conflict_do_update( + ... constraint=my_table.primary_key, + ... set_=dict(data='updated value') + ... ) + >>> print(do_update_stmt) + {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) + ON CONFLICT (id) DO UPDATE SET data = %(param_1)s + +The SET Clause +^^^^^^^^^^^^^^^ + +``ON CONFLICT...DO UPDATE`` is used to perform an update of the already +existing row, using any combination of new values as well as values +from the proposed insertion. These values are specified using the +:paramref:`_postgresql.Insert.on_conflict_do_update.set_` parameter. This +parameter accepts a dictionary which consists of direct values +for UPDATE: + +.. sourcecode:: pycon+sql + + >>> stmt = insert(my_table).values(id='some_id', data='inserted value') + >>> do_update_stmt = stmt.on_conflict_do_update( + ... index_elements=['id'], + ... set_=dict(data='updated value') + ... ) + >>> print(do_update_stmt) + {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) + ON CONFLICT (id) DO UPDATE SET data = %(param_1)s + +.. warning:: + + The :meth:`_expression.Insert.on_conflict_do_update` + method does **not** take into + account Python-side default UPDATE values or generation functions, e.g. + those specified using :paramref:`_schema.Column.onupdate`. + These values will not be exercised for an ON CONFLICT style of UPDATE, + unless they are manually specified in the + :paramref:`_postgresql.Insert.on_conflict_do_update.set_` dictionary. + +Updating using the Excluded INSERT Values +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +In order to refer to the proposed insertion row, the special alias +:attr:`~.postgresql.Insert.excluded` is available as an attribute on +the :class:`_postgresql.Insert` object; this object is a +:class:`_expression.ColumnCollection` +which alias contains all columns of the target +table: + +.. sourcecode:: pycon+sql + + >>> stmt = insert(my_table).values( + ... id='some_id', + ... data='inserted value', + ... author='jlh' + ... ) + >>> do_update_stmt = stmt.on_conflict_do_update( + ... index_elements=['id'], + ... set_=dict(data='updated value', author=stmt.excluded.author) + ... ) + >>> print(do_update_stmt) + {opensql}INSERT INTO my_table (id, data, author) + VALUES (%(id)s, %(data)s, %(author)s) + ON CONFLICT (id) DO UPDATE SET data = %(param_1)s, author = excluded.author + +Additional WHERE Criteria +^^^^^^^^^^^^^^^^^^^^^^^^^ + +The :meth:`_expression.Insert.on_conflict_do_update` method also accepts +a WHERE clause using the :paramref:`_postgresql.Insert.on_conflict_do_update.where` +parameter, which will limit those rows which receive an UPDATE: + +.. sourcecode:: pycon+sql + + >>> stmt = insert(my_table).values( + ... id='some_id', + ... data='inserted value', + ... author='jlh' + ... ) + >>> on_update_stmt = stmt.on_conflict_do_update( + ... index_elements=['id'], + ... set_=dict(data='updated value', author=stmt.excluded.author), + ... where=(my_table.c.status == 2) + ... ) + >>> print(on_update_stmt) + {opensql}INSERT INTO my_table (id, data, author) + VALUES (%(id)s, %(data)s, %(author)s) + ON CONFLICT (id) DO UPDATE SET data = %(param_1)s, author = excluded.author + WHERE my_table.status = %(status_1)s + +Skipping Rows with DO NOTHING +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +``ON CONFLICT`` may be used to skip inserting a row entirely +if any conflict with a unique or exclusion constraint occurs; below +this is illustrated using the +:meth:`~.postgresql.Insert.on_conflict_do_nothing` method: + +.. sourcecode:: pycon+sql + + >>> stmt = insert(my_table).values(id='some_id', data='inserted value') + >>> stmt = stmt.on_conflict_do_nothing(index_elements=['id']) + >>> print(stmt) + {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) + ON CONFLICT (id) DO NOTHING + +If ``DO NOTHING`` is used without specifying any columns or constraint, +it has the effect of skipping the INSERT for any unique or exclusion +constraint violation which occurs: + +.. sourcecode:: pycon+sql + + >>> stmt = insert(my_table).values(id='some_id', data='inserted value') + >>> stmt = stmt.on_conflict_do_nothing() + >>> print(stmt) + {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) + ON CONFLICT DO NOTHING + +.. _postgresql_match: + +Full Text Search +---------------- + +SQLAlchemy makes available the PostgreSQL ``@@`` operator via the +:meth:`_expression.ColumnElement.match` method on any textual column expression. + +On the PostgreSQL dialect, an expression like the following:: + + select(sometable.c.text.match("search string")) + +will emit to the database:: + + SELECT text @@ to_tsquery('search string') FROM table + +Various other PostgreSQL text search functions such as ``to_tsquery()``, +``to_tsvector()``, and ``plainto_tsquery()`` are available by explicitly using +the standard SQLAlchemy :data:`.func` construct. + +For example:: + + select(func.to_tsvector('fat cats ate rats').match('cat & rat')) + +Emits the equivalent of:: + + SELECT to_tsvector('fat cats ate rats') @@ to_tsquery('cat & rat') + +The :class:`_postgresql.TSVECTOR` type can provide for explicit CAST:: + + from sqlalchemy.dialects.postgresql import TSVECTOR + from sqlalchemy import select, cast + select(cast("some text", TSVECTOR)) + +produces a statement equivalent to:: + + SELECT CAST('some text' AS TSVECTOR) AS anon_1 + +.. tip:: + + It's important to remember that text searching in PostgreSQL is powerful but complicated, + and SQLAlchemy users are advised to reference the PostgreSQL documentation + regarding + `Full Text Search `_. + + There are important differences between ``to_tsquery`` and + ``plainto_tsquery``, the most significant of which is that ``to_tsquery`` + expects specially formatted "querytext" that is written to PostgreSQL's own + specification, while ``plainto_tsquery`` expects unformatted text that is + transformed into ``to_tsquery`` compatible querytext. This means the input to + ``.match()`` under PostgreSQL may be incompatible with the input to + ``.match()`` under another database backend. SQLAlchemy users who support + multiple backends are advised to carefully implement their usage of + ``.match()`` to work around these constraints. + +Full Text Searches in PostgreSQL are influenced by a combination of: the +PostgreSQL setting of ``default_text_search_config``, the ``regconfig`` used +to build the GIN/GiST indexes, and the ``regconfig`` optionally passed in +during a query. + +When performing a Full Text Search against a column that has a GIN or +GiST index that is already pre-computed (which is common on full text +searches) one may need to explicitly pass in a particular PostgreSQL +``regconfig`` value to ensure the query-planner utilizes the index and does +not re-compute the column on demand. + +In order to provide for this explicit query planning, or to use different +search strategies, the ``match`` method accepts a ``postgresql_regconfig`` +keyword argument:: + + select(mytable.c.id).where( + mytable.c.title.match('somestring', postgresql_regconfig='english') + ) + +Emits the equivalent of:: + + SELECT mytable.id FROM mytable + WHERE mytable.title @@ to_tsquery('english', 'somestring') + +One can also specifically pass in a `'regconfig'` value to the +``to_tsvector()`` command as the initial argument:: + + select(mytable.c.id).where( + func.to_tsvector('english', mytable.c.title )\ + .match('somestring', postgresql_regconfig='english') + ) + +produces a statement equivalent to:: + + SELECT mytable.id FROM mytable + WHERE to_tsvector('english', mytable.title) @@ + to_tsquery('english', 'somestring') + +It is recommended that you use the ``EXPLAIN ANALYZE...`` tool from +PostgreSQL to ensure that you are generating queries with SQLAlchemy that +take full advantage of any indexes you may have created for full text search. + +.. seealso:: + + `Full Text Search `_ - in the PostgreSQL documentation + + +FROM ONLY ... +------------- + +The dialect supports PostgreSQL's ONLY keyword for targeting only a particular +table in an inheritance hierarchy. This can be used to produce the +``SELECT ... FROM ONLY``, ``UPDATE ONLY ...``, and ``DELETE FROM ONLY ...`` +syntaxes. It uses SQLAlchemy's hints mechanism:: + + # SELECT ... FROM ONLY ... + result = table.select().with_hint(table, 'ONLY', 'postgresql') + print(result.fetchall()) + + # UPDATE ONLY ... + table.update(values=dict(foo='bar')).with_hint('ONLY', + dialect_name='postgresql') + + # DELETE FROM ONLY ... + table.delete().with_hint('ONLY', dialect_name='postgresql') + + +.. _postgresql_indexes: + +PostgreSQL-Specific Index Options +--------------------------------- + +Several extensions to the :class:`.Index` construct are available, specific +to the PostgreSQL dialect. + +Covering Indexes +^^^^^^^^^^^^^^^^ + +The ``postgresql_include`` option renders INCLUDE(colname) for the given +string names:: + + Index("my_index", table.c.x, postgresql_include=['y']) + +would render the index as ``CREATE INDEX my_index ON table (x) INCLUDE (y)`` + +Note that this feature requires PostgreSQL 11 or later. + +.. versionadded:: 1.4 + +.. _postgresql_partial_indexes: + +Partial Indexes +^^^^^^^^^^^^^^^ + +Partial indexes add criterion to the index definition so that the index is +applied to a subset of rows. These can be specified on :class:`.Index` +using the ``postgresql_where`` keyword argument:: + + Index('my_index', my_table.c.id, postgresql_where=my_table.c.value > 10) + +.. _postgresql_operator_classes: + +Operator Classes +^^^^^^^^^^^^^^^^ + +PostgreSQL allows the specification of an *operator class* for each column of +an index (see +https://www.postgresql.org/docs/current/interactive/indexes-opclass.html). +The :class:`.Index` construct allows these to be specified via the +``postgresql_ops`` keyword argument:: + + Index( + 'my_index', my_table.c.id, my_table.c.data, + postgresql_ops={ + 'data': 'text_pattern_ops', + 'id': 'int4_ops' + }) + +Note that the keys in the ``postgresql_ops`` dictionaries are the +"key" name of the :class:`_schema.Column`, i.e. the name used to access it from +the ``.c`` collection of :class:`_schema.Table`, which can be configured to be +different than the actual name of the column as expressed in the database. + +If ``postgresql_ops`` is to be used against a complex SQL expression such +as a function call, then to apply to the column it must be given a label +that is identified in the dictionary by name, e.g.:: + + Index( + 'my_index', my_table.c.id, + func.lower(my_table.c.data).label('data_lower'), + postgresql_ops={ + 'data_lower': 'text_pattern_ops', + 'id': 'int4_ops' + }) + +Operator classes are also supported by the +:class:`_postgresql.ExcludeConstraint` construct using the +:paramref:`_postgresql.ExcludeConstraint.ops` parameter. See that parameter for +details. + +.. versionadded:: 1.3.21 added support for operator classes with + :class:`_postgresql.ExcludeConstraint`. + + +Index Types +^^^^^^^^^^^ + +PostgreSQL provides several index types: B-Tree, Hash, GiST, and GIN, as well +as the ability for users to create their own (see +https://www.postgresql.org/docs/current/static/indexes-types.html). These can be +specified on :class:`.Index` using the ``postgresql_using`` keyword argument:: + + Index('my_index', my_table.c.data, postgresql_using='gin') + +The value passed to the keyword argument will be simply passed through to the +underlying CREATE INDEX command, so it *must* be a valid index type for your +version of PostgreSQL. + +.. _postgresql_index_storage: + +Index Storage Parameters +^^^^^^^^^^^^^^^^^^^^^^^^ + +PostgreSQL allows storage parameters to be set on indexes. The storage +parameters available depend on the index method used by the index. Storage +parameters can be specified on :class:`.Index` using the ``postgresql_with`` +keyword argument:: + + Index('my_index', my_table.c.data, postgresql_with={"fillfactor": 50}) + +.. versionadded:: 1.0.6 + +PostgreSQL allows to define the tablespace in which to create the index. +The tablespace can be specified on :class:`.Index` using the +``postgresql_tablespace`` keyword argument:: + + Index('my_index', my_table.c.data, postgresql_tablespace='my_tablespace') + +.. versionadded:: 1.1 + +Note that the same option is available on :class:`_schema.Table` as well. + +.. _postgresql_index_concurrently: + +Indexes with CONCURRENTLY +^^^^^^^^^^^^^^^^^^^^^^^^^ + +The PostgreSQL index option CONCURRENTLY is supported by passing the +flag ``postgresql_concurrently`` to the :class:`.Index` construct:: + + tbl = Table('testtbl', m, Column('data', Integer)) + + idx1 = Index('test_idx1', tbl.c.data, postgresql_concurrently=True) + +The above index construct will render DDL for CREATE INDEX, assuming +PostgreSQL 8.2 or higher is detected or for a connection-less dialect, as:: + + CREATE INDEX CONCURRENTLY test_idx1 ON testtbl (data) + +For DROP INDEX, assuming PostgreSQL 9.2 or higher is detected or for +a connection-less dialect, it will emit:: + + DROP INDEX CONCURRENTLY test_idx1 + +.. versionadded:: 1.1 support for CONCURRENTLY on DROP INDEX. The + CONCURRENTLY keyword is now only emitted if a high enough version + of PostgreSQL is detected on the connection (or for a connection-less + dialect). + +When using CONCURRENTLY, the PostgreSQL database requires that the statement +be invoked outside of a transaction block. The Python DBAPI enforces that +even for a single statement, a transaction is present, so to use this +construct, the DBAPI's "autocommit" mode must be used:: + + metadata = MetaData() + table = Table( + "foo", metadata, + Column("id", String)) + index = Index( + "foo_idx", table.c.id, postgresql_concurrently=True) + + with engine.connect() as conn: + with conn.execution_options(isolation_level='AUTOCOMMIT'): + table.create(conn) + +.. seealso:: + + :ref:`postgresql_isolation_level` + +.. _postgresql_index_reflection: + +PostgreSQL Index Reflection +--------------------------- + +The PostgreSQL database creates a UNIQUE INDEX implicitly whenever the +UNIQUE CONSTRAINT construct is used. When inspecting a table using +:class:`_reflection.Inspector`, the :meth:`_reflection.Inspector.get_indexes` +and the :meth:`_reflection.Inspector.get_unique_constraints` +will report on these +two constructs distinctly; in the case of the index, the key +``duplicates_constraint`` will be present in the index entry if it is +detected as mirroring a constraint. When performing reflection using +``Table(..., autoload_with=engine)``, the UNIQUE INDEX is **not** returned +in :attr:`_schema.Table.indexes` when it is detected as mirroring a +:class:`.UniqueConstraint` in the :attr:`_schema.Table.constraints` collection +. + +.. versionchanged:: 1.0.0 - :class:`_schema.Table` reflection now includes + :class:`.UniqueConstraint` objects present in the + :attr:`_schema.Table.constraints` + collection; the PostgreSQL backend will no longer include a "mirrored" + :class:`.Index` construct in :attr:`_schema.Table.indexes` + if it is detected + as corresponding to a unique constraint. + +Special Reflection Options +-------------------------- + +The :class:`_reflection.Inspector` +used for the PostgreSQL backend is an instance +of :class:`.PGInspector`, which offers additional methods:: + + from sqlalchemy import create_engine, inspect + + engine = create_engine("postgresql+psycopg2://localhost/test") + insp = inspect(engine) # will be a PGInspector + + print(insp.get_enums()) + +.. autoclass:: PGInspector + :members: + +.. _postgresql_table_options: + +PostgreSQL Table Options +------------------------ + +Several options for CREATE TABLE are supported directly by the PostgreSQL +dialect in conjunction with the :class:`_schema.Table` construct: + +* ``TABLESPACE``:: + + Table("some_table", metadata, ..., postgresql_tablespace='some_tablespace') + + The above option is also available on the :class:`.Index` construct. + +* ``ON COMMIT``:: + + Table("some_table", metadata, ..., postgresql_on_commit='PRESERVE ROWS') + +* ``WITH OIDS``:: + + Table("some_table", metadata, ..., postgresql_with_oids=True) + +* ``WITHOUT OIDS``:: + + Table("some_table", metadata, ..., postgresql_with_oids=False) + +* ``INHERITS``:: + + Table("some_table", metadata, ..., postgresql_inherits="some_supertable") + + Table("some_table", metadata, ..., postgresql_inherits=("t1", "t2", ...)) + + .. versionadded:: 1.0.0 + +* ``PARTITION BY``:: + + Table("some_table", metadata, ..., + postgresql_partition_by='LIST (part_column)') + + .. versionadded:: 1.2.6 + +.. seealso:: + + `PostgreSQL CREATE TABLE options + `_ - + in the PostgreSQL documentation. + +.. _postgresql_constraint_options: + +PostgreSQL Constraint Options +----------------------------- + +The following option(s) are supported by the PostgreSQL dialect in conjunction +with selected constraint constructs: + +* ``NOT VALID``: This option applies towards CHECK and FOREIGN KEY constraints + when the constraint is being added to an existing table via ALTER TABLE, + and has the effect that existing rows are not scanned during the ALTER + operation against the constraint being added. + + When using a SQL migration tool such as `Alembic `_ + that renders ALTER TABLE constructs, the ``postgresql_not_valid`` argument + may be specified as an additional keyword argument within the operation + that creates the constraint, as in the following Alembic example:: + + def update(): + op.create_foreign_key( + "fk_user_address", + "address", + "user", + ["user_id"], + ["id"], + postgresql_not_valid=True + ) + + The keyword is ultimately accepted directly by the + :class:`_schema.CheckConstraint`, :class:`_schema.ForeignKeyConstraint` + and :class:`_schema.ForeignKey` constructs; when using a tool like + Alembic, dialect-specific keyword arguments are passed through to + these constructs from the migration operation directives:: + + CheckConstraint("some_field IS NOT NULL", postgresql_not_valid=True) + + ForeignKeyConstraint(["some_id"], ["some_table.some_id"], postgresql_not_valid=True) + + .. versionadded:: 1.4.32 + + .. seealso:: + + `PostgreSQL ALTER TABLE options + `_ - + in the PostgreSQL documentation. + +.. _postgresql_table_valued_overview: + +Table values, Table and Column valued functions, Row and Tuple objects +----------------------------------------------------------------------- + +PostgreSQL makes great use of modern SQL forms such as table-valued functions, +tables and rows as values. These constructs are commonly used as part +of PostgreSQL's support for complex datatypes such as JSON, ARRAY, and other +datatypes. SQLAlchemy's SQL expression language has native support for +most table-valued and row-valued forms. + +.. _postgresql_table_valued: + +Table-Valued Functions +^^^^^^^^^^^^^^^^^^^^^^^ + +Many PostgreSQL built-in functions are intended to be used in the FROM clause +of a SELECT statement, and are capable of returning table rows or sets of table +rows. A large portion of PostgreSQL's JSON functions for example such as +``json_array_elements()``, ``json_object_keys()``, ``json_each_text()``, +``json_each()``, ``json_to_record()``, ``json_populate_recordset()`` use such +forms. These classes of SQL function calling forms in SQLAlchemy are available +using the :meth:`_functions.FunctionElement.table_valued` method in conjunction +with :class:`_functions.Function` objects generated from the :data:`_sql.func` +namespace. + +Examples from PostgreSQL's reference documentation follow below: + +* ``json_each()``:: + + >>> from sqlalchemy import select, func + >>> stmt = select(func.json_each('{"a":"foo", "b":"bar"}').table_valued("key", "value")) + >>> print(stmt) + SELECT anon_1.key, anon_1.value + FROM json_each(:json_each_1) AS anon_1 + +* ``json_populate_record()``:: + + >>> from sqlalchemy import select, func, literal_column + >>> stmt = select( + ... func.json_populate_record( + ... literal_column("null::myrowtype"), + ... '{"a":1,"b":2}' + ... ).table_valued("a", "b", name="x") + ... ) + >>> print(stmt) + SELECT x.a, x.b + FROM json_populate_record(null::myrowtype, :json_populate_record_1) AS x + +* ``json_to_record()`` - this form uses a PostgreSQL specific form of derived + columns in the alias, where we may make use of :func:`_sql.column` elements with + types to produce them. The :meth:`_functions.FunctionElement.table_valued` + method produces a :class:`_sql.TableValuedAlias` construct, and the method + :meth:`_sql.TableValuedAlias.render_derived` method sets up the derived + columns specification:: + + >>> from sqlalchemy import select, func, column, Integer, Text + >>> stmt = select( + ... func.json_to_record('{"a":1,"b":[1,2,3],"c":"bar"}').table_valued( + ... column("a", Integer), column("b", Text), column("d", Text), + ... ).render_derived(name="x", with_types=True) + ... ) + >>> print(stmt) + SELECT x.a, x.b, x.d + FROM json_to_record(:json_to_record_1) AS x(a INTEGER, b TEXT, d TEXT) + +* ``WITH ORDINALITY`` - part of the SQL standard, ``WITH ORDINALITY`` adds an + ordinal counter to the output of a function and is accepted by a limited set + of PostgreSQL functions including ``unnest()`` and ``generate_series()``. The + :meth:`_functions.FunctionElement.table_valued` method accepts a keyword + parameter ``with_ordinality`` for this purpose, which accepts the string name + that will be applied to the "ordinality" column:: + + >>> from sqlalchemy import select, func + >>> stmt = select( + ... func.generate_series(4, 1, -1). + ... table_valued("value", with_ordinality="ordinality"). + ... render_derived() + ... ) + >>> print(stmt) + SELECT anon_1.value, anon_1.ordinality + FROM generate_series(:generate_series_1, :generate_series_2, :generate_series_3) + WITH ORDINALITY AS anon_1(value, ordinality) + +.. versionadded:: 1.4.0b2 + +.. seealso:: + + :ref:`tutorial_functions_table_valued` - in the :ref:`unified_tutorial` + +.. _postgresql_column_valued: + +Column Valued Functions +^^^^^^^^^^^^^^^^^^^^^^^ + +Similar to the table valued function, a column valued function is present +in the FROM clause, but delivers itself to the columns clause as a single +scalar value. PostgreSQL functions such as ``json_array_elements()``, +``unnest()`` and ``generate_series()`` may use this form. Column valued functions are available using the +:meth:`_functions.FunctionElement.column_valued` method of :class:`_functions.FunctionElement`: + +* ``json_array_elements()``:: + + >>> from sqlalchemy import select, func + >>> stmt = select(func.json_array_elements('["one", "two"]').column_valued("x")) + >>> print(stmt) + SELECT x + FROM json_array_elements(:json_array_elements_1) AS x + +* ``unnest()`` - in order to generate a PostgreSQL ARRAY literal, the + :func:`_postgresql.array` construct may be used:: + + + >>> from sqlalchemy.dialects.postgresql import array + >>> from sqlalchemy import select, func + >>> stmt = select(func.unnest(array([1, 2])).column_valued()) + >>> print(stmt) + SELECT anon_1 + FROM unnest(ARRAY[%(param_1)s, %(param_2)s]) AS anon_1 + + The function can of course be used against an existing table-bound column + that's of type :class:`_types.ARRAY`:: + + >>> from sqlalchemy import table, column, ARRAY, Integer + >>> from sqlalchemy import select, func + >>> t = table("t", column('value', ARRAY(Integer))) + >>> stmt = select(func.unnest(t.c.value).column_valued("unnested_value")) + >>> print(stmt) + SELECT unnested_value + FROM unnest(t.value) AS unnested_value + +.. seealso:: + + :ref:`tutorial_functions_column_valued` - in the :ref:`unified_tutorial` + + +Row Types +^^^^^^^^^ + +Built-in support for rendering a ``ROW`` may be approximated using +``func.ROW`` with the :attr:`_sa.func` namespace, or by using the +:func:`_sql.tuple_` construct:: + + >>> from sqlalchemy import table, column, func, tuple_ + >>> t = table("t", column("id"), column("fk")) + >>> stmt = t.select().where( + ... tuple_(t.c.id, t.c.fk) > (1,2) + ... ).where( + ... func.ROW(t.c.id, t.c.fk) < func.ROW(3, 7) + ... ) + >>> print(stmt) + SELECT t.id, t.fk + FROM t + WHERE (t.id, t.fk) > (:param_1, :param_2) AND ROW(t.id, t.fk) < ROW(:ROW_1, :ROW_2) + +.. seealso:: + + `PostgreSQL Row Constructors + `_ + + `PostgreSQL Row Constructor Comparison + `_ + +Table Types passed to Functions +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +PostgreSQL supports passing a table as an argument to a function, which it +refers towards as a "record" type. SQLAlchemy :class:`_sql.FromClause` objects +such as :class:`_schema.Table` support this special form using the +:meth:`_sql.FromClause.table_valued` method, which is comparable to the +:meth:`_functions.FunctionElement.table_valued` method except that the collection +of columns is already established by that of the :class:`_sql.FromClause` +itself:: + + + >>> from sqlalchemy import table, column, func, select + >>> a = table( "a", column("id"), column("x"), column("y")) + >>> stmt = select(func.row_to_json(a.table_valued())) + >>> print(stmt) + SELECT row_to_json(a) AS row_to_json_1 + FROM a + +.. versionadded:: 1.4.0b2 + + +ARRAY Types +----------- + +The PostgreSQL dialect supports arrays, both as multidimensional column types +as well as array literals: + +* :class:`_postgresql.ARRAY` - ARRAY datatype + +* :class:`_postgresql.array` - array literal + +* :func:`_postgresql.array_agg` - ARRAY_AGG SQL function + +* :class:`_postgresql.aggregate_order_by` - helper for PG's ORDER BY aggregate + function syntax. + +JSON Types +---------- + +The PostgreSQL dialect supports both JSON and JSONB datatypes, including +psycopg2's native support and support for all of PostgreSQL's special +operators: + +* :class:`_postgresql.JSON` + +* :class:`_postgresql.JSONB` + +HSTORE Type +----------- + +The PostgreSQL HSTORE type as well as hstore literals are supported: + +* :class:`_postgresql.HSTORE` - HSTORE datatype + +* :class:`_postgresql.hstore` - hstore literal + +ENUM Types +---------- + +PostgreSQL has an independently creatable TYPE structure which is used +to implement an enumerated type. This approach introduces significant +complexity on the SQLAlchemy side in terms of when this type should be +CREATED and DROPPED. The type object is also an independently reflectable +entity. The following sections should be consulted: + +* :class:`_postgresql.ENUM` - DDL and typing support for ENUM. + +* :meth:`.PGInspector.get_enums` - retrieve a listing of current ENUM types + +* :meth:`.postgresql.ENUM.create` , :meth:`.postgresql.ENUM.drop` - individual + CREATE and DROP commands for ENUM. + +.. _postgresql_array_of_enum: + +Using ENUM with ARRAY +^^^^^^^^^^^^^^^^^^^^^ + +The combination of ENUM and ARRAY is not directly supported by backend +DBAPIs at this time. Prior to SQLAlchemy 1.3.17, a special workaround +was needed in order to allow this combination to work, described below. + +.. versionchanged:: 1.3.17 The combination of ENUM and ARRAY is now directly + handled by SQLAlchemy's implementation without any workarounds needed. + +.. sourcecode:: python + + from sqlalchemy import TypeDecorator + from sqlalchemy.dialects.postgresql import ARRAY + + class ArrayOfEnum(TypeDecorator): + impl = ARRAY + + def bind_expression(self, bindvalue): + return sa.cast(bindvalue, self) + + def result_processor(self, dialect, coltype): + super_rp = super(ArrayOfEnum, self).result_processor( + dialect, coltype) + + def handle_raw_string(value): + inner = re.match(r"^{(.*)}$", value).group(1) + return inner.split(",") if inner else [] + + def process(value): + if value is None: + return None + return super_rp(handle_raw_string(value)) + return process + +E.g.:: + + Table( + 'mydata', metadata, + Column('id', Integer, primary_key=True), + Column('data', ArrayOfEnum(ENUM('a', 'b, 'c', name='myenum'))) + + ) + +This type is not included as a built-in type as it would be incompatible +with a DBAPI that suddenly decides to support ARRAY of ENUM directly in +a new version. + +.. _postgresql_array_of_json: + +Using JSON/JSONB with ARRAY +^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Similar to using ENUM, prior to SQLAlchemy 1.3.17, for an ARRAY of JSON/JSONB +we need to render the appropriate CAST. Current psycopg2 drivers accommodate +the result set correctly without any special steps. + +.. versionchanged:: 1.3.17 The combination of JSON/JSONB and ARRAY is now + directly handled by SQLAlchemy's implementation without any workarounds + needed. + +.. sourcecode:: python + + class CastingArray(ARRAY): + def bind_expression(self, bindvalue): + return sa.cast(bindvalue, self) + +E.g.:: + + Table( + 'mydata', metadata, + Column('id', Integer, primary_key=True), + Column('data', CastingArray(JSONB)) + ) + + +""" # noqa: E501 + +from collections import defaultdict +import datetime as dt +import re +from uuid import UUID as _python_UUID + +from . import array as _array +from . import dml +from . import hstore as _hstore +from . import json as _json +from . import ranges as _ranges +from ... import exc +from ... import schema +from ... import sql +from ... import util +from ...engine import characteristics +from ...engine import default +from ...engine import reflection +from ...sql import coercions +from ...sql import compiler +from ...sql import elements +from ...sql import expression +from ...sql import roles +from ...sql import sqltypes +from ...sql import util as sql_util +from ...sql.ddl import DDLBase +from ...types import BIGINT +from ...types import BOOLEAN +from ...types import CHAR +from ...types import DATE +from ...types import FLOAT +from ...types import INTEGER +from ...types import NUMERIC +from ...types import REAL +from ...types import SMALLINT +from ...types import TEXT +from ...types import VARCHAR + +IDX_USING = re.compile(r"^(?:btree|hash|gist|gin|[\w_]+)$", re.I) + +AUTOCOMMIT_REGEXP = re.compile( + r"\s*(?:UPDATE|INSERT|CREATE|DELETE|DROP|ALTER|GRANT|REVOKE|" + "IMPORT FOREIGN SCHEMA|REFRESH MATERIALIZED VIEW|TRUNCATE)", + re.I | re.UNICODE, +) + +RESERVED_WORDS = set( + [ + "all", + "analyse", + "analyze", + "and", + "any", + "array", + "as", + "asc", + "asymmetric", + "both", + "case", + "cast", + "check", + "collate", + "column", + "constraint", + "create", + "current_catalog", + "current_date", + "current_role", + "current_time", + "current_timestamp", + "current_user", + "default", + "deferrable", + "desc", + "distinct", + "do", + "else", + "end", + "except", + "false", + "fetch", + "for", + "foreign", + "from", + "grant", + "group", + "having", + "in", + "initially", + "intersect", + "into", + "leading", + "limit", + "localtime", + "localtimestamp", + "new", + "not", + "null", + "of", + "off", + "offset", + "old", + "on", + "only", + "or", + "order", + "placing", + "primary", + "references", + "returning", + "select", + "session_user", + "some", + "symmetric", + "table", + "then", + "to", + "trailing", + "true", + "union", + "unique", + "user", + "using", + "variadic", + "when", + "where", + "window", + "with", + "authorization", + "between", + "binary", + "cross", + "current_schema", + "freeze", + "full", + "ilike", + "inner", + "is", + "isnull", + "join", + "left", + "like", + "natural", + "notnull", + "outer", + "over", + "overlaps", + "right", + "similar", + "verbose", + ] +) + +_DECIMAL_TYPES = (1231, 1700) +_FLOAT_TYPES = (700, 701, 1021, 1022) +_INT_TYPES = (20, 21, 23, 26, 1005, 1007, 1016) + + +class BYTEA(sqltypes.LargeBinary): + __visit_name__ = "BYTEA" + + +class DOUBLE_PRECISION(sqltypes.Float): + __visit_name__ = "DOUBLE_PRECISION" + + +class INET(sqltypes.TypeEngine): + __visit_name__ = "INET" + + +PGInet = INET + + +class CIDR(sqltypes.TypeEngine): + __visit_name__ = "CIDR" + + +PGCidr = CIDR + + +class MACADDR(sqltypes.TypeEngine): + __visit_name__ = "MACADDR" + + +PGMacAddr = MACADDR + + +class MONEY(sqltypes.TypeEngine): + + r"""Provide the PostgreSQL MONEY type. + + Depending on driver, result rows using this type may return a + string value which includes currency symbols. + + For this reason, it may be preferable to provide conversion to a + numerically-based currency datatype using :class:`_types.TypeDecorator`:: + + import re + import decimal + from sqlalchemy import TypeDecorator + + class NumericMoney(TypeDecorator): + impl = MONEY + + def process_result_value(self, value: Any, dialect: Any) -> None: + if value is not None: + # adjust this for the currency and numeric + m = re.match(r"\$([\d.]+)", value) + if m: + value = decimal.Decimal(m.group(1)) + return value + + Alternatively, the conversion may be applied as a CAST using + the :meth:`_types.TypeDecorator.column_expression` method as follows:: + + import decimal + from sqlalchemy import cast + from sqlalchemy import TypeDecorator + + class NumericMoney(TypeDecorator): + impl = MONEY + + def column_expression(self, column: Any): + return cast(column, Numeric()) + + .. versionadded:: 1.2 + + """ + + __visit_name__ = "MONEY" + + +class OID(sqltypes.TypeEngine): + + """Provide the PostgreSQL OID type. + + .. versionadded:: 0.9.5 + + """ + + __visit_name__ = "OID" + + +class REGCLASS(sqltypes.TypeEngine): + + """Provide the PostgreSQL REGCLASS type. + + .. versionadded:: 1.2.7 + + """ + + __visit_name__ = "REGCLASS" + + +class TIMESTAMP(sqltypes.TIMESTAMP): + + """Provide the PostgreSQL TIMESTAMP type.""" + + __visit_name__ = "TIMESTAMP" + + def __init__(self, timezone=False, precision=None): + """Construct a TIMESTAMP. + + :param timezone: boolean value if timezone present, default False + :param precision: optional integer precision value + + .. versionadded:: 1.4 + + """ + super(TIMESTAMP, self).__init__(timezone=timezone) + self.precision = precision + + +class TIME(sqltypes.TIME): + + """PostgreSQL TIME type.""" + + __visit_name__ = "TIME" + + def __init__(self, timezone=False, precision=None): + """Construct a TIME. + + :param timezone: boolean value if timezone present, default False + :param precision: optional integer precision value + + .. versionadded:: 1.4 + + """ + super(TIME, self).__init__(timezone=timezone) + self.precision = precision + + +class INTERVAL(sqltypes.NativeForEmulated, sqltypes._AbstractInterval): + + """PostgreSQL INTERVAL type.""" + + __visit_name__ = "INTERVAL" + native = True + + def __init__(self, precision=None, fields=None): + """Construct an INTERVAL. + + :param precision: optional integer precision value + :param fields: string fields specifier. allows storage of fields + to be limited, such as ``"YEAR"``, ``"MONTH"``, ``"DAY TO HOUR"``, + etc. + + .. versionadded:: 1.2 + + """ + self.precision = precision + self.fields = fields + + @classmethod + def adapt_emulated_to_native(cls, interval, **kw): + return INTERVAL(precision=interval.second_precision) + + @property + def _type_affinity(self): + return sqltypes.Interval + + def as_generic(self, allow_nulltype=False): + return sqltypes.Interval(native=True, second_precision=self.precision) + + @property + def python_type(self): + return dt.timedelta + + def coerce_compared_value(self, op, value): + return self + + +PGInterval = INTERVAL + + +class BIT(sqltypes.TypeEngine): + __visit_name__ = "BIT" + + def __init__(self, length=None, varying=False): + if not varying: + # BIT without VARYING defaults to length 1 + self.length = length or 1 + else: + # but BIT VARYING can be unlimited-length, so no default + self.length = length + self.varying = varying + + +PGBit = BIT + + +class UUID(sqltypes.TypeEngine): + + """PostgreSQL UUID type. + + Represents the UUID column type, interpreting + data either as natively returned by the DBAPI + or as Python uuid objects. + + The UUID type is currently known to work within the prominent DBAPI + drivers supported by SQLAlchemy including psycopg2, pg8000 and + asyncpg. Support for other DBAPI drivers may be incomplete or non-present. + + """ + + __visit_name__ = "UUID" + + def __init__(self, as_uuid=False): + """Construct a UUID type. + + + :param as_uuid=False: if True, values will be interpreted + as Python uuid objects, converting to/from string via the + DBAPI. + + """ + self.as_uuid = as_uuid + + def coerce_compared_value(self, op, value): + """See :meth:`.TypeEngine.coerce_compared_value` for a description.""" + + if isinstance(value, util.string_types): + return self + else: + return super(UUID, self).coerce_compared_value(op, value) + + def bind_processor(self, dialect): + if self.as_uuid: + + def process(value): + if value is not None: + value = util.text_type(value) + return value + + return process + else: + return None + + def result_processor(self, dialect, coltype): + if self.as_uuid: + + def process(value): + if value is not None: + value = _python_UUID(value) + return value + + return process + else: + return None + + def literal_processor(self, dialect): + if self.as_uuid: + + def process(value): + if value is not None: + value = "'%s'::UUID" % value + return value + + return process + else: + + def process(value): + if value is not None: + value = "'%s'" % value + return value + + return process + + @property + def python_type(self): + return _python_UUID if self.as_uuid else str + + +PGUuid = UUID + + +class TSVECTOR(sqltypes.TypeEngine): + + """The :class:`_postgresql.TSVECTOR` type implements the PostgreSQL + text search type TSVECTOR. + + It can be used to do full text queries on natural language + documents. + + .. versionadded:: 0.9.0 + + .. seealso:: + + :ref:`postgresql_match` + + """ + + __visit_name__ = "TSVECTOR" + + +class ENUM(sqltypes.NativeForEmulated, sqltypes.Enum): + + """PostgreSQL ENUM type. + + This is a subclass of :class:`_types.Enum` which includes + support for PG's ``CREATE TYPE`` and ``DROP TYPE``. + + When the builtin type :class:`_types.Enum` is used and the + :paramref:`.Enum.native_enum` flag is left at its default of + True, the PostgreSQL backend will use a :class:`_postgresql.ENUM` + type as the implementation, so the special create/drop rules + will be used. + + The create/drop behavior of ENUM is necessarily intricate, due to the + awkward relationship the ENUM type has in relationship to the + parent table, in that it may be "owned" by just a single table, or + may be shared among many tables. + + When using :class:`_types.Enum` or :class:`_postgresql.ENUM` + in an "inline" fashion, the ``CREATE TYPE`` and ``DROP TYPE`` is emitted + corresponding to when the :meth:`_schema.Table.create` and + :meth:`_schema.Table.drop` + methods are called:: + + table = Table('sometable', metadata, + Column('some_enum', ENUM('a', 'b', 'c', name='myenum')) + ) + + table.create(engine) # will emit CREATE ENUM and CREATE TABLE + table.drop(engine) # will emit DROP TABLE and DROP ENUM + + To use a common enumerated type between multiple tables, the best + practice is to declare the :class:`_types.Enum` or + :class:`_postgresql.ENUM` independently, and associate it with the + :class:`_schema.MetaData` object itself:: + + my_enum = ENUM('a', 'b', 'c', name='myenum', metadata=metadata) + + t1 = Table('sometable_one', metadata, + Column('some_enum', myenum) + ) + + t2 = Table('sometable_two', metadata, + Column('some_enum', myenum) + ) + + When this pattern is used, care must still be taken at the level + of individual table creates. Emitting CREATE TABLE without also + specifying ``checkfirst=True`` will still cause issues:: + + t1.create(engine) # will fail: no such type 'myenum' + + If we specify ``checkfirst=True``, the individual table-level create + operation will check for the ``ENUM`` and create if not exists:: + + # will check if enum exists, and emit CREATE TYPE if not + t1.create(engine, checkfirst=True) + + When using a metadata-level ENUM type, the type will always be created + and dropped if either the metadata-wide create/drop is called:: + + metadata.create_all(engine) # will emit CREATE TYPE + metadata.drop_all(engine) # will emit DROP TYPE + + The type can also be created and dropped directly:: + + my_enum.create(engine) + my_enum.drop(engine) + + .. versionchanged:: 1.0.0 The PostgreSQL :class:`_postgresql.ENUM` type + now behaves more strictly with regards to CREATE/DROP. A metadata-level + ENUM type will only be created and dropped at the metadata level, + not the table level, with the exception of + ``table.create(checkfirst=True)``. + The ``table.drop()`` call will now emit a DROP TYPE for a table-level + enumerated type. + + """ + + native_enum = True + + def __init__(self, *enums, **kw): + """Construct an :class:`_postgresql.ENUM`. + + Arguments are the same as that of + :class:`_types.Enum`, but also including + the following parameters. + + :param create_type: Defaults to True. + Indicates that ``CREATE TYPE`` should be + emitted, after optionally checking for the + presence of the type, when the parent + table is being created; and additionally + that ``DROP TYPE`` is called when the table + is dropped. When ``False``, no check + will be performed and no ``CREATE TYPE`` + or ``DROP TYPE`` is emitted, unless + :meth:`~.postgresql.ENUM.create` + or :meth:`~.postgresql.ENUM.drop` + are called directly. + Setting to ``False`` is helpful + when invoking a creation scheme to a SQL file + without access to the actual database - + the :meth:`~.postgresql.ENUM.create` and + :meth:`~.postgresql.ENUM.drop` methods can + be used to emit SQL to a target bind. + + """ + native_enum = kw.pop("native_enum", None) + if native_enum is False: + util.warn( + "the native_enum flag does not apply to the " + "sqlalchemy.dialects.postgresql.ENUM datatype; this type " + "always refers to ENUM. Use sqlalchemy.types.Enum for " + "non-native enum." + ) + self.create_type = kw.pop("create_type", True) + super(ENUM, self).__init__(*enums, **kw) + + @classmethod + def adapt_emulated_to_native(cls, impl, **kw): + """Produce a PostgreSQL native :class:`_postgresql.ENUM` from plain + :class:`.Enum`. + + """ + kw.setdefault("validate_strings", impl.validate_strings) + kw.setdefault("name", impl.name) + kw.setdefault("schema", impl.schema) + kw.setdefault("inherit_schema", impl.inherit_schema) + kw.setdefault("metadata", impl.metadata) + kw.setdefault("_create_events", False) + kw.setdefault("values_callable", impl.values_callable) + kw.setdefault("omit_aliases", impl._omit_aliases) + return cls(**kw) + + def create(self, bind=None, checkfirst=True): + """Emit ``CREATE TYPE`` for this + :class:`_postgresql.ENUM`. + + If the underlying dialect does not support + PostgreSQL CREATE TYPE, no action is taken. + + :param bind: a connectable :class:`_engine.Engine`, + :class:`_engine.Connection`, or similar object to emit + SQL. + :param checkfirst: if ``True``, a query against + the PG catalog will be first performed to see + if the type does not exist already before + creating. + + """ + if not bind.dialect.supports_native_enum: + return + + bind._run_ddl_visitor(self.EnumGenerator, self, checkfirst=checkfirst) + + def drop(self, bind=None, checkfirst=True): + """Emit ``DROP TYPE`` for this + :class:`_postgresql.ENUM`. + + If the underlying dialect does not support + PostgreSQL DROP TYPE, no action is taken. + + :param bind: a connectable :class:`_engine.Engine`, + :class:`_engine.Connection`, or similar object to emit + SQL. + :param checkfirst: if ``True``, a query against + the PG catalog will be first performed to see + if the type actually exists before dropping. + + """ + if not bind.dialect.supports_native_enum: + return + + bind._run_ddl_visitor(self.EnumDropper, self, checkfirst=checkfirst) + + class EnumGenerator(DDLBase): + def __init__(self, dialect, connection, checkfirst=False, **kwargs): + super(ENUM.EnumGenerator, self).__init__(connection, **kwargs) + self.checkfirst = checkfirst + + def _can_create_enum(self, enum): + if not self.checkfirst: + return True + + effective_schema = self.connection.schema_for_object(enum) + + return not self.connection.dialect.has_type( + self.connection, enum.name, schema=effective_schema + ) + + def visit_enum(self, enum): + if not self._can_create_enum(enum): + return + + self.connection.execute(CreateEnumType(enum)) + + class EnumDropper(DDLBase): + def __init__(self, dialect, connection, checkfirst=False, **kwargs): + super(ENUM.EnumDropper, self).__init__(connection, **kwargs) + self.checkfirst = checkfirst + + def _can_drop_enum(self, enum): + if not self.checkfirst: + return True + + effective_schema = self.connection.schema_for_object(enum) + + return self.connection.dialect.has_type( + self.connection, enum.name, schema=effective_schema + ) + + def visit_enum(self, enum): + if not self._can_drop_enum(enum): + return + + self.connection.execute(DropEnumType(enum)) + + def _check_for_name_in_memos(self, checkfirst, kw): + """Look in the 'ddl runner' for 'memos', then + note our name in that collection. + + This to ensure a particular named enum is operated + upon only once within any kind of create/drop + sequence without relying upon "checkfirst". + + """ + if not self.create_type: + return True + if "_ddl_runner" in kw: + ddl_runner = kw["_ddl_runner"] + if "_pg_enums" in ddl_runner.memo: + pg_enums = ddl_runner.memo["_pg_enums"] + else: + pg_enums = ddl_runner.memo["_pg_enums"] = set() + present = (self.schema, self.name) in pg_enums + pg_enums.add((self.schema, self.name)) + return present + else: + return False + + def _on_table_create(self, target, bind, checkfirst=False, **kw): + if ( + checkfirst + or ( + not self.metadata + and not kw.get("_is_metadata_operation", False) + ) + ) and not self._check_for_name_in_memos(checkfirst, kw): + self.create(bind=bind, checkfirst=checkfirst) + + def _on_table_drop(self, target, bind, checkfirst=False, **kw): + if ( + not self.metadata + and not kw.get("_is_metadata_operation", False) + and not self._check_for_name_in_memos(checkfirst, kw) + ): + self.drop(bind=bind, checkfirst=checkfirst) + + def _on_metadata_create(self, target, bind, checkfirst=False, **kw): + if not self._check_for_name_in_memos(checkfirst, kw): + self.create(bind=bind, checkfirst=checkfirst) + + def _on_metadata_drop(self, target, bind, checkfirst=False, **kw): + if not self._check_for_name_in_memos(checkfirst, kw): + self.drop(bind=bind, checkfirst=checkfirst) + + +class _ColonCast(elements.Cast): + __visit_name__ = "colon_cast" + + def __init__(self, expression, type_): + self.type = type_ + self.clause = expression + self.typeclause = elements.TypeClause(type_) + + +colspecs = { + sqltypes.ARRAY: _array.ARRAY, + sqltypes.Interval: INTERVAL, + sqltypes.Enum: ENUM, + sqltypes.JSON.JSONPathType: _json.JSONPathType, + sqltypes.JSON: _json.JSON, +} + +ischema_names = { + "_array": _array.ARRAY, + "hstore": _hstore.HSTORE, + "json": _json.JSON, + "jsonb": _json.JSONB, + "int4range": _ranges.INT4RANGE, + "int8range": _ranges.INT8RANGE, + "numrange": _ranges.NUMRANGE, + "daterange": _ranges.DATERANGE, + "tsrange": _ranges.TSRANGE, + "tstzrange": _ranges.TSTZRANGE, + "integer": INTEGER, + "bigint": BIGINT, + "smallint": SMALLINT, + "character varying": VARCHAR, + "character": CHAR, + '"char"': sqltypes.String, + "name": sqltypes.String, + "text": TEXT, + "numeric": NUMERIC, + "float": FLOAT, + "real": REAL, + "inet": INET, + "cidr": CIDR, + "uuid": UUID, + "bit": BIT, + "bit varying": BIT, + "macaddr": MACADDR, + "money": MONEY, + "oid": OID, + "regclass": REGCLASS, + "double precision": DOUBLE_PRECISION, + "timestamp": TIMESTAMP, + "timestamp with time zone": TIMESTAMP, + "timestamp without time zone": TIMESTAMP, + "time with time zone": TIME, + "time without time zone": TIME, + "date": DATE, + "time": TIME, + "bytea": BYTEA, + "boolean": BOOLEAN, + "interval": INTERVAL, + "tsvector": TSVECTOR, +} + + +class PGCompiler(compiler.SQLCompiler): + def visit_colon_cast(self, element, **kw): + return "%s::%s" % ( + element.clause._compiler_dispatch(self, **kw), + element.typeclause._compiler_dispatch(self, **kw), + ) + + def visit_array(self, element, **kw): + return "ARRAY[%s]" % self.visit_clauselist(element, **kw) + + def visit_slice(self, element, **kw): + return "%s:%s" % ( + self.process(element.start, **kw), + self.process(element.stop, **kw), + ) + + def visit_json_getitem_op_binary( + self, binary, operator, _cast_applied=False, **kw + ): + if ( + not _cast_applied + and binary.type._type_affinity is not sqltypes.JSON + ): + kw["_cast_applied"] = True + return self.process(sql.cast(binary, binary.type), **kw) + + kw["eager_grouping"] = True + + return self._generate_generic_binary( + binary, " -> " if not _cast_applied else " ->> ", **kw + ) + + def visit_json_path_getitem_op_binary( + self, binary, operator, _cast_applied=False, **kw + ): + if ( + not _cast_applied + and binary.type._type_affinity is not sqltypes.JSON + ): + kw["_cast_applied"] = True + return self.process(sql.cast(binary, binary.type), **kw) + + kw["eager_grouping"] = True + return self._generate_generic_binary( + binary, " #> " if not _cast_applied else " #>> ", **kw + ) + + def visit_getitem_binary(self, binary, operator, **kw): + return "%s[%s]" % ( + self.process(binary.left, **kw), + self.process(binary.right, **kw), + ) + + def visit_aggregate_order_by(self, element, **kw): + return "%s ORDER BY %s" % ( + self.process(element.target, **kw), + self.process(element.order_by, **kw), + ) + + def visit_match_op_binary(self, binary, operator, **kw): + if "postgresql_regconfig" in binary.modifiers: + regconfig = self.render_literal_value( + binary.modifiers["postgresql_regconfig"], sqltypes.STRINGTYPE + ) + if regconfig: + return "%s @@ to_tsquery(%s, %s)" % ( + self.process(binary.left, **kw), + regconfig, + self.process(binary.right, **kw), + ) + return "%s @@ to_tsquery(%s)" % ( + self.process(binary.left, **kw), + self.process(binary.right, **kw), + ) + + def visit_ilike_op_binary(self, binary, operator, **kw): + escape = binary.modifiers.get("escape", None) + + return "%s ILIKE %s" % ( + self.process(binary.left, **kw), + self.process(binary.right, **kw), + ) + ( + " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE) + if escape + else "" + ) + + def visit_not_ilike_op_binary(self, binary, operator, **kw): + escape = binary.modifiers.get("escape", None) + return "%s NOT ILIKE %s" % ( + self.process(binary.left, **kw), + self.process(binary.right, **kw), + ) + ( + " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE) + if escape + else "" + ) + + def _regexp_match(self, base_op, binary, operator, kw): + flags = binary.modifiers["flags"] + if flags is None: + return self._generate_generic_binary( + binary, " %s " % base_op, **kw + ) + if isinstance(flags, elements.BindParameter) and flags.value == "i": + return self._generate_generic_binary( + binary, " %s* " % base_op, **kw + ) + flags = self.process(flags, **kw) + string = self.process(binary.left, **kw) + pattern = self.process(binary.right, **kw) + return "%s %s CONCAT('(?', %s, ')', %s)" % ( + string, + base_op, + flags, + pattern, + ) + + def visit_regexp_match_op_binary(self, binary, operator, **kw): + return self._regexp_match("~", binary, operator, kw) + + def visit_not_regexp_match_op_binary(self, binary, operator, **kw): + return self._regexp_match("!~", binary, operator, kw) + + def visit_regexp_replace_op_binary(self, binary, operator, **kw): + string = self.process(binary.left, **kw) + pattern = self.process(binary.right, **kw) + flags = binary.modifiers["flags"] + if flags is not None: + flags = self.process(flags, **kw) + replacement = self.process(binary.modifiers["replacement"], **kw) + if flags is None: + return "REGEXP_REPLACE(%s, %s, %s)" % ( + string, + pattern, + replacement, + ) + else: + return "REGEXP_REPLACE(%s, %s, %s, %s)" % ( + string, + pattern, + replacement, + flags, + ) + + def visit_empty_set_expr(self, element_types): + # cast the empty set to the type we are comparing against. if + # we are comparing against the null type, pick an arbitrary + # datatype for the empty set + return "SELECT %s WHERE 1!=1" % ( + ", ".join( + "CAST(NULL AS %s)" + % self.dialect.type_compiler.process( + INTEGER() if type_._isnull else type_ + ) + for type_ in element_types or [INTEGER()] + ), + ) + + def render_literal_value(self, value, type_): + value = super(PGCompiler, self).render_literal_value(value, type_) + + if self.dialect._backslash_escapes: + value = value.replace("\\", "\\\\") + return value + + def visit_sequence(self, seq, **kw): + return "nextval('%s')" % self.preparer.format_sequence(seq) + + def limit_clause(self, select, **kw): + text = "" + if select._limit_clause is not None: + text += " \n LIMIT " + self.process(select._limit_clause, **kw) + if select._offset_clause is not None: + if select._limit_clause is None: + text += "\n LIMIT ALL" + text += " OFFSET " + self.process(select._offset_clause, **kw) + return text + + def format_from_hint_text(self, sqltext, table, hint, iscrud): + if hint.upper() != "ONLY": + raise exc.CompileError("Unrecognized hint: %r" % hint) + return "ONLY " + sqltext + + def get_select_precolumns(self, select, **kw): + # Do not call super().get_select_precolumns because + # it will warn/raise when distinct on is present + if select._distinct or select._distinct_on: + if select._distinct_on: + return ( + "DISTINCT ON (" + + ", ".join( + [ + self.process(col, **kw) + for col in select._distinct_on + ] + ) + + ") " + ) + else: + return "DISTINCT " + else: + return "" + + def for_update_clause(self, select, **kw): + + if select._for_update_arg.read: + if select._for_update_arg.key_share: + tmp = " FOR KEY SHARE" + else: + tmp = " FOR SHARE" + elif select._for_update_arg.key_share: + tmp = " FOR NO KEY UPDATE" + else: + tmp = " FOR UPDATE" + + if select._for_update_arg.of: + + tables = util.OrderedSet() + for c in select._for_update_arg.of: + tables.update(sql_util.surface_selectables_only(c)) + + tmp += " OF " + ", ".join( + self.process(table, ashint=True, use_schema=False, **kw) + for table in tables + ) + + if select._for_update_arg.nowait: + tmp += " NOWAIT" + if select._for_update_arg.skip_locked: + tmp += " SKIP LOCKED" + + return tmp + + def returning_clause(self, stmt, returning_cols): + + columns = [ + self._label_returning_column(stmt, c) + for c in expression._select_iterables(returning_cols) + ] + + return "RETURNING " + ", ".join(columns) + + def visit_substring_func(self, func, **kw): + s = self.process(func.clauses.clauses[0], **kw) + start = self.process(func.clauses.clauses[1], **kw) + if len(func.clauses.clauses) > 2: + length = self.process(func.clauses.clauses[2], **kw) + return "SUBSTRING(%s FROM %s FOR %s)" % (s, start, length) + else: + return "SUBSTRING(%s FROM %s)" % (s, start) + + def _on_conflict_target(self, clause, **kw): + + if clause.constraint_target is not None: + # target may be a name of an Index, UniqueConstraint or + # ExcludeConstraint. While there is a separate + # "max_identifier_length" for indexes, PostgreSQL uses the same + # length for all objects so we can use + # truncate_and_render_constraint_name + target_text = ( + "ON CONSTRAINT %s" + % self.preparer.truncate_and_render_constraint_name( + clause.constraint_target + ) + ) + elif clause.inferred_target_elements is not None: + target_text = "(%s)" % ", ".join( + ( + self.preparer.quote(c) + if isinstance(c, util.string_types) + else self.process(c, include_table=False, use_schema=False) + ) + for c in clause.inferred_target_elements + ) + if clause.inferred_target_whereclause is not None: + target_text += " WHERE %s" % self.process( + clause.inferred_target_whereclause, + include_table=False, + use_schema=False, + ) + else: + target_text = "" + + return target_text + + @util.memoized_property + def _is_safe_for_fast_insert_values_helper(self): + # don't allow fast executemany if _post_values_clause is + # present and is not an OnConflictDoNothing. what this means + # concretely is that the + # "fast insert executemany helper" won't be used, in other + # words we won't convert "executemany()" of many parameter + # sets into a single INSERT with many elements in VALUES. + # We can't apply that optimization safely if for example the + # statement includes a clause like "ON CONFLICT DO UPDATE" + + return self.insert_single_values_expr is not None and ( + self.statement._post_values_clause is None + or isinstance( + self.statement._post_values_clause, dml.OnConflictDoNothing + ) + ) + + def visit_on_conflict_do_nothing(self, on_conflict, **kw): + + target_text = self._on_conflict_target(on_conflict, **kw) + + if target_text: + return "ON CONFLICT %s DO NOTHING" % target_text + else: + return "ON CONFLICT DO NOTHING" + + def visit_on_conflict_do_update(self, on_conflict, **kw): + + clause = on_conflict + + target_text = self._on_conflict_target(on_conflict, **kw) + + action_set_ops = [] + + set_parameters = dict(clause.update_values_to_set) + # create a list of column assignment clauses as tuples + + insert_statement = self.stack[-1]["selectable"] + cols = insert_statement.table.c + for c in cols: + col_key = c.key + + if col_key in set_parameters: + value = set_parameters.pop(col_key) + elif c in set_parameters: + value = set_parameters.pop(c) + else: + continue + + if coercions._is_literal(value): + value = elements.BindParameter(None, value, type_=c.type) + + else: + if ( + isinstance(value, elements.BindParameter) + and value.type._isnull + ): + value = value._clone() + value.type = c.type + value_text = self.process(value.self_group(), use_schema=False) + + key_text = self.preparer.quote(c.name) + action_set_ops.append("%s = %s" % (key_text, value_text)) + + # check for names that don't match columns + if set_parameters: + util.warn( + "Additional column names not matching " + "any column keys in table '%s': %s" + % ( + self.current_executable.table.name, + (", ".join("'%s'" % c for c in set_parameters)), + ) + ) + for k, v in set_parameters.items(): + key_text = ( + self.preparer.quote(k) + if isinstance(k, util.string_types) + else self.process(k, use_schema=False) + ) + value_text = self.process( + coercions.expect(roles.ExpressionElementRole, v), + use_schema=False, + ) + action_set_ops.append("%s = %s" % (key_text, value_text)) + + action_text = ", ".join(action_set_ops) + if clause.update_whereclause is not None: + action_text += " WHERE %s" % self.process( + clause.update_whereclause, include_table=True, use_schema=False + ) + + return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text) + + def update_from_clause( + self, update_stmt, from_table, extra_froms, from_hints, **kw + ): + kw["asfrom"] = True + return "FROM " + ", ".join( + t._compiler_dispatch(self, fromhints=from_hints, **kw) + for t in extra_froms + ) + + def delete_extra_from_clause( + self, delete_stmt, from_table, extra_froms, from_hints, **kw + ): + """Render the DELETE .. USING clause specific to PostgreSQL.""" + kw["asfrom"] = True + return "USING " + ", ".join( + t._compiler_dispatch(self, fromhints=from_hints, **kw) + for t in extra_froms + ) + + def fetch_clause(self, select, **kw): + # pg requires parens for non literal clauses. It's also required for + # bind parameters if a ::type casts is used by the driver (asyncpg), + # so it's easiest to just always add it + text = "" + if select._offset_clause is not None: + text += "\n OFFSET (%s) ROWS" % self.process( + select._offset_clause, **kw + ) + if select._fetch_clause is not None: + text += "\n FETCH FIRST (%s)%s ROWS %s" % ( + self.process(select._fetch_clause, **kw), + " PERCENT" if select._fetch_clause_options["percent"] else "", + "WITH TIES" + if select._fetch_clause_options["with_ties"] + else "ONLY", + ) + return text + + +class PGDDLCompiler(compiler.DDLCompiler): + def get_column_specification(self, column, **kwargs): + + colspec = self.preparer.format_column(column) + impl_type = column.type.dialect_impl(self.dialect) + if isinstance(impl_type, sqltypes.TypeDecorator): + impl_type = impl_type.impl + + has_identity = ( + column.identity is not None + and self.dialect.supports_identity_columns + ) + + if ( + column.primary_key + and column is column.table._autoincrement_column + and ( + self.dialect.supports_smallserial + or not isinstance(impl_type, sqltypes.SmallInteger) + ) + and not has_identity + and ( + column.default is None + or ( + isinstance(column.default, schema.Sequence) + and column.default.optional + ) + ) + ): + if isinstance(impl_type, sqltypes.BigInteger): + colspec += " BIGSERIAL" + elif isinstance(impl_type, sqltypes.SmallInteger): + colspec += " SMALLSERIAL" + else: + colspec += " SERIAL" + else: + colspec += " " + self.dialect.type_compiler.process( + column.type, + type_expression=column, + identifier_preparer=self.preparer, + ) + default = self.get_column_default_string(column) + if default is not None: + colspec += " DEFAULT " + default + + if column.computed is not None: + colspec += " " + self.process(column.computed) + if has_identity: + colspec += " " + self.process(column.identity) + + if not column.nullable and not has_identity: + colspec += " NOT NULL" + elif column.nullable and has_identity: + colspec += " NULL" + return colspec + + def _define_constraint_validity(self, constraint): + not_valid = constraint.dialect_options["postgresql"]["not_valid"] + return " NOT VALID" if not_valid else "" + + def visit_check_constraint(self, constraint): + if constraint._type_bound: + typ = list(constraint.columns)[0].type + if ( + isinstance(typ, sqltypes.ARRAY) + and isinstance(typ.item_type, sqltypes.Enum) + and not typ.item_type.native_enum + ): + raise exc.CompileError( + "PostgreSQL dialect cannot produce the CHECK constraint " + "for ARRAY of non-native ENUM; please specify " + "create_constraint=False on this Enum datatype." + ) + + text = super(PGDDLCompiler, self).visit_check_constraint(constraint) + text += self._define_constraint_validity(constraint) + return text + + def visit_foreign_key_constraint(self, constraint): + text = super(PGDDLCompiler, self).visit_foreign_key_constraint( + constraint + ) + text += self._define_constraint_validity(constraint) + return text + + def visit_drop_table_comment(self, drop): + return "COMMENT ON TABLE %s IS NULL" % self.preparer.format_table( + drop.element + ) + + def visit_create_enum_type(self, create): + type_ = create.element + + return "CREATE TYPE %s AS ENUM (%s)" % ( + self.preparer.format_type(type_), + ", ".join( + self.sql_compiler.process(sql.literal(e), literal_binds=True) + for e in type_.enums + ), + ) + + def visit_drop_enum_type(self, drop): + type_ = drop.element + + return "DROP TYPE %s" % (self.preparer.format_type(type_)) + + def visit_create_index(self, create): + preparer = self.preparer + index = create.element + self._verify_index_table(index) + text = "CREATE " + if index.unique: + text += "UNIQUE " + text += "INDEX " + + if self.dialect._supports_create_index_concurrently: + concurrently = index.dialect_options["postgresql"]["concurrently"] + if concurrently: + text += "CONCURRENTLY " + + if create.if_not_exists: + text += "IF NOT EXISTS " + + text += "%s ON %s " % ( + self._prepared_index_name(index, include_schema=False), + preparer.format_table(index.table), + ) + + using = index.dialect_options["postgresql"]["using"] + if using: + text += ( + "USING %s " + % self.preparer.validate_sql_phrase(using, IDX_USING).lower() + ) + + ops = index.dialect_options["postgresql"]["ops"] + text += "(%s)" % ( + ", ".join( + [ + self.sql_compiler.process( + expr.self_group() + if not isinstance(expr, expression.ColumnClause) + else expr, + include_table=False, + literal_binds=True, + ) + + ( + (" " + ops[expr.key]) + if hasattr(expr, "key") and expr.key in ops + else "" + ) + for expr in index.expressions + ] + ) + ) + + includeclause = index.dialect_options["postgresql"]["include"] + if includeclause: + inclusions = [ + index.table.c[col] + if isinstance(col, util.string_types) + else col + for col in includeclause + ] + text += " INCLUDE (%s)" % ", ".join( + [preparer.quote(c.name) for c in inclusions] + ) + + withclause = index.dialect_options["postgresql"]["with"] + if withclause: + text += " WITH (%s)" % ( + ", ".join( + [ + "%s = %s" % storage_parameter + for storage_parameter in withclause.items() + ] + ) + ) + + tablespace_name = index.dialect_options["postgresql"]["tablespace"] + if tablespace_name: + text += " TABLESPACE %s" % preparer.quote(tablespace_name) + + whereclause = index.dialect_options["postgresql"]["where"] + if whereclause is not None: + whereclause = coercions.expect( + roles.DDLExpressionRole, whereclause + ) + + where_compiled = self.sql_compiler.process( + whereclause, include_table=False, literal_binds=True + ) + text += " WHERE " + where_compiled + + return text + + def visit_drop_index(self, drop): + index = drop.element + + text = "\nDROP INDEX " + + if self.dialect._supports_drop_index_concurrently: + concurrently = index.dialect_options["postgresql"]["concurrently"] + if concurrently: + text += "CONCURRENTLY " + + if drop.if_exists: + text += "IF EXISTS " + + text += self._prepared_index_name(index, include_schema=True) + return text + + def visit_exclude_constraint(self, constraint, **kw): + text = "" + if constraint.name is not None: + text += "CONSTRAINT %s " % self.preparer.format_constraint( + constraint + ) + elements = [] + for expr, name, op in constraint._render_exprs: + kw["include_table"] = False + exclude_element = self.sql_compiler.process(expr, **kw) + ( + (" " + constraint.ops[expr.key]) + if hasattr(expr, "key") and expr.key in constraint.ops + else "" + ) + + elements.append("%s WITH %s" % (exclude_element, op)) + text += "EXCLUDE USING %s (%s)" % ( + self.preparer.validate_sql_phrase( + constraint.using, IDX_USING + ).lower(), + ", ".join(elements), + ) + if constraint.where is not None: + text += " WHERE (%s)" % self.sql_compiler.process( + constraint.where, literal_binds=True + ) + text += self.define_constraint_deferrability(constraint) + return text + + def post_create_table(self, table): + table_opts = [] + pg_opts = table.dialect_options["postgresql"] + + inherits = pg_opts.get("inherits") + if inherits is not None: + if not isinstance(inherits, (list, tuple)): + inherits = (inherits,) + table_opts.append( + "\n INHERITS ( " + + ", ".join(self.preparer.quote(name) for name in inherits) + + " )" + ) + + if pg_opts["partition_by"]: + table_opts.append("\n PARTITION BY %s" % pg_opts["partition_by"]) + + if pg_opts["with_oids"] is True: + table_opts.append("\n WITH OIDS") + elif pg_opts["with_oids"] is False: + table_opts.append("\n WITHOUT OIDS") + + if pg_opts["on_commit"]: + on_commit_options = pg_opts["on_commit"].replace("_", " ").upper() + table_opts.append("\n ON COMMIT %s" % on_commit_options) + + if pg_opts["tablespace"]: + tablespace_name = pg_opts["tablespace"] + table_opts.append( + "\n TABLESPACE %s" % self.preparer.quote(tablespace_name) + ) + + return "".join(table_opts) + + def visit_computed_column(self, generated): + if generated.persisted is False: + raise exc.CompileError( + "PostrgreSQL computed columns do not support 'virtual' " + "persistence; set the 'persisted' flag to None or True for " + "PostgreSQL support." + ) + + return "GENERATED ALWAYS AS (%s) STORED" % self.sql_compiler.process( + generated.sqltext, include_table=False, literal_binds=True + ) + + def visit_create_sequence(self, create, **kw): + prefix = None + if create.element.data_type is not None: + prefix = " AS %s" % self.type_compiler.process( + create.element.data_type + ) + + return super(PGDDLCompiler, self).visit_create_sequence( + create, prefix=prefix, **kw + ) + + +class PGTypeCompiler(compiler.GenericTypeCompiler): + def visit_TSVECTOR(self, type_, **kw): + return "TSVECTOR" + + def visit_INET(self, type_, **kw): + return "INET" + + def visit_CIDR(self, type_, **kw): + return "CIDR" + + def visit_MACADDR(self, type_, **kw): + return "MACADDR" + + def visit_MONEY(self, type_, **kw): + return "MONEY" + + def visit_OID(self, type_, **kw): + return "OID" + + def visit_REGCLASS(self, type_, **kw): + return "REGCLASS" + + def visit_FLOAT(self, type_, **kw): + if not type_.precision: + return "FLOAT" + else: + return "FLOAT(%(precision)s)" % {"precision": type_.precision} + + def visit_DOUBLE_PRECISION(self, type_, **kw): + return "DOUBLE PRECISION" + + def visit_BIGINT(self, type_, **kw): + return "BIGINT" + + def visit_HSTORE(self, type_, **kw): + return "HSTORE" + + def visit_JSON(self, type_, **kw): + return "JSON" + + def visit_JSONB(self, type_, **kw): + return "JSONB" + + def visit_INT4RANGE(self, type_, **kw): + return "INT4RANGE" + + def visit_INT8RANGE(self, type_, **kw): + return "INT8RANGE" + + def visit_NUMRANGE(self, type_, **kw): + return "NUMRANGE" + + def visit_DATERANGE(self, type_, **kw): + return "DATERANGE" + + def visit_TSRANGE(self, type_, **kw): + return "TSRANGE" + + def visit_TSTZRANGE(self, type_, **kw): + return "TSTZRANGE" + + def visit_datetime(self, type_, **kw): + return self.visit_TIMESTAMP(type_, **kw) + + def visit_enum(self, type_, **kw): + if not type_.native_enum or not self.dialect.supports_native_enum: + return super(PGTypeCompiler, self).visit_enum(type_, **kw) + else: + return self.visit_ENUM(type_, **kw) + + def visit_ENUM(self, type_, identifier_preparer=None, **kw): + if identifier_preparer is None: + identifier_preparer = self.dialect.identifier_preparer + return identifier_preparer.format_type(type_) + + def visit_TIMESTAMP(self, type_, **kw): + return "TIMESTAMP%s %s" % ( + "(%d)" % type_.precision + if getattr(type_, "precision", None) is not None + else "", + (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE", + ) + + def visit_TIME(self, type_, **kw): + return "TIME%s %s" % ( + "(%d)" % type_.precision + if getattr(type_, "precision", None) is not None + else "", + (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE", + ) + + def visit_INTERVAL(self, type_, **kw): + text = "INTERVAL" + if type_.fields is not None: + text += " " + type_.fields + if type_.precision is not None: + text += " (%d)" % type_.precision + return text + + def visit_BIT(self, type_, **kw): + if type_.varying: + compiled = "BIT VARYING" + if type_.length is not None: + compiled += "(%d)" % type_.length + else: + compiled = "BIT(%d)" % type_.length + return compiled + + def visit_UUID(self, type_, **kw): + return "UUID" + + def visit_large_binary(self, type_, **kw): + return self.visit_BYTEA(type_, **kw) + + def visit_BYTEA(self, type_, **kw): + return "BYTEA" + + def visit_ARRAY(self, type_, **kw): + + inner = self.process(type_.item_type, **kw) + return re.sub( + r"((?: COLLATE.*)?)$", + ( + r"%s\1" + % ( + "[]" + * (type_.dimensions if type_.dimensions is not None else 1) + ) + ), + inner, + count=1, + ) + + +class PGIdentifierPreparer(compiler.IdentifierPreparer): + + reserved_words = RESERVED_WORDS + + def _unquote_identifier(self, value): + if value[0] == self.initial_quote: + value = value[1:-1].replace( + self.escape_to_quote, self.escape_quote + ) + return value + + def format_type(self, type_, use_schema=True): + if not type_.name: + raise exc.CompileError("PostgreSQL ENUM type requires a name.") + + name = self.quote(type_.name) + effective_schema = self.schema_for_object(type_) + + if ( + not self.omit_schema + and use_schema + and effective_schema is not None + ): + name = self.quote_schema(effective_schema) + "." + name + return name + + +class PGInspector(reflection.Inspector): + def get_table_oid(self, table_name, schema=None): + """Return the OID for the given table name.""" + + with self._operation_context() as conn: + return self.dialect.get_table_oid( + conn, table_name, schema, info_cache=self.info_cache + ) + + def get_enums(self, schema=None): + """Return a list of ENUM objects. + + Each member is a dictionary containing these fields: + + * name - name of the enum + * schema - the schema name for the enum. + * visible - boolean, whether or not this enum is visible + in the default search path. + * labels - a list of string labels that apply to the enum. + + :param schema: schema name. If None, the default schema + (typically 'public') is used. May also be set to '*' to + indicate load enums for all schemas. + + .. versionadded:: 1.0.0 + + """ + schema = schema or self.default_schema_name + with self._operation_context() as conn: + return self.dialect._load_enums(conn, schema) + + def get_foreign_table_names(self, schema=None): + """Return a list of FOREIGN TABLE names. + + Behavior is similar to that of + :meth:`_reflection.Inspector.get_table_names`, + except that the list is limited to those tables that report a + ``relkind`` value of ``f``. + + .. versionadded:: 1.0.0 + + """ + schema = schema or self.default_schema_name + with self._operation_context() as conn: + return self.dialect._get_foreign_table_names(conn, schema) + + def get_view_names(self, schema=None, include=("plain", "materialized")): + """Return all view names in `schema`. + + :param schema: Optional, retrieve names from a non-default schema. + For special quoting, use :class:`.quoted_name`. + + :param include: specify which types of views to return. Passed + as a string value (for a single type) or a tuple (for any number + of types). Defaults to ``('plain', 'materialized')``. + + .. versionadded:: 1.1 + + """ + + with self._operation_context() as conn: + return self.dialect.get_view_names( + conn, schema, info_cache=self.info_cache, include=include + ) + + +class CreateEnumType(schema._CreateDropBase): + __visit_name__ = "create_enum_type" + + +class DropEnumType(schema._CreateDropBase): + __visit_name__ = "drop_enum_type" + + +class PGExecutionContext(default.DefaultExecutionContext): + def fire_sequence(self, seq, type_): + return self._execute_scalar( + ( + "select nextval('%s')" + % self.identifier_preparer.format_sequence(seq) + ), + type_, + ) + + def get_insert_default(self, column): + if column.primary_key and column is column.table._autoincrement_column: + if column.server_default and column.server_default.has_argument: + + # pre-execute passive defaults on primary key columns + return self._execute_scalar( + "select %s" % column.server_default.arg, column.type + ) + + elif column.default is None or ( + column.default.is_sequence and column.default.optional + ): + # execute the sequence associated with a SERIAL primary + # key column. for non-primary-key SERIAL, the ID just + # generates server side. + + try: + seq_name = column._postgresql_seq_name + except AttributeError: + tab = column.table.name + col = column.name + tab = tab[0 : 29 + max(0, (29 - len(col)))] + col = col[0 : 29 + max(0, (29 - len(tab)))] + name = "%s_%s_seq" % (tab, col) + column._postgresql_seq_name = seq_name = name + + if column.table is not None: + effective_schema = self.connection.schema_for_object( + column.table + ) + else: + effective_schema = None + + if effective_schema is not None: + exc = 'select nextval(\'"%s"."%s"\')' % ( + effective_schema, + seq_name, + ) + else: + exc = "select nextval('\"%s\"')" % (seq_name,) + + return self._execute_scalar(exc, column.type) + + return super(PGExecutionContext, self).get_insert_default(column) + + def should_autocommit_text(self, statement): + return AUTOCOMMIT_REGEXP.match(statement) + + +class PGReadOnlyConnectionCharacteristic( + characteristics.ConnectionCharacteristic +): + transactional = True + + def reset_characteristic(self, dialect, dbapi_conn): + dialect.set_readonly(dbapi_conn, False) + + def set_characteristic(self, dialect, dbapi_conn, value): + dialect.set_readonly(dbapi_conn, value) + + def get_characteristic(self, dialect, dbapi_conn): + return dialect.get_readonly(dbapi_conn) + + +class PGDeferrableConnectionCharacteristic( + characteristics.ConnectionCharacteristic +): + transactional = True + + def reset_characteristic(self, dialect, dbapi_conn): + dialect.set_deferrable(dbapi_conn, False) + + def set_characteristic(self, dialect, dbapi_conn, value): + dialect.set_deferrable(dbapi_conn, value) + + def get_characteristic(self, dialect, dbapi_conn): + return dialect.get_deferrable(dbapi_conn) + + +class PGDialect(default.DefaultDialect): + name = "postgresql" + supports_statement_cache = True + supports_alter = True + max_identifier_length = 63 + supports_sane_rowcount = True + + supports_native_enum = True + supports_native_boolean = True + supports_smallserial = True + + supports_sequences = True + sequences_optional = True + preexecute_autoincrement_sequences = True + postfetch_lastrowid = False + + supports_comments = True + supports_default_values = True + + supports_default_metavalue = True + + supports_empty_insert = False + supports_multivalues_insert = True + supports_identity_columns = True + + default_paramstyle = "pyformat" + ischema_names = ischema_names + colspecs = colspecs + + statement_compiler = PGCompiler + ddl_compiler = PGDDLCompiler + type_compiler = PGTypeCompiler + preparer = PGIdentifierPreparer + execution_ctx_cls = PGExecutionContext + inspector = PGInspector + isolation_level = None + + implicit_returning = True + full_returning = True + + connection_characteristics = ( + default.DefaultDialect.connection_characteristics + ) + connection_characteristics = connection_characteristics.union( + { + "postgresql_readonly": PGReadOnlyConnectionCharacteristic(), + "postgresql_deferrable": PGDeferrableConnectionCharacteristic(), + } + ) + + construct_arguments = [ + ( + schema.Index, + { + "using": False, + "include": None, + "where": None, + "ops": {}, + "concurrently": False, + "with": {}, + "tablespace": None, + }, + ), + ( + schema.Table, + { + "ignore_search_path": False, + "tablespace": None, + "partition_by": None, + "with_oids": None, + "on_commit": None, + "inherits": None, + }, + ), + ( + schema.CheckConstraint, + { + "not_valid": False, + }, + ), + ( + schema.ForeignKeyConstraint, + { + "not_valid": False, + }, + ), + ] + + reflection_options = ("postgresql_ignore_search_path",) + + _backslash_escapes = True + _supports_create_index_concurrently = True + _supports_drop_index_concurrently = True + + def __init__( + self, + isolation_level=None, + json_serializer=None, + json_deserializer=None, + **kwargs + ): + default.DefaultDialect.__init__(self, **kwargs) + + # the isolation_level parameter to the PGDialect itself is legacy. + # still works however the execution_options method is the one that + # is documented. + self.isolation_level = isolation_level + self._json_deserializer = json_deserializer + self._json_serializer = json_serializer + + def initialize(self, connection): + super(PGDialect, self).initialize(connection) + + if self.server_version_info <= (8, 2): + self.full_returning = self.implicit_returning = False + + self.supports_native_enum = self.server_version_info >= (8, 3) + if not self.supports_native_enum: + self.colspecs = self.colspecs.copy() + # pop base Enum type + self.colspecs.pop(sqltypes.Enum, None) + # psycopg2, others may have placed ENUM here as well + self.colspecs.pop(ENUM, None) + + # https://www.postgresql.org/docs/9.3/static/release-9-2.html#AEN116689 + self.supports_smallserial = self.server_version_info >= (9, 2) + + if self.server_version_info < (8, 2): + self._backslash_escapes = False + else: + # ensure this query is not emitted on server version < 8.2 + # as it will fail + std_string = connection.exec_driver_sql( + "show standard_conforming_strings" + ).scalar() + self._backslash_escapes = std_string == "off" + + self._supports_create_index_concurrently = ( + self.server_version_info >= (8, 2) + ) + self._supports_drop_index_concurrently = self.server_version_info >= ( + 9, + 2, + ) + self.supports_identity_columns = self.server_version_info >= (10,) + + def on_connect(self): + if self.isolation_level is not None: + + def connect(conn): + self.set_isolation_level(conn, self.isolation_level) + + return connect + else: + return None + + _isolation_lookup = set( + [ + "SERIALIZABLE", + "READ UNCOMMITTED", + "READ COMMITTED", + "REPEATABLE READ", + ] + ) + + def set_isolation_level(self, connection, level): + level = level.replace("_", " ") + if level not in self._isolation_lookup: + raise exc.ArgumentError( + "Invalid value '%s' for isolation_level. " + "Valid isolation levels for %s are %s" + % (level, self.name, ", ".join(self._isolation_lookup)) + ) + cursor = connection.cursor() + cursor.execute( + "SET SESSION CHARACTERISTICS AS TRANSACTION " + "ISOLATION LEVEL %s" % level + ) + cursor.execute("COMMIT") + cursor.close() + + def get_isolation_level(self, connection): + cursor = connection.cursor() + cursor.execute("show transaction isolation level") + val = cursor.fetchone()[0] + cursor.close() + return val.upper() + + def set_readonly(self, connection, value): + raise NotImplementedError() + + def get_readonly(self, connection): + raise NotImplementedError() + + def set_deferrable(self, connection, value): + raise NotImplementedError() + + def get_deferrable(self, connection): + raise NotImplementedError() + + def do_begin_twophase(self, connection, xid): + self.do_begin(connection.connection) + + def do_prepare_twophase(self, connection, xid): + connection.exec_driver_sql("PREPARE TRANSACTION '%s'" % xid) + + def do_rollback_twophase( + self, connection, xid, is_prepared=True, recover=False + ): + if is_prepared: + if recover: + # FIXME: ugly hack to get out of transaction + # context when committing recoverable transactions + # Must find out a way how to make the dbapi not + # open a transaction. + connection.exec_driver_sql("ROLLBACK") + connection.exec_driver_sql("ROLLBACK PREPARED '%s'" % xid) + connection.exec_driver_sql("BEGIN") + self.do_rollback(connection.connection) + else: + self.do_rollback(connection.connection) + + def do_commit_twophase( + self, connection, xid, is_prepared=True, recover=False + ): + if is_prepared: + if recover: + connection.exec_driver_sql("ROLLBACK") + connection.exec_driver_sql("COMMIT PREPARED '%s'" % xid) + connection.exec_driver_sql("BEGIN") + self.do_rollback(connection.connection) + else: + self.do_commit(connection.connection) + + def do_recover_twophase(self, connection): + resultset = connection.execute( + sql.text("SELECT gid FROM pg_prepared_xacts") + ) + return [row[0] for row in resultset] + + def _get_default_schema_name(self, connection): + return connection.exec_driver_sql("select current_schema()").scalar() + + def has_schema(self, connection, schema): + query = ( + "select nspname from pg_namespace " "where lower(nspname)=:schema" + ) + cursor = connection.execute( + sql.text(query).bindparams( + sql.bindparam( + "schema", + util.text_type(schema.lower()), + type_=sqltypes.Unicode, + ) + ) + ) + + return bool(cursor.first()) + + def has_table(self, connection, table_name, schema=None): + self._ensure_has_table_connection(connection) + # seems like case gets folded in pg_class... + if schema is None: + cursor = connection.execute( + sql.text( + "select relname from pg_class c join pg_namespace n on " + "n.oid=c.relnamespace where " + "pg_catalog.pg_table_is_visible(c.oid) " + "and relname=:name" + ).bindparams( + sql.bindparam( + "name", + util.text_type(table_name), + type_=sqltypes.Unicode, + ) + ) + ) + else: + cursor = connection.execute( + sql.text( + "select relname from pg_class c join pg_namespace n on " + "n.oid=c.relnamespace where n.nspname=:schema and " + "relname=:name" + ).bindparams( + sql.bindparam( + "name", + util.text_type(table_name), + type_=sqltypes.Unicode, + ), + sql.bindparam( + "schema", + util.text_type(schema), + type_=sqltypes.Unicode, + ), + ) + ) + return bool(cursor.first()) + + def has_sequence(self, connection, sequence_name, schema=None): + if schema is None: + schema = self.default_schema_name + cursor = connection.execute( + sql.text( + "SELECT relname FROM pg_class c join pg_namespace n on " + "n.oid=c.relnamespace where relkind='S' and " + "n.nspname=:schema and relname=:name" + ).bindparams( + sql.bindparam( + "name", + util.text_type(sequence_name), + type_=sqltypes.Unicode, + ), + sql.bindparam( + "schema", + util.text_type(schema), + type_=sqltypes.Unicode, + ), + ) + ) + + return bool(cursor.first()) + + def has_type(self, connection, type_name, schema=None): + if schema is not None: + query = """ + SELECT EXISTS ( + SELECT * FROM pg_catalog.pg_type t, pg_catalog.pg_namespace n + WHERE t.typnamespace = n.oid + AND t.typname = :typname + AND n.nspname = :nspname + ) + """ + query = sql.text(query) + else: + query = """ + SELECT EXISTS ( + SELECT * FROM pg_catalog.pg_type t + WHERE t.typname = :typname + AND pg_type_is_visible(t.oid) + ) + """ + query = sql.text(query) + query = query.bindparams( + sql.bindparam( + "typname", util.text_type(type_name), type_=sqltypes.Unicode + ) + ) + if schema is not None: + query = query.bindparams( + sql.bindparam( + "nspname", util.text_type(schema), type_=sqltypes.Unicode + ) + ) + cursor = connection.execute(query) + return bool(cursor.scalar()) + + def _get_server_version_info(self, connection): + v = connection.exec_driver_sql("select pg_catalog.version()").scalar() + m = re.match( + r".*(?:PostgreSQL|EnterpriseDB) " + r"(\d+)\.?(\d+)?(?:\.(\d+))?(?:\.\d+)?(?:devel|beta)?", + v, + ) + if not m: + raise AssertionError( + "Could not determine version from string '%s'" % v + ) + return tuple([int(x) for x in m.group(1, 2, 3) if x is not None]) + + @reflection.cache + def get_table_oid(self, connection, table_name, schema=None, **kw): + """Fetch the oid for schema.table_name. + + Several reflection methods require the table oid. The idea for using + this method is that it can be fetched one time and cached for + subsequent calls. + + """ + table_oid = None + if schema is not None: + schema_where_clause = "n.nspname = :schema" + else: + schema_where_clause = "pg_catalog.pg_table_is_visible(c.oid)" + query = ( + """ + SELECT c.oid + FROM pg_catalog.pg_class c + LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace + WHERE (%s) + AND c.relname = :table_name AND c.relkind in + ('r', 'v', 'm', 'f', 'p') + """ + % schema_where_clause + ) + # Since we're binding to unicode, table_name and schema_name must be + # unicode. + table_name = util.text_type(table_name) + if schema is not None: + schema = util.text_type(schema) + s = sql.text(query).bindparams(table_name=sqltypes.Unicode) + s = s.columns(oid=sqltypes.Integer) + if schema: + s = s.bindparams(sql.bindparam("schema", type_=sqltypes.Unicode)) + c = connection.execute(s, dict(table_name=table_name, schema=schema)) + table_oid = c.scalar() + if table_oid is None: + raise exc.NoSuchTableError(table_name) + return table_oid + + @reflection.cache + def get_schema_names(self, connection, **kw): + result = connection.execute( + sql.text( + "SELECT nspname FROM pg_namespace " + "WHERE nspname NOT LIKE 'pg_%' " + "ORDER BY nspname" + ).columns(nspname=sqltypes.Unicode) + ) + return [name for name, in result] + + @reflection.cache + def get_table_names(self, connection, schema=None, **kw): + result = connection.execute( + sql.text( + "SELECT c.relname FROM pg_class c " + "JOIN pg_namespace n ON n.oid = c.relnamespace " + "WHERE n.nspname = :schema AND c.relkind in ('r', 'p')" + ).columns(relname=sqltypes.Unicode), + dict( + schema=schema + if schema is not None + else self.default_schema_name + ), + ) + return [name for name, in result] + + @reflection.cache + def _get_foreign_table_names(self, connection, schema=None, **kw): + result = connection.execute( + sql.text( + "SELECT c.relname FROM pg_class c " + "JOIN pg_namespace n ON n.oid = c.relnamespace " + "WHERE n.nspname = :schema AND c.relkind = 'f'" + ).columns(relname=sqltypes.Unicode), + dict( + schema=schema + if schema is not None + else self.default_schema_name + ), + ) + return [name for name, in result] + + @reflection.cache + def get_view_names( + self, connection, schema=None, include=("plain", "materialized"), **kw + ): + + include_kind = {"plain": "v", "materialized": "m"} + try: + kinds = [include_kind[i] for i in util.to_list(include)] + except KeyError: + raise ValueError( + "include %r unknown, needs to be a sequence containing " + "one or both of 'plain' and 'materialized'" % (include,) + ) + if not kinds: + raise ValueError( + "empty include, needs to be a sequence containing " + "one or both of 'plain' and 'materialized'" + ) + + result = connection.execute( + sql.text( + "SELECT c.relname FROM pg_class c " + "JOIN pg_namespace n ON n.oid = c.relnamespace " + "WHERE n.nspname = :schema AND c.relkind IN (%s)" + % (", ".join("'%s'" % elem for elem in kinds)) + ).columns(relname=sqltypes.Unicode), + dict( + schema=schema + if schema is not None + else self.default_schema_name + ), + ) + return [name for name, in result] + + @reflection.cache + def get_sequence_names(self, connection, schema=None, **kw): + if not schema: + schema = self.default_schema_name + cursor = connection.execute( + sql.text( + "SELECT relname FROM pg_class c join pg_namespace n on " + "n.oid=c.relnamespace where relkind='S' and " + "n.nspname=:schema" + ).bindparams( + sql.bindparam( + "schema", + util.text_type(schema), + type_=sqltypes.Unicode, + ), + ) + ) + return [row[0] for row in cursor] + + @reflection.cache + def get_view_definition(self, connection, view_name, schema=None, **kw): + view_def = connection.scalar( + sql.text( + "SELECT pg_get_viewdef(c.oid) view_def FROM pg_class c " + "JOIN pg_namespace n ON n.oid = c.relnamespace " + "WHERE n.nspname = :schema AND c.relname = :view_name " + "AND c.relkind IN ('v', 'm')" + ).columns(view_def=sqltypes.Unicode), + dict( + schema=schema + if schema is not None + else self.default_schema_name, + view_name=view_name, + ), + ) + return view_def + + @reflection.cache + def get_columns(self, connection, table_name, schema=None, **kw): + + table_oid = self.get_table_oid( + connection, table_name, schema, info_cache=kw.get("info_cache") + ) + + generated = ( + "a.attgenerated as generated" + if self.server_version_info >= (12,) + else "NULL as generated" + ) + if self.server_version_info >= (10,): + # a.attidentity != '' is required or it will reflect also + # serial columns as identity. + identity = """\ + (SELECT json_build_object( + 'always', a.attidentity = 'a', + 'start', s.seqstart, + 'increment', s.seqincrement, + 'minvalue', s.seqmin, + 'maxvalue', s.seqmax, + 'cache', s.seqcache, + 'cycle', s.seqcycle) + FROM pg_catalog.pg_sequence s + JOIN pg_catalog.pg_class c on s.seqrelid = c."oid" + WHERE c.relkind = 'S' + AND a.attidentity != '' + AND s.seqrelid = pg_catalog.pg_get_serial_sequence( + a.attrelid::regclass::text, a.attname + )::regclass::oid + ) as identity_options\ + """ + else: + identity = "NULL as identity_options" + + SQL_COLS = """ + SELECT a.attname, + pg_catalog.format_type(a.atttypid, a.atttypmod), + ( + SELECT pg_catalog.pg_get_expr(d.adbin, d.adrelid) + FROM pg_catalog.pg_attrdef d + WHERE d.adrelid = a.attrelid AND d.adnum = a.attnum + AND a.atthasdef + ) AS DEFAULT, + a.attnotnull, + a.attrelid as table_oid, + pgd.description as comment, + %s, + %s + FROM pg_catalog.pg_attribute a + LEFT JOIN pg_catalog.pg_description pgd ON ( + pgd.objoid = a.attrelid AND pgd.objsubid = a.attnum) + WHERE a.attrelid = :table_oid + AND a.attnum > 0 AND NOT a.attisdropped + ORDER BY a.attnum + """ % ( + generated, + identity, + ) + s = ( + sql.text(SQL_COLS) + .bindparams(sql.bindparam("table_oid", type_=sqltypes.Integer)) + .columns(attname=sqltypes.Unicode, default=sqltypes.Unicode) + ) + c = connection.execute(s, dict(table_oid=table_oid)) + rows = c.fetchall() + + # dictionary with (name, ) if default search path or (schema, name) + # as keys + domains = self._load_domains(connection) + + # dictionary with (name, ) if default search path or (schema, name) + # as keys + enums = dict( + ((rec["name"],), rec) + if rec["visible"] + else ((rec["schema"], rec["name"]), rec) + for rec in self._load_enums(connection, schema="*") + ) + + # format columns + columns = [] + + for ( + name, + format_type, + default_, + notnull, + table_oid, + comment, + generated, + identity, + ) in rows: + column_info = self._get_column_info( + name, + format_type, + default_, + notnull, + domains, + enums, + schema, + comment, + generated, + identity, + ) + columns.append(column_info) + return columns + + def _get_column_info( + self, + name, + format_type, + default, + notnull, + domains, + enums, + schema, + comment, + generated, + identity, + ): + def _handle_array_type(attype): + return ( + # strip '[]' from integer[], etc. + re.sub(r"\[\]$", "", attype), + attype.endswith("[]"), + ) + + # strip (*) from character varying(5), timestamp(5) + # with time zone, geometry(POLYGON), etc. + attype = re.sub(r"\(.*\)", "", format_type) + + # strip '[]' from integer[], etc. and check if an array + attype, is_array = _handle_array_type(attype) + + # strip quotes from case sensitive enum or domain names + enum_or_domain_key = tuple(util.quoted_token_parser(attype)) + + nullable = not notnull + + charlen = re.search(r"\(([\d,]+)\)", format_type) + if charlen: + charlen = charlen.group(1) + args = re.search(r"\((.*)\)", format_type) + if args and args.group(1): + args = tuple(re.split(r"\s*,\s*", args.group(1))) + else: + args = () + kwargs = {} + + if attype == "numeric": + if charlen: + prec, scale = charlen.split(",") + args = (int(prec), int(scale)) + else: + args = () + elif attype == "double precision": + args = (53,) + elif attype == "integer": + args = () + elif attype in ("timestamp with time zone", "time with time zone"): + kwargs["timezone"] = True + if charlen: + kwargs["precision"] = int(charlen) + args = () + elif attype in ( + "timestamp without time zone", + "time without time zone", + "time", + ): + kwargs["timezone"] = False + if charlen: + kwargs["precision"] = int(charlen) + args = () + elif attype == "bit varying": + kwargs["varying"] = True + if charlen: + args = (int(charlen),) + else: + args = () + elif attype.startswith("interval"): + field_match = re.match(r"interval (.+)", attype, re.I) + if charlen: + kwargs["precision"] = int(charlen) + if field_match: + kwargs["fields"] = field_match.group(1) + attype = "interval" + args = () + elif charlen: + args = (int(charlen),) + + while True: + # looping here to suit nested domains + if attype in self.ischema_names: + coltype = self.ischema_names[attype] + break + elif enum_or_domain_key in enums: + enum = enums[enum_or_domain_key] + coltype = ENUM + kwargs["name"] = enum["name"] + if not enum["visible"]: + kwargs["schema"] = enum["schema"] + args = tuple(enum["labels"]) + break + elif enum_or_domain_key in domains: + domain = domains[enum_or_domain_key] + attype = domain["attype"] + attype, is_array = _handle_array_type(attype) + # strip quotes from case sensitive enum or domain names + enum_or_domain_key = tuple(util.quoted_token_parser(attype)) + # A table can't override a not null on the domain, + # but can override nullable + nullable = nullable and domain["nullable"] + if domain["default"] and not default: + # It can, however, override the default + # value, but can't set it to null. + default = domain["default"] + continue + else: + coltype = None + break + + if coltype: + coltype = coltype(*args, **kwargs) + if is_array: + coltype = self.ischema_names["_array"](coltype) + else: + util.warn( + "Did not recognize type '%s' of column '%s'" % (attype, name) + ) + coltype = sqltypes.NULLTYPE + + # If a zero byte or blank string depending on driver (is also absent + # for older PG versions), then not a generated column. Otherwise, s = + # stored. (Other values might be added in the future.) + if generated not in (None, "", b"\x00"): + computed = dict( + sqltext=default, persisted=generated in ("s", b"s") + ) + default = None + else: + computed = None + + # adjust the default value + autoincrement = False + if default is not None: + match = re.search(r"""(nextval\(')([^']+)('.*$)""", default) + if match is not None: + if issubclass(coltype._type_affinity, sqltypes.Integer): + autoincrement = True + # the default is related to a Sequence + sch = schema + if "." not in match.group(2) and sch is not None: + # unconditionally quote the schema name. this could + # later be enhanced to obey quoting rules / + # "quote schema" + default = ( + match.group(1) + + ('"%s"' % sch) + + "." + + match.group(2) + + match.group(3) + ) + + column_info = dict( + name=name, + type=coltype, + nullable=nullable, + default=default, + autoincrement=autoincrement or identity is not None, + comment=comment, + ) + if computed is not None: + column_info["computed"] = computed + if identity is not None: + column_info["identity"] = identity + return column_info + + @reflection.cache + def get_pk_constraint(self, connection, table_name, schema=None, **kw): + table_oid = self.get_table_oid( + connection, table_name, schema, info_cache=kw.get("info_cache") + ) + + if self.server_version_info < (8, 4): + PK_SQL = """ + SELECT a.attname + FROM + pg_class t + join pg_index ix on t.oid = ix.indrelid + join pg_attribute a + on t.oid=a.attrelid AND %s + WHERE + t.oid = :table_oid and ix.indisprimary = 't' + ORDER BY a.attnum + """ % self._pg_index_any( + "a.attnum", "ix.indkey" + ) + + else: + # unnest() and generate_subscripts() both introduced in + # version 8.4 + PK_SQL = """ + SELECT a.attname + FROM pg_attribute a JOIN ( + SELECT unnest(ix.indkey) attnum, + generate_subscripts(ix.indkey, 1) ord + FROM pg_index ix + WHERE ix.indrelid = :table_oid AND ix.indisprimary + ) k ON a.attnum=k.attnum + WHERE a.attrelid = :table_oid + ORDER BY k.ord + """ + t = sql.text(PK_SQL).columns(attname=sqltypes.Unicode) + c = connection.execute(t, dict(table_oid=table_oid)) + cols = [r[0] for r in c.fetchall()] + + PK_CONS_SQL = """ + SELECT conname + FROM pg_catalog.pg_constraint r + WHERE r.conrelid = :table_oid AND r.contype = 'p' + ORDER BY 1 + """ + t = sql.text(PK_CONS_SQL).columns(conname=sqltypes.Unicode) + c = connection.execute(t, dict(table_oid=table_oid)) + name = c.scalar() + + return {"constrained_columns": cols, "name": name} + + @reflection.cache + def get_foreign_keys( + self, + connection, + table_name, + schema=None, + postgresql_ignore_search_path=False, + **kw + ): + preparer = self.identifier_preparer + table_oid = self.get_table_oid( + connection, table_name, schema, info_cache=kw.get("info_cache") + ) + + FK_SQL = """ + SELECT r.conname, + pg_catalog.pg_get_constraintdef(r.oid, true) as condef, + n.nspname as conschema + FROM pg_catalog.pg_constraint r, + pg_namespace n, + pg_class c + + WHERE r.conrelid = :table AND + r.contype = 'f' AND + c.oid = confrelid AND + n.oid = c.relnamespace + ORDER BY 1 + """ + # https://www.postgresql.org/docs/9.0/static/sql-createtable.html + FK_REGEX = re.compile( + r"FOREIGN KEY \((.*?)\) REFERENCES (?:(.*?)\.)?(.*?)\((.*?)\)" + r"[\s]?(MATCH (FULL|PARTIAL|SIMPLE)+)?" + r"[\s]?(ON UPDATE " + r"(CASCADE|RESTRICT|NO ACTION|SET NULL|SET DEFAULT)+)?" + r"[\s]?(ON DELETE " + r"(CASCADE|RESTRICT|NO ACTION|SET NULL|SET DEFAULT)+)?" + r"[\s]?(DEFERRABLE|NOT DEFERRABLE)?" + r"[\s]?(INITIALLY (DEFERRED|IMMEDIATE)+)?" + ) + + t = sql.text(FK_SQL).columns( + conname=sqltypes.Unicode, condef=sqltypes.Unicode + ) + c = connection.execute(t, dict(table=table_oid)) + fkeys = [] + for conname, condef, conschema in c.fetchall(): + m = re.search(FK_REGEX, condef).groups() + + ( + constrained_columns, + referred_schema, + referred_table, + referred_columns, + _, + match, + _, + onupdate, + _, + ondelete, + deferrable, + _, + initially, + ) = m + + if deferrable is not None: + deferrable = True if deferrable == "DEFERRABLE" else False + constrained_columns = [ + preparer._unquote_identifier(x) + for x in re.split(r"\s*,\s*", constrained_columns) + ] + + if postgresql_ignore_search_path: + # when ignoring search path, we use the actual schema + # provided it isn't the "default" schema + if conschema != self.default_schema_name: + referred_schema = conschema + else: + referred_schema = schema + elif referred_schema: + # referred_schema is the schema that we regexp'ed from + # pg_get_constraintdef(). If the schema is in the search + # path, pg_get_constraintdef() will give us None. + referred_schema = preparer._unquote_identifier(referred_schema) + elif schema is not None and schema == conschema: + # If the actual schema matches the schema of the table + # we're reflecting, then we will use that. + referred_schema = schema + + referred_table = preparer._unquote_identifier(referred_table) + referred_columns = [ + preparer._unquote_identifier(x) + for x in re.split(r"\s*,\s", referred_columns) + ] + options = { + k: v + for k, v in [ + ("onupdate", onupdate), + ("ondelete", ondelete), + ("initially", initially), + ("deferrable", deferrable), + ("match", match), + ] + if v is not None and v != "NO ACTION" + } + fkey_d = { + "name": conname, + "constrained_columns": constrained_columns, + "referred_schema": referred_schema, + "referred_table": referred_table, + "referred_columns": referred_columns, + "options": options, + } + fkeys.append(fkey_d) + return fkeys + + def _pg_index_any(self, col, compare_to): + if self.server_version_info < (8, 1): + # https://www.postgresql.org/message-id/10279.1124395722@sss.pgh.pa.us + # "In CVS tip you could replace this with "attnum = ANY (indkey)". + # Unfortunately, most array support doesn't work on int2vector in + # pre-8.1 releases, so I think you're kinda stuck with the above + # for now. + # regards, tom lane" + return "(%s)" % " OR ".join( + "%s[%d] = %s" % (compare_to, ind, col) for ind in range(0, 10) + ) + else: + return "%s = ANY(%s)" % (col, compare_to) + + @reflection.cache + def get_indexes(self, connection, table_name, schema, **kw): + table_oid = self.get_table_oid( + connection, table_name, schema, info_cache=kw.get("info_cache") + ) + + # cast indkey as varchar since it's an int2vector, + # returned as a list by some drivers such as pypostgresql + + if self.server_version_info < (8, 5): + IDX_SQL = """ + SELECT + i.relname as relname, + ix.indisunique, ix.indexprs, ix.indpred, + a.attname, a.attnum, NULL, ix.indkey%s, + %s, %s, am.amname, + NULL as indnkeyatts + FROM + pg_class t + join pg_index ix on t.oid = ix.indrelid + join pg_class i on i.oid = ix.indexrelid + left outer join + pg_attribute a + on t.oid = a.attrelid and %s + left outer join + pg_am am + on i.relam = am.oid + WHERE + t.relkind IN ('r', 'v', 'f', 'm') + and t.oid = :table_oid + and ix.indisprimary = 'f' + ORDER BY + t.relname, + i.relname + """ % ( + # version 8.3 here was based on observing the + # cast does not work in PG 8.2.4, does work in 8.3.0. + # nothing in PG changelogs regarding this. + "::varchar" if self.server_version_info >= (8, 3) else "", + "ix.indoption::varchar" + if self.server_version_info >= (8, 3) + else "NULL", + "i.reloptions" + if self.server_version_info >= (8, 2) + else "NULL", + self._pg_index_any("a.attnum", "ix.indkey"), + ) + else: + IDX_SQL = """ + SELECT + i.relname as relname, + ix.indisunique, ix.indexprs, + a.attname, a.attnum, c.conrelid, ix.indkey::varchar, + ix.indoption::varchar, i.reloptions, am.amname, + pg_get_expr(ix.indpred, ix.indrelid), + %s as indnkeyatts + FROM + pg_class t + join pg_index ix on t.oid = ix.indrelid + join pg_class i on i.oid = ix.indexrelid + left outer join + pg_attribute a + on t.oid = a.attrelid and a.attnum = ANY(ix.indkey) + left outer join + pg_constraint c + on (ix.indrelid = c.conrelid and + ix.indexrelid = c.conindid and + c.contype in ('p', 'u', 'x')) + left outer join + pg_am am + on i.relam = am.oid + WHERE + t.relkind IN ('r', 'v', 'f', 'm', 'p') + and t.oid = :table_oid + and ix.indisprimary = 'f' + ORDER BY + t.relname, + i.relname + """ % ( + "ix.indnkeyatts" + if self.server_version_info >= (11, 0) + else "NULL", + ) + + t = sql.text(IDX_SQL).columns( + relname=sqltypes.Unicode, attname=sqltypes.Unicode + ) + c = connection.execute(t, dict(table_oid=table_oid)) + + indexes = defaultdict(lambda: defaultdict(dict)) + + sv_idx_name = None + for row in c.fetchall(): + ( + idx_name, + unique, + expr, + col, + col_num, + conrelid, + idx_key, + idx_option, + options, + amname, + filter_definition, + indnkeyatts, + ) = row + + if expr: + if idx_name != sv_idx_name: + util.warn( + "Skipped unsupported reflection of " + "expression-based index %s" % idx_name + ) + sv_idx_name = idx_name + continue + + has_idx = idx_name in indexes + index = indexes[idx_name] + if col is not None: + index["cols"][col_num] = col + if not has_idx: + idx_keys = idx_key.split() + # "The number of key columns in the index, not counting any + # included columns, which are merely stored and do not + # participate in the index semantics" + if indnkeyatts and idx_keys[indnkeyatts:]: + # this is a "covering index" which has INCLUDE columns + # as well as regular index columns + inc_keys = idx_keys[indnkeyatts:] + idx_keys = idx_keys[:indnkeyatts] + else: + inc_keys = [] + + index["key"] = [int(k.strip()) for k in idx_keys] + index["inc"] = [int(k.strip()) for k in inc_keys] + + # (new in pg 8.3) + # "pg_index.indoption" is list of ints, one per column/expr. + # int acts as bitmask: 0x01=DESC, 0x02=NULLSFIRST + sorting = {} + for col_idx, col_flags in enumerate( + (idx_option or "").split() + ): + col_flags = int(col_flags.strip()) + col_sorting = () + # try to set flags only if they differ from PG defaults... + if col_flags & 0x01: + col_sorting += ("desc",) + if not (col_flags & 0x02): + col_sorting += ("nulls_last",) + else: + if col_flags & 0x02: + col_sorting += ("nulls_first",) + if col_sorting: + sorting[col_idx] = col_sorting + if sorting: + index["sorting"] = sorting + + index["unique"] = unique + if conrelid is not None: + index["duplicates_constraint"] = idx_name + if options: + index["options"] = dict( + [option.split("=") for option in options] + ) + + # it *might* be nice to include that this is 'btree' in the + # reflection info. But we don't want an Index object + # to have a ``postgresql_using`` in it that is just the + # default, so for the moment leaving this out. + if amname and amname != "btree": + index["amname"] = amname + + if filter_definition: + index["postgresql_where"] = filter_definition + + result = [] + for name, idx in indexes.items(): + entry = { + "name": name, + "unique": idx["unique"], + "column_names": [idx["cols"][i] for i in idx["key"]], + } + if self.server_version_info >= (11, 0): + # NOTE: this is legacy, this is part of dialect_options now + # as of #7382 + entry["include_columns"] = [idx["cols"][i] for i in idx["inc"]] + if "duplicates_constraint" in idx: + entry["duplicates_constraint"] = idx["duplicates_constraint"] + if "sorting" in idx: + entry["column_sorting"] = dict( + (idx["cols"][idx["key"][i]], value) + for i, value in idx["sorting"].items() + ) + if "include_columns" in entry: + entry.setdefault("dialect_options", {})[ + "postgresql_include" + ] = entry["include_columns"] + if "options" in idx: + entry.setdefault("dialect_options", {})[ + "postgresql_with" + ] = idx["options"] + if "amname" in idx: + entry.setdefault("dialect_options", {})[ + "postgresql_using" + ] = idx["amname"] + if "postgresql_where" in idx: + entry.setdefault("dialect_options", {})[ + "postgresql_where" + ] = idx["postgresql_where"] + result.append(entry) + return result + + @reflection.cache + def get_unique_constraints( + self, connection, table_name, schema=None, **kw + ): + table_oid = self.get_table_oid( + connection, table_name, schema, info_cache=kw.get("info_cache") + ) + + UNIQUE_SQL = """ + SELECT + cons.conname as name, + cons.conkey as key, + a.attnum as col_num, + a.attname as col_name + FROM + pg_catalog.pg_constraint cons + join pg_attribute a + on cons.conrelid = a.attrelid AND + a.attnum = ANY(cons.conkey) + WHERE + cons.conrelid = :table_oid AND + cons.contype = 'u' + """ + + t = sql.text(UNIQUE_SQL).columns(col_name=sqltypes.Unicode) + c = connection.execute(t, dict(table_oid=table_oid)) + + uniques = defaultdict(lambda: defaultdict(dict)) + for row in c.fetchall(): + uc = uniques[row.name] + uc["key"] = row.key + uc["cols"][row.col_num] = row.col_name + + return [ + {"name": name, "column_names": [uc["cols"][i] for i in uc["key"]]} + for name, uc in uniques.items() + ] + + @reflection.cache + def get_table_comment(self, connection, table_name, schema=None, **kw): + table_oid = self.get_table_oid( + connection, table_name, schema, info_cache=kw.get("info_cache") + ) + + COMMENT_SQL = """ + SELECT + pgd.description as table_comment + FROM + pg_catalog.pg_description pgd + WHERE + pgd.objsubid = 0 AND + pgd.objoid = :table_oid + """ + + c = connection.execute( + sql.text(COMMENT_SQL), dict(table_oid=table_oid) + ) + return {"text": c.scalar()} + + @reflection.cache + def get_check_constraints(self, connection, table_name, schema=None, **kw): + table_oid = self.get_table_oid( + connection, table_name, schema, info_cache=kw.get("info_cache") + ) + + CHECK_SQL = """ + SELECT + cons.conname as name, + pg_get_constraintdef(cons.oid) as src + FROM + pg_catalog.pg_constraint cons + WHERE + cons.conrelid = :table_oid AND + cons.contype = 'c' + """ + + c = connection.execute(sql.text(CHECK_SQL), dict(table_oid=table_oid)) + + ret = [] + for name, src in c: + # samples: + # "CHECK (((a > 1) AND (a < 5)))" + # "CHECK (((a = 1) OR ((a > 2) AND (a < 5))))" + # "CHECK (((a > 1) AND (a < 5))) NOT VALID" + # "CHECK (some_boolean_function(a))" + # "CHECK (((a\n < 1)\n OR\n (a\n >= 5))\n)" + + m = re.match( + r"^CHECK *\((.+)\)( NOT VALID)?$", src, flags=re.DOTALL + ) + if not m: + util.warn("Could not parse CHECK constraint text: %r" % src) + sqltext = "" + else: + sqltext = re.compile( + r"^[\s\n]*\((.+)\)[\s\n]*$", flags=re.DOTALL + ).sub(r"\1", m.group(1)) + entry = {"name": name, "sqltext": sqltext} + if m and m.group(2): + entry["dialect_options"] = {"not_valid": True} + + ret.append(entry) + return ret + + def _load_enums(self, connection, schema=None): + schema = schema or self.default_schema_name + if not self.supports_native_enum: + return {} + + # Load data types for enums: + SQL_ENUMS = """ + SELECT t.typname as "name", + -- no enum defaults in 8.4 at least + -- t.typdefault as "default", + pg_catalog.pg_type_is_visible(t.oid) as "visible", + n.nspname as "schema", + e.enumlabel as "label" + FROM pg_catalog.pg_type t + LEFT JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace + LEFT JOIN pg_catalog.pg_enum e ON t.oid = e.enumtypid + WHERE t.typtype = 'e' + """ + + if schema != "*": + SQL_ENUMS += "AND n.nspname = :schema " + + # e.oid gives us label order within an enum + SQL_ENUMS += 'ORDER BY "schema", "name", e.oid' + + s = sql.text(SQL_ENUMS).columns( + attname=sqltypes.Unicode, label=sqltypes.Unicode + ) + + if schema != "*": + s = s.bindparams(schema=schema) + + c = connection.execute(s) + + enums = [] + enum_by_name = {} + for enum in c.fetchall(): + key = (enum.schema, enum.name) + if key in enum_by_name: + enum_by_name[key]["labels"].append(enum.label) + else: + enum_by_name[key] = enum_rec = { + "name": enum.name, + "schema": enum.schema, + "visible": enum.visible, + "labels": [], + } + if enum.label is not None: + enum_rec["labels"].append(enum.label) + enums.append(enum_rec) + return enums + + def _load_domains(self, connection): + # Load data types for domains: + SQL_DOMAINS = """ + SELECT t.typname as "name", + pg_catalog.format_type(t.typbasetype, t.typtypmod) as "attype", + not t.typnotnull as "nullable", + t.typdefault as "default", + pg_catalog.pg_type_is_visible(t.oid) as "visible", + n.nspname as "schema" + FROM pg_catalog.pg_type t + LEFT JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace + WHERE t.typtype = 'd' + """ + + s = sql.text(SQL_DOMAINS) + c = connection.execution_options(future_result=True).execute(s) + + domains = {} + for domain in c.mappings(): + domain = domain + # strip (30) from character varying(30) + attype = re.search(r"([^\(]+)", domain["attype"]).group(1) + # 'visible' just means whether or not the domain is in a + # schema that's on the search path -- or not overridden by + # a schema with higher precedence. If it's not visible, + # it will be prefixed with the schema-name when it's used. + if domain["visible"]: + key = (domain["name"],) + else: + key = (domain["schema"], domain["name"]) + + domains[key] = { + "attype": attype, + "nullable": domain["nullable"], + "default": domain["default"], + } + + return domains diff --git a/lib/sqlalchemy/dialects/postgresql/dml.py b/lib/sqlalchemy/dialects/postgresql/dml.py new file mode 100644 index 0000000..b483774 --- /dev/null +++ b/lib/sqlalchemy/dialects/postgresql/dml.py @@ -0,0 +1,274 @@ +# postgresql/on_conflict.py +# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors +# +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php + +from . import ext +from ... import util +from ...sql import coercions +from ...sql import roles +from ...sql import schema +from ...sql.base import _exclusive_against +from ...sql.base import _generative +from ...sql.base import ColumnCollection +from ...sql.dml import Insert as StandardInsert +from ...sql.elements import ClauseElement +from ...sql.expression import alias +from ...util.langhelpers import public_factory + + +__all__ = ("Insert", "insert") + + +class Insert(StandardInsert): + """PostgreSQL-specific implementation of INSERT. + + Adds methods for PG-specific syntaxes such as ON CONFLICT. + + The :class:`_postgresql.Insert` object is created using the + :func:`sqlalchemy.dialects.postgresql.insert` function. + + .. versionadded:: 1.1 + + """ + + stringify_dialect = "postgresql" + inherit_cache = False + + @util.memoized_property + def excluded(self): + """Provide the ``excluded`` namespace for an ON CONFLICT statement + + PG's ON CONFLICT clause allows reference to the row that would + be inserted, known as ``excluded``. This attribute provides + all columns in this row to be referenceable. + + .. tip:: The :attr:`_postgresql.Insert.excluded` attribute is an + instance of :class:`_expression.ColumnCollection`, which provides + an interface the same as that of the :attr:`_schema.Table.c` + collection described at :ref:`metadata_tables_and_columns`. + With this collection, ordinary names are accessible like attributes + (e.g. ``stmt.excluded.some_column``), but special names and + dictionary method names should be accessed using indexed access, + such as ``stmt.excluded["column name"]`` or + ``stmt.excluded["values"]``. See the docstring for + :class:`_expression.ColumnCollection` for further examples. + + .. seealso:: + + :ref:`postgresql_insert_on_conflict` - example of how + to use :attr:`_expression.Insert.excluded` + + """ + return alias(self.table, name="excluded").columns + + _on_conflict_exclusive = _exclusive_against( + "_post_values_clause", + msgs={ + "_post_values_clause": "This Insert construct already has " + "an ON CONFLICT clause established" + }, + ) + + @_generative + @_on_conflict_exclusive + def on_conflict_do_update( + self, + constraint=None, + index_elements=None, + index_where=None, + set_=None, + where=None, + ): + r""" + Specifies a DO UPDATE SET action for ON CONFLICT clause. + + Either the ``constraint`` or ``index_elements`` argument is + required, but only one of these can be specified. + + :param constraint: + The name of a unique or exclusion constraint on the table, + or the constraint object itself if it has a .name attribute. + + :param index_elements: + A sequence consisting of string column names, :class:`_schema.Column` + objects, or other column expression objects that will be used + to infer a target index. + + :param index_where: + Additional WHERE criterion that can be used to infer a + conditional target index. + + :param set\_: + A dictionary or other mapping object + where the keys are either names of columns in the target table, + or :class:`_schema.Column` objects or other ORM-mapped columns + matching that of the target table, and expressions or literals + as values, specifying the ``SET`` actions to take. + + .. versionadded:: 1.4 The + :paramref:`_postgresql.Insert.on_conflict_do_update.set_` + parameter supports :class:`_schema.Column` objects from the target + :class:`_schema.Table` as keys. + + .. warning:: This dictionary does **not** take into account + Python-specified default UPDATE values or generation functions, + e.g. those specified using :paramref:`_schema.Column.onupdate`. + These values will not be exercised for an ON CONFLICT style of + UPDATE, unless they are manually specified in the + :paramref:`.Insert.on_conflict_do_update.set_` dictionary. + + :param where: + Optional argument. If present, can be a literal SQL + string or an acceptable expression for a ``WHERE`` clause + that restricts the rows affected by ``DO UPDATE SET``. Rows + not meeting the ``WHERE`` condition will not be updated + (effectively a ``DO NOTHING`` for those rows). + + .. versionadded:: 1.1 + + + .. seealso:: + + :ref:`postgresql_insert_on_conflict` + + """ + self._post_values_clause = OnConflictDoUpdate( + constraint, index_elements, index_where, set_, where + ) + + @_generative + @_on_conflict_exclusive + def on_conflict_do_nothing( + self, constraint=None, index_elements=None, index_where=None + ): + """ + Specifies a DO NOTHING action for ON CONFLICT clause. + + The ``constraint`` and ``index_elements`` arguments + are optional, but only one of these can be specified. + + :param constraint: + The name of a unique or exclusion constraint on the table, + or the constraint object itself if it has a .name attribute. + + :param index_elements: + A sequence consisting of string column names, :class:`_schema.Column` + objects, or other column expression objects that will be used + to infer a target index. + + :param index_where: + Additional WHERE criterion that can be used to infer a + conditional target index. + + .. versionadded:: 1.1 + + .. seealso:: + + :ref:`postgresql_insert_on_conflict` + + """ + self._post_values_clause = OnConflictDoNothing( + constraint, index_elements, index_where + ) + + +insert = public_factory( + Insert, ".dialects.postgresql.insert", ".dialects.postgresql.Insert" +) + + +class OnConflictClause(ClauseElement): + stringify_dialect = "postgresql" + + def __init__(self, constraint=None, index_elements=None, index_where=None): + + if constraint is not None: + if not isinstance(constraint, util.string_types) and isinstance( + constraint, + (schema.Index, schema.Constraint, ext.ExcludeConstraint), + ): + constraint = getattr(constraint, "name") or constraint + + if constraint is not None: + if index_elements is not None: + raise ValueError( + "'constraint' and 'index_elements' are mutually exclusive" + ) + + if isinstance(constraint, util.string_types): + self.constraint_target = constraint + self.inferred_target_elements = None + self.inferred_target_whereclause = None + elif isinstance(constraint, schema.Index): + index_elements = constraint.expressions + index_where = constraint.dialect_options["postgresql"].get( + "where" + ) + elif isinstance(constraint, ext.ExcludeConstraint): + index_elements = constraint.columns + index_where = constraint.where + else: + index_elements = constraint.columns + index_where = constraint.dialect_options["postgresql"].get( + "where" + ) + + if index_elements is not None: + self.constraint_target = None + self.inferred_target_elements = index_elements + self.inferred_target_whereclause = index_where + elif constraint is None: + self.constraint_target = ( + self.inferred_target_elements + ) = self.inferred_target_whereclause = None + + +class OnConflictDoNothing(OnConflictClause): + __visit_name__ = "on_conflict_do_nothing" + + +class OnConflictDoUpdate(OnConflictClause): + __visit_name__ = "on_conflict_do_update" + + def __init__( + self, + constraint=None, + index_elements=None, + index_where=None, + set_=None, + where=None, + ): + super(OnConflictDoUpdate, self).__init__( + constraint=constraint, + index_elements=index_elements, + index_where=index_where, + ) + + if ( + self.inferred_target_elements is None + and self.constraint_target is None + ): + raise ValueError( + "Either constraint or index_elements, " + "but not both, must be specified unless DO NOTHING" + ) + + if isinstance(set_, dict): + if not set_: + raise ValueError("set parameter dictionary must not be empty") + elif isinstance(set_, ColumnCollection): + set_ = dict(set_) + else: + raise ValueError( + "set parameter must be a non-empty dictionary " + "or a ColumnCollection such as the `.c.` collection " + "of a Table object" + ) + self.update_values_to_set = [ + (coercions.expect(roles.DMLColumnRole, key), value) + for key, value in set_.items() + ] + self.update_whereclause = where diff --git a/lib/sqlalchemy/dialects/postgresql/ext.py b/lib/sqlalchemy/dialects/postgresql/ext.py new file mode 100644 index 0000000..9e52ee1 --- /dev/null +++ b/lib/sqlalchemy/dialects/postgresql/ext.py @@ -0,0 +1,277 @@ +# postgresql/ext.py +# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors +# +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php + +from .array import ARRAY +from ... import util +from ...sql import coercions +from ...sql import elements +from ...sql import expression +from ...sql import functions +from ...sql import roles +from ...sql import schema +from ...sql.schema import ColumnCollectionConstraint + + +class aggregate_order_by(expression.ColumnElement): + """Represent a PostgreSQL aggregate order by expression. + + E.g.:: + + from sqlalchemy.dialects.postgresql import aggregate_order_by + expr = func.array_agg(aggregate_order_by(table.c.a, table.c.b.desc())) + stmt = select(expr) + + would represent the expression:: + + SELECT array_agg(a ORDER BY b DESC) FROM table; + + Similarly:: + + expr = func.string_agg( + table.c.a, + aggregate_order_by(literal_column("','"), table.c.a) + ) + stmt = select(expr) + + Would represent:: + + SELECT string_agg(a, ',' ORDER BY a) FROM table; + + .. versionadded:: 1.1 + + .. versionchanged:: 1.2.13 - the ORDER BY argument may be multiple terms + + .. seealso:: + + :class:`_functions.array_agg` + + """ + + __visit_name__ = "aggregate_order_by" + + stringify_dialect = "postgresql" + inherit_cache = False + + def __init__(self, target, *order_by): + self.target = coercions.expect(roles.ExpressionElementRole, target) + self.type = self.target.type + + _lob = len(order_by) + if _lob == 0: + raise TypeError("at least one ORDER BY element is required") + elif _lob == 1: + self.order_by = coercions.expect( + roles.ExpressionElementRole, order_by[0] + ) + else: + self.order_by = elements.ClauseList( + *order_by, _literal_as_text_role=roles.ExpressionElementRole + ) + + def self_group(self, against=None): + return self + + def get_children(self, **kwargs): + return self.target, self.order_by + + def _copy_internals(self, clone=elements._clone, **kw): + self.target = clone(self.target, **kw) + self.order_by = clone(self.order_by, **kw) + + @property + def _from_objects(self): + return self.target._from_objects + self.order_by._from_objects + + +class ExcludeConstraint(ColumnCollectionConstraint): + """A table-level EXCLUDE constraint. + + Defines an EXCLUDE constraint as described in the `PostgreSQL + documentation`__. + + __ https://www.postgresql.org/docs/current/static/sql-createtable.html#SQL-CREATETABLE-EXCLUDE + + """ # noqa + + __visit_name__ = "exclude_constraint" + + where = None + inherit_cache = False + + create_drop_stringify_dialect = "postgresql" + + @elements._document_text_coercion( + "where", + ":class:`.ExcludeConstraint`", + ":paramref:`.ExcludeConstraint.where`", + ) + def __init__(self, *elements, **kw): + r""" + Create an :class:`.ExcludeConstraint` object. + + E.g.:: + + const = ExcludeConstraint( + (Column('period'), '&&'), + (Column('group'), '='), + where=(Column('group') != 'some group'), + ops={'group': 'my_operator_class'} + ) + + The constraint is normally embedded into the :class:`_schema.Table` + construct + directly, or added later using :meth:`.append_constraint`:: + + some_table = Table( + 'some_table', metadata, + Column('id', Integer, primary_key=True), + Column('period', TSRANGE()), + Column('group', String) + ) + + some_table.append_constraint( + ExcludeConstraint( + (some_table.c.period, '&&'), + (some_table.c.group, '='), + where=some_table.c.group != 'some group', + name='some_table_excl_const', + ops={'group': 'my_operator_class'} + ) + ) + + :param \*elements: + + A sequence of two tuples of the form ``(column, operator)`` where + "column" is a SQL expression element or a raw SQL string, most + typically a :class:`_schema.Column` object, + and "operator" is a string + containing the operator to use. In order to specify a column name + when a :class:`_schema.Column` object is not available, + while ensuring + that any necessary quoting rules take effect, an ad-hoc + :class:`_schema.Column` or :func:`_expression.column` + object should be + used. + + :param name: + Optional, the in-database name of this constraint. + + :param deferrable: + Optional bool. If set, emit DEFERRABLE or NOT DEFERRABLE when + issuing DDL for this constraint. + + :param initially: + Optional string. If set, emit INITIALLY when issuing DDL + for this constraint. + + :param using: + Optional string. If set, emit USING when issuing DDL + for this constraint. Defaults to 'gist'. + + :param where: + Optional SQL expression construct or literal SQL string. + If set, emit WHERE when issuing DDL + for this constraint. + + :param ops: + Optional dictionary. Used to define operator classes for the + elements; works the same way as that of the + :ref:`postgresql_ops ` + parameter specified to the :class:`_schema.Index` construct. + + .. versionadded:: 1.3.21 + + .. seealso:: + + :ref:`postgresql_operator_classes` - general description of how + PostgreSQL operator classes are specified. + + """ + columns = [] + render_exprs = [] + self.operators = {} + + expressions, operators = zip(*elements) + + for (expr, column, strname, add_element), operator in zip( + coercions.expect_col_expression_collection( + roles.DDLConstraintColumnRole, expressions + ), + operators, + ): + if add_element is not None: + columns.append(add_element) + + name = column.name if column is not None else strname + + if name is not None: + # backwards compat + self.operators[name] = operator + + render_exprs.append((expr, name, operator)) + + self._render_exprs = render_exprs + + ColumnCollectionConstraint.__init__( + self, + *columns, + name=kw.get("name"), + deferrable=kw.get("deferrable"), + initially=kw.get("initially") + ) + self.using = kw.get("using", "gist") + where = kw.get("where") + if where is not None: + self.where = coercions.expect(roles.StatementOptionRole, where) + + self.ops = kw.get("ops", {}) + + def _set_parent(self, table, **kw): + super(ExcludeConstraint, self)._set_parent(table) + + self._render_exprs = [ + ( + expr if isinstance(expr, elements.ClauseElement) else colexpr, + name, + operator, + ) + for (expr, name, operator), colexpr in util.zip_longest( + self._render_exprs, self.columns + ) + ] + + def _copy(self, target_table=None, **kw): + elements = [ + ( + schema._copy_expression(expr, self.parent, target_table), + self.operators[expr.name], + ) + for expr in self.columns + ] + c = self.__class__( + *elements, + name=self.name, + deferrable=self.deferrable, + initially=self.initially, + where=self.where, + using=self.using + ) + c.dispatch._update(self.dispatch) + return c + + +def array_agg(*arg, **kw): + """PostgreSQL-specific form of :class:`_functions.array_agg`, ensures + return type is :class:`_postgresql.ARRAY` and not + the plain :class:`_types.ARRAY`, unless an explicit ``type_`` + is passed. + + .. versionadded:: 1.1 + + """ + kw["_default_array_type"] = ARRAY + return functions.func.array_agg(*arg, **kw) diff --git a/lib/sqlalchemy/dialects/postgresql/hstore.py b/lib/sqlalchemy/dialects/postgresql/hstore.py new file mode 100644 index 0000000..29800d2 --- /dev/null +++ b/lib/sqlalchemy/dialects/postgresql/hstore.py @@ -0,0 +1,455 @@ +# postgresql/hstore.py +# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors +# +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php + +import re + +from .array import ARRAY +from ... import types as sqltypes +from ... import util +from ...sql import functions as sqlfunc +from ...sql import operators + + +__all__ = ("HSTORE", "hstore") + +idx_precedence = operators._PRECEDENCE[operators.json_getitem_op] + +GETITEM = operators.custom_op( + "->", + precedence=idx_precedence, + natural_self_precedent=True, + eager_grouping=True, +) + +HAS_KEY = operators.custom_op( + "?", + precedence=idx_precedence, + natural_self_precedent=True, + eager_grouping=True, +) + +HAS_ALL = operators.custom_op( + "?&", + precedence=idx_precedence, + natural_self_precedent=True, + eager_grouping=True, +) + +HAS_ANY = operators.custom_op( + "?|", + precedence=idx_precedence, + natural_self_precedent=True, + eager_grouping=True, +) + +CONTAINS = operators.custom_op( + "@>", + precedence=idx_precedence, + natural_self_precedent=True, + eager_grouping=True, +) + +CONTAINED_BY = operators.custom_op( + "<@", + precedence=idx_precedence, + natural_self_precedent=True, + eager_grouping=True, +) + + +class HSTORE(sqltypes.Indexable, sqltypes.Concatenable, sqltypes.TypeEngine): + """Represent the PostgreSQL HSTORE type. + + The :class:`.HSTORE` type stores dictionaries containing strings, e.g.:: + + data_table = Table('data_table', metadata, + Column('id', Integer, primary_key=True), + Column('data', HSTORE) + ) + + with engine.connect() as conn: + conn.execute( + data_table.insert(), + data = {"key1": "value1", "key2": "value2"} + ) + + :class:`.HSTORE` provides for a wide range of operations, including: + + * Index operations:: + + data_table.c.data['some key'] == 'some value' + + * Containment operations:: + + data_table.c.data.has_key('some key') + + data_table.c.data.has_all(['one', 'two', 'three']) + + * Concatenation:: + + data_table.c.data + {"k1": "v1"} + + For a full list of special methods see + :class:`.HSTORE.comparator_factory`. + + For usage with the SQLAlchemy ORM, it may be desirable to combine + the usage of :class:`.HSTORE` with :class:`.MutableDict` dictionary + now part of the :mod:`sqlalchemy.ext.mutable` + extension. This extension will allow "in-place" changes to the + dictionary, e.g. addition of new keys or replacement/removal of existing + keys to/from the current dictionary, to produce events which will be + detected by the unit of work:: + + from sqlalchemy.ext.mutable import MutableDict + + class MyClass(Base): + __tablename__ = 'data_table' + + id = Column(Integer, primary_key=True) + data = Column(MutableDict.as_mutable(HSTORE)) + + my_object = session.query(MyClass).one() + + # in-place mutation, requires Mutable extension + # in order for the ORM to detect + my_object.data['some_key'] = 'some value' + + session.commit() + + When the :mod:`sqlalchemy.ext.mutable` extension is not used, the ORM + will not be alerted to any changes to the contents of an existing + dictionary, unless that dictionary value is re-assigned to the + HSTORE-attribute itself, thus generating a change event. + + .. seealso:: + + :class:`.hstore` - render the PostgreSQL ``hstore()`` function. + + + """ + + __visit_name__ = "HSTORE" + hashable = False + text_type = sqltypes.Text() + + def __init__(self, text_type=None): + """Construct a new :class:`.HSTORE`. + + :param text_type: the type that should be used for indexed values. + Defaults to :class:`_types.Text`. + + .. versionadded:: 1.1.0 + + """ + if text_type is not None: + self.text_type = text_type + + class Comparator( + sqltypes.Indexable.Comparator, sqltypes.Concatenable.Comparator + ): + """Define comparison operations for :class:`.HSTORE`.""" + + def has_key(self, other): + """Boolean expression. Test for presence of a key. Note that the + key may be a SQLA expression. + """ + return self.operate(HAS_KEY, other, result_type=sqltypes.Boolean) + + def has_all(self, other): + """Boolean expression. Test for presence of all keys in jsonb""" + return self.operate(HAS_ALL, other, result_type=sqltypes.Boolean) + + def has_any(self, other): + """Boolean expression. Test for presence of any key in jsonb""" + return self.operate(HAS_ANY, other, result_type=sqltypes.Boolean) + + def contains(self, other, **kwargs): + """Boolean expression. Test if keys (or array) are a superset + of/contained the keys of the argument jsonb expression. + + kwargs may be ignored by this operator but are required for API + conformance. + """ + return self.operate(CONTAINS, other, result_type=sqltypes.Boolean) + + def contained_by(self, other): + """Boolean expression. Test if keys are a proper subset of the + keys of the argument jsonb expression. + """ + return self.operate( + CONTAINED_BY, other, result_type=sqltypes.Boolean + ) + + def _setup_getitem(self, index): + return GETITEM, index, self.type.text_type + + def defined(self, key): + """Boolean expression. Test for presence of a non-NULL value for + the key. Note that the key may be a SQLA expression. + """ + return _HStoreDefinedFunction(self.expr, key) + + def delete(self, key): + """HStore expression. Returns the contents of this hstore with the + given key deleted. Note that the key may be a SQLA expression. + """ + if isinstance(key, dict): + key = _serialize_hstore(key) + return _HStoreDeleteFunction(self.expr, key) + + def slice(self, array): + """HStore expression. Returns a subset of an hstore defined by + array of keys. + """ + return _HStoreSliceFunction(self.expr, array) + + def keys(self): + """Text array expression. Returns array of keys.""" + return _HStoreKeysFunction(self.expr) + + def vals(self): + """Text array expression. Returns array of values.""" + return _HStoreValsFunction(self.expr) + + def array(self): + """Text array expression. Returns array of alternating keys and + values. + """ + return _HStoreArrayFunction(self.expr) + + def matrix(self): + """Text array expression. Returns array of [key, value] pairs.""" + return _HStoreMatrixFunction(self.expr) + + comparator_factory = Comparator + + def bind_processor(self, dialect): + if util.py2k: + encoding = dialect.encoding + + def process(value): + if isinstance(value, dict): + return _serialize_hstore(value).encode(encoding) + else: + return value + + else: + + def process(value): + if isinstance(value, dict): + return _serialize_hstore(value) + else: + return value + + return process + + def result_processor(self, dialect, coltype): + if util.py2k: + encoding = dialect.encoding + + def process(value): + if value is not None: + return _parse_hstore(value.decode(encoding)) + else: + return value + + else: + + def process(value): + if value is not None: + return _parse_hstore(value) + else: + return value + + return process + + +class hstore(sqlfunc.GenericFunction): + """Construct an hstore value within a SQL expression using the + PostgreSQL ``hstore()`` function. + + The :class:`.hstore` function accepts one or two arguments as described + in the PostgreSQL documentation. + + E.g.:: + + from sqlalchemy.dialects.postgresql import array, hstore + + select(hstore('key1', 'value1')) + + select( + hstore( + array(['key1', 'key2', 'key3']), + array(['value1', 'value2', 'value3']) + ) + ) + + .. seealso:: + + :class:`.HSTORE` - the PostgreSQL ``HSTORE`` datatype. + + """ + + type = HSTORE + name = "hstore" + inherit_cache = True + + +class _HStoreDefinedFunction(sqlfunc.GenericFunction): + type = sqltypes.Boolean + name = "defined" + inherit_cache = True + + +class _HStoreDeleteFunction(sqlfunc.GenericFunction): + type = HSTORE + name = "delete" + inherit_cache = True + + +class _HStoreSliceFunction(sqlfunc.GenericFunction): + type = HSTORE + name = "slice" + inherit_cache = True + + +class _HStoreKeysFunction(sqlfunc.GenericFunction): + type = ARRAY(sqltypes.Text) + name = "akeys" + inherit_cache = True + + +class _HStoreValsFunction(sqlfunc.GenericFunction): + type = ARRAY(sqltypes.Text) + name = "avals" + inherit_cache = True + + +class _HStoreArrayFunction(sqlfunc.GenericFunction): + type = ARRAY(sqltypes.Text) + name = "hstore_to_array" + inherit_cache = True + + +class _HStoreMatrixFunction(sqlfunc.GenericFunction): + type = ARRAY(sqltypes.Text) + name = "hstore_to_matrix" + inherit_cache = True + + +# +# parsing. note that none of this is used with the psycopg2 backend, +# which provides its own native extensions. +# + +# My best guess at the parsing rules of hstore literals, since no formal +# grammar is given. This is mostly reverse engineered from PG's input parser +# behavior. +HSTORE_PAIR_RE = re.compile( + r""" +( + "(?P (\\ . | [^"])* )" # Quoted key +) +[ ]* => [ ]* # Pair operator, optional adjoining whitespace +( + (?P NULL ) # NULL value + | "(?P (\\ . | [^"])* )" # Quoted value +) +""", + re.VERBOSE, +) + +HSTORE_DELIMITER_RE = re.compile( + r""" +[ ]* , [ ]* +""", + re.VERBOSE, +) + + +def _parse_error(hstore_str, pos): + """format an unmarshalling error.""" + + ctx = 20 + hslen = len(hstore_str) + + parsed_tail = hstore_str[max(pos - ctx - 1, 0) : min(pos, hslen)] + residual = hstore_str[min(pos, hslen) : min(pos + ctx + 1, hslen)] + + if len(parsed_tail) > ctx: + parsed_tail = "[...]" + parsed_tail[1:] + if len(residual) > ctx: + residual = residual[:-1] + "[...]" + + return "After %r, could not parse residual at position %d: %r" % ( + parsed_tail, + pos, + residual, + ) + + +def _parse_hstore(hstore_str): + """Parse an hstore from its literal string representation. + + Attempts to approximate PG's hstore input parsing rules as closely as + possible. Although currently this is not strictly necessary, since the + current implementation of hstore's output syntax is stricter than what it + accepts as input, the documentation makes no guarantees that will always + be the case. + + + + """ + result = {} + pos = 0 + pair_match = HSTORE_PAIR_RE.match(hstore_str) + + while pair_match is not None: + key = pair_match.group("key").replace(r"\"", '"').replace("\\\\", "\\") + if pair_match.group("value_null"): + value = None + else: + value = ( + pair_match.group("value") + .replace(r"\"", '"') + .replace("\\\\", "\\") + ) + result[key] = value + + pos += pair_match.end() + + delim_match = HSTORE_DELIMITER_RE.match(hstore_str[pos:]) + if delim_match is not None: + pos += delim_match.end() + + pair_match = HSTORE_PAIR_RE.match(hstore_str[pos:]) + + if pos != len(hstore_str): + raise ValueError(_parse_error(hstore_str, pos)) + + return result + + +def _serialize_hstore(val): + """Serialize a dictionary into an hstore literal. Keys and values must + both be strings (except None for values). + + """ + + def esc(s, position): + if position == "value" and s is None: + return "NULL" + elif isinstance(s, util.string_types): + return '"%s"' % s.replace("\\", "\\\\").replace('"', r"\"") + else: + raise ValueError( + "%r in %s position is not a string." % (s, position) + ) + + return ", ".join( + "%s=>%s" % (esc(k, "key"), esc(v, "value")) for k, v in val.items() + ) diff --git a/lib/sqlalchemy/dialects/postgresql/json.py b/lib/sqlalchemy/dialects/postgresql/json.py new file mode 100644 index 0000000..daaaeac --- /dev/null +++ b/lib/sqlalchemy/dialects/postgresql/json.py @@ -0,0 +1,327 @@ +# postgresql/json.py +# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors +# +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +from __future__ import absolute_import + +from ... import types as sqltypes +from ... import util +from ...sql import operators + + +__all__ = ("JSON", "JSONB") + +idx_precedence = operators._PRECEDENCE[operators.json_getitem_op] + +ASTEXT = operators.custom_op( + "->>", + precedence=idx_precedence, + natural_self_precedent=True, + eager_grouping=True, +) + +JSONPATH_ASTEXT = operators.custom_op( + "#>>", + precedence=idx_precedence, + natural_self_precedent=True, + eager_grouping=True, +) + + +HAS_KEY = operators.custom_op( + "?", + precedence=idx_precedence, + natural_self_precedent=True, + eager_grouping=True, +) + +HAS_ALL = operators.custom_op( + "?&", + precedence=idx_precedence, + natural_self_precedent=True, + eager_grouping=True, +) + +HAS_ANY = operators.custom_op( + "?|", + precedence=idx_precedence, + natural_self_precedent=True, + eager_grouping=True, +) + +CONTAINS = operators.custom_op( + "@>", + precedence=idx_precedence, + natural_self_precedent=True, + eager_grouping=True, +) + +CONTAINED_BY = operators.custom_op( + "<@", + precedence=idx_precedence, + natural_self_precedent=True, + eager_grouping=True, +) + + +class JSONPathType(sqltypes.JSON.JSONPathType): + def bind_processor(self, dialect): + super_proc = self.string_bind_processor(dialect) + + def process(value): + assert isinstance(value, util.collections_abc.Sequence) + tokens = [util.text_type(elem) for elem in value] + value = "{%s}" % (", ".join(tokens)) + if super_proc: + value = super_proc(value) + return value + + return process + + def literal_processor(self, dialect): + super_proc = self.string_literal_processor(dialect) + + def process(value): + assert isinstance(value, util.collections_abc.Sequence) + tokens = [util.text_type(elem) for elem in value] + value = "{%s}" % (", ".join(tokens)) + if super_proc: + value = super_proc(value) + return value + + return process + + +class JSON(sqltypes.JSON): + """Represent the PostgreSQL JSON type. + + :class:`_postgresql.JSON` is used automatically whenever the base + :class:`_types.JSON` datatype is used against a PostgreSQL backend, + however base :class:`_types.JSON` datatype does not provide Python + accessors for PostgreSQL-specific comparison methods such as + :meth:`_postgresql.JSON.Comparator.astext`; additionally, to use + PostgreSQL ``JSONB``, the :class:`_postgresql.JSONB` datatype should + be used explicitly. + + .. seealso:: + + :class:`_types.JSON` - main documentation for the generic + cross-platform JSON datatype. + + The operators provided by the PostgreSQL version of :class:`_types.JSON` + include: + + * Index operations (the ``->`` operator):: + + data_table.c.data['some key'] + + data_table.c.data[5] + + + * Index operations returning text (the ``->>`` operator):: + + data_table.c.data['some key'].astext == 'some value' + + Note that equivalent functionality is available via the + :attr:`.JSON.Comparator.as_string` accessor. + + * Index operations with CAST + (equivalent to ``CAST(col ->> ['some key'] AS )``):: + + data_table.c.data['some key'].astext.cast(Integer) == 5 + + Note that equivalent functionality is available via the + :attr:`.JSON.Comparator.as_integer` and similar accessors. + + * Path index operations (the ``#>`` operator):: + + data_table.c.data[('key_1', 'key_2', 5, ..., 'key_n')] + + * Path index operations returning text (the ``#>>`` operator):: + + data_table.c.data[('key_1', 'key_2', 5, ..., 'key_n')].astext == 'some value' + + .. versionchanged:: 1.1 The :meth:`_expression.ColumnElement.cast` + operator on + JSON objects now requires that the :attr:`.JSON.Comparator.astext` + modifier be called explicitly, if the cast works only from a textual + string. + + Index operations return an expression object whose type defaults to + :class:`_types.JSON` by default, + so that further JSON-oriented instructions + may be called upon the result type. + + Custom serializers and deserializers are specified at the dialect level, + that is using :func:`_sa.create_engine`. The reason for this is that when + using psycopg2, the DBAPI only allows serializers at the per-cursor + or per-connection level. E.g.:: + + engine = create_engine("postgresql://scott:tiger@localhost/test", + json_serializer=my_serialize_fn, + json_deserializer=my_deserialize_fn + ) + + When using the psycopg2 dialect, the json_deserializer is registered + against the database using ``psycopg2.extras.register_default_json``. + + .. seealso:: + + :class:`_types.JSON` - Core level JSON type + + :class:`_postgresql.JSONB` + + .. versionchanged:: 1.1 :class:`_postgresql.JSON` is now a PostgreSQL- + specific specialization of the new :class:`_types.JSON` type. + + """ # noqa + + astext_type = sqltypes.Text() + + def __init__(self, none_as_null=False, astext_type=None): + """Construct a :class:`_types.JSON` type. + + :param none_as_null: if True, persist the value ``None`` as a + SQL NULL value, not the JSON encoding of ``null``. Note that + when this flag is False, the :func:`.null` construct can still + be used to persist a NULL value:: + + from sqlalchemy import null + conn.execute(table.insert(), data=null()) + + .. versionchanged:: 0.9.8 - Added ``none_as_null``, and :func:`.null` + is now supported in order to persist a NULL value. + + .. seealso:: + + :attr:`_types.JSON.NULL` + + :param astext_type: the type to use for the + :attr:`.JSON.Comparator.astext` + accessor on indexed attributes. Defaults to :class:`_types.Text`. + + .. versionadded:: 1.1 + + """ + super(JSON, self).__init__(none_as_null=none_as_null) + if astext_type is not None: + self.astext_type = astext_type + + class Comparator(sqltypes.JSON.Comparator): + """Define comparison operations for :class:`_types.JSON`.""" + + @property + def astext(self): + """On an indexed expression, use the "astext" (e.g. "->>") + conversion when rendered in SQL. + + E.g.:: + + select(data_table.c.data['some key'].astext) + + .. seealso:: + + :meth:`_expression.ColumnElement.cast` + + """ + if isinstance(self.expr.right.type, sqltypes.JSON.JSONPathType): + return self.expr.left.operate( + JSONPATH_ASTEXT, + self.expr.right, + result_type=self.type.astext_type, + ) + else: + return self.expr.left.operate( + ASTEXT, self.expr.right, result_type=self.type.astext_type + ) + + comparator_factory = Comparator + + +class JSONB(JSON): + """Represent the PostgreSQL JSONB type. + + The :class:`_postgresql.JSONB` type stores arbitrary JSONB format data, + e.g.:: + + data_table = Table('data_table', metadata, + Column('id', Integer, primary_key=True), + Column('data', JSONB) + ) + + with engine.connect() as conn: + conn.execute( + data_table.insert(), + data = {"key1": "value1", "key2": "value2"} + ) + + The :class:`_postgresql.JSONB` type includes all operations provided by + :class:`_types.JSON`, including the same behaviors for indexing + operations. + It also adds additional operators specific to JSONB, including + :meth:`.JSONB.Comparator.has_key`, :meth:`.JSONB.Comparator.has_all`, + :meth:`.JSONB.Comparator.has_any`, :meth:`.JSONB.Comparator.contains`, + and :meth:`.JSONB.Comparator.contained_by`. + + Like the :class:`_types.JSON` type, the :class:`_postgresql.JSONB` + type does not detect + in-place changes when used with the ORM, unless the + :mod:`sqlalchemy.ext.mutable` extension is used. + + Custom serializers and deserializers + are shared with the :class:`_types.JSON` class, + using the ``json_serializer`` + and ``json_deserializer`` keyword arguments. These must be specified + at the dialect level using :func:`_sa.create_engine`. When using + psycopg2, the serializers are associated with the jsonb type using + ``psycopg2.extras.register_default_jsonb`` on a per-connection basis, + in the same way that ``psycopg2.extras.register_default_json`` is used + to register these handlers with the json type. + + .. versionadded:: 0.9.7 + + .. seealso:: + + :class:`_types.JSON` + + """ + + __visit_name__ = "JSONB" + + class Comparator(JSON.Comparator): + """Define comparison operations for :class:`_types.JSON`.""" + + def has_key(self, other): + """Boolean expression. Test for presence of a key. Note that the + key may be a SQLA expression. + """ + return self.operate(HAS_KEY, other, result_type=sqltypes.Boolean) + + def has_all(self, other): + """Boolean expression. Test for presence of all keys in jsonb""" + return self.operate(HAS_ALL, other, result_type=sqltypes.Boolean) + + def has_any(self, other): + """Boolean expression. Test for presence of any key in jsonb""" + return self.operate(HAS_ANY, other, result_type=sqltypes.Boolean) + + def contains(self, other, **kwargs): + """Boolean expression. Test if keys (or array) are a superset + of/contained the keys of the argument jsonb expression. + + kwargs may be ignored by this operator but are required for API + conformance. + """ + return self.operate(CONTAINS, other, result_type=sqltypes.Boolean) + + def contained_by(self, other): + """Boolean expression. Test if keys are a proper subset of the + keys of the argument jsonb expression. + """ + return self.operate( + CONTAINED_BY, other, result_type=sqltypes.Boolean + ) + + comparator_factory = Comparator diff --git a/lib/sqlalchemy/dialects/postgresql/pg8000.py b/lib/sqlalchemy/dialects/postgresql/pg8000.py new file mode 100644 index 0000000..98561a9 --- /dev/null +++ b/lib/sqlalchemy/dialects/postgresql/pg8000.py @@ -0,0 +1,594 @@ +# postgresql/pg8000.py +# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +r""" +.. dialect:: postgresql+pg8000 + :name: pg8000 + :dbapi: pg8000 + :connectstring: postgresql+pg8000://user:password@host:port/dbname[?key=value&key=value...] + :url: https://pypi.org/project/pg8000/ + +.. versionchanged:: 1.4 The pg8000 dialect has been updated for version + 1.16.6 and higher, and is again part of SQLAlchemy's continuous integration + with full feature support. + +.. _pg8000_unicode: + +Unicode +------- + +pg8000 will encode / decode string values between it and the server using the +PostgreSQL ``client_encoding`` parameter; by default this is the value in +the ``postgresql.conf`` file, which often defaults to ``SQL_ASCII``. +Typically, this can be changed to ``utf-8``, as a more useful default:: + + #client_encoding = sql_ascii # actually, defaults to database + # encoding + client_encoding = utf8 + +The ``client_encoding`` can be overridden for a session by executing the SQL: + +SET CLIENT_ENCODING TO 'utf8'; + +SQLAlchemy will execute this SQL on all new connections based on the value +passed to :func:`_sa.create_engine` using the ``client_encoding`` parameter:: + + engine = create_engine( + "postgresql+pg8000://user:pass@host/dbname", client_encoding='utf8') + +.. _pg8000_ssl: + +SSL Connections +--------------- + +pg8000 accepts a Python ``SSLContext`` object which may be specified using the +:paramref:`_sa.create_engine.connect_args` dictionary:: + + import ssl + ssl_context = ssl.create_default_context() + engine = sa.create_engine( + "postgresql+pg8000://scott:tiger@192.168.0.199/test", + connect_args={"ssl_context": ssl_context}, + ) + +If the server uses an automatically-generated certificate that is self-signed +or does not match the host name (as seen from the client), it may also be +necessary to disable hostname checking:: + + import ssl + ssl_context = ssl.create_default_context() + ssl_context.check_hostname = False + ssl_context.verify_mode = ssl.CERT_NONE + engine = sa.create_engine( + "postgresql+pg8000://scott:tiger@192.168.0.199/test", + connect_args={"ssl_context": ssl_context}, + ) + +.. _pg8000_isolation_level: + +pg8000 Transaction Isolation Level +------------------------------------- + +The pg8000 dialect offers the same isolation level settings as that +of the :ref:`psycopg2 ` dialect: + +* ``READ COMMITTED`` +* ``READ UNCOMMITTED`` +* ``REPEATABLE READ`` +* ``SERIALIZABLE`` +* ``AUTOCOMMIT`` + +.. seealso:: + + :ref:`postgresql_isolation_level` + + :ref:`psycopg2_isolation_level` + + +""" # noqa +import decimal +import re +from uuid import UUID as _python_UUID + +from .array import ARRAY as PGARRAY +from .base import _ColonCast +from .base import _DECIMAL_TYPES +from .base import _FLOAT_TYPES +from .base import _INT_TYPES +from .base import ENUM +from .base import INTERVAL +from .base import PGCompiler +from .base import PGDialect +from .base import PGExecutionContext +from .base import PGIdentifierPreparer +from .base import UUID +from .json import JSON +from .json import JSONB +from .json import JSONPathType +from ... import exc +from ... import processors +from ... import types as sqltypes +from ... import util +from ...sql.elements import quoted_name + + +class _PGNumeric(sqltypes.Numeric): + def result_processor(self, dialect, coltype): + if self.asdecimal: + if coltype in _FLOAT_TYPES: + return processors.to_decimal_processor_factory( + decimal.Decimal, self._effective_decimal_return_scale + ) + elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES: + # pg8000 returns Decimal natively for 1700 + return None + else: + raise exc.InvalidRequestError( + "Unknown PG numeric type: %d" % coltype + ) + else: + if coltype in _FLOAT_TYPES: + # pg8000 returns float natively for 701 + return None + elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES: + return processors.to_float + else: + raise exc.InvalidRequestError( + "Unknown PG numeric type: %d" % coltype + ) + + +class _PGNumericNoBind(_PGNumeric): + def bind_processor(self, dialect): + return None + + +class _PGJSON(JSON): + def result_processor(self, dialect, coltype): + return None + + def get_dbapi_type(self, dbapi): + return dbapi.JSON + + +class _PGJSONB(JSONB): + def result_processor(self, dialect, coltype): + return None + + def get_dbapi_type(self, dbapi): + return dbapi.JSONB + + +class _PGJSONIndexType(sqltypes.JSON.JSONIndexType): + def get_dbapi_type(self, dbapi): + raise NotImplementedError("should not be here") + + +class _PGJSONIntIndexType(sqltypes.JSON.JSONIntIndexType): + def get_dbapi_type(self, dbapi): + return dbapi.INTEGER + + +class _PGJSONStrIndexType(sqltypes.JSON.JSONStrIndexType): + def get_dbapi_type(self, dbapi): + return dbapi.STRING + + +class _PGJSONPathType(JSONPathType): + def get_dbapi_type(self, dbapi): + return 1009 + + +class _PGUUID(UUID): + def bind_processor(self, dialect): + if not self.as_uuid: + + def process(value): + if value is not None: + value = _python_UUID(value) + return value + + return process + + def result_processor(self, dialect, coltype): + if not self.as_uuid: + + def process(value): + if value is not None: + value = str(value) + return value + + return process + + +class _PGEnum(ENUM): + def get_dbapi_type(self, dbapi): + return dbapi.UNKNOWN + + +class _PGInterval(INTERVAL): + def get_dbapi_type(self, dbapi): + return dbapi.INTERVAL + + @classmethod + def adapt_emulated_to_native(cls, interval, **kw): + return _PGInterval(precision=interval.second_precision) + + +class _PGTimeStamp(sqltypes.DateTime): + def get_dbapi_type(self, dbapi): + if self.timezone: + # TIMESTAMPTZOID + return 1184 + else: + # TIMESTAMPOID + return 1114 + + +class _PGTime(sqltypes.Time): + def get_dbapi_type(self, dbapi): + return dbapi.TIME + + +class _PGInteger(sqltypes.Integer): + def get_dbapi_type(self, dbapi): + return dbapi.INTEGER + + +class _PGSmallInteger(sqltypes.SmallInteger): + def get_dbapi_type(self, dbapi): + return dbapi.INTEGER + + +class _PGNullType(sqltypes.NullType): + def get_dbapi_type(self, dbapi): + return dbapi.NULLTYPE + + +class _PGBigInteger(sqltypes.BigInteger): + def get_dbapi_type(self, dbapi): + return dbapi.BIGINTEGER + + +class _PGBoolean(sqltypes.Boolean): + def get_dbapi_type(self, dbapi): + return dbapi.BOOLEAN + + +class _PGARRAY(PGARRAY): + def bind_expression(self, bindvalue): + return _ColonCast(bindvalue, self) + + +_server_side_id = util.counter() + + +class PGExecutionContext_pg8000(PGExecutionContext): + def create_server_side_cursor(self): + ident = "c_%s_%s" % (hex(id(self))[2:], hex(_server_side_id())[2:]) + return ServerSideCursor(self._dbapi_connection.cursor(), ident) + + def pre_exec(self): + if not self.compiled: + return + + +class ServerSideCursor: + server_side = True + + def __init__(self, cursor, ident): + self.ident = ident + self.cursor = cursor + + @property + def connection(self): + return self.cursor.connection + + @property + def rowcount(self): + return self.cursor.rowcount + + @property + def description(self): + return self.cursor.description + + def execute(self, operation, args=(), stream=None): + op = "DECLARE " + self.ident + " NO SCROLL CURSOR FOR " + operation + self.cursor.execute(op, args, stream=stream) + return self + + def executemany(self, operation, param_sets): + self.cursor.executemany(operation, param_sets) + return self + + def fetchone(self): + self.cursor.execute("FETCH FORWARD 1 FROM " + self.ident) + return self.cursor.fetchone() + + def fetchmany(self, num=None): + if num is None: + return self.fetchall() + else: + self.cursor.execute( + "FETCH FORWARD " + str(int(num)) + " FROM " + self.ident + ) + return self.cursor.fetchall() + + def fetchall(self): + self.cursor.execute("FETCH FORWARD ALL FROM " + self.ident) + return self.cursor.fetchall() + + def close(self): + self.cursor.execute("CLOSE " + self.ident) + self.cursor.close() + + def setinputsizes(self, *sizes): + self.cursor.setinputsizes(*sizes) + + def setoutputsize(self, size, column=None): + pass + + +class PGCompiler_pg8000(PGCompiler): + def visit_mod_binary(self, binary, operator, **kw): + return ( + self.process(binary.left, **kw) + + " %% " + + self.process(binary.right, **kw) + ) + + +class PGIdentifierPreparer_pg8000(PGIdentifierPreparer): + def __init__(self, *args, **kwargs): + PGIdentifierPreparer.__init__(self, *args, **kwargs) + self._double_percents = False + + +class PGDialect_pg8000(PGDialect): + driver = "pg8000" + supports_statement_cache = True + + supports_unicode_statements = True + + supports_unicode_binds = True + + default_paramstyle = "format" + supports_sane_multi_rowcount = True + execution_ctx_cls = PGExecutionContext_pg8000 + statement_compiler = PGCompiler_pg8000 + preparer = PGIdentifierPreparer_pg8000 + supports_server_side_cursors = True + + use_setinputsizes = True + + # reversed as of pg8000 1.16.6. 1.16.5 and lower + # are no longer compatible + description_encoding = None + # description_encoding = "use_encoding" + + colspecs = util.update_copy( + PGDialect.colspecs, + { + sqltypes.Numeric: _PGNumericNoBind, + sqltypes.Float: _PGNumeric, + sqltypes.JSON: _PGJSON, + sqltypes.Boolean: _PGBoolean, + sqltypes.NullType: _PGNullType, + JSONB: _PGJSONB, + sqltypes.JSON.JSONPathType: _PGJSONPathType, + sqltypes.JSON.JSONIndexType: _PGJSONIndexType, + sqltypes.JSON.JSONIntIndexType: _PGJSONIntIndexType, + sqltypes.JSON.JSONStrIndexType: _PGJSONStrIndexType, + UUID: _PGUUID, + sqltypes.Interval: _PGInterval, + INTERVAL: _PGInterval, + sqltypes.DateTime: _PGTimeStamp, + sqltypes.Time: _PGTime, + sqltypes.Integer: _PGInteger, + sqltypes.SmallInteger: _PGSmallInteger, + sqltypes.BigInteger: _PGBigInteger, + sqltypes.Enum: _PGEnum, + sqltypes.ARRAY: _PGARRAY, + }, + ) + + def __init__(self, client_encoding=None, **kwargs): + PGDialect.__init__(self, **kwargs) + self.client_encoding = client_encoding + + if self._dbapi_version < (1, 16, 6): + raise NotImplementedError("pg8000 1.16.6 or greater is required") + + @util.memoized_property + def _dbapi_version(self): + if self.dbapi and hasattr(self.dbapi, "__version__"): + return tuple( + [ + int(x) + for x in re.findall( + r"(\d+)(?:[-\.]?|$)", self.dbapi.__version__ + ) + ] + ) + else: + return (99, 99, 99) + + @classmethod + def dbapi(cls): + return __import__("pg8000") + + def create_connect_args(self, url): + opts = url.translate_connect_args(username="user") + if "port" in opts: + opts["port"] = int(opts["port"]) + opts.update(url.query) + return ([], opts) + + def is_disconnect(self, e, connection, cursor): + if isinstance(e, self.dbapi.InterfaceError) and "network error" in str( + e + ): + # new as of pg8000 1.19.0 for broken connections + return True + + # connection was closed normally + return "connection is closed" in str(e) + + def set_isolation_level(self, connection, level): + level = level.replace("_", " ") + + # adjust for ConnectionFairy possibly being present + if hasattr(connection, "dbapi_connection"): + connection = connection.dbapi_connection + + if level == "AUTOCOMMIT": + connection.autocommit = True + elif level in self._isolation_lookup: + connection.autocommit = False + cursor = connection.cursor() + cursor.execute( + "SET SESSION CHARACTERISTICS AS TRANSACTION " + "ISOLATION LEVEL %s" % level + ) + cursor.execute("COMMIT") + cursor.close() + else: + raise exc.ArgumentError( + "Invalid value '%s' for isolation_level. " + "Valid isolation levels for %s are %s or AUTOCOMMIT" + % (level, self.name, ", ".join(self._isolation_lookup)) + ) + + def set_readonly(self, connection, value): + cursor = connection.cursor() + try: + cursor.execute( + "SET SESSION CHARACTERISTICS AS TRANSACTION %s" + % ("READ ONLY" if value else "READ WRITE") + ) + cursor.execute("COMMIT") + finally: + cursor.close() + + def get_readonly(self, connection): + cursor = connection.cursor() + try: + cursor.execute("show transaction_read_only") + val = cursor.fetchone()[0] + finally: + cursor.close() + + return val == "on" + + def set_deferrable(self, connection, value): + cursor = connection.cursor() + try: + cursor.execute( + "SET SESSION CHARACTERISTICS AS TRANSACTION %s" + % ("DEFERRABLE" if value else "NOT DEFERRABLE") + ) + cursor.execute("COMMIT") + finally: + cursor.close() + + def get_deferrable(self, connection): + cursor = connection.cursor() + try: + cursor.execute("show transaction_deferrable") + val = cursor.fetchone()[0] + finally: + cursor.close() + + return val == "on" + + def set_client_encoding(self, connection, client_encoding): + # adjust for ConnectionFairy possibly being present + if hasattr(connection, "dbapi_connection"): + connection = connection.dbapi_connection + + cursor = connection.cursor() + cursor.execute("SET CLIENT_ENCODING TO '" + client_encoding + "'") + cursor.execute("COMMIT") + cursor.close() + + def do_set_input_sizes(self, cursor, list_of_tuples, context): + if self.positional: + cursor.setinputsizes( + *[dbtype for key, dbtype, sqltype in list_of_tuples] + ) + else: + cursor.setinputsizes( + **{ + key: dbtype + for key, dbtype, sqltype in list_of_tuples + if dbtype + } + ) + + def do_begin_twophase(self, connection, xid): + connection.connection.tpc_begin((0, xid, "")) + + def do_prepare_twophase(self, connection, xid): + connection.connection.tpc_prepare() + + def do_rollback_twophase( + self, connection, xid, is_prepared=True, recover=False + ): + connection.connection.tpc_rollback((0, xid, "")) + + def do_commit_twophase( + self, connection, xid, is_prepared=True, recover=False + ): + connection.connection.tpc_commit((0, xid, "")) + + def do_recover_twophase(self, connection): + return [row[1] for row in connection.connection.tpc_recover()] + + def on_connect(self): + fns = [] + + def on_connect(conn): + conn.py_types[quoted_name] = conn.py_types[util.text_type] + + fns.append(on_connect) + + if self.client_encoding is not None: + + def on_connect(conn): + self.set_client_encoding(conn, self.client_encoding) + + fns.append(on_connect) + + if self.isolation_level is not None: + + def on_connect(conn): + self.set_isolation_level(conn, self.isolation_level) + + fns.append(on_connect) + + if self._json_deserializer: + + def on_connect(conn): + # json + conn.register_in_adapter(114, self._json_deserializer) + + # jsonb + conn.register_in_adapter(3802, self._json_deserializer) + + fns.append(on_connect) + + if len(fns) > 0: + + def on_connect(conn): + for fn in fns: + fn(conn) + + return on_connect + else: + return None + + +dialect = PGDialect_pg8000 diff --git a/lib/sqlalchemy/dialects/postgresql/provision.py b/lib/sqlalchemy/dialects/postgresql/provision.py new file mode 100644 index 0000000..98470f3 --- /dev/null +++ b/lib/sqlalchemy/dialects/postgresql/provision.py @@ -0,0 +1,124 @@ +import time + +from ... import exc +from ... import inspect +from ... import text +from ...testing import warn_test_suite +from ...testing.provision import create_db +from ...testing.provision import drop_all_schema_objects_post_tables +from ...testing.provision import drop_all_schema_objects_pre_tables +from ...testing.provision import drop_db +from ...testing.provision import log +from ...testing.provision import prepare_for_drop_tables +from ...testing.provision import set_default_schema_on_connection +from ...testing.provision import temp_table_keyword_args + + +@create_db.for_db("postgresql") +def _pg_create_db(cfg, eng, ident): + template_db = cfg.options.postgresql_templatedb + + with eng.execution_options(isolation_level="AUTOCOMMIT").begin() as conn: + + if not template_db: + template_db = conn.exec_driver_sql( + "select current_database()" + ).scalar() + + attempt = 0 + while True: + try: + conn.exec_driver_sql( + "CREATE DATABASE %s TEMPLATE %s" % (ident, template_db) + ) + except exc.OperationalError as err: + attempt += 1 + if attempt >= 3: + raise + if "accessed by other users" in str(err): + log.info( + "Waiting to create %s, URI %r, " + "template DB %s is in use sleeping for .5", + ident, + eng.url, + template_db, + ) + time.sleep(0.5) + except: + raise + else: + break + + +@drop_db.for_db("postgresql") +def _pg_drop_db(cfg, eng, ident): + with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn: + with conn.begin(): + conn.execute( + text( + "select pg_terminate_backend(pid) from pg_stat_activity " + "where usename=current_user and pid != pg_backend_pid() " + "and datname=:dname" + ), + dict(dname=ident), + ) + conn.exec_driver_sql("DROP DATABASE %s" % ident) + + +@temp_table_keyword_args.for_db("postgresql") +def _postgresql_temp_table_keyword_args(cfg, eng): + return {"prefixes": ["TEMPORARY"]} + + +@set_default_schema_on_connection.for_db("postgresql") +def _postgresql_set_default_schema_on_connection( + cfg, dbapi_connection, schema_name +): + existing_autocommit = dbapi_connection.autocommit + dbapi_connection.autocommit = True + cursor = dbapi_connection.cursor() + cursor.execute("SET SESSION search_path='%s'" % schema_name) + cursor.close() + dbapi_connection.autocommit = existing_autocommit + + +@drop_all_schema_objects_pre_tables.for_db("postgresql") +def drop_all_schema_objects_pre_tables(cfg, eng): + with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn: + for xid in conn.execute("select gid from pg_prepared_xacts").scalars(): + conn.execute("ROLLBACK PREPARED '%s'" % xid) + + +@drop_all_schema_objects_post_tables.for_db("postgresql") +def drop_all_schema_objects_post_tables(cfg, eng): + from sqlalchemy.dialects import postgresql + + inspector = inspect(eng) + with eng.begin() as conn: + for enum in inspector.get_enums("*"): + conn.execute( + postgresql.DropEnumType( + postgresql.ENUM(name=enum["name"], schema=enum["schema"]) + ) + ) + + +@prepare_for_drop_tables.for_db("postgresql") +def prepare_for_drop_tables(config, connection): + """Ensure there are no locks on the current username/database.""" + + result = connection.exec_driver_sql( + "select pid, state, wait_event_type, query " + # "select pg_terminate_backend(pid), state, wait_event_type " + "from pg_stat_activity where " + "usename=current_user " + "and datname=current_database() and state='idle in transaction' " + "and pid != pg_backend_pid()" + ) + rows = result.all() # noqa + if rows: + warn_test_suite( + "PostgreSQL may not be able to DROP tables due to " + "idle in transaction: %s" + % ("; ".join(row._mapping["query"] for row in rows)) + ) diff --git a/lib/sqlalchemy/dialects/postgresql/psycopg2.py b/lib/sqlalchemy/dialects/postgresql/psycopg2.py new file mode 100644 index 0000000..6747427 --- /dev/null +++ b/lib/sqlalchemy/dialects/postgresql/psycopg2.py @@ -0,0 +1,1088 @@ +# postgresql/psycopg2.py +# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors +# +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +r""" +.. dialect:: postgresql+psycopg2 + :name: psycopg2 + :dbapi: psycopg2 + :connectstring: postgresql+psycopg2://user:password@host:port/dbname[?key=value&key=value...] + :url: https://pypi.org/project/psycopg2/ + +psycopg2 Connect Arguments +-------------------------- + +Keyword arguments that are specific to the SQLAlchemy psycopg2 dialect +may be passed to :func:`_sa.create_engine()`, and include the following: + + +* ``isolation_level``: This option, available for all PostgreSQL dialects, + includes the ``AUTOCOMMIT`` isolation level when using the psycopg2 + dialect. This option sets the **default** isolation level for the + connection that is set immediately upon connection to the database before + the connection is pooled. This option is generally superseded by the more + modern :paramref:`_engine.Connection.execution_options.isolation_level` + execution option, detailed at :ref:`dbapi_autocommit`. + + .. seealso:: + + :ref:`psycopg2_isolation_level` + + :ref:`dbapi_autocommit` + + +* ``client_encoding``: sets the client encoding in a libpq-agnostic way, + using psycopg2's ``set_client_encoding()`` method. + + .. seealso:: + + :ref:`psycopg2_unicode` + +* ``use_native_unicode``: Under Python 2 only, this can be set to False to + disable the use of psycopg2's native Unicode support. + + .. seealso:: + + :ref:`psycopg2_disable_native_unicode` + + +* ``executemany_mode``, ``executemany_batch_page_size``, + ``executemany_values_page_size``: Allows use of psycopg2 + extensions for optimizing "executemany"-style queries. See the referenced + section below for details. + + .. seealso:: + + :ref:`psycopg2_executemany_mode` + +.. tip:: + + The above keyword arguments are **dialect** keyword arguments, meaning + that they are passed as explicit keyword arguments to :func:`_sa.create_engine()`:: + + engine = create_engine( + "postgresql+psycopg2://scott:tiger@localhost/test", + isolation_level="SERIALIZABLE", + ) + + These should not be confused with **DBAPI** connect arguments, which + are passed as part of the :paramref:`_sa.create_engine.connect_args` + dictionary and/or are passed in the URL query string, as detailed in + the section :ref:`custom_dbapi_args`. + +.. _psycopg2_ssl: + +SSL Connections +--------------- + +The psycopg2 module has a connection argument named ``sslmode`` for +controlling its behavior regarding secure (SSL) connections. The default is +``sslmode=prefer``; it will attempt an SSL connection and if that fails it +will fall back to an unencrypted connection. ``sslmode=require`` may be used +to ensure that only secure connections are established. Consult the +psycopg2 / libpq documentation for further options that are available. + +Note that ``sslmode`` is specific to psycopg2 so it is included in the +connection URI:: + + engine = sa.create_engine( + "postgresql+psycopg2://scott:tiger@192.168.0.199:5432/test?sslmode=require" + ) + +Unix Domain Connections +------------------------ + +psycopg2 supports connecting via Unix domain connections. When the ``host`` +portion of the URL is omitted, SQLAlchemy passes ``None`` to psycopg2, +which specifies Unix-domain communication rather than TCP/IP communication:: + + create_engine("postgresql+psycopg2://user:password@/dbname") + +By default, the socket file used is to connect to a Unix-domain socket +in ``/tmp``, or whatever socket directory was specified when PostgreSQL +was built. This value can be overridden by passing a pathname to psycopg2, +using ``host`` as an additional keyword argument:: + + create_engine("postgresql+psycopg2://user:password@/dbname?host=/var/lib/postgresql") + +.. seealso:: + + `PQconnectdbParams \ + `_ + +.. _psycopg2_multi_host: + +Specifying multiple fallback hosts +----------------------------------- + +psycopg2 supports multiple connection points in the connection string. +When the ``host`` parameter is used multiple times in the query section of +the URL, SQLAlchemy will create a single string of the host and port +information provided to make the connections. Tokens may consist of +``host::port`` or just ``host``; in the latter case, the default port +is selected by libpq. In the example below, three host connections +are specified, for ``HostA::PortA``, ``HostB`` connecting to the default port, +and ``HostC::PortC``:: + + create_engine( + "postgresql+psycopg2://user:password@/dbname?host=HostA:PortA&host=HostB&host=HostC:PortC" + ) + +As an alternative, libpq query string format also may be used; this specifies +``host`` and ``port`` as single query string arguments with comma-separated +lists - the default port can be chosen by indicating an empty value +in the comma separated list:: + + create_engine( + "postgresql+psycopg2://user:password@/dbname?host=HostA,HostB,HostC&port=PortA,,PortC" + ) + +With either URL style, connections to each host is attempted based on a +configurable strategy, which may be configured using the libpq +``target_session_attrs`` parameter. Per libpq this defaults to ``any`` +which indicates a connection to each host is then attempted until a connection is successful. +Other strategies include ``primary``, ``prefer-standby``, etc. The complete +list is documented by PostgreSQL at +`libpq connection strings `_. + +For example, to indicate two hosts using the ``primary`` strategy:: + + create_engine( + "postgresql+psycopg2://user:password@/dbname?host=HostA:PortA&host=HostB&host=HostC:PortC&target_session_attrs=primary" + ) + +.. versionchanged:: 1.4.40 Port specification in psycopg2 multiple host format + is repaired, previously ports were not correctly interpreted in this context. + libpq comma-separated format is also now supported. + +.. versionadded:: 1.3.20 Support for multiple hosts in PostgreSQL connection + string. + +.. seealso:: + + `libpq connection strings `_ - please refer + to this section in the libpq documentation for complete background on multiple host support. + + +Empty DSN Connections / Environment Variable Connections +--------------------------------------------------------- + +The psycopg2 DBAPI can connect to PostgreSQL by passing an empty DSN to the +libpq client library, which by default indicates to connect to a localhost +PostgreSQL database that is open for "trust" connections. This behavior can be +further tailored using a particular set of environment variables which are +prefixed with ``PG_...``, which are consumed by ``libpq`` to take the place of +any or all elements of the connection string. + +For this form, the URL can be passed without any elements other than the +initial scheme:: + + engine = create_engine('postgresql+psycopg2://') + +In the above form, a blank "dsn" string is passed to the ``psycopg2.connect()`` +function which in turn represents an empty DSN passed to libpq. + +.. versionadded:: 1.3.2 support for parameter-less connections with psycopg2. + +.. seealso:: + + `Environment Variables\ + `_ - + PostgreSQL documentation on how to use ``PG_...`` + environment variables for connections. + +.. _psycopg2_execution_options: + +Per-Statement/Connection Execution Options +------------------------------------------- + +The following DBAPI-specific options are respected when used with +:meth:`_engine.Connection.execution_options`, +:meth:`.Executable.execution_options`, +:meth:`_query.Query.execution_options`, +in addition to those not specific to DBAPIs: + +* ``isolation_level`` - Set the transaction isolation level for the lifespan + of a :class:`_engine.Connection` (can only be set on a connection, + not a statement + or query). See :ref:`psycopg2_isolation_level`. + +* ``stream_results`` - Enable or disable usage of psycopg2 server side + cursors - this feature makes use of "named" cursors in combination with + special result handling methods so that result rows are not fully buffered. + Defaults to False, meaning cursors are buffered by default. + +* ``max_row_buffer`` - when using ``stream_results``, an integer value that + specifies the maximum number of rows to buffer at a time. This is + interpreted by the :class:`.BufferedRowCursorResult`, and if omitted the + buffer will grow to ultimately store 1000 rows at a time. + + .. versionchanged:: 1.4 The ``max_row_buffer`` size can now be greater than + 1000, and the buffer will grow to that size. + +.. _psycopg2_batch_mode: + +.. _psycopg2_executemany_mode: + +Psycopg2 Fast Execution Helpers +------------------------------- + +Modern versions of psycopg2 include a feature known as +`Fast Execution Helpers \ +`_, which +have been shown in benchmarking to improve psycopg2's executemany() +performance, primarily with INSERT statements, by multiple orders of magnitude. +SQLAlchemy internally makes use of these extensions for ``executemany()`` style +calls, which correspond to lists of parameters being passed to +:meth:`_engine.Connection.execute` as detailed in :ref:`multiple parameter +sets `. The ORM also uses this mode internally whenever +possible. + +The two available extensions on the psycopg2 side are the ``execute_values()`` +and ``execute_batch()`` functions. The psycopg2 dialect defaults to using the +``execute_values()`` extension for all qualifying INSERT statements. + +.. versionchanged:: 1.4 The psycopg2 dialect now defaults to a new mode + ``"values_only"`` for ``executemany_mode``, which allows an order of + magnitude performance improvement for INSERT statements, but does not + include "batch" mode for UPDATE and DELETE statements which removes the + ability of ``cursor.rowcount`` to function correctly. + +The use of these extensions is controlled by the ``executemany_mode`` flag +which may be passed to :func:`_sa.create_engine`:: + + engine = create_engine( + "postgresql+psycopg2://scott:tiger@host/dbname", + executemany_mode='values_plus_batch') + + +Possible options for ``executemany_mode`` include: + +* ``values_only`` - this is the default value. the psycopg2 execute_values() + extension is used for qualifying INSERT statements, which rewrites the INSERT + to include multiple VALUES clauses so that many parameter sets can be + inserted with one statement. + + .. versionadded:: 1.4 Added ``"values_only"`` setting for ``executemany_mode`` + which is also now the default. + +* ``None`` - No psycopg2 extensions are not used, and the usual + ``cursor.executemany()`` method is used when invoking statements with + multiple parameter sets. + +* ``'batch'`` - Uses ``psycopg2.extras.execute_batch`` for all qualifying + INSERT, UPDATE and DELETE statements, so that multiple copies + of a SQL query, each one corresponding to a parameter set passed to + ``executemany()``, are joined into a single SQL string separated by a + semicolon. When using this mode, the :attr:`_engine.CursorResult.rowcount` + attribute will not contain a value for executemany-style executions. + +* ``'values_plus_batch'``- ``execute_values`` is used for qualifying INSERT + statements, ``execute_batch`` is used for UPDATE and DELETE. + When using this mode, the :attr:`_engine.CursorResult.rowcount` + attribute will not contain a value for executemany-style executions against + UPDATE and DELETE statements. + +By "qualifying statements", we mean that the statement being executed +must be a Core :func:`_expression.insert`, :func:`_expression.update` +or :func:`_expression.delete` construct, and not a plain textual SQL +string or one constructed using :func:`_expression.text`. When using the +ORM, all insert/update/delete statements used by the ORM flush process +are qualifying. + +The "page size" for the "values" and "batch" strategies can be affected +by using the ``executemany_batch_page_size`` and +``executemany_values_page_size`` engine parameters. These +control how many parameter sets +should be represented in each execution. The "values" page size defaults +to 1000, which is different that psycopg2's default. The "batch" page +size defaults to 100. These can be affected by passing new values to +:func:`_engine.create_engine`:: + + engine = create_engine( + "postgresql+psycopg2://scott:tiger@host/dbname", + executemany_mode='values', + executemany_values_page_size=10000, executemany_batch_page_size=500) + +.. versionchanged:: 1.4 + + The default for ``executemany_values_page_size`` is now 1000, up from + 100. + +.. seealso:: + + :ref:`tutorial_multiple_parameters` - General information on using the + :class:`_engine.Connection` + object to execute statements in such a way as to make + use of the DBAPI ``.executemany()`` method. + + +.. _psycopg2_unicode: + +Unicode with Psycopg2 +---------------------- + +The psycopg2 DBAPI driver supports Unicode data transparently. Under Python 2 +only, the SQLAlchemy psycopg2 dialect will enable the +``psycopg2.extensions.UNICODE`` extension by default to ensure Unicode is +handled properly; under Python 3, this is psycopg2's default behavior. + +The client character encoding can be controlled for the psycopg2 dialect +in the following ways: + +* For PostgreSQL 9.1 and above, the ``client_encoding`` parameter may be + passed in the database URL; this parameter is consumed by the underlying + ``libpq`` PostgreSQL client library:: + + engine = create_engine("postgresql+psycopg2://user:pass@host/dbname?client_encoding=utf8") + + Alternatively, the above ``client_encoding`` value may be passed using + :paramref:`_sa.create_engine.connect_args` for programmatic establishment with + ``libpq``:: + + engine = create_engine( + "postgresql+psycopg2://user:pass@host/dbname", + connect_args={'client_encoding': 'utf8'} + ) + +* For all PostgreSQL versions, psycopg2 supports a client-side encoding + value that will be passed to database connections when they are first + established. The SQLAlchemy psycopg2 dialect supports this using the + ``client_encoding`` parameter passed to :func:`_sa.create_engine`:: + + engine = create_engine( + "postgresql+psycopg2://user:pass@host/dbname", + client_encoding="utf8" + ) + + .. tip:: The above ``client_encoding`` parameter admittedly is very similar + in appearance to usage of the parameter within the + :paramref:`_sa.create_engine.connect_args` dictionary; the difference + above is that the parameter is consumed by psycopg2 and is + passed to the database connection using ``SET client_encoding TO + 'utf8'``; in the previously mentioned style, the parameter is instead + passed through psycopg2 and consumed by the ``libpq`` library. + +* A common way to set up client encoding with PostgreSQL databases is to + ensure it is configured within the server-side postgresql.conf file; + this is the recommended way to set encoding for a server that is + consistently of one encoding in all databases:: + + # postgresql.conf file + + # client_encoding = sql_ascii # actually, defaults to database + # encoding + client_encoding = utf8 + +.. _psycopg2_disable_native_unicode: + +Disabling Native Unicode +^^^^^^^^^^^^^^^^^^^^^^^^ + +Under Python 2 only, SQLAlchemy can also be instructed to skip the usage of the +psycopg2 ``UNICODE`` extension and to instead utilize its own unicode +encode/decode services, which are normally reserved only for those DBAPIs that +don't fully support unicode directly. Passing ``use_native_unicode=False`` to +:func:`_sa.create_engine` will disable usage of ``psycopg2.extensions. +UNICODE``. SQLAlchemy will instead encode data itself into Python bytestrings +on the way in and coerce from bytes on the way back, using the value of the +:func:`_sa.create_engine` ``encoding`` parameter, which defaults to ``utf-8``. +SQLAlchemy's own unicode encode/decode functionality is steadily becoming +obsolete as most DBAPIs now support unicode fully. + + +Transactions +------------ + +The psycopg2 dialect fully supports SAVEPOINT and two-phase commit operations. + +.. _psycopg2_isolation_level: + +Psycopg2 Transaction Isolation Level +------------------------------------- + +As discussed in :ref:`postgresql_isolation_level`, +all PostgreSQL dialects support setting of transaction isolation level +both via the ``isolation_level`` parameter passed to :func:`_sa.create_engine` +, +as well as the ``isolation_level`` argument used by +:meth:`_engine.Connection.execution_options`. When using the psycopg2 dialect +, these +options make use of psycopg2's ``set_isolation_level()`` connection method, +rather than emitting a PostgreSQL directive; this is because psycopg2's +API-level setting is always emitted at the start of each transaction in any +case. + +The psycopg2 dialect supports these constants for isolation level: + +* ``READ COMMITTED`` +* ``READ UNCOMMITTED`` +* ``REPEATABLE READ`` +* ``SERIALIZABLE`` +* ``AUTOCOMMIT`` + +.. seealso:: + + :ref:`postgresql_isolation_level` + + :ref:`pg8000_isolation_level` + + +NOTICE logging +--------------- + +The psycopg2 dialect will log PostgreSQL NOTICE messages +via the ``sqlalchemy.dialects.postgresql`` logger. When this logger +is set to the ``logging.INFO`` level, notice messages will be logged:: + + import logging + + logging.getLogger('sqlalchemy.dialects.postgresql').setLevel(logging.INFO) + +Above, it is assumed that logging is configured externally. If this is not +the case, configuration such as ``logging.basicConfig()`` must be utilized:: + + import logging + + logging.basicConfig() # log messages to stdout + logging.getLogger('sqlalchemy.dialects.postgresql').setLevel(logging.INFO) + +.. seealso:: + + `Logging HOWTO `_ - on the python.org website + +.. _psycopg2_hstore: + +HSTORE type +------------ + +The ``psycopg2`` DBAPI includes an extension to natively handle marshalling of +the HSTORE type. The SQLAlchemy psycopg2 dialect will enable this extension +by default when psycopg2 version 2.4 or greater is used, and +it is detected that the target database has the HSTORE type set up for use. +In other words, when the dialect makes the first +connection, a sequence like the following is performed: + +1. Request the available HSTORE oids using + ``psycopg2.extras.HstoreAdapter.get_oids()``. + If this function returns a list of HSTORE identifiers, we then determine + that the ``HSTORE`` extension is present. + This function is **skipped** if the version of psycopg2 installed is + less than version 2.4. + +2. If the ``use_native_hstore`` flag is at its default of ``True``, and + we've detected that ``HSTORE`` oids are available, the + ``psycopg2.extensions.register_hstore()`` extension is invoked for all + connections. + +The ``register_hstore()`` extension has the effect of **all Python +dictionaries being accepted as parameters regardless of the type of target +column in SQL**. The dictionaries are converted by this extension into a +textual HSTORE expression. If this behavior is not desired, disable the +use of the hstore extension by setting ``use_native_hstore`` to ``False`` as +follows:: + + engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test", + use_native_hstore=False) + +The ``HSTORE`` type is **still supported** when the +``psycopg2.extensions.register_hstore()`` extension is not used. It merely +means that the coercion between Python dictionaries and the HSTORE +string format, on both the parameter side and the result side, will take +place within SQLAlchemy's own marshalling logic, and not that of ``psycopg2`` +which may be more performant. + +""" # noqa +from __future__ import absolute_import + +import decimal +import logging +import re +from uuid import UUID as _python_UUID + +from .array import ARRAY as PGARRAY +from .base import _ColonCast +from .base import _DECIMAL_TYPES +from .base import _FLOAT_TYPES +from .base import _INT_TYPES +from .base import ENUM +from .base import PGCompiler +from .base import PGDialect +from .base import PGExecutionContext +from .base import PGIdentifierPreparer +from .base import UUID +from .hstore import HSTORE +from .json import JSON +from .json import JSONB +from ... import exc +from ... import processors +from ... import types as sqltypes +from ... import util +from ...engine import cursor as _cursor +from ...util import collections_abc + + +logger = logging.getLogger("sqlalchemy.dialects.postgresql") + + +class _PGNumeric(sqltypes.Numeric): + def bind_processor(self, dialect): + return None + + def result_processor(self, dialect, coltype): + if self.asdecimal: + if coltype in _FLOAT_TYPES: + return processors.to_decimal_processor_factory( + decimal.Decimal, self._effective_decimal_return_scale + ) + elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES: + # pg8000 returns Decimal natively for 1700 + return None + else: + raise exc.InvalidRequestError( + "Unknown PG numeric type: %d" % coltype + ) + else: + if coltype in _FLOAT_TYPES: + # pg8000 returns float natively for 701 + return None + elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES: + return processors.to_float + else: + raise exc.InvalidRequestError( + "Unknown PG numeric type: %d" % coltype + ) + + +class _PGEnum(ENUM): + def result_processor(self, dialect, coltype): + if util.py2k and self._expect_unicode is True: + # for py2k, if the enum type needs unicode data (which is set up as + # part of the Enum() constructor based on values passed as py2k + # unicode objects) we have to use our own converters since + # psycopg2's don't work, a rare exception to the "modern DBAPIs + # support unicode everywhere" theme of deprecating + # convert_unicode=True. Use the special "force_nocheck" directive + # which forces unicode conversion to happen on the Python side + # without an isinstance() check. in py3k psycopg2 does the right + # thing automatically. + self._expect_unicode = "force_nocheck" + return super(_PGEnum, self).result_processor(dialect, coltype) + + +class _PGHStore(HSTORE): + def bind_processor(self, dialect): + if dialect._has_native_hstore: + return None + else: + return super(_PGHStore, self).bind_processor(dialect) + + def result_processor(self, dialect, coltype): + if dialect._has_native_hstore: + return None + else: + return super(_PGHStore, self).result_processor(dialect, coltype) + + +class _PGARRAY(PGARRAY): + def bind_expression(self, bindvalue): + return _ColonCast(bindvalue, self) + + +class _PGJSON(JSON): + def result_processor(self, dialect, coltype): + return None + + +class _PGJSONB(JSONB): + def result_processor(self, dialect, coltype): + return None + + +class _PGUUID(UUID): + def bind_processor(self, dialect): + if not self.as_uuid and dialect.use_native_uuid: + + def process(value): + if value is not None: + value = _python_UUID(value) + return value + + return process + + def result_processor(self, dialect, coltype): + if not self.as_uuid and dialect.use_native_uuid: + + def process(value): + if value is not None: + value = str(value) + return value + + return process + + +_server_side_id = util.counter() + + +class PGExecutionContext_psycopg2(PGExecutionContext): + _psycopg2_fetched_rows = None + + def create_server_side_cursor(self): + # use server-side cursors: + # https://lists.initd.org/pipermail/psycopg/2007-January/005251.html + ident = "c_%s_%s" % (hex(id(self))[2:], hex(_server_side_id())[2:]) + return self._dbapi_connection.cursor(ident) + + def post_exec(self): + if ( + self._psycopg2_fetched_rows + and self.compiled + and self.compiled.returning + ): + # psycopg2 execute_values will provide for a real cursor where + # cursor.description works correctly. however, it executes the + # INSERT statement multiple times for multiple pages of rows, so + # while this cursor also supports calling .fetchall() directly, in + # order to get the list of all rows inserted across multiple pages, + # we have to retrieve the aggregated list from the execute_values() + # function directly. + strat_cls = _cursor.FullyBufferedCursorFetchStrategy + self.cursor_fetch_strategy = strat_cls( + self.cursor, initial_buffer=self._psycopg2_fetched_rows + ) + self._log_notices(self.cursor) + + def _log_notices(self, cursor): + # check also that notices is an iterable, after it's already + # established that we will be iterating through it. This is to get + # around test suites such as SQLAlchemy's using a Mock object for + # cursor + if not cursor.connection.notices or not isinstance( + cursor.connection.notices, collections_abc.Iterable + ): + return + + for notice in cursor.connection.notices: + # NOTICE messages have a + # newline character at the end + logger.info(notice.rstrip()) + + cursor.connection.notices[:] = [] + + +class PGCompiler_psycopg2(PGCompiler): + pass + + +class PGIdentifierPreparer_psycopg2(PGIdentifierPreparer): + pass + + +EXECUTEMANY_PLAIN = util.symbol("executemany_plain", canonical=0) +EXECUTEMANY_BATCH = util.symbol("executemany_batch", canonical=1) +EXECUTEMANY_VALUES = util.symbol("executemany_values", canonical=2) +EXECUTEMANY_VALUES_PLUS_BATCH = util.symbol( + "executemany_values_plus_batch", + canonical=EXECUTEMANY_BATCH | EXECUTEMANY_VALUES, +) + + +class PGDialect_psycopg2(PGDialect): + driver = "psycopg2" + + supports_statement_cache = True + + if util.py2k: + # turn off supports_unicode_statements for Python 2. psycopg2 supports + # unicode statements in Py2K. But! it does not support unicode *bound + # parameter names* because it uses the Python "%" operator to + # interpolate these into the string, and this fails. So for Py2K, we + # have to use full-on encoding for statements and parameters before + # passing to cursor.execute(). + supports_unicode_statements = False + + supports_server_side_cursors = True + + default_paramstyle = "pyformat" + # set to true based on psycopg2 version + supports_sane_multi_rowcount = False + execution_ctx_cls = PGExecutionContext_psycopg2 + statement_compiler = PGCompiler_psycopg2 + preparer = PGIdentifierPreparer_psycopg2 + psycopg2_version = (0, 0) + + _has_native_hstore = True + + engine_config_types = PGDialect.engine_config_types.union( + {"use_native_unicode": util.asbool} + ) + + colspecs = util.update_copy( + PGDialect.colspecs, + { + sqltypes.Numeric: _PGNumeric, + ENUM: _PGEnum, # needs force_unicode + sqltypes.Enum: _PGEnum, # needs force_unicode + HSTORE: _PGHStore, + JSON: _PGJSON, + sqltypes.JSON: _PGJSON, + JSONB: _PGJSONB, + UUID: _PGUUID, + sqltypes.ARRAY: _PGARRAY, + }, + ) + + def __init__( + self, + use_native_unicode=True, + client_encoding=None, + use_native_hstore=True, + use_native_uuid=True, + executemany_mode="values_only", + executemany_batch_page_size=100, + executemany_values_page_size=1000, + **kwargs + ): + PGDialect.__init__(self, **kwargs) + self.use_native_unicode = use_native_unicode + if not use_native_unicode and not util.py2k: + raise exc.ArgumentError( + "psycopg2 native_unicode mode is required under Python 3" + ) + if not use_native_hstore: + self._has_native_hstore = False + self.use_native_hstore = use_native_hstore + self.use_native_uuid = use_native_uuid + self.supports_unicode_binds = use_native_unicode + self.client_encoding = client_encoding + + # Parse executemany_mode argument, allowing it to be only one of the + # symbol names + self.executemany_mode = util.symbol.parse_user_argument( + executemany_mode, + { + EXECUTEMANY_PLAIN: [None], + EXECUTEMANY_BATCH: ["batch"], + EXECUTEMANY_VALUES: ["values_only"], + EXECUTEMANY_VALUES_PLUS_BATCH: ["values_plus_batch", "values"], + }, + "executemany_mode", + ) + + if self.executemany_mode & EXECUTEMANY_VALUES: + self.insert_executemany_returning = True + + self.executemany_batch_page_size = executemany_batch_page_size + self.executemany_values_page_size = executemany_values_page_size + + if self.dbapi and hasattr(self.dbapi, "__version__"): + m = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", self.dbapi.__version__) + if m: + self.psycopg2_version = tuple( + int(x) for x in m.group(1, 2, 3) if x is not None + ) + + if self.psycopg2_version < (2, 7): + raise ImportError( + "psycopg2 version 2.7 or higher is required." + ) + + def initialize(self, connection): + super(PGDialect_psycopg2, self).initialize(connection) + self._has_native_hstore = ( + self.use_native_hstore + and self._hstore_oids(connection.connection) is not None + ) + + # PGDialect.initialize() checks server version for <= 8.2 and sets + # this flag to False if so + if not self.full_returning: + self.insert_executemany_returning = False + self.executemany_mode = EXECUTEMANY_PLAIN + + self.supports_sane_multi_rowcount = not ( + self.executemany_mode & EXECUTEMANY_BATCH + ) + + @classmethod + def dbapi(cls): + import psycopg2 + + return psycopg2 + + @classmethod + def _psycopg2_extensions(cls): + from psycopg2 import extensions + + return extensions + + @classmethod + def _psycopg2_extras(cls): + from psycopg2 import extras + + return extras + + @util.memoized_property + def _isolation_lookup(self): + extensions = self._psycopg2_extensions() + return { + "AUTOCOMMIT": extensions.ISOLATION_LEVEL_AUTOCOMMIT, + "READ COMMITTED": extensions.ISOLATION_LEVEL_READ_COMMITTED, + "READ UNCOMMITTED": extensions.ISOLATION_LEVEL_READ_UNCOMMITTED, + "REPEATABLE READ": extensions.ISOLATION_LEVEL_REPEATABLE_READ, + "SERIALIZABLE": extensions.ISOLATION_LEVEL_SERIALIZABLE, + } + + def set_isolation_level(self, connection, level): + try: + level = self._isolation_lookup[level.replace("_", " ")] + except KeyError as err: + util.raise_( + exc.ArgumentError( + "Invalid value '%s' for isolation_level. " + "Valid isolation levels for %s are %s" + % (level, self.name, ", ".join(self._isolation_lookup)) + ), + replace_context=err, + ) + + connection.set_isolation_level(level) + + def set_readonly(self, connection, value): + connection.readonly = value + + def get_readonly(self, connection): + return connection.readonly + + def set_deferrable(self, connection, value): + connection.deferrable = value + + def get_deferrable(self, connection): + return connection.deferrable + + def do_ping(self, dbapi_connection): + cursor = None + before_autocommit = dbapi_connection.autocommit + try: + if not before_autocommit: + dbapi_connection.autocommit = True + cursor = dbapi_connection.cursor() + try: + cursor.execute(self._dialect_specific_select_one) + finally: + cursor.close() + if not before_autocommit and not dbapi_connection.closed: + dbapi_connection.autocommit = before_autocommit + except self.dbapi.Error as err: + if self.is_disconnect(err, dbapi_connection, cursor): + return False + else: + raise + else: + return True + + def on_connect(self): + extras = self._psycopg2_extras() + extensions = self._psycopg2_extensions() + + fns = [] + if self.client_encoding is not None: + + def on_connect(conn): + conn.set_client_encoding(self.client_encoding) + + fns.append(on_connect) + + if self.isolation_level is not None: + + def on_connect(conn): + self.set_isolation_level(conn, self.isolation_level) + + fns.append(on_connect) + + if self.dbapi and self.use_native_uuid: + + def on_connect(conn): + extras.register_uuid(None, conn) + + fns.append(on_connect) + + if util.py2k and self.dbapi and self.use_native_unicode: + + def on_connect(conn): + extensions.register_type(extensions.UNICODE, conn) + extensions.register_type(extensions.UNICODEARRAY, conn) + + fns.append(on_connect) + + if self.dbapi and self.use_native_hstore: + + def on_connect(conn): + hstore_oids = self._hstore_oids(conn) + if hstore_oids is not None: + oid, array_oid = hstore_oids + kw = {"oid": oid} + if util.py2k: + kw["unicode"] = True + kw["array_oid"] = array_oid + extras.register_hstore(conn, **kw) + + fns.append(on_connect) + + if self.dbapi and self._json_deserializer: + + def on_connect(conn): + extras.register_default_json( + conn, loads=self._json_deserializer + ) + extras.register_default_jsonb( + conn, loads=self._json_deserializer + ) + + fns.append(on_connect) + + if fns: + + def on_connect(conn): + for fn in fns: + fn(conn) + + return on_connect + else: + return None + + def do_executemany(self, cursor, statement, parameters, context=None): + if ( + self.executemany_mode & EXECUTEMANY_VALUES + and context + and context.isinsert + and context.compiled._is_safe_for_fast_insert_values_helper + ): + executemany_values = ( + "(%s)" % context.compiled.insert_single_values_expr + ) + if not self.supports_unicode_statements: + executemany_values = executemany_values.encode(self.encoding) + + # guard for statement that was altered via event hook or similar + if executemany_values not in statement: + executemany_values = None + else: + executemany_values = None + + if executemany_values: + statement = statement.replace(executemany_values, "%s") + if self.executemany_values_page_size: + kwargs = {"page_size": self.executemany_values_page_size} + else: + kwargs = {} + xtras = self._psycopg2_extras() + context._psycopg2_fetched_rows = xtras.execute_values( + cursor, + statement, + parameters, + template=executemany_values, + fetch=bool(context.compiled.returning), + **kwargs + ) + + elif self.executemany_mode & EXECUTEMANY_BATCH: + if self.executemany_batch_page_size: + kwargs = {"page_size": self.executemany_batch_page_size} + else: + kwargs = {} + self._psycopg2_extras().execute_batch( + cursor, statement, parameters, **kwargs + ) + else: + cursor.executemany(statement, parameters) + + @util.memoized_instancemethod + def _hstore_oids(self, conn): + extras = self._psycopg2_extras() + if hasattr(conn, "dbapi_connection"): + conn = conn.dbapi_connection + oids = extras.HstoreAdapter.get_oids(conn) + if oids is not None and oids[0]: + return oids[0:2] + else: + return None + + def create_connect_args(self, url): + opts = url.translate_connect_args(username="user") + + is_multihost = False + if "host" in url.query: + is_multihost = isinstance(url.query["host"], (list, tuple)) + + if opts or url.query: + if not opts: + opts = {} + if "port" in opts: + opts["port"] = int(opts["port"]) + opts.update(url.query) + if is_multihost: + hosts, ports = zip( + *[ + token.split(":") if ":" in token else (token, "") + for token in url.query["host"] + ] + ) + opts["host"] = ",".join(hosts) + if "port" in opts: + raise exc.ArgumentError( + "Can't mix 'multihost' formats together; use " + '"host=h1,h2,h3&port=p1,p2,p3" or ' + '"host=h1:p1&host=h2:p2&host=h3:p3" separately' + ) + opts["port"] = ",".join(ports) + return ([], opts) + else: + # no connection arguments whatsoever; psycopg2.connect() + # requires that "dsn" be present as a blank string. + return ([""], opts) + + def is_disconnect(self, e, connection, cursor): + if isinstance(e, self.dbapi.Error): + # check the "closed" flag. this might not be + # present on old psycopg2 versions. Also, + # this flag doesn't actually help in a lot of disconnect + # situations, so don't rely on it. + if getattr(connection, "closed", False): + return True + + # checks based on strings. in the case that .closed + # didn't cut it, fall back onto these. + str_e = str(e).partition("\n")[0] + for msg in [ + # these error messages from libpq: interfaces/libpq/fe-misc.c + # and interfaces/libpq/fe-secure.c. + "terminating connection", + "closed the connection", + "connection not open", + "could not receive data from server", + "could not send data to server", + # psycopg2 client errors, psycopg2/connection.h, + # psycopg2/cursor.h + "connection already closed", + "cursor already closed", + # not sure where this path is originally from, it may + # be obsolete. It really says "losed", not "closed". + "losed the connection unexpectedly", + # these can occur in newer SSL + "connection has been closed unexpectedly", + "SSL error: decryption failed or bad record mac", + "SSL SYSCALL error: Bad file descriptor", + "SSL SYSCALL error: EOF detected", + "SSL SYSCALL error: Operation timed out", + "SSL SYSCALL error: Bad address", + ]: + idx = str_e.find(msg) + if idx >= 0 and '"' not in str_e[:idx]: + return True + return False + + +dialect = PGDialect_psycopg2 diff --git a/lib/sqlalchemy/dialects/postgresql/psycopg2cffi.py b/lib/sqlalchemy/dialects/postgresql/psycopg2cffi.py new file mode 100644 index 0000000..10d1aae --- /dev/null +++ b/lib/sqlalchemy/dialects/postgresql/psycopg2cffi.py @@ -0,0 +1,60 @@ +# testing/engines.py +# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors +# +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +r""" +.. dialect:: postgresql+psycopg2cffi + :name: psycopg2cffi + :dbapi: psycopg2cffi + :connectstring: postgresql+psycopg2cffi://user:password@host:port/dbname[?key=value&key=value...] + :url: https://pypi.org/project/psycopg2cffi/ + +``psycopg2cffi`` is an adaptation of ``psycopg2``, using CFFI for the C +layer. This makes it suitable for use in e.g. PyPy. Documentation +is as per ``psycopg2``. + +.. versionadded:: 1.0.0 + +.. seealso:: + + :mod:`sqlalchemy.dialects.postgresql.psycopg2` + +""" # noqa +from .psycopg2 import PGDialect_psycopg2 + + +class PGDialect_psycopg2cffi(PGDialect_psycopg2): + driver = "psycopg2cffi" + supports_unicode_statements = True + supports_statement_cache = True + + # psycopg2cffi's first release is 2.5.0, but reports + # __version__ as 2.4.4. Subsequent releases seem to have + # fixed this. + + FEATURE_VERSION_MAP = dict( + native_json=(2, 4, 4), + native_jsonb=(2, 7, 1), + sane_multi_rowcount=(2, 4, 4), + array_oid=(2, 4, 4), + hstore_adapter=(2, 4, 4), + ) + + @classmethod + def dbapi(cls): + return __import__("psycopg2cffi") + + @classmethod + def _psycopg2_extensions(cls): + root = __import__("psycopg2cffi", fromlist=["extensions"]) + return root.extensions + + @classmethod + def _psycopg2_extras(cls): + root = __import__("psycopg2cffi", fromlist=["extras"]) + return root.extras + + +dialect = PGDialect_psycopg2cffi diff --git a/lib/sqlalchemy/dialects/postgresql/pygresql.py b/lib/sqlalchemy/dialects/postgresql/pygresql.py new file mode 100644 index 0000000..d273b8c --- /dev/null +++ b/lib/sqlalchemy/dialects/postgresql/pygresql.py @@ -0,0 +1,278 @@ +# postgresql/pygresql.py +# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors +# +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +""" +.. dialect:: postgresql+pygresql + :name: pygresql + :dbapi: pgdb + :connectstring: postgresql+pygresql://user:password@host:port/dbname[?key=value&key=value...] + :url: https://www.pygresql.org/ + +.. note:: + + The pygresql dialect is **not tested as part of SQLAlchemy's continuous + integration** and may have unresolved issues. The recommended PostgreSQL + dialect is psycopg2. + +.. deprecated:: 1.4 The pygresql DBAPI is deprecated and will be removed + in a future version. Please use one of the supported DBAPIs to + connect to PostgreSQL. + +""" # noqa + +import decimal +import re + +from .base import _DECIMAL_TYPES +from .base import _FLOAT_TYPES +from .base import _INT_TYPES +from .base import PGCompiler +from .base import PGDialect +from .base import PGIdentifierPreparer +from .base import UUID +from .hstore import HSTORE +from .json import JSON +from .json import JSONB +from ... import exc +from ... import processors +from ... import util +from ...sql.elements import Null +from ...types import JSON as Json +from ...types import Numeric + + +class _PGNumeric(Numeric): + def bind_processor(self, dialect): + return None + + def result_processor(self, dialect, coltype): + if not isinstance(coltype, int): + coltype = coltype.oid + if self.asdecimal: + if coltype in _FLOAT_TYPES: + return processors.to_decimal_processor_factory( + decimal.Decimal, self._effective_decimal_return_scale + ) + elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES: + # PyGreSQL returns Decimal natively for 1700 (numeric) + return None + else: + raise exc.InvalidRequestError( + "Unknown PG numeric type: %d" % coltype + ) + else: + if coltype in _FLOAT_TYPES: + # PyGreSQL returns float natively for 701 (float8) + return None + elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES: + return processors.to_float + else: + raise exc.InvalidRequestError( + "Unknown PG numeric type: %d" % coltype + ) + + +class _PGHStore(HSTORE): + def bind_processor(self, dialect): + if not dialect.has_native_hstore: + return super(_PGHStore, self).bind_processor(dialect) + hstore = dialect.dbapi.Hstore + + def process(value): + if isinstance(value, dict): + return hstore(value) + return value + + return process + + def result_processor(self, dialect, coltype): + if not dialect.has_native_hstore: + return super(_PGHStore, self).result_processor(dialect, coltype) + + +class _PGJSON(JSON): + def bind_processor(self, dialect): + if not dialect.has_native_json: + return super(_PGJSON, self).bind_processor(dialect) + json = dialect.dbapi.Json + + def process(value): + if value is self.NULL: + value = None + elif isinstance(value, Null) or ( + value is None and self.none_as_null + ): + return None + if value is None or isinstance(value, (dict, list)): + return json(value) + return value + + return process + + def result_processor(self, dialect, coltype): + if not dialect.has_native_json: + return super(_PGJSON, self).result_processor(dialect, coltype) + + +class _PGJSONB(JSONB): + def bind_processor(self, dialect): + if not dialect.has_native_json: + return super(_PGJSONB, self).bind_processor(dialect) + json = dialect.dbapi.Json + + def process(value): + if value is self.NULL: + value = None + elif isinstance(value, Null) or ( + value is None and self.none_as_null + ): + return None + if value is None or isinstance(value, (dict, list)): + return json(value) + return value + + return process + + def result_processor(self, dialect, coltype): + if not dialect.has_native_json: + return super(_PGJSONB, self).result_processor(dialect, coltype) + + +class _PGUUID(UUID): + def bind_processor(self, dialect): + if not dialect.has_native_uuid: + return super(_PGUUID, self).bind_processor(dialect) + uuid = dialect.dbapi.Uuid + + def process(value): + if value is None: + return None + if isinstance(value, (str, bytes)): + if len(value) == 16: + return uuid(bytes=value) + return uuid(value) + if isinstance(value, int): + return uuid(int=value) + return value + + return process + + def result_processor(self, dialect, coltype): + if not dialect.has_native_uuid: + return super(_PGUUID, self).result_processor(dialect, coltype) + if not self.as_uuid: + + def process(value): + if value is not None: + return str(value) + + return process + + +class _PGCompiler(PGCompiler): + def visit_mod_binary(self, binary, operator, **kw): + return ( + self.process(binary.left, **kw) + + " %% " + + self.process(binary.right, **kw) + ) + + def post_process_text(self, text): + return text.replace("%", "%%") + + +class _PGIdentifierPreparer(PGIdentifierPreparer): + def _escape_identifier(self, value): + value = value.replace(self.escape_quote, self.escape_to_quote) + return value.replace("%", "%%") + + +class PGDialect_pygresql(PGDialect): + + driver = "pygresql" + supports_statement_cache = True + + statement_compiler = _PGCompiler + preparer = _PGIdentifierPreparer + + @classmethod + def dbapi(cls): + import pgdb + + util.warn_deprecated( + "The pygresql DBAPI is deprecated and will be removed " + "in a future version. Please use one of the supported DBAPIs to " + "connect to PostgreSQL.", + version="1.4", + ) + + return pgdb + + colspecs = util.update_copy( + PGDialect.colspecs, + { + Numeric: _PGNumeric, + HSTORE: _PGHStore, + Json: _PGJSON, + JSON: _PGJSON, + JSONB: _PGJSONB, + UUID: _PGUUID, + }, + ) + + def __init__(self, **kwargs): + super(PGDialect_pygresql, self).__init__(**kwargs) + try: + version = self.dbapi.version + m = re.match(r"(\d+)\.(\d+)", version) + version = (int(m.group(1)), int(m.group(2))) + except (AttributeError, ValueError, TypeError): + version = (0, 0) + self.dbapi_version = version + if version < (5, 0): + has_native_hstore = has_native_json = has_native_uuid = False + if version != (0, 0): + util.warn( + "PyGreSQL is only fully supported by SQLAlchemy" + " since version 5.0." + ) + else: + self.supports_unicode_statements = True + self.supports_unicode_binds = True + has_native_hstore = has_native_json = has_native_uuid = True + self.has_native_hstore = has_native_hstore + self.has_native_json = has_native_json + self.has_native_uuid = has_native_uuid + + def create_connect_args(self, url): + opts = url.translate_connect_args(username="user") + if "port" in opts: + opts["host"] = "%s:%s" % ( + opts.get("host", "").rsplit(":", 1)[0], + opts.pop("port"), + ) + opts.update(url.query) + return [], opts + + def is_disconnect(self, e, connection, cursor): + if isinstance(e, self.dbapi.Error): + if not connection: + return False + try: + connection = connection.connection + except AttributeError: + pass + else: + if not connection: + return False + try: + return connection.closed + except AttributeError: # PyGreSQL < 5.0 + return connection._cnx is None + return False + + +dialect = PGDialect_pygresql diff --git a/lib/sqlalchemy/dialects/postgresql/pypostgresql.py b/lib/sqlalchemy/dialects/postgresql/pypostgresql.py new file mode 100644 index 0000000..886e368 --- /dev/null +++ b/lib/sqlalchemy/dialects/postgresql/pypostgresql.py @@ -0,0 +1,126 @@ +# postgresql/pypostgresql.py +# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors +# +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +""" +.. dialect:: postgresql+pypostgresql + :name: py-postgresql + :dbapi: pypostgresql + :connectstring: postgresql+pypostgresql://user:password@host:port/dbname[?key=value&key=value...] + :url: https://python.projects.pgfoundry.org/ + +.. note:: + + The pypostgresql dialect is **not tested as part of SQLAlchemy's continuous + integration** and may have unresolved issues. The recommended PostgreSQL + driver is psycopg2. + +.. deprecated:: 1.4 The py-postgresql DBAPI is deprecated and will be removed + in a future version. This DBAPI is superseded by the external + version available at external-dialect_. Please use the external version or + one of the supported DBAPIs to connect to PostgreSQL. + +.. TODO update link +.. _external-dialect: https://github.com/PyGreSQL + +""" # noqa + +from .base import PGDialect +from .base import PGExecutionContext +from ... import processors +from ... import types as sqltypes +from ... import util + + +class PGNumeric(sqltypes.Numeric): + def bind_processor(self, dialect): + return processors.to_str + + def result_processor(self, dialect, coltype): + if self.asdecimal: + return None + else: + return processors.to_float + + +class PGExecutionContext_pypostgresql(PGExecutionContext): + pass + + +class PGDialect_pypostgresql(PGDialect): + driver = "pypostgresql" + + supports_statement_cache = True + supports_unicode_statements = True + supports_unicode_binds = True + description_encoding = None + default_paramstyle = "pyformat" + + # requires trunk version to support sane rowcounts + # TODO: use dbapi version information to set this flag appropriately + supports_sane_rowcount = True + supports_sane_multi_rowcount = False + + execution_ctx_cls = PGExecutionContext_pypostgresql + colspecs = util.update_copy( + PGDialect.colspecs, + { + sqltypes.Numeric: PGNumeric, + # prevents PGNumeric from being used + sqltypes.Float: sqltypes.Float, + }, + ) + + @classmethod + def dbapi(cls): + from postgresql.driver import dbapi20 + + # TODO update link + util.warn_deprecated( + "The py-postgresql DBAPI is deprecated and will be removed " + "in a future version. This DBAPI is superseded by the external" + "version available at https://github.com/PyGreSQL. Please " + "use one of the supported DBAPIs to connect to PostgreSQL.", + version="1.4", + ) + + return dbapi20 + + _DBAPI_ERROR_NAMES = [ + "Error", + "InterfaceError", + "DatabaseError", + "DataError", + "OperationalError", + "IntegrityError", + "InternalError", + "ProgrammingError", + "NotSupportedError", + ] + + @util.memoized_property + def dbapi_exception_translation_map(self): + if self.dbapi is None: + return {} + + return dict( + (getattr(self.dbapi, name).__name__, name) + for name in self._DBAPI_ERROR_NAMES + ) + + def create_connect_args(self, url): + opts = url.translate_connect_args(username="user") + if "port" in opts: + opts["port"] = int(opts["port"]) + else: + opts["port"] = 5432 + opts.update(url.query) + return ([], opts) + + def is_disconnect(self, e, connection, cursor): + return "connection is closed" in str(e) + + +dialect = PGDialect_pypostgresql diff --git a/lib/sqlalchemy/dialects/postgresql/ranges.py b/lib/sqlalchemy/dialects/postgresql/ranges.py new file mode 100644 index 0000000..51f3b04 --- /dev/null +++ b/lib/sqlalchemy/dialects/postgresql/ranges.py @@ -0,0 +1,138 @@ +# Copyright (C) 2013-2022 the SQLAlchemy authors and contributors +# +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php + +from ... import types as sqltypes + + +__all__ = ("INT4RANGE", "INT8RANGE", "NUMRANGE") + + +class RangeOperators(object): + """ + This mixin provides functionality for the Range Operators + listed in the Range Operators table of the `PostgreSQL documentation`__ + for Range Functions and Operators. It is used by all the range types + provided in the ``postgres`` dialect and can likely be used for + any range types you create yourself. + + __ https://www.postgresql.org/docs/current/static/functions-range.html + + No extra support is provided for the Range Functions listed in the Range + Functions table of the PostgreSQL documentation. For these, the normal + :func:`~sqlalchemy.sql.expression.func` object should be used. + + """ + + class comparator_factory(sqltypes.Concatenable.Comparator): + """Define comparison operations for range types.""" + + def __ne__(self, other): + "Boolean expression. Returns true if two ranges are not equal" + if other is None: + return super(RangeOperators.comparator_factory, self).__ne__( + other + ) + else: + return self.expr.op("<>", is_comparison=True)(other) + + def contains(self, other, **kw): + """Boolean expression. Returns true if the right hand operand, + which can be an element or a range, is contained within the + column. + + kwargs may be ignored by this operator but are required for API + conformance. + """ + return self.expr.op("@>", is_comparison=True)(other) + + def contained_by(self, other): + """Boolean expression. Returns true if the column is contained + within the right hand operand. + """ + return self.expr.op("<@", is_comparison=True)(other) + + def overlaps(self, other): + """Boolean expression. Returns true if the column overlaps + (has points in common with) the right hand operand. + """ + return self.expr.op("&&", is_comparison=True)(other) + + def strictly_left_of(self, other): + """Boolean expression. Returns true if the column is strictly + left of the right hand operand. + """ + return self.expr.op("<<", is_comparison=True)(other) + + __lshift__ = strictly_left_of + + def strictly_right_of(self, other): + """Boolean expression. Returns true if the column is strictly + right of the right hand operand. + """ + return self.expr.op(">>", is_comparison=True)(other) + + __rshift__ = strictly_right_of + + def not_extend_right_of(self, other): + """Boolean expression. Returns true if the range in the column + does not extend right of the range in the operand. + """ + return self.expr.op("&<", is_comparison=True)(other) + + def not_extend_left_of(self, other): + """Boolean expression. Returns true if the range in the column + does not extend left of the range in the operand. + """ + return self.expr.op("&>", is_comparison=True)(other) + + def adjacent_to(self, other): + """Boolean expression. Returns true if the range in the column + is adjacent to the range in the operand. + """ + return self.expr.op("-|-", is_comparison=True)(other) + + def __add__(self, other): + """Range expression. Returns the union of the two ranges. + Will raise an exception if the resulting range is not + contiguous. + """ + return self.expr.op("+")(other) + + +class INT4RANGE(RangeOperators, sqltypes.TypeEngine): + """Represent the PostgreSQL INT4RANGE type.""" + + __visit_name__ = "INT4RANGE" + + +class INT8RANGE(RangeOperators, sqltypes.TypeEngine): + """Represent the PostgreSQL INT8RANGE type.""" + + __visit_name__ = "INT8RANGE" + + +class NUMRANGE(RangeOperators, sqltypes.TypeEngine): + """Represent the PostgreSQL NUMRANGE type.""" + + __visit_name__ = "NUMRANGE" + + +class DATERANGE(RangeOperators, sqltypes.TypeEngine): + """Represent the PostgreSQL DATERANGE type.""" + + __visit_name__ = "DATERANGE" + + +class TSRANGE(RangeOperators, sqltypes.TypeEngine): + """Represent the PostgreSQL TSRANGE type.""" + + __visit_name__ = "TSRANGE" + + +class TSTZRANGE(RangeOperators, sqltypes.TypeEngine): + """Represent the PostgreSQL TSTZRANGE type.""" + + __visit_name__ = "TSTZRANGE" -- cgit v1.2.3