summaryrefslogtreecommitdiffstats
path: root/lib/sqlalchemy/dialects/oracle
diff options
context:
space:
mode:
authorxiubuzhe <xiubuzhe@sina.com>2023-10-08 20:59:00 +0800
committerxiubuzhe <xiubuzhe@sina.com>2023-10-08 20:59:00 +0800
commit1dac2263372df2b85db5d029a45721fa158a5c9d (patch)
tree0365f9c57df04178a726d7584ca6a6b955a7ce6a /lib/sqlalchemy/dialects/oracle
parentb494be364bb39e1de128ada7dc576a729d99907e (diff)
downloadsunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.tar.gz
sunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.tar.bz2
sunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.zip
first add files
Diffstat (limited to 'lib/sqlalchemy/dialects/oracle')
-rw-r--r--lib/sqlalchemy/dialects/oracle/__init__.py58
-rw-r--r--lib/sqlalchemy/dialects/oracle/base.py2522
-rw-r--r--lib/sqlalchemy/dialects/oracle/cx_oracle.py1424
-rw-r--r--lib/sqlalchemy/dialects/oracle/provision.py160
4 files changed, 4164 insertions, 0 deletions
diff --git a/lib/sqlalchemy/dialects/oracle/__init__.py b/lib/sqlalchemy/dialects/oracle/__init__.py
new file mode 100644
index 0000000..c83e057
--- /dev/null
+++ b/lib/sqlalchemy/dialects/oracle/__init__.py
@@ -0,0 +1,58 @@
+# oracle/__init__.py
+# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+
+from . import base # noqa
+from . import cx_oracle # noqa
+from .base import BFILE
+from .base import BINARY_DOUBLE
+from .base import BINARY_FLOAT
+from .base import BLOB
+from .base import CHAR
+from .base import CLOB
+from .base import DATE
+from .base import DOUBLE_PRECISION
+from .base import FLOAT
+from .base import INTERVAL
+from .base import LONG
+from .base import NCHAR
+from .base import NCLOB
+from .base import NUMBER
+from .base import NVARCHAR
+from .base import NVARCHAR2
+from .base import RAW
+from .base import ROWID
+from .base import TIMESTAMP
+from .base import VARCHAR
+from .base import VARCHAR2
+
+
+base.dialect = dialect = cx_oracle.dialect
+
+__all__ = (
+ "VARCHAR",
+ "NVARCHAR",
+ "CHAR",
+ "NCHAR",
+ "DATE",
+ "NUMBER",
+ "BLOB",
+ "BFILE",
+ "CLOB",
+ "NCLOB",
+ "TIMESTAMP",
+ "RAW",
+ "FLOAT",
+ "DOUBLE_PRECISION",
+ "BINARY_DOUBLE",
+ "BINARY_FLOAT",
+ "LONG",
+ "dialect",
+ "INTERVAL",
+ "VARCHAR2",
+ "NVARCHAR2",
+ "ROWID",
+)
diff --git a/lib/sqlalchemy/dialects/oracle/base.py b/lib/sqlalchemy/dialects/oracle/base.py
new file mode 100644
index 0000000..77f0dbd
--- /dev/null
+++ b/lib/sqlalchemy/dialects/oracle/base.py
@@ -0,0 +1,2522 @@
+# oracle/base.py
+# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+
+r"""
+.. dialect:: oracle
+ :name: Oracle
+ :full_support: 11.2, 18c
+ :normal_support: 11+
+ :best_effort: 8+
+
+
+Auto Increment Behavior
+-----------------------
+
+SQLAlchemy Table objects which include integer primary keys are usually
+assumed to have "autoincrementing" behavior, meaning they can generate their
+own primary key values upon INSERT. For use within Oracle, two options are
+available, which are the use of IDENTITY columns (Oracle 12 and above only)
+or the association of a SEQUENCE with the column.
+
+Specifying GENERATED AS IDENTITY (Oracle 12 and above)
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Starting from version 12 Oracle can make use of identity columns using
+the :class:`_sql.Identity` to specify the autoincrementing behavior::
+
+ t = Table('mytable', metadata,
+ Column('id', Integer, Identity(start=3), primary_key=True),
+ Column(...), ...
+ )
+
+The CREATE TABLE for the above :class:`_schema.Table` object would be:
+
+.. sourcecode:: sql
+
+ CREATE TABLE mytable (
+ id INTEGER GENERATED BY DEFAULT AS IDENTITY (START WITH 3),
+ ...,
+ PRIMARY KEY (id)
+ )
+
+The :class:`_schema.Identity` object support many options to control the
+"autoincrementing" behavior of the column, like the starting value, the
+incrementing value, etc.
+In addition to the standard options, Oracle supports setting
+:paramref:`_schema.Identity.always` to ``None`` to use the default
+generated mode, rendering GENERATED AS IDENTITY in the DDL. It also supports
+setting :paramref:`_schema.Identity.on_null` to ``True`` to specify ON NULL
+in conjunction with a 'BY DEFAULT' identity column.
+
+Using a SEQUENCE (all Oracle versions)
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Older version of Oracle had no "autoincrement"
+feature, SQLAlchemy relies upon sequences to produce these values. With the
+older Oracle versions, *a sequence must always be explicitly specified to
+enable autoincrement*. This is divergent with the majority of documentation
+examples which assume the usage of an autoincrement-capable database. To
+specify sequences, use the sqlalchemy.schema.Sequence object which is passed
+to a Column construct::
+
+ t = Table('mytable', metadata,
+ Column('id', Integer, Sequence('id_seq'), primary_key=True),
+ Column(...), ...
+ )
+
+This step is also required when using table reflection, i.e. autoload_with=engine::
+
+ t = Table('mytable', metadata,
+ Column('id', Integer, Sequence('id_seq'), primary_key=True),
+ autoload_with=engine
+ )
+
+.. versionchanged:: 1.4 Added :class:`_schema.Identity` construct
+ in a :class:`_schema.Column` to specify the option of an autoincrementing
+ column.
+
+.. _oracle_isolation_level:
+
+Transaction Isolation Level / Autocommit
+----------------------------------------
+
+The Oracle database supports "READ COMMITTED" and "SERIALIZABLE" modes of
+isolation. The AUTOCOMMIT isolation level is also supported by the cx_Oracle
+dialect.
+
+To set using per-connection execution options::
+
+ connection = engine.connect()
+ connection = connection.execution_options(
+ isolation_level="AUTOCOMMIT"
+ )
+
+For ``READ COMMITTED`` and ``SERIALIZABLE``, the Oracle dialect sets the
+level at the session level using ``ALTER SESSION``, which is reverted back
+to its default setting when the connection is returned to the connection
+pool.
+
+Valid values for ``isolation_level`` include:
+
+* ``READ COMMITTED``
+* ``AUTOCOMMIT``
+* ``SERIALIZABLE``
+
+.. note:: The implementation for the
+ :meth:`_engine.Connection.get_isolation_level` method as implemented by the
+ Oracle dialect necessarily forces the start of a transaction using the
+ Oracle LOCAL_TRANSACTION_ID function; otherwise no level is normally
+ readable.
+
+ Additionally, the :meth:`_engine.Connection.get_isolation_level` method will
+ raise an exception if the ``v$transaction`` view is not available due to
+ permissions or other reasons, which is a common occurrence in Oracle
+ installations.
+
+ The cx_Oracle dialect attempts to call the
+ :meth:`_engine.Connection.get_isolation_level` method when the dialect makes
+ its first connection to the database in order to acquire the
+ "default"isolation level. This default level is necessary so that the level
+ can be reset on a connection after it has been temporarily modified using
+ :meth:`_engine.Connection.execution_options` method. In the common event
+ that the :meth:`_engine.Connection.get_isolation_level` method raises an
+ exception due to ``v$transaction`` not being readable as well as any other
+ database-related failure, the level is assumed to be "READ COMMITTED". No
+ warning is emitted for this initial first-connect condition as it is
+ expected to be a common restriction on Oracle databases.
+
+.. versionadded:: 1.3.16 added support for AUTOCOMMIT to the cx_oracle dialect
+ as well as the notion of a default isolation level
+
+.. versionadded:: 1.3.21 Added support for SERIALIZABLE as well as live
+ reading of the isolation level.
+
+.. versionchanged:: 1.3.22 In the event that the default isolation
+ level cannot be read due to permissions on the v$transaction view as
+ is common in Oracle installations, the default isolation level is hardcoded
+ to "READ COMMITTED" which was the behavior prior to 1.3.21.
+
+.. seealso::
+
+ :ref:`dbapi_autocommit`
+
+Identifier Casing
+-----------------
+
+In Oracle, the data dictionary represents all case insensitive identifier
+names using UPPERCASE text. SQLAlchemy on the other hand considers an
+all-lower case identifier name to be case insensitive. The Oracle dialect
+converts all case insensitive identifiers to and from those two formats during
+schema level communication, such as reflection of tables and indexes. Using
+an UPPERCASE name on the SQLAlchemy side indicates a case sensitive
+identifier, and SQLAlchemy will quote the name - this will cause mismatches
+against data dictionary data received from Oracle, so unless identifier names
+have been truly created as case sensitive (i.e. using quoted names), all
+lowercase names should be used on the SQLAlchemy side.
+
+.. _oracle_max_identifier_lengths:
+
+Max Identifier Lengths
+----------------------
+
+Oracle has changed the default max identifier length as of Oracle Server
+version 12.2. Prior to this version, the length was 30, and for 12.2 and
+greater it is now 128. This change impacts SQLAlchemy in the area of
+generated SQL label names as well as the generation of constraint names,
+particularly in the case where the constraint naming convention feature
+described at :ref:`constraint_naming_conventions` is being used.
+
+To assist with this change and others, Oracle includes the concept of a
+"compatibility" version, which is a version number that is independent of the
+actual server version in order to assist with migration of Oracle databases,
+and may be configured within the Oracle server itself. This compatibility
+version is retrieved using the query ``SELECT value FROM v$parameter WHERE
+name = 'compatible';``. The SQLAlchemy Oracle dialect, when tasked with
+determining the default max identifier length, will attempt to use this query
+upon first connect in order to determine the effective compatibility version of
+the server, which determines what the maximum allowed identifier length is for
+the server. If the table is not available, the server version information is
+used instead.
+
+As of SQLAlchemy 1.4, the default max identifier length for the Oracle dialect
+is 128 characters. Upon first connect, the compatibility version is detected
+and if it is less than Oracle version 12.2, the max identifier length is
+changed to be 30 characters. In all cases, setting the
+:paramref:`_sa.create_engine.max_identifier_length` parameter will bypass this
+change and the value given will be used as is::
+
+ engine = create_engine(
+ "oracle+cx_oracle://scott:tiger@oracle122",
+ max_identifier_length=30)
+
+The maximum identifier length comes into play both when generating anonymized
+SQL labels in SELECT statements, but more crucially when generating constraint
+names from a naming convention. It is this area that has created the need for
+SQLAlchemy to change this default conservatively. For example, the following
+naming convention produces two very different constraint names based on the
+identifier length::
+
+ from sqlalchemy import Column
+ from sqlalchemy import Index
+ from sqlalchemy import Integer
+ from sqlalchemy import MetaData
+ from sqlalchemy import Table
+ from sqlalchemy.dialects import oracle
+ from sqlalchemy.schema import CreateIndex
+
+ m = MetaData(naming_convention={"ix": "ix_%(column_0N_name)s"})
+
+ t = Table(
+ "t",
+ m,
+ Column("some_column_name_1", Integer),
+ Column("some_column_name_2", Integer),
+ Column("some_column_name_3", Integer),
+ )
+
+ ix = Index(
+ None,
+ t.c.some_column_name_1,
+ t.c.some_column_name_2,
+ t.c.some_column_name_3,
+ )
+
+ oracle_dialect = oracle.dialect(max_identifier_length=30)
+ print(CreateIndex(ix).compile(dialect=oracle_dialect))
+
+With an identifier length of 30, the above CREATE INDEX looks like::
+
+ CREATE INDEX ix_some_column_name_1s_70cd ON t
+ (some_column_name_1, some_column_name_2, some_column_name_3)
+
+However with length=128, it becomes::
+
+ CREATE INDEX ix_some_column_name_1some_column_name_2some_column_name_3 ON t
+ (some_column_name_1, some_column_name_2, some_column_name_3)
+
+Applications which have run versions of SQLAlchemy prior to 1.4 on an Oracle
+server version 12.2 or greater are therefore subject to the scenario of a
+database migration that wishes to "DROP CONSTRAINT" on a name that was
+previously generated with the shorter length. This migration will fail when
+the identifier length is changed without the name of the index or constraint
+first being adjusted. Such applications are strongly advised to make use of
+:paramref:`_sa.create_engine.max_identifier_length`
+in order to maintain control
+of the generation of truncated names, and to fully review and test all database
+migrations in a staging environment when changing this value to ensure that the
+impact of this change has been mitigated.
+
+.. versionchanged:: 1.4 the default max_identifier_length for Oracle is 128
+ characters, which is adjusted down to 30 upon first connect if an older
+ version of Oracle server (compatibility version < 12.2) is detected.
+
+
+LIMIT/OFFSET/FETCH Support
+--------------------------
+
+Methods like :meth:`_sql.Select.limit` and :meth:`_sql.Select.offset` currently
+use an emulated approach for LIMIT / OFFSET based on window functions, which
+involves creation of a subquery using ``ROW_NUMBER`` that is prone to
+performance issues as well as SQL construction issues for complex statements.
+However, this approach is supported by all Oracle versions. See notes below.
+
+When using Oracle 12c and above, use the :meth:`_sql.Select.fetch` method
+instead; this will render the more modern
+``FETCH FIRST N ROW / OFFSET N ROWS`` syntax.
+
+Notes on LIMIT / OFFSET emulation (when fetch() method cannot be used)
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+If using :meth:`_sql.Select.limit` and :meth:`_sql.Select.offset`,
+or with the ORM the :meth:`_orm.Query.limit` and :meth:`_orm.Query.offset` methods,
+and the :meth:`_sql.Select.fetch` method **cannot** be used instead, the following
+notes apply:
+
+* SQLAlchemy currently makes use of ROWNUM to achieve
+ LIMIT/OFFSET; the exact methodology is taken from
+ https://blogs.oracle.com/oraclemagazine/on-rownum-and-limiting-results .
+
+* the "FIRST_ROWS()" optimization keyword is not used by default. To enable
+ the usage of this optimization directive, specify ``optimize_limits=True``
+ to :func:`_sa.create_engine`.
+
+ .. versionchanged:: 1.4
+ The Oracle dialect renders limit/offset integer values using a "post
+ compile" scheme which renders the integer directly before passing the
+ statement to the cursor for execution. The ``use_binds_for_limits`` flag
+ no longer has an effect.
+
+ .. seealso::
+
+ :ref:`change_4808`.
+
+* A future release may use ``FETCH FIRST N ROW / OFFSET N ROWS`` automatically
+ when :meth:`_sql.Select.limit`, :meth:`_sql.Select.offset`, :meth:`_orm.Query.limit`,
+ :meth:`_orm.Query.offset` are used.
+
+.. _oracle_returning:
+
+RETURNING Support
+-----------------
+
+The Oracle database supports a limited form of RETURNING, in order to retrieve
+result sets of matched rows from INSERT, UPDATE and DELETE statements.
+Oracle's RETURNING..INTO syntax only supports one row being returned, as it
+relies upon OUT parameters in order to function. In addition, supported
+DBAPIs have further limitations (see :ref:`cx_oracle_returning`).
+
+SQLAlchemy's "implicit returning" feature, which employs RETURNING within an
+INSERT and sometimes an UPDATE statement in order to fetch newly generated
+primary key values and other SQL defaults and expressions, is normally enabled
+on the Oracle backend. By default, "implicit returning" typically only
+fetches the value of a single ``nextval(some_seq)`` expression embedded into
+an INSERT in order to increment a sequence within an INSERT statement and get
+the value back at the same time. To disable this feature across the board,
+specify ``implicit_returning=False`` to :func:`_sa.create_engine`::
+
+ engine = create_engine("oracle://scott:tiger@dsn",
+ implicit_returning=False)
+
+Implicit returning can also be disabled on a table-by-table basis as a table
+option::
+
+ # Core Table
+ my_table = Table("my_table", metadata, ..., implicit_returning=False)
+
+
+ # declarative
+ class MyClass(Base):
+ __tablename__ = 'my_table'
+ __table_args__ = {"implicit_returning": False}
+
+.. seealso::
+
+ :ref:`cx_oracle_returning` - additional cx_oracle-specific restrictions on
+ implicit returning.
+
+ON UPDATE CASCADE
+-----------------
+
+Oracle doesn't have native ON UPDATE CASCADE functionality. A trigger based
+solution is available at
+https://asktom.oracle.com/tkyte/update_cascade/index.html .
+
+When using the SQLAlchemy ORM, the ORM has limited ability to manually issue
+cascading updates - specify ForeignKey objects using the
+"deferrable=True, initially='deferred'" keyword arguments,
+and specify "passive_updates=False" on each relationship().
+
+Oracle 8 Compatibility
+----------------------
+
+When Oracle 8 is detected, the dialect internally configures itself to the
+following behaviors:
+
+* the use_ansi flag is set to False. This has the effect of converting all
+ JOIN phrases into the WHERE clause, and in the case of LEFT OUTER JOIN
+ makes use of Oracle's (+) operator.
+
+* the NVARCHAR2 and NCLOB datatypes are no longer generated as DDL when
+ the :class:`~sqlalchemy.types.Unicode` is used - VARCHAR2 and CLOB are
+ issued instead. This because these types don't seem to work correctly on
+ Oracle 8 even though they are available. The
+ :class:`~sqlalchemy.types.NVARCHAR` and
+ :class:`~sqlalchemy.dialects.oracle.NCLOB` types will always generate
+ NVARCHAR2 and NCLOB.
+
+* the "native unicode" mode is disabled when using cx_oracle, i.e. SQLAlchemy
+ encodes all Python unicode objects to "string" before passing in as bind
+ parameters.
+
+Synonym/DBLINK Reflection
+-------------------------
+
+When using reflection with Table objects, the dialect can optionally search
+for tables indicated by synonyms, either in local or remote schemas or
+accessed over DBLINK, by passing the flag ``oracle_resolve_synonyms=True`` as
+a keyword argument to the :class:`_schema.Table` construct::
+
+ some_table = Table('some_table', autoload_with=some_engine,
+ oracle_resolve_synonyms=True)
+
+When this flag is set, the given name (such as ``some_table`` above) will
+be searched not just in the ``ALL_TABLES`` view, but also within the
+``ALL_SYNONYMS`` view to see if this name is actually a synonym to another
+name. If the synonym is located and refers to a DBLINK, the oracle dialect
+knows how to locate the table's information using DBLINK syntax(e.g.
+``@dblink``).
+
+``oracle_resolve_synonyms`` is accepted wherever reflection arguments are
+accepted, including methods such as :meth:`_schema.MetaData.reflect` and
+:meth:`_reflection.Inspector.get_columns`.
+
+If synonyms are not in use, this flag should be left disabled.
+
+.. _oracle_constraint_reflection:
+
+Constraint Reflection
+---------------------
+
+The Oracle dialect can return information about foreign key, unique, and
+CHECK constraints, as well as indexes on tables.
+
+Raw information regarding these constraints can be acquired using
+:meth:`_reflection.Inspector.get_foreign_keys`,
+:meth:`_reflection.Inspector.get_unique_constraints`,
+:meth:`_reflection.Inspector.get_check_constraints`, and
+:meth:`_reflection.Inspector.get_indexes`.
+
+.. versionchanged:: 1.2 The Oracle dialect can now reflect UNIQUE and
+ CHECK constraints.
+
+When using reflection at the :class:`_schema.Table` level, the
+:class:`_schema.Table`
+will also include these constraints.
+
+Note the following caveats:
+
+* When using the :meth:`_reflection.Inspector.get_check_constraints` method,
+ Oracle
+ builds a special "IS NOT NULL" constraint for columns that specify
+ "NOT NULL". This constraint is **not** returned by default; to include
+ the "IS NOT NULL" constraints, pass the flag ``include_all=True``::
+
+ from sqlalchemy import create_engine, inspect
+
+ engine = create_engine("oracle+cx_oracle://s:t@dsn")
+ inspector = inspect(engine)
+ all_check_constraints = inspector.get_check_constraints(
+ "some_table", include_all=True)
+
+* in most cases, when reflecting a :class:`_schema.Table`,
+ a UNIQUE constraint will
+ **not** be available as a :class:`.UniqueConstraint` object, as Oracle
+ mirrors unique constraints with a UNIQUE index in most cases (the exception
+ seems to be when two or more unique constraints represent the same columns);
+ the :class:`_schema.Table` will instead represent these using
+ :class:`.Index`
+ with the ``unique=True`` flag set.
+
+* Oracle creates an implicit index for the primary key of a table; this index
+ is **excluded** from all index results.
+
+* the list of columns reflected for an index will not include column names
+ that start with SYS_NC.
+
+Table names with SYSTEM/SYSAUX tablespaces
+-------------------------------------------
+
+The :meth:`_reflection.Inspector.get_table_names` and
+:meth:`_reflection.Inspector.get_temp_table_names`
+methods each return a list of table names for the current engine. These methods
+are also part of the reflection which occurs within an operation such as
+:meth:`_schema.MetaData.reflect`. By default,
+these operations exclude the ``SYSTEM``
+and ``SYSAUX`` tablespaces from the operation. In order to change this, the
+default list of tablespaces excluded can be changed at the engine level using
+the ``exclude_tablespaces`` parameter::
+
+ # exclude SYSAUX and SOME_TABLESPACE, but not SYSTEM
+ e = create_engine(
+ "oracle://scott:tiger@xe",
+ exclude_tablespaces=["SYSAUX", "SOME_TABLESPACE"])
+
+.. versionadded:: 1.1
+
+DateTime Compatibility
+----------------------
+
+Oracle has no datatype known as ``DATETIME``, it instead has only ``DATE``,
+which can actually store a date and time value. For this reason, the Oracle
+dialect provides a type :class:`_oracle.DATE` which is a subclass of
+:class:`.DateTime`. This type has no special behavior, and is only
+present as a "marker" for this type; additionally, when a database column
+is reflected and the type is reported as ``DATE``, the time-supporting
+:class:`_oracle.DATE` type is used.
+
+.. versionchanged:: 0.9.4 Added :class:`_oracle.DATE` to subclass
+ :class:`.DateTime`. This is a change as previous versions
+ would reflect a ``DATE`` column as :class:`_types.DATE`, which subclasses
+ :class:`.Date`. The only significance here is for schemes that are
+ examining the type of column for use in special Python translations or
+ for migrating schemas to other database backends.
+
+.. _oracle_table_options:
+
+Oracle Table Options
+-------------------------
+
+The CREATE TABLE phrase supports the following options with Oracle
+in conjunction with the :class:`_schema.Table` construct:
+
+
+* ``ON COMMIT``::
+
+ Table(
+ "some_table", metadata, ...,
+ prefixes=['GLOBAL TEMPORARY'], oracle_on_commit='PRESERVE ROWS')
+
+.. versionadded:: 1.0.0
+
+* ``COMPRESS``::
+
+ Table('mytable', metadata, Column('data', String(32)),
+ oracle_compress=True)
+
+ Table('mytable', metadata, Column('data', String(32)),
+ oracle_compress=6)
+
+ The ``oracle_compress`` parameter accepts either an integer compression
+ level, or ``True`` to use the default compression level.
+
+.. versionadded:: 1.0.0
+
+.. _oracle_index_options:
+
+Oracle Specific Index Options
+-----------------------------
+
+Bitmap Indexes
+~~~~~~~~~~~~~~
+
+You can specify the ``oracle_bitmap`` parameter to create a bitmap index
+instead of a B-tree index::
+
+ Index('my_index', my_table.c.data, oracle_bitmap=True)
+
+Bitmap indexes cannot be unique and cannot be compressed. SQLAlchemy will not
+check for such limitations, only the database will.
+
+.. versionadded:: 1.0.0
+
+Index compression
+~~~~~~~~~~~~~~~~~
+
+Oracle has a more efficient storage mode for indexes containing lots of
+repeated values. Use the ``oracle_compress`` parameter to turn on key
+compression::
+
+ Index('my_index', my_table.c.data, oracle_compress=True)
+
+ Index('my_index', my_table.c.data1, my_table.c.data2, unique=True,
+ oracle_compress=1)
+
+The ``oracle_compress`` parameter accepts either an integer specifying the
+number of prefix columns to compress, or ``True`` to use the default (all
+columns for non-unique indexes, all but the last column for unique indexes).
+
+.. versionadded:: 1.0.0
+
+""" # noqa
+
+from itertools import groupby
+import re
+
+from ... import Computed
+from ... import exc
+from ... import schema as sa_schema
+from ... import sql
+from ... import util
+from ...engine import default
+from ...engine import reflection
+from ...sql import compiler
+from ...sql import expression
+from ...sql import sqltypes
+from ...sql import util as sql_util
+from ...sql import visitors
+from ...types import BLOB
+from ...types import CHAR
+from ...types import CLOB
+from ...types import FLOAT
+from ...types import INTEGER
+from ...types import NCHAR
+from ...types import NVARCHAR
+from ...types import TIMESTAMP
+from ...types import VARCHAR
+from ...util import compat
+
+RESERVED_WORDS = set(
+ "SHARE RAW DROP BETWEEN FROM DESC OPTION PRIOR LONG THEN "
+ "DEFAULT ALTER IS INTO MINUS INTEGER NUMBER GRANT IDENTIFIED "
+ "ALL TO ORDER ON FLOAT DATE HAVING CLUSTER NOWAIT RESOURCE "
+ "ANY TABLE INDEX FOR UPDATE WHERE CHECK SMALLINT WITH DELETE "
+ "BY ASC REVOKE LIKE SIZE RENAME NOCOMPRESS NULL GROUP VALUES "
+ "AS IN VIEW EXCLUSIVE COMPRESS SYNONYM SELECT INSERT EXISTS "
+ "NOT TRIGGER ELSE CREATE INTERSECT PCTFREE DISTINCT USER "
+ "CONNECT SET MODE OF UNIQUE VARCHAR2 VARCHAR LOCK OR CHAR "
+ "DECIMAL UNION PUBLIC AND START UID COMMENT CURRENT LEVEL".split()
+)
+
+NO_ARG_FNS = set(
+ "UID CURRENT_DATE SYSDATE USER " "CURRENT_TIME CURRENT_TIMESTAMP".split()
+)
+
+
+class RAW(sqltypes._Binary):
+ __visit_name__ = "RAW"
+
+
+OracleRaw = RAW
+
+
+class NCLOB(sqltypes.Text):
+ __visit_name__ = "NCLOB"
+
+
+class VARCHAR2(VARCHAR):
+ __visit_name__ = "VARCHAR2"
+
+
+NVARCHAR2 = NVARCHAR
+
+
+class NUMBER(sqltypes.Numeric, sqltypes.Integer):
+ __visit_name__ = "NUMBER"
+
+ def __init__(self, precision=None, scale=None, asdecimal=None):
+ if asdecimal is None:
+ asdecimal = bool(scale and scale > 0)
+
+ super(NUMBER, self).__init__(
+ precision=precision, scale=scale, asdecimal=asdecimal
+ )
+
+ def adapt(self, impltype):
+ ret = super(NUMBER, self).adapt(impltype)
+ # leave a hint for the DBAPI handler
+ ret._is_oracle_number = True
+ return ret
+
+ @property
+ def _type_affinity(self):
+ if bool(self.scale and self.scale > 0):
+ return sqltypes.Numeric
+ else:
+ return sqltypes.Integer
+
+
+class DOUBLE_PRECISION(sqltypes.Float):
+ __visit_name__ = "DOUBLE_PRECISION"
+
+
+class BINARY_DOUBLE(sqltypes.Float):
+ __visit_name__ = "BINARY_DOUBLE"
+
+
+class BINARY_FLOAT(sqltypes.Float):
+ __visit_name__ = "BINARY_FLOAT"
+
+
+class BFILE(sqltypes.LargeBinary):
+ __visit_name__ = "BFILE"
+
+
+class LONG(sqltypes.Text):
+ __visit_name__ = "LONG"
+
+
+class DATE(sqltypes.DateTime):
+ """Provide the oracle DATE type.
+
+ This type has no special Python behavior, except that it subclasses
+ :class:`_types.DateTime`; this is to suit the fact that the Oracle
+ ``DATE`` type supports a time value.
+
+ .. versionadded:: 0.9.4
+
+ """
+
+ __visit_name__ = "DATE"
+
+ def _compare_type_affinity(self, other):
+ return other._type_affinity in (sqltypes.DateTime, sqltypes.Date)
+
+
+class INTERVAL(sqltypes.NativeForEmulated, sqltypes._AbstractInterval):
+ __visit_name__ = "INTERVAL"
+
+ def __init__(self, day_precision=None, second_precision=None):
+ """Construct an INTERVAL.
+
+ Note that only DAY TO SECOND intervals are currently supported.
+ This is due to a lack of support for YEAR TO MONTH intervals
+ within available DBAPIs.
+
+ :param day_precision: the day precision value. this is the number of
+ digits to store for the day field. Defaults to "2"
+ :param second_precision: the second precision value. this is the
+ number of digits to store for the fractional seconds field.
+ Defaults to "6".
+
+ """
+ self.day_precision = day_precision
+ self.second_precision = second_precision
+
+ @classmethod
+ def _adapt_from_generic_interval(cls, interval):
+ return INTERVAL(
+ day_precision=interval.day_precision,
+ second_precision=interval.second_precision,
+ )
+
+ @property
+ def _type_affinity(self):
+ return sqltypes.Interval
+
+ def as_generic(self, allow_nulltype=False):
+ return sqltypes.Interval(
+ native=True,
+ second_precision=self.second_precision,
+ day_precision=self.day_precision,
+ )
+
+ def coerce_compared_value(self, op, value):
+ return self
+
+
+class ROWID(sqltypes.TypeEngine):
+ """Oracle ROWID type.
+
+ When used in a cast() or similar, generates ROWID.
+
+ """
+
+ __visit_name__ = "ROWID"
+
+
+class _OracleBoolean(sqltypes.Boolean):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.NUMBER
+
+
+colspecs = {
+ sqltypes.Boolean: _OracleBoolean,
+ sqltypes.Interval: INTERVAL,
+ sqltypes.DateTime: DATE,
+}
+
+ischema_names = {
+ "VARCHAR2": VARCHAR,
+ "NVARCHAR2": NVARCHAR,
+ "CHAR": CHAR,
+ "NCHAR": NCHAR,
+ "DATE": DATE,
+ "NUMBER": NUMBER,
+ "BLOB": BLOB,
+ "BFILE": BFILE,
+ "CLOB": CLOB,
+ "NCLOB": NCLOB,
+ "TIMESTAMP": TIMESTAMP,
+ "TIMESTAMP WITH TIME ZONE": TIMESTAMP,
+ "INTERVAL DAY TO SECOND": INTERVAL,
+ "RAW": RAW,
+ "FLOAT": FLOAT,
+ "DOUBLE PRECISION": DOUBLE_PRECISION,
+ "LONG": LONG,
+ "BINARY_DOUBLE": BINARY_DOUBLE,
+ "BINARY_FLOAT": BINARY_FLOAT,
+}
+
+
+class OracleTypeCompiler(compiler.GenericTypeCompiler):
+ # Note:
+ # Oracle DATE == DATETIME
+ # Oracle does not allow milliseconds in DATE
+ # Oracle does not support TIME columns
+
+ def visit_datetime(self, type_, **kw):
+ return self.visit_DATE(type_, **kw)
+
+ def visit_float(self, type_, **kw):
+ return self.visit_FLOAT(type_, **kw)
+
+ def visit_unicode(self, type_, **kw):
+ if self.dialect._use_nchar_for_unicode:
+ return self.visit_NVARCHAR2(type_, **kw)
+ else:
+ return self.visit_VARCHAR2(type_, **kw)
+
+ def visit_INTERVAL(self, type_, **kw):
+ return "INTERVAL DAY%s TO SECOND%s" % (
+ type_.day_precision is not None
+ and "(%d)" % type_.day_precision
+ or "",
+ type_.second_precision is not None
+ and "(%d)" % type_.second_precision
+ or "",
+ )
+
+ def visit_LONG(self, type_, **kw):
+ return "LONG"
+
+ def visit_TIMESTAMP(self, type_, **kw):
+ if type_.timezone:
+ return "TIMESTAMP WITH TIME ZONE"
+ else:
+ return "TIMESTAMP"
+
+ def visit_DOUBLE_PRECISION(self, type_, **kw):
+ return self._generate_numeric(type_, "DOUBLE PRECISION", **kw)
+
+ def visit_BINARY_DOUBLE(self, type_, **kw):
+ return self._generate_numeric(type_, "BINARY_DOUBLE", **kw)
+
+ def visit_BINARY_FLOAT(self, type_, **kw):
+ return self._generate_numeric(type_, "BINARY_FLOAT", **kw)
+
+ def visit_FLOAT(self, type_, **kw):
+ # don't support conversion between decimal/binary
+ # precision yet
+ kw["no_precision"] = True
+ return self._generate_numeric(type_, "FLOAT", **kw)
+
+ def visit_NUMBER(self, type_, **kw):
+ return self._generate_numeric(type_, "NUMBER", **kw)
+
+ def _generate_numeric(
+ self, type_, name, precision=None, scale=None, no_precision=False, **kw
+ ):
+ if precision is None:
+ precision = type_.precision
+
+ if scale is None:
+ scale = getattr(type_, "scale", None)
+
+ if no_precision or precision is None:
+ return name
+ elif scale is None:
+ n = "%(name)s(%(precision)s)"
+ return n % {"name": name, "precision": precision}
+ else:
+ n = "%(name)s(%(precision)s, %(scale)s)"
+ return n % {"name": name, "precision": precision, "scale": scale}
+
+ def visit_string(self, type_, **kw):
+ return self.visit_VARCHAR2(type_, **kw)
+
+ def visit_VARCHAR2(self, type_, **kw):
+ return self._visit_varchar(type_, "", "2")
+
+ def visit_NVARCHAR2(self, type_, **kw):
+ return self._visit_varchar(type_, "N", "2")
+
+ visit_NVARCHAR = visit_NVARCHAR2
+
+ def visit_VARCHAR(self, type_, **kw):
+ return self._visit_varchar(type_, "", "")
+
+ def _visit_varchar(self, type_, n, num):
+ if not type_.length:
+ return "%(n)sVARCHAR%(two)s" % {"two": num, "n": n}
+ elif not n and self.dialect._supports_char_length:
+ varchar = "VARCHAR%(two)s(%(length)s CHAR)"
+ return varchar % {"length": type_.length, "two": num}
+ else:
+ varchar = "%(n)sVARCHAR%(two)s(%(length)s)"
+ return varchar % {"length": type_.length, "two": num, "n": n}
+
+ def visit_text(self, type_, **kw):
+ return self.visit_CLOB(type_, **kw)
+
+ def visit_unicode_text(self, type_, **kw):
+ if self.dialect._use_nchar_for_unicode:
+ return self.visit_NCLOB(type_, **kw)
+ else:
+ return self.visit_CLOB(type_, **kw)
+
+ def visit_large_binary(self, type_, **kw):
+ return self.visit_BLOB(type_, **kw)
+
+ def visit_big_integer(self, type_, **kw):
+ return self.visit_NUMBER(type_, precision=19, **kw)
+
+ def visit_boolean(self, type_, **kw):
+ return self.visit_SMALLINT(type_, **kw)
+
+ def visit_RAW(self, type_, **kw):
+ if type_.length:
+ return "RAW(%(length)s)" % {"length": type_.length}
+ else:
+ return "RAW"
+
+ def visit_ROWID(self, type_, **kw):
+ return "ROWID"
+
+
+class OracleCompiler(compiler.SQLCompiler):
+ """Oracle compiler modifies the lexical structure of Select
+ statements to work under non-ANSI configured Oracle databases, if
+ the use_ansi flag is False.
+ """
+
+ compound_keywords = util.update_copy(
+ compiler.SQLCompiler.compound_keywords,
+ {expression.CompoundSelect.EXCEPT: "MINUS"},
+ )
+
+ def __init__(self, *args, **kwargs):
+ self.__wheres = {}
+ super(OracleCompiler, self).__init__(*args, **kwargs)
+
+ def visit_mod_binary(self, binary, operator, **kw):
+ return "mod(%s, %s)" % (
+ self.process(binary.left, **kw),
+ self.process(binary.right, **kw),
+ )
+
+ def visit_now_func(self, fn, **kw):
+ return "CURRENT_TIMESTAMP"
+
+ def visit_char_length_func(self, fn, **kw):
+ return "LENGTH" + self.function_argspec(fn, **kw)
+
+ def visit_match_op_binary(self, binary, operator, **kw):
+ return "CONTAINS (%s, %s)" % (
+ self.process(binary.left),
+ self.process(binary.right),
+ )
+
+ def visit_true(self, expr, **kw):
+ return "1"
+
+ def visit_false(self, expr, **kw):
+ return "0"
+
+ def get_cte_preamble(self, recursive):
+ return "WITH"
+
+ def get_select_hint_text(self, byfroms):
+ return " ".join("/*+ %s */" % text for table, text in byfroms.items())
+
+ def function_argspec(self, fn, **kw):
+ if len(fn.clauses) > 0 or fn.name.upper() not in NO_ARG_FNS:
+ return compiler.SQLCompiler.function_argspec(self, fn, **kw)
+ else:
+ return ""
+
+ def visit_function(self, func, **kw):
+ text = super(OracleCompiler, self).visit_function(func, **kw)
+ if kw.get("asfrom", False):
+ text = "TABLE (%s)" % func
+ return text
+
+ def visit_table_valued_column(self, element, **kw):
+ text = super(OracleCompiler, self).visit_table_valued_column(
+ element, **kw
+ )
+ text = "COLUMN_VALUE " + text
+ return text
+
+ def default_from(self):
+ """Called when a ``SELECT`` statement has no froms,
+ and no ``FROM`` clause is to be appended.
+
+ The Oracle compiler tacks a "FROM DUAL" to the statement.
+ """
+
+ return " FROM DUAL"
+
+ def visit_join(self, join, from_linter=None, **kwargs):
+ if self.dialect.use_ansi:
+ return compiler.SQLCompiler.visit_join(
+ self, join, from_linter=from_linter, **kwargs
+ )
+ else:
+ if from_linter:
+ from_linter.edges.add((join.left, join.right))
+
+ kwargs["asfrom"] = True
+ if isinstance(join.right, expression.FromGrouping):
+ right = join.right.element
+ else:
+ right = join.right
+ return (
+ self.process(join.left, from_linter=from_linter, **kwargs)
+ + ", "
+ + self.process(right, from_linter=from_linter, **kwargs)
+ )
+
+ def _get_nonansi_join_whereclause(self, froms):
+ clauses = []
+
+ def visit_join(join):
+ if join.isouter:
+ # https://docs.oracle.com/database/121/SQLRF/queries006.htm#SQLRF52354
+ # "apply the outer join operator (+) to all columns of B in
+ # the join condition in the WHERE clause" - that is,
+ # unconditionally regardless of operator or the other side
+ def visit_binary(binary):
+ if isinstance(
+ binary.left, expression.ColumnClause
+ ) and join.right.is_derived_from(binary.left.table):
+ binary.left = _OuterJoinColumn(binary.left)
+ elif isinstance(
+ binary.right, expression.ColumnClause
+ ) and join.right.is_derived_from(binary.right.table):
+ binary.right = _OuterJoinColumn(binary.right)
+
+ clauses.append(
+ visitors.cloned_traverse(
+ join.onclause, {}, {"binary": visit_binary}
+ )
+ )
+ else:
+ clauses.append(join.onclause)
+
+ for j in join.left, join.right:
+ if isinstance(j, expression.Join):
+ visit_join(j)
+ elif isinstance(j, expression.FromGrouping):
+ visit_join(j.element)
+
+ for f in froms:
+ if isinstance(f, expression.Join):
+ visit_join(f)
+
+ if not clauses:
+ return None
+ else:
+ return sql.and_(*clauses)
+
+ def visit_outer_join_column(self, vc, **kw):
+ return self.process(vc.column, **kw) + "(+)"
+
+ def visit_sequence(self, seq, **kw):
+ return self.preparer.format_sequence(seq) + ".nextval"
+
+ def get_render_as_alias_suffix(self, alias_name_text):
+ """Oracle doesn't like ``FROM table AS alias``"""
+
+ return " " + alias_name_text
+
+ def returning_clause(self, stmt, returning_cols):
+ columns = []
+ binds = []
+
+ for i, column in enumerate(
+ expression._select_iterables(returning_cols)
+ ):
+ if (
+ self.isupdate
+ and isinstance(column, sa_schema.Column)
+ and isinstance(column.server_default, Computed)
+ and not self.dialect._supports_update_returning_computed_cols
+ ):
+ util.warn(
+ "Computed columns don't work with Oracle UPDATE "
+ "statements that use RETURNING; the value of the column "
+ "*before* the UPDATE takes place is returned. It is "
+ "advised to not use RETURNING with an Oracle computed "
+ "column. Consider setting implicit_returning to False on "
+ "the Table object in order to avoid implicit RETURNING "
+ "clauses from being generated for this Table."
+ )
+ if column.type._has_column_expression:
+ col_expr = column.type.column_expression(column)
+ else:
+ col_expr = column
+
+ outparam = sql.outparam("ret_%d" % i, type_=column.type)
+ self.binds[outparam.key] = outparam
+ binds.append(
+ self.bindparam_string(self._truncate_bindparam(outparam))
+ )
+
+ # ensure the ExecutionContext.get_out_parameters() method is
+ # *not* called; the cx_Oracle dialect wants to handle these
+ # parameters separately
+ self.has_out_parameters = False
+
+ columns.append(self.process(col_expr, within_columns_clause=False))
+
+ self._add_to_result_map(
+ getattr(col_expr, "name", col_expr._anon_name_label),
+ getattr(col_expr, "name", col_expr._anon_name_label),
+ (
+ column,
+ getattr(column, "name", None),
+ getattr(column, "key", None),
+ ),
+ column.type,
+ )
+
+ return "RETURNING " + ", ".join(columns) + " INTO " + ", ".join(binds)
+
+ def translate_select_structure(self, select_stmt, **kwargs):
+ select = select_stmt
+
+ if not getattr(select, "_oracle_visit", None):
+ if not self.dialect.use_ansi:
+ froms = self._display_froms_for_select(
+ select, kwargs.get("asfrom", False)
+ )
+ whereclause = self._get_nonansi_join_whereclause(froms)
+ if whereclause is not None:
+ select = select.where(whereclause)
+ select._oracle_visit = True
+
+ # if fetch is used this is not needed
+ if (
+ select._has_row_limiting_clause
+ and select._fetch_clause is None
+ ):
+ limit_clause = select._limit_clause
+ offset_clause = select._offset_clause
+
+ if select._simple_int_clause(limit_clause):
+ limit_clause = limit_clause.render_literal_execute()
+
+ if select._simple_int_clause(offset_clause):
+ offset_clause = offset_clause.render_literal_execute()
+
+ # currently using form at:
+ # https://blogs.oracle.com/oraclemagazine/\
+ # on-rownum-and-limiting-results
+
+ orig_select = select
+ select = select._generate()
+ select._oracle_visit = True
+
+ # add expressions to accommodate FOR UPDATE OF
+ for_update = select._for_update_arg
+ if for_update is not None and for_update.of:
+ for_update = for_update._clone()
+ for_update._copy_internals()
+
+ for elem in for_update.of:
+ if not select.selected_columns.contains_column(elem):
+ select = select.add_columns(elem)
+
+ # Wrap the middle select and add the hint
+ inner_subquery = select.alias()
+ limitselect = sql.select(
+ *[
+ c
+ for c in inner_subquery.c
+ if orig_select.selected_columns.corresponding_column(c)
+ is not None
+ ]
+ )
+
+ if (
+ limit_clause is not None
+ and self.dialect.optimize_limits
+ and select._simple_int_clause(limit_clause)
+ ):
+ limitselect = limitselect.prefix_with(
+ expression.text(
+ "/*+ FIRST_ROWS(%s) */"
+ % self.process(limit_clause, **kwargs)
+ )
+ )
+
+ limitselect._oracle_visit = True
+ limitselect._is_wrapper = True
+
+ # add expressions to accommodate FOR UPDATE OF
+ if for_update is not None and for_update.of:
+
+ adapter = sql_util.ClauseAdapter(inner_subquery)
+ for_update.of = [
+ adapter.traverse(elem) for elem in for_update.of
+ ]
+
+ # If needed, add the limiting clause
+ if limit_clause is not None:
+ if select._simple_int_clause(limit_clause) and (
+ offset_clause is None
+ or select._simple_int_clause(offset_clause)
+ ):
+ max_row = limit_clause
+
+ if offset_clause is not None:
+ max_row = max_row + offset_clause
+
+ else:
+ max_row = limit_clause
+
+ if offset_clause is not None:
+ max_row = max_row + offset_clause
+ limitselect = limitselect.where(
+ sql.literal_column("ROWNUM") <= max_row
+ )
+
+ # If needed, add the ora_rn, and wrap again with offset.
+ if offset_clause is None:
+ limitselect._for_update_arg = for_update
+ select = limitselect
+ else:
+ limitselect = limitselect.add_columns(
+ sql.literal_column("ROWNUM").label("ora_rn")
+ )
+ limitselect._oracle_visit = True
+ limitselect._is_wrapper = True
+
+ if for_update is not None and for_update.of:
+ limitselect_cols = limitselect.selected_columns
+ for elem in for_update.of:
+ if (
+ limitselect_cols.corresponding_column(elem)
+ is None
+ ):
+ limitselect = limitselect.add_columns(elem)
+
+ limit_subquery = limitselect.alias()
+ origselect_cols = orig_select.selected_columns
+ offsetselect = sql.select(
+ *[
+ c
+ for c in limit_subquery.c
+ if origselect_cols.corresponding_column(c)
+ is not None
+ ]
+ )
+
+ offsetselect._oracle_visit = True
+ offsetselect._is_wrapper = True
+
+ if for_update is not None and for_update.of:
+ adapter = sql_util.ClauseAdapter(limit_subquery)
+ for_update.of = [
+ adapter.traverse(elem) for elem in for_update.of
+ ]
+
+ offsetselect = offsetselect.where(
+ sql.literal_column("ora_rn") > offset_clause
+ )
+
+ offsetselect._for_update_arg = for_update
+ select = offsetselect
+
+ return select
+
+ def limit_clause(self, select, **kw):
+ return ""
+
+ def visit_empty_set_expr(self, type_):
+ return "SELECT 1 FROM DUAL WHERE 1!=1"
+
+ def for_update_clause(self, select, **kw):
+ if self.is_subquery():
+ return ""
+
+ tmp = " FOR UPDATE"
+
+ if select._for_update_arg.of:
+ tmp += " OF " + ", ".join(
+ self.process(elem, **kw) for elem in select._for_update_arg.of
+ )
+
+ if select._for_update_arg.nowait:
+ tmp += " NOWAIT"
+ if select._for_update_arg.skip_locked:
+ tmp += " SKIP LOCKED"
+
+ return tmp
+
+ def visit_is_distinct_from_binary(self, binary, operator, **kw):
+ return "DECODE(%s, %s, 0, 1) = 1" % (
+ self.process(binary.left),
+ self.process(binary.right),
+ )
+
+ def visit_is_not_distinct_from_binary(self, binary, operator, **kw):
+ return "DECODE(%s, %s, 0, 1) = 0" % (
+ self.process(binary.left),
+ self.process(binary.right),
+ )
+
+ def _get_regexp_args(self, binary, kw):
+ string = self.process(binary.left, **kw)
+ pattern = self.process(binary.right, **kw)
+ flags = binary.modifiers["flags"]
+ if flags is not None:
+ flags = self.process(flags, **kw)
+ return string, pattern, flags
+
+ def visit_regexp_match_op_binary(self, binary, operator, **kw):
+ string, pattern, flags = self._get_regexp_args(binary, kw)
+ if flags is None:
+ return "REGEXP_LIKE(%s, %s)" % (string, pattern)
+ else:
+ return "REGEXP_LIKE(%s, %s, %s)" % (string, pattern, flags)
+
+ def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
+ return "NOT %s" % self.visit_regexp_match_op_binary(
+ binary, operator, **kw
+ )
+
+ def visit_regexp_replace_op_binary(self, binary, operator, **kw):
+ string, pattern, flags = self._get_regexp_args(binary, kw)
+ replacement = self.process(binary.modifiers["replacement"], **kw)
+ if flags is None:
+ return "REGEXP_REPLACE(%s, %s, %s)" % (
+ string,
+ pattern,
+ replacement,
+ )
+ else:
+ return "REGEXP_REPLACE(%s, %s, %s, %s)" % (
+ string,
+ pattern,
+ replacement,
+ flags,
+ )
+
+
+class OracleDDLCompiler(compiler.DDLCompiler):
+ def define_constraint_cascades(self, constraint):
+ text = ""
+ if constraint.ondelete is not None:
+ text += " ON DELETE %s" % constraint.ondelete
+
+ # oracle has no ON UPDATE CASCADE -
+ # its only available via triggers
+ # https://asktom.oracle.com/tkyte/update_cascade/index.html
+ if constraint.onupdate is not None:
+ util.warn(
+ "Oracle does not contain native UPDATE CASCADE "
+ "functionality - onupdates will not be rendered for foreign "
+ "keys. Consider using deferrable=True, initially='deferred' "
+ "or triggers."
+ )
+
+ return text
+
+ def visit_drop_table_comment(self, drop):
+ return "COMMENT ON TABLE %s IS ''" % self.preparer.format_table(
+ drop.element
+ )
+
+ def visit_create_index(self, create):
+ index = create.element
+ self._verify_index_table(index)
+ preparer = self.preparer
+ text = "CREATE "
+ if index.unique:
+ text += "UNIQUE "
+ if index.dialect_options["oracle"]["bitmap"]:
+ text += "BITMAP "
+ text += "INDEX %s ON %s (%s)" % (
+ self._prepared_index_name(index, include_schema=True),
+ preparer.format_table(index.table, use_schema=True),
+ ", ".join(
+ self.sql_compiler.process(
+ expr, include_table=False, literal_binds=True
+ )
+ for expr in index.expressions
+ ),
+ )
+ if index.dialect_options["oracle"]["compress"] is not False:
+ if index.dialect_options["oracle"]["compress"] is True:
+ text += " COMPRESS"
+ else:
+ text += " COMPRESS %d" % (
+ index.dialect_options["oracle"]["compress"]
+ )
+ return text
+
+ def post_create_table(self, table):
+ table_opts = []
+ opts = table.dialect_options["oracle"]
+
+ if opts["on_commit"]:
+ on_commit_options = opts["on_commit"].replace("_", " ").upper()
+ table_opts.append("\n ON COMMIT %s" % on_commit_options)
+
+ if opts["compress"]:
+ if opts["compress"] is True:
+ table_opts.append("\n COMPRESS")
+ else:
+ table_opts.append("\n COMPRESS FOR %s" % (opts["compress"]))
+
+ return "".join(table_opts)
+
+ def get_identity_options(self, identity_options):
+ text = super(OracleDDLCompiler, self).get_identity_options(
+ identity_options
+ )
+ text = text.replace("NO MINVALUE", "NOMINVALUE")
+ text = text.replace("NO MAXVALUE", "NOMAXVALUE")
+ text = text.replace("NO CYCLE", "NOCYCLE")
+ text = text.replace("NO ORDER", "NOORDER")
+ return text
+
+ def visit_computed_column(self, generated):
+ text = "GENERATED ALWAYS AS (%s)" % self.sql_compiler.process(
+ generated.sqltext, include_table=False, literal_binds=True
+ )
+ if generated.persisted is True:
+ raise exc.CompileError(
+ "Oracle computed columns do not support 'stored' persistence; "
+ "set the 'persisted' flag to None or False for Oracle support."
+ )
+ elif generated.persisted is False:
+ text += " VIRTUAL"
+ return text
+
+ def visit_identity_column(self, identity, **kw):
+ if identity.always is None:
+ kind = ""
+ else:
+ kind = "ALWAYS" if identity.always else "BY DEFAULT"
+ text = "GENERATED %s" % kind
+ if identity.on_null:
+ text += " ON NULL"
+ text += " AS IDENTITY"
+ options = self.get_identity_options(identity)
+ if options:
+ text += " (%s)" % options
+ return text
+
+
+class OracleIdentifierPreparer(compiler.IdentifierPreparer):
+
+ reserved_words = {x.lower() for x in RESERVED_WORDS}
+ illegal_initial_characters = {str(dig) for dig in range(0, 10)}.union(
+ ["_", "$"]
+ )
+
+ def _bindparam_requires_quotes(self, value):
+ """Return True if the given identifier requires quoting."""
+ lc_value = value.lower()
+ return (
+ lc_value in self.reserved_words
+ or value[0] in self.illegal_initial_characters
+ or not self.legal_characters.match(util.text_type(value))
+ )
+
+ def format_savepoint(self, savepoint):
+ name = savepoint.ident.lstrip("_")
+ return super(OracleIdentifierPreparer, self).format_savepoint(
+ savepoint, name
+ )
+
+
+class OracleExecutionContext(default.DefaultExecutionContext):
+ def fire_sequence(self, seq, type_):
+ return self._execute_scalar(
+ "SELECT "
+ + self.identifier_preparer.format_sequence(seq)
+ + ".nextval FROM DUAL",
+ type_,
+ )
+
+
+class OracleDialect(default.DefaultDialect):
+ name = "oracle"
+ supports_statement_cache = True
+ supports_alter = True
+ supports_unicode_statements = False
+ supports_unicode_binds = False
+ max_identifier_length = 128
+
+ supports_simple_order_by_label = False
+ cte_follows_insert = True
+
+ supports_sequences = True
+ sequences_optional = False
+ postfetch_lastrowid = False
+
+ default_paramstyle = "named"
+ colspecs = colspecs
+ ischema_names = ischema_names
+ requires_name_normalize = True
+
+ supports_comments = True
+
+ supports_default_values = False
+ supports_default_metavalue = True
+ supports_empty_insert = False
+ supports_identity_columns = True
+
+ statement_compiler = OracleCompiler
+ ddl_compiler = OracleDDLCompiler
+ type_compiler = OracleTypeCompiler
+ preparer = OracleIdentifierPreparer
+ execution_ctx_cls = OracleExecutionContext
+
+ reflection_options = ("oracle_resolve_synonyms",)
+
+ _use_nchar_for_unicode = False
+
+ construct_arguments = [
+ (
+ sa_schema.Table,
+ {"resolve_synonyms": False, "on_commit": None, "compress": False},
+ ),
+ (sa_schema.Index, {"bitmap": False, "compress": False}),
+ ]
+
+ @util.deprecated_params(
+ use_binds_for_limits=(
+ "1.4",
+ "The ``use_binds_for_limits`` Oracle dialect parameter is "
+ "deprecated. The dialect now renders LIMIT /OFFSET integers "
+ "inline in all cases using a post-compilation hook, so that the "
+ "value is still represented by a 'bound parameter' on the Core "
+ "Expression side.",
+ )
+ )
+ def __init__(
+ self,
+ use_ansi=True,
+ optimize_limits=False,
+ use_binds_for_limits=None,
+ use_nchar_for_unicode=False,
+ exclude_tablespaces=("SYSTEM", "SYSAUX"),
+ **kwargs
+ ):
+ default.DefaultDialect.__init__(self, **kwargs)
+ self._use_nchar_for_unicode = use_nchar_for_unicode
+ self.use_ansi = use_ansi
+ self.optimize_limits = optimize_limits
+ self.exclude_tablespaces = exclude_tablespaces
+
+ def initialize(self, connection):
+ super(OracleDialect, self).initialize(connection)
+
+ self.implicit_returning = self.__dict__.get(
+ "implicit_returning", self.server_version_info > (10,)
+ )
+
+ if self._is_oracle_8:
+ self.colspecs = self.colspecs.copy()
+ self.colspecs.pop(sqltypes.Interval)
+ self.use_ansi = False
+
+ self.supports_identity_columns = self.server_version_info >= (12,)
+
+ def _get_effective_compat_server_version_info(self, connection):
+ # dialect does not need compat levels below 12.2, so don't query
+ # in those cases
+
+ if self.server_version_info < (12, 2):
+ return self.server_version_info
+ try:
+ compat = connection.exec_driver_sql(
+ "SELECT value FROM v$parameter WHERE name = 'compatible'"
+ ).scalar()
+ except exc.DBAPIError:
+ compat = None
+
+ if compat:
+ try:
+ return tuple(int(x) for x in compat.split("."))
+ except:
+ return self.server_version_info
+ else:
+ return self.server_version_info
+
+ @property
+ def _is_oracle_8(self):
+ return self.server_version_info and self.server_version_info < (9,)
+
+ @property
+ def _supports_table_compression(self):
+ return self.server_version_info and self.server_version_info >= (10, 1)
+
+ @property
+ def _supports_table_compress_for(self):
+ return self.server_version_info and self.server_version_info >= (11,)
+
+ @property
+ def _supports_char_length(self):
+ return not self._is_oracle_8
+
+ @property
+ def _supports_update_returning_computed_cols(self):
+ # on version 18 this error is no longet present while it happens on 11
+ # it may work also on versions before the 18
+ return self.server_version_info and self.server_version_info >= (18,)
+
+ def do_release_savepoint(self, connection, name):
+ # Oracle does not support RELEASE SAVEPOINT
+ pass
+
+ def _check_max_identifier_length(self, connection):
+ if self._get_effective_compat_server_version_info(connection) < (
+ 12,
+ 2,
+ ):
+ return 30
+ else:
+ # use the default
+ return None
+
+ def _check_unicode_returns(self, connection):
+ additional_tests = [
+ expression.cast(
+ expression.literal_column("'test nvarchar2 returns'"),
+ sqltypes.NVARCHAR(60),
+ )
+ ]
+ return super(OracleDialect, self)._check_unicode_returns(
+ connection, additional_tests
+ )
+
+ _isolation_lookup = ["READ COMMITTED", "SERIALIZABLE"]
+
+ def get_isolation_level(self, connection):
+ raise NotImplementedError("implemented by cx_Oracle dialect")
+
+ def get_default_isolation_level(self, dbapi_conn):
+ try:
+ return self.get_isolation_level(dbapi_conn)
+ except NotImplementedError:
+ raise
+ except:
+ return "READ COMMITTED"
+
+ def set_isolation_level(self, connection, level):
+ raise NotImplementedError("implemented by cx_Oracle dialect")
+
+ def has_table(self, connection, table_name, schema=None):
+ self._ensure_has_table_connection(connection)
+
+ if not schema:
+ schema = self.default_schema_name
+
+ cursor = connection.execute(
+ sql.text(
+ "SELECT table_name FROM all_tables "
+ "WHERE table_name = CAST(:name AS VARCHAR2(128)) "
+ "AND owner = CAST(:schema_name AS VARCHAR2(128))"
+ ),
+ dict(
+ name=self.denormalize_name(table_name),
+ schema_name=self.denormalize_name(schema),
+ ),
+ )
+ return cursor.first() is not None
+
+ def has_sequence(self, connection, sequence_name, schema=None):
+ if not schema:
+ schema = self.default_schema_name
+ cursor = connection.execute(
+ sql.text(
+ "SELECT sequence_name FROM all_sequences "
+ "WHERE sequence_name = :name AND "
+ "sequence_owner = :schema_name"
+ ),
+ dict(
+ name=self.denormalize_name(sequence_name),
+ schema_name=self.denormalize_name(schema),
+ ),
+ )
+ return cursor.first() is not None
+
+ def _get_default_schema_name(self, connection):
+ return self.normalize_name(
+ connection.exec_driver_sql(
+ "select sys_context( 'userenv', 'current_schema' ) from dual"
+ ).scalar()
+ )
+
+ def _resolve_synonym(
+ self,
+ connection,
+ desired_owner=None,
+ desired_synonym=None,
+ desired_table=None,
+ ):
+ """search for a local synonym matching the given desired owner/name.
+
+ if desired_owner is None, attempts to locate a distinct owner.
+
+ returns the actual name, owner, dblink name, and synonym name if
+ found.
+ """
+
+ q = (
+ "SELECT owner, table_owner, table_name, db_link, "
+ "synonym_name FROM all_synonyms WHERE "
+ )
+ clauses = []
+ params = {}
+ if desired_synonym:
+ clauses.append(
+ "synonym_name = CAST(:synonym_name AS VARCHAR2(128))"
+ )
+ params["synonym_name"] = desired_synonym
+ if desired_owner:
+ clauses.append("owner = CAST(:desired_owner AS VARCHAR2(128))")
+ params["desired_owner"] = desired_owner
+ if desired_table:
+ clauses.append("table_name = CAST(:tname AS VARCHAR2(128))")
+ params["tname"] = desired_table
+
+ q += " AND ".join(clauses)
+
+ result = connection.execution_options(future_result=True).execute(
+ sql.text(q), params
+ )
+ if desired_owner:
+ row = result.mappings().first()
+ if row:
+ return (
+ row["table_name"],
+ row["table_owner"],
+ row["db_link"],
+ row["synonym_name"],
+ )
+ else:
+ return None, None, None, None
+ else:
+ rows = result.mappings().all()
+ if len(rows) > 1:
+ raise AssertionError(
+ "There are multiple tables visible to the schema, you "
+ "must specify owner"
+ )
+ elif len(rows) == 1:
+ row = rows[0]
+ return (
+ row["table_name"],
+ row["table_owner"],
+ row["db_link"],
+ row["synonym_name"],
+ )
+ else:
+ return None, None, None, None
+
+ @reflection.cache
+ def _prepare_reflection_args(
+ self,
+ connection,
+ table_name,
+ schema=None,
+ resolve_synonyms=False,
+ dblink="",
+ **kw
+ ):
+
+ if resolve_synonyms:
+ actual_name, owner, dblink, synonym = self._resolve_synonym(
+ connection,
+ desired_owner=self.denormalize_name(schema),
+ desired_synonym=self.denormalize_name(table_name),
+ )
+ else:
+ actual_name, owner, dblink, synonym = None, None, None, None
+ if not actual_name:
+ actual_name = self.denormalize_name(table_name)
+
+ if dblink:
+ # using user_db_links here since all_db_links appears
+ # to have more restricted permissions.
+ # https://docs.oracle.com/cd/B28359_01/server.111/b28310/ds_admin005.htm
+ # will need to hear from more users if we are doing
+ # the right thing here. See [ticket:2619]
+ owner = connection.scalar(
+ sql.text(
+ "SELECT username FROM user_db_links " "WHERE db_link=:link"
+ ),
+ dict(link=dblink),
+ )
+ dblink = "@" + dblink
+ elif not owner:
+ owner = self.denormalize_name(schema or self.default_schema_name)
+
+ return (actual_name, owner, dblink or "", synonym)
+
+ @reflection.cache
+ def get_schema_names(self, connection, **kw):
+ s = "SELECT username FROM all_users ORDER BY username"
+ cursor = connection.exec_driver_sql(s)
+ return [self.normalize_name(row[0]) for row in cursor]
+
+ @reflection.cache
+ def get_table_names(self, connection, schema=None, **kw):
+ schema = self.denormalize_name(schema or self.default_schema_name)
+
+ # note that table_names() isn't loading DBLINKed or synonym'ed tables
+ if schema is None:
+ schema = self.default_schema_name
+
+ sql_str = "SELECT table_name FROM all_tables WHERE "
+ if self.exclude_tablespaces:
+ sql_str += (
+ "nvl(tablespace_name, 'no tablespace') "
+ "NOT IN (%s) AND "
+ % (", ".join(["'%s'" % ts for ts in self.exclude_tablespaces]))
+ )
+ sql_str += (
+ "OWNER = :owner " "AND IOT_NAME IS NULL " "AND DURATION IS NULL"
+ )
+
+ cursor = connection.execute(sql.text(sql_str), dict(owner=schema))
+ return [self.normalize_name(row[0]) for row in cursor]
+
+ @reflection.cache
+ def get_temp_table_names(self, connection, **kw):
+ schema = self.denormalize_name(self.default_schema_name)
+
+ sql_str = "SELECT table_name FROM all_tables WHERE "
+ if self.exclude_tablespaces:
+ sql_str += (
+ "nvl(tablespace_name, 'no tablespace') "
+ "NOT IN (%s) AND "
+ % (", ".join(["'%s'" % ts for ts in self.exclude_tablespaces]))
+ )
+ sql_str += (
+ "OWNER = :owner "
+ "AND IOT_NAME IS NULL "
+ "AND DURATION IS NOT NULL"
+ )
+
+ cursor = connection.execute(sql.text(sql_str), dict(owner=schema))
+ return [self.normalize_name(row[0]) for row in cursor]
+
+ @reflection.cache
+ def get_view_names(self, connection, schema=None, **kw):
+ schema = self.denormalize_name(schema or self.default_schema_name)
+ s = sql.text("SELECT view_name FROM all_views WHERE owner = :owner")
+ cursor = connection.execute(
+ s, dict(owner=self.denormalize_name(schema))
+ )
+ return [self.normalize_name(row[0]) for row in cursor]
+
+ @reflection.cache
+ def get_sequence_names(self, connection, schema=None, **kw):
+ if not schema:
+ schema = self.default_schema_name
+ cursor = connection.execute(
+ sql.text(
+ "SELECT sequence_name FROM all_sequences "
+ "WHERE sequence_owner = :schema_name"
+ ),
+ dict(schema_name=self.denormalize_name(schema)),
+ )
+ return [self.normalize_name(row[0]) for row in cursor]
+
+ @reflection.cache
+ def get_table_options(self, connection, table_name, schema=None, **kw):
+ options = {}
+
+ resolve_synonyms = kw.get("oracle_resolve_synonyms", False)
+ dblink = kw.get("dblink", "")
+ info_cache = kw.get("info_cache")
+
+ (table_name, schema, dblink, synonym) = self._prepare_reflection_args(
+ connection,
+ table_name,
+ schema,
+ resolve_synonyms,
+ dblink,
+ info_cache=info_cache,
+ )
+
+ params = {"table_name": table_name}
+
+ columns = ["table_name"]
+ if self._supports_table_compression:
+ columns.append("compression")
+ if self._supports_table_compress_for:
+ columns.append("compress_for")
+
+ text = (
+ "SELECT %(columns)s "
+ "FROM ALL_TABLES%(dblink)s "
+ "WHERE table_name = CAST(:table_name AS VARCHAR(128))"
+ )
+
+ if schema is not None:
+ params["owner"] = schema
+ text += " AND owner = CAST(:owner AS VARCHAR(128)) "
+ text = text % {"dblink": dblink, "columns": ", ".join(columns)}
+
+ result = connection.execute(sql.text(text), params)
+
+ enabled = dict(DISABLED=False, ENABLED=True)
+
+ row = result.first()
+ if row:
+ if "compression" in row._fields and enabled.get(
+ row.compression, False
+ ):
+ if "compress_for" in row._fields:
+ options["oracle_compress"] = row.compress_for
+ else:
+ options["oracle_compress"] = True
+
+ return options
+
+ @reflection.cache
+ def get_columns(self, connection, table_name, schema=None, **kw):
+ """
+
+ kw arguments can be:
+
+ oracle_resolve_synonyms
+
+ dblink
+
+ """
+
+ resolve_synonyms = kw.get("oracle_resolve_synonyms", False)
+ dblink = kw.get("dblink", "")
+ info_cache = kw.get("info_cache")
+
+ (table_name, schema, dblink, synonym) = self._prepare_reflection_args(
+ connection,
+ table_name,
+ schema,
+ resolve_synonyms,
+ dblink,
+ info_cache=info_cache,
+ )
+ columns = []
+ if self._supports_char_length:
+ char_length_col = "char_length"
+ else:
+ char_length_col = "data_length"
+
+ if self.server_version_info >= (12,):
+ identity_cols = """\
+ col.default_on_null,
+ (
+ SELECT id.generation_type || ',' || id.IDENTITY_OPTIONS
+ FROM ALL_TAB_IDENTITY_COLS%(dblink)s id
+ WHERE col.table_name = id.table_name
+ AND col.column_name = id.column_name
+ AND col.owner = id.owner
+ ) AS identity_options""" % {
+ "dblink": dblink
+ }
+ else:
+ identity_cols = "NULL as default_on_null, NULL as identity_options"
+
+ params = {"table_name": table_name}
+
+ text = """
+ SELECT
+ col.column_name,
+ col.data_type,
+ col.%(char_length_col)s,
+ col.data_precision,
+ col.data_scale,
+ col.nullable,
+ col.data_default,
+ com.comments,
+ col.virtual_column,
+ %(identity_cols)s
+ FROM all_tab_cols%(dblink)s col
+ LEFT JOIN all_col_comments%(dblink)s com
+ ON col.table_name = com.table_name
+ AND col.column_name = com.column_name
+ AND col.owner = com.owner
+ WHERE col.table_name = CAST(:table_name AS VARCHAR2(128))
+ AND col.hidden_column = 'NO'
+ """
+ if schema is not None:
+ params["owner"] = schema
+ text += " AND col.owner = :owner "
+ text += " ORDER BY col.column_id"
+ text = text % {
+ "dblink": dblink,
+ "char_length_col": char_length_col,
+ "identity_cols": identity_cols,
+ }
+
+ c = connection.execute(sql.text(text), params)
+
+ for row in c:
+ colname = self.normalize_name(row[0])
+ orig_colname = row[0]
+ coltype = row[1]
+ length = row[2]
+ precision = row[3]
+ scale = row[4]
+ nullable = row[5] == "Y"
+ default = row[6]
+ comment = row[7]
+ generated = row[8]
+ default_on_nul = row[9]
+ identity_options = row[10]
+
+ if coltype == "NUMBER":
+ if precision is None and scale == 0:
+ coltype = INTEGER()
+ else:
+ coltype = NUMBER(precision, scale)
+ elif coltype == "FLOAT":
+ # TODO: support "precision" here as "binary_precision"
+ coltype = FLOAT()
+ elif coltype in ("VARCHAR2", "NVARCHAR2", "CHAR", "NCHAR"):
+ coltype = self.ischema_names.get(coltype)(length)
+ elif "WITH TIME ZONE" in coltype:
+ coltype = TIMESTAMP(timezone=True)
+ else:
+ coltype = re.sub(r"\(\d+\)", "", coltype)
+ try:
+ coltype = self.ischema_names[coltype]
+ except KeyError:
+ util.warn(
+ "Did not recognize type '%s' of column '%s'"
+ % (coltype, colname)
+ )
+ coltype = sqltypes.NULLTYPE
+
+ if generated == "YES":
+ computed = dict(sqltext=default)
+ default = None
+ else:
+ computed = None
+
+ if identity_options is not None:
+ identity = self._parse_identity_options(
+ identity_options, default_on_nul
+ )
+ default = None
+ else:
+ identity = None
+
+ cdict = {
+ "name": colname,
+ "type": coltype,
+ "nullable": nullable,
+ "default": default,
+ "autoincrement": "auto",
+ "comment": comment,
+ }
+ if orig_colname.lower() == orig_colname:
+ cdict["quote"] = True
+ if computed is not None:
+ cdict["computed"] = computed
+ if identity is not None:
+ cdict["identity"] = identity
+
+ columns.append(cdict)
+ return columns
+
+ def _parse_identity_options(self, identity_options, default_on_nul):
+ # identity_options is a string that starts with 'ALWAYS,' or
+ # 'BY DEFAULT,' and continues with
+ # START WITH: 1, INCREMENT BY: 1, MAX_VALUE: 123, MIN_VALUE: 1,
+ # CYCLE_FLAG: N, CACHE_SIZE: 1, ORDER_FLAG: N, SCALE_FLAG: N,
+ # EXTEND_FLAG: N, SESSION_FLAG: N, KEEP_VALUE: N
+ parts = [p.strip() for p in identity_options.split(",")]
+ identity = {
+ "always": parts[0] == "ALWAYS",
+ "on_null": default_on_nul == "YES",
+ }
+
+ for part in parts[1:]:
+ option, value = part.split(":")
+ value = value.strip()
+
+ if "START WITH" in option:
+ identity["start"] = compat.long_type(value)
+ elif "INCREMENT BY" in option:
+ identity["increment"] = compat.long_type(value)
+ elif "MAX_VALUE" in option:
+ identity["maxvalue"] = compat.long_type(value)
+ elif "MIN_VALUE" in option:
+ identity["minvalue"] = compat.long_type(value)
+ elif "CYCLE_FLAG" in option:
+ identity["cycle"] = value == "Y"
+ elif "CACHE_SIZE" in option:
+ identity["cache"] = compat.long_type(value)
+ elif "ORDER_FLAG" in option:
+ identity["order"] = value == "Y"
+ return identity
+
+ @reflection.cache
+ def get_table_comment(
+ self,
+ connection,
+ table_name,
+ schema=None,
+ resolve_synonyms=False,
+ dblink="",
+ **kw
+ ):
+
+ info_cache = kw.get("info_cache")
+ (table_name, schema, dblink, synonym) = self._prepare_reflection_args(
+ connection,
+ table_name,
+ schema,
+ resolve_synonyms,
+ dblink,
+ info_cache=info_cache,
+ )
+
+ if not schema:
+ schema = self.default_schema_name
+
+ COMMENT_SQL = """
+ SELECT comments
+ FROM all_tab_comments
+ WHERE table_name = CAST(:table_name AS VARCHAR(128))
+ AND owner = CAST(:schema_name AS VARCHAR(128))
+ """
+
+ c = connection.execute(
+ sql.text(COMMENT_SQL),
+ dict(table_name=table_name, schema_name=schema),
+ )
+ return {"text": c.scalar()}
+
+ @reflection.cache
+ def get_indexes(
+ self,
+ connection,
+ table_name,
+ schema=None,
+ resolve_synonyms=False,
+ dblink="",
+ **kw
+ ):
+
+ info_cache = kw.get("info_cache")
+ (table_name, schema, dblink, synonym) = self._prepare_reflection_args(
+ connection,
+ table_name,
+ schema,
+ resolve_synonyms,
+ dblink,
+ info_cache=info_cache,
+ )
+ indexes = []
+
+ params = {"table_name": table_name}
+ text = (
+ "SELECT a.index_name, a.column_name, "
+ "\nb.index_type, b.uniqueness, b.compression, b.prefix_length "
+ "\nFROM ALL_IND_COLUMNS%(dblink)s a, "
+ "\nALL_INDEXES%(dblink)s b "
+ "\nWHERE "
+ "\na.index_name = b.index_name "
+ "\nAND a.table_owner = b.table_owner "
+ "\nAND a.table_name = b.table_name "
+ "\nAND a.table_name = CAST(:table_name AS VARCHAR(128))"
+ )
+
+ if schema is not None:
+ params["schema"] = schema
+ text += "AND a.table_owner = :schema "
+
+ text += "ORDER BY a.index_name, a.column_position"
+
+ text = text % {"dblink": dblink}
+
+ q = sql.text(text)
+ rp = connection.execute(q, params)
+ indexes = []
+ last_index_name = None
+ pk_constraint = self.get_pk_constraint(
+ connection,
+ table_name,
+ schema,
+ resolve_synonyms=resolve_synonyms,
+ dblink=dblink,
+ info_cache=kw.get("info_cache"),
+ )
+
+ uniqueness = dict(NONUNIQUE=False, UNIQUE=True)
+ enabled = dict(DISABLED=False, ENABLED=True)
+
+ oracle_sys_col = re.compile(r"SYS_NC\d+\$", re.IGNORECASE)
+
+ index = None
+ for rset in rp:
+ index_name_normalized = self.normalize_name(rset.index_name)
+
+ # skip primary key index. This is refined as of
+ # [ticket:5421]. Note that ALL_INDEXES.GENERATED will by "Y"
+ # if the name of this index was generated by Oracle, however
+ # if a named primary key constraint was created then this flag
+ # is false.
+ if (
+ pk_constraint
+ and index_name_normalized == pk_constraint["name"]
+ ):
+ continue
+
+ if rset.index_name != last_index_name:
+ index = dict(
+ name=index_name_normalized,
+ column_names=[],
+ dialect_options={},
+ )
+ indexes.append(index)
+ index["unique"] = uniqueness.get(rset.uniqueness, False)
+
+ if rset.index_type in ("BITMAP", "FUNCTION-BASED BITMAP"):
+ index["dialect_options"]["oracle_bitmap"] = True
+ if enabled.get(rset.compression, False):
+ index["dialect_options"][
+ "oracle_compress"
+ ] = rset.prefix_length
+
+ # filter out Oracle SYS_NC names. could also do an outer join
+ # to the all_tab_columns table and check for real col names there.
+ if not oracle_sys_col.match(rset.column_name):
+ index["column_names"].append(
+ self.normalize_name(rset.column_name)
+ )
+ last_index_name = rset.index_name
+
+ return indexes
+
+ @reflection.cache
+ def _get_constraint_data(
+ self, connection, table_name, schema=None, dblink="", **kw
+ ):
+
+ params = {"table_name": table_name}
+
+ text = (
+ "SELECT"
+ "\nac.constraint_name," # 0
+ "\nac.constraint_type," # 1
+ "\nloc.column_name AS local_column," # 2
+ "\nrem.table_name AS remote_table," # 3
+ "\nrem.column_name AS remote_column," # 4
+ "\nrem.owner AS remote_owner," # 5
+ "\nloc.position as loc_pos," # 6
+ "\nrem.position as rem_pos," # 7
+ "\nac.search_condition," # 8
+ "\nac.delete_rule" # 9
+ "\nFROM all_constraints%(dblink)s ac,"
+ "\nall_cons_columns%(dblink)s loc,"
+ "\nall_cons_columns%(dblink)s rem"
+ "\nWHERE ac.table_name = CAST(:table_name AS VARCHAR2(128))"
+ "\nAND ac.constraint_type IN ('R','P', 'U', 'C')"
+ )
+
+ if schema is not None:
+ params["owner"] = schema
+ text += "\nAND ac.owner = CAST(:owner AS VARCHAR2(128))"
+
+ text += (
+ "\nAND ac.owner = loc.owner"
+ "\nAND ac.constraint_name = loc.constraint_name"
+ "\nAND ac.r_owner = rem.owner(+)"
+ "\nAND ac.r_constraint_name = rem.constraint_name(+)"
+ "\nAND (rem.position IS NULL or loc.position=rem.position)"
+ "\nORDER BY ac.constraint_name, loc.position"
+ )
+
+ text = text % {"dblink": dblink}
+ rp = connection.execute(sql.text(text), params)
+ constraint_data = rp.fetchall()
+ return constraint_data
+
+ @reflection.cache
+ def get_pk_constraint(self, connection, table_name, schema=None, **kw):
+ resolve_synonyms = kw.get("oracle_resolve_synonyms", False)
+ dblink = kw.get("dblink", "")
+ info_cache = kw.get("info_cache")
+
+ (table_name, schema, dblink, synonym) = self._prepare_reflection_args(
+ connection,
+ table_name,
+ schema,
+ resolve_synonyms,
+ dblink,
+ info_cache=info_cache,
+ )
+ pkeys = []
+ constraint_name = None
+ constraint_data = self._get_constraint_data(
+ connection,
+ table_name,
+ schema,
+ dblink,
+ info_cache=kw.get("info_cache"),
+ )
+
+ for row in constraint_data:
+ (
+ cons_name,
+ cons_type,
+ local_column,
+ remote_table,
+ remote_column,
+ remote_owner,
+ ) = row[0:2] + tuple([self.normalize_name(x) for x in row[2:6]])
+ if cons_type == "P":
+ if constraint_name is None:
+ constraint_name = self.normalize_name(cons_name)
+ pkeys.append(local_column)
+ return {"constrained_columns": pkeys, "name": constraint_name}
+
+ @reflection.cache
+ def get_foreign_keys(self, connection, table_name, schema=None, **kw):
+ """
+
+ kw arguments can be:
+
+ oracle_resolve_synonyms
+
+ dblink
+
+ """
+ requested_schema = schema # to check later on
+ resolve_synonyms = kw.get("oracle_resolve_synonyms", False)
+ dblink = kw.get("dblink", "")
+ info_cache = kw.get("info_cache")
+
+ (table_name, schema, dblink, synonym) = self._prepare_reflection_args(
+ connection,
+ table_name,
+ schema,
+ resolve_synonyms,
+ dblink,
+ info_cache=info_cache,
+ )
+
+ constraint_data = self._get_constraint_data(
+ connection,
+ table_name,
+ schema,
+ dblink,
+ info_cache=kw.get("info_cache"),
+ )
+
+ def fkey_rec():
+ return {
+ "name": None,
+ "constrained_columns": [],
+ "referred_schema": None,
+ "referred_table": None,
+ "referred_columns": [],
+ "options": {},
+ }
+
+ fkeys = util.defaultdict(fkey_rec)
+
+ for row in constraint_data:
+ (
+ cons_name,
+ cons_type,
+ local_column,
+ remote_table,
+ remote_column,
+ remote_owner,
+ ) = row[0:2] + tuple([self.normalize_name(x) for x in row[2:6]])
+
+ cons_name = self.normalize_name(cons_name)
+
+ if cons_type == "R":
+ if remote_table is None:
+ # ticket 363
+ util.warn(
+ (
+ "Got 'None' querying 'table_name' from "
+ "all_cons_columns%(dblink)s - does the user have "
+ "proper rights to the table?"
+ )
+ % {"dblink": dblink}
+ )
+ continue
+
+ rec = fkeys[cons_name]
+ rec["name"] = cons_name
+ local_cols, remote_cols = (
+ rec["constrained_columns"],
+ rec["referred_columns"],
+ )
+
+ if not rec["referred_table"]:
+ if resolve_synonyms:
+ (
+ ref_remote_name,
+ ref_remote_owner,
+ ref_dblink,
+ ref_synonym,
+ ) = self._resolve_synonym(
+ connection,
+ desired_owner=self.denormalize_name(remote_owner),
+ desired_table=self.denormalize_name(remote_table),
+ )
+ if ref_synonym:
+ remote_table = self.normalize_name(ref_synonym)
+ remote_owner = self.normalize_name(
+ ref_remote_owner
+ )
+
+ rec["referred_table"] = remote_table
+
+ if (
+ requested_schema is not None
+ or self.denormalize_name(remote_owner) != schema
+ ):
+ rec["referred_schema"] = remote_owner
+
+ if row[9] != "NO ACTION":
+ rec["options"]["ondelete"] = row[9]
+
+ local_cols.append(local_column)
+ remote_cols.append(remote_column)
+
+ return list(fkeys.values())
+
+ @reflection.cache
+ def get_unique_constraints(
+ self, connection, table_name, schema=None, **kw
+ ):
+ resolve_synonyms = kw.get("oracle_resolve_synonyms", False)
+ dblink = kw.get("dblink", "")
+ info_cache = kw.get("info_cache")
+
+ (table_name, schema, dblink, synonym) = self._prepare_reflection_args(
+ connection,
+ table_name,
+ schema,
+ resolve_synonyms,
+ dblink,
+ info_cache=info_cache,
+ )
+
+ constraint_data = self._get_constraint_data(
+ connection,
+ table_name,
+ schema,
+ dblink,
+ info_cache=kw.get("info_cache"),
+ )
+
+ unique_keys = filter(lambda x: x[1] == "U", constraint_data)
+ uniques_group = groupby(unique_keys, lambda x: x[0])
+
+ index_names = {
+ ix["name"]
+ for ix in self.get_indexes(connection, table_name, schema=schema)
+ }
+ return [
+ {
+ "name": name,
+ "column_names": cols,
+ "duplicates_index": name if name in index_names else None,
+ }
+ for name, cols in [
+ [
+ self.normalize_name(i[0]),
+ [self.normalize_name(x[2]) for x in i[1]],
+ ]
+ for i in uniques_group
+ ]
+ ]
+
+ @reflection.cache
+ def get_view_definition(
+ self,
+ connection,
+ view_name,
+ schema=None,
+ resolve_synonyms=False,
+ dblink="",
+ **kw
+ ):
+ info_cache = kw.get("info_cache")
+ (view_name, schema, dblink, synonym) = self._prepare_reflection_args(
+ connection,
+ view_name,
+ schema,
+ resolve_synonyms,
+ dblink,
+ info_cache=info_cache,
+ )
+
+ params = {"view_name": view_name}
+ text = "SELECT text FROM all_views WHERE view_name=:view_name"
+
+ if schema is not None:
+ text += " AND owner = :schema"
+ params["schema"] = schema
+
+ rp = connection.execute(sql.text(text), params).scalar()
+ if rp:
+ if util.py2k:
+ rp = rp.decode(self.encoding)
+ return rp
+ else:
+ return None
+
+ @reflection.cache
+ def get_check_constraints(
+ self, connection, table_name, schema=None, include_all=False, **kw
+ ):
+ resolve_synonyms = kw.get("oracle_resolve_synonyms", False)
+ dblink = kw.get("dblink", "")
+ info_cache = kw.get("info_cache")
+
+ (table_name, schema, dblink, synonym) = self._prepare_reflection_args(
+ connection,
+ table_name,
+ schema,
+ resolve_synonyms,
+ dblink,
+ info_cache=info_cache,
+ )
+
+ constraint_data = self._get_constraint_data(
+ connection,
+ table_name,
+ schema,
+ dblink,
+ info_cache=kw.get("info_cache"),
+ )
+
+ check_constraints = filter(lambda x: x[1] == "C", constraint_data)
+
+ return [
+ {"name": self.normalize_name(cons[0]), "sqltext": cons[8]}
+ for cons in check_constraints
+ if include_all or not re.match(r"..+?. IS NOT NULL$", cons[8])
+ ]
+
+
+class _OuterJoinColumn(sql.ClauseElement):
+ __visit_name__ = "outer_join_column"
+
+ def __init__(self, column):
+ self.column = column
diff --git a/lib/sqlalchemy/dialects/oracle/cx_oracle.py b/lib/sqlalchemy/dialects/oracle/cx_oracle.py
new file mode 100644
index 0000000..64029a4
--- /dev/null
+++ b/lib/sqlalchemy/dialects/oracle/cx_oracle.py
@@ -0,0 +1,1424 @@
+# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+
+r"""
+.. dialect:: oracle+cx_oracle
+ :name: cx-Oracle
+ :dbapi: cx_oracle
+ :connectstring: oracle+cx_oracle://user:pass@hostname:port[/dbname][?service_name=<service>[&key=value&key=value...]]
+ :url: https://oracle.github.io/python-cx_Oracle/
+
+DSN vs. Hostname connections
+-----------------------------
+
+cx_Oracle provides several methods of indicating the target database. The
+dialect translates from a series of different URL forms.
+
+Hostname Connections with Easy Connect Syntax
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Given a hostname, port and service name of the target Oracle Database, for
+example from Oracle's `Easy Connect syntax
+<https://cx-oracle.readthedocs.io/en/latest/user_guide/connection_handling.html#easy-connect-syntax-for-connection-strings>`_,
+then connect in SQLAlchemy using the ``service_name`` query string parameter::
+
+ engine = create_engine("oracle+cx_oracle://scott:tiger@hostname:port/?service_name=myservice&encoding=UTF-8&nencoding=UTF-8")
+
+The `full Easy Connect syntax
+<https://www.oracle.com/pls/topic/lookup?ctx=dblatest&id=GUID-B0437826-43C1-49EC-A94D-B650B6A4A6EE>`_
+is not supported. Instead, use a ``tnsnames.ora`` file and connect using a
+DSN.
+
+Connections with tnsnames.ora or Oracle Cloud
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Alternatively, if no port, database name, or ``service_name`` is provided, the
+dialect will use an Oracle DSN "connection string". This takes the "hostname"
+portion of the URL as the data source name. For example, if the
+``tnsnames.ora`` file contains a `Net Service Name
+<https://cx-oracle.readthedocs.io/en/latest/user_guide/connection_handling.html#net-service-names-for-connection-strings>`_
+of ``myalias`` as below::
+
+ myalias =
+ (DESCRIPTION =
+ (ADDRESS = (PROTOCOL = TCP)(HOST = mymachine.example.com)(PORT = 1521))
+ (CONNECT_DATA =
+ (SERVER = DEDICATED)
+ (SERVICE_NAME = orclpdb1)
+ )
+ )
+
+The cx_Oracle dialect connects to this database service when ``myalias`` is the
+hostname portion of the URL, without specifying a port, database name or
+``service_name``::
+
+ engine = create_engine("oracle+cx_oracle://scott:tiger@myalias/?encoding=UTF-8&nencoding=UTF-8")
+
+Users of Oracle Cloud should use this syntax and also configure the cloud
+wallet as shown in cx_Oracle documentation `Connecting to Autononmous Databases
+<https://cx-oracle.readthedocs.io/en/latest/user_guide/connection_handling.html#connecting-to-autononmous-databases>`_.
+
+SID Connections
+^^^^^^^^^^^^^^^
+
+To use Oracle's obsolete SID connection syntax, the SID can be passed in a
+"database name" portion of the URL as below::
+
+ engine = create_engine("oracle+cx_oracle://scott:tiger@hostname:1521/dbname?encoding=UTF-8&nencoding=UTF-8")
+
+Above, the DSN passed to cx_Oracle is created by ``cx_Oracle.makedsn()`` as
+follows::
+
+ >>> import cx_Oracle
+ >>> cx_Oracle.makedsn("hostname", 1521, sid="dbname")
+ '(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=hostname)(PORT=1521))(CONNECT_DATA=(SID=dbname)))'
+
+Passing cx_Oracle connect arguments
+-----------------------------------
+
+Additional connection arguments can usually be passed via the URL
+query string; particular symbols like ``cx_Oracle.SYSDBA`` are intercepted
+and converted to the correct symbol::
+
+ e = create_engine(
+ "oracle+cx_oracle://user:pass@dsn?encoding=UTF-8&nencoding=UTF-8&mode=SYSDBA&events=true")
+
+.. versionchanged:: 1.3 the cx_oracle dialect now accepts all argument names
+ within the URL string itself, to be passed to the cx_Oracle DBAPI. As
+ was the case earlier but not correctly documented, the
+ :paramref:`_sa.create_engine.connect_args` parameter also accepts all
+ cx_Oracle DBAPI connect arguments.
+
+To pass arguments directly to ``.connect()`` without using the query
+string, use the :paramref:`_sa.create_engine.connect_args` dictionary.
+Any cx_Oracle parameter value and/or constant may be passed, such as::
+
+ import cx_Oracle
+ e = create_engine(
+ "oracle+cx_oracle://user:pass@dsn",
+ connect_args={
+ "encoding": "UTF-8",
+ "nencoding": "UTF-8",
+ "mode": cx_Oracle.SYSDBA,
+ "events": True
+ }
+ )
+
+Note that the default value for ``encoding`` and ``nencoding`` was changed to
+"UTF-8" in cx_Oracle 8.0 so these parameters can be omitted when using that
+version, or later.
+
+Options consumed by the SQLAlchemy cx_Oracle dialect outside of the driver
+--------------------------------------------------------------------------
+
+There are also options that are consumed by the SQLAlchemy cx_oracle dialect
+itself. These options are always passed directly to :func:`_sa.create_engine`
+, such as::
+
+ e = create_engine(
+ "oracle+cx_oracle://user:pass@dsn", coerce_to_unicode=False)
+
+The parameters accepted by the cx_oracle dialect are as follows:
+
+* ``arraysize`` - set the cx_oracle.arraysize value on cursors, defaulted
+ to 50. This setting is significant with cx_Oracle as the contents of LOB
+ objects are only readable within a "live" row (e.g. within a batch of
+ 50 rows).
+
+* ``auto_convert_lobs`` - defaults to True; See :ref:`cx_oracle_lob`.
+
+* ``coerce_to_unicode`` - see :ref:`cx_oracle_unicode` for detail.
+
+* ``coerce_to_decimal`` - see :ref:`cx_oracle_numeric` for detail.
+
+* ``encoding_errors`` - see :ref:`cx_oracle_unicode_encoding_errors` for detail.
+
+.. _cx_oracle_sessionpool:
+
+Using cx_Oracle SessionPool
+---------------------------
+
+The cx_Oracle library provides its own connection pool implementation that may
+be used in place of SQLAlchemy's pooling functionality. This can be achieved
+by using the :paramref:`_sa.create_engine.creator` parameter to provide a
+function that returns a new connection, along with setting
+:paramref:`_sa.create_engine.pool_class` to ``NullPool`` to disable
+SQLAlchemy's pooling::
+
+ import cx_Oracle
+ from sqlalchemy import create_engine
+ from sqlalchemy.pool import NullPool
+
+ pool = cx_Oracle.SessionPool(
+ user="scott", password="tiger", dsn="orclpdb",
+ min=2, max=5, increment=1, threaded=True,
+ encoding="UTF-8", nencoding="UTF-8"
+ )
+
+ engine = create_engine("oracle://", creator=pool.acquire, poolclass=NullPool)
+
+The above engine may then be used normally where cx_Oracle's pool handles
+connection pooling::
+
+ with engine.connect() as conn:
+ print(conn.scalar("select 1 FROM dual"))
+
+
+As well as providing a scalable solution for multi-user applications, the
+cx_Oracle session pool supports some Oracle features such as DRCP and
+`Application Continuity
+<https://cx-oracle.readthedocs.io/en/latest/user_guide/ha.html#application-continuity-ac>`_.
+
+Using Oracle Database Resident Connection Pooling (DRCP)
+--------------------------------------------------------
+
+When using Oracle's `DRCP
+<https://www.oracle.com/pls/topic/lookup?ctx=dblatest&id=GUID-015CA8C1-2386-4626-855D-CC546DDC1086>`_,
+the best practice is to pass a connection class and "purity" when acquiring a
+connection from the SessionPool. Refer to the `cx_Oracle DRCP documentation
+<https://cx-oracle.readthedocs.io/en/latest/user_guide/connection_handling.html#database-resident-connection-pooling-drcp>`_.
+
+This can be achieved by wrapping ``pool.acquire()``::
+
+ import cx_Oracle
+ from sqlalchemy import create_engine
+ from sqlalchemy.pool import NullPool
+
+ pool = cx_Oracle.SessionPool(
+ user="scott", password="tiger", dsn="orclpdb",
+ min=2, max=5, increment=1, threaded=True,
+ encoding="UTF-8", nencoding="UTF-8"
+ )
+
+ def creator():
+ return pool.acquire(cclass="MYCLASS", purity=cx_Oracle.ATTR_PURITY_SELF)
+
+ engine = create_engine("oracle://", creator=creator, poolclass=NullPool)
+
+The above engine may then be used normally where cx_Oracle handles session
+pooling and Oracle Database additionally uses DRCP::
+
+ with engine.connect() as conn:
+ print(conn.scalar("select 1 FROM dual"))
+
+.. _cx_oracle_unicode:
+
+Unicode
+-------
+
+As is the case for all DBAPIs under Python 3, all strings are inherently
+Unicode strings. Under Python 2, cx_Oracle also supports Python Unicode
+objects directly. In all cases however, the driver requires an explicit
+encoding configuration.
+
+Ensuring the Correct Client Encoding
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The long accepted standard for establishing client encoding for nearly all
+Oracle related software is via the `NLS_LANG <https://www.oracle.com/database/technologies/faq-nls-lang.html>`_
+environment variable. cx_Oracle like most other Oracle drivers will use
+this environment variable as the source of its encoding configuration. The
+format of this variable is idiosyncratic; a typical value would be
+``AMERICAN_AMERICA.AL32UTF8``.
+
+The cx_Oracle driver also supports a programmatic alternative which is to
+pass the ``encoding`` and ``nencoding`` parameters directly to its
+``.connect()`` function. These can be present in the URL as follows::
+
+ engine = create_engine("oracle+cx_oracle://scott:tiger@orclpdb/?encoding=UTF-8&nencoding=UTF-8")
+
+For the meaning of the ``encoding`` and ``nencoding`` parameters, please
+consult
+`Characters Sets and National Language Support (NLS) <https://cx-oracle.readthedocs.io/en/latest/user_guide/globalization.html#globalization>`_.
+
+.. seealso::
+
+ `Characters Sets and National Language Support (NLS) <https://cx-oracle.readthedocs.io/en/latest/user_guide/globalization.html#globalization>`_
+ - in the cx_Oracle documentation.
+
+
+Unicode-specific Column datatypes
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The Core expression language handles unicode data by use of the :class:`.Unicode`
+and :class:`.UnicodeText`
+datatypes. These types correspond to the VARCHAR2 and CLOB Oracle datatypes by
+default. When using these datatypes with Unicode data, it is expected that
+the Oracle database is configured with a Unicode-aware character set, as well
+as that the ``NLS_LANG`` environment variable is set appropriately, so that
+the VARCHAR2 and CLOB datatypes can accommodate the data.
+
+In the case that the Oracle database is not configured with a Unicode character
+set, the two options are to use the :class:`_types.NCHAR` and
+:class:`_oracle.NCLOB` datatypes explicitly, or to pass the flag
+``use_nchar_for_unicode=True`` to :func:`_sa.create_engine`,
+which will cause the
+SQLAlchemy dialect to use NCHAR/NCLOB for the :class:`.Unicode` /
+:class:`.UnicodeText` datatypes instead of VARCHAR/CLOB.
+
+.. versionchanged:: 1.3 The :class:`.Unicode` and :class:`.UnicodeText`
+ datatypes now correspond to the ``VARCHAR2`` and ``CLOB`` Oracle datatypes
+ unless the ``use_nchar_for_unicode=True`` is passed to the dialect
+ when :func:`_sa.create_engine` is called.
+
+Unicode Coercion of result rows under Python 2
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+When result sets are fetched that include strings, under Python 3 the cx_Oracle
+DBAPI returns all strings as Python Unicode objects, since Python 3 only has a
+Unicode string type. This occurs for data fetched from datatypes such as
+VARCHAR2, CHAR, CLOB, NCHAR, NCLOB, etc. In order to provide cross-
+compatibility under Python 2, the SQLAlchemy cx_Oracle dialect will add
+Unicode-conversion to string data under Python 2 as well. Historically, this
+made use of converters that were supplied by cx_Oracle but were found to be
+non-performant; SQLAlchemy's own converters are used for the string to Unicode
+conversion under Python 2. To disable the Python 2 Unicode conversion for
+VARCHAR2, CHAR, and CLOB, the flag ``coerce_to_unicode=False`` can be passed to
+:func:`_sa.create_engine`.
+
+.. versionchanged:: 1.3 Unicode conversion is applied to all string values
+ by default under python 2. The ``coerce_to_unicode`` now defaults to True
+ and can be set to False to disable the Unicode coercion of strings that are
+ delivered as VARCHAR2/CHAR/CLOB data.
+
+.. _cx_oracle_unicode_encoding_errors:
+
+Encoding Errors
+^^^^^^^^^^^^^^^
+
+For the unusual case that data in the Oracle database is present with a broken
+encoding, the dialect accepts a parameter ``encoding_errors`` which will be
+passed to Unicode decoding functions in order to affect how decoding errors are
+handled. The value is ultimately consumed by the Python `decode
+<https://docs.python.org/3/library/stdtypes.html#bytes.decode>`_ function, and
+is passed both via cx_Oracle's ``encodingErrors`` parameter consumed by
+``Cursor.var()``, as well as SQLAlchemy's own decoding function, as the
+cx_Oracle dialect makes use of both under different circumstances.
+
+.. versionadded:: 1.3.11
+
+
+.. _cx_oracle_setinputsizes:
+
+Fine grained control over cx_Oracle data binding performance with setinputsizes
+-------------------------------------------------------------------------------
+
+The cx_Oracle DBAPI has a deep and fundamental reliance upon the usage of the
+DBAPI ``setinputsizes()`` call. The purpose of this call is to establish the
+datatypes that are bound to a SQL statement for Python values being passed as
+parameters. While virtually no other DBAPI assigns any use to the
+``setinputsizes()`` call, the cx_Oracle DBAPI relies upon it heavily in its
+interactions with the Oracle client interface, and in some scenarios it is not
+possible for SQLAlchemy to know exactly how data should be bound, as some
+settings can cause profoundly different performance characteristics, while
+altering the type coercion behavior at the same time.
+
+Users of the cx_Oracle dialect are **strongly encouraged** to read through
+cx_Oracle's list of built-in datatype symbols at
+https://cx-oracle.readthedocs.io/en/latest/api_manual/module.html#database-types.
+Note that in some cases, significant performance degradation can occur when
+using these types vs. not, in particular when specifying ``cx_Oracle.CLOB``.
+
+On the SQLAlchemy side, the :meth:`.DialectEvents.do_setinputsizes` event can
+be used both for runtime visibility (e.g. logging) of the setinputsizes step as
+well as to fully control how ``setinputsizes()`` is used on a per-statement
+basis.
+
+.. versionadded:: 1.2.9 Added :meth:`.DialectEvents.setinputsizes`
+
+
+Example 1 - logging all setinputsizes calls
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The following example illustrates how to log the intermediary values from a
+SQLAlchemy perspective before they are converted to the raw ``setinputsizes()``
+parameter dictionary. The keys of the dictionary are :class:`.BindParameter`
+objects which have a ``.key`` and a ``.type`` attribute::
+
+ from sqlalchemy import create_engine, event
+
+ engine = create_engine("oracle+cx_oracle://scott:tiger@host/xe")
+
+ @event.listens_for(engine, "do_setinputsizes")
+ def _log_setinputsizes(inputsizes, cursor, statement, parameters, context):
+ for bindparam, dbapitype in inputsizes.items():
+ log.info(
+ "Bound parameter name: %s SQLAlchemy type: %r "
+ "DBAPI object: %s",
+ bindparam.key, bindparam.type, dbapitype)
+
+Example 2 - remove all bindings to CLOB
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The ``CLOB`` datatype in cx_Oracle incurs a significant performance overhead,
+however is set by default for the ``Text`` type within the SQLAlchemy 1.2
+series. This setting can be modified as follows::
+
+ from sqlalchemy import create_engine, event
+ from cx_Oracle import CLOB
+
+ engine = create_engine("oracle+cx_oracle://scott:tiger@host/xe")
+
+ @event.listens_for(engine, "do_setinputsizes")
+ def _remove_clob(inputsizes, cursor, statement, parameters, context):
+ for bindparam, dbapitype in list(inputsizes.items()):
+ if dbapitype is CLOB:
+ del inputsizes[bindparam]
+
+.. _cx_oracle_returning:
+
+RETURNING Support
+-----------------
+
+The cx_Oracle dialect implements RETURNING using OUT parameters.
+The dialect supports RETURNING fully, however cx_Oracle 6 is recommended
+for complete support.
+
+.. _cx_oracle_lob:
+
+LOB Objects
+-----------
+
+cx_oracle returns oracle LOBs using the cx_oracle.LOB object. SQLAlchemy
+converts these to strings so that the interface of the Binary type is
+consistent with that of other backends, which takes place within a cx_Oracle
+outputtypehandler.
+
+cx_Oracle prior to version 6 would require that LOB objects be read before
+a new batch of rows would be read, as determined by the ``cursor.arraysize``.
+As of the 6 series, this limitation has been lifted. Nevertheless, because
+SQLAlchemy pre-reads these LOBs up front, this issue is avoided in any case.
+
+To disable the auto "read()" feature of the dialect, the flag
+``auto_convert_lobs=False`` may be passed to :func:`_sa.create_engine`. Under
+the cx_Oracle 5 series, having this flag turned off means there is the chance
+of reading from a stale LOB object if not read as it is fetched. With
+cx_Oracle 6, this issue is resolved.
+
+.. versionchanged:: 1.2 the LOB handling system has been greatly simplified
+ internally to make use of outputtypehandlers, and no longer makes use
+ of alternate "buffered" result set objects.
+
+Two Phase Transactions Not Supported
+-------------------------------------
+
+Two phase transactions are **not supported** under cx_Oracle due to poor
+driver support. As of cx_Oracle 6.0b1, the interface for
+two phase transactions has been changed to be more of a direct pass-through
+to the underlying OCI layer with less automation. The additional logic
+to support this system is not implemented in SQLAlchemy.
+
+.. _cx_oracle_numeric:
+
+Precision Numerics
+------------------
+
+SQLAlchemy's numeric types can handle receiving and returning values as Python
+``Decimal`` objects or float objects. When a :class:`.Numeric` object, or a
+subclass such as :class:`.Float`, :class:`_oracle.DOUBLE_PRECISION` etc. is in
+use, the :paramref:`.Numeric.asdecimal` flag determines if values should be
+coerced to ``Decimal`` upon return, or returned as float objects. To make
+matters more complicated under Oracle, Oracle's ``NUMBER`` type can also
+represent integer values if the "scale" is zero, so the Oracle-specific
+:class:`_oracle.NUMBER` type takes this into account as well.
+
+The cx_Oracle dialect makes extensive use of connection- and cursor-level
+"outputtypehandler" callables in order to coerce numeric values as requested.
+These callables are specific to the specific flavor of :class:`.Numeric` in
+use, as well as if no SQLAlchemy typing objects are present. There are
+observed scenarios where Oracle may sends incomplete or ambiguous information
+about the numeric types being returned, such as a query where the numeric types
+are buried under multiple levels of subquery. The type handlers do their best
+to make the right decision in all cases, deferring to the underlying cx_Oracle
+DBAPI for all those cases where the driver can make the best decision.
+
+When no typing objects are present, as when executing plain SQL strings, a
+default "outputtypehandler" is present which will generally return numeric
+values which specify precision and scale as Python ``Decimal`` objects. To
+disable this coercion to decimal for performance reasons, pass the flag
+``coerce_to_decimal=False`` to :func:`_sa.create_engine`::
+
+ engine = create_engine("oracle+cx_oracle://dsn", coerce_to_decimal=False)
+
+The ``coerce_to_decimal`` flag only impacts the results of plain string
+SQL statements that are not otherwise associated with a :class:`.Numeric`
+SQLAlchemy type (or a subclass of such).
+
+.. versionchanged:: 1.2 The numeric handling system for cx_Oracle has been
+ reworked to take advantage of newer cx_Oracle features as well
+ as better integration of outputtypehandlers.
+
+""" # noqa
+
+from __future__ import absolute_import
+
+import decimal
+import random
+import re
+
+from . import base as oracle
+from .base import OracleCompiler
+from .base import OracleDialect
+from .base import OracleExecutionContext
+from ... import exc
+from ... import processors
+from ... import types as sqltypes
+from ... import util
+from ...engine import cursor as _cursor
+from ...util import compat
+
+
+class _OracleInteger(sqltypes.Integer):
+ def get_dbapi_type(self, dbapi):
+ # see https://github.com/oracle/python-cx_Oracle/issues/
+ # 208#issuecomment-409715955
+ return int
+
+ def _cx_oracle_var(self, dialect, cursor):
+ cx_Oracle = dialect.dbapi
+ return cursor.var(
+ cx_Oracle.STRING, 255, arraysize=cursor.arraysize, outconverter=int
+ )
+
+ def _cx_oracle_outputtypehandler(self, dialect):
+ def handler(cursor, name, default_type, size, precision, scale):
+ return self._cx_oracle_var(dialect, cursor)
+
+ return handler
+
+
+class _OracleNumeric(sqltypes.Numeric):
+ is_number = False
+
+ def bind_processor(self, dialect):
+ if self.scale == 0:
+ return None
+ elif self.asdecimal:
+ processor = processors.to_decimal_processor_factory(
+ decimal.Decimal, self._effective_decimal_return_scale
+ )
+
+ def process(value):
+ if isinstance(value, (int, float)):
+ return processor(value)
+ elif value is not None and value.is_infinite():
+ return float(value)
+ else:
+ return value
+
+ return process
+ else:
+ return processors.to_float
+
+ def result_processor(self, dialect, coltype):
+ return None
+
+ def _cx_oracle_outputtypehandler(self, dialect):
+ cx_Oracle = dialect.dbapi
+
+ is_cx_oracle_6 = dialect._is_cx_oracle_6
+
+ def handler(cursor, name, default_type, size, precision, scale):
+ outconverter = None
+
+ if precision:
+ if self.asdecimal:
+ if default_type == cx_Oracle.NATIVE_FLOAT:
+ # receiving float and doing Decimal after the fact
+ # allows for float("inf") to be handled
+ type_ = default_type
+ outconverter = decimal.Decimal
+ elif is_cx_oracle_6:
+ type_ = decimal.Decimal
+ else:
+ type_ = cx_Oracle.STRING
+ outconverter = dialect._to_decimal
+ else:
+ if self.is_number and scale == 0:
+ # integer. cx_Oracle is observed to handle the widest
+ # variety of ints when no directives are passed,
+ # from 5.2 to 7.0. See [ticket:4457]
+ return None
+ else:
+ type_ = cx_Oracle.NATIVE_FLOAT
+
+ else:
+ if self.asdecimal:
+ if default_type == cx_Oracle.NATIVE_FLOAT:
+ type_ = default_type
+ outconverter = decimal.Decimal
+ elif is_cx_oracle_6:
+ type_ = decimal.Decimal
+ else:
+ type_ = cx_Oracle.STRING
+ outconverter = dialect._to_decimal
+ else:
+ if self.is_number and scale == 0:
+ # integer. cx_Oracle is observed to handle the widest
+ # variety of ints when no directives are passed,
+ # from 5.2 to 7.0. See [ticket:4457]
+ return None
+ else:
+ type_ = cx_Oracle.NATIVE_FLOAT
+
+ return cursor.var(
+ type_,
+ 255,
+ arraysize=cursor.arraysize,
+ outconverter=outconverter,
+ )
+
+ return handler
+
+
+class _OracleBinaryFloat(_OracleNumeric):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.NATIVE_FLOAT
+
+
+class _OracleBINARY_FLOAT(_OracleBinaryFloat, oracle.BINARY_FLOAT):
+ pass
+
+
+class _OracleBINARY_DOUBLE(_OracleBinaryFloat, oracle.BINARY_DOUBLE):
+ pass
+
+
+class _OracleNUMBER(_OracleNumeric):
+ is_number = True
+
+
+class _OracleDate(sqltypes.Date):
+ def bind_processor(self, dialect):
+ return None
+
+ def result_processor(self, dialect, coltype):
+ def process(value):
+ if value is not None:
+ return value.date()
+ else:
+ return value
+
+ return process
+
+
+# TODO: the names used across CHAR / VARCHAR / NCHAR / NVARCHAR
+# here are inconsistent and not very good
+class _OracleChar(sqltypes.CHAR):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.FIXED_CHAR
+
+
+class _OracleNChar(sqltypes.NCHAR):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.FIXED_NCHAR
+
+
+class _OracleUnicodeStringNCHAR(oracle.NVARCHAR2):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.NCHAR
+
+
+class _OracleUnicodeStringCHAR(sqltypes.Unicode):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.LONG_STRING
+
+
+class _OracleUnicodeTextNCLOB(oracle.NCLOB):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.NCLOB
+
+
+class _OracleUnicodeTextCLOB(sqltypes.UnicodeText):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.CLOB
+
+
+class _OracleText(sqltypes.Text):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.CLOB
+
+
+class _OracleLong(oracle.LONG):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.LONG_STRING
+
+
+class _OracleString(sqltypes.String):
+ pass
+
+
+class _OracleEnum(sqltypes.Enum):
+ def bind_processor(self, dialect):
+ enum_proc = sqltypes.Enum.bind_processor(self, dialect)
+
+ def process(value):
+ raw_str = enum_proc(value)
+ return raw_str
+
+ return process
+
+
+class _OracleBinary(sqltypes.LargeBinary):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.BLOB
+
+ def bind_processor(self, dialect):
+ return None
+
+ def result_processor(self, dialect, coltype):
+ if not dialect.auto_convert_lobs:
+ return None
+ else:
+ return super(_OracleBinary, self).result_processor(
+ dialect, coltype
+ )
+
+
+class _OracleInterval(oracle.INTERVAL):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.INTERVAL
+
+
+class _OracleRaw(oracle.RAW):
+ pass
+
+
+class _OracleRowid(oracle.ROWID):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.ROWID
+
+
+class OracleCompiler_cx_oracle(OracleCompiler):
+ _oracle_cx_sql_compiler = True
+
+ def bindparam_string(self, name, **kw):
+ quote = getattr(name, "quote", None)
+ if (
+ quote is True
+ or quote is not False
+ and self.preparer._bindparam_requires_quotes(name)
+ and not kw.get("post_compile", False)
+ ):
+ # interesting to note about expanding parameters - since the
+ # new parameters take the form <paramname>_<int>, at least if
+ # they are originally formed from reserved words, they no longer
+ # need quoting :). names that include illegal characters
+ # won't work however.
+ quoted_name = '"%s"' % name
+ kw["escaped_from"] = name
+ name = quoted_name
+
+ return OracleCompiler.bindparam_string(self, name, **kw)
+
+
+class OracleExecutionContext_cx_oracle(OracleExecutionContext):
+ out_parameters = None
+
+ def _generate_out_parameter_vars(self):
+ # check for has_out_parameters or RETURNING, create cx_Oracle.var
+ # objects if so
+ if self.compiled.returning or self.compiled.has_out_parameters:
+ quoted_bind_names = self.compiled.escaped_bind_names
+ for bindparam in self.compiled.binds.values():
+ if bindparam.isoutparam:
+ name = self.compiled.bind_names[bindparam]
+ type_impl = bindparam.type.dialect_impl(self.dialect)
+
+ if hasattr(type_impl, "_cx_oracle_var"):
+ self.out_parameters[name] = type_impl._cx_oracle_var(
+ self.dialect, self.cursor
+ )
+ else:
+ dbtype = type_impl.get_dbapi_type(self.dialect.dbapi)
+
+ cx_Oracle = self.dialect.dbapi
+
+ if dbtype is None:
+ raise exc.InvalidRequestError(
+ "Cannot create out parameter for "
+ "parameter "
+ "%r - its type %r is not supported by"
+ " cx_oracle" % (bindparam.key, bindparam.type)
+ )
+
+ if compat.py2k and dbtype in (
+ cx_Oracle.CLOB,
+ cx_Oracle.NCLOB,
+ ):
+ outconverter = (
+ processors.to_unicode_processor_factory(
+ self.dialect.encoding,
+ errors=self.dialect.encoding_errors,
+ )
+ )
+ self.out_parameters[name] = self.cursor.var(
+ dbtype,
+ outconverter=lambda value: outconverter(
+ value.read()
+ ),
+ )
+
+ elif dbtype in (
+ cx_Oracle.BLOB,
+ cx_Oracle.CLOB,
+ cx_Oracle.NCLOB,
+ ):
+ self.out_parameters[name] = self.cursor.var(
+ dbtype, outconverter=lambda value: value.read()
+ )
+ elif compat.py2k and isinstance(
+ type_impl, sqltypes.Unicode
+ ):
+ outconverter = (
+ processors.to_unicode_processor_factory(
+ self.dialect.encoding,
+ errors=self.dialect.encoding_errors,
+ )
+ )
+ self.out_parameters[name] = self.cursor.var(
+ dbtype, outconverter=outconverter
+ )
+ else:
+ self.out_parameters[name] = self.cursor.var(dbtype)
+ self.parameters[0][
+ quoted_bind_names.get(name, name)
+ ] = self.out_parameters[name]
+
+ def _generate_cursor_outputtype_handler(self):
+ output_handlers = {}
+
+ for (keyname, name, objects, type_) in self.compiled._result_columns:
+ handler = type_._cached_custom_processor(
+ self.dialect,
+ "cx_oracle_outputtypehandler",
+ self._get_cx_oracle_type_handler,
+ )
+
+ if handler:
+ denormalized_name = self.dialect.denormalize_name(keyname)
+ output_handlers[denormalized_name] = handler
+
+ if output_handlers:
+ default_handler = self._dbapi_connection.outputtypehandler
+
+ def output_type_handler(
+ cursor, name, default_type, size, precision, scale
+ ):
+ if name in output_handlers:
+ return output_handlers[name](
+ cursor, name, default_type, size, precision, scale
+ )
+ else:
+ return default_handler(
+ cursor, name, default_type, size, precision, scale
+ )
+
+ self.cursor.outputtypehandler = output_type_handler
+
+ def _get_cx_oracle_type_handler(self, impl):
+ if hasattr(impl, "_cx_oracle_outputtypehandler"):
+ return impl._cx_oracle_outputtypehandler(self.dialect)
+ else:
+ return None
+
+ def pre_exec(self):
+ if not getattr(self.compiled, "_oracle_cx_sql_compiler", False):
+ return
+
+ self.out_parameters = {}
+
+ self._generate_out_parameter_vars()
+
+ self._generate_cursor_outputtype_handler()
+
+ self.include_set_input_sizes = self.dialect._include_setinputsizes
+
+ def post_exec(self):
+ if self.compiled and self.out_parameters and self.compiled.returning:
+ # create a fake cursor result from the out parameters. unlike
+ # get_out_parameter_values(), the result-row handlers here will be
+ # applied at the Result level
+ returning_params = [
+ self.dialect._returningval(self.out_parameters["ret_%d" % i])
+ for i in range(len(self.out_parameters))
+ ]
+
+ fetch_strategy = _cursor.FullyBufferedCursorFetchStrategy(
+ self.cursor,
+ [
+ (getattr(col, "name", col._anon_name_label), None)
+ for col in self.compiled.returning
+ ],
+ initial_buffer=[tuple(returning_params)],
+ )
+
+ self.cursor_fetch_strategy = fetch_strategy
+
+ def create_cursor(self):
+ c = self._dbapi_connection.cursor()
+ if self.dialect.arraysize:
+ c.arraysize = self.dialect.arraysize
+
+ return c
+
+ def get_out_parameter_values(self, out_param_names):
+ # this method should not be called when the compiler has
+ # RETURNING as we've turned the has_out_parameters flag set to
+ # False.
+ assert not self.compiled.returning
+
+ return [
+ self.dialect._paramval(self.out_parameters[name])
+ for name in out_param_names
+ ]
+
+
+class OracleDialect_cx_oracle(OracleDialect):
+ supports_statement_cache = True
+ execution_ctx_cls = OracleExecutionContext_cx_oracle
+ statement_compiler = OracleCompiler_cx_oracle
+
+ supports_sane_rowcount = True
+ supports_sane_multi_rowcount = True
+
+ supports_unicode_statements = True
+ supports_unicode_binds = True
+
+ use_setinputsizes = True
+
+ driver = "cx_oracle"
+
+ colspecs = {
+ sqltypes.Numeric: _OracleNumeric,
+ sqltypes.Float: _OracleNumeric,
+ oracle.BINARY_FLOAT: _OracleBINARY_FLOAT,
+ oracle.BINARY_DOUBLE: _OracleBINARY_DOUBLE,
+ sqltypes.Integer: _OracleInteger,
+ oracle.NUMBER: _OracleNUMBER,
+ sqltypes.Date: _OracleDate,
+ sqltypes.LargeBinary: _OracleBinary,
+ sqltypes.Boolean: oracle._OracleBoolean,
+ sqltypes.Interval: _OracleInterval,
+ oracle.INTERVAL: _OracleInterval,
+ sqltypes.Text: _OracleText,
+ sqltypes.String: _OracleString,
+ sqltypes.UnicodeText: _OracleUnicodeTextCLOB,
+ sqltypes.CHAR: _OracleChar,
+ sqltypes.NCHAR: _OracleNChar,
+ sqltypes.Enum: _OracleEnum,
+ oracle.LONG: _OracleLong,
+ oracle.RAW: _OracleRaw,
+ sqltypes.Unicode: _OracleUnicodeStringCHAR,
+ sqltypes.NVARCHAR: _OracleUnicodeStringNCHAR,
+ oracle.NCLOB: _OracleUnicodeTextNCLOB,
+ oracle.ROWID: _OracleRowid,
+ }
+
+ execute_sequence_format = list
+
+ _cx_oracle_threaded = None
+
+ @util.deprecated_params(
+ threaded=(
+ "1.3",
+ "The 'threaded' parameter to the cx_oracle dialect "
+ "is deprecated as a dialect-level argument, and will be removed "
+ "in a future release. As of version 1.3, it defaults to False "
+ "rather than True. The 'threaded' option can be passed to "
+ "cx_Oracle directly in the URL query string passed to "
+ ":func:`_sa.create_engine`.",
+ )
+ )
+ def __init__(
+ self,
+ auto_convert_lobs=True,
+ coerce_to_unicode=True,
+ coerce_to_decimal=True,
+ arraysize=50,
+ encoding_errors=None,
+ threaded=None,
+ **kwargs
+ ):
+
+ OracleDialect.__init__(self, **kwargs)
+ self.arraysize = arraysize
+ self.encoding_errors = encoding_errors
+ if threaded is not None:
+ self._cx_oracle_threaded = threaded
+ self.auto_convert_lobs = auto_convert_lobs
+ self.coerce_to_unicode = coerce_to_unicode
+ self.coerce_to_decimal = coerce_to_decimal
+ if self._use_nchar_for_unicode:
+ self.colspecs = self.colspecs.copy()
+ self.colspecs[sqltypes.Unicode] = _OracleUnicodeStringNCHAR
+ self.colspecs[sqltypes.UnicodeText] = _OracleUnicodeTextNCLOB
+
+ cx_Oracle = self.dbapi
+
+ if cx_Oracle is None:
+ self._include_setinputsizes = {}
+ self.cx_oracle_ver = (0, 0, 0)
+ else:
+ self.cx_oracle_ver = self._parse_cx_oracle_ver(cx_Oracle.version)
+ if self.cx_oracle_ver < (5, 2) and self.cx_oracle_ver > (0, 0, 0):
+ raise exc.InvalidRequestError(
+ "cx_Oracle version 5.2 and above are supported"
+ )
+
+ self._include_setinputsizes = {
+ cx_Oracle.DATETIME,
+ cx_Oracle.NCLOB,
+ cx_Oracle.CLOB,
+ cx_Oracle.LOB,
+ cx_Oracle.NCHAR,
+ cx_Oracle.FIXED_NCHAR,
+ cx_Oracle.BLOB,
+ cx_Oracle.FIXED_CHAR,
+ cx_Oracle.TIMESTAMP,
+ _OracleInteger,
+ _OracleBINARY_FLOAT,
+ _OracleBINARY_DOUBLE,
+ }
+
+ self._paramval = lambda value: value.getvalue()
+
+ # https://github.com/oracle/python-cx_Oracle/issues/176#issuecomment-386821291
+ # https://github.com/oracle/python-cx_Oracle/issues/224
+ self._values_are_lists = self.cx_oracle_ver >= (6, 3)
+ if self._values_are_lists:
+ cx_Oracle.__future__.dml_ret_array_val = True
+
+ def _returningval(value):
+ try:
+ return value.values[0][0]
+ except IndexError:
+ return None
+
+ self._returningval = _returningval
+ else:
+ self._returningval = self._paramval
+
+ self._is_cx_oracle_6 = self.cx_oracle_ver >= (6,)
+
+ @property
+ def _cursor_var_unicode_kwargs(self):
+ if self.encoding_errors:
+ if self.cx_oracle_ver >= (6, 4):
+ return {"encodingErrors": self.encoding_errors}
+ else:
+ util.warn(
+ "cx_oracle version %r does not support encodingErrors"
+ % (self.cx_oracle_ver,)
+ )
+
+ return {}
+
+ def _parse_cx_oracle_ver(self, version):
+ m = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", version)
+ if m:
+ return tuple(int(x) for x in m.group(1, 2, 3) if x is not None)
+ else:
+ return (0, 0, 0)
+
+ @classmethod
+ def dbapi(cls):
+ import cx_Oracle
+
+ return cx_Oracle
+
+ def initialize(self, connection):
+ super(OracleDialect_cx_oracle, self).initialize(connection)
+ if self._is_oracle_8:
+ self.supports_unicode_binds = False
+
+ self._detect_decimal_char(connection)
+
+ def get_isolation_level(self, connection):
+ # sources:
+
+ # general idea of transaction id, have to start one, etc.
+ # https://stackoverflow.com/questions/10711204/how-to-check-isoloation-level
+
+ # how to decode xid cols from v$transaction to match
+ # https://asktom.oracle.com/pls/apex/f?p=100:11:0::::P11_QUESTION_ID:9532779900346079444
+
+ # Oracle tuple comparison without using IN:
+ # https://www.sql-workbench.eu/comparison/tuple_comparison.html
+
+ with connection.cursor() as cursor:
+ # this is the only way to ensure a transaction is started without
+ # actually running DML. There's no way to see the configured
+ # isolation level without getting it from v$transaction which
+ # means transaction has to be started.
+ outval = cursor.var(str)
+ cursor.execute(
+ """
+ begin
+ :trans_id := dbms_transaction.local_transaction_id( TRUE );
+ end;
+ """,
+ {"trans_id": outval},
+ )
+ trans_id = outval.getvalue()
+ xidusn, xidslot, xidsqn = trans_id.split(".", 2)
+
+ cursor.execute(
+ "SELECT CASE BITAND(t.flag, POWER(2, 28)) "
+ "WHEN 0 THEN 'READ COMMITTED' "
+ "ELSE 'SERIALIZABLE' END AS isolation_level "
+ "FROM v$transaction t WHERE "
+ "(t.xidusn, t.xidslot, t.xidsqn) = "
+ "((:xidusn, :xidslot, :xidsqn))",
+ {"xidusn": xidusn, "xidslot": xidslot, "xidsqn": xidsqn},
+ )
+ row = cursor.fetchone()
+ if row is None:
+ raise exc.InvalidRequestError(
+ "could not retrieve isolation level"
+ )
+ result = row[0]
+
+ return result
+
+ def set_isolation_level(self, connection, level):
+ if hasattr(connection, "dbapi_connection"):
+ dbapi_connection = connection.dbapi_connection
+ else:
+ dbapi_connection = connection
+ if level == "AUTOCOMMIT":
+ dbapi_connection.autocommit = True
+ else:
+ dbapi_connection.autocommit = False
+ connection.rollback()
+ with connection.cursor() as cursor:
+ cursor.execute("ALTER SESSION SET ISOLATION_LEVEL=%s" % level)
+
+ def _detect_decimal_char(self, connection):
+ # we have the option to change this setting upon connect,
+ # or just look at what it is upon connect and convert.
+ # to minimize the chance of interference with changes to
+ # NLS_TERRITORY or formatting behavior of the DB, we opt
+ # to just look at it
+
+ self._decimal_char = connection.exec_driver_sql(
+ "select value from nls_session_parameters "
+ "where parameter = 'NLS_NUMERIC_CHARACTERS'"
+ ).scalar()[0]
+ if self._decimal_char != ".":
+ _detect_decimal = self._detect_decimal
+ _to_decimal = self._to_decimal
+
+ self._detect_decimal = lambda value: _detect_decimal(
+ value.replace(self._decimal_char, ".")
+ )
+ self._to_decimal = lambda value: _to_decimal(
+ value.replace(self._decimal_char, ".")
+ )
+
+ def _detect_decimal(self, value):
+ if "." in value:
+ return self._to_decimal(value)
+ else:
+ return int(value)
+
+ _to_decimal = decimal.Decimal
+
+ def _generate_connection_outputtype_handler(self):
+ """establish the default outputtypehandler established at the
+ connection level.
+
+ """
+
+ dialect = self
+ cx_Oracle = dialect.dbapi
+
+ number_handler = _OracleNUMBER(
+ asdecimal=True
+ )._cx_oracle_outputtypehandler(dialect)
+ float_handler = _OracleNUMBER(
+ asdecimal=False
+ )._cx_oracle_outputtypehandler(dialect)
+
+ def output_type_handler(
+ cursor, name, default_type, size, precision, scale
+ ):
+
+ if (
+ default_type == cx_Oracle.NUMBER
+ and default_type is not cx_Oracle.NATIVE_FLOAT
+ ):
+ if not dialect.coerce_to_decimal:
+ return None
+ elif precision == 0 and scale in (0, -127):
+ # ambiguous type, this occurs when selecting
+ # numbers from deep subqueries
+ return cursor.var(
+ cx_Oracle.STRING,
+ 255,
+ outconverter=dialect._detect_decimal,
+ arraysize=cursor.arraysize,
+ )
+ elif precision and scale > 0:
+ return number_handler(
+ cursor, name, default_type, size, precision, scale
+ )
+ else:
+ return float_handler(
+ cursor, name, default_type, size, precision, scale
+ )
+
+ # allow all strings to come back natively as Unicode
+ elif (
+ dialect.coerce_to_unicode
+ and default_type
+ in (
+ cx_Oracle.STRING,
+ cx_Oracle.FIXED_CHAR,
+ )
+ and default_type is not cx_Oracle.CLOB
+ and default_type is not cx_Oracle.NCLOB
+ ):
+ if compat.py2k:
+ outconverter = processors.to_unicode_processor_factory(
+ dialect.encoding, errors=dialect.encoding_errors
+ )
+ return cursor.var(
+ cx_Oracle.STRING,
+ size,
+ cursor.arraysize,
+ outconverter=outconverter,
+ )
+ else:
+ return cursor.var(
+ util.text_type,
+ size,
+ cursor.arraysize,
+ **dialect._cursor_var_unicode_kwargs
+ )
+
+ elif dialect.auto_convert_lobs and default_type in (
+ cx_Oracle.CLOB,
+ cx_Oracle.NCLOB,
+ ):
+ if compat.py2k:
+ outconverter = processors.to_unicode_processor_factory(
+ dialect.encoding, errors=dialect.encoding_errors
+ )
+ return cursor.var(
+ cx_Oracle.LONG_STRING,
+ size,
+ cursor.arraysize,
+ outconverter=outconverter,
+ )
+ else:
+ return cursor.var(
+ cx_Oracle.LONG_STRING,
+ size,
+ cursor.arraysize,
+ **dialect._cursor_var_unicode_kwargs
+ )
+
+ elif dialect.auto_convert_lobs and default_type in (
+ cx_Oracle.BLOB,
+ ):
+ return cursor.var(
+ cx_Oracle.LONG_BINARY,
+ size,
+ cursor.arraysize,
+ )
+
+ return output_type_handler
+
+ def on_connect(self):
+
+ output_type_handler = self._generate_connection_outputtype_handler()
+
+ def on_connect(conn):
+ conn.outputtypehandler = output_type_handler
+
+ return on_connect
+
+ def create_connect_args(self, url):
+ opts = dict(url.query)
+
+ for opt in ("use_ansi", "auto_convert_lobs"):
+ if opt in opts:
+ util.warn_deprecated(
+ "cx_oracle dialect option %r should only be passed to "
+ "create_engine directly, not within the URL string" % opt,
+ version="1.3",
+ )
+ util.coerce_kw_type(opts, opt, bool)
+ setattr(self, opt, opts.pop(opt))
+
+ database = url.database
+ service_name = opts.pop("service_name", None)
+ if database or service_name:
+ # if we have a database, then we have a remote host
+ port = url.port
+ if port:
+ port = int(port)
+ else:
+ port = 1521
+
+ if database and service_name:
+ raise exc.InvalidRequestError(
+ '"service_name" option shouldn\'t '
+ 'be used with a "database" part of the url'
+ )
+ if database:
+ makedsn_kwargs = {"sid": database}
+ if service_name:
+ makedsn_kwargs = {"service_name": service_name}
+
+ dsn = self.dbapi.makedsn(url.host, port, **makedsn_kwargs)
+ else:
+ # we have a local tnsname
+ dsn = url.host
+
+ if dsn is not None:
+ opts["dsn"] = dsn
+ if url.password is not None:
+ opts["password"] = url.password
+ if url.username is not None:
+ opts["user"] = url.username
+
+ if self._cx_oracle_threaded is not None:
+ opts.setdefault("threaded", self._cx_oracle_threaded)
+
+ def convert_cx_oracle_constant(value):
+ if isinstance(value, util.string_types):
+ try:
+ int_val = int(value)
+ except ValueError:
+ value = value.upper()
+ return getattr(self.dbapi, value)
+ else:
+ return int_val
+ else:
+ return value
+
+ util.coerce_kw_type(opts, "mode", convert_cx_oracle_constant)
+ util.coerce_kw_type(opts, "threaded", bool)
+ util.coerce_kw_type(opts, "events", bool)
+ util.coerce_kw_type(opts, "purity", convert_cx_oracle_constant)
+ return ([], opts)
+
+ def _get_server_version_info(self, connection):
+ return tuple(int(x) for x in connection.connection.version.split("."))
+
+ def is_disconnect(self, e, connection, cursor):
+ (error,) = e.args
+ if isinstance(
+ e, (self.dbapi.InterfaceError, self.dbapi.DatabaseError)
+ ) and "not connected" in str(e):
+ return True
+
+ if hasattr(error, "code") and error.code in {
+ 28,
+ 3114,
+ 3113,
+ 3135,
+ 1033,
+ 2396,
+ }:
+ # ORA-00028: your session has been killed
+ # ORA-03114: not connected to ORACLE
+ # ORA-03113: end-of-file on communication channel
+ # ORA-03135: connection lost contact
+ # ORA-01033: ORACLE initialization or shutdown in progress
+ # ORA-02396: exceeded maximum idle time, please connect again
+ # TODO: Others ?
+ return True
+
+ if re.match(r"^(?:DPI-1010|DPI-1080|DPY-1001|DPY-4011)", str(e)):
+ # DPI-1010: not connected
+ # DPI-1080: connection was closed by ORA-3113
+ # python-oracledb's DPY-1001: not connected to database
+ # python-oracledb's DPY-4011: the database or network closed the
+ # connection
+ # TODO: others?
+ return True
+
+ return False
+
+ def create_xid(self):
+ """create a two-phase transaction ID.
+
+ this id will be passed to do_begin_twophase(), do_rollback_twophase(),
+ do_commit_twophase(). its format is unspecified.
+
+ """
+
+ id_ = random.randint(0, 2 ** 128)
+ return (0x1234, "%032x" % id_, "%032x" % 9)
+
+ def do_executemany(self, cursor, statement, parameters, context=None):
+ if isinstance(parameters, tuple):
+ parameters = list(parameters)
+ cursor.executemany(statement, parameters)
+
+ def do_begin_twophase(self, connection, xid):
+ connection.connection.begin(*xid)
+ connection.connection.info["cx_oracle_xid"] = xid
+
+ def do_prepare_twophase(self, connection, xid):
+ result = connection.connection.prepare()
+ connection.info["cx_oracle_prepared"] = result
+
+ def do_rollback_twophase(
+ self, connection, xid, is_prepared=True, recover=False
+ ):
+ self.do_rollback(connection.connection)
+ # TODO: need to end XA state here
+
+ def do_commit_twophase(
+ self, connection, xid, is_prepared=True, recover=False
+ ):
+
+ if not is_prepared:
+ self.do_commit(connection.connection)
+ else:
+ if recover:
+ raise NotImplementedError(
+ "2pc recovery not implemented for cx_Oracle"
+ )
+ oci_prepared = connection.info["cx_oracle_prepared"]
+ if oci_prepared:
+ self.do_commit(connection.connection)
+ # TODO: need to end XA state here
+
+ def do_set_input_sizes(self, cursor, list_of_tuples, context):
+ if self.positional:
+ # not usually used, here to support if someone is modifying
+ # the dialect to use positional style
+ cursor.setinputsizes(
+ *[dbtype for key, dbtype, sqltype in list_of_tuples]
+ )
+ else:
+ collection = (
+ (key, dbtype)
+ for key, dbtype, sqltype in list_of_tuples
+ if dbtype
+ )
+
+ if not self.supports_unicode_binds:
+ # oracle 8 only
+ collection = (
+ (self.dialect._encoder(key)[0], dbtype)
+ for key, dbtype in collection
+ )
+
+ cursor.setinputsizes(**{key: dbtype for key, dbtype in collection})
+
+ def do_recover_twophase(self, connection):
+ raise NotImplementedError(
+ "recover two phase query for cx_Oracle not implemented"
+ )
+
+
+dialect = OracleDialect_cx_oracle
diff --git a/lib/sqlalchemy/dialects/oracle/provision.py b/lib/sqlalchemy/dialects/oracle/provision.py
new file mode 100644
index 0000000..74ad1f2
--- /dev/null
+++ b/lib/sqlalchemy/dialects/oracle/provision.py
@@ -0,0 +1,160 @@
+from ... import create_engine
+from ... import exc
+from ...engine import url as sa_url
+from ...testing.provision import configure_follower
+from ...testing.provision import create_db
+from ...testing.provision import drop_db
+from ...testing.provision import follower_url_from_main
+from ...testing.provision import log
+from ...testing.provision import post_configure_engine
+from ...testing.provision import run_reap_dbs
+from ...testing.provision import set_default_schema_on_connection
+from ...testing.provision import stop_test_class_outside_fixtures
+from ...testing.provision import temp_table_keyword_args
+
+
+@create_db.for_db("oracle")
+def _oracle_create_db(cfg, eng, ident):
+ # NOTE: make sure you've run "ALTER DATABASE default tablespace users" or
+ # similar, so that the default tablespace is not "system"; reflection will
+ # fail otherwise
+ with eng.begin() as conn:
+ conn.exec_driver_sql("create user %s identified by xe" % ident)
+ conn.exec_driver_sql("create user %s_ts1 identified by xe" % ident)
+ conn.exec_driver_sql("create user %s_ts2 identified by xe" % ident)
+ conn.exec_driver_sql("grant dba to %s" % (ident,))
+ conn.exec_driver_sql("grant unlimited tablespace to %s" % ident)
+ conn.exec_driver_sql("grant unlimited tablespace to %s_ts1" % ident)
+ conn.exec_driver_sql("grant unlimited tablespace to %s_ts2" % ident)
+
+
+@configure_follower.for_db("oracle")
+def _oracle_configure_follower(config, ident):
+ config.test_schema = "%s_ts1" % ident
+ config.test_schema_2 = "%s_ts2" % ident
+
+
+def _ora_drop_ignore(conn, dbname):
+ try:
+ conn.exec_driver_sql("drop user %s cascade" % dbname)
+ log.info("Reaped db: %s", dbname)
+ return True
+ except exc.DatabaseError as err:
+ log.warning("couldn't drop db: %s", err)
+ return False
+
+
+@drop_db.for_db("oracle")
+def _oracle_drop_db(cfg, eng, ident):
+ with eng.begin() as conn:
+ # cx_Oracle seems to occasionally leak open connections when a large
+ # suite it run, even if we confirm we have zero references to
+ # connection objects.
+ # while there is a "kill session" command in Oracle,
+ # it unfortunately does not release the connection sufficiently.
+ _ora_drop_ignore(conn, ident)
+ _ora_drop_ignore(conn, "%s_ts1" % ident)
+ _ora_drop_ignore(conn, "%s_ts2" % ident)
+
+
+@stop_test_class_outside_fixtures.for_db("oracle")
+def stop_test_class_outside_fixtures(config, db, cls):
+
+ try:
+ with db.begin() as conn:
+ # run magic command to get rid of identity sequences
+ # https://floo.bar/2019/11/29/drop-the-underlying-sequence-of-an-identity-column/ # noqa: E501
+ conn.exec_driver_sql("purge recyclebin")
+ except exc.DatabaseError as err:
+ log.warning("purge recyclebin command failed: %s", err)
+
+ # clear statement cache on all connections that were used
+ # https://github.com/oracle/python-cx_Oracle/issues/519
+
+ for cx_oracle_conn in _all_conns:
+ try:
+ sc = cx_oracle_conn.stmtcachesize
+ except db.dialect.dbapi.InterfaceError:
+ # connection closed
+ pass
+ else:
+ cx_oracle_conn.stmtcachesize = 0
+ cx_oracle_conn.stmtcachesize = sc
+ _all_conns.clear()
+
+
+_all_conns = set()
+
+
+@post_configure_engine.for_db("oracle")
+def _oracle_post_configure_engine(url, engine, follower_ident):
+ from sqlalchemy import event
+
+ @event.listens_for(engine, "checkout")
+ def checkout(dbapi_con, con_record, con_proxy):
+ _all_conns.add(dbapi_con)
+
+ @event.listens_for(engine, "checkin")
+ def checkin(dbapi_connection, connection_record):
+ # work around cx_Oracle issue:
+ # https://github.com/oracle/python-cx_Oracle/issues/530
+ # invalidate oracle connections that had 2pc set up
+ if "cx_oracle_xid" in connection_record.info:
+ connection_record.invalidate()
+
+
+@run_reap_dbs.for_db("oracle")
+def _reap_oracle_dbs(url, idents):
+ log.info("db reaper connecting to %r", url)
+ eng = create_engine(url)
+ with eng.begin() as conn:
+
+ log.info("identifiers in file: %s", ", ".join(idents))
+
+ to_reap = conn.exec_driver_sql(
+ "select u.username from all_users u where username "
+ "like 'TEST_%' and not exists (select username "
+ "from v$session where username=u.username)"
+ )
+ all_names = {username.lower() for (username,) in to_reap}
+ to_drop = set()
+ for name in all_names:
+ if name.endswith("_ts1") or name.endswith("_ts2"):
+ continue
+ elif name in idents:
+ to_drop.add(name)
+ if "%s_ts1" % name in all_names:
+ to_drop.add("%s_ts1" % name)
+ if "%s_ts2" % name in all_names:
+ to_drop.add("%s_ts2" % name)
+
+ dropped = total = 0
+ for total, username in enumerate(to_drop, 1):
+ if _ora_drop_ignore(conn, username):
+ dropped += 1
+ log.info(
+ "Dropped %d out of %d stale databases detected", dropped, total
+ )
+
+
+@follower_url_from_main.for_db("oracle")
+def _oracle_follower_url_from_main(url, ident):
+ url = sa_url.make_url(url)
+ return url.set(username=ident, password="xe")
+
+
+@temp_table_keyword_args.for_db("oracle")
+def _oracle_temp_table_keyword_args(cfg, eng):
+ return {
+ "prefixes": ["GLOBAL TEMPORARY"],
+ "oracle_on_commit": "PRESERVE ROWS",
+ }
+
+
+@set_default_schema_on_connection.for_db("oracle")
+def _oracle_set_default_schema_on_connection(
+ cfg, dbapi_connection, schema_name
+):
+ cursor = dbapi_connection.cursor()
+ cursor.execute("ALTER SESSION SET CURRENT_SCHEMA=%s" % schema_name)
+ cursor.close()