summaryrefslogtreecommitdiffstats
path: root/lib/sqlalchemy/dialects/postgresql
diff options
context:
space:
mode:
authorxiubuzhe <xiubuzhe@sina.com>2023-10-08 20:59:00 +0800
committerxiubuzhe <xiubuzhe@sina.com>2023-10-08 20:59:00 +0800
commit1dac2263372df2b85db5d029a45721fa158a5c9d (patch)
tree0365f9c57df04178a726d7584ca6a6b955a7ce6a /lib/sqlalchemy/dialects/postgresql
parentb494be364bb39e1de128ada7dc576a729d99907e (diff)
downloadsunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.tar.gz
sunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.tar.bz2
sunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.zip
first add files
Diffstat (limited to 'lib/sqlalchemy/dialects/postgresql')
-rw-r--r--lib/sqlalchemy/dialects/postgresql/__init__.py117
-rw-r--r--lib/sqlalchemy/dialects/postgresql/array.py413
-rw-r--r--lib/sqlalchemy/dialects/postgresql/asyncpg.py1112
-rw-r--r--lib/sqlalchemy/dialects/postgresql/base.py4651
-rw-r--r--lib/sqlalchemy/dialects/postgresql/dml.py274
-rw-r--r--lib/sqlalchemy/dialects/postgresql/ext.py277
-rw-r--r--lib/sqlalchemy/dialects/postgresql/hstore.py455
-rw-r--r--lib/sqlalchemy/dialects/postgresql/json.py327
-rw-r--r--lib/sqlalchemy/dialects/postgresql/pg8000.py594
-rw-r--r--lib/sqlalchemy/dialects/postgresql/provision.py124
-rw-r--r--lib/sqlalchemy/dialects/postgresql/psycopg2.py1088
-rw-r--r--lib/sqlalchemy/dialects/postgresql/psycopg2cffi.py60
-rw-r--r--lib/sqlalchemy/dialects/postgresql/pygresql.py278
-rw-r--r--lib/sqlalchemy/dialects/postgresql/pypostgresql.py126
-rw-r--r--lib/sqlalchemy/dialects/postgresql/ranges.py138
15 files changed, 10034 insertions, 0 deletions
diff --git a/lib/sqlalchemy/dialects/postgresql/__init__.py b/lib/sqlalchemy/dialects/postgresql/__init__.py
new file mode 100644
index 0000000..12d9e94
--- /dev/null
+++ b/lib/sqlalchemy/dialects/postgresql/__init__.py
@@ -0,0 +1,117 @@
+# postgresql/__init__.py
+# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+from . import base
+from . import pg8000 # noqa
+from . import psycopg2 # noqa
+from . import psycopg2cffi # noqa
+from . import pygresql # noqa
+from . import pypostgresql # noqa
+from .array import All
+from .array import Any
+from .array import ARRAY
+from .array import array
+from .base import BIGINT
+from .base import BIT
+from .base import BOOLEAN
+from .base import BYTEA
+from .base import CHAR
+from .base import CIDR
+from .base import CreateEnumType
+from .base import DATE
+from .base import DOUBLE_PRECISION
+from .base import DropEnumType
+from .base import ENUM
+from .base import FLOAT
+from .base import INET
+from .base import INTEGER
+from .base import INTERVAL
+from .base import MACADDR
+from .base import MONEY
+from .base import NUMERIC
+from .base import OID
+from .base import REAL
+from .base import REGCLASS
+from .base import SMALLINT
+from .base import TEXT
+from .base import TIME
+from .base import TIMESTAMP
+from .base import TSVECTOR
+from .base import UUID
+from .base import VARCHAR
+from .dml import Insert
+from .dml import insert
+from .ext import aggregate_order_by
+from .ext import array_agg
+from .ext import ExcludeConstraint
+from .hstore import HSTORE
+from .hstore import hstore
+from .json import JSON
+from .json import JSONB
+from .ranges import DATERANGE
+from .ranges import INT4RANGE
+from .ranges import INT8RANGE
+from .ranges import NUMRANGE
+from .ranges import TSRANGE
+from .ranges import TSTZRANGE
+from ...util import compat
+
+if compat.py3k:
+ from . import asyncpg # noqa
+
+base.dialect = dialect = psycopg2.dialect
+
+
+__all__ = (
+ "INTEGER",
+ "BIGINT",
+ "SMALLINT",
+ "VARCHAR",
+ "CHAR",
+ "TEXT",
+ "NUMERIC",
+ "FLOAT",
+ "REAL",
+ "INET",
+ "CIDR",
+ "UUID",
+ "BIT",
+ "MACADDR",
+ "MONEY",
+ "OID",
+ "REGCLASS",
+ "DOUBLE_PRECISION",
+ "TIMESTAMP",
+ "TIME",
+ "DATE",
+ "BYTEA",
+ "BOOLEAN",
+ "INTERVAL",
+ "ARRAY",
+ "ENUM",
+ "dialect",
+ "array",
+ "HSTORE",
+ "hstore",
+ "INT4RANGE",
+ "INT8RANGE",
+ "NUMRANGE",
+ "DATERANGE",
+ "TSVECTOR",
+ "TSRANGE",
+ "TSTZRANGE",
+ "JSON",
+ "JSONB",
+ "Any",
+ "All",
+ "DropEnumType",
+ "CreateEnumType",
+ "ExcludeConstraint",
+ "aggregate_order_by",
+ "array_agg",
+ "insert",
+ "Insert",
+)
diff --git a/lib/sqlalchemy/dialects/postgresql/array.py b/lib/sqlalchemy/dialects/postgresql/array.py
new file mode 100644
index 0000000..daf7c5d
--- /dev/null
+++ b/lib/sqlalchemy/dialects/postgresql/array.py
@@ -0,0 +1,413 @@
+# postgresql/array.py
+# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+
+import re
+
+from ... import types as sqltypes
+from ... import util
+from ...sql import coercions
+from ...sql import expression
+from ...sql import operators
+from ...sql import roles
+
+
+def Any(other, arrexpr, operator=operators.eq):
+ """A synonym for the ARRAY-level :meth:`.ARRAY.Comparator.any` method.
+ See that method for details.
+
+ """
+
+ return arrexpr.any(other, operator)
+
+
+def All(other, arrexpr, operator=operators.eq):
+ """A synonym for the ARRAY-level :meth:`.ARRAY.Comparator.all` method.
+ See that method for details.
+
+ """
+
+ return arrexpr.all(other, operator)
+
+
+class array(expression.ClauseList, expression.ColumnElement):
+
+ """A PostgreSQL ARRAY literal.
+
+ This is used to produce ARRAY literals in SQL expressions, e.g.::
+
+ from sqlalchemy.dialects.postgresql import array
+ from sqlalchemy.dialects import postgresql
+ from sqlalchemy import select, func
+
+ stmt = select(array([1,2]) + array([3,4,5]))
+
+ print(stmt.compile(dialect=postgresql.dialect()))
+
+ Produces the SQL::
+
+ SELECT ARRAY[%(param_1)s, %(param_2)s] ||
+ ARRAY[%(param_3)s, %(param_4)s, %(param_5)s]) AS anon_1
+
+ An instance of :class:`.array` will always have the datatype
+ :class:`_types.ARRAY`. The "inner" type of the array is inferred from
+ the values present, unless the ``type_`` keyword argument is passed::
+
+ array(['foo', 'bar'], type_=CHAR)
+
+ Multidimensional arrays are produced by nesting :class:`.array` constructs.
+ The dimensionality of the final :class:`_types.ARRAY`
+ type is calculated by
+ recursively adding the dimensions of the inner :class:`_types.ARRAY`
+ type::
+
+ stmt = select(
+ array([
+ array([1, 2]), array([3, 4]), array([column('q'), column('x')])
+ ])
+ )
+ print(stmt.compile(dialect=postgresql.dialect()))
+
+ Produces::
+
+ SELECT ARRAY[ARRAY[%(param_1)s, %(param_2)s],
+ ARRAY[%(param_3)s, %(param_4)s], ARRAY[q, x]] AS anon_1
+
+ .. versionadded:: 1.3.6 added support for multidimensional array literals
+
+ .. seealso::
+
+ :class:`_postgresql.ARRAY`
+
+ """
+
+ __visit_name__ = "array"
+
+ stringify_dialect = "postgresql"
+ inherit_cache = True
+
+ def __init__(self, clauses, **kw):
+ clauses = [
+ coercions.expect(roles.ExpressionElementRole, c) for c in clauses
+ ]
+
+ super(array, self).__init__(*clauses, **kw)
+
+ self._type_tuple = [arg.type for arg in clauses]
+ main_type = kw.pop(
+ "type_",
+ self._type_tuple[0] if self._type_tuple else sqltypes.NULLTYPE,
+ )
+
+ if isinstance(main_type, ARRAY):
+ self.type = ARRAY(
+ main_type.item_type,
+ dimensions=main_type.dimensions + 1
+ if main_type.dimensions is not None
+ else 2,
+ )
+ else:
+ self.type = ARRAY(main_type)
+
+ @property
+ def _select_iterable(self):
+ return (self,)
+
+ def _bind_param(self, operator, obj, _assume_scalar=False, type_=None):
+ if _assume_scalar or operator is operators.getitem:
+ return expression.BindParameter(
+ None,
+ obj,
+ _compared_to_operator=operator,
+ type_=type_,
+ _compared_to_type=self.type,
+ unique=True,
+ )
+
+ else:
+ return array(
+ [
+ self._bind_param(
+ operator, o, _assume_scalar=True, type_=type_
+ )
+ for o in obj
+ ]
+ )
+
+ def self_group(self, against=None):
+ if against in (operators.any_op, operators.all_op, operators.getitem):
+ return expression.Grouping(self)
+ else:
+ return self
+
+
+CONTAINS = operators.custom_op("@>", precedence=5, is_comparison=True)
+
+CONTAINED_BY = operators.custom_op("<@", precedence=5, is_comparison=True)
+
+OVERLAP = operators.custom_op("&&", precedence=5, is_comparison=True)
+
+
+class ARRAY(sqltypes.ARRAY):
+
+ """PostgreSQL ARRAY type.
+
+ .. versionchanged:: 1.1 The :class:`_postgresql.ARRAY` type is now
+ a subclass of the core :class:`_types.ARRAY` type.
+
+ The :class:`_postgresql.ARRAY` type is constructed in the same way
+ as the core :class:`_types.ARRAY` type; a member type is required, and a
+ number of dimensions is recommended if the type is to be used for more
+ than one dimension::
+
+ from sqlalchemy.dialects import postgresql
+
+ mytable = Table("mytable", metadata,
+ Column("data", postgresql.ARRAY(Integer, dimensions=2))
+ )
+
+ The :class:`_postgresql.ARRAY` type provides all operations defined on the
+ core :class:`_types.ARRAY` type, including support for "dimensions",
+ indexed access, and simple matching such as
+ :meth:`.types.ARRAY.Comparator.any` and
+ :meth:`.types.ARRAY.Comparator.all`. :class:`_postgresql.ARRAY`
+ class also
+ provides PostgreSQL-specific methods for containment operations, including
+ :meth:`.postgresql.ARRAY.Comparator.contains`
+ :meth:`.postgresql.ARRAY.Comparator.contained_by`, and
+ :meth:`.postgresql.ARRAY.Comparator.overlap`, e.g.::
+
+ mytable.c.data.contains([1, 2])
+
+ The :class:`_postgresql.ARRAY` type may not be supported on all
+ PostgreSQL DBAPIs; it is currently known to work on psycopg2 only.
+
+ Additionally, the :class:`_postgresql.ARRAY`
+ type does not work directly in
+ conjunction with the :class:`.ENUM` type. For a workaround, see the
+ special type at :ref:`postgresql_array_of_enum`.
+
+ .. seealso::
+
+ :class:`_types.ARRAY` - base array type
+
+ :class:`_postgresql.array` - produces a literal array value.
+
+ """
+
+ class Comparator(sqltypes.ARRAY.Comparator):
+
+ """Define comparison operations for :class:`_types.ARRAY`.
+
+ Note that these operations are in addition to those provided
+ by the base :class:`.types.ARRAY.Comparator` class, including
+ :meth:`.types.ARRAY.Comparator.any` and
+ :meth:`.types.ARRAY.Comparator.all`.
+
+ """
+
+ def contains(self, other, **kwargs):
+ """Boolean expression. Test if elements are a superset of the
+ elements of the argument array expression.
+
+ kwargs may be ignored by this operator but are required for API
+ conformance.
+ """
+ return self.operate(CONTAINS, other, result_type=sqltypes.Boolean)
+
+ def contained_by(self, other):
+ """Boolean expression. Test if elements are a proper subset of the
+ elements of the argument array expression.
+ """
+ return self.operate(
+ CONTAINED_BY, other, result_type=sqltypes.Boolean
+ )
+
+ def overlap(self, other):
+ """Boolean expression. Test if array has elements in common with
+ an argument array expression.
+ """
+ return self.operate(OVERLAP, other, result_type=sqltypes.Boolean)
+
+ comparator_factory = Comparator
+
+ def __init__(
+ self, item_type, as_tuple=False, dimensions=None, zero_indexes=False
+ ):
+ """Construct an ARRAY.
+
+ E.g.::
+
+ Column('myarray', ARRAY(Integer))
+
+ Arguments are:
+
+ :param item_type: The data type of items of this array. Note that
+ dimensionality is irrelevant here, so multi-dimensional arrays like
+ ``INTEGER[][]``, are constructed as ``ARRAY(Integer)``, not as
+ ``ARRAY(ARRAY(Integer))`` or such.
+
+ :param as_tuple=False: Specify whether return results
+ should be converted to tuples from lists. DBAPIs such
+ as psycopg2 return lists by default. When tuples are
+ returned, the results are hashable.
+
+ :param dimensions: if non-None, the ARRAY will assume a fixed
+ number of dimensions. This will cause the DDL emitted for this
+ ARRAY to include the exact number of bracket clauses ``[]``,
+ and will also optimize the performance of the type overall.
+ Note that PG arrays are always implicitly "non-dimensioned",
+ meaning they can store any number of dimensions no matter how
+ they were declared.
+
+ :param zero_indexes=False: when True, index values will be converted
+ between Python zero-based and PostgreSQL one-based indexes, e.g.
+ a value of one will be added to all index values before passing
+ to the database.
+
+ .. versionadded:: 0.9.5
+
+
+ """
+ if isinstance(item_type, ARRAY):
+ raise ValueError(
+ "Do not nest ARRAY types; ARRAY(basetype) "
+ "handles multi-dimensional arrays of basetype"
+ )
+ if isinstance(item_type, type):
+ item_type = item_type()
+ self.item_type = item_type
+ self.as_tuple = as_tuple
+ self.dimensions = dimensions
+ self.zero_indexes = zero_indexes
+
+ @property
+ def hashable(self):
+ return self.as_tuple
+
+ @property
+ def python_type(self):
+ return list
+
+ def compare_values(self, x, y):
+ return x == y
+
+ def _proc_array(self, arr, itemproc, dim, collection):
+ if dim is None:
+ arr = list(arr)
+ if (
+ dim == 1
+ or dim is None
+ and (
+ # this has to be (list, tuple), or at least
+ # not hasattr('__iter__'), since Py3K strings
+ # etc. have __iter__
+ not arr
+ or not isinstance(arr[0], (list, tuple))
+ )
+ ):
+ if itemproc:
+ return collection(itemproc(x) for x in arr)
+ else:
+ return collection(arr)
+ else:
+ return collection(
+ self._proc_array(
+ x,
+ itemproc,
+ dim - 1 if dim is not None else None,
+ collection,
+ )
+ for x in arr
+ )
+
+ @util.memoized_property
+ def _against_native_enum(self):
+ return (
+ isinstance(self.item_type, sqltypes.Enum)
+ and self.item_type.native_enum
+ )
+
+ def bind_expression(self, bindvalue):
+ return bindvalue
+
+ def bind_processor(self, dialect):
+ item_proc = self.item_type.dialect_impl(dialect).bind_processor(
+ dialect
+ )
+
+ def process(value):
+ if value is None:
+ return value
+ else:
+ return self._proc_array(
+ value, item_proc, self.dimensions, list
+ )
+
+ return process
+
+ def result_processor(self, dialect, coltype):
+ item_proc = self.item_type.dialect_impl(dialect).result_processor(
+ dialect, coltype
+ )
+
+ def process(value):
+ if value is None:
+ return value
+ else:
+ return self._proc_array(
+ value,
+ item_proc,
+ self.dimensions,
+ tuple if self.as_tuple else list,
+ )
+
+ if self._against_native_enum:
+ super_rp = process
+ pattern = re.compile(r"^{(.*)}$")
+
+ def handle_raw_string(value):
+ inner = pattern.match(value).group(1)
+ return _split_enum_values(inner)
+
+ def process(value):
+ if value is None:
+ return value
+ # isinstance(value, util.string_types) is required to handle
+ # the case where a TypeDecorator for and Array of Enum is
+ # used like was required in sa < 1.3.17
+ return super_rp(
+ handle_raw_string(value)
+ if isinstance(value, util.string_types)
+ else value
+ )
+
+ return process
+
+
+def _split_enum_values(array_string):
+
+ if '"' not in array_string:
+ # no escape char is present so it can just split on the comma
+ return array_string.split(",") if array_string else []
+
+ # handles quoted strings from:
+ # r'abc,"quoted","also\\\\quoted", "quoted, comma", "esc \" quot", qpr'
+ # returns
+ # ['abc', 'quoted', 'also\\quoted', 'quoted, comma', 'esc " quot', 'qpr']
+ text = array_string.replace(r"\"", "_$ESC_QUOTE$_")
+ text = text.replace(r"\\", "\\")
+ result = []
+ on_quotes = re.split(r'(")', text)
+ in_quotes = False
+ for tok in on_quotes:
+ if tok == '"':
+ in_quotes = not in_quotes
+ elif in_quotes:
+ result.append(tok.replace("_$ESC_QUOTE$_", '"'))
+ else:
+ result.extend(re.findall(r"([^\s,]+),?", tok))
+ return result
diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
new file mode 100644
index 0000000..305ad46
--- /dev/null
+++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
@@ -0,0 +1,1112 @@
+# postgresql/asyncpg.py
+# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors <see AUTHORS
+# file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+r"""
+.. dialect:: postgresql+asyncpg
+ :name: asyncpg
+ :dbapi: asyncpg
+ :connectstring: postgresql+asyncpg://user:password@host:port/dbname[?key=value&key=value...]
+ :url: https://magicstack.github.io/asyncpg/
+
+The asyncpg dialect is SQLAlchemy's first Python asyncio dialect.
+
+Using a special asyncio mediation layer, the asyncpg dialect is usable
+as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>`
+extension package.
+
+This dialect should normally be used only with the
+:func:`_asyncio.create_async_engine` engine creation function::
+
+ from sqlalchemy.ext.asyncio import create_async_engine
+ engine = create_async_engine("postgresql+asyncpg://user:pass@hostname/dbname")
+
+The dialect can also be run as a "synchronous" dialect within the
+:func:`_sa.create_engine` function, which will pass "await" calls into
+an ad-hoc event loop. This mode of operation is of **limited use**
+and is for special testing scenarios only. The mode can be enabled by
+adding the SQLAlchemy-specific flag ``async_fallback`` to the URL
+in conjunction with :func:`_sa.create_engine`::
+
+ # for testing purposes only; do not use in production!
+ engine = create_engine("postgresql+asyncpg://user:pass@hostname/dbname?async_fallback=true")
+
+
+.. versionadded:: 1.4
+
+.. note::
+
+ By default asyncpg does not decode the ``json`` and ``jsonb`` types and
+ returns them as strings. SQLAlchemy sets default type decoder for ``json``
+ and ``jsonb`` types using the python builtin ``json.loads`` function.
+ The json implementation used can be changed by setting the attribute
+ ``json_deserializer`` when creating the engine with
+ :func:`create_engine` or :func:`create_async_engine`.
+
+
+.. _asyncpg_prepared_statement_cache:
+
+Prepared Statement Cache
+--------------------------
+
+The asyncpg SQLAlchemy dialect makes use of ``asyncpg.connection.prepare()``
+for all statements. The prepared statement objects are cached after
+construction which appears to grant a 10% or more performance improvement for
+statement invocation. The cache is on a per-DBAPI connection basis, which
+means that the primary storage for prepared statements is within DBAPI
+connections pooled within the connection pool. The size of this cache
+defaults to 100 statements per DBAPI connection and may be adjusted using the
+``prepared_statement_cache_size`` DBAPI argument (note that while this argument
+is implemented by SQLAlchemy, it is part of the DBAPI emulation portion of the
+asyncpg dialect, therefore is handled as a DBAPI argument, not a dialect
+argument)::
+
+
+ engine = create_async_engine("postgresql+asyncpg://user:pass@hostname/dbname?prepared_statement_cache_size=500")
+
+To disable the prepared statement cache, use a value of zero::
+
+ engine = create_async_engine("postgresql+asyncpg://user:pass@hostname/dbname?prepared_statement_cache_size=0")
+
+.. versionadded:: 1.4.0b2 Added ``prepared_statement_cache_size`` for asyncpg.
+
+
+.. warning:: The ``asyncpg`` database driver necessarily uses caches for
+ PostgreSQL type OIDs, which become stale when custom PostgreSQL datatypes
+ such as ``ENUM`` objects are changed via DDL operations. Additionally,
+ prepared statements themselves which are optionally cached by SQLAlchemy's
+ driver as described above may also become "stale" when DDL has been emitted
+ to the PostgreSQL database which modifies the tables or other objects
+ involved in a particular prepared statement.
+
+ The SQLAlchemy asyncpg dialect will invalidate these caches within its local
+ process when statements that represent DDL are emitted on a local
+ connection, but this is only controllable within a single Python process /
+ database engine. If DDL changes are made from other database engines
+ and/or processes, a running application may encounter asyncpg exceptions
+ ``InvalidCachedStatementError`` and/or ``InternalServerError("cache lookup
+ failed for type <oid>")`` if it refers to pooled database connections which
+ operated upon the previous structures. The SQLAlchemy asyncpg dialect will
+ recover from these error cases when the driver raises these exceptions by
+ clearing its internal caches as well as those of the asyncpg driver in
+ response to them, but cannot prevent them from being raised in the first
+ place if the cached prepared statement or asyncpg type caches have gone
+ stale, nor can it retry the statement as the PostgreSQL transaction is
+ invalidated when these errors occur.
+
+Disabling the PostgreSQL JIT to improve ENUM datatype handling
+---------------------------------------------------------------
+
+Asyncpg has an `issue <https://github.com/MagicStack/asyncpg/issues/727>`_ when
+using PostgreSQL ENUM datatypes, where upon the creation of new database
+connections, an expensive query may be emitted in order to retrieve metadata
+regarding custom types which has been shown to negatively affect performance.
+To mitigate this issue, the PostgreSQL "jit" setting may be disabled from the
+client using this setting passed to :func:`_asyncio.create_async_engine`::
+
+ engine = create_async_engine(
+ "postgresql+asyncpg://user:password@localhost/tmp",
+ connect_args={"server_settings": {"jit": "off"}},
+ )
+
+.. seealso::
+
+ https://github.com/MagicStack/asyncpg/issues/727
+
+""" # noqa
+
+import collections
+import decimal
+import json as _py_json
+import re
+import time
+
+from . import json
+from .base import _DECIMAL_TYPES
+from .base import _FLOAT_TYPES
+from .base import _INT_TYPES
+from .base import ENUM
+from .base import INTERVAL
+from .base import OID
+from .base import PGCompiler
+from .base import PGDialect
+from .base import PGExecutionContext
+from .base import PGIdentifierPreparer
+from .base import REGCLASS
+from .base import UUID
+from ... import exc
+from ... import pool
+from ... import processors
+from ... import util
+from ...engine import AdaptedConnection
+from ...sql import sqltypes
+from ...util.concurrency import asyncio
+from ...util.concurrency import await_fallback
+from ...util.concurrency import await_only
+
+
+try:
+ from uuid import UUID as _python_UUID # noqa
+except ImportError:
+ _python_UUID = None
+
+
+class AsyncpgTime(sqltypes.Time):
+ def get_dbapi_type(self, dbapi):
+ if self.timezone:
+ return dbapi.TIME_W_TZ
+ else:
+ return dbapi.TIME
+
+
+class AsyncpgDate(sqltypes.Date):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.DATE
+
+
+class AsyncpgDateTime(sqltypes.DateTime):
+ def get_dbapi_type(self, dbapi):
+ if self.timezone:
+ return dbapi.TIMESTAMP_W_TZ
+ else:
+ return dbapi.TIMESTAMP
+
+
+class AsyncpgBoolean(sqltypes.Boolean):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.BOOLEAN
+
+
+class AsyncPgInterval(INTERVAL):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.INTERVAL
+
+ @classmethod
+ def adapt_emulated_to_native(cls, interval, **kw):
+
+ return AsyncPgInterval(precision=interval.second_precision)
+
+
+class AsyncPgEnum(ENUM):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.ENUM
+
+
+class AsyncpgInteger(sqltypes.Integer):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.INTEGER
+
+
+class AsyncpgBigInteger(sqltypes.BigInteger):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.BIGINTEGER
+
+
+class AsyncpgJSON(json.JSON):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.JSON
+
+ def result_processor(self, dialect, coltype):
+ return None
+
+
+class AsyncpgJSONB(json.JSONB):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.JSONB
+
+ def result_processor(self, dialect, coltype):
+ return None
+
+
+class AsyncpgJSONIndexType(sqltypes.JSON.JSONIndexType):
+ def get_dbapi_type(self, dbapi):
+ raise NotImplementedError("should not be here")
+
+
+class AsyncpgJSONIntIndexType(sqltypes.JSON.JSONIntIndexType):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.INTEGER
+
+
+class AsyncpgJSONStrIndexType(sqltypes.JSON.JSONStrIndexType):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.STRING
+
+
+class AsyncpgJSONPathType(json.JSONPathType):
+ def bind_processor(self, dialect):
+ def process(value):
+ assert isinstance(value, util.collections_abc.Sequence)
+ tokens = [util.text_type(elem) for elem in value]
+ return tokens
+
+ return process
+
+
+class AsyncpgUUID(UUID):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.UUID
+
+ def bind_processor(self, dialect):
+ if not self.as_uuid and dialect.use_native_uuid:
+
+ def process(value):
+ if value is not None:
+ value = _python_UUID(value)
+ return value
+
+ return process
+
+ def result_processor(self, dialect, coltype):
+ if not self.as_uuid and dialect.use_native_uuid:
+
+ def process(value):
+ if value is not None:
+ value = str(value)
+ return value
+
+ return process
+
+
+class AsyncpgNumeric(sqltypes.Numeric):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.NUMBER
+
+ def bind_processor(self, dialect):
+ return None
+
+ def result_processor(self, dialect, coltype):
+ if self.asdecimal:
+ if coltype in _FLOAT_TYPES:
+ return processors.to_decimal_processor_factory(
+ decimal.Decimal, self._effective_decimal_return_scale
+ )
+ elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES:
+ # pg8000 returns Decimal natively for 1700
+ return None
+ else:
+ raise exc.InvalidRequestError(
+ "Unknown PG numeric type: %d" % coltype
+ )
+ else:
+ if coltype in _FLOAT_TYPES:
+ # pg8000 returns float natively for 701
+ return None
+ elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES:
+ return processors.to_float
+ else:
+ raise exc.InvalidRequestError(
+ "Unknown PG numeric type: %d" % coltype
+ )
+
+
+class AsyncpgFloat(AsyncpgNumeric):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.FLOAT
+
+
+class AsyncpgREGCLASS(REGCLASS):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.STRING
+
+
+class AsyncpgOID(OID):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.INTEGER
+
+
+class PGExecutionContext_asyncpg(PGExecutionContext):
+ def handle_dbapi_exception(self, e):
+ if isinstance(
+ e,
+ (
+ self.dialect.dbapi.InvalidCachedStatementError,
+ self.dialect.dbapi.InternalServerError,
+ ),
+ ):
+ self.dialect._invalidate_schema_cache()
+
+ def pre_exec(self):
+ if self.isddl:
+ self.dialect._invalidate_schema_cache()
+
+ self.cursor._invalidate_schema_cache_asof = (
+ self.dialect._invalidate_schema_cache_asof
+ )
+
+ if not self.compiled:
+ return
+
+ # we have to exclude ENUM because "enum" not really a "type"
+ # we can cast to, it has to be the name of the type itself.
+ # for now we just omit it from casting
+ self.exclude_set_input_sizes = {AsyncAdapt_asyncpg_dbapi.ENUM}
+
+ def create_server_side_cursor(self):
+ return self._dbapi_connection.cursor(server_side=True)
+
+
+class PGCompiler_asyncpg(PGCompiler):
+ pass
+
+
+class PGIdentifierPreparer_asyncpg(PGIdentifierPreparer):
+ pass
+
+
+class AsyncAdapt_asyncpg_cursor:
+ __slots__ = (
+ "_adapt_connection",
+ "_connection",
+ "_rows",
+ "description",
+ "arraysize",
+ "rowcount",
+ "_inputsizes",
+ "_cursor",
+ "_invalidate_schema_cache_asof",
+ )
+
+ server_side = False
+
+ def __init__(self, adapt_connection):
+ self._adapt_connection = adapt_connection
+ self._connection = adapt_connection._connection
+ self._rows = []
+ self._cursor = None
+ self.description = None
+ self.arraysize = 1
+ self.rowcount = -1
+ self._inputsizes = None
+ self._invalidate_schema_cache_asof = 0
+
+ def close(self):
+ self._rows[:] = []
+
+ def _handle_exception(self, error):
+ self._adapt_connection._handle_exception(error)
+
+ def _parameter_placeholders(self, params):
+ if not self._inputsizes:
+ return tuple("$%d" % idx for idx, _ in enumerate(params, 1))
+ else:
+ return tuple(
+ "$%d::%s" % (idx, typ) if typ else "$%d" % idx
+ for idx, typ in enumerate(
+ (_pg_types.get(typ) for typ in self._inputsizes), 1
+ )
+ )
+
+ async def _prepare_and_execute(self, operation, parameters):
+ adapt_connection = self._adapt_connection
+
+ async with adapt_connection._execute_mutex:
+
+ if not adapt_connection._started:
+ await adapt_connection._start_transaction()
+
+ if parameters is not None:
+ operation = operation % self._parameter_placeholders(
+ parameters
+ )
+ else:
+ parameters = ()
+
+ try:
+ prepared_stmt, attributes = await adapt_connection._prepare(
+ operation, self._invalidate_schema_cache_asof
+ )
+
+ if attributes:
+ self.description = [
+ (
+ attr.name,
+ attr.type.oid,
+ None,
+ None,
+ None,
+ None,
+ None,
+ )
+ for attr in attributes
+ ]
+ else:
+ self.description = None
+
+ if self.server_side:
+ self._cursor = await prepared_stmt.cursor(*parameters)
+ self.rowcount = -1
+ else:
+ self._rows = await prepared_stmt.fetch(*parameters)
+ status = prepared_stmt.get_statusmsg()
+
+ reg = re.match(
+ r"(?:UPDATE|DELETE|INSERT \d+) (\d+)", status
+ )
+ if reg:
+ self.rowcount = int(reg.group(1))
+ else:
+ self.rowcount = -1
+
+ except Exception as error:
+ self._handle_exception(error)
+
+ async def _executemany(self, operation, seq_of_parameters):
+ adapt_connection = self._adapt_connection
+
+ async with adapt_connection._execute_mutex:
+ await adapt_connection._check_type_cache_invalidation(
+ self._invalidate_schema_cache_asof
+ )
+
+ if not adapt_connection._started:
+ await adapt_connection._start_transaction()
+
+ operation = operation % self._parameter_placeholders(
+ seq_of_parameters[0]
+ )
+
+ try:
+ return await self._connection.executemany(
+ operation, seq_of_parameters
+ )
+ except Exception as error:
+ self._handle_exception(error)
+
+ def execute(self, operation, parameters=None):
+ self._adapt_connection.await_(
+ self._prepare_and_execute(operation, parameters)
+ )
+
+ def executemany(self, operation, seq_of_parameters):
+ return self._adapt_connection.await_(
+ self._executemany(operation, seq_of_parameters)
+ )
+
+ def setinputsizes(self, *inputsizes):
+ self._inputsizes = inputsizes
+
+ def __iter__(self):
+ while self._rows:
+ yield self._rows.pop(0)
+
+ def fetchone(self):
+ if self._rows:
+ return self._rows.pop(0)
+ else:
+ return None
+
+ def fetchmany(self, size=None):
+ if size is None:
+ size = self.arraysize
+
+ retval = self._rows[0:size]
+ self._rows[:] = self._rows[size:]
+ return retval
+
+ def fetchall(self):
+ retval = self._rows[:]
+ self._rows[:] = []
+ return retval
+
+
+class AsyncAdapt_asyncpg_ss_cursor(AsyncAdapt_asyncpg_cursor):
+
+ server_side = True
+ __slots__ = ("_rowbuffer",)
+
+ def __init__(self, adapt_connection):
+ super(AsyncAdapt_asyncpg_ss_cursor, self).__init__(adapt_connection)
+ self._rowbuffer = None
+
+ def close(self):
+ self._cursor = None
+ self._rowbuffer = None
+
+ def _buffer_rows(self):
+ new_rows = self._adapt_connection.await_(self._cursor.fetch(50))
+ self._rowbuffer = collections.deque(new_rows)
+
+ def __aiter__(self):
+ return self
+
+ async def __anext__(self):
+ if not self._rowbuffer:
+ self._buffer_rows()
+
+ while True:
+ while self._rowbuffer:
+ yield self._rowbuffer.popleft()
+
+ self._buffer_rows()
+ if not self._rowbuffer:
+ break
+
+ def fetchone(self):
+ if not self._rowbuffer:
+ self._buffer_rows()
+ if not self._rowbuffer:
+ return None
+ return self._rowbuffer.popleft()
+
+ def fetchmany(self, size=None):
+ if size is None:
+ return self.fetchall()
+
+ if not self._rowbuffer:
+ self._buffer_rows()
+
+ buf = list(self._rowbuffer)
+ lb = len(buf)
+ if size > lb:
+ buf.extend(
+ self._adapt_connection.await_(self._cursor.fetch(size - lb))
+ )
+
+ result = buf[0:size]
+ self._rowbuffer = collections.deque(buf[size:])
+ return result
+
+ def fetchall(self):
+ ret = list(self._rowbuffer) + list(
+ self._adapt_connection.await_(self._all())
+ )
+ self._rowbuffer.clear()
+ return ret
+
+ async def _all(self):
+ rows = []
+
+ # TODO: looks like we have to hand-roll some kind of batching here.
+ # hardcoding for the moment but this should be improved.
+ while True:
+ batch = await self._cursor.fetch(1000)
+ if batch:
+ rows.extend(batch)
+ continue
+ else:
+ break
+ return rows
+
+ def executemany(self, operation, seq_of_parameters):
+ raise NotImplementedError(
+ "server side cursor doesn't support executemany yet"
+ )
+
+
+class AsyncAdapt_asyncpg_connection(AdaptedConnection):
+ __slots__ = (
+ "dbapi",
+ "_connection",
+ "isolation_level",
+ "_isolation_setting",
+ "readonly",
+ "deferrable",
+ "_transaction",
+ "_started",
+ "_prepared_statement_cache",
+ "_invalidate_schema_cache_asof",
+ "_execute_mutex",
+ )
+
+ await_ = staticmethod(await_only)
+
+ def __init__(self, dbapi, connection, prepared_statement_cache_size=100):
+ self.dbapi = dbapi
+ self._connection = connection
+ self.isolation_level = self._isolation_setting = "read_committed"
+ self.readonly = False
+ self.deferrable = False
+ self._transaction = None
+ self._started = False
+ self._invalidate_schema_cache_asof = time.time()
+ self._execute_mutex = asyncio.Lock()
+
+ if prepared_statement_cache_size:
+ self._prepared_statement_cache = util.LRUCache(
+ prepared_statement_cache_size
+ )
+ else:
+ self._prepared_statement_cache = None
+
+ async def _check_type_cache_invalidation(self, invalidate_timestamp):
+ if invalidate_timestamp > self._invalidate_schema_cache_asof:
+ await self._connection.reload_schema_state()
+ self._invalidate_schema_cache_asof = invalidate_timestamp
+
+ async def _prepare(self, operation, invalidate_timestamp):
+ await self._check_type_cache_invalidation(invalidate_timestamp)
+
+ cache = self._prepared_statement_cache
+ if cache is None:
+ prepared_stmt = await self._connection.prepare(operation)
+ attributes = prepared_stmt.get_attributes()
+ return prepared_stmt, attributes
+
+ # asyncpg uses a type cache for the "attributes" which seems to go
+ # stale independently of the PreparedStatement itself, so place that
+ # collection in the cache as well.
+ if operation in cache:
+ prepared_stmt, attributes, cached_timestamp = cache[operation]
+
+ # preparedstatements themselves also go stale for certain DDL
+ # changes such as size of a VARCHAR changing, so there is also
+ # a cross-connection invalidation timestamp
+ if cached_timestamp > invalidate_timestamp:
+ return prepared_stmt, attributes
+
+ prepared_stmt = await self._connection.prepare(operation)
+ attributes = prepared_stmt.get_attributes()
+ cache[operation] = (prepared_stmt, attributes, time.time())
+
+ return prepared_stmt, attributes
+
+ def _handle_exception(self, error):
+ if self._connection.is_closed():
+ self._transaction = None
+ self._started = False
+
+ if not isinstance(error, AsyncAdapt_asyncpg_dbapi.Error):
+ exception_mapping = self.dbapi._asyncpg_error_translate
+
+ for super_ in type(error).__mro__:
+ if super_ in exception_mapping:
+ translated_error = exception_mapping[super_](
+ "%s: %s" % (type(error), error)
+ )
+ translated_error.pgcode = (
+ translated_error.sqlstate
+ ) = getattr(error, "sqlstate", None)
+ raise translated_error from error
+ else:
+ raise error
+ else:
+ raise error
+
+ @property
+ def autocommit(self):
+ return self.isolation_level == "autocommit"
+
+ @autocommit.setter
+ def autocommit(self, value):
+ if value:
+ self.isolation_level = "autocommit"
+ else:
+ self.isolation_level = self._isolation_setting
+
+ def set_isolation_level(self, level):
+ if self._started:
+ self.rollback()
+ self.isolation_level = self._isolation_setting = level
+
+ async def _start_transaction(self):
+ if self.isolation_level == "autocommit":
+ return
+
+ try:
+ self._transaction = self._connection.transaction(
+ isolation=self.isolation_level,
+ readonly=self.readonly,
+ deferrable=self.deferrable,
+ )
+ await self._transaction.start()
+ except Exception as error:
+ self._handle_exception(error)
+ else:
+ self._started = True
+
+ def cursor(self, server_side=False):
+ if server_side:
+ return AsyncAdapt_asyncpg_ss_cursor(self)
+ else:
+ return AsyncAdapt_asyncpg_cursor(self)
+
+ def rollback(self):
+ if self._started:
+ try:
+ self.await_(self._transaction.rollback())
+ except Exception as error:
+ self._handle_exception(error)
+ finally:
+ self._transaction = None
+ self._started = False
+
+ def commit(self):
+ if self._started:
+ try:
+ self.await_(self._transaction.commit())
+ except Exception as error:
+ self._handle_exception(error)
+ finally:
+ self._transaction = None
+ self._started = False
+
+ def close(self):
+ self.rollback()
+
+ self.await_(self._connection.close())
+
+
+class AsyncAdaptFallback_asyncpg_connection(AsyncAdapt_asyncpg_connection):
+ __slots__ = ()
+
+ await_ = staticmethod(await_fallback)
+
+
+class AsyncAdapt_asyncpg_dbapi:
+ def __init__(self, asyncpg):
+ self.asyncpg = asyncpg
+ self.paramstyle = "format"
+
+ def connect(self, *arg, **kw):
+ async_fallback = kw.pop("async_fallback", False)
+ prepared_statement_cache_size = kw.pop(
+ "prepared_statement_cache_size", 100
+ )
+ if util.asbool(async_fallback):
+ return AsyncAdaptFallback_asyncpg_connection(
+ self,
+ await_fallback(self.asyncpg.connect(*arg, **kw)),
+ prepared_statement_cache_size=prepared_statement_cache_size,
+ )
+ else:
+ return AsyncAdapt_asyncpg_connection(
+ self,
+ await_only(self.asyncpg.connect(*arg, **kw)),
+ prepared_statement_cache_size=prepared_statement_cache_size,
+ )
+
+ class Error(Exception):
+ pass
+
+ class Warning(Exception): # noqa
+ pass
+
+ class InterfaceError(Error):
+ pass
+
+ class DatabaseError(Error):
+ pass
+
+ class InternalError(DatabaseError):
+ pass
+
+ class OperationalError(DatabaseError):
+ pass
+
+ class ProgrammingError(DatabaseError):
+ pass
+
+ class IntegrityError(DatabaseError):
+ pass
+
+ class DataError(DatabaseError):
+ pass
+
+ class NotSupportedError(DatabaseError):
+ pass
+
+ class InternalServerError(InternalError):
+ pass
+
+ class InvalidCachedStatementError(NotSupportedError):
+ def __init__(self, message):
+ super(
+ AsyncAdapt_asyncpg_dbapi.InvalidCachedStatementError, self
+ ).__init__(
+ message + " (SQLAlchemy asyncpg dialect will now invalidate "
+ "all prepared caches in response to this exception)",
+ )
+
+ @util.memoized_property
+ def _asyncpg_error_translate(self):
+ import asyncpg
+
+ return {
+ asyncpg.exceptions.IntegrityConstraintViolationError: self.IntegrityError, # noqa: E501
+ asyncpg.exceptions.PostgresError: self.Error,
+ asyncpg.exceptions.SyntaxOrAccessError: self.ProgrammingError,
+ asyncpg.exceptions.InterfaceError: self.InterfaceError,
+ asyncpg.exceptions.InvalidCachedStatementError: self.InvalidCachedStatementError, # noqa: E501
+ asyncpg.exceptions.InternalServerError: self.InternalServerError,
+ }
+
+ def Binary(self, value):
+ return value
+
+ STRING = util.symbol("STRING")
+ TIMESTAMP = util.symbol("TIMESTAMP")
+ TIMESTAMP_W_TZ = util.symbol("TIMESTAMP_W_TZ")
+ TIME = util.symbol("TIME")
+ TIME_W_TZ = util.symbol("TIME_W_TZ")
+ DATE = util.symbol("DATE")
+ INTERVAL = util.symbol("INTERVAL")
+ NUMBER = util.symbol("NUMBER")
+ FLOAT = util.symbol("FLOAT")
+ BOOLEAN = util.symbol("BOOLEAN")
+ INTEGER = util.symbol("INTEGER")
+ BIGINTEGER = util.symbol("BIGINTEGER")
+ BYTES = util.symbol("BYTES")
+ DECIMAL = util.symbol("DECIMAL")
+ JSON = util.symbol("JSON")
+ JSONB = util.symbol("JSONB")
+ ENUM = util.symbol("ENUM")
+ UUID = util.symbol("UUID")
+ BYTEA = util.symbol("BYTEA")
+
+ DATETIME = TIMESTAMP
+ BINARY = BYTEA
+
+
+_pg_types = {
+ AsyncAdapt_asyncpg_dbapi.STRING: "varchar",
+ AsyncAdapt_asyncpg_dbapi.TIMESTAMP: "timestamp",
+ AsyncAdapt_asyncpg_dbapi.TIMESTAMP_W_TZ: "timestamp with time zone",
+ AsyncAdapt_asyncpg_dbapi.DATE: "date",
+ AsyncAdapt_asyncpg_dbapi.TIME: "time",
+ AsyncAdapt_asyncpg_dbapi.TIME_W_TZ: "time with time zone",
+ AsyncAdapt_asyncpg_dbapi.INTERVAL: "interval",
+ AsyncAdapt_asyncpg_dbapi.NUMBER: "numeric",
+ AsyncAdapt_asyncpg_dbapi.FLOAT: "float",
+ AsyncAdapt_asyncpg_dbapi.BOOLEAN: "bool",
+ AsyncAdapt_asyncpg_dbapi.INTEGER: "integer",
+ AsyncAdapt_asyncpg_dbapi.BIGINTEGER: "bigint",
+ AsyncAdapt_asyncpg_dbapi.BYTES: "bytes",
+ AsyncAdapt_asyncpg_dbapi.DECIMAL: "decimal",
+ AsyncAdapt_asyncpg_dbapi.JSON: "json",
+ AsyncAdapt_asyncpg_dbapi.JSONB: "jsonb",
+ AsyncAdapt_asyncpg_dbapi.ENUM: "enum",
+ AsyncAdapt_asyncpg_dbapi.UUID: "uuid",
+ AsyncAdapt_asyncpg_dbapi.BYTEA: "bytea",
+}
+
+
+class PGDialect_asyncpg(PGDialect):
+ driver = "asyncpg"
+ supports_statement_cache = True
+
+ supports_unicode_statements = True
+ supports_server_side_cursors = True
+
+ supports_unicode_binds = True
+
+ default_paramstyle = "format"
+ supports_sane_multi_rowcount = False
+ execution_ctx_cls = PGExecutionContext_asyncpg
+ statement_compiler = PGCompiler_asyncpg
+ preparer = PGIdentifierPreparer_asyncpg
+
+ use_setinputsizes = True
+
+ use_native_uuid = True
+
+ colspecs = util.update_copy(
+ PGDialect.colspecs,
+ {
+ sqltypes.Time: AsyncpgTime,
+ sqltypes.Date: AsyncpgDate,
+ sqltypes.DateTime: AsyncpgDateTime,
+ sqltypes.Interval: AsyncPgInterval,
+ INTERVAL: AsyncPgInterval,
+ UUID: AsyncpgUUID,
+ sqltypes.Boolean: AsyncpgBoolean,
+ sqltypes.Integer: AsyncpgInteger,
+ sqltypes.BigInteger: AsyncpgBigInteger,
+ sqltypes.Numeric: AsyncpgNumeric,
+ sqltypes.Float: AsyncpgFloat,
+ sqltypes.JSON: AsyncpgJSON,
+ json.JSONB: AsyncpgJSONB,
+ sqltypes.JSON.JSONPathType: AsyncpgJSONPathType,
+ sqltypes.JSON.JSONIndexType: AsyncpgJSONIndexType,
+ sqltypes.JSON.JSONIntIndexType: AsyncpgJSONIntIndexType,
+ sqltypes.JSON.JSONStrIndexType: AsyncpgJSONStrIndexType,
+ sqltypes.Enum: AsyncPgEnum,
+ OID: AsyncpgOID,
+ REGCLASS: AsyncpgREGCLASS,
+ },
+ )
+ is_async = True
+ _invalidate_schema_cache_asof = 0
+
+ def _invalidate_schema_cache(self):
+ self._invalidate_schema_cache_asof = time.time()
+
+ @util.memoized_property
+ def _dbapi_version(self):
+ if self.dbapi and hasattr(self.dbapi, "__version__"):
+ return tuple(
+ [
+ int(x)
+ for x in re.findall(
+ r"(\d+)(?:[-\.]?|$)", self.dbapi.__version__
+ )
+ ]
+ )
+ else:
+ return (99, 99, 99)
+
+ @classmethod
+ def dbapi(cls):
+ return AsyncAdapt_asyncpg_dbapi(__import__("asyncpg"))
+
+ @util.memoized_property
+ def _isolation_lookup(self):
+ return {
+ "AUTOCOMMIT": "autocommit",
+ "READ COMMITTED": "read_committed",
+ "REPEATABLE READ": "repeatable_read",
+ "SERIALIZABLE": "serializable",
+ }
+
+ def set_isolation_level(self, connection, level):
+ try:
+ level = self._isolation_lookup[level.replace("_", " ")]
+ except KeyError as err:
+ util.raise_(
+ exc.ArgumentError(
+ "Invalid value '%s' for isolation_level. "
+ "Valid isolation levels for %s are %s"
+ % (level, self.name, ", ".join(self._isolation_lookup))
+ ),
+ replace_context=err,
+ )
+
+ connection.set_isolation_level(level)
+
+ def set_readonly(self, connection, value):
+ connection.readonly = value
+
+ def get_readonly(self, connection):
+ return connection.readonly
+
+ def set_deferrable(self, connection, value):
+ connection.deferrable = value
+
+ def get_deferrable(self, connection):
+ return connection.deferrable
+
+ def create_connect_args(self, url):
+ opts = url.translate_connect_args(username="user")
+
+ opts.update(url.query)
+ util.coerce_kw_type(opts, "prepared_statement_cache_size", int)
+ util.coerce_kw_type(opts, "port", int)
+ return ([], opts)
+
+ @classmethod
+ def get_pool_class(cls, url):
+
+ async_fallback = url.query.get("async_fallback", False)
+
+ if util.asbool(async_fallback):
+ return pool.FallbackAsyncAdaptedQueuePool
+ else:
+ return pool.AsyncAdaptedQueuePool
+
+ def is_disconnect(self, e, connection, cursor):
+ if connection:
+ return connection._connection.is_closed()
+ else:
+ return isinstance(
+ e, self.dbapi.InterfaceError
+ ) and "connection is closed" in str(e)
+
+ def do_set_input_sizes(self, cursor, list_of_tuples, context):
+ if self.positional:
+ cursor.setinputsizes(
+ *[dbtype for key, dbtype, sqltype in list_of_tuples]
+ )
+ else:
+ cursor.setinputsizes(
+ **{
+ key: dbtype
+ for key, dbtype, sqltype in list_of_tuples
+ if dbtype
+ }
+ )
+
+ async def setup_asyncpg_json_codec(self, conn):
+ """set up JSON codec for asyncpg.
+
+ This occurs for all new connections and
+ can be overridden by third party dialects.
+
+ .. versionadded:: 1.4.27
+
+ """
+
+ asyncpg_connection = conn._connection
+ deserializer = self._json_deserializer or _py_json.loads
+
+ def _json_decoder(bin_value):
+ return deserializer(bin_value.decode())
+
+ await asyncpg_connection.set_type_codec(
+ "json",
+ encoder=str.encode,
+ decoder=_json_decoder,
+ schema="pg_catalog",
+ format="binary",
+ )
+
+ async def setup_asyncpg_jsonb_codec(self, conn):
+ """set up JSONB codec for asyncpg.
+
+ This occurs for all new connections and
+ can be overridden by third party dialects.
+
+ .. versionadded:: 1.4.27
+
+ """
+
+ asyncpg_connection = conn._connection
+ deserializer = self._json_deserializer or _py_json.loads
+
+ def _jsonb_encoder(str_value):
+ # \x01 is the prefix for jsonb used by PostgreSQL.
+ # asyncpg requires it when format='binary'
+ return b"\x01" + str_value.encode()
+
+ deserializer = self._json_deserializer or _py_json.loads
+
+ def _jsonb_decoder(bin_value):
+ # the byte is the \x01 prefix for jsonb used by PostgreSQL.
+ # asyncpg returns it when format='binary'
+ return deserializer(bin_value[1:].decode())
+
+ await asyncpg_connection.set_type_codec(
+ "jsonb",
+ encoder=_jsonb_encoder,
+ decoder=_jsonb_decoder,
+ schema="pg_catalog",
+ format="binary",
+ )
+
+ def on_connect(self):
+ """on_connect for asyncpg
+
+ A major component of this for asyncpg is to set up type decoders at the
+ asyncpg level.
+
+ See https://github.com/MagicStack/asyncpg/issues/623 for
+ notes on JSON/JSONB implementation.
+
+ """
+
+ super_connect = super(PGDialect_asyncpg, self).on_connect()
+
+ def connect(conn):
+ conn.await_(self.setup_asyncpg_json_codec(conn))
+ conn.await_(self.setup_asyncpg_jsonb_codec(conn))
+ if super_connect is not None:
+ super_connect(conn)
+
+ return connect
+
+ def get_driver_connection(self, connection):
+ return connection._connection
+
+
+dialect = PGDialect_asyncpg
diff --git a/lib/sqlalchemy/dialects/postgresql/base.py b/lib/sqlalchemy/dialects/postgresql/base.py
new file mode 100644
index 0000000..eb84170
--- /dev/null
+++ b/lib/sqlalchemy/dialects/postgresql/base.py
@@ -0,0 +1,4651 @@
+# postgresql/base.py
+# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+
+r"""
+.. dialect:: postgresql
+ :name: PostgreSQL
+ :full_support: 9.6, 10, 11, 12, 13, 14
+ :normal_support: 9.6+
+ :best_effort: 8+
+
+.. _postgresql_sequences:
+
+Sequences/SERIAL/IDENTITY
+-------------------------
+
+PostgreSQL supports sequences, and SQLAlchemy uses these as the default means
+of creating new primary key values for integer-based primary key columns. When
+creating tables, SQLAlchemy will issue the ``SERIAL`` datatype for
+integer-based primary key columns, which generates a sequence and server side
+default corresponding to the column.
+
+To specify a specific named sequence to be used for primary key generation,
+use the :func:`~sqlalchemy.schema.Sequence` construct::
+
+ Table('sometable', metadata,
+ Column('id', Integer, Sequence('some_id_seq'), primary_key=True)
+ )
+
+When SQLAlchemy issues a single INSERT statement, to fulfill the contract of
+having the "last insert identifier" available, a RETURNING clause is added to
+the INSERT statement which specifies the primary key columns should be
+returned after the statement completes. The RETURNING functionality only takes
+place if PostgreSQL 8.2 or later is in use. As a fallback approach, the
+sequence, whether specified explicitly or implicitly via ``SERIAL``, is
+executed independently beforehand, the returned value to be used in the
+subsequent insert. Note that when an
+:func:`~sqlalchemy.sql.expression.insert()` construct is executed using
+"executemany" semantics, the "last inserted identifier" functionality does not
+apply; no RETURNING clause is emitted nor is the sequence pre-executed in this
+case.
+
+To force the usage of RETURNING by default off, specify the flag
+``implicit_returning=False`` to :func:`_sa.create_engine`.
+
+PostgreSQL 10 and above IDENTITY columns
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+PostgreSQL 10 and above have a new IDENTITY feature that supersedes the use
+of SERIAL. The :class:`_schema.Identity` construct in a
+:class:`_schema.Column` can be used to control its behavior::
+
+ from sqlalchemy import Table, Column, MetaData, Integer, Computed
+
+ metadata = MetaData()
+
+ data = Table(
+ "data",
+ metadata,
+ Column(
+ 'id', Integer, Identity(start=42, cycle=True), primary_key=True
+ ),
+ Column('data', String)
+ )
+
+The CREATE TABLE for the above :class:`_schema.Table` object would be:
+
+.. sourcecode:: sql
+
+ CREATE TABLE data (
+ id INTEGER GENERATED BY DEFAULT AS IDENTITY (START WITH 42 CYCLE),
+ data VARCHAR,
+ PRIMARY KEY (id)
+ )
+
+.. versionchanged:: 1.4 Added :class:`_schema.Identity` construct
+ in a :class:`_schema.Column` to specify the option of an autoincrementing
+ column.
+
+.. note::
+
+ Previous versions of SQLAlchemy did not have built-in support for rendering
+ of IDENTITY, and could use the following compilation hook to replace
+ occurrences of SERIAL with IDENTITY::
+
+ from sqlalchemy.schema import CreateColumn
+ from sqlalchemy.ext.compiler import compiles
+
+
+ @compiles(CreateColumn, 'postgresql')
+ def use_identity(element, compiler, **kw):
+ text = compiler.visit_create_column(element, **kw)
+ text = text.replace(
+ "SERIAL", "INT GENERATED BY DEFAULT AS IDENTITY"
+ )
+ return text
+
+ Using the above, a table such as::
+
+ t = Table(
+ 't', m,
+ Column('id', Integer, primary_key=True),
+ Column('data', String)
+ )
+
+ Will generate on the backing database as::
+
+ CREATE TABLE t (
+ id INT GENERATED BY DEFAULT AS IDENTITY,
+ data VARCHAR,
+ PRIMARY KEY (id)
+ )
+
+.. _postgresql_ss_cursors:
+
+Server Side Cursors
+-------------------
+
+Server-side cursor support is available for the psycopg2, asyncpg
+dialects and may also be available in others.
+
+Server side cursors are enabled on a per-statement basis by using the
+:paramref:`.Connection.execution_options.stream_results` connection execution
+option::
+
+ with engine.connect() as conn:
+ result = conn.execution_options(stream_results=True).execute(text("select * from table"))
+
+Note that some kinds of SQL statements may not be supported with
+server side cursors; generally, only SQL statements that return rows should be
+used with this option.
+
+.. deprecated:: 1.4 The dialect-level server_side_cursors flag is deprecated
+ and will be removed in a future release. Please use the
+ :paramref:`_engine.Connection.stream_results` execution option for
+ unbuffered cursor support.
+
+.. seealso::
+
+ :ref:`engine_stream_results`
+
+.. _postgresql_isolation_level:
+
+Transaction Isolation Level
+---------------------------
+
+Most SQLAlchemy dialects support setting of transaction isolation level
+using the :paramref:`_sa.create_engine.isolation_level` parameter
+at the :func:`_sa.create_engine` level, and at the :class:`_engine.Connection`
+level via the :paramref:`.Connection.execution_options.isolation_level`
+parameter.
+
+For PostgreSQL dialects, this feature works either by making use of the
+DBAPI-specific features, such as psycopg2's isolation level flags which will
+embed the isolation level setting inline with the ``"BEGIN"`` statement, or for
+DBAPIs with no direct support by emitting ``SET SESSION CHARACTERISTICS AS
+TRANSACTION ISOLATION LEVEL <level>`` ahead of the ``"BEGIN"`` statement
+emitted by the DBAPI. For the special AUTOCOMMIT isolation level,
+DBAPI-specific techniques are used which is typically an ``.autocommit``
+flag on the DBAPI connection object.
+
+To set isolation level using :func:`_sa.create_engine`::
+
+ engine = create_engine(
+ "postgresql+pg8000://scott:tiger@localhost/test",
+ isolation_level = "REPEATABLE READ"
+ )
+
+To set using per-connection execution options::
+
+ with engine.connect() as conn:
+ conn = conn.execution_options(
+ isolation_level="REPEATABLE READ"
+ )
+ with conn.begin():
+ # ... work with transaction
+
+There are also more options for isolation level configurations, such as
+"sub-engine" objects linked to a main :class:`_engine.Engine` which each apply
+different isolation level settings. See the discussion at
+:ref:`dbapi_autocommit` for background.
+
+Valid values for ``isolation_level`` on most PostgreSQL dialects include:
+
+* ``READ COMMITTED``
+* ``READ UNCOMMITTED``
+* ``REPEATABLE READ``
+* ``SERIALIZABLE``
+* ``AUTOCOMMIT``
+
+.. seealso::
+
+ :ref:`dbapi_autocommit`
+
+ :ref:`postgresql_readonly_deferrable`
+
+ :ref:`psycopg2_isolation_level`
+
+ :ref:`pg8000_isolation_level`
+
+.. _postgresql_readonly_deferrable:
+
+Setting READ ONLY / DEFERRABLE
+------------------------------
+
+Most PostgreSQL dialects support setting the "READ ONLY" and "DEFERRABLE"
+characteristics of the transaction, which is in addition to the isolation level
+setting. These two attributes can be established either in conjunction with or
+independently of the isolation level by passing the ``postgresql_readonly`` and
+``postgresql_deferrable`` flags with
+:meth:`_engine.Connection.execution_options`. The example below illustrates
+passing the ``"SERIALIZABLE"`` isolation level at the same time as setting
+"READ ONLY" and "DEFERRABLE"::
+
+ with engine.connect() as conn:
+ conn = conn.execution_options(
+ isolation_level="SERIALIZABLE",
+ postgresql_readonly=True,
+ postgresql_deferrable=True
+ )
+ with conn.begin():
+ # ... work with transaction
+
+Note that some DBAPIs such as asyncpg only support "readonly" with
+SERIALIZABLE isolation.
+
+.. versionadded:: 1.4 added support for the ``postgresql_readonly``
+ and ``postgresql_deferrable`` execution options.
+
+.. _postgresql_alternate_search_path:
+
+Setting Alternate Search Paths on Connect
+------------------------------------------
+
+The PostgreSQL ``search_path`` variable refers to the list of schema names
+that will be implicitly referred towards when a particular table or other
+object is referenced in a SQL statement. As detailed in the next section
+:ref:`postgresql_schema_reflection`, SQLAlchemy is generally organized around
+the concept of keeping this variable at its default value of ``public``,
+however, in order to have it set to any arbitrary name or names when connections
+are used automatically, the "SET SESSION search_path" command may be invoked
+for all connections in a pool using the following event handler, as discussed
+at :ref:`schema_set_default_connections`::
+
+ from sqlalchemy import event
+ from sqlalchemy import create_engine
+
+ engine = create_engine("postgresql+psycopg2://scott:tiger@host/dbname")
+
+ @event.listens_for(engine, "connect", insert=True)
+ def set_search_path(dbapi_connection, connection_record):
+ existing_autocommit = dbapi_connection.autocommit
+ dbapi_connection.autocommit = True
+ cursor = dbapi_connection.cursor()
+ cursor.execute("SET SESSION search_path='%s'" % schema_name)
+ cursor.close()
+ dbapi_connection.autocommit = existing_autocommit
+
+The reason the recipe is complicated by use of the ``.autocommit`` DBAPI
+attribute is so that when the ``SET SESSION search_path`` directive is invoked,
+it is invoked outside of the scope of any transaction and therefore will not
+be reverted when the DBAPI connection has a rollback.
+
+.. seealso::
+
+ :ref:`schema_set_default_connections` - in the :ref:`metadata_toplevel` documentation
+
+
+
+
+.. _postgresql_schema_reflection:
+
+Remote-Schema Table Introspection and PostgreSQL search_path
+------------------------------------------------------------
+
+.. admonition:: Section Best Practices Summarized
+
+ keep the ``search_path`` variable set to its default of ``public``, without
+ any other schema names. For other schema names, name these explicitly
+ within :class:`_schema.Table` definitions. Alternatively, the
+ ``postgresql_ignore_search_path`` option will cause all reflected
+ :class:`_schema.Table` objects to have a :attr:`_schema.Table.schema`
+ attribute set up.
+
+The PostgreSQL dialect can reflect tables from any schema, as outlined in
+:ref:`metadata_reflection_schemas`.
+
+With regards to tables which these :class:`_schema.Table`
+objects refer to via foreign key constraint, a decision must be made as to how
+the ``.schema`` is represented in those remote tables, in the case where that
+remote schema name is also a member of the current
+`PostgreSQL search path
+<https://www.postgresql.org/docs/current/static/ddl-schemas.html#DDL-SCHEMAS-PATH>`_.
+
+By default, the PostgreSQL dialect mimics the behavior encouraged by
+PostgreSQL's own ``pg_get_constraintdef()`` builtin procedure. This function
+returns a sample definition for a particular foreign key constraint,
+omitting the referenced schema name from that definition when the name is
+also in the PostgreSQL schema search path. The interaction below
+illustrates this behavior::
+
+ test=> CREATE TABLE test_schema.referred(id INTEGER PRIMARY KEY);
+ CREATE TABLE
+ test=> CREATE TABLE referring(
+ test(> id INTEGER PRIMARY KEY,
+ test(> referred_id INTEGER REFERENCES test_schema.referred(id));
+ CREATE TABLE
+ test=> SET search_path TO public, test_schema;
+ test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM
+ test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n
+ test-> ON n.oid = c.relnamespace
+ test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid
+ test-> WHERE c.relname='referring' AND r.contype = 'f'
+ test-> ;
+ pg_get_constraintdef
+ ---------------------------------------------------
+ FOREIGN KEY (referred_id) REFERENCES referred(id)
+ (1 row)
+
+Above, we created a table ``referred`` as a member of the remote schema
+``test_schema``, however when we added ``test_schema`` to the
+PG ``search_path`` and then asked ``pg_get_constraintdef()`` for the
+``FOREIGN KEY`` syntax, ``test_schema`` was not included in the output of
+the function.
+
+On the other hand, if we set the search path back to the typical default
+of ``public``::
+
+ test=> SET search_path TO public;
+ SET
+
+The same query against ``pg_get_constraintdef()`` now returns the fully
+schema-qualified name for us::
+
+ test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM
+ test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n
+ test-> ON n.oid = c.relnamespace
+ test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid
+ test-> WHERE c.relname='referring' AND r.contype = 'f';
+ pg_get_constraintdef
+ ---------------------------------------------------------------
+ FOREIGN KEY (referred_id) REFERENCES test_schema.referred(id)
+ (1 row)
+
+SQLAlchemy will by default use the return value of ``pg_get_constraintdef()``
+in order to determine the remote schema name. That is, if our ``search_path``
+were set to include ``test_schema``, and we invoked a table
+reflection process as follows::
+
+ >>> from sqlalchemy import Table, MetaData, create_engine, text
+ >>> engine = create_engine("postgresql://scott:tiger@localhost/test")
+ >>> with engine.connect() as conn:
+ ... conn.execute(text("SET search_path TO test_schema, public"))
+ ... metadata_obj = MetaData()
+ ... referring = Table('referring', metadata_obj,
+ ... autoload_with=conn)
+ ...
+ <sqlalchemy.engine.result.CursorResult object at 0x101612ed0>
+
+The above process would deliver to the :attr:`_schema.MetaData.tables`
+collection
+``referred`` table named **without** the schema::
+
+ >>> metadata_obj.tables['referred'].schema is None
+ True
+
+To alter the behavior of reflection such that the referred schema is
+maintained regardless of the ``search_path`` setting, use the
+``postgresql_ignore_search_path`` option, which can be specified as a
+dialect-specific argument to both :class:`_schema.Table` as well as
+:meth:`_schema.MetaData.reflect`::
+
+ >>> with engine.connect() as conn:
+ ... conn.execute(text("SET search_path TO test_schema, public"))
+ ... metadata_obj = MetaData()
+ ... referring = Table('referring', metadata_obj,
+ ... autoload_with=conn,
+ ... postgresql_ignore_search_path=True)
+ ...
+ <sqlalchemy.engine.result.CursorResult object at 0x1016126d0>
+
+We will now have ``test_schema.referred`` stored as schema-qualified::
+
+ >>> metadata_obj.tables['test_schema.referred'].schema
+ 'test_schema'
+
+.. sidebar:: Best Practices for PostgreSQL Schema reflection
+
+ The description of PostgreSQL schema reflection behavior is complex, and
+ is the product of many years of dealing with widely varied use cases and
+ user preferences. But in fact, there's no need to understand any of it if
+ you just stick to the simplest use pattern: leave the ``search_path`` set
+ to its default of ``public`` only, never refer to the name ``public`` as
+ an explicit schema name otherwise, and refer to all other schema names
+ explicitly when building up a :class:`_schema.Table` object. The options
+ described here are only for those users who can't, or prefer not to, stay
+ within these guidelines.
+
+Note that **in all cases**, the "default" schema is always reflected as
+``None``. The "default" schema on PostgreSQL is that which is returned by the
+PostgreSQL ``current_schema()`` function. On a typical PostgreSQL
+installation, this is the name ``public``. So a table that refers to another
+which is in the ``public`` (i.e. default) schema will always have the
+``.schema`` attribute set to ``None``.
+
+.. seealso::
+
+ :ref:`reflection_schema_qualified_interaction` - discussion of the issue
+ from a backend-agnostic perspective
+
+ `The Schema Search Path
+ <https://www.postgresql.org/docs/current/static/ddl-schemas.html#DDL-SCHEMAS-PATH>`_
+ - on the PostgreSQL website.
+
+INSERT/UPDATE...RETURNING
+-------------------------
+
+The dialect supports PG 8.2's ``INSERT..RETURNING``, ``UPDATE..RETURNING`` and
+``DELETE..RETURNING`` syntaxes. ``INSERT..RETURNING`` is used by default
+for single-row INSERT statements in order to fetch newly generated
+primary key identifiers. To specify an explicit ``RETURNING`` clause,
+use the :meth:`._UpdateBase.returning` method on a per-statement basis::
+
+ # INSERT..RETURNING
+ result = table.insert().returning(table.c.col1, table.c.col2).\
+ values(name='foo')
+ print(result.fetchall())
+
+ # UPDATE..RETURNING
+ result = table.update().returning(table.c.col1, table.c.col2).\
+ where(table.c.name=='foo').values(name='bar')
+ print(result.fetchall())
+
+ # DELETE..RETURNING
+ result = table.delete().returning(table.c.col1, table.c.col2).\
+ where(table.c.name=='foo')
+ print(result.fetchall())
+
+.. _postgresql_insert_on_conflict:
+
+INSERT...ON CONFLICT (Upsert)
+------------------------------
+
+Starting with version 9.5, PostgreSQL allows "upserts" (update or insert) of
+rows into a table via the ``ON CONFLICT`` clause of the ``INSERT`` statement. A
+candidate row will only be inserted if that row does not violate any unique
+constraints. In the case of a unique constraint violation, a secondary action
+can occur which can be either "DO UPDATE", indicating that the data in the
+target row should be updated, or "DO NOTHING", which indicates to silently skip
+this row.
+
+Conflicts are determined using existing unique constraints and indexes. These
+constraints may be identified either using their name as stated in DDL,
+or they may be inferred by stating the columns and conditions that comprise
+the indexes.
+
+SQLAlchemy provides ``ON CONFLICT`` support via the PostgreSQL-specific
+:func:`_postgresql.insert()` function, which provides
+the generative methods :meth:`_postgresql.Insert.on_conflict_do_update`
+and :meth:`~.postgresql.Insert.on_conflict_do_nothing`:
+
+.. sourcecode:: pycon+sql
+
+ >>> from sqlalchemy.dialects.postgresql import insert
+ >>> insert_stmt = insert(my_table).values(
+ ... id='some_existing_id',
+ ... data='inserted value')
+ >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing(
+ ... index_elements=['id']
+ ... )
+ >>> print(do_nothing_stmt)
+ {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
+ ON CONFLICT (id) DO NOTHING
+ {stop}
+
+ >>> do_update_stmt = insert_stmt.on_conflict_do_update(
+ ... constraint='pk_my_table',
+ ... set_=dict(data='updated value')
+ ... )
+ >>> print(do_update_stmt)
+ {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
+ ON CONFLICT ON CONSTRAINT pk_my_table DO UPDATE SET data = %(param_1)s
+
+.. versionadded:: 1.1
+
+.. seealso::
+
+ `INSERT .. ON CONFLICT
+ <https://www.postgresql.org/docs/current/static/sql-insert.html#SQL-ON-CONFLICT>`_
+ - in the PostgreSQL documentation.
+
+Specifying the Target
+^^^^^^^^^^^^^^^^^^^^^
+
+Both methods supply the "target" of the conflict using either the
+named constraint or by column inference:
+
+* The :paramref:`_postgresql.Insert.on_conflict_do_update.index_elements` argument
+ specifies a sequence containing string column names, :class:`_schema.Column`
+ objects, and/or SQL expression elements, which would identify a unique
+ index:
+
+ .. sourcecode:: pycon+sql
+
+ >>> do_update_stmt = insert_stmt.on_conflict_do_update(
+ ... index_elements=['id'],
+ ... set_=dict(data='updated value')
+ ... )
+ >>> print(do_update_stmt)
+ {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
+ ON CONFLICT (id) DO UPDATE SET data = %(param_1)s
+ {stop}
+
+ >>> do_update_stmt = insert_stmt.on_conflict_do_update(
+ ... index_elements=[my_table.c.id],
+ ... set_=dict(data='updated value')
+ ... )
+ >>> print(do_update_stmt)
+ {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
+ ON CONFLICT (id) DO UPDATE SET data = %(param_1)s
+
+* When using :paramref:`_postgresql.Insert.on_conflict_do_update.index_elements` to
+ infer an index, a partial index can be inferred by also specifying the
+ use the :paramref:`_postgresql.Insert.on_conflict_do_update.index_where` parameter:
+
+ .. sourcecode:: pycon+sql
+
+ >>> stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')
+ >>> stmt = stmt.on_conflict_do_update(
+ ... index_elements=[my_table.c.user_email],
+ ... index_where=my_table.c.user_email.like('%@gmail.com'),
+ ... set_=dict(data=stmt.excluded.data)
+ ... )
+ >>> print(stmt)
+ {opensql}INSERT INTO my_table (data, user_email)
+ VALUES (%(data)s, %(user_email)s) ON CONFLICT (user_email)
+ WHERE user_email LIKE %(user_email_1)s DO UPDATE SET data = excluded.data
+
+* The :paramref:`_postgresql.Insert.on_conflict_do_update.constraint` argument is
+ used to specify an index directly rather than inferring it. This can be
+ the name of a UNIQUE constraint, a PRIMARY KEY constraint, or an INDEX:
+
+ .. sourcecode:: pycon+sql
+
+ >>> do_update_stmt = insert_stmt.on_conflict_do_update(
+ ... constraint='my_table_idx_1',
+ ... set_=dict(data='updated value')
+ ... )
+ >>> print(do_update_stmt)
+ {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
+ ON CONFLICT ON CONSTRAINT my_table_idx_1 DO UPDATE SET data = %(param_1)s
+ {stop}
+
+ >>> do_update_stmt = insert_stmt.on_conflict_do_update(
+ ... constraint='my_table_pk',
+ ... set_=dict(data='updated value')
+ ... )
+ >>> print(do_update_stmt)
+ {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
+ ON CONFLICT ON CONSTRAINT my_table_pk DO UPDATE SET data = %(param_1)s
+ {stop}
+
+* The :paramref:`_postgresql.Insert.on_conflict_do_update.constraint` argument may
+ also refer to a SQLAlchemy construct representing a constraint,
+ e.g. :class:`.UniqueConstraint`, :class:`.PrimaryKeyConstraint`,
+ :class:`.Index`, or :class:`.ExcludeConstraint`. In this use,
+ if the constraint has a name, it is used directly. Otherwise, if the
+ constraint is unnamed, then inference will be used, where the expressions
+ and optional WHERE clause of the constraint will be spelled out in the
+ construct. This use is especially convenient
+ to refer to the named or unnamed primary key of a :class:`_schema.Table`
+ using the
+ :attr:`_schema.Table.primary_key` attribute:
+
+ .. sourcecode:: pycon+sql
+
+ >>> do_update_stmt = insert_stmt.on_conflict_do_update(
+ ... constraint=my_table.primary_key,
+ ... set_=dict(data='updated value')
+ ... )
+ >>> print(do_update_stmt)
+ {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
+ ON CONFLICT (id) DO UPDATE SET data = %(param_1)s
+
+The SET Clause
+^^^^^^^^^^^^^^^
+
+``ON CONFLICT...DO UPDATE`` is used to perform an update of the already
+existing row, using any combination of new values as well as values
+from the proposed insertion. These values are specified using the
+:paramref:`_postgresql.Insert.on_conflict_do_update.set_` parameter. This
+parameter accepts a dictionary which consists of direct values
+for UPDATE:
+
+.. sourcecode:: pycon+sql
+
+ >>> stmt = insert(my_table).values(id='some_id', data='inserted value')
+ >>> do_update_stmt = stmt.on_conflict_do_update(
+ ... index_elements=['id'],
+ ... set_=dict(data='updated value')
+ ... )
+ >>> print(do_update_stmt)
+ {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
+ ON CONFLICT (id) DO UPDATE SET data = %(param_1)s
+
+.. warning::
+
+ The :meth:`_expression.Insert.on_conflict_do_update`
+ method does **not** take into
+ account Python-side default UPDATE values or generation functions, e.g.
+ those specified using :paramref:`_schema.Column.onupdate`.
+ These values will not be exercised for an ON CONFLICT style of UPDATE,
+ unless they are manually specified in the
+ :paramref:`_postgresql.Insert.on_conflict_do_update.set_` dictionary.
+
+Updating using the Excluded INSERT Values
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+In order to refer to the proposed insertion row, the special alias
+:attr:`~.postgresql.Insert.excluded` is available as an attribute on
+the :class:`_postgresql.Insert` object; this object is a
+:class:`_expression.ColumnCollection`
+which alias contains all columns of the target
+table:
+
+.. sourcecode:: pycon+sql
+
+ >>> stmt = insert(my_table).values(
+ ... id='some_id',
+ ... data='inserted value',
+ ... author='jlh'
+ ... )
+ >>> do_update_stmt = stmt.on_conflict_do_update(
+ ... index_elements=['id'],
+ ... set_=dict(data='updated value', author=stmt.excluded.author)
+ ... )
+ >>> print(do_update_stmt)
+ {opensql}INSERT INTO my_table (id, data, author)
+ VALUES (%(id)s, %(data)s, %(author)s)
+ ON CONFLICT (id) DO UPDATE SET data = %(param_1)s, author = excluded.author
+
+Additional WHERE Criteria
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The :meth:`_expression.Insert.on_conflict_do_update` method also accepts
+a WHERE clause using the :paramref:`_postgresql.Insert.on_conflict_do_update.where`
+parameter, which will limit those rows which receive an UPDATE:
+
+.. sourcecode:: pycon+sql
+
+ >>> stmt = insert(my_table).values(
+ ... id='some_id',
+ ... data='inserted value',
+ ... author='jlh'
+ ... )
+ >>> on_update_stmt = stmt.on_conflict_do_update(
+ ... index_elements=['id'],
+ ... set_=dict(data='updated value', author=stmt.excluded.author),
+ ... where=(my_table.c.status == 2)
+ ... )
+ >>> print(on_update_stmt)
+ {opensql}INSERT INTO my_table (id, data, author)
+ VALUES (%(id)s, %(data)s, %(author)s)
+ ON CONFLICT (id) DO UPDATE SET data = %(param_1)s, author = excluded.author
+ WHERE my_table.status = %(status_1)s
+
+Skipping Rows with DO NOTHING
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+``ON CONFLICT`` may be used to skip inserting a row entirely
+if any conflict with a unique or exclusion constraint occurs; below
+this is illustrated using the
+:meth:`~.postgresql.Insert.on_conflict_do_nothing` method:
+
+.. sourcecode:: pycon+sql
+
+ >>> stmt = insert(my_table).values(id='some_id', data='inserted value')
+ >>> stmt = stmt.on_conflict_do_nothing(index_elements=['id'])
+ >>> print(stmt)
+ {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
+ ON CONFLICT (id) DO NOTHING
+
+If ``DO NOTHING`` is used without specifying any columns or constraint,
+it has the effect of skipping the INSERT for any unique or exclusion
+constraint violation which occurs:
+
+.. sourcecode:: pycon+sql
+
+ >>> stmt = insert(my_table).values(id='some_id', data='inserted value')
+ >>> stmt = stmt.on_conflict_do_nothing()
+ >>> print(stmt)
+ {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
+ ON CONFLICT DO NOTHING
+
+.. _postgresql_match:
+
+Full Text Search
+----------------
+
+SQLAlchemy makes available the PostgreSQL ``@@`` operator via the
+:meth:`_expression.ColumnElement.match` method on any textual column expression.
+
+On the PostgreSQL dialect, an expression like the following::
+
+ select(sometable.c.text.match("search string"))
+
+will emit to the database::
+
+ SELECT text @@ to_tsquery('search string') FROM table
+
+Various other PostgreSQL text search functions such as ``to_tsquery()``,
+``to_tsvector()``, and ``plainto_tsquery()`` are available by explicitly using
+the standard SQLAlchemy :data:`.func` construct.
+
+For example::
+
+ select(func.to_tsvector('fat cats ate rats').match('cat & rat'))
+
+Emits the equivalent of::
+
+ SELECT to_tsvector('fat cats ate rats') @@ to_tsquery('cat & rat')
+
+The :class:`_postgresql.TSVECTOR` type can provide for explicit CAST::
+
+ from sqlalchemy.dialects.postgresql import TSVECTOR
+ from sqlalchemy import select, cast
+ select(cast("some text", TSVECTOR))
+
+produces a statement equivalent to::
+
+ SELECT CAST('some text' AS TSVECTOR) AS anon_1
+
+.. tip::
+
+ It's important to remember that text searching in PostgreSQL is powerful but complicated,
+ and SQLAlchemy users are advised to reference the PostgreSQL documentation
+ regarding
+ `Full Text Search <https://www.postgresql.org/docs/current/textsearch-controls.html>`_.
+
+ There are important differences between ``to_tsquery`` and
+ ``plainto_tsquery``, the most significant of which is that ``to_tsquery``
+ expects specially formatted "querytext" that is written to PostgreSQL's own
+ specification, while ``plainto_tsquery`` expects unformatted text that is
+ transformed into ``to_tsquery`` compatible querytext. This means the input to
+ ``.match()`` under PostgreSQL may be incompatible with the input to
+ ``.match()`` under another database backend. SQLAlchemy users who support
+ multiple backends are advised to carefully implement their usage of
+ ``.match()`` to work around these constraints.
+
+Full Text Searches in PostgreSQL are influenced by a combination of: the
+PostgreSQL setting of ``default_text_search_config``, the ``regconfig`` used
+to build the GIN/GiST indexes, and the ``regconfig`` optionally passed in
+during a query.
+
+When performing a Full Text Search against a column that has a GIN or
+GiST index that is already pre-computed (which is common on full text
+searches) one may need to explicitly pass in a particular PostgreSQL
+``regconfig`` value to ensure the query-planner utilizes the index and does
+not re-compute the column on demand.
+
+In order to provide for this explicit query planning, or to use different
+search strategies, the ``match`` method accepts a ``postgresql_regconfig``
+keyword argument::
+
+ select(mytable.c.id).where(
+ mytable.c.title.match('somestring', postgresql_regconfig='english')
+ )
+
+Emits the equivalent of::
+
+ SELECT mytable.id FROM mytable
+ WHERE mytable.title @@ to_tsquery('english', 'somestring')
+
+One can also specifically pass in a `'regconfig'` value to the
+``to_tsvector()`` command as the initial argument::
+
+ select(mytable.c.id).where(
+ func.to_tsvector('english', mytable.c.title )\
+ .match('somestring', postgresql_regconfig='english')
+ )
+
+produces a statement equivalent to::
+
+ SELECT mytable.id FROM mytable
+ WHERE to_tsvector('english', mytable.title) @@
+ to_tsquery('english', 'somestring')
+
+It is recommended that you use the ``EXPLAIN ANALYZE...`` tool from
+PostgreSQL to ensure that you are generating queries with SQLAlchemy that
+take full advantage of any indexes you may have created for full text search.
+
+.. seealso::
+
+ `Full Text Search <https://www.postgresql.org/docs/current/textsearch-controls.html>`_ - in the PostgreSQL documentation
+
+
+FROM ONLY ...
+-------------
+
+The dialect supports PostgreSQL's ONLY keyword for targeting only a particular
+table in an inheritance hierarchy. This can be used to produce the
+``SELECT ... FROM ONLY``, ``UPDATE ONLY ...``, and ``DELETE FROM ONLY ...``
+syntaxes. It uses SQLAlchemy's hints mechanism::
+
+ # SELECT ... FROM ONLY ...
+ result = table.select().with_hint(table, 'ONLY', 'postgresql')
+ print(result.fetchall())
+
+ # UPDATE ONLY ...
+ table.update(values=dict(foo='bar')).with_hint('ONLY',
+ dialect_name='postgresql')
+
+ # DELETE FROM ONLY ...
+ table.delete().with_hint('ONLY', dialect_name='postgresql')
+
+
+.. _postgresql_indexes:
+
+PostgreSQL-Specific Index Options
+---------------------------------
+
+Several extensions to the :class:`.Index` construct are available, specific
+to the PostgreSQL dialect.
+
+Covering Indexes
+^^^^^^^^^^^^^^^^
+
+The ``postgresql_include`` option renders INCLUDE(colname) for the given
+string names::
+
+ Index("my_index", table.c.x, postgresql_include=['y'])
+
+would render the index as ``CREATE INDEX my_index ON table (x) INCLUDE (y)``
+
+Note that this feature requires PostgreSQL 11 or later.
+
+.. versionadded:: 1.4
+
+.. _postgresql_partial_indexes:
+
+Partial Indexes
+^^^^^^^^^^^^^^^
+
+Partial indexes add criterion to the index definition so that the index is
+applied to a subset of rows. These can be specified on :class:`.Index`
+using the ``postgresql_where`` keyword argument::
+
+ Index('my_index', my_table.c.id, postgresql_where=my_table.c.value > 10)
+
+.. _postgresql_operator_classes:
+
+Operator Classes
+^^^^^^^^^^^^^^^^
+
+PostgreSQL allows the specification of an *operator class* for each column of
+an index (see
+https://www.postgresql.org/docs/current/interactive/indexes-opclass.html).
+The :class:`.Index` construct allows these to be specified via the
+``postgresql_ops`` keyword argument::
+
+ Index(
+ 'my_index', my_table.c.id, my_table.c.data,
+ postgresql_ops={
+ 'data': 'text_pattern_ops',
+ 'id': 'int4_ops'
+ })
+
+Note that the keys in the ``postgresql_ops`` dictionaries are the
+"key" name of the :class:`_schema.Column`, i.e. the name used to access it from
+the ``.c`` collection of :class:`_schema.Table`, which can be configured to be
+different than the actual name of the column as expressed in the database.
+
+If ``postgresql_ops`` is to be used against a complex SQL expression such
+as a function call, then to apply to the column it must be given a label
+that is identified in the dictionary by name, e.g.::
+
+ Index(
+ 'my_index', my_table.c.id,
+ func.lower(my_table.c.data).label('data_lower'),
+ postgresql_ops={
+ 'data_lower': 'text_pattern_ops',
+ 'id': 'int4_ops'
+ })
+
+Operator classes are also supported by the
+:class:`_postgresql.ExcludeConstraint` construct using the
+:paramref:`_postgresql.ExcludeConstraint.ops` parameter. See that parameter for
+details.
+
+.. versionadded:: 1.3.21 added support for operator classes with
+ :class:`_postgresql.ExcludeConstraint`.
+
+
+Index Types
+^^^^^^^^^^^
+
+PostgreSQL provides several index types: B-Tree, Hash, GiST, and GIN, as well
+as the ability for users to create their own (see
+https://www.postgresql.org/docs/current/static/indexes-types.html). These can be
+specified on :class:`.Index` using the ``postgresql_using`` keyword argument::
+
+ Index('my_index', my_table.c.data, postgresql_using='gin')
+
+The value passed to the keyword argument will be simply passed through to the
+underlying CREATE INDEX command, so it *must* be a valid index type for your
+version of PostgreSQL.
+
+.. _postgresql_index_storage:
+
+Index Storage Parameters
+^^^^^^^^^^^^^^^^^^^^^^^^
+
+PostgreSQL allows storage parameters to be set on indexes. The storage
+parameters available depend on the index method used by the index. Storage
+parameters can be specified on :class:`.Index` using the ``postgresql_with``
+keyword argument::
+
+ Index('my_index', my_table.c.data, postgresql_with={"fillfactor": 50})
+
+.. versionadded:: 1.0.6
+
+PostgreSQL allows to define the tablespace in which to create the index.
+The tablespace can be specified on :class:`.Index` using the
+``postgresql_tablespace`` keyword argument::
+
+ Index('my_index', my_table.c.data, postgresql_tablespace='my_tablespace')
+
+.. versionadded:: 1.1
+
+Note that the same option is available on :class:`_schema.Table` as well.
+
+.. _postgresql_index_concurrently:
+
+Indexes with CONCURRENTLY
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The PostgreSQL index option CONCURRENTLY is supported by passing the
+flag ``postgresql_concurrently`` to the :class:`.Index` construct::
+
+ tbl = Table('testtbl', m, Column('data', Integer))
+
+ idx1 = Index('test_idx1', tbl.c.data, postgresql_concurrently=True)
+
+The above index construct will render DDL for CREATE INDEX, assuming
+PostgreSQL 8.2 or higher is detected or for a connection-less dialect, as::
+
+ CREATE INDEX CONCURRENTLY test_idx1 ON testtbl (data)
+
+For DROP INDEX, assuming PostgreSQL 9.2 or higher is detected or for
+a connection-less dialect, it will emit::
+
+ DROP INDEX CONCURRENTLY test_idx1
+
+.. versionadded:: 1.1 support for CONCURRENTLY on DROP INDEX. The
+ CONCURRENTLY keyword is now only emitted if a high enough version
+ of PostgreSQL is detected on the connection (or for a connection-less
+ dialect).
+
+When using CONCURRENTLY, the PostgreSQL database requires that the statement
+be invoked outside of a transaction block. The Python DBAPI enforces that
+even for a single statement, a transaction is present, so to use this
+construct, the DBAPI's "autocommit" mode must be used::
+
+ metadata = MetaData()
+ table = Table(
+ "foo", metadata,
+ Column("id", String))
+ index = Index(
+ "foo_idx", table.c.id, postgresql_concurrently=True)
+
+ with engine.connect() as conn:
+ with conn.execution_options(isolation_level='AUTOCOMMIT'):
+ table.create(conn)
+
+.. seealso::
+
+ :ref:`postgresql_isolation_level`
+
+.. _postgresql_index_reflection:
+
+PostgreSQL Index Reflection
+---------------------------
+
+The PostgreSQL database creates a UNIQUE INDEX implicitly whenever the
+UNIQUE CONSTRAINT construct is used. When inspecting a table using
+:class:`_reflection.Inspector`, the :meth:`_reflection.Inspector.get_indexes`
+and the :meth:`_reflection.Inspector.get_unique_constraints`
+will report on these
+two constructs distinctly; in the case of the index, the key
+``duplicates_constraint`` will be present in the index entry if it is
+detected as mirroring a constraint. When performing reflection using
+``Table(..., autoload_with=engine)``, the UNIQUE INDEX is **not** returned
+in :attr:`_schema.Table.indexes` when it is detected as mirroring a
+:class:`.UniqueConstraint` in the :attr:`_schema.Table.constraints` collection
+.
+
+.. versionchanged:: 1.0.0 - :class:`_schema.Table` reflection now includes
+ :class:`.UniqueConstraint` objects present in the
+ :attr:`_schema.Table.constraints`
+ collection; the PostgreSQL backend will no longer include a "mirrored"
+ :class:`.Index` construct in :attr:`_schema.Table.indexes`
+ if it is detected
+ as corresponding to a unique constraint.
+
+Special Reflection Options
+--------------------------
+
+The :class:`_reflection.Inspector`
+used for the PostgreSQL backend is an instance
+of :class:`.PGInspector`, which offers additional methods::
+
+ from sqlalchemy import create_engine, inspect
+
+ engine = create_engine("postgresql+psycopg2://localhost/test")
+ insp = inspect(engine) # will be a PGInspector
+
+ print(insp.get_enums())
+
+.. autoclass:: PGInspector
+ :members:
+
+.. _postgresql_table_options:
+
+PostgreSQL Table Options
+------------------------
+
+Several options for CREATE TABLE are supported directly by the PostgreSQL
+dialect in conjunction with the :class:`_schema.Table` construct:
+
+* ``TABLESPACE``::
+
+ Table("some_table", metadata, ..., postgresql_tablespace='some_tablespace')
+
+ The above option is also available on the :class:`.Index` construct.
+
+* ``ON COMMIT``::
+
+ Table("some_table", metadata, ..., postgresql_on_commit='PRESERVE ROWS')
+
+* ``WITH OIDS``::
+
+ Table("some_table", metadata, ..., postgresql_with_oids=True)
+
+* ``WITHOUT OIDS``::
+
+ Table("some_table", metadata, ..., postgresql_with_oids=False)
+
+* ``INHERITS``::
+
+ Table("some_table", metadata, ..., postgresql_inherits="some_supertable")
+
+ Table("some_table", metadata, ..., postgresql_inherits=("t1", "t2", ...))
+
+ .. versionadded:: 1.0.0
+
+* ``PARTITION BY``::
+
+ Table("some_table", metadata, ...,
+ postgresql_partition_by='LIST (part_column)')
+
+ .. versionadded:: 1.2.6
+
+.. seealso::
+
+ `PostgreSQL CREATE TABLE options
+ <https://www.postgresql.org/docs/current/static/sql-createtable.html>`_ -
+ in the PostgreSQL documentation.
+
+.. _postgresql_constraint_options:
+
+PostgreSQL Constraint Options
+-----------------------------
+
+The following option(s) are supported by the PostgreSQL dialect in conjunction
+with selected constraint constructs:
+
+* ``NOT VALID``: This option applies towards CHECK and FOREIGN KEY constraints
+ when the constraint is being added to an existing table via ALTER TABLE,
+ and has the effect that existing rows are not scanned during the ALTER
+ operation against the constraint being added.
+
+ When using a SQL migration tool such as `Alembic <https://alembic.sqlalchemy.org>`_
+ that renders ALTER TABLE constructs, the ``postgresql_not_valid`` argument
+ may be specified as an additional keyword argument within the operation
+ that creates the constraint, as in the following Alembic example::
+
+ def update():
+ op.create_foreign_key(
+ "fk_user_address",
+ "address",
+ "user",
+ ["user_id"],
+ ["id"],
+ postgresql_not_valid=True
+ )
+
+ The keyword is ultimately accepted directly by the
+ :class:`_schema.CheckConstraint`, :class:`_schema.ForeignKeyConstraint`
+ and :class:`_schema.ForeignKey` constructs; when using a tool like
+ Alembic, dialect-specific keyword arguments are passed through to
+ these constructs from the migration operation directives::
+
+ CheckConstraint("some_field IS NOT NULL", postgresql_not_valid=True)
+
+ ForeignKeyConstraint(["some_id"], ["some_table.some_id"], postgresql_not_valid=True)
+
+ .. versionadded:: 1.4.32
+
+ .. seealso::
+
+ `PostgreSQL ALTER TABLE options
+ <https://www.postgresql.org/docs/current/static/sql-altertable.html>`_ -
+ in the PostgreSQL documentation.
+
+.. _postgresql_table_valued_overview:
+
+Table values, Table and Column valued functions, Row and Tuple objects
+-----------------------------------------------------------------------
+
+PostgreSQL makes great use of modern SQL forms such as table-valued functions,
+tables and rows as values. These constructs are commonly used as part
+of PostgreSQL's support for complex datatypes such as JSON, ARRAY, and other
+datatypes. SQLAlchemy's SQL expression language has native support for
+most table-valued and row-valued forms.
+
+.. _postgresql_table_valued:
+
+Table-Valued Functions
+^^^^^^^^^^^^^^^^^^^^^^^
+
+Many PostgreSQL built-in functions are intended to be used in the FROM clause
+of a SELECT statement, and are capable of returning table rows or sets of table
+rows. A large portion of PostgreSQL's JSON functions for example such as
+``json_array_elements()``, ``json_object_keys()``, ``json_each_text()``,
+``json_each()``, ``json_to_record()``, ``json_populate_recordset()`` use such
+forms. These classes of SQL function calling forms in SQLAlchemy are available
+using the :meth:`_functions.FunctionElement.table_valued` method in conjunction
+with :class:`_functions.Function` objects generated from the :data:`_sql.func`
+namespace.
+
+Examples from PostgreSQL's reference documentation follow below:
+
+* ``json_each()``::
+
+ >>> from sqlalchemy import select, func
+ >>> stmt = select(func.json_each('{"a":"foo", "b":"bar"}').table_valued("key", "value"))
+ >>> print(stmt)
+ SELECT anon_1.key, anon_1.value
+ FROM json_each(:json_each_1) AS anon_1
+
+* ``json_populate_record()``::
+
+ >>> from sqlalchemy import select, func, literal_column
+ >>> stmt = select(
+ ... func.json_populate_record(
+ ... literal_column("null::myrowtype"),
+ ... '{"a":1,"b":2}'
+ ... ).table_valued("a", "b", name="x")
+ ... )
+ >>> print(stmt)
+ SELECT x.a, x.b
+ FROM json_populate_record(null::myrowtype, :json_populate_record_1) AS x
+
+* ``json_to_record()`` - this form uses a PostgreSQL specific form of derived
+ columns in the alias, where we may make use of :func:`_sql.column` elements with
+ types to produce them. The :meth:`_functions.FunctionElement.table_valued`
+ method produces a :class:`_sql.TableValuedAlias` construct, and the method
+ :meth:`_sql.TableValuedAlias.render_derived` method sets up the derived
+ columns specification::
+
+ >>> from sqlalchemy import select, func, column, Integer, Text
+ >>> stmt = select(
+ ... func.json_to_record('{"a":1,"b":[1,2,3],"c":"bar"}').table_valued(
+ ... column("a", Integer), column("b", Text), column("d", Text),
+ ... ).render_derived(name="x", with_types=True)
+ ... )
+ >>> print(stmt)
+ SELECT x.a, x.b, x.d
+ FROM json_to_record(:json_to_record_1) AS x(a INTEGER, b TEXT, d TEXT)
+
+* ``WITH ORDINALITY`` - part of the SQL standard, ``WITH ORDINALITY`` adds an
+ ordinal counter to the output of a function and is accepted by a limited set
+ of PostgreSQL functions including ``unnest()`` and ``generate_series()``. The
+ :meth:`_functions.FunctionElement.table_valued` method accepts a keyword
+ parameter ``with_ordinality`` for this purpose, which accepts the string name
+ that will be applied to the "ordinality" column::
+
+ >>> from sqlalchemy import select, func
+ >>> stmt = select(
+ ... func.generate_series(4, 1, -1).
+ ... table_valued("value", with_ordinality="ordinality").
+ ... render_derived()
+ ... )
+ >>> print(stmt)
+ SELECT anon_1.value, anon_1.ordinality
+ FROM generate_series(:generate_series_1, :generate_series_2, :generate_series_3)
+ WITH ORDINALITY AS anon_1(value, ordinality)
+
+.. versionadded:: 1.4.0b2
+
+.. seealso::
+
+ :ref:`tutorial_functions_table_valued` - in the :ref:`unified_tutorial`
+
+.. _postgresql_column_valued:
+
+Column Valued Functions
+^^^^^^^^^^^^^^^^^^^^^^^
+
+Similar to the table valued function, a column valued function is present
+in the FROM clause, but delivers itself to the columns clause as a single
+scalar value. PostgreSQL functions such as ``json_array_elements()``,
+``unnest()`` and ``generate_series()`` may use this form. Column valued functions are available using the
+:meth:`_functions.FunctionElement.column_valued` method of :class:`_functions.FunctionElement`:
+
+* ``json_array_elements()``::
+
+ >>> from sqlalchemy import select, func
+ >>> stmt = select(func.json_array_elements('["one", "two"]').column_valued("x"))
+ >>> print(stmt)
+ SELECT x
+ FROM json_array_elements(:json_array_elements_1) AS x
+
+* ``unnest()`` - in order to generate a PostgreSQL ARRAY literal, the
+ :func:`_postgresql.array` construct may be used::
+
+
+ >>> from sqlalchemy.dialects.postgresql import array
+ >>> from sqlalchemy import select, func
+ >>> stmt = select(func.unnest(array([1, 2])).column_valued())
+ >>> print(stmt)
+ SELECT anon_1
+ FROM unnest(ARRAY[%(param_1)s, %(param_2)s]) AS anon_1
+
+ The function can of course be used against an existing table-bound column
+ that's of type :class:`_types.ARRAY`::
+
+ >>> from sqlalchemy import table, column, ARRAY, Integer
+ >>> from sqlalchemy import select, func
+ >>> t = table("t", column('value', ARRAY(Integer)))
+ >>> stmt = select(func.unnest(t.c.value).column_valued("unnested_value"))
+ >>> print(stmt)
+ SELECT unnested_value
+ FROM unnest(t.value) AS unnested_value
+
+.. seealso::
+
+ :ref:`tutorial_functions_column_valued` - in the :ref:`unified_tutorial`
+
+
+Row Types
+^^^^^^^^^
+
+Built-in support for rendering a ``ROW`` may be approximated using
+``func.ROW`` with the :attr:`_sa.func` namespace, or by using the
+:func:`_sql.tuple_` construct::
+
+ >>> from sqlalchemy import table, column, func, tuple_
+ >>> t = table("t", column("id"), column("fk"))
+ >>> stmt = t.select().where(
+ ... tuple_(t.c.id, t.c.fk) > (1,2)
+ ... ).where(
+ ... func.ROW(t.c.id, t.c.fk) < func.ROW(3, 7)
+ ... )
+ >>> print(stmt)
+ SELECT t.id, t.fk
+ FROM t
+ WHERE (t.id, t.fk) > (:param_1, :param_2) AND ROW(t.id, t.fk) < ROW(:ROW_1, :ROW_2)
+
+.. seealso::
+
+ `PostgreSQL Row Constructors
+ <https://www.postgresql.org/docs/current/sql-expressions.html#SQL-SYNTAX-ROW-CONSTRUCTORS>`_
+
+ `PostgreSQL Row Constructor Comparison
+ <https://www.postgresql.org/docs/current/functions-comparisons.html#ROW-WISE-COMPARISON>`_
+
+Table Types passed to Functions
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+PostgreSQL supports passing a table as an argument to a function, which it
+refers towards as a "record" type. SQLAlchemy :class:`_sql.FromClause` objects
+such as :class:`_schema.Table` support this special form using the
+:meth:`_sql.FromClause.table_valued` method, which is comparable to the
+:meth:`_functions.FunctionElement.table_valued` method except that the collection
+of columns is already established by that of the :class:`_sql.FromClause`
+itself::
+
+
+ >>> from sqlalchemy import table, column, func, select
+ >>> a = table( "a", column("id"), column("x"), column("y"))
+ >>> stmt = select(func.row_to_json(a.table_valued()))
+ >>> print(stmt)
+ SELECT row_to_json(a) AS row_to_json_1
+ FROM a
+
+.. versionadded:: 1.4.0b2
+
+
+ARRAY Types
+-----------
+
+The PostgreSQL dialect supports arrays, both as multidimensional column types
+as well as array literals:
+
+* :class:`_postgresql.ARRAY` - ARRAY datatype
+
+* :class:`_postgresql.array` - array literal
+
+* :func:`_postgresql.array_agg` - ARRAY_AGG SQL function
+
+* :class:`_postgresql.aggregate_order_by` - helper for PG's ORDER BY aggregate
+ function syntax.
+
+JSON Types
+----------
+
+The PostgreSQL dialect supports both JSON and JSONB datatypes, including
+psycopg2's native support and support for all of PostgreSQL's special
+operators:
+
+* :class:`_postgresql.JSON`
+
+* :class:`_postgresql.JSONB`
+
+HSTORE Type
+-----------
+
+The PostgreSQL HSTORE type as well as hstore literals are supported:
+
+* :class:`_postgresql.HSTORE` - HSTORE datatype
+
+* :class:`_postgresql.hstore` - hstore literal
+
+ENUM Types
+----------
+
+PostgreSQL has an independently creatable TYPE structure which is used
+to implement an enumerated type. This approach introduces significant
+complexity on the SQLAlchemy side in terms of when this type should be
+CREATED and DROPPED. The type object is also an independently reflectable
+entity. The following sections should be consulted:
+
+* :class:`_postgresql.ENUM` - DDL and typing support for ENUM.
+
+* :meth:`.PGInspector.get_enums` - retrieve a listing of current ENUM types
+
+* :meth:`.postgresql.ENUM.create` , :meth:`.postgresql.ENUM.drop` - individual
+ CREATE and DROP commands for ENUM.
+
+.. _postgresql_array_of_enum:
+
+Using ENUM with ARRAY
+^^^^^^^^^^^^^^^^^^^^^
+
+The combination of ENUM and ARRAY is not directly supported by backend
+DBAPIs at this time. Prior to SQLAlchemy 1.3.17, a special workaround
+was needed in order to allow this combination to work, described below.
+
+.. versionchanged:: 1.3.17 The combination of ENUM and ARRAY is now directly
+ handled by SQLAlchemy's implementation without any workarounds needed.
+
+.. sourcecode:: python
+
+ from sqlalchemy import TypeDecorator
+ from sqlalchemy.dialects.postgresql import ARRAY
+
+ class ArrayOfEnum(TypeDecorator):
+ impl = ARRAY
+
+ def bind_expression(self, bindvalue):
+ return sa.cast(bindvalue, self)
+
+ def result_processor(self, dialect, coltype):
+ super_rp = super(ArrayOfEnum, self).result_processor(
+ dialect, coltype)
+
+ def handle_raw_string(value):
+ inner = re.match(r"^{(.*)}$", value).group(1)
+ return inner.split(",") if inner else []
+
+ def process(value):
+ if value is None:
+ return None
+ return super_rp(handle_raw_string(value))
+ return process
+
+E.g.::
+
+ Table(
+ 'mydata', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('data', ArrayOfEnum(ENUM('a', 'b, 'c', name='myenum')))
+
+ )
+
+This type is not included as a built-in type as it would be incompatible
+with a DBAPI that suddenly decides to support ARRAY of ENUM directly in
+a new version.
+
+.. _postgresql_array_of_json:
+
+Using JSON/JSONB with ARRAY
+^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Similar to using ENUM, prior to SQLAlchemy 1.3.17, for an ARRAY of JSON/JSONB
+we need to render the appropriate CAST. Current psycopg2 drivers accommodate
+the result set correctly without any special steps.
+
+.. versionchanged:: 1.3.17 The combination of JSON/JSONB and ARRAY is now
+ directly handled by SQLAlchemy's implementation without any workarounds
+ needed.
+
+.. sourcecode:: python
+
+ class CastingArray(ARRAY):
+ def bind_expression(self, bindvalue):
+ return sa.cast(bindvalue, self)
+
+E.g.::
+
+ Table(
+ 'mydata', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('data', CastingArray(JSONB))
+ )
+
+
+""" # noqa: E501
+
+from collections import defaultdict
+import datetime as dt
+import re
+from uuid import UUID as _python_UUID
+
+from . import array as _array
+from . import dml
+from . import hstore as _hstore
+from . import json as _json
+from . import ranges as _ranges
+from ... import exc
+from ... import schema
+from ... import sql
+from ... import util
+from ...engine import characteristics
+from ...engine import default
+from ...engine import reflection
+from ...sql import coercions
+from ...sql import compiler
+from ...sql import elements
+from ...sql import expression
+from ...sql import roles
+from ...sql import sqltypes
+from ...sql import util as sql_util
+from ...sql.ddl import DDLBase
+from ...types import BIGINT
+from ...types import BOOLEAN
+from ...types import CHAR
+from ...types import DATE
+from ...types import FLOAT
+from ...types import INTEGER
+from ...types import NUMERIC
+from ...types import REAL
+from ...types import SMALLINT
+from ...types import TEXT
+from ...types import VARCHAR
+
+IDX_USING = re.compile(r"^(?:btree|hash|gist|gin|[\w_]+)$", re.I)
+
+AUTOCOMMIT_REGEXP = re.compile(
+ r"\s*(?:UPDATE|INSERT|CREATE|DELETE|DROP|ALTER|GRANT|REVOKE|"
+ "IMPORT FOREIGN SCHEMA|REFRESH MATERIALIZED VIEW|TRUNCATE)",
+ re.I | re.UNICODE,
+)
+
+RESERVED_WORDS = set(
+ [
+ "all",
+ "analyse",
+ "analyze",
+ "and",
+ "any",
+ "array",
+ "as",
+ "asc",
+ "asymmetric",
+ "both",
+ "case",
+ "cast",
+ "check",
+ "collate",
+ "column",
+ "constraint",
+ "create",
+ "current_catalog",
+ "current_date",
+ "current_role",
+ "current_time",
+ "current_timestamp",
+ "current_user",
+ "default",
+ "deferrable",
+ "desc",
+ "distinct",
+ "do",
+ "else",
+ "end",
+ "except",
+ "false",
+ "fetch",
+ "for",
+ "foreign",
+ "from",
+ "grant",
+ "group",
+ "having",
+ "in",
+ "initially",
+ "intersect",
+ "into",
+ "leading",
+ "limit",
+ "localtime",
+ "localtimestamp",
+ "new",
+ "not",
+ "null",
+ "of",
+ "off",
+ "offset",
+ "old",
+ "on",
+ "only",
+ "or",
+ "order",
+ "placing",
+ "primary",
+ "references",
+ "returning",
+ "select",
+ "session_user",
+ "some",
+ "symmetric",
+ "table",
+ "then",
+ "to",
+ "trailing",
+ "true",
+ "union",
+ "unique",
+ "user",
+ "using",
+ "variadic",
+ "when",
+ "where",
+ "window",
+ "with",
+ "authorization",
+ "between",
+ "binary",
+ "cross",
+ "current_schema",
+ "freeze",
+ "full",
+ "ilike",
+ "inner",
+ "is",
+ "isnull",
+ "join",
+ "left",
+ "like",
+ "natural",
+ "notnull",
+ "outer",
+ "over",
+ "overlaps",
+ "right",
+ "similar",
+ "verbose",
+ ]
+)
+
+_DECIMAL_TYPES = (1231, 1700)
+_FLOAT_TYPES = (700, 701, 1021, 1022)
+_INT_TYPES = (20, 21, 23, 26, 1005, 1007, 1016)
+
+
+class BYTEA(sqltypes.LargeBinary):
+ __visit_name__ = "BYTEA"
+
+
+class DOUBLE_PRECISION(sqltypes.Float):
+ __visit_name__ = "DOUBLE_PRECISION"
+
+
+class INET(sqltypes.TypeEngine):
+ __visit_name__ = "INET"
+
+
+PGInet = INET
+
+
+class CIDR(sqltypes.TypeEngine):
+ __visit_name__ = "CIDR"
+
+
+PGCidr = CIDR
+
+
+class MACADDR(sqltypes.TypeEngine):
+ __visit_name__ = "MACADDR"
+
+
+PGMacAddr = MACADDR
+
+
+class MONEY(sqltypes.TypeEngine):
+
+ r"""Provide the PostgreSQL MONEY type.
+
+ Depending on driver, result rows using this type may return a
+ string value which includes currency symbols.
+
+ For this reason, it may be preferable to provide conversion to a
+ numerically-based currency datatype using :class:`_types.TypeDecorator`::
+
+ import re
+ import decimal
+ from sqlalchemy import TypeDecorator
+
+ class NumericMoney(TypeDecorator):
+ impl = MONEY
+
+ def process_result_value(self, value: Any, dialect: Any) -> None:
+ if value is not None:
+ # adjust this for the currency and numeric
+ m = re.match(r"\$([\d.]+)", value)
+ if m:
+ value = decimal.Decimal(m.group(1))
+ return value
+
+ Alternatively, the conversion may be applied as a CAST using
+ the :meth:`_types.TypeDecorator.column_expression` method as follows::
+
+ import decimal
+ from sqlalchemy import cast
+ from sqlalchemy import TypeDecorator
+
+ class NumericMoney(TypeDecorator):
+ impl = MONEY
+
+ def column_expression(self, column: Any):
+ return cast(column, Numeric())
+
+ .. versionadded:: 1.2
+
+ """
+
+ __visit_name__ = "MONEY"
+
+
+class OID(sqltypes.TypeEngine):
+
+ """Provide the PostgreSQL OID type.
+
+ .. versionadded:: 0.9.5
+
+ """
+
+ __visit_name__ = "OID"
+
+
+class REGCLASS(sqltypes.TypeEngine):
+
+ """Provide the PostgreSQL REGCLASS type.
+
+ .. versionadded:: 1.2.7
+
+ """
+
+ __visit_name__ = "REGCLASS"
+
+
+class TIMESTAMP(sqltypes.TIMESTAMP):
+
+ """Provide the PostgreSQL TIMESTAMP type."""
+
+ __visit_name__ = "TIMESTAMP"
+
+ def __init__(self, timezone=False, precision=None):
+ """Construct a TIMESTAMP.
+
+ :param timezone: boolean value if timezone present, default False
+ :param precision: optional integer precision value
+
+ .. versionadded:: 1.4
+
+ """
+ super(TIMESTAMP, self).__init__(timezone=timezone)
+ self.precision = precision
+
+
+class TIME(sqltypes.TIME):
+
+ """PostgreSQL TIME type."""
+
+ __visit_name__ = "TIME"
+
+ def __init__(self, timezone=False, precision=None):
+ """Construct a TIME.
+
+ :param timezone: boolean value if timezone present, default False
+ :param precision: optional integer precision value
+
+ .. versionadded:: 1.4
+
+ """
+ super(TIME, self).__init__(timezone=timezone)
+ self.precision = precision
+
+
+class INTERVAL(sqltypes.NativeForEmulated, sqltypes._AbstractInterval):
+
+ """PostgreSQL INTERVAL type."""
+
+ __visit_name__ = "INTERVAL"
+ native = True
+
+ def __init__(self, precision=None, fields=None):
+ """Construct an INTERVAL.
+
+ :param precision: optional integer precision value
+ :param fields: string fields specifier. allows storage of fields
+ to be limited, such as ``"YEAR"``, ``"MONTH"``, ``"DAY TO HOUR"``,
+ etc.
+
+ .. versionadded:: 1.2
+
+ """
+ self.precision = precision
+ self.fields = fields
+
+ @classmethod
+ def adapt_emulated_to_native(cls, interval, **kw):
+ return INTERVAL(precision=interval.second_precision)
+
+ @property
+ def _type_affinity(self):
+ return sqltypes.Interval
+
+ def as_generic(self, allow_nulltype=False):
+ return sqltypes.Interval(native=True, second_precision=self.precision)
+
+ @property
+ def python_type(self):
+ return dt.timedelta
+
+ def coerce_compared_value(self, op, value):
+ return self
+
+
+PGInterval = INTERVAL
+
+
+class BIT(sqltypes.TypeEngine):
+ __visit_name__ = "BIT"
+
+ def __init__(self, length=None, varying=False):
+ if not varying:
+ # BIT without VARYING defaults to length 1
+ self.length = length or 1
+ else:
+ # but BIT VARYING can be unlimited-length, so no default
+ self.length = length
+ self.varying = varying
+
+
+PGBit = BIT
+
+
+class UUID(sqltypes.TypeEngine):
+
+ """PostgreSQL UUID type.
+
+ Represents the UUID column type, interpreting
+ data either as natively returned by the DBAPI
+ or as Python uuid objects.
+
+ The UUID type is currently known to work within the prominent DBAPI
+ drivers supported by SQLAlchemy including psycopg2, pg8000 and
+ asyncpg. Support for other DBAPI drivers may be incomplete or non-present.
+
+ """
+
+ __visit_name__ = "UUID"
+
+ def __init__(self, as_uuid=False):
+ """Construct a UUID type.
+
+
+ :param as_uuid=False: if True, values will be interpreted
+ as Python uuid objects, converting to/from string via the
+ DBAPI.
+
+ """
+ self.as_uuid = as_uuid
+
+ def coerce_compared_value(self, op, value):
+ """See :meth:`.TypeEngine.coerce_compared_value` for a description."""
+
+ if isinstance(value, util.string_types):
+ return self
+ else:
+ return super(UUID, self).coerce_compared_value(op, value)
+
+ def bind_processor(self, dialect):
+ if self.as_uuid:
+
+ def process(value):
+ if value is not None:
+ value = util.text_type(value)
+ return value
+
+ return process
+ else:
+ return None
+
+ def result_processor(self, dialect, coltype):
+ if self.as_uuid:
+
+ def process(value):
+ if value is not None:
+ value = _python_UUID(value)
+ return value
+
+ return process
+ else:
+ return None
+
+ def literal_processor(self, dialect):
+ if self.as_uuid:
+
+ def process(value):
+ if value is not None:
+ value = "'%s'::UUID" % value
+ return value
+
+ return process
+ else:
+
+ def process(value):
+ if value is not None:
+ value = "'%s'" % value
+ return value
+
+ return process
+
+ @property
+ def python_type(self):
+ return _python_UUID if self.as_uuid else str
+
+
+PGUuid = UUID
+
+
+class TSVECTOR(sqltypes.TypeEngine):
+
+ """The :class:`_postgresql.TSVECTOR` type implements the PostgreSQL
+ text search type TSVECTOR.
+
+ It can be used to do full text queries on natural language
+ documents.
+
+ .. versionadded:: 0.9.0
+
+ .. seealso::
+
+ :ref:`postgresql_match`
+
+ """
+
+ __visit_name__ = "TSVECTOR"
+
+
+class ENUM(sqltypes.NativeForEmulated, sqltypes.Enum):
+
+ """PostgreSQL ENUM type.
+
+ This is a subclass of :class:`_types.Enum` which includes
+ support for PG's ``CREATE TYPE`` and ``DROP TYPE``.
+
+ When the builtin type :class:`_types.Enum` is used and the
+ :paramref:`.Enum.native_enum` flag is left at its default of
+ True, the PostgreSQL backend will use a :class:`_postgresql.ENUM`
+ type as the implementation, so the special create/drop rules
+ will be used.
+
+ The create/drop behavior of ENUM is necessarily intricate, due to the
+ awkward relationship the ENUM type has in relationship to the
+ parent table, in that it may be "owned" by just a single table, or
+ may be shared among many tables.
+
+ When using :class:`_types.Enum` or :class:`_postgresql.ENUM`
+ in an "inline" fashion, the ``CREATE TYPE`` and ``DROP TYPE`` is emitted
+ corresponding to when the :meth:`_schema.Table.create` and
+ :meth:`_schema.Table.drop`
+ methods are called::
+
+ table = Table('sometable', metadata,
+ Column('some_enum', ENUM('a', 'b', 'c', name='myenum'))
+ )
+
+ table.create(engine) # will emit CREATE ENUM and CREATE TABLE
+ table.drop(engine) # will emit DROP TABLE and DROP ENUM
+
+ To use a common enumerated type between multiple tables, the best
+ practice is to declare the :class:`_types.Enum` or
+ :class:`_postgresql.ENUM` independently, and associate it with the
+ :class:`_schema.MetaData` object itself::
+
+ my_enum = ENUM('a', 'b', 'c', name='myenum', metadata=metadata)
+
+ t1 = Table('sometable_one', metadata,
+ Column('some_enum', myenum)
+ )
+
+ t2 = Table('sometable_two', metadata,
+ Column('some_enum', myenum)
+ )
+
+ When this pattern is used, care must still be taken at the level
+ of individual table creates. Emitting CREATE TABLE without also
+ specifying ``checkfirst=True`` will still cause issues::
+
+ t1.create(engine) # will fail: no such type 'myenum'
+
+ If we specify ``checkfirst=True``, the individual table-level create
+ operation will check for the ``ENUM`` and create if not exists::
+
+ # will check if enum exists, and emit CREATE TYPE if not
+ t1.create(engine, checkfirst=True)
+
+ When using a metadata-level ENUM type, the type will always be created
+ and dropped if either the metadata-wide create/drop is called::
+
+ metadata.create_all(engine) # will emit CREATE TYPE
+ metadata.drop_all(engine) # will emit DROP TYPE
+
+ The type can also be created and dropped directly::
+
+ my_enum.create(engine)
+ my_enum.drop(engine)
+
+ .. versionchanged:: 1.0.0 The PostgreSQL :class:`_postgresql.ENUM` type
+ now behaves more strictly with regards to CREATE/DROP. A metadata-level
+ ENUM type will only be created and dropped at the metadata level,
+ not the table level, with the exception of
+ ``table.create(checkfirst=True)``.
+ The ``table.drop()`` call will now emit a DROP TYPE for a table-level
+ enumerated type.
+
+ """
+
+ native_enum = True
+
+ def __init__(self, *enums, **kw):
+ """Construct an :class:`_postgresql.ENUM`.
+
+ Arguments are the same as that of
+ :class:`_types.Enum`, but also including
+ the following parameters.
+
+ :param create_type: Defaults to True.
+ Indicates that ``CREATE TYPE`` should be
+ emitted, after optionally checking for the
+ presence of the type, when the parent
+ table is being created; and additionally
+ that ``DROP TYPE`` is called when the table
+ is dropped. When ``False``, no check
+ will be performed and no ``CREATE TYPE``
+ or ``DROP TYPE`` is emitted, unless
+ :meth:`~.postgresql.ENUM.create`
+ or :meth:`~.postgresql.ENUM.drop`
+ are called directly.
+ Setting to ``False`` is helpful
+ when invoking a creation scheme to a SQL file
+ without access to the actual database -
+ the :meth:`~.postgresql.ENUM.create` and
+ :meth:`~.postgresql.ENUM.drop` methods can
+ be used to emit SQL to a target bind.
+
+ """
+ native_enum = kw.pop("native_enum", None)
+ if native_enum is False:
+ util.warn(
+ "the native_enum flag does not apply to the "
+ "sqlalchemy.dialects.postgresql.ENUM datatype; this type "
+ "always refers to ENUM. Use sqlalchemy.types.Enum for "
+ "non-native enum."
+ )
+ self.create_type = kw.pop("create_type", True)
+ super(ENUM, self).__init__(*enums, **kw)
+
+ @classmethod
+ def adapt_emulated_to_native(cls, impl, **kw):
+ """Produce a PostgreSQL native :class:`_postgresql.ENUM` from plain
+ :class:`.Enum`.
+
+ """
+ kw.setdefault("validate_strings", impl.validate_strings)
+ kw.setdefault("name", impl.name)
+ kw.setdefault("schema", impl.schema)
+ kw.setdefault("inherit_schema", impl.inherit_schema)
+ kw.setdefault("metadata", impl.metadata)
+ kw.setdefault("_create_events", False)
+ kw.setdefault("values_callable", impl.values_callable)
+ kw.setdefault("omit_aliases", impl._omit_aliases)
+ return cls(**kw)
+
+ def create(self, bind=None, checkfirst=True):
+ """Emit ``CREATE TYPE`` for this
+ :class:`_postgresql.ENUM`.
+
+ If the underlying dialect does not support
+ PostgreSQL CREATE TYPE, no action is taken.
+
+ :param bind: a connectable :class:`_engine.Engine`,
+ :class:`_engine.Connection`, or similar object to emit
+ SQL.
+ :param checkfirst: if ``True``, a query against
+ the PG catalog will be first performed to see
+ if the type does not exist already before
+ creating.
+
+ """
+ if not bind.dialect.supports_native_enum:
+ return
+
+ bind._run_ddl_visitor(self.EnumGenerator, self, checkfirst=checkfirst)
+
+ def drop(self, bind=None, checkfirst=True):
+ """Emit ``DROP TYPE`` for this
+ :class:`_postgresql.ENUM`.
+
+ If the underlying dialect does not support
+ PostgreSQL DROP TYPE, no action is taken.
+
+ :param bind: a connectable :class:`_engine.Engine`,
+ :class:`_engine.Connection`, or similar object to emit
+ SQL.
+ :param checkfirst: if ``True``, a query against
+ the PG catalog will be first performed to see
+ if the type actually exists before dropping.
+
+ """
+ if not bind.dialect.supports_native_enum:
+ return
+
+ bind._run_ddl_visitor(self.EnumDropper, self, checkfirst=checkfirst)
+
+ class EnumGenerator(DDLBase):
+ def __init__(self, dialect, connection, checkfirst=False, **kwargs):
+ super(ENUM.EnumGenerator, self).__init__(connection, **kwargs)
+ self.checkfirst = checkfirst
+
+ def _can_create_enum(self, enum):
+ if not self.checkfirst:
+ return True
+
+ effective_schema = self.connection.schema_for_object(enum)
+
+ return not self.connection.dialect.has_type(
+ self.connection, enum.name, schema=effective_schema
+ )
+
+ def visit_enum(self, enum):
+ if not self._can_create_enum(enum):
+ return
+
+ self.connection.execute(CreateEnumType(enum))
+
+ class EnumDropper(DDLBase):
+ def __init__(self, dialect, connection, checkfirst=False, **kwargs):
+ super(ENUM.EnumDropper, self).__init__(connection, **kwargs)
+ self.checkfirst = checkfirst
+
+ def _can_drop_enum(self, enum):
+ if not self.checkfirst:
+ return True
+
+ effective_schema = self.connection.schema_for_object(enum)
+
+ return self.connection.dialect.has_type(
+ self.connection, enum.name, schema=effective_schema
+ )
+
+ def visit_enum(self, enum):
+ if not self._can_drop_enum(enum):
+ return
+
+ self.connection.execute(DropEnumType(enum))
+
+ def _check_for_name_in_memos(self, checkfirst, kw):
+ """Look in the 'ddl runner' for 'memos', then
+ note our name in that collection.
+
+ This to ensure a particular named enum is operated
+ upon only once within any kind of create/drop
+ sequence without relying upon "checkfirst".
+
+ """
+ if not self.create_type:
+ return True
+ if "_ddl_runner" in kw:
+ ddl_runner = kw["_ddl_runner"]
+ if "_pg_enums" in ddl_runner.memo:
+ pg_enums = ddl_runner.memo["_pg_enums"]
+ else:
+ pg_enums = ddl_runner.memo["_pg_enums"] = set()
+ present = (self.schema, self.name) in pg_enums
+ pg_enums.add((self.schema, self.name))
+ return present
+ else:
+ return False
+
+ def _on_table_create(self, target, bind, checkfirst=False, **kw):
+ if (
+ checkfirst
+ or (
+ not self.metadata
+ and not kw.get("_is_metadata_operation", False)
+ )
+ ) and not self._check_for_name_in_memos(checkfirst, kw):
+ self.create(bind=bind, checkfirst=checkfirst)
+
+ def _on_table_drop(self, target, bind, checkfirst=False, **kw):
+ if (
+ not self.metadata
+ and not kw.get("_is_metadata_operation", False)
+ and not self._check_for_name_in_memos(checkfirst, kw)
+ ):
+ self.drop(bind=bind, checkfirst=checkfirst)
+
+ def _on_metadata_create(self, target, bind, checkfirst=False, **kw):
+ if not self._check_for_name_in_memos(checkfirst, kw):
+ self.create(bind=bind, checkfirst=checkfirst)
+
+ def _on_metadata_drop(self, target, bind, checkfirst=False, **kw):
+ if not self._check_for_name_in_memos(checkfirst, kw):
+ self.drop(bind=bind, checkfirst=checkfirst)
+
+
+class _ColonCast(elements.Cast):
+ __visit_name__ = "colon_cast"
+
+ def __init__(self, expression, type_):
+ self.type = type_
+ self.clause = expression
+ self.typeclause = elements.TypeClause(type_)
+
+
+colspecs = {
+ sqltypes.ARRAY: _array.ARRAY,
+ sqltypes.Interval: INTERVAL,
+ sqltypes.Enum: ENUM,
+ sqltypes.JSON.JSONPathType: _json.JSONPathType,
+ sqltypes.JSON: _json.JSON,
+}
+
+ischema_names = {
+ "_array": _array.ARRAY,
+ "hstore": _hstore.HSTORE,
+ "json": _json.JSON,
+ "jsonb": _json.JSONB,
+ "int4range": _ranges.INT4RANGE,
+ "int8range": _ranges.INT8RANGE,
+ "numrange": _ranges.NUMRANGE,
+ "daterange": _ranges.DATERANGE,
+ "tsrange": _ranges.TSRANGE,
+ "tstzrange": _ranges.TSTZRANGE,
+ "integer": INTEGER,
+ "bigint": BIGINT,
+ "smallint": SMALLINT,
+ "character varying": VARCHAR,
+ "character": CHAR,
+ '"char"': sqltypes.String,
+ "name": sqltypes.String,
+ "text": TEXT,
+ "numeric": NUMERIC,
+ "float": FLOAT,
+ "real": REAL,
+ "inet": INET,
+ "cidr": CIDR,
+ "uuid": UUID,
+ "bit": BIT,
+ "bit varying": BIT,
+ "macaddr": MACADDR,
+ "money": MONEY,
+ "oid": OID,
+ "regclass": REGCLASS,
+ "double precision": DOUBLE_PRECISION,
+ "timestamp": TIMESTAMP,
+ "timestamp with time zone": TIMESTAMP,
+ "timestamp without time zone": TIMESTAMP,
+ "time with time zone": TIME,
+ "time without time zone": TIME,
+ "date": DATE,
+ "time": TIME,
+ "bytea": BYTEA,
+ "boolean": BOOLEAN,
+ "interval": INTERVAL,
+ "tsvector": TSVECTOR,
+}
+
+
+class PGCompiler(compiler.SQLCompiler):
+ def visit_colon_cast(self, element, **kw):
+ return "%s::%s" % (
+ element.clause._compiler_dispatch(self, **kw),
+ element.typeclause._compiler_dispatch(self, **kw),
+ )
+
+ def visit_array(self, element, **kw):
+ return "ARRAY[%s]" % self.visit_clauselist(element, **kw)
+
+ def visit_slice(self, element, **kw):
+ return "%s:%s" % (
+ self.process(element.start, **kw),
+ self.process(element.stop, **kw),
+ )
+
+ def visit_json_getitem_op_binary(
+ self, binary, operator, _cast_applied=False, **kw
+ ):
+ if (
+ not _cast_applied
+ and binary.type._type_affinity is not sqltypes.JSON
+ ):
+ kw["_cast_applied"] = True
+ return self.process(sql.cast(binary, binary.type), **kw)
+
+ kw["eager_grouping"] = True
+
+ return self._generate_generic_binary(
+ binary, " -> " if not _cast_applied else " ->> ", **kw
+ )
+
+ def visit_json_path_getitem_op_binary(
+ self, binary, operator, _cast_applied=False, **kw
+ ):
+ if (
+ not _cast_applied
+ and binary.type._type_affinity is not sqltypes.JSON
+ ):
+ kw["_cast_applied"] = True
+ return self.process(sql.cast(binary, binary.type), **kw)
+
+ kw["eager_grouping"] = True
+ return self._generate_generic_binary(
+ binary, " #> " if not _cast_applied else " #>> ", **kw
+ )
+
+ def visit_getitem_binary(self, binary, operator, **kw):
+ return "%s[%s]" % (
+ self.process(binary.left, **kw),
+ self.process(binary.right, **kw),
+ )
+
+ def visit_aggregate_order_by(self, element, **kw):
+ return "%s ORDER BY %s" % (
+ self.process(element.target, **kw),
+ self.process(element.order_by, **kw),
+ )
+
+ def visit_match_op_binary(self, binary, operator, **kw):
+ if "postgresql_regconfig" in binary.modifiers:
+ regconfig = self.render_literal_value(
+ binary.modifiers["postgresql_regconfig"], sqltypes.STRINGTYPE
+ )
+ if regconfig:
+ return "%s @@ to_tsquery(%s, %s)" % (
+ self.process(binary.left, **kw),
+ regconfig,
+ self.process(binary.right, **kw),
+ )
+ return "%s @@ to_tsquery(%s)" % (
+ self.process(binary.left, **kw),
+ self.process(binary.right, **kw),
+ )
+
+ def visit_ilike_op_binary(self, binary, operator, **kw):
+ escape = binary.modifiers.get("escape", None)
+
+ return "%s ILIKE %s" % (
+ self.process(binary.left, **kw),
+ self.process(binary.right, **kw),
+ ) + (
+ " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE)
+ if escape
+ else ""
+ )
+
+ def visit_not_ilike_op_binary(self, binary, operator, **kw):
+ escape = binary.modifiers.get("escape", None)
+ return "%s NOT ILIKE %s" % (
+ self.process(binary.left, **kw),
+ self.process(binary.right, **kw),
+ ) + (
+ " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE)
+ if escape
+ else ""
+ )
+
+ def _regexp_match(self, base_op, binary, operator, kw):
+ flags = binary.modifiers["flags"]
+ if flags is None:
+ return self._generate_generic_binary(
+ binary, " %s " % base_op, **kw
+ )
+ if isinstance(flags, elements.BindParameter) and flags.value == "i":
+ return self._generate_generic_binary(
+ binary, " %s* " % base_op, **kw
+ )
+ flags = self.process(flags, **kw)
+ string = self.process(binary.left, **kw)
+ pattern = self.process(binary.right, **kw)
+ return "%s %s CONCAT('(?', %s, ')', %s)" % (
+ string,
+ base_op,
+ flags,
+ pattern,
+ )
+
+ def visit_regexp_match_op_binary(self, binary, operator, **kw):
+ return self._regexp_match("~", binary, operator, kw)
+
+ def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
+ return self._regexp_match("!~", binary, operator, kw)
+
+ def visit_regexp_replace_op_binary(self, binary, operator, **kw):
+ string = self.process(binary.left, **kw)
+ pattern = self.process(binary.right, **kw)
+ flags = binary.modifiers["flags"]
+ if flags is not None:
+ flags = self.process(flags, **kw)
+ replacement = self.process(binary.modifiers["replacement"], **kw)
+ if flags is None:
+ return "REGEXP_REPLACE(%s, %s, %s)" % (
+ string,
+ pattern,
+ replacement,
+ )
+ else:
+ return "REGEXP_REPLACE(%s, %s, %s, %s)" % (
+ string,
+ pattern,
+ replacement,
+ flags,
+ )
+
+ def visit_empty_set_expr(self, element_types):
+ # cast the empty set to the type we are comparing against. if
+ # we are comparing against the null type, pick an arbitrary
+ # datatype for the empty set
+ return "SELECT %s WHERE 1!=1" % (
+ ", ".join(
+ "CAST(NULL AS %s)"
+ % self.dialect.type_compiler.process(
+ INTEGER() if type_._isnull else type_
+ )
+ for type_ in element_types or [INTEGER()]
+ ),
+ )
+
+ def render_literal_value(self, value, type_):
+ value = super(PGCompiler, self).render_literal_value(value, type_)
+
+ if self.dialect._backslash_escapes:
+ value = value.replace("\\", "\\\\")
+ return value
+
+ def visit_sequence(self, seq, **kw):
+ return "nextval('%s')" % self.preparer.format_sequence(seq)
+
+ def limit_clause(self, select, **kw):
+ text = ""
+ if select._limit_clause is not None:
+ text += " \n LIMIT " + self.process(select._limit_clause, **kw)
+ if select._offset_clause is not None:
+ if select._limit_clause is None:
+ text += "\n LIMIT ALL"
+ text += " OFFSET " + self.process(select._offset_clause, **kw)
+ return text
+
+ def format_from_hint_text(self, sqltext, table, hint, iscrud):
+ if hint.upper() != "ONLY":
+ raise exc.CompileError("Unrecognized hint: %r" % hint)
+ return "ONLY " + sqltext
+
+ def get_select_precolumns(self, select, **kw):
+ # Do not call super().get_select_precolumns because
+ # it will warn/raise when distinct on is present
+ if select._distinct or select._distinct_on:
+ if select._distinct_on:
+ return (
+ "DISTINCT ON ("
+ + ", ".join(
+ [
+ self.process(col, **kw)
+ for col in select._distinct_on
+ ]
+ )
+ + ") "
+ )
+ else:
+ return "DISTINCT "
+ else:
+ return ""
+
+ def for_update_clause(self, select, **kw):
+
+ if select._for_update_arg.read:
+ if select._for_update_arg.key_share:
+ tmp = " FOR KEY SHARE"
+ else:
+ tmp = " FOR SHARE"
+ elif select._for_update_arg.key_share:
+ tmp = " FOR NO KEY UPDATE"
+ else:
+ tmp = " FOR UPDATE"
+
+ if select._for_update_arg.of:
+
+ tables = util.OrderedSet()
+ for c in select._for_update_arg.of:
+ tables.update(sql_util.surface_selectables_only(c))
+
+ tmp += " OF " + ", ".join(
+ self.process(table, ashint=True, use_schema=False, **kw)
+ for table in tables
+ )
+
+ if select._for_update_arg.nowait:
+ tmp += " NOWAIT"
+ if select._for_update_arg.skip_locked:
+ tmp += " SKIP LOCKED"
+
+ return tmp
+
+ def returning_clause(self, stmt, returning_cols):
+
+ columns = [
+ self._label_returning_column(stmt, c)
+ for c in expression._select_iterables(returning_cols)
+ ]
+
+ return "RETURNING " + ", ".join(columns)
+
+ def visit_substring_func(self, func, **kw):
+ s = self.process(func.clauses.clauses[0], **kw)
+ start = self.process(func.clauses.clauses[1], **kw)
+ if len(func.clauses.clauses) > 2:
+ length = self.process(func.clauses.clauses[2], **kw)
+ return "SUBSTRING(%s FROM %s FOR %s)" % (s, start, length)
+ else:
+ return "SUBSTRING(%s FROM %s)" % (s, start)
+
+ def _on_conflict_target(self, clause, **kw):
+
+ if clause.constraint_target is not None:
+ # target may be a name of an Index, UniqueConstraint or
+ # ExcludeConstraint. While there is a separate
+ # "max_identifier_length" for indexes, PostgreSQL uses the same
+ # length for all objects so we can use
+ # truncate_and_render_constraint_name
+ target_text = (
+ "ON CONSTRAINT %s"
+ % self.preparer.truncate_and_render_constraint_name(
+ clause.constraint_target
+ )
+ )
+ elif clause.inferred_target_elements is not None:
+ target_text = "(%s)" % ", ".join(
+ (
+ self.preparer.quote(c)
+ if isinstance(c, util.string_types)
+ else self.process(c, include_table=False, use_schema=False)
+ )
+ for c in clause.inferred_target_elements
+ )
+ if clause.inferred_target_whereclause is not None:
+ target_text += " WHERE %s" % self.process(
+ clause.inferred_target_whereclause,
+ include_table=False,
+ use_schema=False,
+ )
+ else:
+ target_text = ""
+
+ return target_text
+
+ @util.memoized_property
+ def _is_safe_for_fast_insert_values_helper(self):
+ # don't allow fast executemany if _post_values_clause is
+ # present and is not an OnConflictDoNothing. what this means
+ # concretely is that the
+ # "fast insert executemany helper" won't be used, in other
+ # words we won't convert "executemany()" of many parameter
+ # sets into a single INSERT with many elements in VALUES.
+ # We can't apply that optimization safely if for example the
+ # statement includes a clause like "ON CONFLICT DO UPDATE"
+
+ return self.insert_single_values_expr is not None and (
+ self.statement._post_values_clause is None
+ or isinstance(
+ self.statement._post_values_clause, dml.OnConflictDoNothing
+ )
+ )
+
+ def visit_on_conflict_do_nothing(self, on_conflict, **kw):
+
+ target_text = self._on_conflict_target(on_conflict, **kw)
+
+ if target_text:
+ return "ON CONFLICT %s DO NOTHING" % target_text
+ else:
+ return "ON CONFLICT DO NOTHING"
+
+ def visit_on_conflict_do_update(self, on_conflict, **kw):
+
+ clause = on_conflict
+
+ target_text = self._on_conflict_target(on_conflict, **kw)
+
+ action_set_ops = []
+
+ set_parameters = dict(clause.update_values_to_set)
+ # create a list of column assignment clauses as tuples
+
+ insert_statement = self.stack[-1]["selectable"]
+ cols = insert_statement.table.c
+ for c in cols:
+ col_key = c.key
+
+ if col_key in set_parameters:
+ value = set_parameters.pop(col_key)
+ elif c in set_parameters:
+ value = set_parameters.pop(c)
+ else:
+ continue
+
+ if coercions._is_literal(value):
+ value = elements.BindParameter(None, value, type_=c.type)
+
+ else:
+ if (
+ isinstance(value, elements.BindParameter)
+ and value.type._isnull
+ ):
+ value = value._clone()
+ value.type = c.type
+ value_text = self.process(value.self_group(), use_schema=False)
+
+ key_text = self.preparer.quote(c.name)
+ action_set_ops.append("%s = %s" % (key_text, value_text))
+
+ # check for names that don't match columns
+ if set_parameters:
+ util.warn(
+ "Additional column names not matching "
+ "any column keys in table '%s': %s"
+ % (
+ self.current_executable.table.name,
+ (", ".join("'%s'" % c for c in set_parameters)),
+ )
+ )
+ for k, v in set_parameters.items():
+ key_text = (
+ self.preparer.quote(k)
+ if isinstance(k, util.string_types)
+ else self.process(k, use_schema=False)
+ )
+ value_text = self.process(
+ coercions.expect(roles.ExpressionElementRole, v),
+ use_schema=False,
+ )
+ action_set_ops.append("%s = %s" % (key_text, value_text))
+
+ action_text = ", ".join(action_set_ops)
+ if clause.update_whereclause is not None:
+ action_text += " WHERE %s" % self.process(
+ clause.update_whereclause, include_table=True, use_schema=False
+ )
+
+ return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text)
+
+ def update_from_clause(
+ self, update_stmt, from_table, extra_froms, from_hints, **kw
+ ):
+ kw["asfrom"] = True
+ return "FROM " + ", ".join(
+ t._compiler_dispatch(self, fromhints=from_hints, **kw)
+ for t in extra_froms
+ )
+
+ def delete_extra_from_clause(
+ self, delete_stmt, from_table, extra_froms, from_hints, **kw
+ ):
+ """Render the DELETE .. USING clause specific to PostgreSQL."""
+ kw["asfrom"] = True
+ return "USING " + ", ".join(
+ t._compiler_dispatch(self, fromhints=from_hints, **kw)
+ for t in extra_froms
+ )
+
+ def fetch_clause(self, select, **kw):
+ # pg requires parens for non literal clauses. It's also required for
+ # bind parameters if a ::type casts is used by the driver (asyncpg),
+ # so it's easiest to just always add it
+ text = ""
+ if select._offset_clause is not None:
+ text += "\n OFFSET (%s) ROWS" % self.process(
+ select._offset_clause, **kw
+ )
+ if select._fetch_clause is not None:
+ text += "\n FETCH FIRST (%s)%s ROWS %s" % (
+ self.process(select._fetch_clause, **kw),
+ " PERCENT" if select._fetch_clause_options["percent"] else "",
+ "WITH TIES"
+ if select._fetch_clause_options["with_ties"]
+ else "ONLY",
+ )
+ return text
+
+
+class PGDDLCompiler(compiler.DDLCompiler):
+ def get_column_specification(self, column, **kwargs):
+
+ colspec = self.preparer.format_column(column)
+ impl_type = column.type.dialect_impl(self.dialect)
+ if isinstance(impl_type, sqltypes.TypeDecorator):
+ impl_type = impl_type.impl
+
+ has_identity = (
+ column.identity is not None
+ and self.dialect.supports_identity_columns
+ )
+
+ if (
+ column.primary_key
+ and column is column.table._autoincrement_column
+ and (
+ self.dialect.supports_smallserial
+ or not isinstance(impl_type, sqltypes.SmallInteger)
+ )
+ and not has_identity
+ and (
+ column.default is None
+ or (
+ isinstance(column.default, schema.Sequence)
+ and column.default.optional
+ )
+ )
+ ):
+ if isinstance(impl_type, sqltypes.BigInteger):
+ colspec += " BIGSERIAL"
+ elif isinstance(impl_type, sqltypes.SmallInteger):
+ colspec += " SMALLSERIAL"
+ else:
+ colspec += " SERIAL"
+ else:
+ colspec += " " + self.dialect.type_compiler.process(
+ column.type,
+ type_expression=column,
+ identifier_preparer=self.preparer,
+ )
+ default = self.get_column_default_string(column)
+ if default is not None:
+ colspec += " DEFAULT " + default
+
+ if column.computed is not None:
+ colspec += " " + self.process(column.computed)
+ if has_identity:
+ colspec += " " + self.process(column.identity)
+
+ if not column.nullable and not has_identity:
+ colspec += " NOT NULL"
+ elif column.nullable and has_identity:
+ colspec += " NULL"
+ return colspec
+
+ def _define_constraint_validity(self, constraint):
+ not_valid = constraint.dialect_options["postgresql"]["not_valid"]
+ return " NOT VALID" if not_valid else ""
+
+ def visit_check_constraint(self, constraint):
+ if constraint._type_bound:
+ typ = list(constraint.columns)[0].type
+ if (
+ isinstance(typ, sqltypes.ARRAY)
+ and isinstance(typ.item_type, sqltypes.Enum)
+ and not typ.item_type.native_enum
+ ):
+ raise exc.CompileError(
+ "PostgreSQL dialect cannot produce the CHECK constraint "
+ "for ARRAY of non-native ENUM; please specify "
+ "create_constraint=False on this Enum datatype."
+ )
+
+ text = super(PGDDLCompiler, self).visit_check_constraint(constraint)
+ text += self._define_constraint_validity(constraint)
+ return text
+
+ def visit_foreign_key_constraint(self, constraint):
+ text = super(PGDDLCompiler, self).visit_foreign_key_constraint(
+ constraint
+ )
+ text += self._define_constraint_validity(constraint)
+ return text
+
+ def visit_drop_table_comment(self, drop):
+ return "COMMENT ON TABLE %s IS NULL" % self.preparer.format_table(
+ drop.element
+ )
+
+ def visit_create_enum_type(self, create):
+ type_ = create.element
+
+ return "CREATE TYPE %s AS ENUM (%s)" % (
+ self.preparer.format_type(type_),
+ ", ".join(
+ self.sql_compiler.process(sql.literal(e), literal_binds=True)
+ for e in type_.enums
+ ),
+ )
+
+ def visit_drop_enum_type(self, drop):
+ type_ = drop.element
+
+ return "DROP TYPE %s" % (self.preparer.format_type(type_))
+
+ def visit_create_index(self, create):
+ preparer = self.preparer
+ index = create.element
+ self._verify_index_table(index)
+ text = "CREATE "
+ if index.unique:
+ text += "UNIQUE "
+ text += "INDEX "
+
+ if self.dialect._supports_create_index_concurrently:
+ concurrently = index.dialect_options["postgresql"]["concurrently"]
+ if concurrently:
+ text += "CONCURRENTLY "
+
+ if create.if_not_exists:
+ text += "IF NOT EXISTS "
+
+ text += "%s ON %s " % (
+ self._prepared_index_name(index, include_schema=False),
+ preparer.format_table(index.table),
+ )
+
+ using = index.dialect_options["postgresql"]["using"]
+ if using:
+ text += (
+ "USING %s "
+ % self.preparer.validate_sql_phrase(using, IDX_USING).lower()
+ )
+
+ ops = index.dialect_options["postgresql"]["ops"]
+ text += "(%s)" % (
+ ", ".join(
+ [
+ self.sql_compiler.process(
+ expr.self_group()
+ if not isinstance(expr, expression.ColumnClause)
+ else expr,
+ include_table=False,
+ literal_binds=True,
+ )
+ + (
+ (" " + ops[expr.key])
+ if hasattr(expr, "key") and expr.key in ops
+ else ""
+ )
+ for expr in index.expressions
+ ]
+ )
+ )
+
+ includeclause = index.dialect_options["postgresql"]["include"]
+ if includeclause:
+ inclusions = [
+ index.table.c[col]
+ if isinstance(col, util.string_types)
+ else col
+ for col in includeclause
+ ]
+ text += " INCLUDE (%s)" % ", ".join(
+ [preparer.quote(c.name) for c in inclusions]
+ )
+
+ withclause = index.dialect_options["postgresql"]["with"]
+ if withclause:
+ text += " WITH (%s)" % (
+ ", ".join(
+ [
+ "%s = %s" % storage_parameter
+ for storage_parameter in withclause.items()
+ ]
+ )
+ )
+
+ tablespace_name = index.dialect_options["postgresql"]["tablespace"]
+ if tablespace_name:
+ text += " TABLESPACE %s" % preparer.quote(tablespace_name)
+
+ whereclause = index.dialect_options["postgresql"]["where"]
+ if whereclause is not None:
+ whereclause = coercions.expect(
+ roles.DDLExpressionRole, whereclause
+ )
+
+ where_compiled = self.sql_compiler.process(
+ whereclause, include_table=False, literal_binds=True
+ )
+ text += " WHERE " + where_compiled
+
+ return text
+
+ def visit_drop_index(self, drop):
+ index = drop.element
+
+ text = "\nDROP INDEX "
+
+ if self.dialect._supports_drop_index_concurrently:
+ concurrently = index.dialect_options["postgresql"]["concurrently"]
+ if concurrently:
+ text += "CONCURRENTLY "
+
+ if drop.if_exists:
+ text += "IF EXISTS "
+
+ text += self._prepared_index_name(index, include_schema=True)
+ return text
+
+ def visit_exclude_constraint(self, constraint, **kw):
+ text = ""
+ if constraint.name is not None:
+ text += "CONSTRAINT %s " % self.preparer.format_constraint(
+ constraint
+ )
+ elements = []
+ for expr, name, op in constraint._render_exprs:
+ kw["include_table"] = False
+ exclude_element = self.sql_compiler.process(expr, **kw) + (
+ (" " + constraint.ops[expr.key])
+ if hasattr(expr, "key") and expr.key in constraint.ops
+ else ""
+ )
+
+ elements.append("%s WITH %s" % (exclude_element, op))
+ text += "EXCLUDE USING %s (%s)" % (
+ self.preparer.validate_sql_phrase(
+ constraint.using, IDX_USING
+ ).lower(),
+ ", ".join(elements),
+ )
+ if constraint.where is not None:
+ text += " WHERE (%s)" % self.sql_compiler.process(
+ constraint.where, literal_binds=True
+ )
+ text += self.define_constraint_deferrability(constraint)
+ return text
+
+ def post_create_table(self, table):
+ table_opts = []
+ pg_opts = table.dialect_options["postgresql"]
+
+ inherits = pg_opts.get("inherits")
+ if inherits is not None:
+ if not isinstance(inherits, (list, tuple)):
+ inherits = (inherits,)
+ table_opts.append(
+ "\n INHERITS ( "
+ + ", ".join(self.preparer.quote(name) for name in inherits)
+ + " )"
+ )
+
+ if pg_opts["partition_by"]:
+ table_opts.append("\n PARTITION BY %s" % pg_opts["partition_by"])
+
+ if pg_opts["with_oids"] is True:
+ table_opts.append("\n WITH OIDS")
+ elif pg_opts["with_oids"] is False:
+ table_opts.append("\n WITHOUT OIDS")
+
+ if pg_opts["on_commit"]:
+ on_commit_options = pg_opts["on_commit"].replace("_", " ").upper()
+ table_opts.append("\n ON COMMIT %s" % on_commit_options)
+
+ if pg_opts["tablespace"]:
+ tablespace_name = pg_opts["tablespace"]
+ table_opts.append(
+ "\n TABLESPACE %s" % self.preparer.quote(tablespace_name)
+ )
+
+ return "".join(table_opts)
+
+ def visit_computed_column(self, generated):
+ if generated.persisted is False:
+ raise exc.CompileError(
+ "PostrgreSQL computed columns do not support 'virtual' "
+ "persistence; set the 'persisted' flag to None or True for "
+ "PostgreSQL support."
+ )
+
+ return "GENERATED ALWAYS AS (%s) STORED" % self.sql_compiler.process(
+ generated.sqltext, include_table=False, literal_binds=True
+ )
+
+ def visit_create_sequence(self, create, **kw):
+ prefix = None
+ if create.element.data_type is not None:
+ prefix = " AS %s" % self.type_compiler.process(
+ create.element.data_type
+ )
+
+ return super(PGDDLCompiler, self).visit_create_sequence(
+ create, prefix=prefix, **kw
+ )
+
+
+class PGTypeCompiler(compiler.GenericTypeCompiler):
+ def visit_TSVECTOR(self, type_, **kw):
+ return "TSVECTOR"
+
+ def visit_INET(self, type_, **kw):
+ return "INET"
+
+ def visit_CIDR(self, type_, **kw):
+ return "CIDR"
+
+ def visit_MACADDR(self, type_, **kw):
+ return "MACADDR"
+
+ def visit_MONEY(self, type_, **kw):
+ return "MONEY"
+
+ def visit_OID(self, type_, **kw):
+ return "OID"
+
+ def visit_REGCLASS(self, type_, **kw):
+ return "REGCLASS"
+
+ def visit_FLOAT(self, type_, **kw):
+ if not type_.precision:
+ return "FLOAT"
+ else:
+ return "FLOAT(%(precision)s)" % {"precision": type_.precision}
+
+ def visit_DOUBLE_PRECISION(self, type_, **kw):
+ return "DOUBLE PRECISION"
+
+ def visit_BIGINT(self, type_, **kw):
+ return "BIGINT"
+
+ def visit_HSTORE(self, type_, **kw):
+ return "HSTORE"
+
+ def visit_JSON(self, type_, **kw):
+ return "JSON"
+
+ def visit_JSONB(self, type_, **kw):
+ return "JSONB"
+
+ def visit_INT4RANGE(self, type_, **kw):
+ return "INT4RANGE"
+
+ def visit_INT8RANGE(self, type_, **kw):
+ return "INT8RANGE"
+
+ def visit_NUMRANGE(self, type_, **kw):
+ return "NUMRANGE"
+
+ def visit_DATERANGE(self, type_, **kw):
+ return "DATERANGE"
+
+ def visit_TSRANGE(self, type_, **kw):
+ return "TSRANGE"
+
+ def visit_TSTZRANGE(self, type_, **kw):
+ return "TSTZRANGE"
+
+ def visit_datetime(self, type_, **kw):
+ return self.visit_TIMESTAMP(type_, **kw)
+
+ def visit_enum(self, type_, **kw):
+ if not type_.native_enum or not self.dialect.supports_native_enum:
+ return super(PGTypeCompiler, self).visit_enum(type_, **kw)
+ else:
+ return self.visit_ENUM(type_, **kw)
+
+ def visit_ENUM(self, type_, identifier_preparer=None, **kw):
+ if identifier_preparer is None:
+ identifier_preparer = self.dialect.identifier_preparer
+ return identifier_preparer.format_type(type_)
+
+ def visit_TIMESTAMP(self, type_, **kw):
+ return "TIMESTAMP%s %s" % (
+ "(%d)" % type_.precision
+ if getattr(type_, "precision", None) is not None
+ else "",
+ (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE",
+ )
+
+ def visit_TIME(self, type_, **kw):
+ return "TIME%s %s" % (
+ "(%d)" % type_.precision
+ if getattr(type_, "precision", None) is not None
+ else "",
+ (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE",
+ )
+
+ def visit_INTERVAL(self, type_, **kw):
+ text = "INTERVAL"
+ if type_.fields is not None:
+ text += " " + type_.fields
+ if type_.precision is not None:
+ text += " (%d)" % type_.precision
+ return text
+
+ def visit_BIT(self, type_, **kw):
+ if type_.varying:
+ compiled = "BIT VARYING"
+ if type_.length is not None:
+ compiled += "(%d)" % type_.length
+ else:
+ compiled = "BIT(%d)" % type_.length
+ return compiled
+
+ def visit_UUID(self, type_, **kw):
+ return "UUID"
+
+ def visit_large_binary(self, type_, **kw):
+ return self.visit_BYTEA(type_, **kw)
+
+ def visit_BYTEA(self, type_, **kw):
+ return "BYTEA"
+
+ def visit_ARRAY(self, type_, **kw):
+
+ inner = self.process(type_.item_type, **kw)
+ return re.sub(
+ r"((?: COLLATE.*)?)$",
+ (
+ r"%s\1"
+ % (
+ "[]"
+ * (type_.dimensions if type_.dimensions is not None else 1)
+ )
+ ),
+ inner,
+ count=1,
+ )
+
+
+class PGIdentifierPreparer(compiler.IdentifierPreparer):
+
+ reserved_words = RESERVED_WORDS
+
+ def _unquote_identifier(self, value):
+ if value[0] == self.initial_quote:
+ value = value[1:-1].replace(
+ self.escape_to_quote, self.escape_quote
+ )
+ return value
+
+ def format_type(self, type_, use_schema=True):
+ if not type_.name:
+ raise exc.CompileError("PostgreSQL ENUM type requires a name.")
+
+ name = self.quote(type_.name)
+ effective_schema = self.schema_for_object(type_)
+
+ if (
+ not self.omit_schema
+ and use_schema
+ and effective_schema is not None
+ ):
+ name = self.quote_schema(effective_schema) + "." + name
+ return name
+
+
+class PGInspector(reflection.Inspector):
+ def get_table_oid(self, table_name, schema=None):
+ """Return the OID for the given table name."""
+
+ with self._operation_context() as conn:
+ return self.dialect.get_table_oid(
+ conn, table_name, schema, info_cache=self.info_cache
+ )
+
+ def get_enums(self, schema=None):
+ """Return a list of ENUM objects.
+
+ Each member is a dictionary containing these fields:
+
+ * name - name of the enum
+ * schema - the schema name for the enum.
+ * visible - boolean, whether or not this enum is visible
+ in the default search path.
+ * labels - a list of string labels that apply to the enum.
+
+ :param schema: schema name. If None, the default schema
+ (typically 'public') is used. May also be set to '*' to
+ indicate load enums for all schemas.
+
+ .. versionadded:: 1.0.0
+
+ """
+ schema = schema or self.default_schema_name
+ with self._operation_context() as conn:
+ return self.dialect._load_enums(conn, schema)
+
+ def get_foreign_table_names(self, schema=None):
+ """Return a list of FOREIGN TABLE names.
+
+ Behavior is similar to that of
+ :meth:`_reflection.Inspector.get_table_names`,
+ except that the list is limited to those tables that report a
+ ``relkind`` value of ``f``.
+
+ .. versionadded:: 1.0.0
+
+ """
+ schema = schema or self.default_schema_name
+ with self._operation_context() as conn:
+ return self.dialect._get_foreign_table_names(conn, schema)
+
+ def get_view_names(self, schema=None, include=("plain", "materialized")):
+ """Return all view names in `schema`.
+
+ :param schema: Optional, retrieve names from a non-default schema.
+ For special quoting, use :class:`.quoted_name`.
+
+ :param include: specify which types of views to return. Passed
+ as a string value (for a single type) or a tuple (for any number
+ of types). Defaults to ``('plain', 'materialized')``.
+
+ .. versionadded:: 1.1
+
+ """
+
+ with self._operation_context() as conn:
+ return self.dialect.get_view_names(
+ conn, schema, info_cache=self.info_cache, include=include
+ )
+
+
+class CreateEnumType(schema._CreateDropBase):
+ __visit_name__ = "create_enum_type"
+
+
+class DropEnumType(schema._CreateDropBase):
+ __visit_name__ = "drop_enum_type"
+
+
+class PGExecutionContext(default.DefaultExecutionContext):
+ def fire_sequence(self, seq, type_):
+ return self._execute_scalar(
+ (
+ "select nextval('%s')"
+ % self.identifier_preparer.format_sequence(seq)
+ ),
+ type_,
+ )
+
+ def get_insert_default(self, column):
+ if column.primary_key and column is column.table._autoincrement_column:
+ if column.server_default and column.server_default.has_argument:
+
+ # pre-execute passive defaults on primary key columns
+ return self._execute_scalar(
+ "select %s" % column.server_default.arg, column.type
+ )
+
+ elif column.default is None or (
+ column.default.is_sequence and column.default.optional
+ ):
+ # execute the sequence associated with a SERIAL primary
+ # key column. for non-primary-key SERIAL, the ID just
+ # generates server side.
+
+ try:
+ seq_name = column._postgresql_seq_name
+ except AttributeError:
+ tab = column.table.name
+ col = column.name
+ tab = tab[0 : 29 + max(0, (29 - len(col)))]
+ col = col[0 : 29 + max(0, (29 - len(tab)))]
+ name = "%s_%s_seq" % (tab, col)
+ column._postgresql_seq_name = seq_name = name
+
+ if column.table is not None:
+ effective_schema = self.connection.schema_for_object(
+ column.table
+ )
+ else:
+ effective_schema = None
+
+ if effective_schema is not None:
+ exc = 'select nextval(\'"%s"."%s"\')' % (
+ effective_schema,
+ seq_name,
+ )
+ else:
+ exc = "select nextval('\"%s\"')" % (seq_name,)
+
+ return self._execute_scalar(exc, column.type)
+
+ return super(PGExecutionContext, self).get_insert_default(column)
+
+ def should_autocommit_text(self, statement):
+ return AUTOCOMMIT_REGEXP.match(statement)
+
+
+class PGReadOnlyConnectionCharacteristic(
+ characteristics.ConnectionCharacteristic
+):
+ transactional = True
+
+ def reset_characteristic(self, dialect, dbapi_conn):
+ dialect.set_readonly(dbapi_conn, False)
+
+ def set_characteristic(self, dialect, dbapi_conn, value):
+ dialect.set_readonly(dbapi_conn, value)
+
+ def get_characteristic(self, dialect, dbapi_conn):
+ return dialect.get_readonly(dbapi_conn)
+
+
+class PGDeferrableConnectionCharacteristic(
+ characteristics.ConnectionCharacteristic
+):
+ transactional = True
+
+ def reset_characteristic(self, dialect, dbapi_conn):
+ dialect.set_deferrable(dbapi_conn, False)
+
+ def set_characteristic(self, dialect, dbapi_conn, value):
+ dialect.set_deferrable(dbapi_conn, value)
+
+ def get_characteristic(self, dialect, dbapi_conn):
+ return dialect.get_deferrable(dbapi_conn)
+
+
+class PGDialect(default.DefaultDialect):
+ name = "postgresql"
+ supports_statement_cache = True
+ supports_alter = True
+ max_identifier_length = 63
+ supports_sane_rowcount = True
+
+ supports_native_enum = True
+ supports_native_boolean = True
+ supports_smallserial = True
+
+ supports_sequences = True
+ sequences_optional = True
+ preexecute_autoincrement_sequences = True
+ postfetch_lastrowid = False
+
+ supports_comments = True
+ supports_default_values = True
+
+ supports_default_metavalue = True
+
+ supports_empty_insert = False
+ supports_multivalues_insert = True
+ supports_identity_columns = True
+
+ default_paramstyle = "pyformat"
+ ischema_names = ischema_names
+ colspecs = colspecs
+
+ statement_compiler = PGCompiler
+ ddl_compiler = PGDDLCompiler
+ type_compiler = PGTypeCompiler
+ preparer = PGIdentifierPreparer
+ execution_ctx_cls = PGExecutionContext
+ inspector = PGInspector
+ isolation_level = None
+
+ implicit_returning = True
+ full_returning = True
+
+ connection_characteristics = (
+ default.DefaultDialect.connection_characteristics
+ )
+ connection_characteristics = connection_characteristics.union(
+ {
+ "postgresql_readonly": PGReadOnlyConnectionCharacteristic(),
+ "postgresql_deferrable": PGDeferrableConnectionCharacteristic(),
+ }
+ )
+
+ construct_arguments = [
+ (
+ schema.Index,
+ {
+ "using": False,
+ "include": None,
+ "where": None,
+ "ops": {},
+ "concurrently": False,
+ "with": {},
+ "tablespace": None,
+ },
+ ),
+ (
+ schema.Table,
+ {
+ "ignore_search_path": False,
+ "tablespace": None,
+ "partition_by": None,
+ "with_oids": None,
+ "on_commit": None,
+ "inherits": None,
+ },
+ ),
+ (
+ schema.CheckConstraint,
+ {
+ "not_valid": False,
+ },
+ ),
+ (
+ schema.ForeignKeyConstraint,
+ {
+ "not_valid": False,
+ },
+ ),
+ ]
+
+ reflection_options = ("postgresql_ignore_search_path",)
+
+ _backslash_escapes = True
+ _supports_create_index_concurrently = True
+ _supports_drop_index_concurrently = True
+
+ def __init__(
+ self,
+ isolation_level=None,
+ json_serializer=None,
+ json_deserializer=None,
+ **kwargs
+ ):
+ default.DefaultDialect.__init__(self, **kwargs)
+
+ # the isolation_level parameter to the PGDialect itself is legacy.
+ # still works however the execution_options method is the one that
+ # is documented.
+ self.isolation_level = isolation_level
+ self._json_deserializer = json_deserializer
+ self._json_serializer = json_serializer
+
+ def initialize(self, connection):
+ super(PGDialect, self).initialize(connection)
+
+ if self.server_version_info <= (8, 2):
+ self.full_returning = self.implicit_returning = False
+
+ self.supports_native_enum = self.server_version_info >= (8, 3)
+ if not self.supports_native_enum:
+ self.colspecs = self.colspecs.copy()
+ # pop base Enum type
+ self.colspecs.pop(sqltypes.Enum, None)
+ # psycopg2, others may have placed ENUM here as well
+ self.colspecs.pop(ENUM, None)
+
+ # https://www.postgresql.org/docs/9.3/static/release-9-2.html#AEN116689
+ self.supports_smallserial = self.server_version_info >= (9, 2)
+
+ if self.server_version_info < (8, 2):
+ self._backslash_escapes = False
+ else:
+ # ensure this query is not emitted on server version < 8.2
+ # as it will fail
+ std_string = connection.exec_driver_sql(
+ "show standard_conforming_strings"
+ ).scalar()
+ self._backslash_escapes = std_string == "off"
+
+ self._supports_create_index_concurrently = (
+ self.server_version_info >= (8, 2)
+ )
+ self._supports_drop_index_concurrently = self.server_version_info >= (
+ 9,
+ 2,
+ )
+ self.supports_identity_columns = self.server_version_info >= (10,)
+
+ def on_connect(self):
+ if self.isolation_level is not None:
+
+ def connect(conn):
+ self.set_isolation_level(conn, self.isolation_level)
+
+ return connect
+ else:
+ return None
+
+ _isolation_lookup = set(
+ [
+ "SERIALIZABLE",
+ "READ UNCOMMITTED",
+ "READ COMMITTED",
+ "REPEATABLE READ",
+ ]
+ )
+
+ def set_isolation_level(self, connection, level):
+ level = level.replace("_", " ")
+ if level not in self._isolation_lookup:
+ raise exc.ArgumentError(
+ "Invalid value '%s' for isolation_level. "
+ "Valid isolation levels for %s are %s"
+ % (level, self.name, ", ".join(self._isolation_lookup))
+ )
+ cursor = connection.cursor()
+ cursor.execute(
+ "SET SESSION CHARACTERISTICS AS TRANSACTION "
+ "ISOLATION LEVEL %s" % level
+ )
+ cursor.execute("COMMIT")
+ cursor.close()
+
+ def get_isolation_level(self, connection):
+ cursor = connection.cursor()
+ cursor.execute("show transaction isolation level")
+ val = cursor.fetchone()[0]
+ cursor.close()
+ return val.upper()
+
+ def set_readonly(self, connection, value):
+ raise NotImplementedError()
+
+ def get_readonly(self, connection):
+ raise NotImplementedError()
+
+ def set_deferrable(self, connection, value):
+ raise NotImplementedError()
+
+ def get_deferrable(self, connection):
+ raise NotImplementedError()
+
+ def do_begin_twophase(self, connection, xid):
+ self.do_begin(connection.connection)
+
+ def do_prepare_twophase(self, connection, xid):
+ connection.exec_driver_sql("PREPARE TRANSACTION '%s'" % xid)
+
+ def do_rollback_twophase(
+ self, connection, xid, is_prepared=True, recover=False
+ ):
+ if is_prepared:
+ if recover:
+ # FIXME: ugly hack to get out of transaction
+ # context when committing recoverable transactions
+ # Must find out a way how to make the dbapi not
+ # open a transaction.
+ connection.exec_driver_sql("ROLLBACK")
+ connection.exec_driver_sql("ROLLBACK PREPARED '%s'" % xid)
+ connection.exec_driver_sql("BEGIN")
+ self.do_rollback(connection.connection)
+ else:
+ self.do_rollback(connection.connection)
+
+ def do_commit_twophase(
+ self, connection, xid, is_prepared=True, recover=False
+ ):
+ if is_prepared:
+ if recover:
+ connection.exec_driver_sql("ROLLBACK")
+ connection.exec_driver_sql("COMMIT PREPARED '%s'" % xid)
+ connection.exec_driver_sql("BEGIN")
+ self.do_rollback(connection.connection)
+ else:
+ self.do_commit(connection.connection)
+
+ def do_recover_twophase(self, connection):
+ resultset = connection.execute(
+ sql.text("SELECT gid FROM pg_prepared_xacts")
+ )
+ return [row[0] for row in resultset]
+
+ def _get_default_schema_name(self, connection):
+ return connection.exec_driver_sql("select current_schema()").scalar()
+
+ def has_schema(self, connection, schema):
+ query = (
+ "select nspname from pg_namespace " "where lower(nspname)=:schema"
+ )
+ cursor = connection.execute(
+ sql.text(query).bindparams(
+ sql.bindparam(
+ "schema",
+ util.text_type(schema.lower()),
+ type_=sqltypes.Unicode,
+ )
+ )
+ )
+
+ return bool(cursor.first())
+
+ def has_table(self, connection, table_name, schema=None):
+ self._ensure_has_table_connection(connection)
+ # seems like case gets folded in pg_class...
+ if schema is None:
+ cursor = connection.execute(
+ sql.text(
+ "select relname from pg_class c join pg_namespace n on "
+ "n.oid=c.relnamespace where "
+ "pg_catalog.pg_table_is_visible(c.oid) "
+ "and relname=:name"
+ ).bindparams(
+ sql.bindparam(
+ "name",
+ util.text_type(table_name),
+ type_=sqltypes.Unicode,
+ )
+ )
+ )
+ else:
+ cursor = connection.execute(
+ sql.text(
+ "select relname from pg_class c join pg_namespace n on "
+ "n.oid=c.relnamespace where n.nspname=:schema and "
+ "relname=:name"
+ ).bindparams(
+ sql.bindparam(
+ "name",
+ util.text_type(table_name),
+ type_=sqltypes.Unicode,
+ ),
+ sql.bindparam(
+ "schema",
+ util.text_type(schema),
+ type_=sqltypes.Unicode,
+ ),
+ )
+ )
+ return bool(cursor.first())
+
+ def has_sequence(self, connection, sequence_name, schema=None):
+ if schema is None:
+ schema = self.default_schema_name
+ cursor = connection.execute(
+ sql.text(
+ "SELECT relname FROM pg_class c join pg_namespace n on "
+ "n.oid=c.relnamespace where relkind='S' and "
+ "n.nspname=:schema and relname=:name"
+ ).bindparams(
+ sql.bindparam(
+ "name",
+ util.text_type(sequence_name),
+ type_=sqltypes.Unicode,
+ ),
+ sql.bindparam(
+ "schema",
+ util.text_type(schema),
+ type_=sqltypes.Unicode,
+ ),
+ )
+ )
+
+ return bool(cursor.first())
+
+ def has_type(self, connection, type_name, schema=None):
+ if schema is not None:
+ query = """
+ SELECT EXISTS (
+ SELECT * FROM pg_catalog.pg_type t, pg_catalog.pg_namespace n
+ WHERE t.typnamespace = n.oid
+ AND t.typname = :typname
+ AND n.nspname = :nspname
+ )
+ """
+ query = sql.text(query)
+ else:
+ query = """
+ SELECT EXISTS (
+ SELECT * FROM pg_catalog.pg_type t
+ WHERE t.typname = :typname
+ AND pg_type_is_visible(t.oid)
+ )
+ """
+ query = sql.text(query)
+ query = query.bindparams(
+ sql.bindparam(
+ "typname", util.text_type(type_name), type_=sqltypes.Unicode
+ )
+ )
+ if schema is not None:
+ query = query.bindparams(
+ sql.bindparam(
+ "nspname", util.text_type(schema), type_=sqltypes.Unicode
+ )
+ )
+ cursor = connection.execute(query)
+ return bool(cursor.scalar())
+
+ def _get_server_version_info(self, connection):
+ v = connection.exec_driver_sql("select pg_catalog.version()").scalar()
+ m = re.match(
+ r".*(?:PostgreSQL|EnterpriseDB) "
+ r"(\d+)\.?(\d+)?(?:\.(\d+))?(?:\.\d+)?(?:devel|beta)?",
+ v,
+ )
+ if not m:
+ raise AssertionError(
+ "Could not determine version from string '%s'" % v
+ )
+ return tuple([int(x) for x in m.group(1, 2, 3) if x is not None])
+
+ @reflection.cache
+ def get_table_oid(self, connection, table_name, schema=None, **kw):
+ """Fetch the oid for schema.table_name.
+
+ Several reflection methods require the table oid. The idea for using
+ this method is that it can be fetched one time and cached for
+ subsequent calls.
+
+ """
+ table_oid = None
+ if schema is not None:
+ schema_where_clause = "n.nspname = :schema"
+ else:
+ schema_where_clause = "pg_catalog.pg_table_is_visible(c.oid)"
+ query = (
+ """
+ SELECT c.oid
+ FROM pg_catalog.pg_class c
+ LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
+ WHERE (%s)
+ AND c.relname = :table_name AND c.relkind in
+ ('r', 'v', 'm', 'f', 'p')
+ """
+ % schema_where_clause
+ )
+ # Since we're binding to unicode, table_name and schema_name must be
+ # unicode.
+ table_name = util.text_type(table_name)
+ if schema is not None:
+ schema = util.text_type(schema)
+ s = sql.text(query).bindparams(table_name=sqltypes.Unicode)
+ s = s.columns(oid=sqltypes.Integer)
+ if schema:
+ s = s.bindparams(sql.bindparam("schema", type_=sqltypes.Unicode))
+ c = connection.execute(s, dict(table_name=table_name, schema=schema))
+ table_oid = c.scalar()
+ if table_oid is None:
+ raise exc.NoSuchTableError(table_name)
+ return table_oid
+
+ @reflection.cache
+ def get_schema_names(self, connection, **kw):
+ result = connection.execute(
+ sql.text(
+ "SELECT nspname FROM pg_namespace "
+ "WHERE nspname NOT LIKE 'pg_%' "
+ "ORDER BY nspname"
+ ).columns(nspname=sqltypes.Unicode)
+ )
+ return [name for name, in result]
+
+ @reflection.cache
+ def get_table_names(self, connection, schema=None, **kw):
+ result = connection.execute(
+ sql.text(
+ "SELECT c.relname FROM pg_class c "
+ "JOIN pg_namespace n ON n.oid = c.relnamespace "
+ "WHERE n.nspname = :schema AND c.relkind in ('r', 'p')"
+ ).columns(relname=sqltypes.Unicode),
+ dict(
+ schema=schema
+ if schema is not None
+ else self.default_schema_name
+ ),
+ )
+ return [name for name, in result]
+
+ @reflection.cache
+ def _get_foreign_table_names(self, connection, schema=None, **kw):
+ result = connection.execute(
+ sql.text(
+ "SELECT c.relname FROM pg_class c "
+ "JOIN pg_namespace n ON n.oid = c.relnamespace "
+ "WHERE n.nspname = :schema AND c.relkind = 'f'"
+ ).columns(relname=sqltypes.Unicode),
+ dict(
+ schema=schema
+ if schema is not None
+ else self.default_schema_name
+ ),
+ )
+ return [name for name, in result]
+
+ @reflection.cache
+ def get_view_names(
+ self, connection, schema=None, include=("plain", "materialized"), **kw
+ ):
+
+ include_kind = {"plain": "v", "materialized": "m"}
+ try:
+ kinds = [include_kind[i] for i in util.to_list(include)]
+ except KeyError:
+ raise ValueError(
+ "include %r unknown, needs to be a sequence containing "
+ "one or both of 'plain' and 'materialized'" % (include,)
+ )
+ if not kinds:
+ raise ValueError(
+ "empty include, needs to be a sequence containing "
+ "one or both of 'plain' and 'materialized'"
+ )
+
+ result = connection.execute(
+ sql.text(
+ "SELECT c.relname FROM pg_class c "
+ "JOIN pg_namespace n ON n.oid = c.relnamespace "
+ "WHERE n.nspname = :schema AND c.relkind IN (%s)"
+ % (", ".join("'%s'" % elem for elem in kinds))
+ ).columns(relname=sqltypes.Unicode),
+ dict(
+ schema=schema
+ if schema is not None
+ else self.default_schema_name
+ ),
+ )
+ return [name for name, in result]
+
+ @reflection.cache
+ def get_sequence_names(self, connection, schema=None, **kw):
+ if not schema:
+ schema = self.default_schema_name
+ cursor = connection.execute(
+ sql.text(
+ "SELECT relname FROM pg_class c join pg_namespace n on "
+ "n.oid=c.relnamespace where relkind='S' and "
+ "n.nspname=:schema"
+ ).bindparams(
+ sql.bindparam(
+ "schema",
+ util.text_type(schema),
+ type_=sqltypes.Unicode,
+ ),
+ )
+ )
+ return [row[0] for row in cursor]
+
+ @reflection.cache
+ def get_view_definition(self, connection, view_name, schema=None, **kw):
+ view_def = connection.scalar(
+ sql.text(
+ "SELECT pg_get_viewdef(c.oid) view_def FROM pg_class c "
+ "JOIN pg_namespace n ON n.oid = c.relnamespace "
+ "WHERE n.nspname = :schema AND c.relname = :view_name "
+ "AND c.relkind IN ('v', 'm')"
+ ).columns(view_def=sqltypes.Unicode),
+ dict(
+ schema=schema
+ if schema is not None
+ else self.default_schema_name,
+ view_name=view_name,
+ ),
+ )
+ return view_def
+
+ @reflection.cache
+ def get_columns(self, connection, table_name, schema=None, **kw):
+
+ table_oid = self.get_table_oid(
+ connection, table_name, schema, info_cache=kw.get("info_cache")
+ )
+
+ generated = (
+ "a.attgenerated as generated"
+ if self.server_version_info >= (12,)
+ else "NULL as generated"
+ )
+ if self.server_version_info >= (10,):
+ # a.attidentity != '' is required or it will reflect also
+ # serial columns as identity.
+ identity = """\
+ (SELECT json_build_object(
+ 'always', a.attidentity = 'a',
+ 'start', s.seqstart,
+ 'increment', s.seqincrement,
+ 'minvalue', s.seqmin,
+ 'maxvalue', s.seqmax,
+ 'cache', s.seqcache,
+ 'cycle', s.seqcycle)
+ FROM pg_catalog.pg_sequence s
+ JOIN pg_catalog.pg_class c on s.seqrelid = c."oid"
+ WHERE c.relkind = 'S'
+ AND a.attidentity != ''
+ AND s.seqrelid = pg_catalog.pg_get_serial_sequence(
+ a.attrelid::regclass::text, a.attname
+ )::regclass::oid
+ ) as identity_options\
+ """
+ else:
+ identity = "NULL as identity_options"
+
+ SQL_COLS = """
+ SELECT a.attname,
+ pg_catalog.format_type(a.atttypid, a.atttypmod),
+ (
+ SELECT pg_catalog.pg_get_expr(d.adbin, d.adrelid)
+ FROM pg_catalog.pg_attrdef d
+ WHERE d.adrelid = a.attrelid AND d.adnum = a.attnum
+ AND a.atthasdef
+ ) AS DEFAULT,
+ a.attnotnull,
+ a.attrelid as table_oid,
+ pgd.description as comment,
+ %s,
+ %s
+ FROM pg_catalog.pg_attribute a
+ LEFT JOIN pg_catalog.pg_description pgd ON (
+ pgd.objoid = a.attrelid AND pgd.objsubid = a.attnum)
+ WHERE a.attrelid = :table_oid
+ AND a.attnum > 0 AND NOT a.attisdropped
+ ORDER BY a.attnum
+ """ % (
+ generated,
+ identity,
+ )
+ s = (
+ sql.text(SQL_COLS)
+ .bindparams(sql.bindparam("table_oid", type_=sqltypes.Integer))
+ .columns(attname=sqltypes.Unicode, default=sqltypes.Unicode)
+ )
+ c = connection.execute(s, dict(table_oid=table_oid))
+ rows = c.fetchall()
+
+ # dictionary with (name, ) if default search path or (schema, name)
+ # as keys
+ domains = self._load_domains(connection)
+
+ # dictionary with (name, ) if default search path or (schema, name)
+ # as keys
+ enums = dict(
+ ((rec["name"],), rec)
+ if rec["visible"]
+ else ((rec["schema"], rec["name"]), rec)
+ for rec in self._load_enums(connection, schema="*")
+ )
+
+ # format columns
+ columns = []
+
+ for (
+ name,
+ format_type,
+ default_,
+ notnull,
+ table_oid,
+ comment,
+ generated,
+ identity,
+ ) in rows:
+ column_info = self._get_column_info(
+ name,
+ format_type,
+ default_,
+ notnull,
+ domains,
+ enums,
+ schema,
+ comment,
+ generated,
+ identity,
+ )
+ columns.append(column_info)
+ return columns
+
+ def _get_column_info(
+ self,
+ name,
+ format_type,
+ default,
+ notnull,
+ domains,
+ enums,
+ schema,
+ comment,
+ generated,
+ identity,
+ ):
+ def _handle_array_type(attype):
+ return (
+ # strip '[]' from integer[], etc.
+ re.sub(r"\[\]$", "", attype),
+ attype.endswith("[]"),
+ )
+
+ # strip (*) from character varying(5), timestamp(5)
+ # with time zone, geometry(POLYGON), etc.
+ attype = re.sub(r"\(.*\)", "", format_type)
+
+ # strip '[]' from integer[], etc. and check if an array
+ attype, is_array = _handle_array_type(attype)
+
+ # strip quotes from case sensitive enum or domain names
+ enum_or_domain_key = tuple(util.quoted_token_parser(attype))
+
+ nullable = not notnull
+
+ charlen = re.search(r"\(([\d,]+)\)", format_type)
+ if charlen:
+ charlen = charlen.group(1)
+ args = re.search(r"\((.*)\)", format_type)
+ if args and args.group(1):
+ args = tuple(re.split(r"\s*,\s*", args.group(1)))
+ else:
+ args = ()
+ kwargs = {}
+
+ if attype == "numeric":
+ if charlen:
+ prec, scale = charlen.split(",")
+ args = (int(prec), int(scale))
+ else:
+ args = ()
+ elif attype == "double precision":
+ args = (53,)
+ elif attype == "integer":
+ args = ()
+ elif attype in ("timestamp with time zone", "time with time zone"):
+ kwargs["timezone"] = True
+ if charlen:
+ kwargs["precision"] = int(charlen)
+ args = ()
+ elif attype in (
+ "timestamp without time zone",
+ "time without time zone",
+ "time",
+ ):
+ kwargs["timezone"] = False
+ if charlen:
+ kwargs["precision"] = int(charlen)
+ args = ()
+ elif attype == "bit varying":
+ kwargs["varying"] = True
+ if charlen:
+ args = (int(charlen),)
+ else:
+ args = ()
+ elif attype.startswith("interval"):
+ field_match = re.match(r"interval (.+)", attype, re.I)
+ if charlen:
+ kwargs["precision"] = int(charlen)
+ if field_match:
+ kwargs["fields"] = field_match.group(1)
+ attype = "interval"
+ args = ()
+ elif charlen:
+ args = (int(charlen),)
+
+ while True:
+ # looping here to suit nested domains
+ if attype in self.ischema_names:
+ coltype = self.ischema_names[attype]
+ break
+ elif enum_or_domain_key in enums:
+ enum = enums[enum_or_domain_key]
+ coltype = ENUM
+ kwargs["name"] = enum["name"]
+ if not enum["visible"]:
+ kwargs["schema"] = enum["schema"]
+ args = tuple(enum["labels"])
+ break
+ elif enum_or_domain_key in domains:
+ domain = domains[enum_or_domain_key]
+ attype = domain["attype"]
+ attype, is_array = _handle_array_type(attype)
+ # strip quotes from case sensitive enum or domain names
+ enum_or_domain_key = tuple(util.quoted_token_parser(attype))
+ # A table can't override a not null on the domain,
+ # but can override nullable
+ nullable = nullable and domain["nullable"]
+ if domain["default"] and not default:
+ # It can, however, override the default
+ # value, but can't set it to null.
+ default = domain["default"]
+ continue
+ else:
+ coltype = None
+ break
+
+ if coltype:
+ coltype = coltype(*args, **kwargs)
+ if is_array:
+ coltype = self.ischema_names["_array"](coltype)
+ else:
+ util.warn(
+ "Did not recognize type '%s' of column '%s'" % (attype, name)
+ )
+ coltype = sqltypes.NULLTYPE
+
+ # If a zero byte or blank string depending on driver (is also absent
+ # for older PG versions), then not a generated column. Otherwise, s =
+ # stored. (Other values might be added in the future.)
+ if generated not in (None, "", b"\x00"):
+ computed = dict(
+ sqltext=default, persisted=generated in ("s", b"s")
+ )
+ default = None
+ else:
+ computed = None
+
+ # adjust the default value
+ autoincrement = False
+ if default is not None:
+ match = re.search(r"""(nextval\(')([^']+)('.*$)""", default)
+ if match is not None:
+ if issubclass(coltype._type_affinity, sqltypes.Integer):
+ autoincrement = True
+ # the default is related to a Sequence
+ sch = schema
+ if "." not in match.group(2) and sch is not None:
+ # unconditionally quote the schema name. this could
+ # later be enhanced to obey quoting rules /
+ # "quote schema"
+ default = (
+ match.group(1)
+ + ('"%s"' % sch)
+ + "."
+ + match.group(2)
+ + match.group(3)
+ )
+
+ column_info = dict(
+ name=name,
+ type=coltype,
+ nullable=nullable,
+ default=default,
+ autoincrement=autoincrement or identity is not None,
+ comment=comment,
+ )
+ if computed is not None:
+ column_info["computed"] = computed
+ if identity is not None:
+ column_info["identity"] = identity
+ return column_info
+
+ @reflection.cache
+ def get_pk_constraint(self, connection, table_name, schema=None, **kw):
+ table_oid = self.get_table_oid(
+ connection, table_name, schema, info_cache=kw.get("info_cache")
+ )
+
+ if self.server_version_info < (8, 4):
+ PK_SQL = """
+ SELECT a.attname
+ FROM
+ pg_class t
+ join pg_index ix on t.oid = ix.indrelid
+ join pg_attribute a
+ on t.oid=a.attrelid AND %s
+ WHERE
+ t.oid = :table_oid and ix.indisprimary = 't'
+ ORDER BY a.attnum
+ """ % self._pg_index_any(
+ "a.attnum", "ix.indkey"
+ )
+
+ else:
+ # unnest() and generate_subscripts() both introduced in
+ # version 8.4
+ PK_SQL = """
+ SELECT a.attname
+ FROM pg_attribute a JOIN (
+ SELECT unnest(ix.indkey) attnum,
+ generate_subscripts(ix.indkey, 1) ord
+ FROM pg_index ix
+ WHERE ix.indrelid = :table_oid AND ix.indisprimary
+ ) k ON a.attnum=k.attnum
+ WHERE a.attrelid = :table_oid
+ ORDER BY k.ord
+ """
+ t = sql.text(PK_SQL).columns(attname=sqltypes.Unicode)
+ c = connection.execute(t, dict(table_oid=table_oid))
+ cols = [r[0] for r in c.fetchall()]
+
+ PK_CONS_SQL = """
+ SELECT conname
+ FROM pg_catalog.pg_constraint r
+ WHERE r.conrelid = :table_oid AND r.contype = 'p'
+ ORDER BY 1
+ """
+ t = sql.text(PK_CONS_SQL).columns(conname=sqltypes.Unicode)
+ c = connection.execute(t, dict(table_oid=table_oid))
+ name = c.scalar()
+
+ return {"constrained_columns": cols, "name": name}
+
+ @reflection.cache
+ def get_foreign_keys(
+ self,
+ connection,
+ table_name,
+ schema=None,
+ postgresql_ignore_search_path=False,
+ **kw
+ ):
+ preparer = self.identifier_preparer
+ table_oid = self.get_table_oid(
+ connection, table_name, schema, info_cache=kw.get("info_cache")
+ )
+
+ FK_SQL = """
+ SELECT r.conname,
+ pg_catalog.pg_get_constraintdef(r.oid, true) as condef,
+ n.nspname as conschema
+ FROM pg_catalog.pg_constraint r,
+ pg_namespace n,
+ pg_class c
+
+ WHERE r.conrelid = :table AND
+ r.contype = 'f' AND
+ c.oid = confrelid AND
+ n.oid = c.relnamespace
+ ORDER BY 1
+ """
+ # https://www.postgresql.org/docs/9.0/static/sql-createtable.html
+ FK_REGEX = re.compile(
+ r"FOREIGN KEY \((.*?)\) REFERENCES (?:(.*?)\.)?(.*?)\((.*?)\)"
+ r"[\s]?(MATCH (FULL|PARTIAL|SIMPLE)+)?"
+ r"[\s]?(ON UPDATE "
+ r"(CASCADE|RESTRICT|NO ACTION|SET NULL|SET DEFAULT)+)?"
+ r"[\s]?(ON DELETE "
+ r"(CASCADE|RESTRICT|NO ACTION|SET NULL|SET DEFAULT)+)?"
+ r"[\s]?(DEFERRABLE|NOT DEFERRABLE)?"
+ r"[\s]?(INITIALLY (DEFERRED|IMMEDIATE)+)?"
+ )
+
+ t = sql.text(FK_SQL).columns(
+ conname=sqltypes.Unicode, condef=sqltypes.Unicode
+ )
+ c = connection.execute(t, dict(table=table_oid))
+ fkeys = []
+ for conname, condef, conschema in c.fetchall():
+ m = re.search(FK_REGEX, condef).groups()
+
+ (
+ constrained_columns,
+ referred_schema,
+ referred_table,
+ referred_columns,
+ _,
+ match,
+ _,
+ onupdate,
+ _,
+ ondelete,
+ deferrable,
+ _,
+ initially,
+ ) = m
+
+ if deferrable is not None:
+ deferrable = True if deferrable == "DEFERRABLE" else False
+ constrained_columns = [
+ preparer._unquote_identifier(x)
+ for x in re.split(r"\s*,\s*", constrained_columns)
+ ]
+
+ if postgresql_ignore_search_path:
+ # when ignoring search path, we use the actual schema
+ # provided it isn't the "default" schema
+ if conschema != self.default_schema_name:
+ referred_schema = conschema
+ else:
+ referred_schema = schema
+ elif referred_schema:
+ # referred_schema is the schema that we regexp'ed from
+ # pg_get_constraintdef(). If the schema is in the search
+ # path, pg_get_constraintdef() will give us None.
+ referred_schema = preparer._unquote_identifier(referred_schema)
+ elif schema is not None and schema == conschema:
+ # If the actual schema matches the schema of the table
+ # we're reflecting, then we will use that.
+ referred_schema = schema
+
+ referred_table = preparer._unquote_identifier(referred_table)
+ referred_columns = [
+ preparer._unquote_identifier(x)
+ for x in re.split(r"\s*,\s", referred_columns)
+ ]
+ options = {
+ k: v
+ for k, v in [
+ ("onupdate", onupdate),
+ ("ondelete", ondelete),
+ ("initially", initially),
+ ("deferrable", deferrable),
+ ("match", match),
+ ]
+ if v is not None and v != "NO ACTION"
+ }
+ fkey_d = {
+ "name": conname,
+ "constrained_columns": constrained_columns,
+ "referred_schema": referred_schema,
+ "referred_table": referred_table,
+ "referred_columns": referred_columns,
+ "options": options,
+ }
+ fkeys.append(fkey_d)
+ return fkeys
+
+ def _pg_index_any(self, col, compare_to):
+ if self.server_version_info < (8, 1):
+ # https://www.postgresql.org/message-id/10279.1124395722@sss.pgh.pa.us
+ # "In CVS tip you could replace this with "attnum = ANY (indkey)".
+ # Unfortunately, most array support doesn't work on int2vector in
+ # pre-8.1 releases, so I think you're kinda stuck with the above
+ # for now.
+ # regards, tom lane"
+ return "(%s)" % " OR ".join(
+ "%s[%d] = %s" % (compare_to, ind, col) for ind in range(0, 10)
+ )
+ else:
+ return "%s = ANY(%s)" % (col, compare_to)
+
+ @reflection.cache
+ def get_indexes(self, connection, table_name, schema, **kw):
+ table_oid = self.get_table_oid(
+ connection, table_name, schema, info_cache=kw.get("info_cache")
+ )
+
+ # cast indkey as varchar since it's an int2vector,
+ # returned as a list by some drivers such as pypostgresql
+
+ if self.server_version_info < (8, 5):
+ IDX_SQL = """
+ SELECT
+ i.relname as relname,
+ ix.indisunique, ix.indexprs, ix.indpred,
+ a.attname, a.attnum, NULL, ix.indkey%s,
+ %s, %s, am.amname,
+ NULL as indnkeyatts
+ FROM
+ pg_class t
+ join pg_index ix on t.oid = ix.indrelid
+ join pg_class i on i.oid = ix.indexrelid
+ left outer join
+ pg_attribute a
+ on t.oid = a.attrelid and %s
+ left outer join
+ pg_am am
+ on i.relam = am.oid
+ WHERE
+ t.relkind IN ('r', 'v', 'f', 'm')
+ and t.oid = :table_oid
+ and ix.indisprimary = 'f'
+ ORDER BY
+ t.relname,
+ i.relname
+ """ % (
+ # version 8.3 here was based on observing the
+ # cast does not work in PG 8.2.4, does work in 8.3.0.
+ # nothing in PG changelogs regarding this.
+ "::varchar" if self.server_version_info >= (8, 3) else "",
+ "ix.indoption::varchar"
+ if self.server_version_info >= (8, 3)
+ else "NULL",
+ "i.reloptions"
+ if self.server_version_info >= (8, 2)
+ else "NULL",
+ self._pg_index_any("a.attnum", "ix.indkey"),
+ )
+ else:
+ IDX_SQL = """
+ SELECT
+ i.relname as relname,
+ ix.indisunique, ix.indexprs,
+ a.attname, a.attnum, c.conrelid, ix.indkey::varchar,
+ ix.indoption::varchar, i.reloptions, am.amname,
+ pg_get_expr(ix.indpred, ix.indrelid),
+ %s as indnkeyatts
+ FROM
+ pg_class t
+ join pg_index ix on t.oid = ix.indrelid
+ join pg_class i on i.oid = ix.indexrelid
+ left outer join
+ pg_attribute a
+ on t.oid = a.attrelid and a.attnum = ANY(ix.indkey)
+ left outer join
+ pg_constraint c
+ on (ix.indrelid = c.conrelid and
+ ix.indexrelid = c.conindid and
+ c.contype in ('p', 'u', 'x'))
+ left outer join
+ pg_am am
+ on i.relam = am.oid
+ WHERE
+ t.relkind IN ('r', 'v', 'f', 'm', 'p')
+ and t.oid = :table_oid
+ and ix.indisprimary = 'f'
+ ORDER BY
+ t.relname,
+ i.relname
+ """ % (
+ "ix.indnkeyatts"
+ if self.server_version_info >= (11, 0)
+ else "NULL",
+ )
+
+ t = sql.text(IDX_SQL).columns(
+ relname=sqltypes.Unicode, attname=sqltypes.Unicode
+ )
+ c = connection.execute(t, dict(table_oid=table_oid))
+
+ indexes = defaultdict(lambda: defaultdict(dict))
+
+ sv_idx_name = None
+ for row in c.fetchall():
+ (
+ idx_name,
+ unique,
+ expr,
+ col,
+ col_num,
+ conrelid,
+ idx_key,
+ idx_option,
+ options,
+ amname,
+ filter_definition,
+ indnkeyatts,
+ ) = row
+
+ if expr:
+ if idx_name != sv_idx_name:
+ util.warn(
+ "Skipped unsupported reflection of "
+ "expression-based index %s" % idx_name
+ )
+ sv_idx_name = idx_name
+ continue
+
+ has_idx = idx_name in indexes
+ index = indexes[idx_name]
+ if col is not None:
+ index["cols"][col_num] = col
+ if not has_idx:
+ idx_keys = idx_key.split()
+ # "The number of key columns in the index, not counting any
+ # included columns, which are merely stored and do not
+ # participate in the index semantics"
+ if indnkeyatts and idx_keys[indnkeyatts:]:
+ # this is a "covering index" which has INCLUDE columns
+ # as well as regular index columns
+ inc_keys = idx_keys[indnkeyatts:]
+ idx_keys = idx_keys[:indnkeyatts]
+ else:
+ inc_keys = []
+
+ index["key"] = [int(k.strip()) for k in idx_keys]
+ index["inc"] = [int(k.strip()) for k in inc_keys]
+
+ # (new in pg 8.3)
+ # "pg_index.indoption" is list of ints, one per column/expr.
+ # int acts as bitmask: 0x01=DESC, 0x02=NULLSFIRST
+ sorting = {}
+ for col_idx, col_flags in enumerate(
+ (idx_option or "").split()
+ ):
+ col_flags = int(col_flags.strip())
+ col_sorting = ()
+ # try to set flags only if they differ from PG defaults...
+ if col_flags & 0x01:
+ col_sorting += ("desc",)
+ if not (col_flags & 0x02):
+ col_sorting += ("nulls_last",)
+ else:
+ if col_flags & 0x02:
+ col_sorting += ("nulls_first",)
+ if col_sorting:
+ sorting[col_idx] = col_sorting
+ if sorting:
+ index["sorting"] = sorting
+
+ index["unique"] = unique
+ if conrelid is not None:
+ index["duplicates_constraint"] = idx_name
+ if options:
+ index["options"] = dict(
+ [option.split("=") for option in options]
+ )
+
+ # it *might* be nice to include that this is 'btree' in the
+ # reflection info. But we don't want an Index object
+ # to have a ``postgresql_using`` in it that is just the
+ # default, so for the moment leaving this out.
+ if amname and amname != "btree":
+ index["amname"] = amname
+
+ if filter_definition:
+ index["postgresql_where"] = filter_definition
+
+ result = []
+ for name, idx in indexes.items():
+ entry = {
+ "name": name,
+ "unique": idx["unique"],
+ "column_names": [idx["cols"][i] for i in idx["key"]],
+ }
+ if self.server_version_info >= (11, 0):
+ # NOTE: this is legacy, this is part of dialect_options now
+ # as of #7382
+ entry["include_columns"] = [idx["cols"][i] for i in idx["inc"]]
+ if "duplicates_constraint" in idx:
+ entry["duplicates_constraint"] = idx["duplicates_constraint"]
+ if "sorting" in idx:
+ entry["column_sorting"] = dict(
+ (idx["cols"][idx["key"][i]], value)
+ for i, value in idx["sorting"].items()
+ )
+ if "include_columns" in entry:
+ entry.setdefault("dialect_options", {})[
+ "postgresql_include"
+ ] = entry["include_columns"]
+ if "options" in idx:
+ entry.setdefault("dialect_options", {})[
+ "postgresql_with"
+ ] = idx["options"]
+ if "amname" in idx:
+ entry.setdefault("dialect_options", {})[
+ "postgresql_using"
+ ] = idx["amname"]
+ if "postgresql_where" in idx:
+ entry.setdefault("dialect_options", {})[
+ "postgresql_where"
+ ] = idx["postgresql_where"]
+ result.append(entry)
+ return result
+
+ @reflection.cache
+ def get_unique_constraints(
+ self, connection, table_name, schema=None, **kw
+ ):
+ table_oid = self.get_table_oid(
+ connection, table_name, schema, info_cache=kw.get("info_cache")
+ )
+
+ UNIQUE_SQL = """
+ SELECT
+ cons.conname as name,
+ cons.conkey as key,
+ a.attnum as col_num,
+ a.attname as col_name
+ FROM
+ pg_catalog.pg_constraint cons
+ join pg_attribute a
+ on cons.conrelid = a.attrelid AND
+ a.attnum = ANY(cons.conkey)
+ WHERE
+ cons.conrelid = :table_oid AND
+ cons.contype = 'u'
+ """
+
+ t = sql.text(UNIQUE_SQL).columns(col_name=sqltypes.Unicode)
+ c = connection.execute(t, dict(table_oid=table_oid))
+
+ uniques = defaultdict(lambda: defaultdict(dict))
+ for row in c.fetchall():
+ uc = uniques[row.name]
+ uc["key"] = row.key
+ uc["cols"][row.col_num] = row.col_name
+
+ return [
+ {"name": name, "column_names": [uc["cols"][i] for i in uc["key"]]}
+ for name, uc in uniques.items()
+ ]
+
+ @reflection.cache
+ def get_table_comment(self, connection, table_name, schema=None, **kw):
+ table_oid = self.get_table_oid(
+ connection, table_name, schema, info_cache=kw.get("info_cache")
+ )
+
+ COMMENT_SQL = """
+ SELECT
+ pgd.description as table_comment
+ FROM
+ pg_catalog.pg_description pgd
+ WHERE
+ pgd.objsubid = 0 AND
+ pgd.objoid = :table_oid
+ """
+
+ c = connection.execute(
+ sql.text(COMMENT_SQL), dict(table_oid=table_oid)
+ )
+ return {"text": c.scalar()}
+
+ @reflection.cache
+ def get_check_constraints(self, connection, table_name, schema=None, **kw):
+ table_oid = self.get_table_oid(
+ connection, table_name, schema, info_cache=kw.get("info_cache")
+ )
+
+ CHECK_SQL = """
+ SELECT
+ cons.conname as name,
+ pg_get_constraintdef(cons.oid) as src
+ FROM
+ pg_catalog.pg_constraint cons
+ WHERE
+ cons.conrelid = :table_oid AND
+ cons.contype = 'c'
+ """
+
+ c = connection.execute(sql.text(CHECK_SQL), dict(table_oid=table_oid))
+
+ ret = []
+ for name, src in c:
+ # samples:
+ # "CHECK (((a > 1) AND (a < 5)))"
+ # "CHECK (((a = 1) OR ((a > 2) AND (a < 5))))"
+ # "CHECK (((a > 1) AND (a < 5))) NOT VALID"
+ # "CHECK (some_boolean_function(a))"
+ # "CHECK (((a\n < 1)\n OR\n (a\n >= 5))\n)"
+
+ m = re.match(
+ r"^CHECK *\((.+)\)( NOT VALID)?$", src, flags=re.DOTALL
+ )
+ if not m:
+ util.warn("Could not parse CHECK constraint text: %r" % src)
+ sqltext = ""
+ else:
+ sqltext = re.compile(
+ r"^[\s\n]*\((.+)\)[\s\n]*$", flags=re.DOTALL
+ ).sub(r"\1", m.group(1))
+ entry = {"name": name, "sqltext": sqltext}
+ if m and m.group(2):
+ entry["dialect_options"] = {"not_valid": True}
+
+ ret.append(entry)
+ return ret
+
+ def _load_enums(self, connection, schema=None):
+ schema = schema or self.default_schema_name
+ if not self.supports_native_enum:
+ return {}
+
+ # Load data types for enums:
+ SQL_ENUMS = """
+ SELECT t.typname as "name",
+ -- no enum defaults in 8.4 at least
+ -- t.typdefault as "default",
+ pg_catalog.pg_type_is_visible(t.oid) as "visible",
+ n.nspname as "schema",
+ e.enumlabel as "label"
+ FROM pg_catalog.pg_type t
+ LEFT JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
+ LEFT JOIN pg_catalog.pg_enum e ON t.oid = e.enumtypid
+ WHERE t.typtype = 'e'
+ """
+
+ if schema != "*":
+ SQL_ENUMS += "AND n.nspname = :schema "
+
+ # e.oid gives us label order within an enum
+ SQL_ENUMS += 'ORDER BY "schema", "name", e.oid'
+
+ s = sql.text(SQL_ENUMS).columns(
+ attname=sqltypes.Unicode, label=sqltypes.Unicode
+ )
+
+ if schema != "*":
+ s = s.bindparams(schema=schema)
+
+ c = connection.execute(s)
+
+ enums = []
+ enum_by_name = {}
+ for enum in c.fetchall():
+ key = (enum.schema, enum.name)
+ if key in enum_by_name:
+ enum_by_name[key]["labels"].append(enum.label)
+ else:
+ enum_by_name[key] = enum_rec = {
+ "name": enum.name,
+ "schema": enum.schema,
+ "visible": enum.visible,
+ "labels": [],
+ }
+ if enum.label is not None:
+ enum_rec["labels"].append(enum.label)
+ enums.append(enum_rec)
+ return enums
+
+ def _load_domains(self, connection):
+ # Load data types for domains:
+ SQL_DOMAINS = """
+ SELECT t.typname as "name",
+ pg_catalog.format_type(t.typbasetype, t.typtypmod) as "attype",
+ not t.typnotnull as "nullable",
+ t.typdefault as "default",
+ pg_catalog.pg_type_is_visible(t.oid) as "visible",
+ n.nspname as "schema"
+ FROM pg_catalog.pg_type t
+ LEFT JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
+ WHERE t.typtype = 'd'
+ """
+
+ s = sql.text(SQL_DOMAINS)
+ c = connection.execution_options(future_result=True).execute(s)
+
+ domains = {}
+ for domain in c.mappings():
+ domain = domain
+ # strip (30) from character varying(30)
+ attype = re.search(r"([^\(]+)", domain["attype"]).group(1)
+ # 'visible' just means whether or not the domain is in a
+ # schema that's on the search path -- or not overridden by
+ # a schema with higher precedence. If it's not visible,
+ # it will be prefixed with the schema-name when it's used.
+ if domain["visible"]:
+ key = (domain["name"],)
+ else:
+ key = (domain["schema"], domain["name"])
+
+ domains[key] = {
+ "attype": attype,
+ "nullable": domain["nullable"],
+ "default": domain["default"],
+ }
+
+ return domains
diff --git a/lib/sqlalchemy/dialects/postgresql/dml.py b/lib/sqlalchemy/dialects/postgresql/dml.py
new file mode 100644
index 0000000..b483774
--- /dev/null
+++ b/lib/sqlalchemy/dialects/postgresql/dml.py
@@ -0,0 +1,274 @@
+# postgresql/on_conflict.py
+# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+
+from . import ext
+from ... import util
+from ...sql import coercions
+from ...sql import roles
+from ...sql import schema
+from ...sql.base import _exclusive_against
+from ...sql.base import _generative
+from ...sql.base import ColumnCollection
+from ...sql.dml import Insert as StandardInsert
+from ...sql.elements import ClauseElement
+from ...sql.expression import alias
+from ...util.langhelpers import public_factory
+
+
+__all__ = ("Insert", "insert")
+
+
+class Insert(StandardInsert):
+ """PostgreSQL-specific implementation of INSERT.
+
+ Adds methods for PG-specific syntaxes such as ON CONFLICT.
+
+ The :class:`_postgresql.Insert` object is created using the
+ :func:`sqlalchemy.dialects.postgresql.insert` function.
+
+ .. versionadded:: 1.1
+
+ """
+
+ stringify_dialect = "postgresql"
+ inherit_cache = False
+
+ @util.memoized_property
+ def excluded(self):
+ """Provide the ``excluded`` namespace for an ON CONFLICT statement
+
+ PG's ON CONFLICT clause allows reference to the row that would
+ be inserted, known as ``excluded``. This attribute provides
+ all columns in this row to be referenceable.
+
+ .. tip:: The :attr:`_postgresql.Insert.excluded` attribute is an
+ instance of :class:`_expression.ColumnCollection`, which provides
+ an interface the same as that of the :attr:`_schema.Table.c`
+ collection described at :ref:`metadata_tables_and_columns`.
+ With this collection, ordinary names are accessible like attributes
+ (e.g. ``stmt.excluded.some_column``), but special names and
+ dictionary method names should be accessed using indexed access,
+ such as ``stmt.excluded["column name"]`` or
+ ``stmt.excluded["values"]``. See the docstring for
+ :class:`_expression.ColumnCollection` for further examples.
+
+ .. seealso::
+
+ :ref:`postgresql_insert_on_conflict` - example of how
+ to use :attr:`_expression.Insert.excluded`
+
+ """
+ return alias(self.table, name="excluded").columns
+
+ _on_conflict_exclusive = _exclusive_against(
+ "_post_values_clause",
+ msgs={
+ "_post_values_clause": "This Insert construct already has "
+ "an ON CONFLICT clause established"
+ },
+ )
+
+ @_generative
+ @_on_conflict_exclusive
+ def on_conflict_do_update(
+ self,
+ constraint=None,
+ index_elements=None,
+ index_where=None,
+ set_=None,
+ where=None,
+ ):
+ r"""
+ Specifies a DO UPDATE SET action for ON CONFLICT clause.
+
+ Either the ``constraint`` or ``index_elements`` argument is
+ required, but only one of these can be specified.
+
+ :param constraint:
+ The name of a unique or exclusion constraint on the table,
+ or the constraint object itself if it has a .name attribute.
+
+ :param index_elements:
+ A sequence consisting of string column names, :class:`_schema.Column`
+ objects, or other column expression objects that will be used
+ to infer a target index.
+
+ :param index_where:
+ Additional WHERE criterion that can be used to infer a
+ conditional target index.
+
+ :param set\_:
+ A dictionary or other mapping object
+ where the keys are either names of columns in the target table,
+ or :class:`_schema.Column` objects or other ORM-mapped columns
+ matching that of the target table, and expressions or literals
+ as values, specifying the ``SET`` actions to take.
+
+ .. versionadded:: 1.4 The
+ :paramref:`_postgresql.Insert.on_conflict_do_update.set_`
+ parameter supports :class:`_schema.Column` objects from the target
+ :class:`_schema.Table` as keys.
+
+ .. warning:: This dictionary does **not** take into account
+ Python-specified default UPDATE values or generation functions,
+ e.g. those specified using :paramref:`_schema.Column.onupdate`.
+ These values will not be exercised for an ON CONFLICT style of
+ UPDATE, unless they are manually specified in the
+ :paramref:`.Insert.on_conflict_do_update.set_` dictionary.
+
+ :param where:
+ Optional argument. If present, can be a literal SQL
+ string or an acceptable expression for a ``WHERE`` clause
+ that restricts the rows affected by ``DO UPDATE SET``. Rows
+ not meeting the ``WHERE`` condition will not be updated
+ (effectively a ``DO NOTHING`` for those rows).
+
+ .. versionadded:: 1.1
+
+
+ .. seealso::
+
+ :ref:`postgresql_insert_on_conflict`
+
+ """
+ self._post_values_clause = OnConflictDoUpdate(
+ constraint, index_elements, index_where, set_, where
+ )
+
+ @_generative
+ @_on_conflict_exclusive
+ def on_conflict_do_nothing(
+ self, constraint=None, index_elements=None, index_where=None
+ ):
+ """
+ Specifies a DO NOTHING action for ON CONFLICT clause.
+
+ The ``constraint`` and ``index_elements`` arguments
+ are optional, but only one of these can be specified.
+
+ :param constraint:
+ The name of a unique or exclusion constraint on the table,
+ or the constraint object itself if it has a .name attribute.
+
+ :param index_elements:
+ A sequence consisting of string column names, :class:`_schema.Column`
+ objects, or other column expression objects that will be used
+ to infer a target index.
+
+ :param index_where:
+ Additional WHERE criterion that can be used to infer a
+ conditional target index.
+
+ .. versionadded:: 1.1
+
+ .. seealso::
+
+ :ref:`postgresql_insert_on_conflict`
+
+ """
+ self._post_values_clause = OnConflictDoNothing(
+ constraint, index_elements, index_where
+ )
+
+
+insert = public_factory(
+ Insert, ".dialects.postgresql.insert", ".dialects.postgresql.Insert"
+)
+
+
+class OnConflictClause(ClauseElement):
+ stringify_dialect = "postgresql"
+
+ def __init__(self, constraint=None, index_elements=None, index_where=None):
+
+ if constraint is not None:
+ if not isinstance(constraint, util.string_types) and isinstance(
+ constraint,
+ (schema.Index, schema.Constraint, ext.ExcludeConstraint),
+ ):
+ constraint = getattr(constraint, "name") or constraint
+
+ if constraint is not None:
+ if index_elements is not None:
+ raise ValueError(
+ "'constraint' and 'index_elements' are mutually exclusive"
+ )
+
+ if isinstance(constraint, util.string_types):
+ self.constraint_target = constraint
+ self.inferred_target_elements = None
+ self.inferred_target_whereclause = None
+ elif isinstance(constraint, schema.Index):
+ index_elements = constraint.expressions
+ index_where = constraint.dialect_options["postgresql"].get(
+ "where"
+ )
+ elif isinstance(constraint, ext.ExcludeConstraint):
+ index_elements = constraint.columns
+ index_where = constraint.where
+ else:
+ index_elements = constraint.columns
+ index_where = constraint.dialect_options["postgresql"].get(
+ "where"
+ )
+
+ if index_elements is not None:
+ self.constraint_target = None
+ self.inferred_target_elements = index_elements
+ self.inferred_target_whereclause = index_where
+ elif constraint is None:
+ self.constraint_target = (
+ self.inferred_target_elements
+ ) = self.inferred_target_whereclause = None
+
+
+class OnConflictDoNothing(OnConflictClause):
+ __visit_name__ = "on_conflict_do_nothing"
+
+
+class OnConflictDoUpdate(OnConflictClause):
+ __visit_name__ = "on_conflict_do_update"
+
+ def __init__(
+ self,
+ constraint=None,
+ index_elements=None,
+ index_where=None,
+ set_=None,
+ where=None,
+ ):
+ super(OnConflictDoUpdate, self).__init__(
+ constraint=constraint,
+ index_elements=index_elements,
+ index_where=index_where,
+ )
+
+ if (
+ self.inferred_target_elements is None
+ and self.constraint_target is None
+ ):
+ raise ValueError(
+ "Either constraint or index_elements, "
+ "but not both, must be specified unless DO NOTHING"
+ )
+
+ if isinstance(set_, dict):
+ if not set_:
+ raise ValueError("set parameter dictionary must not be empty")
+ elif isinstance(set_, ColumnCollection):
+ set_ = dict(set_)
+ else:
+ raise ValueError(
+ "set parameter must be a non-empty dictionary "
+ "or a ColumnCollection such as the `.c.` collection "
+ "of a Table object"
+ )
+ self.update_values_to_set = [
+ (coercions.expect(roles.DMLColumnRole, key), value)
+ for key, value in set_.items()
+ ]
+ self.update_whereclause = where
diff --git a/lib/sqlalchemy/dialects/postgresql/ext.py b/lib/sqlalchemy/dialects/postgresql/ext.py
new file mode 100644
index 0000000..9e52ee1
--- /dev/null
+++ b/lib/sqlalchemy/dialects/postgresql/ext.py
@@ -0,0 +1,277 @@
+# postgresql/ext.py
+# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+
+from .array import ARRAY
+from ... import util
+from ...sql import coercions
+from ...sql import elements
+from ...sql import expression
+from ...sql import functions
+from ...sql import roles
+from ...sql import schema
+from ...sql.schema import ColumnCollectionConstraint
+
+
+class aggregate_order_by(expression.ColumnElement):
+ """Represent a PostgreSQL aggregate order by expression.
+
+ E.g.::
+
+ from sqlalchemy.dialects.postgresql import aggregate_order_by
+ expr = func.array_agg(aggregate_order_by(table.c.a, table.c.b.desc()))
+ stmt = select(expr)
+
+ would represent the expression::
+
+ SELECT array_agg(a ORDER BY b DESC) FROM table;
+
+ Similarly::
+
+ expr = func.string_agg(
+ table.c.a,
+ aggregate_order_by(literal_column("','"), table.c.a)
+ )
+ stmt = select(expr)
+
+ Would represent::
+
+ SELECT string_agg(a, ',' ORDER BY a) FROM table;
+
+ .. versionadded:: 1.1
+
+ .. versionchanged:: 1.2.13 - the ORDER BY argument may be multiple terms
+
+ .. seealso::
+
+ :class:`_functions.array_agg`
+
+ """
+
+ __visit_name__ = "aggregate_order_by"
+
+ stringify_dialect = "postgresql"
+ inherit_cache = False
+
+ def __init__(self, target, *order_by):
+ self.target = coercions.expect(roles.ExpressionElementRole, target)
+ self.type = self.target.type
+
+ _lob = len(order_by)
+ if _lob == 0:
+ raise TypeError("at least one ORDER BY element is required")
+ elif _lob == 1:
+ self.order_by = coercions.expect(
+ roles.ExpressionElementRole, order_by[0]
+ )
+ else:
+ self.order_by = elements.ClauseList(
+ *order_by, _literal_as_text_role=roles.ExpressionElementRole
+ )
+
+ def self_group(self, against=None):
+ return self
+
+ def get_children(self, **kwargs):
+ return self.target, self.order_by
+
+ def _copy_internals(self, clone=elements._clone, **kw):
+ self.target = clone(self.target, **kw)
+ self.order_by = clone(self.order_by, **kw)
+
+ @property
+ def _from_objects(self):
+ return self.target._from_objects + self.order_by._from_objects
+
+
+class ExcludeConstraint(ColumnCollectionConstraint):
+ """A table-level EXCLUDE constraint.
+
+ Defines an EXCLUDE constraint as described in the `PostgreSQL
+ documentation`__.
+
+ __ https://www.postgresql.org/docs/current/static/sql-createtable.html#SQL-CREATETABLE-EXCLUDE
+
+ """ # noqa
+
+ __visit_name__ = "exclude_constraint"
+
+ where = None
+ inherit_cache = False
+
+ create_drop_stringify_dialect = "postgresql"
+
+ @elements._document_text_coercion(
+ "where",
+ ":class:`.ExcludeConstraint`",
+ ":paramref:`.ExcludeConstraint.where`",
+ )
+ def __init__(self, *elements, **kw):
+ r"""
+ Create an :class:`.ExcludeConstraint` object.
+
+ E.g.::
+
+ const = ExcludeConstraint(
+ (Column('period'), '&&'),
+ (Column('group'), '='),
+ where=(Column('group') != 'some group'),
+ ops={'group': 'my_operator_class'}
+ )
+
+ The constraint is normally embedded into the :class:`_schema.Table`
+ construct
+ directly, or added later using :meth:`.append_constraint`::
+
+ some_table = Table(
+ 'some_table', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('period', TSRANGE()),
+ Column('group', String)
+ )
+
+ some_table.append_constraint(
+ ExcludeConstraint(
+ (some_table.c.period, '&&'),
+ (some_table.c.group, '='),
+ where=some_table.c.group != 'some group',
+ name='some_table_excl_const',
+ ops={'group': 'my_operator_class'}
+ )
+ )
+
+ :param \*elements:
+
+ A sequence of two tuples of the form ``(column, operator)`` where
+ "column" is a SQL expression element or a raw SQL string, most
+ typically a :class:`_schema.Column` object,
+ and "operator" is a string
+ containing the operator to use. In order to specify a column name
+ when a :class:`_schema.Column` object is not available,
+ while ensuring
+ that any necessary quoting rules take effect, an ad-hoc
+ :class:`_schema.Column` or :func:`_expression.column`
+ object should be
+ used.
+
+ :param name:
+ Optional, the in-database name of this constraint.
+
+ :param deferrable:
+ Optional bool. If set, emit DEFERRABLE or NOT DEFERRABLE when
+ issuing DDL for this constraint.
+
+ :param initially:
+ Optional string. If set, emit INITIALLY <value> when issuing DDL
+ for this constraint.
+
+ :param using:
+ Optional string. If set, emit USING <index_method> when issuing DDL
+ for this constraint. Defaults to 'gist'.
+
+ :param where:
+ Optional SQL expression construct or literal SQL string.
+ If set, emit WHERE <predicate> when issuing DDL
+ for this constraint.
+
+ :param ops:
+ Optional dictionary. Used to define operator classes for the
+ elements; works the same way as that of the
+ :ref:`postgresql_ops <postgresql_operator_classes>`
+ parameter specified to the :class:`_schema.Index` construct.
+
+ .. versionadded:: 1.3.21
+
+ .. seealso::
+
+ :ref:`postgresql_operator_classes` - general description of how
+ PostgreSQL operator classes are specified.
+
+ """
+ columns = []
+ render_exprs = []
+ self.operators = {}
+
+ expressions, operators = zip(*elements)
+
+ for (expr, column, strname, add_element), operator in zip(
+ coercions.expect_col_expression_collection(
+ roles.DDLConstraintColumnRole, expressions
+ ),
+ operators,
+ ):
+ if add_element is not None:
+ columns.append(add_element)
+
+ name = column.name if column is not None else strname
+
+ if name is not None:
+ # backwards compat
+ self.operators[name] = operator
+
+ render_exprs.append((expr, name, operator))
+
+ self._render_exprs = render_exprs
+
+ ColumnCollectionConstraint.__init__(
+ self,
+ *columns,
+ name=kw.get("name"),
+ deferrable=kw.get("deferrable"),
+ initially=kw.get("initially")
+ )
+ self.using = kw.get("using", "gist")
+ where = kw.get("where")
+ if where is not None:
+ self.where = coercions.expect(roles.StatementOptionRole, where)
+
+ self.ops = kw.get("ops", {})
+
+ def _set_parent(self, table, **kw):
+ super(ExcludeConstraint, self)._set_parent(table)
+
+ self._render_exprs = [
+ (
+ expr if isinstance(expr, elements.ClauseElement) else colexpr,
+ name,
+ operator,
+ )
+ for (expr, name, operator), colexpr in util.zip_longest(
+ self._render_exprs, self.columns
+ )
+ ]
+
+ def _copy(self, target_table=None, **kw):
+ elements = [
+ (
+ schema._copy_expression(expr, self.parent, target_table),
+ self.operators[expr.name],
+ )
+ for expr in self.columns
+ ]
+ c = self.__class__(
+ *elements,
+ name=self.name,
+ deferrable=self.deferrable,
+ initially=self.initially,
+ where=self.where,
+ using=self.using
+ )
+ c.dispatch._update(self.dispatch)
+ return c
+
+
+def array_agg(*arg, **kw):
+ """PostgreSQL-specific form of :class:`_functions.array_agg`, ensures
+ return type is :class:`_postgresql.ARRAY` and not
+ the plain :class:`_types.ARRAY`, unless an explicit ``type_``
+ is passed.
+
+ .. versionadded:: 1.1
+
+ """
+ kw["_default_array_type"] = ARRAY
+ return functions.func.array_agg(*arg, **kw)
diff --git a/lib/sqlalchemy/dialects/postgresql/hstore.py b/lib/sqlalchemy/dialects/postgresql/hstore.py
new file mode 100644
index 0000000..29800d2
--- /dev/null
+++ b/lib/sqlalchemy/dialects/postgresql/hstore.py
@@ -0,0 +1,455 @@
+# postgresql/hstore.py
+# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+
+import re
+
+from .array import ARRAY
+from ... import types as sqltypes
+from ... import util
+from ...sql import functions as sqlfunc
+from ...sql import operators
+
+
+__all__ = ("HSTORE", "hstore")
+
+idx_precedence = operators._PRECEDENCE[operators.json_getitem_op]
+
+GETITEM = operators.custom_op(
+ "->",
+ precedence=idx_precedence,
+ natural_self_precedent=True,
+ eager_grouping=True,
+)
+
+HAS_KEY = operators.custom_op(
+ "?",
+ precedence=idx_precedence,
+ natural_self_precedent=True,
+ eager_grouping=True,
+)
+
+HAS_ALL = operators.custom_op(
+ "?&",
+ precedence=idx_precedence,
+ natural_self_precedent=True,
+ eager_grouping=True,
+)
+
+HAS_ANY = operators.custom_op(
+ "?|",
+ precedence=idx_precedence,
+ natural_self_precedent=True,
+ eager_grouping=True,
+)
+
+CONTAINS = operators.custom_op(
+ "@>",
+ precedence=idx_precedence,
+ natural_self_precedent=True,
+ eager_grouping=True,
+)
+
+CONTAINED_BY = operators.custom_op(
+ "<@",
+ precedence=idx_precedence,
+ natural_self_precedent=True,
+ eager_grouping=True,
+)
+
+
+class HSTORE(sqltypes.Indexable, sqltypes.Concatenable, sqltypes.TypeEngine):
+ """Represent the PostgreSQL HSTORE type.
+
+ The :class:`.HSTORE` type stores dictionaries containing strings, e.g.::
+
+ data_table = Table('data_table', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('data', HSTORE)
+ )
+
+ with engine.connect() as conn:
+ conn.execute(
+ data_table.insert(),
+ data = {"key1": "value1", "key2": "value2"}
+ )
+
+ :class:`.HSTORE` provides for a wide range of operations, including:
+
+ * Index operations::
+
+ data_table.c.data['some key'] == 'some value'
+
+ * Containment operations::
+
+ data_table.c.data.has_key('some key')
+
+ data_table.c.data.has_all(['one', 'two', 'three'])
+
+ * Concatenation::
+
+ data_table.c.data + {"k1": "v1"}
+
+ For a full list of special methods see
+ :class:`.HSTORE.comparator_factory`.
+
+ For usage with the SQLAlchemy ORM, it may be desirable to combine
+ the usage of :class:`.HSTORE` with :class:`.MutableDict` dictionary
+ now part of the :mod:`sqlalchemy.ext.mutable`
+ extension. This extension will allow "in-place" changes to the
+ dictionary, e.g. addition of new keys or replacement/removal of existing
+ keys to/from the current dictionary, to produce events which will be
+ detected by the unit of work::
+
+ from sqlalchemy.ext.mutable import MutableDict
+
+ class MyClass(Base):
+ __tablename__ = 'data_table'
+
+ id = Column(Integer, primary_key=True)
+ data = Column(MutableDict.as_mutable(HSTORE))
+
+ my_object = session.query(MyClass).one()
+
+ # in-place mutation, requires Mutable extension
+ # in order for the ORM to detect
+ my_object.data['some_key'] = 'some value'
+
+ session.commit()
+
+ When the :mod:`sqlalchemy.ext.mutable` extension is not used, the ORM
+ will not be alerted to any changes to the contents of an existing
+ dictionary, unless that dictionary value is re-assigned to the
+ HSTORE-attribute itself, thus generating a change event.
+
+ .. seealso::
+
+ :class:`.hstore` - render the PostgreSQL ``hstore()`` function.
+
+
+ """
+
+ __visit_name__ = "HSTORE"
+ hashable = False
+ text_type = sqltypes.Text()
+
+ def __init__(self, text_type=None):
+ """Construct a new :class:`.HSTORE`.
+
+ :param text_type: the type that should be used for indexed values.
+ Defaults to :class:`_types.Text`.
+
+ .. versionadded:: 1.1.0
+
+ """
+ if text_type is not None:
+ self.text_type = text_type
+
+ class Comparator(
+ sqltypes.Indexable.Comparator, sqltypes.Concatenable.Comparator
+ ):
+ """Define comparison operations for :class:`.HSTORE`."""
+
+ def has_key(self, other):
+ """Boolean expression. Test for presence of a key. Note that the
+ key may be a SQLA expression.
+ """
+ return self.operate(HAS_KEY, other, result_type=sqltypes.Boolean)
+
+ def has_all(self, other):
+ """Boolean expression. Test for presence of all keys in jsonb"""
+ return self.operate(HAS_ALL, other, result_type=sqltypes.Boolean)
+
+ def has_any(self, other):
+ """Boolean expression. Test for presence of any key in jsonb"""
+ return self.operate(HAS_ANY, other, result_type=sqltypes.Boolean)
+
+ def contains(self, other, **kwargs):
+ """Boolean expression. Test if keys (or array) are a superset
+ of/contained the keys of the argument jsonb expression.
+
+ kwargs may be ignored by this operator but are required for API
+ conformance.
+ """
+ return self.operate(CONTAINS, other, result_type=sqltypes.Boolean)
+
+ def contained_by(self, other):
+ """Boolean expression. Test if keys are a proper subset of the
+ keys of the argument jsonb expression.
+ """
+ return self.operate(
+ CONTAINED_BY, other, result_type=sqltypes.Boolean
+ )
+
+ def _setup_getitem(self, index):
+ return GETITEM, index, self.type.text_type
+
+ def defined(self, key):
+ """Boolean expression. Test for presence of a non-NULL value for
+ the key. Note that the key may be a SQLA expression.
+ """
+ return _HStoreDefinedFunction(self.expr, key)
+
+ def delete(self, key):
+ """HStore expression. Returns the contents of this hstore with the
+ given key deleted. Note that the key may be a SQLA expression.
+ """
+ if isinstance(key, dict):
+ key = _serialize_hstore(key)
+ return _HStoreDeleteFunction(self.expr, key)
+
+ def slice(self, array):
+ """HStore expression. Returns a subset of an hstore defined by
+ array of keys.
+ """
+ return _HStoreSliceFunction(self.expr, array)
+
+ def keys(self):
+ """Text array expression. Returns array of keys."""
+ return _HStoreKeysFunction(self.expr)
+
+ def vals(self):
+ """Text array expression. Returns array of values."""
+ return _HStoreValsFunction(self.expr)
+
+ def array(self):
+ """Text array expression. Returns array of alternating keys and
+ values.
+ """
+ return _HStoreArrayFunction(self.expr)
+
+ def matrix(self):
+ """Text array expression. Returns array of [key, value] pairs."""
+ return _HStoreMatrixFunction(self.expr)
+
+ comparator_factory = Comparator
+
+ def bind_processor(self, dialect):
+ if util.py2k:
+ encoding = dialect.encoding
+
+ def process(value):
+ if isinstance(value, dict):
+ return _serialize_hstore(value).encode(encoding)
+ else:
+ return value
+
+ else:
+
+ def process(value):
+ if isinstance(value, dict):
+ return _serialize_hstore(value)
+ else:
+ return value
+
+ return process
+
+ def result_processor(self, dialect, coltype):
+ if util.py2k:
+ encoding = dialect.encoding
+
+ def process(value):
+ if value is not None:
+ return _parse_hstore(value.decode(encoding))
+ else:
+ return value
+
+ else:
+
+ def process(value):
+ if value is not None:
+ return _parse_hstore(value)
+ else:
+ return value
+
+ return process
+
+
+class hstore(sqlfunc.GenericFunction):
+ """Construct an hstore value within a SQL expression using the
+ PostgreSQL ``hstore()`` function.
+
+ The :class:`.hstore` function accepts one or two arguments as described
+ in the PostgreSQL documentation.
+
+ E.g.::
+
+ from sqlalchemy.dialects.postgresql import array, hstore
+
+ select(hstore('key1', 'value1'))
+
+ select(
+ hstore(
+ array(['key1', 'key2', 'key3']),
+ array(['value1', 'value2', 'value3'])
+ )
+ )
+
+ .. seealso::
+
+ :class:`.HSTORE` - the PostgreSQL ``HSTORE`` datatype.
+
+ """
+
+ type = HSTORE
+ name = "hstore"
+ inherit_cache = True
+
+
+class _HStoreDefinedFunction(sqlfunc.GenericFunction):
+ type = sqltypes.Boolean
+ name = "defined"
+ inherit_cache = True
+
+
+class _HStoreDeleteFunction(sqlfunc.GenericFunction):
+ type = HSTORE
+ name = "delete"
+ inherit_cache = True
+
+
+class _HStoreSliceFunction(sqlfunc.GenericFunction):
+ type = HSTORE
+ name = "slice"
+ inherit_cache = True
+
+
+class _HStoreKeysFunction(sqlfunc.GenericFunction):
+ type = ARRAY(sqltypes.Text)
+ name = "akeys"
+ inherit_cache = True
+
+
+class _HStoreValsFunction(sqlfunc.GenericFunction):
+ type = ARRAY(sqltypes.Text)
+ name = "avals"
+ inherit_cache = True
+
+
+class _HStoreArrayFunction(sqlfunc.GenericFunction):
+ type = ARRAY(sqltypes.Text)
+ name = "hstore_to_array"
+ inherit_cache = True
+
+
+class _HStoreMatrixFunction(sqlfunc.GenericFunction):
+ type = ARRAY(sqltypes.Text)
+ name = "hstore_to_matrix"
+ inherit_cache = True
+
+
+#
+# parsing. note that none of this is used with the psycopg2 backend,
+# which provides its own native extensions.
+#
+
+# My best guess at the parsing rules of hstore literals, since no formal
+# grammar is given. This is mostly reverse engineered from PG's input parser
+# behavior.
+HSTORE_PAIR_RE = re.compile(
+ r"""
+(
+ "(?P<key> (\\ . | [^"])* )" # Quoted key
+)
+[ ]* => [ ]* # Pair operator, optional adjoining whitespace
+(
+ (?P<value_null> NULL ) # NULL value
+ | "(?P<value> (\\ . | [^"])* )" # Quoted value
+)
+""",
+ re.VERBOSE,
+)
+
+HSTORE_DELIMITER_RE = re.compile(
+ r"""
+[ ]* , [ ]*
+""",
+ re.VERBOSE,
+)
+
+
+def _parse_error(hstore_str, pos):
+ """format an unmarshalling error."""
+
+ ctx = 20
+ hslen = len(hstore_str)
+
+ parsed_tail = hstore_str[max(pos - ctx - 1, 0) : min(pos, hslen)]
+ residual = hstore_str[min(pos, hslen) : min(pos + ctx + 1, hslen)]
+
+ if len(parsed_tail) > ctx:
+ parsed_tail = "[...]" + parsed_tail[1:]
+ if len(residual) > ctx:
+ residual = residual[:-1] + "[...]"
+
+ return "After %r, could not parse residual at position %d: %r" % (
+ parsed_tail,
+ pos,
+ residual,
+ )
+
+
+def _parse_hstore(hstore_str):
+ """Parse an hstore from its literal string representation.
+
+ Attempts to approximate PG's hstore input parsing rules as closely as
+ possible. Although currently this is not strictly necessary, since the
+ current implementation of hstore's output syntax is stricter than what it
+ accepts as input, the documentation makes no guarantees that will always
+ be the case.
+
+
+
+ """
+ result = {}
+ pos = 0
+ pair_match = HSTORE_PAIR_RE.match(hstore_str)
+
+ while pair_match is not None:
+ key = pair_match.group("key").replace(r"\"", '"').replace("\\\\", "\\")
+ if pair_match.group("value_null"):
+ value = None
+ else:
+ value = (
+ pair_match.group("value")
+ .replace(r"\"", '"')
+ .replace("\\\\", "\\")
+ )
+ result[key] = value
+
+ pos += pair_match.end()
+
+ delim_match = HSTORE_DELIMITER_RE.match(hstore_str[pos:])
+ if delim_match is not None:
+ pos += delim_match.end()
+
+ pair_match = HSTORE_PAIR_RE.match(hstore_str[pos:])
+
+ if pos != len(hstore_str):
+ raise ValueError(_parse_error(hstore_str, pos))
+
+ return result
+
+
+def _serialize_hstore(val):
+ """Serialize a dictionary into an hstore literal. Keys and values must
+ both be strings (except None for values).
+
+ """
+
+ def esc(s, position):
+ if position == "value" and s is None:
+ return "NULL"
+ elif isinstance(s, util.string_types):
+ return '"%s"' % s.replace("\\", "\\\\").replace('"', r"\"")
+ else:
+ raise ValueError(
+ "%r in %s position is not a string." % (s, position)
+ )
+
+ return ", ".join(
+ "%s=>%s" % (esc(k, "key"), esc(v, "value")) for k, v in val.items()
+ )
diff --git a/lib/sqlalchemy/dialects/postgresql/json.py b/lib/sqlalchemy/dialects/postgresql/json.py
new file mode 100644
index 0000000..daaaeac
--- /dev/null
+++ b/lib/sqlalchemy/dialects/postgresql/json.py
@@ -0,0 +1,327 @@
+# postgresql/json.py
+# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+from __future__ import absolute_import
+
+from ... import types as sqltypes
+from ... import util
+from ...sql import operators
+
+
+__all__ = ("JSON", "JSONB")
+
+idx_precedence = operators._PRECEDENCE[operators.json_getitem_op]
+
+ASTEXT = operators.custom_op(
+ "->>",
+ precedence=idx_precedence,
+ natural_self_precedent=True,
+ eager_grouping=True,
+)
+
+JSONPATH_ASTEXT = operators.custom_op(
+ "#>>",
+ precedence=idx_precedence,
+ natural_self_precedent=True,
+ eager_grouping=True,
+)
+
+
+HAS_KEY = operators.custom_op(
+ "?",
+ precedence=idx_precedence,
+ natural_self_precedent=True,
+ eager_grouping=True,
+)
+
+HAS_ALL = operators.custom_op(
+ "?&",
+ precedence=idx_precedence,
+ natural_self_precedent=True,
+ eager_grouping=True,
+)
+
+HAS_ANY = operators.custom_op(
+ "?|",
+ precedence=idx_precedence,
+ natural_self_precedent=True,
+ eager_grouping=True,
+)
+
+CONTAINS = operators.custom_op(
+ "@>",
+ precedence=idx_precedence,
+ natural_self_precedent=True,
+ eager_grouping=True,
+)
+
+CONTAINED_BY = operators.custom_op(
+ "<@",
+ precedence=idx_precedence,
+ natural_self_precedent=True,
+ eager_grouping=True,
+)
+
+
+class JSONPathType(sqltypes.JSON.JSONPathType):
+ def bind_processor(self, dialect):
+ super_proc = self.string_bind_processor(dialect)
+
+ def process(value):
+ assert isinstance(value, util.collections_abc.Sequence)
+ tokens = [util.text_type(elem) for elem in value]
+ value = "{%s}" % (", ".join(tokens))
+ if super_proc:
+ value = super_proc(value)
+ return value
+
+ return process
+
+ def literal_processor(self, dialect):
+ super_proc = self.string_literal_processor(dialect)
+
+ def process(value):
+ assert isinstance(value, util.collections_abc.Sequence)
+ tokens = [util.text_type(elem) for elem in value]
+ value = "{%s}" % (", ".join(tokens))
+ if super_proc:
+ value = super_proc(value)
+ return value
+
+ return process
+
+
+class JSON(sqltypes.JSON):
+ """Represent the PostgreSQL JSON type.
+
+ :class:`_postgresql.JSON` is used automatically whenever the base
+ :class:`_types.JSON` datatype is used against a PostgreSQL backend,
+ however base :class:`_types.JSON` datatype does not provide Python
+ accessors for PostgreSQL-specific comparison methods such as
+ :meth:`_postgresql.JSON.Comparator.astext`; additionally, to use
+ PostgreSQL ``JSONB``, the :class:`_postgresql.JSONB` datatype should
+ be used explicitly.
+
+ .. seealso::
+
+ :class:`_types.JSON` - main documentation for the generic
+ cross-platform JSON datatype.
+
+ The operators provided by the PostgreSQL version of :class:`_types.JSON`
+ include:
+
+ * Index operations (the ``->`` operator)::
+
+ data_table.c.data['some key']
+
+ data_table.c.data[5]
+
+
+ * Index operations returning text (the ``->>`` operator)::
+
+ data_table.c.data['some key'].astext == 'some value'
+
+ Note that equivalent functionality is available via the
+ :attr:`.JSON.Comparator.as_string` accessor.
+
+ * Index operations with CAST
+ (equivalent to ``CAST(col ->> ['some key'] AS <type>)``)::
+
+ data_table.c.data['some key'].astext.cast(Integer) == 5
+
+ Note that equivalent functionality is available via the
+ :attr:`.JSON.Comparator.as_integer` and similar accessors.
+
+ * Path index operations (the ``#>`` operator)::
+
+ data_table.c.data[('key_1', 'key_2', 5, ..., 'key_n')]
+
+ * Path index operations returning text (the ``#>>`` operator)::
+
+ data_table.c.data[('key_1', 'key_2', 5, ..., 'key_n')].astext == 'some value'
+
+ .. versionchanged:: 1.1 The :meth:`_expression.ColumnElement.cast`
+ operator on
+ JSON objects now requires that the :attr:`.JSON.Comparator.astext`
+ modifier be called explicitly, if the cast works only from a textual
+ string.
+
+ Index operations return an expression object whose type defaults to
+ :class:`_types.JSON` by default,
+ so that further JSON-oriented instructions
+ may be called upon the result type.
+
+ Custom serializers and deserializers are specified at the dialect level,
+ that is using :func:`_sa.create_engine`. The reason for this is that when
+ using psycopg2, the DBAPI only allows serializers at the per-cursor
+ or per-connection level. E.g.::
+
+ engine = create_engine("postgresql://scott:tiger@localhost/test",
+ json_serializer=my_serialize_fn,
+ json_deserializer=my_deserialize_fn
+ )
+
+ When using the psycopg2 dialect, the json_deserializer is registered
+ against the database using ``psycopg2.extras.register_default_json``.
+
+ .. seealso::
+
+ :class:`_types.JSON` - Core level JSON type
+
+ :class:`_postgresql.JSONB`
+
+ .. versionchanged:: 1.1 :class:`_postgresql.JSON` is now a PostgreSQL-
+ specific specialization of the new :class:`_types.JSON` type.
+
+ """ # noqa
+
+ astext_type = sqltypes.Text()
+
+ def __init__(self, none_as_null=False, astext_type=None):
+ """Construct a :class:`_types.JSON` type.
+
+ :param none_as_null: if True, persist the value ``None`` as a
+ SQL NULL value, not the JSON encoding of ``null``. Note that
+ when this flag is False, the :func:`.null` construct can still
+ be used to persist a NULL value::
+
+ from sqlalchemy import null
+ conn.execute(table.insert(), data=null())
+
+ .. versionchanged:: 0.9.8 - Added ``none_as_null``, and :func:`.null`
+ is now supported in order to persist a NULL value.
+
+ .. seealso::
+
+ :attr:`_types.JSON.NULL`
+
+ :param astext_type: the type to use for the
+ :attr:`.JSON.Comparator.astext`
+ accessor on indexed attributes. Defaults to :class:`_types.Text`.
+
+ .. versionadded:: 1.1
+
+ """
+ super(JSON, self).__init__(none_as_null=none_as_null)
+ if astext_type is not None:
+ self.astext_type = astext_type
+
+ class Comparator(sqltypes.JSON.Comparator):
+ """Define comparison operations for :class:`_types.JSON`."""
+
+ @property
+ def astext(self):
+ """On an indexed expression, use the "astext" (e.g. "->>")
+ conversion when rendered in SQL.
+
+ E.g.::
+
+ select(data_table.c.data['some key'].astext)
+
+ .. seealso::
+
+ :meth:`_expression.ColumnElement.cast`
+
+ """
+ if isinstance(self.expr.right.type, sqltypes.JSON.JSONPathType):
+ return self.expr.left.operate(
+ JSONPATH_ASTEXT,
+ self.expr.right,
+ result_type=self.type.astext_type,
+ )
+ else:
+ return self.expr.left.operate(
+ ASTEXT, self.expr.right, result_type=self.type.astext_type
+ )
+
+ comparator_factory = Comparator
+
+
+class JSONB(JSON):
+ """Represent the PostgreSQL JSONB type.
+
+ The :class:`_postgresql.JSONB` type stores arbitrary JSONB format data,
+ e.g.::
+
+ data_table = Table('data_table', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('data', JSONB)
+ )
+
+ with engine.connect() as conn:
+ conn.execute(
+ data_table.insert(),
+ data = {"key1": "value1", "key2": "value2"}
+ )
+
+ The :class:`_postgresql.JSONB` type includes all operations provided by
+ :class:`_types.JSON`, including the same behaviors for indexing
+ operations.
+ It also adds additional operators specific to JSONB, including
+ :meth:`.JSONB.Comparator.has_key`, :meth:`.JSONB.Comparator.has_all`,
+ :meth:`.JSONB.Comparator.has_any`, :meth:`.JSONB.Comparator.contains`,
+ and :meth:`.JSONB.Comparator.contained_by`.
+
+ Like the :class:`_types.JSON` type, the :class:`_postgresql.JSONB`
+ type does not detect
+ in-place changes when used with the ORM, unless the
+ :mod:`sqlalchemy.ext.mutable` extension is used.
+
+ Custom serializers and deserializers
+ are shared with the :class:`_types.JSON` class,
+ using the ``json_serializer``
+ and ``json_deserializer`` keyword arguments. These must be specified
+ at the dialect level using :func:`_sa.create_engine`. When using
+ psycopg2, the serializers are associated with the jsonb type using
+ ``psycopg2.extras.register_default_jsonb`` on a per-connection basis,
+ in the same way that ``psycopg2.extras.register_default_json`` is used
+ to register these handlers with the json type.
+
+ .. versionadded:: 0.9.7
+
+ .. seealso::
+
+ :class:`_types.JSON`
+
+ """
+
+ __visit_name__ = "JSONB"
+
+ class Comparator(JSON.Comparator):
+ """Define comparison operations for :class:`_types.JSON`."""
+
+ def has_key(self, other):
+ """Boolean expression. Test for presence of a key. Note that the
+ key may be a SQLA expression.
+ """
+ return self.operate(HAS_KEY, other, result_type=sqltypes.Boolean)
+
+ def has_all(self, other):
+ """Boolean expression. Test for presence of all keys in jsonb"""
+ return self.operate(HAS_ALL, other, result_type=sqltypes.Boolean)
+
+ def has_any(self, other):
+ """Boolean expression. Test for presence of any key in jsonb"""
+ return self.operate(HAS_ANY, other, result_type=sqltypes.Boolean)
+
+ def contains(self, other, **kwargs):
+ """Boolean expression. Test if keys (or array) are a superset
+ of/contained the keys of the argument jsonb expression.
+
+ kwargs may be ignored by this operator but are required for API
+ conformance.
+ """
+ return self.operate(CONTAINS, other, result_type=sqltypes.Boolean)
+
+ def contained_by(self, other):
+ """Boolean expression. Test if keys are a proper subset of the
+ keys of the argument jsonb expression.
+ """
+ return self.operate(
+ CONTAINED_BY, other, result_type=sqltypes.Boolean
+ )
+
+ comparator_factory = Comparator
diff --git a/lib/sqlalchemy/dialects/postgresql/pg8000.py b/lib/sqlalchemy/dialects/postgresql/pg8000.py
new file mode 100644
index 0000000..98561a9
--- /dev/null
+++ b/lib/sqlalchemy/dialects/postgresql/pg8000.py
@@ -0,0 +1,594 @@
+# postgresql/pg8000.py
+# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors <see AUTHORS
+# file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+r"""
+.. dialect:: postgresql+pg8000
+ :name: pg8000
+ :dbapi: pg8000
+ :connectstring: postgresql+pg8000://user:password@host:port/dbname[?key=value&key=value...]
+ :url: https://pypi.org/project/pg8000/
+
+.. versionchanged:: 1.4 The pg8000 dialect has been updated for version
+ 1.16.6 and higher, and is again part of SQLAlchemy's continuous integration
+ with full feature support.
+
+.. _pg8000_unicode:
+
+Unicode
+-------
+
+pg8000 will encode / decode string values between it and the server using the
+PostgreSQL ``client_encoding`` parameter; by default this is the value in
+the ``postgresql.conf`` file, which often defaults to ``SQL_ASCII``.
+Typically, this can be changed to ``utf-8``, as a more useful default::
+
+ #client_encoding = sql_ascii # actually, defaults to database
+ # encoding
+ client_encoding = utf8
+
+The ``client_encoding`` can be overridden for a session by executing the SQL:
+
+SET CLIENT_ENCODING TO 'utf8';
+
+SQLAlchemy will execute this SQL on all new connections based on the value
+passed to :func:`_sa.create_engine` using the ``client_encoding`` parameter::
+
+ engine = create_engine(
+ "postgresql+pg8000://user:pass@host/dbname", client_encoding='utf8')
+
+.. _pg8000_ssl:
+
+SSL Connections
+---------------
+
+pg8000 accepts a Python ``SSLContext`` object which may be specified using the
+:paramref:`_sa.create_engine.connect_args` dictionary::
+
+ import ssl
+ ssl_context = ssl.create_default_context()
+ engine = sa.create_engine(
+ "postgresql+pg8000://scott:tiger@192.168.0.199/test",
+ connect_args={"ssl_context": ssl_context},
+ )
+
+If the server uses an automatically-generated certificate that is self-signed
+or does not match the host name (as seen from the client), it may also be
+necessary to disable hostname checking::
+
+ import ssl
+ ssl_context = ssl.create_default_context()
+ ssl_context.check_hostname = False
+ ssl_context.verify_mode = ssl.CERT_NONE
+ engine = sa.create_engine(
+ "postgresql+pg8000://scott:tiger@192.168.0.199/test",
+ connect_args={"ssl_context": ssl_context},
+ )
+
+.. _pg8000_isolation_level:
+
+pg8000 Transaction Isolation Level
+-------------------------------------
+
+The pg8000 dialect offers the same isolation level settings as that
+of the :ref:`psycopg2 <psycopg2_isolation_level>` dialect:
+
+* ``READ COMMITTED``
+* ``READ UNCOMMITTED``
+* ``REPEATABLE READ``
+* ``SERIALIZABLE``
+* ``AUTOCOMMIT``
+
+.. seealso::
+
+ :ref:`postgresql_isolation_level`
+
+ :ref:`psycopg2_isolation_level`
+
+
+""" # noqa
+import decimal
+import re
+from uuid import UUID as _python_UUID
+
+from .array import ARRAY as PGARRAY
+from .base import _ColonCast
+from .base import _DECIMAL_TYPES
+from .base import _FLOAT_TYPES
+from .base import _INT_TYPES
+from .base import ENUM
+from .base import INTERVAL
+from .base import PGCompiler
+from .base import PGDialect
+from .base import PGExecutionContext
+from .base import PGIdentifierPreparer
+from .base import UUID
+from .json import JSON
+from .json import JSONB
+from .json import JSONPathType
+from ... import exc
+from ... import processors
+from ... import types as sqltypes
+from ... import util
+from ...sql.elements import quoted_name
+
+
+class _PGNumeric(sqltypes.Numeric):
+ def result_processor(self, dialect, coltype):
+ if self.asdecimal:
+ if coltype in _FLOAT_TYPES:
+ return processors.to_decimal_processor_factory(
+ decimal.Decimal, self._effective_decimal_return_scale
+ )
+ elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES:
+ # pg8000 returns Decimal natively for 1700
+ return None
+ else:
+ raise exc.InvalidRequestError(
+ "Unknown PG numeric type: %d" % coltype
+ )
+ else:
+ if coltype in _FLOAT_TYPES:
+ # pg8000 returns float natively for 701
+ return None
+ elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES:
+ return processors.to_float
+ else:
+ raise exc.InvalidRequestError(
+ "Unknown PG numeric type: %d" % coltype
+ )
+
+
+class _PGNumericNoBind(_PGNumeric):
+ def bind_processor(self, dialect):
+ return None
+
+
+class _PGJSON(JSON):
+ def result_processor(self, dialect, coltype):
+ return None
+
+ def get_dbapi_type(self, dbapi):
+ return dbapi.JSON
+
+
+class _PGJSONB(JSONB):
+ def result_processor(self, dialect, coltype):
+ return None
+
+ def get_dbapi_type(self, dbapi):
+ return dbapi.JSONB
+
+
+class _PGJSONIndexType(sqltypes.JSON.JSONIndexType):
+ def get_dbapi_type(self, dbapi):
+ raise NotImplementedError("should not be here")
+
+
+class _PGJSONIntIndexType(sqltypes.JSON.JSONIntIndexType):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.INTEGER
+
+
+class _PGJSONStrIndexType(sqltypes.JSON.JSONStrIndexType):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.STRING
+
+
+class _PGJSONPathType(JSONPathType):
+ def get_dbapi_type(self, dbapi):
+ return 1009
+
+
+class _PGUUID(UUID):
+ def bind_processor(self, dialect):
+ if not self.as_uuid:
+
+ def process(value):
+ if value is not None:
+ value = _python_UUID(value)
+ return value
+
+ return process
+
+ def result_processor(self, dialect, coltype):
+ if not self.as_uuid:
+
+ def process(value):
+ if value is not None:
+ value = str(value)
+ return value
+
+ return process
+
+
+class _PGEnum(ENUM):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.UNKNOWN
+
+
+class _PGInterval(INTERVAL):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.INTERVAL
+
+ @classmethod
+ def adapt_emulated_to_native(cls, interval, **kw):
+ return _PGInterval(precision=interval.second_precision)
+
+
+class _PGTimeStamp(sqltypes.DateTime):
+ def get_dbapi_type(self, dbapi):
+ if self.timezone:
+ # TIMESTAMPTZOID
+ return 1184
+ else:
+ # TIMESTAMPOID
+ return 1114
+
+
+class _PGTime(sqltypes.Time):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.TIME
+
+
+class _PGInteger(sqltypes.Integer):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.INTEGER
+
+
+class _PGSmallInteger(sqltypes.SmallInteger):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.INTEGER
+
+
+class _PGNullType(sqltypes.NullType):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.NULLTYPE
+
+
+class _PGBigInteger(sqltypes.BigInteger):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.BIGINTEGER
+
+
+class _PGBoolean(sqltypes.Boolean):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.BOOLEAN
+
+
+class _PGARRAY(PGARRAY):
+ def bind_expression(self, bindvalue):
+ return _ColonCast(bindvalue, self)
+
+
+_server_side_id = util.counter()
+
+
+class PGExecutionContext_pg8000(PGExecutionContext):
+ def create_server_side_cursor(self):
+ ident = "c_%s_%s" % (hex(id(self))[2:], hex(_server_side_id())[2:])
+ return ServerSideCursor(self._dbapi_connection.cursor(), ident)
+
+ def pre_exec(self):
+ if not self.compiled:
+ return
+
+
+class ServerSideCursor:
+ server_side = True
+
+ def __init__(self, cursor, ident):
+ self.ident = ident
+ self.cursor = cursor
+
+ @property
+ def connection(self):
+ return self.cursor.connection
+
+ @property
+ def rowcount(self):
+ return self.cursor.rowcount
+
+ @property
+ def description(self):
+ return self.cursor.description
+
+ def execute(self, operation, args=(), stream=None):
+ op = "DECLARE " + self.ident + " NO SCROLL CURSOR FOR " + operation
+ self.cursor.execute(op, args, stream=stream)
+ return self
+
+ def executemany(self, operation, param_sets):
+ self.cursor.executemany(operation, param_sets)
+ return self
+
+ def fetchone(self):
+ self.cursor.execute("FETCH FORWARD 1 FROM " + self.ident)
+ return self.cursor.fetchone()
+
+ def fetchmany(self, num=None):
+ if num is None:
+ return self.fetchall()
+ else:
+ self.cursor.execute(
+ "FETCH FORWARD " + str(int(num)) + " FROM " + self.ident
+ )
+ return self.cursor.fetchall()
+
+ def fetchall(self):
+ self.cursor.execute("FETCH FORWARD ALL FROM " + self.ident)
+ return self.cursor.fetchall()
+
+ def close(self):
+ self.cursor.execute("CLOSE " + self.ident)
+ self.cursor.close()
+
+ def setinputsizes(self, *sizes):
+ self.cursor.setinputsizes(*sizes)
+
+ def setoutputsize(self, size, column=None):
+ pass
+
+
+class PGCompiler_pg8000(PGCompiler):
+ def visit_mod_binary(self, binary, operator, **kw):
+ return (
+ self.process(binary.left, **kw)
+ + " %% "
+ + self.process(binary.right, **kw)
+ )
+
+
+class PGIdentifierPreparer_pg8000(PGIdentifierPreparer):
+ def __init__(self, *args, **kwargs):
+ PGIdentifierPreparer.__init__(self, *args, **kwargs)
+ self._double_percents = False
+
+
+class PGDialect_pg8000(PGDialect):
+ driver = "pg8000"
+ supports_statement_cache = True
+
+ supports_unicode_statements = True
+
+ supports_unicode_binds = True
+
+ default_paramstyle = "format"
+ supports_sane_multi_rowcount = True
+ execution_ctx_cls = PGExecutionContext_pg8000
+ statement_compiler = PGCompiler_pg8000
+ preparer = PGIdentifierPreparer_pg8000
+ supports_server_side_cursors = True
+
+ use_setinputsizes = True
+
+ # reversed as of pg8000 1.16.6. 1.16.5 and lower
+ # are no longer compatible
+ description_encoding = None
+ # description_encoding = "use_encoding"
+
+ colspecs = util.update_copy(
+ PGDialect.colspecs,
+ {
+ sqltypes.Numeric: _PGNumericNoBind,
+ sqltypes.Float: _PGNumeric,
+ sqltypes.JSON: _PGJSON,
+ sqltypes.Boolean: _PGBoolean,
+ sqltypes.NullType: _PGNullType,
+ JSONB: _PGJSONB,
+ sqltypes.JSON.JSONPathType: _PGJSONPathType,
+ sqltypes.JSON.JSONIndexType: _PGJSONIndexType,
+ sqltypes.JSON.JSONIntIndexType: _PGJSONIntIndexType,
+ sqltypes.JSON.JSONStrIndexType: _PGJSONStrIndexType,
+ UUID: _PGUUID,
+ sqltypes.Interval: _PGInterval,
+ INTERVAL: _PGInterval,
+ sqltypes.DateTime: _PGTimeStamp,
+ sqltypes.Time: _PGTime,
+ sqltypes.Integer: _PGInteger,
+ sqltypes.SmallInteger: _PGSmallInteger,
+ sqltypes.BigInteger: _PGBigInteger,
+ sqltypes.Enum: _PGEnum,
+ sqltypes.ARRAY: _PGARRAY,
+ },
+ )
+
+ def __init__(self, client_encoding=None, **kwargs):
+ PGDialect.__init__(self, **kwargs)
+ self.client_encoding = client_encoding
+
+ if self._dbapi_version < (1, 16, 6):
+ raise NotImplementedError("pg8000 1.16.6 or greater is required")
+
+ @util.memoized_property
+ def _dbapi_version(self):
+ if self.dbapi and hasattr(self.dbapi, "__version__"):
+ return tuple(
+ [
+ int(x)
+ for x in re.findall(
+ r"(\d+)(?:[-\.]?|$)", self.dbapi.__version__
+ )
+ ]
+ )
+ else:
+ return (99, 99, 99)
+
+ @classmethod
+ def dbapi(cls):
+ return __import__("pg8000")
+
+ def create_connect_args(self, url):
+ opts = url.translate_connect_args(username="user")
+ if "port" in opts:
+ opts["port"] = int(opts["port"])
+ opts.update(url.query)
+ return ([], opts)
+
+ def is_disconnect(self, e, connection, cursor):
+ if isinstance(e, self.dbapi.InterfaceError) and "network error" in str(
+ e
+ ):
+ # new as of pg8000 1.19.0 for broken connections
+ return True
+
+ # connection was closed normally
+ return "connection is closed" in str(e)
+
+ def set_isolation_level(self, connection, level):
+ level = level.replace("_", " ")
+
+ # adjust for ConnectionFairy possibly being present
+ if hasattr(connection, "dbapi_connection"):
+ connection = connection.dbapi_connection
+
+ if level == "AUTOCOMMIT":
+ connection.autocommit = True
+ elif level in self._isolation_lookup:
+ connection.autocommit = False
+ cursor = connection.cursor()
+ cursor.execute(
+ "SET SESSION CHARACTERISTICS AS TRANSACTION "
+ "ISOLATION LEVEL %s" % level
+ )
+ cursor.execute("COMMIT")
+ cursor.close()
+ else:
+ raise exc.ArgumentError(
+ "Invalid value '%s' for isolation_level. "
+ "Valid isolation levels for %s are %s or AUTOCOMMIT"
+ % (level, self.name, ", ".join(self._isolation_lookup))
+ )
+
+ def set_readonly(self, connection, value):
+ cursor = connection.cursor()
+ try:
+ cursor.execute(
+ "SET SESSION CHARACTERISTICS AS TRANSACTION %s"
+ % ("READ ONLY" if value else "READ WRITE")
+ )
+ cursor.execute("COMMIT")
+ finally:
+ cursor.close()
+
+ def get_readonly(self, connection):
+ cursor = connection.cursor()
+ try:
+ cursor.execute("show transaction_read_only")
+ val = cursor.fetchone()[0]
+ finally:
+ cursor.close()
+
+ return val == "on"
+
+ def set_deferrable(self, connection, value):
+ cursor = connection.cursor()
+ try:
+ cursor.execute(
+ "SET SESSION CHARACTERISTICS AS TRANSACTION %s"
+ % ("DEFERRABLE" if value else "NOT DEFERRABLE")
+ )
+ cursor.execute("COMMIT")
+ finally:
+ cursor.close()
+
+ def get_deferrable(self, connection):
+ cursor = connection.cursor()
+ try:
+ cursor.execute("show transaction_deferrable")
+ val = cursor.fetchone()[0]
+ finally:
+ cursor.close()
+
+ return val == "on"
+
+ def set_client_encoding(self, connection, client_encoding):
+ # adjust for ConnectionFairy possibly being present
+ if hasattr(connection, "dbapi_connection"):
+ connection = connection.dbapi_connection
+
+ cursor = connection.cursor()
+ cursor.execute("SET CLIENT_ENCODING TO '" + client_encoding + "'")
+ cursor.execute("COMMIT")
+ cursor.close()
+
+ def do_set_input_sizes(self, cursor, list_of_tuples, context):
+ if self.positional:
+ cursor.setinputsizes(
+ *[dbtype for key, dbtype, sqltype in list_of_tuples]
+ )
+ else:
+ cursor.setinputsizes(
+ **{
+ key: dbtype
+ for key, dbtype, sqltype in list_of_tuples
+ if dbtype
+ }
+ )
+
+ def do_begin_twophase(self, connection, xid):
+ connection.connection.tpc_begin((0, xid, ""))
+
+ def do_prepare_twophase(self, connection, xid):
+ connection.connection.tpc_prepare()
+
+ def do_rollback_twophase(
+ self, connection, xid, is_prepared=True, recover=False
+ ):
+ connection.connection.tpc_rollback((0, xid, ""))
+
+ def do_commit_twophase(
+ self, connection, xid, is_prepared=True, recover=False
+ ):
+ connection.connection.tpc_commit((0, xid, ""))
+
+ def do_recover_twophase(self, connection):
+ return [row[1] for row in connection.connection.tpc_recover()]
+
+ def on_connect(self):
+ fns = []
+
+ def on_connect(conn):
+ conn.py_types[quoted_name] = conn.py_types[util.text_type]
+
+ fns.append(on_connect)
+
+ if self.client_encoding is not None:
+
+ def on_connect(conn):
+ self.set_client_encoding(conn, self.client_encoding)
+
+ fns.append(on_connect)
+
+ if self.isolation_level is not None:
+
+ def on_connect(conn):
+ self.set_isolation_level(conn, self.isolation_level)
+
+ fns.append(on_connect)
+
+ if self._json_deserializer:
+
+ def on_connect(conn):
+ # json
+ conn.register_in_adapter(114, self._json_deserializer)
+
+ # jsonb
+ conn.register_in_adapter(3802, self._json_deserializer)
+
+ fns.append(on_connect)
+
+ if len(fns) > 0:
+
+ def on_connect(conn):
+ for fn in fns:
+ fn(conn)
+
+ return on_connect
+ else:
+ return None
+
+
+dialect = PGDialect_pg8000
diff --git a/lib/sqlalchemy/dialects/postgresql/provision.py b/lib/sqlalchemy/dialects/postgresql/provision.py
new file mode 100644
index 0000000..98470f3
--- /dev/null
+++ b/lib/sqlalchemy/dialects/postgresql/provision.py
@@ -0,0 +1,124 @@
+import time
+
+from ... import exc
+from ... import inspect
+from ... import text
+from ...testing import warn_test_suite
+from ...testing.provision import create_db
+from ...testing.provision import drop_all_schema_objects_post_tables
+from ...testing.provision import drop_all_schema_objects_pre_tables
+from ...testing.provision import drop_db
+from ...testing.provision import log
+from ...testing.provision import prepare_for_drop_tables
+from ...testing.provision import set_default_schema_on_connection
+from ...testing.provision import temp_table_keyword_args
+
+
+@create_db.for_db("postgresql")
+def _pg_create_db(cfg, eng, ident):
+ template_db = cfg.options.postgresql_templatedb
+
+ with eng.execution_options(isolation_level="AUTOCOMMIT").begin() as conn:
+
+ if not template_db:
+ template_db = conn.exec_driver_sql(
+ "select current_database()"
+ ).scalar()
+
+ attempt = 0
+ while True:
+ try:
+ conn.exec_driver_sql(
+ "CREATE DATABASE %s TEMPLATE %s" % (ident, template_db)
+ )
+ except exc.OperationalError as err:
+ attempt += 1
+ if attempt >= 3:
+ raise
+ if "accessed by other users" in str(err):
+ log.info(
+ "Waiting to create %s, URI %r, "
+ "template DB %s is in use sleeping for .5",
+ ident,
+ eng.url,
+ template_db,
+ )
+ time.sleep(0.5)
+ except:
+ raise
+ else:
+ break
+
+
+@drop_db.for_db("postgresql")
+def _pg_drop_db(cfg, eng, ident):
+ with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
+ with conn.begin():
+ conn.execute(
+ text(
+ "select pg_terminate_backend(pid) from pg_stat_activity "
+ "where usename=current_user and pid != pg_backend_pid() "
+ "and datname=:dname"
+ ),
+ dict(dname=ident),
+ )
+ conn.exec_driver_sql("DROP DATABASE %s" % ident)
+
+
+@temp_table_keyword_args.for_db("postgresql")
+def _postgresql_temp_table_keyword_args(cfg, eng):
+ return {"prefixes": ["TEMPORARY"]}
+
+
+@set_default_schema_on_connection.for_db("postgresql")
+def _postgresql_set_default_schema_on_connection(
+ cfg, dbapi_connection, schema_name
+):
+ existing_autocommit = dbapi_connection.autocommit
+ dbapi_connection.autocommit = True
+ cursor = dbapi_connection.cursor()
+ cursor.execute("SET SESSION search_path='%s'" % schema_name)
+ cursor.close()
+ dbapi_connection.autocommit = existing_autocommit
+
+
+@drop_all_schema_objects_pre_tables.for_db("postgresql")
+def drop_all_schema_objects_pre_tables(cfg, eng):
+ with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
+ for xid in conn.execute("select gid from pg_prepared_xacts").scalars():
+ conn.execute("ROLLBACK PREPARED '%s'" % xid)
+
+
+@drop_all_schema_objects_post_tables.for_db("postgresql")
+def drop_all_schema_objects_post_tables(cfg, eng):
+ from sqlalchemy.dialects import postgresql
+
+ inspector = inspect(eng)
+ with eng.begin() as conn:
+ for enum in inspector.get_enums("*"):
+ conn.execute(
+ postgresql.DropEnumType(
+ postgresql.ENUM(name=enum["name"], schema=enum["schema"])
+ )
+ )
+
+
+@prepare_for_drop_tables.for_db("postgresql")
+def prepare_for_drop_tables(config, connection):
+ """Ensure there are no locks on the current username/database."""
+
+ result = connection.exec_driver_sql(
+ "select pid, state, wait_event_type, query "
+ # "select pg_terminate_backend(pid), state, wait_event_type "
+ "from pg_stat_activity where "
+ "usename=current_user "
+ "and datname=current_database() and state='idle in transaction' "
+ "and pid != pg_backend_pid()"
+ )
+ rows = result.all() # noqa
+ if rows:
+ warn_test_suite(
+ "PostgreSQL may not be able to DROP tables due to "
+ "idle in transaction: %s"
+ % ("; ".join(row._mapping["query"] for row in rows))
+ )
diff --git a/lib/sqlalchemy/dialects/postgresql/psycopg2.py b/lib/sqlalchemy/dialects/postgresql/psycopg2.py
new file mode 100644
index 0000000..6747427
--- /dev/null
+++ b/lib/sqlalchemy/dialects/postgresql/psycopg2.py
@@ -0,0 +1,1088 @@
+# postgresql/psycopg2.py
+# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+r"""
+.. dialect:: postgresql+psycopg2
+ :name: psycopg2
+ :dbapi: psycopg2
+ :connectstring: postgresql+psycopg2://user:password@host:port/dbname[?key=value&key=value...]
+ :url: https://pypi.org/project/psycopg2/
+
+psycopg2 Connect Arguments
+--------------------------
+
+Keyword arguments that are specific to the SQLAlchemy psycopg2 dialect
+may be passed to :func:`_sa.create_engine()`, and include the following:
+
+
+* ``isolation_level``: This option, available for all PostgreSQL dialects,
+ includes the ``AUTOCOMMIT`` isolation level when using the psycopg2
+ dialect. This option sets the **default** isolation level for the
+ connection that is set immediately upon connection to the database before
+ the connection is pooled. This option is generally superseded by the more
+ modern :paramref:`_engine.Connection.execution_options.isolation_level`
+ execution option, detailed at :ref:`dbapi_autocommit`.
+
+ .. seealso::
+
+ :ref:`psycopg2_isolation_level`
+
+ :ref:`dbapi_autocommit`
+
+
+* ``client_encoding``: sets the client encoding in a libpq-agnostic way,
+ using psycopg2's ``set_client_encoding()`` method.
+
+ .. seealso::
+
+ :ref:`psycopg2_unicode`
+
+* ``use_native_unicode``: Under Python 2 only, this can be set to False to
+ disable the use of psycopg2's native Unicode support.
+
+ .. seealso::
+
+ :ref:`psycopg2_disable_native_unicode`
+
+
+* ``executemany_mode``, ``executemany_batch_page_size``,
+ ``executemany_values_page_size``: Allows use of psycopg2
+ extensions for optimizing "executemany"-style queries. See the referenced
+ section below for details.
+
+ .. seealso::
+
+ :ref:`psycopg2_executemany_mode`
+
+.. tip::
+
+ The above keyword arguments are **dialect** keyword arguments, meaning
+ that they are passed as explicit keyword arguments to :func:`_sa.create_engine()`::
+
+ engine = create_engine(
+ "postgresql+psycopg2://scott:tiger@localhost/test",
+ isolation_level="SERIALIZABLE",
+ )
+
+ These should not be confused with **DBAPI** connect arguments, which
+ are passed as part of the :paramref:`_sa.create_engine.connect_args`
+ dictionary and/or are passed in the URL query string, as detailed in
+ the section :ref:`custom_dbapi_args`.
+
+.. _psycopg2_ssl:
+
+SSL Connections
+---------------
+
+The psycopg2 module has a connection argument named ``sslmode`` for
+controlling its behavior regarding secure (SSL) connections. The default is
+``sslmode=prefer``; it will attempt an SSL connection and if that fails it
+will fall back to an unencrypted connection. ``sslmode=require`` may be used
+to ensure that only secure connections are established. Consult the
+psycopg2 / libpq documentation for further options that are available.
+
+Note that ``sslmode`` is specific to psycopg2 so it is included in the
+connection URI::
+
+ engine = sa.create_engine(
+ "postgresql+psycopg2://scott:tiger@192.168.0.199:5432/test?sslmode=require"
+ )
+
+Unix Domain Connections
+------------------------
+
+psycopg2 supports connecting via Unix domain connections. When the ``host``
+portion of the URL is omitted, SQLAlchemy passes ``None`` to psycopg2,
+which specifies Unix-domain communication rather than TCP/IP communication::
+
+ create_engine("postgresql+psycopg2://user:password@/dbname")
+
+By default, the socket file used is to connect to a Unix-domain socket
+in ``/tmp``, or whatever socket directory was specified when PostgreSQL
+was built. This value can be overridden by passing a pathname to psycopg2,
+using ``host`` as an additional keyword argument::
+
+ create_engine("postgresql+psycopg2://user:password@/dbname?host=/var/lib/postgresql")
+
+.. seealso::
+
+ `PQconnectdbParams \
+ <https://www.postgresql.org/docs/current/static/libpq-connect.html#LIBPQ-PQCONNECTDBPARAMS>`_
+
+.. _psycopg2_multi_host:
+
+Specifying multiple fallback hosts
+-----------------------------------
+
+psycopg2 supports multiple connection points in the connection string.
+When the ``host`` parameter is used multiple times in the query section of
+the URL, SQLAlchemy will create a single string of the host and port
+information provided to make the connections. Tokens may consist of
+``host::port`` or just ``host``; in the latter case, the default port
+is selected by libpq. In the example below, three host connections
+are specified, for ``HostA::PortA``, ``HostB`` connecting to the default port,
+and ``HostC::PortC``::
+
+ create_engine(
+ "postgresql+psycopg2://user:password@/dbname?host=HostA:PortA&host=HostB&host=HostC:PortC"
+ )
+
+As an alternative, libpq query string format also may be used; this specifies
+``host`` and ``port`` as single query string arguments with comma-separated
+lists - the default port can be chosen by indicating an empty value
+in the comma separated list::
+
+ create_engine(
+ "postgresql+psycopg2://user:password@/dbname?host=HostA,HostB,HostC&port=PortA,,PortC"
+ )
+
+With either URL style, connections to each host is attempted based on a
+configurable strategy, which may be configured using the libpq
+``target_session_attrs`` parameter. Per libpq this defaults to ``any``
+which indicates a connection to each host is then attempted until a connection is successful.
+Other strategies include ``primary``, ``prefer-standby``, etc. The complete
+list is documented by PostgreSQL at
+`libpq connection strings <https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING>`_.
+
+For example, to indicate two hosts using the ``primary`` strategy::
+
+ create_engine(
+ "postgresql+psycopg2://user:password@/dbname?host=HostA:PortA&host=HostB&host=HostC:PortC&target_session_attrs=primary"
+ )
+
+.. versionchanged:: 1.4.40 Port specification in psycopg2 multiple host format
+ is repaired, previously ports were not correctly interpreted in this context.
+ libpq comma-separated format is also now supported.
+
+.. versionadded:: 1.3.20 Support for multiple hosts in PostgreSQL connection
+ string.
+
+.. seealso::
+
+ `libpq connection strings <https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING>`_ - please refer
+ to this section in the libpq documentation for complete background on multiple host support.
+
+
+Empty DSN Connections / Environment Variable Connections
+---------------------------------------------------------
+
+The psycopg2 DBAPI can connect to PostgreSQL by passing an empty DSN to the
+libpq client library, which by default indicates to connect to a localhost
+PostgreSQL database that is open for "trust" connections. This behavior can be
+further tailored using a particular set of environment variables which are
+prefixed with ``PG_...``, which are consumed by ``libpq`` to take the place of
+any or all elements of the connection string.
+
+For this form, the URL can be passed without any elements other than the
+initial scheme::
+
+ engine = create_engine('postgresql+psycopg2://')
+
+In the above form, a blank "dsn" string is passed to the ``psycopg2.connect()``
+function which in turn represents an empty DSN passed to libpq.
+
+.. versionadded:: 1.3.2 support for parameter-less connections with psycopg2.
+
+.. seealso::
+
+ `Environment Variables\
+ <https://www.postgresql.org/docs/current/libpq-envars.html>`_ -
+ PostgreSQL documentation on how to use ``PG_...``
+ environment variables for connections.
+
+.. _psycopg2_execution_options:
+
+Per-Statement/Connection Execution Options
+-------------------------------------------
+
+The following DBAPI-specific options are respected when used with
+:meth:`_engine.Connection.execution_options`,
+:meth:`.Executable.execution_options`,
+:meth:`_query.Query.execution_options`,
+in addition to those not specific to DBAPIs:
+
+* ``isolation_level`` - Set the transaction isolation level for the lifespan
+ of a :class:`_engine.Connection` (can only be set on a connection,
+ not a statement
+ or query). See :ref:`psycopg2_isolation_level`.
+
+* ``stream_results`` - Enable or disable usage of psycopg2 server side
+ cursors - this feature makes use of "named" cursors in combination with
+ special result handling methods so that result rows are not fully buffered.
+ Defaults to False, meaning cursors are buffered by default.
+
+* ``max_row_buffer`` - when using ``stream_results``, an integer value that
+ specifies the maximum number of rows to buffer at a time. This is
+ interpreted by the :class:`.BufferedRowCursorResult`, and if omitted the
+ buffer will grow to ultimately store 1000 rows at a time.
+
+ .. versionchanged:: 1.4 The ``max_row_buffer`` size can now be greater than
+ 1000, and the buffer will grow to that size.
+
+.. _psycopg2_batch_mode:
+
+.. _psycopg2_executemany_mode:
+
+Psycopg2 Fast Execution Helpers
+-------------------------------
+
+Modern versions of psycopg2 include a feature known as
+`Fast Execution Helpers \
+<https://initd.org/psycopg/docs/extras.html#fast-execution-helpers>`_, which
+have been shown in benchmarking to improve psycopg2's executemany()
+performance, primarily with INSERT statements, by multiple orders of magnitude.
+SQLAlchemy internally makes use of these extensions for ``executemany()`` style
+calls, which correspond to lists of parameters being passed to
+:meth:`_engine.Connection.execute` as detailed in :ref:`multiple parameter
+sets <tutorial_multiple_parameters>`. The ORM also uses this mode internally whenever
+possible.
+
+The two available extensions on the psycopg2 side are the ``execute_values()``
+and ``execute_batch()`` functions. The psycopg2 dialect defaults to using the
+``execute_values()`` extension for all qualifying INSERT statements.
+
+.. versionchanged:: 1.4 The psycopg2 dialect now defaults to a new mode
+ ``"values_only"`` for ``executemany_mode``, which allows an order of
+ magnitude performance improvement for INSERT statements, but does not
+ include "batch" mode for UPDATE and DELETE statements which removes the
+ ability of ``cursor.rowcount`` to function correctly.
+
+The use of these extensions is controlled by the ``executemany_mode`` flag
+which may be passed to :func:`_sa.create_engine`::
+
+ engine = create_engine(
+ "postgresql+psycopg2://scott:tiger@host/dbname",
+ executemany_mode='values_plus_batch')
+
+
+Possible options for ``executemany_mode`` include:
+
+* ``values_only`` - this is the default value. the psycopg2 execute_values()
+ extension is used for qualifying INSERT statements, which rewrites the INSERT
+ to include multiple VALUES clauses so that many parameter sets can be
+ inserted with one statement.
+
+ .. versionadded:: 1.4 Added ``"values_only"`` setting for ``executemany_mode``
+ which is also now the default.
+
+* ``None`` - No psycopg2 extensions are not used, and the usual
+ ``cursor.executemany()`` method is used when invoking statements with
+ multiple parameter sets.
+
+* ``'batch'`` - Uses ``psycopg2.extras.execute_batch`` for all qualifying
+ INSERT, UPDATE and DELETE statements, so that multiple copies
+ of a SQL query, each one corresponding to a parameter set passed to
+ ``executemany()``, are joined into a single SQL string separated by a
+ semicolon. When using this mode, the :attr:`_engine.CursorResult.rowcount`
+ attribute will not contain a value for executemany-style executions.
+
+* ``'values_plus_batch'``- ``execute_values`` is used for qualifying INSERT
+ statements, ``execute_batch`` is used for UPDATE and DELETE.
+ When using this mode, the :attr:`_engine.CursorResult.rowcount`
+ attribute will not contain a value for executemany-style executions against
+ UPDATE and DELETE statements.
+
+By "qualifying statements", we mean that the statement being executed
+must be a Core :func:`_expression.insert`, :func:`_expression.update`
+or :func:`_expression.delete` construct, and not a plain textual SQL
+string or one constructed using :func:`_expression.text`. When using the
+ORM, all insert/update/delete statements used by the ORM flush process
+are qualifying.
+
+The "page size" for the "values" and "batch" strategies can be affected
+by using the ``executemany_batch_page_size`` and
+``executemany_values_page_size`` engine parameters. These
+control how many parameter sets
+should be represented in each execution. The "values" page size defaults
+to 1000, which is different that psycopg2's default. The "batch" page
+size defaults to 100. These can be affected by passing new values to
+:func:`_engine.create_engine`::
+
+ engine = create_engine(
+ "postgresql+psycopg2://scott:tiger@host/dbname",
+ executemany_mode='values',
+ executemany_values_page_size=10000, executemany_batch_page_size=500)
+
+.. versionchanged:: 1.4
+
+ The default for ``executemany_values_page_size`` is now 1000, up from
+ 100.
+
+.. seealso::
+
+ :ref:`tutorial_multiple_parameters` - General information on using the
+ :class:`_engine.Connection`
+ object to execute statements in such a way as to make
+ use of the DBAPI ``.executemany()`` method.
+
+
+.. _psycopg2_unicode:
+
+Unicode with Psycopg2
+----------------------
+
+The psycopg2 DBAPI driver supports Unicode data transparently. Under Python 2
+only, the SQLAlchemy psycopg2 dialect will enable the
+``psycopg2.extensions.UNICODE`` extension by default to ensure Unicode is
+handled properly; under Python 3, this is psycopg2's default behavior.
+
+The client character encoding can be controlled for the psycopg2 dialect
+in the following ways:
+
+* For PostgreSQL 9.1 and above, the ``client_encoding`` parameter may be
+ passed in the database URL; this parameter is consumed by the underlying
+ ``libpq`` PostgreSQL client library::
+
+ engine = create_engine("postgresql+psycopg2://user:pass@host/dbname?client_encoding=utf8")
+
+ Alternatively, the above ``client_encoding`` value may be passed using
+ :paramref:`_sa.create_engine.connect_args` for programmatic establishment with
+ ``libpq``::
+
+ engine = create_engine(
+ "postgresql+psycopg2://user:pass@host/dbname",
+ connect_args={'client_encoding': 'utf8'}
+ )
+
+* For all PostgreSQL versions, psycopg2 supports a client-side encoding
+ value that will be passed to database connections when they are first
+ established. The SQLAlchemy psycopg2 dialect supports this using the
+ ``client_encoding`` parameter passed to :func:`_sa.create_engine`::
+
+ engine = create_engine(
+ "postgresql+psycopg2://user:pass@host/dbname",
+ client_encoding="utf8"
+ )
+
+ .. tip:: The above ``client_encoding`` parameter admittedly is very similar
+ in appearance to usage of the parameter within the
+ :paramref:`_sa.create_engine.connect_args` dictionary; the difference
+ above is that the parameter is consumed by psycopg2 and is
+ passed to the database connection using ``SET client_encoding TO
+ 'utf8'``; in the previously mentioned style, the parameter is instead
+ passed through psycopg2 and consumed by the ``libpq`` library.
+
+* A common way to set up client encoding with PostgreSQL databases is to
+ ensure it is configured within the server-side postgresql.conf file;
+ this is the recommended way to set encoding for a server that is
+ consistently of one encoding in all databases::
+
+ # postgresql.conf file
+
+ # client_encoding = sql_ascii # actually, defaults to database
+ # encoding
+ client_encoding = utf8
+
+.. _psycopg2_disable_native_unicode:
+
+Disabling Native Unicode
+^^^^^^^^^^^^^^^^^^^^^^^^
+
+Under Python 2 only, SQLAlchemy can also be instructed to skip the usage of the
+psycopg2 ``UNICODE`` extension and to instead utilize its own unicode
+encode/decode services, which are normally reserved only for those DBAPIs that
+don't fully support unicode directly. Passing ``use_native_unicode=False`` to
+:func:`_sa.create_engine` will disable usage of ``psycopg2.extensions.
+UNICODE``. SQLAlchemy will instead encode data itself into Python bytestrings
+on the way in and coerce from bytes on the way back, using the value of the
+:func:`_sa.create_engine` ``encoding`` parameter, which defaults to ``utf-8``.
+SQLAlchemy's own unicode encode/decode functionality is steadily becoming
+obsolete as most DBAPIs now support unicode fully.
+
+
+Transactions
+------------
+
+The psycopg2 dialect fully supports SAVEPOINT and two-phase commit operations.
+
+.. _psycopg2_isolation_level:
+
+Psycopg2 Transaction Isolation Level
+-------------------------------------
+
+As discussed in :ref:`postgresql_isolation_level`,
+all PostgreSQL dialects support setting of transaction isolation level
+both via the ``isolation_level`` parameter passed to :func:`_sa.create_engine`
+,
+as well as the ``isolation_level`` argument used by
+:meth:`_engine.Connection.execution_options`. When using the psycopg2 dialect
+, these
+options make use of psycopg2's ``set_isolation_level()`` connection method,
+rather than emitting a PostgreSQL directive; this is because psycopg2's
+API-level setting is always emitted at the start of each transaction in any
+case.
+
+The psycopg2 dialect supports these constants for isolation level:
+
+* ``READ COMMITTED``
+* ``READ UNCOMMITTED``
+* ``REPEATABLE READ``
+* ``SERIALIZABLE``
+* ``AUTOCOMMIT``
+
+.. seealso::
+
+ :ref:`postgresql_isolation_level`
+
+ :ref:`pg8000_isolation_level`
+
+
+NOTICE logging
+---------------
+
+The psycopg2 dialect will log PostgreSQL NOTICE messages
+via the ``sqlalchemy.dialects.postgresql`` logger. When this logger
+is set to the ``logging.INFO`` level, notice messages will be logged::
+
+ import logging
+
+ logging.getLogger('sqlalchemy.dialects.postgresql').setLevel(logging.INFO)
+
+Above, it is assumed that logging is configured externally. If this is not
+the case, configuration such as ``logging.basicConfig()`` must be utilized::
+
+ import logging
+
+ logging.basicConfig() # log messages to stdout
+ logging.getLogger('sqlalchemy.dialects.postgresql').setLevel(logging.INFO)
+
+.. seealso::
+
+ `Logging HOWTO <https://docs.python.org/3/howto/logging.html>`_ - on the python.org website
+
+.. _psycopg2_hstore:
+
+HSTORE type
+------------
+
+The ``psycopg2`` DBAPI includes an extension to natively handle marshalling of
+the HSTORE type. The SQLAlchemy psycopg2 dialect will enable this extension
+by default when psycopg2 version 2.4 or greater is used, and
+it is detected that the target database has the HSTORE type set up for use.
+In other words, when the dialect makes the first
+connection, a sequence like the following is performed:
+
+1. Request the available HSTORE oids using
+ ``psycopg2.extras.HstoreAdapter.get_oids()``.
+ If this function returns a list of HSTORE identifiers, we then determine
+ that the ``HSTORE`` extension is present.
+ This function is **skipped** if the version of psycopg2 installed is
+ less than version 2.4.
+
+2. If the ``use_native_hstore`` flag is at its default of ``True``, and
+ we've detected that ``HSTORE`` oids are available, the
+ ``psycopg2.extensions.register_hstore()`` extension is invoked for all
+ connections.
+
+The ``register_hstore()`` extension has the effect of **all Python
+dictionaries being accepted as parameters regardless of the type of target
+column in SQL**. The dictionaries are converted by this extension into a
+textual HSTORE expression. If this behavior is not desired, disable the
+use of the hstore extension by setting ``use_native_hstore`` to ``False`` as
+follows::
+
+ engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test",
+ use_native_hstore=False)
+
+The ``HSTORE`` type is **still supported** when the
+``psycopg2.extensions.register_hstore()`` extension is not used. It merely
+means that the coercion between Python dictionaries and the HSTORE
+string format, on both the parameter side and the result side, will take
+place within SQLAlchemy's own marshalling logic, and not that of ``psycopg2``
+which may be more performant.
+
+""" # noqa
+from __future__ import absolute_import
+
+import decimal
+import logging
+import re
+from uuid import UUID as _python_UUID
+
+from .array import ARRAY as PGARRAY
+from .base import _ColonCast
+from .base import _DECIMAL_TYPES
+from .base import _FLOAT_TYPES
+from .base import _INT_TYPES
+from .base import ENUM
+from .base import PGCompiler
+from .base import PGDialect
+from .base import PGExecutionContext
+from .base import PGIdentifierPreparer
+from .base import UUID
+from .hstore import HSTORE
+from .json import JSON
+from .json import JSONB
+from ... import exc
+from ... import processors
+from ... import types as sqltypes
+from ... import util
+from ...engine import cursor as _cursor
+from ...util import collections_abc
+
+
+logger = logging.getLogger("sqlalchemy.dialects.postgresql")
+
+
+class _PGNumeric(sqltypes.Numeric):
+ def bind_processor(self, dialect):
+ return None
+
+ def result_processor(self, dialect, coltype):
+ if self.asdecimal:
+ if coltype in _FLOAT_TYPES:
+ return processors.to_decimal_processor_factory(
+ decimal.Decimal, self._effective_decimal_return_scale
+ )
+ elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES:
+ # pg8000 returns Decimal natively for 1700
+ return None
+ else:
+ raise exc.InvalidRequestError(
+ "Unknown PG numeric type: %d" % coltype
+ )
+ else:
+ if coltype in _FLOAT_TYPES:
+ # pg8000 returns float natively for 701
+ return None
+ elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES:
+ return processors.to_float
+ else:
+ raise exc.InvalidRequestError(
+ "Unknown PG numeric type: %d" % coltype
+ )
+
+
+class _PGEnum(ENUM):
+ def result_processor(self, dialect, coltype):
+ if util.py2k and self._expect_unicode is True:
+ # for py2k, if the enum type needs unicode data (which is set up as
+ # part of the Enum() constructor based on values passed as py2k
+ # unicode objects) we have to use our own converters since
+ # psycopg2's don't work, a rare exception to the "modern DBAPIs
+ # support unicode everywhere" theme of deprecating
+ # convert_unicode=True. Use the special "force_nocheck" directive
+ # which forces unicode conversion to happen on the Python side
+ # without an isinstance() check. in py3k psycopg2 does the right
+ # thing automatically.
+ self._expect_unicode = "force_nocheck"
+ return super(_PGEnum, self).result_processor(dialect, coltype)
+
+
+class _PGHStore(HSTORE):
+ def bind_processor(self, dialect):
+ if dialect._has_native_hstore:
+ return None
+ else:
+ return super(_PGHStore, self).bind_processor(dialect)
+
+ def result_processor(self, dialect, coltype):
+ if dialect._has_native_hstore:
+ return None
+ else:
+ return super(_PGHStore, self).result_processor(dialect, coltype)
+
+
+class _PGARRAY(PGARRAY):
+ def bind_expression(self, bindvalue):
+ return _ColonCast(bindvalue, self)
+
+
+class _PGJSON(JSON):
+ def result_processor(self, dialect, coltype):
+ return None
+
+
+class _PGJSONB(JSONB):
+ def result_processor(self, dialect, coltype):
+ return None
+
+
+class _PGUUID(UUID):
+ def bind_processor(self, dialect):
+ if not self.as_uuid and dialect.use_native_uuid:
+
+ def process(value):
+ if value is not None:
+ value = _python_UUID(value)
+ return value
+
+ return process
+
+ def result_processor(self, dialect, coltype):
+ if not self.as_uuid and dialect.use_native_uuid:
+
+ def process(value):
+ if value is not None:
+ value = str(value)
+ return value
+
+ return process
+
+
+_server_side_id = util.counter()
+
+
+class PGExecutionContext_psycopg2(PGExecutionContext):
+ _psycopg2_fetched_rows = None
+
+ def create_server_side_cursor(self):
+ # use server-side cursors:
+ # https://lists.initd.org/pipermail/psycopg/2007-January/005251.html
+ ident = "c_%s_%s" % (hex(id(self))[2:], hex(_server_side_id())[2:])
+ return self._dbapi_connection.cursor(ident)
+
+ def post_exec(self):
+ if (
+ self._psycopg2_fetched_rows
+ and self.compiled
+ and self.compiled.returning
+ ):
+ # psycopg2 execute_values will provide for a real cursor where
+ # cursor.description works correctly. however, it executes the
+ # INSERT statement multiple times for multiple pages of rows, so
+ # while this cursor also supports calling .fetchall() directly, in
+ # order to get the list of all rows inserted across multiple pages,
+ # we have to retrieve the aggregated list from the execute_values()
+ # function directly.
+ strat_cls = _cursor.FullyBufferedCursorFetchStrategy
+ self.cursor_fetch_strategy = strat_cls(
+ self.cursor, initial_buffer=self._psycopg2_fetched_rows
+ )
+ self._log_notices(self.cursor)
+
+ def _log_notices(self, cursor):
+ # check also that notices is an iterable, after it's already
+ # established that we will be iterating through it. This is to get
+ # around test suites such as SQLAlchemy's using a Mock object for
+ # cursor
+ if not cursor.connection.notices or not isinstance(
+ cursor.connection.notices, collections_abc.Iterable
+ ):
+ return
+
+ for notice in cursor.connection.notices:
+ # NOTICE messages have a
+ # newline character at the end
+ logger.info(notice.rstrip())
+
+ cursor.connection.notices[:] = []
+
+
+class PGCompiler_psycopg2(PGCompiler):
+ pass
+
+
+class PGIdentifierPreparer_psycopg2(PGIdentifierPreparer):
+ pass
+
+
+EXECUTEMANY_PLAIN = util.symbol("executemany_plain", canonical=0)
+EXECUTEMANY_BATCH = util.symbol("executemany_batch", canonical=1)
+EXECUTEMANY_VALUES = util.symbol("executemany_values", canonical=2)
+EXECUTEMANY_VALUES_PLUS_BATCH = util.symbol(
+ "executemany_values_plus_batch",
+ canonical=EXECUTEMANY_BATCH | EXECUTEMANY_VALUES,
+)
+
+
+class PGDialect_psycopg2(PGDialect):
+ driver = "psycopg2"
+
+ supports_statement_cache = True
+
+ if util.py2k:
+ # turn off supports_unicode_statements for Python 2. psycopg2 supports
+ # unicode statements in Py2K. But! it does not support unicode *bound
+ # parameter names* because it uses the Python "%" operator to
+ # interpolate these into the string, and this fails. So for Py2K, we
+ # have to use full-on encoding for statements and parameters before
+ # passing to cursor.execute().
+ supports_unicode_statements = False
+
+ supports_server_side_cursors = True
+
+ default_paramstyle = "pyformat"
+ # set to true based on psycopg2 version
+ supports_sane_multi_rowcount = False
+ execution_ctx_cls = PGExecutionContext_psycopg2
+ statement_compiler = PGCompiler_psycopg2
+ preparer = PGIdentifierPreparer_psycopg2
+ psycopg2_version = (0, 0)
+
+ _has_native_hstore = True
+
+ engine_config_types = PGDialect.engine_config_types.union(
+ {"use_native_unicode": util.asbool}
+ )
+
+ colspecs = util.update_copy(
+ PGDialect.colspecs,
+ {
+ sqltypes.Numeric: _PGNumeric,
+ ENUM: _PGEnum, # needs force_unicode
+ sqltypes.Enum: _PGEnum, # needs force_unicode
+ HSTORE: _PGHStore,
+ JSON: _PGJSON,
+ sqltypes.JSON: _PGJSON,
+ JSONB: _PGJSONB,
+ UUID: _PGUUID,
+ sqltypes.ARRAY: _PGARRAY,
+ },
+ )
+
+ def __init__(
+ self,
+ use_native_unicode=True,
+ client_encoding=None,
+ use_native_hstore=True,
+ use_native_uuid=True,
+ executemany_mode="values_only",
+ executemany_batch_page_size=100,
+ executemany_values_page_size=1000,
+ **kwargs
+ ):
+ PGDialect.__init__(self, **kwargs)
+ self.use_native_unicode = use_native_unicode
+ if not use_native_unicode and not util.py2k:
+ raise exc.ArgumentError(
+ "psycopg2 native_unicode mode is required under Python 3"
+ )
+ if not use_native_hstore:
+ self._has_native_hstore = False
+ self.use_native_hstore = use_native_hstore
+ self.use_native_uuid = use_native_uuid
+ self.supports_unicode_binds = use_native_unicode
+ self.client_encoding = client_encoding
+
+ # Parse executemany_mode argument, allowing it to be only one of the
+ # symbol names
+ self.executemany_mode = util.symbol.parse_user_argument(
+ executemany_mode,
+ {
+ EXECUTEMANY_PLAIN: [None],
+ EXECUTEMANY_BATCH: ["batch"],
+ EXECUTEMANY_VALUES: ["values_only"],
+ EXECUTEMANY_VALUES_PLUS_BATCH: ["values_plus_batch", "values"],
+ },
+ "executemany_mode",
+ )
+
+ if self.executemany_mode & EXECUTEMANY_VALUES:
+ self.insert_executemany_returning = True
+
+ self.executemany_batch_page_size = executemany_batch_page_size
+ self.executemany_values_page_size = executemany_values_page_size
+
+ if self.dbapi and hasattr(self.dbapi, "__version__"):
+ m = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", self.dbapi.__version__)
+ if m:
+ self.psycopg2_version = tuple(
+ int(x) for x in m.group(1, 2, 3) if x is not None
+ )
+
+ if self.psycopg2_version < (2, 7):
+ raise ImportError(
+ "psycopg2 version 2.7 or higher is required."
+ )
+
+ def initialize(self, connection):
+ super(PGDialect_psycopg2, self).initialize(connection)
+ self._has_native_hstore = (
+ self.use_native_hstore
+ and self._hstore_oids(connection.connection) is not None
+ )
+
+ # PGDialect.initialize() checks server version for <= 8.2 and sets
+ # this flag to False if so
+ if not self.full_returning:
+ self.insert_executemany_returning = False
+ self.executemany_mode = EXECUTEMANY_PLAIN
+
+ self.supports_sane_multi_rowcount = not (
+ self.executemany_mode & EXECUTEMANY_BATCH
+ )
+
+ @classmethod
+ def dbapi(cls):
+ import psycopg2
+
+ return psycopg2
+
+ @classmethod
+ def _psycopg2_extensions(cls):
+ from psycopg2 import extensions
+
+ return extensions
+
+ @classmethod
+ def _psycopg2_extras(cls):
+ from psycopg2 import extras
+
+ return extras
+
+ @util.memoized_property
+ def _isolation_lookup(self):
+ extensions = self._psycopg2_extensions()
+ return {
+ "AUTOCOMMIT": extensions.ISOLATION_LEVEL_AUTOCOMMIT,
+ "READ COMMITTED": extensions.ISOLATION_LEVEL_READ_COMMITTED,
+ "READ UNCOMMITTED": extensions.ISOLATION_LEVEL_READ_UNCOMMITTED,
+ "REPEATABLE READ": extensions.ISOLATION_LEVEL_REPEATABLE_READ,
+ "SERIALIZABLE": extensions.ISOLATION_LEVEL_SERIALIZABLE,
+ }
+
+ def set_isolation_level(self, connection, level):
+ try:
+ level = self._isolation_lookup[level.replace("_", " ")]
+ except KeyError as err:
+ util.raise_(
+ exc.ArgumentError(
+ "Invalid value '%s' for isolation_level. "
+ "Valid isolation levels for %s are %s"
+ % (level, self.name, ", ".join(self._isolation_lookup))
+ ),
+ replace_context=err,
+ )
+
+ connection.set_isolation_level(level)
+
+ def set_readonly(self, connection, value):
+ connection.readonly = value
+
+ def get_readonly(self, connection):
+ return connection.readonly
+
+ def set_deferrable(self, connection, value):
+ connection.deferrable = value
+
+ def get_deferrable(self, connection):
+ return connection.deferrable
+
+ def do_ping(self, dbapi_connection):
+ cursor = None
+ before_autocommit = dbapi_connection.autocommit
+ try:
+ if not before_autocommit:
+ dbapi_connection.autocommit = True
+ cursor = dbapi_connection.cursor()
+ try:
+ cursor.execute(self._dialect_specific_select_one)
+ finally:
+ cursor.close()
+ if not before_autocommit and not dbapi_connection.closed:
+ dbapi_connection.autocommit = before_autocommit
+ except self.dbapi.Error as err:
+ if self.is_disconnect(err, dbapi_connection, cursor):
+ return False
+ else:
+ raise
+ else:
+ return True
+
+ def on_connect(self):
+ extras = self._psycopg2_extras()
+ extensions = self._psycopg2_extensions()
+
+ fns = []
+ if self.client_encoding is not None:
+
+ def on_connect(conn):
+ conn.set_client_encoding(self.client_encoding)
+
+ fns.append(on_connect)
+
+ if self.isolation_level is not None:
+
+ def on_connect(conn):
+ self.set_isolation_level(conn, self.isolation_level)
+
+ fns.append(on_connect)
+
+ if self.dbapi and self.use_native_uuid:
+
+ def on_connect(conn):
+ extras.register_uuid(None, conn)
+
+ fns.append(on_connect)
+
+ if util.py2k and self.dbapi and self.use_native_unicode:
+
+ def on_connect(conn):
+ extensions.register_type(extensions.UNICODE, conn)
+ extensions.register_type(extensions.UNICODEARRAY, conn)
+
+ fns.append(on_connect)
+
+ if self.dbapi and self.use_native_hstore:
+
+ def on_connect(conn):
+ hstore_oids = self._hstore_oids(conn)
+ if hstore_oids is not None:
+ oid, array_oid = hstore_oids
+ kw = {"oid": oid}
+ if util.py2k:
+ kw["unicode"] = True
+ kw["array_oid"] = array_oid
+ extras.register_hstore(conn, **kw)
+
+ fns.append(on_connect)
+
+ if self.dbapi and self._json_deserializer:
+
+ def on_connect(conn):
+ extras.register_default_json(
+ conn, loads=self._json_deserializer
+ )
+ extras.register_default_jsonb(
+ conn, loads=self._json_deserializer
+ )
+
+ fns.append(on_connect)
+
+ if fns:
+
+ def on_connect(conn):
+ for fn in fns:
+ fn(conn)
+
+ return on_connect
+ else:
+ return None
+
+ def do_executemany(self, cursor, statement, parameters, context=None):
+ if (
+ self.executemany_mode & EXECUTEMANY_VALUES
+ and context
+ and context.isinsert
+ and context.compiled._is_safe_for_fast_insert_values_helper
+ ):
+ executemany_values = (
+ "(%s)" % context.compiled.insert_single_values_expr
+ )
+ if not self.supports_unicode_statements:
+ executemany_values = executemany_values.encode(self.encoding)
+
+ # guard for statement that was altered via event hook or similar
+ if executemany_values not in statement:
+ executemany_values = None
+ else:
+ executemany_values = None
+
+ if executemany_values:
+ statement = statement.replace(executemany_values, "%s")
+ if self.executemany_values_page_size:
+ kwargs = {"page_size": self.executemany_values_page_size}
+ else:
+ kwargs = {}
+ xtras = self._psycopg2_extras()
+ context._psycopg2_fetched_rows = xtras.execute_values(
+ cursor,
+ statement,
+ parameters,
+ template=executemany_values,
+ fetch=bool(context.compiled.returning),
+ **kwargs
+ )
+
+ elif self.executemany_mode & EXECUTEMANY_BATCH:
+ if self.executemany_batch_page_size:
+ kwargs = {"page_size": self.executemany_batch_page_size}
+ else:
+ kwargs = {}
+ self._psycopg2_extras().execute_batch(
+ cursor, statement, parameters, **kwargs
+ )
+ else:
+ cursor.executemany(statement, parameters)
+
+ @util.memoized_instancemethod
+ def _hstore_oids(self, conn):
+ extras = self._psycopg2_extras()
+ if hasattr(conn, "dbapi_connection"):
+ conn = conn.dbapi_connection
+ oids = extras.HstoreAdapter.get_oids(conn)
+ if oids is not None and oids[0]:
+ return oids[0:2]
+ else:
+ return None
+
+ def create_connect_args(self, url):
+ opts = url.translate_connect_args(username="user")
+
+ is_multihost = False
+ if "host" in url.query:
+ is_multihost = isinstance(url.query["host"], (list, tuple))
+
+ if opts or url.query:
+ if not opts:
+ opts = {}
+ if "port" in opts:
+ opts["port"] = int(opts["port"])
+ opts.update(url.query)
+ if is_multihost:
+ hosts, ports = zip(
+ *[
+ token.split(":") if ":" in token else (token, "")
+ for token in url.query["host"]
+ ]
+ )
+ opts["host"] = ",".join(hosts)
+ if "port" in opts:
+ raise exc.ArgumentError(
+ "Can't mix 'multihost' formats together; use "
+ '"host=h1,h2,h3&port=p1,p2,p3" or '
+ '"host=h1:p1&host=h2:p2&host=h3:p3" separately'
+ )
+ opts["port"] = ",".join(ports)
+ return ([], opts)
+ else:
+ # no connection arguments whatsoever; psycopg2.connect()
+ # requires that "dsn" be present as a blank string.
+ return ([""], opts)
+
+ def is_disconnect(self, e, connection, cursor):
+ if isinstance(e, self.dbapi.Error):
+ # check the "closed" flag. this might not be
+ # present on old psycopg2 versions. Also,
+ # this flag doesn't actually help in a lot of disconnect
+ # situations, so don't rely on it.
+ if getattr(connection, "closed", False):
+ return True
+
+ # checks based on strings. in the case that .closed
+ # didn't cut it, fall back onto these.
+ str_e = str(e).partition("\n")[0]
+ for msg in [
+ # these error messages from libpq: interfaces/libpq/fe-misc.c
+ # and interfaces/libpq/fe-secure.c.
+ "terminating connection",
+ "closed the connection",
+ "connection not open",
+ "could not receive data from server",
+ "could not send data to server",
+ # psycopg2 client errors, psycopg2/connection.h,
+ # psycopg2/cursor.h
+ "connection already closed",
+ "cursor already closed",
+ # not sure where this path is originally from, it may
+ # be obsolete. It really says "losed", not "closed".
+ "losed the connection unexpectedly",
+ # these can occur in newer SSL
+ "connection has been closed unexpectedly",
+ "SSL error: decryption failed or bad record mac",
+ "SSL SYSCALL error: Bad file descriptor",
+ "SSL SYSCALL error: EOF detected",
+ "SSL SYSCALL error: Operation timed out",
+ "SSL SYSCALL error: Bad address",
+ ]:
+ idx = str_e.find(msg)
+ if idx >= 0 and '"' not in str_e[:idx]:
+ return True
+ return False
+
+
+dialect = PGDialect_psycopg2
diff --git a/lib/sqlalchemy/dialects/postgresql/psycopg2cffi.py b/lib/sqlalchemy/dialects/postgresql/psycopg2cffi.py
new file mode 100644
index 0000000..10d1aae
--- /dev/null
+++ b/lib/sqlalchemy/dialects/postgresql/psycopg2cffi.py
@@ -0,0 +1,60 @@
+# testing/engines.py
+# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+r"""
+.. dialect:: postgresql+psycopg2cffi
+ :name: psycopg2cffi
+ :dbapi: psycopg2cffi
+ :connectstring: postgresql+psycopg2cffi://user:password@host:port/dbname[?key=value&key=value...]
+ :url: https://pypi.org/project/psycopg2cffi/
+
+``psycopg2cffi`` is an adaptation of ``psycopg2``, using CFFI for the C
+layer. This makes it suitable for use in e.g. PyPy. Documentation
+is as per ``psycopg2``.
+
+.. versionadded:: 1.0.0
+
+.. seealso::
+
+ :mod:`sqlalchemy.dialects.postgresql.psycopg2`
+
+""" # noqa
+from .psycopg2 import PGDialect_psycopg2
+
+
+class PGDialect_psycopg2cffi(PGDialect_psycopg2):
+ driver = "psycopg2cffi"
+ supports_unicode_statements = True
+ supports_statement_cache = True
+
+ # psycopg2cffi's first release is 2.5.0, but reports
+ # __version__ as 2.4.4. Subsequent releases seem to have
+ # fixed this.
+
+ FEATURE_VERSION_MAP = dict(
+ native_json=(2, 4, 4),
+ native_jsonb=(2, 7, 1),
+ sane_multi_rowcount=(2, 4, 4),
+ array_oid=(2, 4, 4),
+ hstore_adapter=(2, 4, 4),
+ )
+
+ @classmethod
+ def dbapi(cls):
+ return __import__("psycopg2cffi")
+
+ @classmethod
+ def _psycopg2_extensions(cls):
+ root = __import__("psycopg2cffi", fromlist=["extensions"])
+ return root.extensions
+
+ @classmethod
+ def _psycopg2_extras(cls):
+ root = __import__("psycopg2cffi", fromlist=["extras"])
+ return root.extras
+
+
+dialect = PGDialect_psycopg2cffi
diff --git a/lib/sqlalchemy/dialects/postgresql/pygresql.py b/lib/sqlalchemy/dialects/postgresql/pygresql.py
new file mode 100644
index 0000000..d273b8c
--- /dev/null
+++ b/lib/sqlalchemy/dialects/postgresql/pygresql.py
@@ -0,0 +1,278 @@
+# postgresql/pygresql.py
+# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+"""
+.. dialect:: postgresql+pygresql
+ :name: pygresql
+ :dbapi: pgdb
+ :connectstring: postgresql+pygresql://user:password@host:port/dbname[?key=value&key=value...]
+ :url: https://www.pygresql.org/
+
+.. note::
+
+ The pygresql dialect is **not tested as part of SQLAlchemy's continuous
+ integration** and may have unresolved issues. The recommended PostgreSQL
+ dialect is psycopg2.
+
+.. deprecated:: 1.4 The pygresql DBAPI is deprecated and will be removed
+ in a future version. Please use one of the supported DBAPIs to
+ connect to PostgreSQL.
+
+""" # noqa
+
+import decimal
+import re
+
+from .base import _DECIMAL_TYPES
+from .base import _FLOAT_TYPES
+from .base import _INT_TYPES
+from .base import PGCompiler
+from .base import PGDialect
+from .base import PGIdentifierPreparer
+from .base import UUID
+from .hstore import HSTORE
+from .json import JSON
+from .json import JSONB
+from ... import exc
+from ... import processors
+from ... import util
+from ...sql.elements import Null
+from ...types import JSON as Json
+from ...types import Numeric
+
+
+class _PGNumeric(Numeric):
+ def bind_processor(self, dialect):
+ return None
+
+ def result_processor(self, dialect, coltype):
+ if not isinstance(coltype, int):
+ coltype = coltype.oid
+ if self.asdecimal:
+ if coltype in _FLOAT_TYPES:
+ return processors.to_decimal_processor_factory(
+ decimal.Decimal, self._effective_decimal_return_scale
+ )
+ elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES:
+ # PyGreSQL returns Decimal natively for 1700 (numeric)
+ return None
+ else:
+ raise exc.InvalidRequestError(
+ "Unknown PG numeric type: %d" % coltype
+ )
+ else:
+ if coltype in _FLOAT_TYPES:
+ # PyGreSQL returns float natively for 701 (float8)
+ return None
+ elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES:
+ return processors.to_float
+ else:
+ raise exc.InvalidRequestError(
+ "Unknown PG numeric type: %d" % coltype
+ )
+
+
+class _PGHStore(HSTORE):
+ def bind_processor(self, dialect):
+ if not dialect.has_native_hstore:
+ return super(_PGHStore, self).bind_processor(dialect)
+ hstore = dialect.dbapi.Hstore
+
+ def process(value):
+ if isinstance(value, dict):
+ return hstore(value)
+ return value
+
+ return process
+
+ def result_processor(self, dialect, coltype):
+ if not dialect.has_native_hstore:
+ return super(_PGHStore, self).result_processor(dialect, coltype)
+
+
+class _PGJSON(JSON):
+ def bind_processor(self, dialect):
+ if not dialect.has_native_json:
+ return super(_PGJSON, self).bind_processor(dialect)
+ json = dialect.dbapi.Json
+
+ def process(value):
+ if value is self.NULL:
+ value = None
+ elif isinstance(value, Null) or (
+ value is None and self.none_as_null
+ ):
+ return None
+ if value is None or isinstance(value, (dict, list)):
+ return json(value)
+ return value
+
+ return process
+
+ def result_processor(self, dialect, coltype):
+ if not dialect.has_native_json:
+ return super(_PGJSON, self).result_processor(dialect, coltype)
+
+
+class _PGJSONB(JSONB):
+ def bind_processor(self, dialect):
+ if not dialect.has_native_json:
+ return super(_PGJSONB, self).bind_processor(dialect)
+ json = dialect.dbapi.Json
+
+ def process(value):
+ if value is self.NULL:
+ value = None
+ elif isinstance(value, Null) or (
+ value is None and self.none_as_null
+ ):
+ return None
+ if value is None or isinstance(value, (dict, list)):
+ return json(value)
+ return value
+
+ return process
+
+ def result_processor(self, dialect, coltype):
+ if not dialect.has_native_json:
+ return super(_PGJSONB, self).result_processor(dialect, coltype)
+
+
+class _PGUUID(UUID):
+ def bind_processor(self, dialect):
+ if not dialect.has_native_uuid:
+ return super(_PGUUID, self).bind_processor(dialect)
+ uuid = dialect.dbapi.Uuid
+
+ def process(value):
+ if value is None:
+ return None
+ if isinstance(value, (str, bytes)):
+ if len(value) == 16:
+ return uuid(bytes=value)
+ return uuid(value)
+ if isinstance(value, int):
+ return uuid(int=value)
+ return value
+
+ return process
+
+ def result_processor(self, dialect, coltype):
+ if not dialect.has_native_uuid:
+ return super(_PGUUID, self).result_processor(dialect, coltype)
+ if not self.as_uuid:
+
+ def process(value):
+ if value is not None:
+ return str(value)
+
+ return process
+
+
+class _PGCompiler(PGCompiler):
+ def visit_mod_binary(self, binary, operator, **kw):
+ return (
+ self.process(binary.left, **kw)
+ + " %% "
+ + self.process(binary.right, **kw)
+ )
+
+ def post_process_text(self, text):
+ return text.replace("%", "%%")
+
+
+class _PGIdentifierPreparer(PGIdentifierPreparer):
+ def _escape_identifier(self, value):
+ value = value.replace(self.escape_quote, self.escape_to_quote)
+ return value.replace("%", "%%")
+
+
+class PGDialect_pygresql(PGDialect):
+
+ driver = "pygresql"
+ supports_statement_cache = True
+
+ statement_compiler = _PGCompiler
+ preparer = _PGIdentifierPreparer
+
+ @classmethod
+ def dbapi(cls):
+ import pgdb
+
+ util.warn_deprecated(
+ "The pygresql DBAPI is deprecated and will be removed "
+ "in a future version. Please use one of the supported DBAPIs to "
+ "connect to PostgreSQL.",
+ version="1.4",
+ )
+
+ return pgdb
+
+ colspecs = util.update_copy(
+ PGDialect.colspecs,
+ {
+ Numeric: _PGNumeric,
+ HSTORE: _PGHStore,
+ Json: _PGJSON,
+ JSON: _PGJSON,
+ JSONB: _PGJSONB,
+ UUID: _PGUUID,
+ },
+ )
+
+ def __init__(self, **kwargs):
+ super(PGDialect_pygresql, self).__init__(**kwargs)
+ try:
+ version = self.dbapi.version
+ m = re.match(r"(\d+)\.(\d+)", version)
+ version = (int(m.group(1)), int(m.group(2)))
+ except (AttributeError, ValueError, TypeError):
+ version = (0, 0)
+ self.dbapi_version = version
+ if version < (5, 0):
+ has_native_hstore = has_native_json = has_native_uuid = False
+ if version != (0, 0):
+ util.warn(
+ "PyGreSQL is only fully supported by SQLAlchemy"
+ " since version 5.0."
+ )
+ else:
+ self.supports_unicode_statements = True
+ self.supports_unicode_binds = True
+ has_native_hstore = has_native_json = has_native_uuid = True
+ self.has_native_hstore = has_native_hstore
+ self.has_native_json = has_native_json
+ self.has_native_uuid = has_native_uuid
+
+ def create_connect_args(self, url):
+ opts = url.translate_connect_args(username="user")
+ if "port" in opts:
+ opts["host"] = "%s:%s" % (
+ opts.get("host", "").rsplit(":", 1)[0],
+ opts.pop("port"),
+ )
+ opts.update(url.query)
+ return [], opts
+
+ def is_disconnect(self, e, connection, cursor):
+ if isinstance(e, self.dbapi.Error):
+ if not connection:
+ return False
+ try:
+ connection = connection.connection
+ except AttributeError:
+ pass
+ else:
+ if not connection:
+ return False
+ try:
+ return connection.closed
+ except AttributeError: # PyGreSQL < 5.0
+ return connection._cnx is None
+ return False
+
+
+dialect = PGDialect_pygresql
diff --git a/lib/sqlalchemy/dialects/postgresql/pypostgresql.py b/lib/sqlalchemy/dialects/postgresql/pypostgresql.py
new file mode 100644
index 0000000..886e368
--- /dev/null
+++ b/lib/sqlalchemy/dialects/postgresql/pypostgresql.py
@@ -0,0 +1,126 @@
+# postgresql/pypostgresql.py
+# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+"""
+.. dialect:: postgresql+pypostgresql
+ :name: py-postgresql
+ :dbapi: pypostgresql
+ :connectstring: postgresql+pypostgresql://user:password@host:port/dbname[?key=value&key=value...]
+ :url: https://python.projects.pgfoundry.org/
+
+.. note::
+
+ The pypostgresql dialect is **not tested as part of SQLAlchemy's continuous
+ integration** and may have unresolved issues. The recommended PostgreSQL
+ driver is psycopg2.
+
+.. deprecated:: 1.4 The py-postgresql DBAPI is deprecated and will be removed
+ in a future version. This DBAPI is superseded by the external
+ version available at external-dialect_. Please use the external version or
+ one of the supported DBAPIs to connect to PostgreSQL.
+
+.. TODO update link
+.. _external-dialect: https://github.com/PyGreSQL
+
+""" # noqa
+
+from .base import PGDialect
+from .base import PGExecutionContext
+from ... import processors
+from ... import types as sqltypes
+from ... import util
+
+
+class PGNumeric(sqltypes.Numeric):
+ def bind_processor(self, dialect):
+ return processors.to_str
+
+ def result_processor(self, dialect, coltype):
+ if self.asdecimal:
+ return None
+ else:
+ return processors.to_float
+
+
+class PGExecutionContext_pypostgresql(PGExecutionContext):
+ pass
+
+
+class PGDialect_pypostgresql(PGDialect):
+ driver = "pypostgresql"
+
+ supports_statement_cache = True
+ supports_unicode_statements = True
+ supports_unicode_binds = True
+ description_encoding = None
+ default_paramstyle = "pyformat"
+
+ # requires trunk version to support sane rowcounts
+ # TODO: use dbapi version information to set this flag appropriately
+ supports_sane_rowcount = True
+ supports_sane_multi_rowcount = False
+
+ execution_ctx_cls = PGExecutionContext_pypostgresql
+ colspecs = util.update_copy(
+ PGDialect.colspecs,
+ {
+ sqltypes.Numeric: PGNumeric,
+ # prevents PGNumeric from being used
+ sqltypes.Float: sqltypes.Float,
+ },
+ )
+
+ @classmethod
+ def dbapi(cls):
+ from postgresql.driver import dbapi20
+
+ # TODO update link
+ util.warn_deprecated(
+ "The py-postgresql DBAPI is deprecated and will be removed "
+ "in a future version. This DBAPI is superseded by the external"
+ "version available at https://github.com/PyGreSQL. Please "
+ "use one of the supported DBAPIs to connect to PostgreSQL.",
+ version="1.4",
+ )
+
+ return dbapi20
+
+ _DBAPI_ERROR_NAMES = [
+ "Error",
+ "InterfaceError",
+ "DatabaseError",
+ "DataError",
+ "OperationalError",
+ "IntegrityError",
+ "InternalError",
+ "ProgrammingError",
+ "NotSupportedError",
+ ]
+
+ @util.memoized_property
+ def dbapi_exception_translation_map(self):
+ if self.dbapi is None:
+ return {}
+
+ return dict(
+ (getattr(self.dbapi, name).__name__, name)
+ for name in self._DBAPI_ERROR_NAMES
+ )
+
+ def create_connect_args(self, url):
+ opts = url.translate_connect_args(username="user")
+ if "port" in opts:
+ opts["port"] = int(opts["port"])
+ else:
+ opts["port"] = 5432
+ opts.update(url.query)
+ return ([], opts)
+
+ def is_disconnect(self, e, connection, cursor):
+ return "connection is closed" in str(e)
+
+
+dialect = PGDialect_pypostgresql
diff --git a/lib/sqlalchemy/dialects/postgresql/ranges.py b/lib/sqlalchemy/dialects/postgresql/ranges.py
new file mode 100644
index 0000000..51f3b04
--- /dev/null
+++ b/lib/sqlalchemy/dialects/postgresql/ranges.py
@@ -0,0 +1,138 @@
+# Copyright (C) 2013-2022 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+
+from ... import types as sqltypes
+
+
+__all__ = ("INT4RANGE", "INT8RANGE", "NUMRANGE")
+
+
+class RangeOperators(object):
+ """
+ This mixin provides functionality for the Range Operators
+ listed in the Range Operators table of the `PostgreSQL documentation`__
+ for Range Functions and Operators. It is used by all the range types
+ provided in the ``postgres`` dialect and can likely be used for
+ any range types you create yourself.
+
+ __ https://www.postgresql.org/docs/current/static/functions-range.html
+
+ No extra support is provided for the Range Functions listed in the Range
+ Functions table of the PostgreSQL documentation. For these, the normal
+ :func:`~sqlalchemy.sql.expression.func` object should be used.
+
+ """
+
+ class comparator_factory(sqltypes.Concatenable.Comparator):
+ """Define comparison operations for range types."""
+
+ def __ne__(self, other):
+ "Boolean expression. Returns true if two ranges are not equal"
+ if other is None:
+ return super(RangeOperators.comparator_factory, self).__ne__(
+ other
+ )
+ else:
+ return self.expr.op("<>", is_comparison=True)(other)
+
+ def contains(self, other, **kw):
+ """Boolean expression. Returns true if the right hand operand,
+ which can be an element or a range, is contained within the
+ column.
+
+ kwargs may be ignored by this operator but are required for API
+ conformance.
+ """
+ return self.expr.op("@>", is_comparison=True)(other)
+
+ def contained_by(self, other):
+ """Boolean expression. Returns true if the column is contained
+ within the right hand operand.
+ """
+ return self.expr.op("<@", is_comparison=True)(other)
+
+ def overlaps(self, other):
+ """Boolean expression. Returns true if the column overlaps
+ (has points in common with) the right hand operand.
+ """
+ return self.expr.op("&&", is_comparison=True)(other)
+
+ def strictly_left_of(self, other):
+ """Boolean expression. Returns true if the column is strictly
+ left of the right hand operand.
+ """
+ return self.expr.op("<<", is_comparison=True)(other)
+
+ __lshift__ = strictly_left_of
+
+ def strictly_right_of(self, other):
+ """Boolean expression. Returns true if the column is strictly
+ right of the right hand operand.
+ """
+ return self.expr.op(">>", is_comparison=True)(other)
+
+ __rshift__ = strictly_right_of
+
+ def not_extend_right_of(self, other):
+ """Boolean expression. Returns true if the range in the column
+ does not extend right of the range in the operand.
+ """
+ return self.expr.op("&<", is_comparison=True)(other)
+
+ def not_extend_left_of(self, other):
+ """Boolean expression. Returns true if the range in the column
+ does not extend left of the range in the operand.
+ """
+ return self.expr.op("&>", is_comparison=True)(other)
+
+ def adjacent_to(self, other):
+ """Boolean expression. Returns true if the range in the column
+ is adjacent to the range in the operand.
+ """
+ return self.expr.op("-|-", is_comparison=True)(other)
+
+ def __add__(self, other):
+ """Range expression. Returns the union of the two ranges.
+ Will raise an exception if the resulting range is not
+ contiguous.
+ """
+ return self.expr.op("+")(other)
+
+
+class INT4RANGE(RangeOperators, sqltypes.TypeEngine):
+ """Represent the PostgreSQL INT4RANGE type."""
+
+ __visit_name__ = "INT4RANGE"
+
+
+class INT8RANGE(RangeOperators, sqltypes.TypeEngine):
+ """Represent the PostgreSQL INT8RANGE type."""
+
+ __visit_name__ = "INT8RANGE"
+
+
+class NUMRANGE(RangeOperators, sqltypes.TypeEngine):
+ """Represent the PostgreSQL NUMRANGE type."""
+
+ __visit_name__ = "NUMRANGE"
+
+
+class DATERANGE(RangeOperators, sqltypes.TypeEngine):
+ """Represent the PostgreSQL DATERANGE type."""
+
+ __visit_name__ = "DATERANGE"
+
+
+class TSRANGE(RangeOperators, sqltypes.TypeEngine):
+ """Represent the PostgreSQL TSRANGE type."""
+
+ __visit_name__ = "TSRANGE"
+
+
+class TSTZRANGE(RangeOperators, sqltypes.TypeEngine):
+ """Represent the PostgreSQL TSTZRANGE type."""
+
+ __visit_name__ = "TSTZRANGE"