diff options
Diffstat (limited to 'lib/sqlalchemy/dialects/oracle')
-rw-r--r-- | lib/sqlalchemy/dialects/oracle/__init__.py | 58 | ||||
-rw-r--r-- | lib/sqlalchemy/dialects/oracle/base.py | 2522 | ||||
-rw-r--r-- | lib/sqlalchemy/dialects/oracle/cx_oracle.py | 1424 | ||||
-rw-r--r-- | lib/sqlalchemy/dialects/oracle/provision.py | 160 |
4 files changed, 4164 insertions, 0 deletions
diff --git a/lib/sqlalchemy/dialects/oracle/__init__.py b/lib/sqlalchemy/dialects/oracle/__init__.py new file mode 100644 index 0000000..c83e057 --- /dev/null +++ b/lib/sqlalchemy/dialects/oracle/__init__.py @@ -0,0 +1,58 @@ +# oracle/__init__.py +# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php + +from . import base # noqa +from . import cx_oracle # noqa +from .base import BFILE +from .base import BINARY_DOUBLE +from .base import BINARY_FLOAT +from .base import BLOB +from .base import CHAR +from .base import CLOB +from .base import DATE +from .base import DOUBLE_PRECISION +from .base import FLOAT +from .base import INTERVAL +from .base import LONG +from .base import NCHAR +from .base import NCLOB +from .base import NUMBER +from .base import NVARCHAR +from .base import NVARCHAR2 +from .base import RAW +from .base import ROWID +from .base import TIMESTAMP +from .base import VARCHAR +from .base import VARCHAR2 + + +base.dialect = dialect = cx_oracle.dialect + +__all__ = ( + "VARCHAR", + "NVARCHAR", + "CHAR", + "NCHAR", + "DATE", + "NUMBER", + "BLOB", + "BFILE", + "CLOB", + "NCLOB", + "TIMESTAMP", + "RAW", + "FLOAT", + "DOUBLE_PRECISION", + "BINARY_DOUBLE", + "BINARY_FLOAT", + "LONG", + "dialect", + "INTERVAL", + "VARCHAR2", + "NVARCHAR2", + "ROWID", +) diff --git a/lib/sqlalchemy/dialects/oracle/base.py b/lib/sqlalchemy/dialects/oracle/base.py new file mode 100644 index 0000000..77f0dbd --- /dev/null +++ b/lib/sqlalchemy/dialects/oracle/base.py @@ -0,0 +1,2522 @@ +# oracle/base.py +# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php + +r""" +.. dialect:: oracle + :name: Oracle + :full_support: 11.2, 18c + :normal_support: 11+ + :best_effort: 8+ + + +Auto Increment Behavior +----------------------- + +SQLAlchemy Table objects which include integer primary keys are usually +assumed to have "autoincrementing" behavior, meaning they can generate their +own primary key values upon INSERT. For use within Oracle, two options are +available, which are the use of IDENTITY columns (Oracle 12 and above only) +or the association of a SEQUENCE with the column. + +Specifying GENERATED AS IDENTITY (Oracle 12 and above) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Starting from version 12 Oracle can make use of identity columns using +the :class:`_sql.Identity` to specify the autoincrementing behavior:: + + t = Table('mytable', metadata, + Column('id', Integer, Identity(start=3), primary_key=True), + Column(...), ... + ) + +The CREATE TABLE for the above :class:`_schema.Table` object would be: + +.. sourcecode:: sql + + CREATE TABLE mytable ( + id INTEGER GENERATED BY DEFAULT AS IDENTITY (START WITH 3), + ..., + PRIMARY KEY (id) + ) + +The :class:`_schema.Identity` object support many options to control the +"autoincrementing" behavior of the column, like the starting value, the +incrementing value, etc. +In addition to the standard options, Oracle supports setting +:paramref:`_schema.Identity.always` to ``None`` to use the default +generated mode, rendering GENERATED AS IDENTITY in the DDL. It also supports +setting :paramref:`_schema.Identity.on_null` to ``True`` to specify ON NULL +in conjunction with a 'BY DEFAULT' identity column. + +Using a SEQUENCE (all Oracle versions) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Older version of Oracle had no "autoincrement" +feature, SQLAlchemy relies upon sequences to produce these values. With the +older Oracle versions, *a sequence must always be explicitly specified to +enable autoincrement*. This is divergent with the majority of documentation +examples which assume the usage of an autoincrement-capable database. To +specify sequences, use the sqlalchemy.schema.Sequence object which is passed +to a Column construct:: + + t = Table('mytable', metadata, + Column('id', Integer, Sequence('id_seq'), primary_key=True), + Column(...), ... + ) + +This step is also required when using table reflection, i.e. autoload_with=engine:: + + t = Table('mytable', metadata, + Column('id', Integer, Sequence('id_seq'), primary_key=True), + autoload_with=engine + ) + +.. versionchanged:: 1.4 Added :class:`_schema.Identity` construct + in a :class:`_schema.Column` to specify the option of an autoincrementing + column. + +.. _oracle_isolation_level: + +Transaction Isolation Level / Autocommit +---------------------------------------- + +The Oracle database supports "READ COMMITTED" and "SERIALIZABLE" modes of +isolation. The AUTOCOMMIT isolation level is also supported by the cx_Oracle +dialect. + +To set using per-connection execution options:: + + connection = engine.connect() + connection = connection.execution_options( + isolation_level="AUTOCOMMIT" + ) + +For ``READ COMMITTED`` and ``SERIALIZABLE``, the Oracle dialect sets the +level at the session level using ``ALTER SESSION``, which is reverted back +to its default setting when the connection is returned to the connection +pool. + +Valid values for ``isolation_level`` include: + +* ``READ COMMITTED`` +* ``AUTOCOMMIT`` +* ``SERIALIZABLE`` + +.. note:: The implementation for the + :meth:`_engine.Connection.get_isolation_level` method as implemented by the + Oracle dialect necessarily forces the start of a transaction using the + Oracle LOCAL_TRANSACTION_ID function; otherwise no level is normally + readable. + + Additionally, the :meth:`_engine.Connection.get_isolation_level` method will + raise an exception if the ``v$transaction`` view is not available due to + permissions or other reasons, which is a common occurrence in Oracle + installations. + + The cx_Oracle dialect attempts to call the + :meth:`_engine.Connection.get_isolation_level` method when the dialect makes + its first connection to the database in order to acquire the + "default"isolation level. This default level is necessary so that the level + can be reset on a connection after it has been temporarily modified using + :meth:`_engine.Connection.execution_options` method. In the common event + that the :meth:`_engine.Connection.get_isolation_level` method raises an + exception due to ``v$transaction`` not being readable as well as any other + database-related failure, the level is assumed to be "READ COMMITTED". No + warning is emitted for this initial first-connect condition as it is + expected to be a common restriction on Oracle databases. + +.. versionadded:: 1.3.16 added support for AUTOCOMMIT to the cx_oracle dialect + as well as the notion of a default isolation level + +.. versionadded:: 1.3.21 Added support for SERIALIZABLE as well as live + reading of the isolation level. + +.. versionchanged:: 1.3.22 In the event that the default isolation + level cannot be read due to permissions on the v$transaction view as + is common in Oracle installations, the default isolation level is hardcoded + to "READ COMMITTED" which was the behavior prior to 1.3.21. + +.. seealso:: + + :ref:`dbapi_autocommit` + +Identifier Casing +----------------- + +In Oracle, the data dictionary represents all case insensitive identifier +names using UPPERCASE text. SQLAlchemy on the other hand considers an +all-lower case identifier name to be case insensitive. The Oracle dialect +converts all case insensitive identifiers to and from those two formats during +schema level communication, such as reflection of tables and indexes. Using +an UPPERCASE name on the SQLAlchemy side indicates a case sensitive +identifier, and SQLAlchemy will quote the name - this will cause mismatches +against data dictionary data received from Oracle, so unless identifier names +have been truly created as case sensitive (i.e. using quoted names), all +lowercase names should be used on the SQLAlchemy side. + +.. _oracle_max_identifier_lengths: + +Max Identifier Lengths +---------------------- + +Oracle has changed the default max identifier length as of Oracle Server +version 12.2. Prior to this version, the length was 30, and for 12.2 and +greater it is now 128. This change impacts SQLAlchemy in the area of +generated SQL label names as well as the generation of constraint names, +particularly in the case where the constraint naming convention feature +described at :ref:`constraint_naming_conventions` is being used. + +To assist with this change and others, Oracle includes the concept of a +"compatibility" version, which is a version number that is independent of the +actual server version in order to assist with migration of Oracle databases, +and may be configured within the Oracle server itself. This compatibility +version is retrieved using the query ``SELECT value FROM v$parameter WHERE +name = 'compatible';``. The SQLAlchemy Oracle dialect, when tasked with +determining the default max identifier length, will attempt to use this query +upon first connect in order to determine the effective compatibility version of +the server, which determines what the maximum allowed identifier length is for +the server. If the table is not available, the server version information is +used instead. + +As of SQLAlchemy 1.4, the default max identifier length for the Oracle dialect +is 128 characters. Upon first connect, the compatibility version is detected +and if it is less than Oracle version 12.2, the max identifier length is +changed to be 30 characters. In all cases, setting the +:paramref:`_sa.create_engine.max_identifier_length` parameter will bypass this +change and the value given will be used as is:: + + engine = create_engine( + "oracle+cx_oracle://scott:tiger@oracle122", + max_identifier_length=30) + +The maximum identifier length comes into play both when generating anonymized +SQL labels in SELECT statements, but more crucially when generating constraint +names from a naming convention. It is this area that has created the need for +SQLAlchemy to change this default conservatively. For example, the following +naming convention produces two very different constraint names based on the +identifier length:: + + from sqlalchemy import Column + from sqlalchemy import Index + from sqlalchemy import Integer + from sqlalchemy import MetaData + from sqlalchemy import Table + from sqlalchemy.dialects import oracle + from sqlalchemy.schema import CreateIndex + + m = MetaData(naming_convention={"ix": "ix_%(column_0N_name)s"}) + + t = Table( + "t", + m, + Column("some_column_name_1", Integer), + Column("some_column_name_2", Integer), + Column("some_column_name_3", Integer), + ) + + ix = Index( + None, + t.c.some_column_name_1, + t.c.some_column_name_2, + t.c.some_column_name_3, + ) + + oracle_dialect = oracle.dialect(max_identifier_length=30) + print(CreateIndex(ix).compile(dialect=oracle_dialect)) + +With an identifier length of 30, the above CREATE INDEX looks like:: + + CREATE INDEX ix_some_column_name_1s_70cd ON t + (some_column_name_1, some_column_name_2, some_column_name_3) + +However with length=128, it becomes:: + + CREATE INDEX ix_some_column_name_1some_column_name_2some_column_name_3 ON t + (some_column_name_1, some_column_name_2, some_column_name_3) + +Applications which have run versions of SQLAlchemy prior to 1.4 on an Oracle +server version 12.2 or greater are therefore subject to the scenario of a +database migration that wishes to "DROP CONSTRAINT" on a name that was +previously generated with the shorter length. This migration will fail when +the identifier length is changed without the name of the index or constraint +first being adjusted. Such applications are strongly advised to make use of +:paramref:`_sa.create_engine.max_identifier_length` +in order to maintain control +of the generation of truncated names, and to fully review and test all database +migrations in a staging environment when changing this value to ensure that the +impact of this change has been mitigated. + +.. versionchanged:: 1.4 the default max_identifier_length for Oracle is 128 + characters, which is adjusted down to 30 upon first connect if an older + version of Oracle server (compatibility version < 12.2) is detected. + + +LIMIT/OFFSET/FETCH Support +-------------------------- + +Methods like :meth:`_sql.Select.limit` and :meth:`_sql.Select.offset` currently +use an emulated approach for LIMIT / OFFSET based on window functions, which +involves creation of a subquery using ``ROW_NUMBER`` that is prone to +performance issues as well as SQL construction issues for complex statements. +However, this approach is supported by all Oracle versions. See notes below. + +When using Oracle 12c and above, use the :meth:`_sql.Select.fetch` method +instead; this will render the more modern +``FETCH FIRST N ROW / OFFSET N ROWS`` syntax. + +Notes on LIMIT / OFFSET emulation (when fetch() method cannot be used) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +If using :meth:`_sql.Select.limit` and :meth:`_sql.Select.offset`, +or with the ORM the :meth:`_orm.Query.limit` and :meth:`_orm.Query.offset` methods, +and the :meth:`_sql.Select.fetch` method **cannot** be used instead, the following +notes apply: + +* SQLAlchemy currently makes use of ROWNUM to achieve + LIMIT/OFFSET; the exact methodology is taken from + https://blogs.oracle.com/oraclemagazine/on-rownum-and-limiting-results . + +* the "FIRST_ROWS()" optimization keyword is not used by default. To enable + the usage of this optimization directive, specify ``optimize_limits=True`` + to :func:`_sa.create_engine`. + + .. versionchanged:: 1.4 + The Oracle dialect renders limit/offset integer values using a "post + compile" scheme which renders the integer directly before passing the + statement to the cursor for execution. The ``use_binds_for_limits`` flag + no longer has an effect. + + .. seealso:: + + :ref:`change_4808`. + +* A future release may use ``FETCH FIRST N ROW / OFFSET N ROWS`` automatically + when :meth:`_sql.Select.limit`, :meth:`_sql.Select.offset`, :meth:`_orm.Query.limit`, + :meth:`_orm.Query.offset` are used. + +.. _oracle_returning: + +RETURNING Support +----------------- + +The Oracle database supports a limited form of RETURNING, in order to retrieve +result sets of matched rows from INSERT, UPDATE and DELETE statements. +Oracle's RETURNING..INTO syntax only supports one row being returned, as it +relies upon OUT parameters in order to function. In addition, supported +DBAPIs have further limitations (see :ref:`cx_oracle_returning`). + +SQLAlchemy's "implicit returning" feature, which employs RETURNING within an +INSERT and sometimes an UPDATE statement in order to fetch newly generated +primary key values and other SQL defaults and expressions, is normally enabled +on the Oracle backend. By default, "implicit returning" typically only +fetches the value of a single ``nextval(some_seq)`` expression embedded into +an INSERT in order to increment a sequence within an INSERT statement and get +the value back at the same time. To disable this feature across the board, +specify ``implicit_returning=False`` to :func:`_sa.create_engine`:: + + engine = create_engine("oracle://scott:tiger@dsn", + implicit_returning=False) + +Implicit returning can also be disabled on a table-by-table basis as a table +option:: + + # Core Table + my_table = Table("my_table", metadata, ..., implicit_returning=False) + + + # declarative + class MyClass(Base): + __tablename__ = 'my_table' + __table_args__ = {"implicit_returning": False} + +.. seealso:: + + :ref:`cx_oracle_returning` - additional cx_oracle-specific restrictions on + implicit returning. + +ON UPDATE CASCADE +----------------- + +Oracle doesn't have native ON UPDATE CASCADE functionality. A trigger based +solution is available at +https://asktom.oracle.com/tkyte/update_cascade/index.html . + +When using the SQLAlchemy ORM, the ORM has limited ability to manually issue +cascading updates - specify ForeignKey objects using the +"deferrable=True, initially='deferred'" keyword arguments, +and specify "passive_updates=False" on each relationship(). + +Oracle 8 Compatibility +---------------------- + +When Oracle 8 is detected, the dialect internally configures itself to the +following behaviors: + +* the use_ansi flag is set to False. This has the effect of converting all + JOIN phrases into the WHERE clause, and in the case of LEFT OUTER JOIN + makes use of Oracle's (+) operator. + +* the NVARCHAR2 and NCLOB datatypes are no longer generated as DDL when + the :class:`~sqlalchemy.types.Unicode` is used - VARCHAR2 and CLOB are + issued instead. This because these types don't seem to work correctly on + Oracle 8 even though they are available. The + :class:`~sqlalchemy.types.NVARCHAR` and + :class:`~sqlalchemy.dialects.oracle.NCLOB` types will always generate + NVARCHAR2 and NCLOB. + +* the "native unicode" mode is disabled when using cx_oracle, i.e. SQLAlchemy + encodes all Python unicode objects to "string" before passing in as bind + parameters. + +Synonym/DBLINK Reflection +------------------------- + +When using reflection with Table objects, the dialect can optionally search +for tables indicated by synonyms, either in local or remote schemas or +accessed over DBLINK, by passing the flag ``oracle_resolve_synonyms=True`` as +a keyword argument to the :class:`_schema.Table` construct:: + + some_table = Table('some_table', autoload_with=some_engine, + oracle_resolve_synonyms=True) + +When this flag is set, the given name (such as ``some_table`` above) will +be searched not just in the ``ALL_TABLES`` view, but also within the +``ALL_SYNONYMS`` view to see if this name is actually a synonym to another +name. If the synonym is located and refers to a DBLINK, the oracle dialect +knows how to locate the table's information using DBLINK syntax(e.g. +``@dblink``). + +``oracle_resolve_synonyms`` is accepted wherever reflection arguments are +accepted, including methods such as :meth:`_schema.MetaData.reflect` and +:meth:`_reflection.Inspector.get_columns`. + +If synonyms are not in use, this flag should be left disabled. + +.. _oracle_constraint_reflection: + +Constraint Reflection +--------------------- + +The Oracle dialect can return information about foreign key, unique, and +CHECK constraints, as well as indexes on tables. + +Raw information regarding these constraints can be acquired using +:meth:`_reflection.Inspector.get_foreign_keys`, +:meth:`_reflection.Inspector.get_unique_constraints`, +:meth:`_reflection.Inspector.get_check_constraints`, and +:meth:`_reflection.Inspector.get_indexes`. + +.. versionchanged:: 1.2 The Oracle dialect can now reflect UNIQUE and + CHECK constraints. + +When using reflection at the :class:`_schema.Table` level, the +:class:`_schema.Table` +will also include these constraints. + +Note the following caveats: + +* When using the :meth:`_reflection.Inspector.get_check_constraints` method, + Oracle + builds a special "IS NOT NULL" constraint for columns that specify + "NOT NULL". This constraint is **not** returned by default; to include + the "IS NOT NULL" constraints, pass the flag ``include_all=True``:: + + from sqlalchemy import create_engine, inspect + + engine = create_engine("oracle+cx_oracle://s:t@dsn") + inspector = inspect(engine) + all_check_constraints = inspector.get_check_constraints( + "some_table", include_all=True) + +* in most cases, when reflecting a :class:`_schema.Table`, + a UNIQUE constraint will + **not** be available as a :class:`.UniqueConstraint` object, as Oracle + mirrors unique constraints with a UNIQUE index in most cases (the exception + seems to be when two or more unique constraints represent the same columns); + the :class:`_schema.Table` will instead represent these using + :class:`.Index` + with the ``unique=True`` flag set. + +* Oracle creates an implicit index for the primary key of a table; this index + is **excluded** from all index results. + +* the list of columns reflected for an index will not include column names + that start with SYS_NC. + +Table names with SYSTEM/SYSAUX tablespaces +------------------------------------------- + +The :meth:`_reflection.Inspector.get_table_names` and +:meth:`_reflection.Inspector.get_temp_table_names` +methods each return a list of table names for the current engine. These methods +are also part of the reflection which occurs within an operation such as +:meth:`_schema.MetaData.reflect`. By default, +these operations exclude the ``SYSTEM`` +and ``SYSAUX`` tablespaces from the operation. In order to change this, the +default list of tablespaces excluded can be changed at the engine level using +the ``exclude_tablespaces`` parameter:: + + # exclude SYSAUX and SOME_TABLESPACE, but not SYSTEM + e = create_engine( + "oracle://scott:tiger@xe", + exclude_tablespaces=["SYSAUX", "SOME_TABLESPACE"]) + +.. versionadded:: 1.1 + +DateTime Compatibility +---------------------- + +Oracle has no datatype known as ``DATETIME``, it instead has only ``DATE``, +which can actually store a date and time value. For this reason, the Oracle +dialect provides a type :class:`_oracle.DATE` which is a subclass of +:class:`.DateTime`. This type has no special behavior, and is only +present as a "marker" for this type; additionally, when a database column +is reflected and the type is reported as ``DATE``, the time-supporting +:class:`_oracle.DATE` type is used. + +.. versionchanged:: 0.9.4 Added :class:`_oracle.DATE` to subclass + :class:`.DateTime`. This is a change as previous versions + would reflect a ``DATE`` column as :class:`_types.DATE`, which subclasses + :class:`.Date`. The only significance here is for schemes that are + examining the type of column for use in special Python translations or + for migrating schemas to other database backends. + +.. _oracle_table_options: + +Oracle Table Options +------------------------- + +The CREATE TABLE phrase supports the following options with Oracle +in conjunction with the :class:`_schema.Table` construct: + + +* ``ON COMMIT``:: + + Table( + "some_table", metadata, ..., + prefixes=['GLOBAL TEMPORARY'], oracle_on_commit='PRESERVE ROWS') + +.. versionadded:: 1.0.0 + +* ``COMPRESS``:: + + Table('mytable', metadata, Column('data', String(32)), + oracle_compress=True) + + Table('mytable', metadata, Column('data', String(32)), + oracle_compress=6) + + The ``oracle_compress`` parameter accepts either an integer compression + level, or ``True`` to use the default compression level. + +.. versionadded:: 1.0.0 + +.. _oracle_index_options: + +Oracle Specific Index Options +----------------------------- + +Bitmap Indexes +~~~~~~~~~~~~~~ + +You can specify the ``oracle_bitmap`` parameter to create a bitmap index +instead of a B-tree index:: + + Index('my_index', my_table.c.data, oracle_bitmap=True) + +Bitmap indexes cannot be unique and cannot be compressed. SQLAlchemy will not +check for such limitations, only the database will. + +.. versionadded:: 1.0.0 + +Index compression +~~~~~~~~~~~~~~~~~ + +Oracle has a more efficient storage mode for indexes containing lots of +repeated values. Use the ``oracle_compress`` parameter to turn on key +compression:: + + Index('my_index', my_table.c.data, oracle_compress=True) + + Index('my_index', my_table.c.data1, my_table.c.data2, unique=True, + oracle_compress=1) + +The ``oracle_compress`` parameter accepts either an integer specifying the +number of prefix columns to compress, or ``True`` to use the default (all +columns for non-unique indexes, all but the last column for unique indexes). + +.. versionadded:: 1.0.0 + +""" # noqa + +from itertools import groupby +import re + +from ... import Computed +from ... import exc +from ... import schema as sa_schema +from ... import sql +from ... import util +from ...engine import default +from ...engine import reflection +from ...sql import compiler +from ...sql import expression +from ...sql import sqltypes +from ...sql import util as sql_util +from ...sql import visitors +from ...types import BLOB +from ...types import CHAR +from ...types import CLOB +from ...types import FLOAT +from ...types import INTEGER +from ...types import NCHAR +from ...types import NVARCHAR +from ...types import TIMESTAMP +from ...types import VARCHAR +from ...util import compat + +RESERVED_WORDS = set( + "SHARE RAW DROP BETWEEN FROM DESC OPTION PRIOR LONG THEN " + "DEFAULT ALTER IS INTO MINUS INTEGER NUMBER GRANT IDENTIFIED " + "ALL TO ORDER ON FLOAT DATE HAVING CLUSTER NOWAIT RESOURCE " + "ANY TABLE INDEX FOR UPDATE WHERE CHECK SMALLINT WITH DELETE " + "BY ASC REVOKE LIKE SIZE RENAME NOCOMPRESS NULL GROUP VALUES " + "AS IN VIEW EXCLUSIVE COMPRESS SYNONYM SELECT INSERT EXISTS " + "NOT TRIGGER ELSE CREATE INTERSECT PCTFREE DISTINCT USER " + "CONNECT SET MODE OF UNIQUE VARCHAR2 VARCHAR LOCK OR CHAR " + "DECIMAL UNION PUBLIC AND START UID COMMENT CURRENT LEVEL".split() +) + +NO_ARG_FNS = set( + "UID CURRENT_DATE SYSDATE USER " "CURRENT_TIME CURRENT_TIMESTAMP".split() +) + + +class RAW(sqltypes._Binary): + __visit_name__ = "RAW" + + +OracleRaw = RAW + + +class NCLOB(sqltypes.Text): + __visit_name__ = "NCLOB" + + +class VARCHAR2(VARCHAR): + __visit_name__ = "VARCHAR2" + + +NVARCHAR2 = NVARCHAR + + +class NUMBER(sqltypes.Numeric, sqltypes.Integer): + __visit_name__ = "NUMBER" + + def __init__(self, precision=None, scale=None, asdecimal=None): + if asdecimal is None: + asdecimal = bool(scale and scale > 0) + + super(NUMBER, self).__init__( + precision=precision, scale=scale, asdecimal=asdecimal + ) + + def adapt(self, impltype): + ret = super(NUMBER, self).adapt(impltype) + # leave a hint for the DBAPI handler + ret._is_oracle_number = True + return ret + + @property + def _type_affinity(self): + if bool(self.scale and self.scale > 0): + return sqltypes.Numeric + else: + return sqltypes.Integer + + +class DOUBLE_PRECISION(sqltypes.Float): + __visit_name__ = "DOUBLE_PRECISION" + + +class BINARY_DOUBLE(sqltypes.Float): + __visit_name__ = "BINARY_DOUBLE" + + +class BINARY_FLOAT(sqltypes.Float): + __visit_name__ = "BINARY_FLOAT" + + +class BFILE(sqltypes.LargeBinary): + __visit_name__ = "BFILE" + + +class LONG(sqltypes.Text): + __visit_name__ = "LONG" + + +class DATE(sqltypes.DateTime): + """Provide the oracle DATE type. + + This type has no special Python behavior, except that it subclasses + :class:`_types.DateTime`; this is to suit the fact that the Oracle + ``DATE`` type supports a time value. + + .. versionadded:: 0.9.4 + + """ + + __visit_name__ = "DATE" + + def _compare_type_affinity(self, other): + return other._type_affinity in (sqltypes.DateTime, sqltypes.Date) + + +class INTERVAL(sqltypes.NativeForEmulated, sqltypes._AbstractInterval): + __visit_name__ = "INTERVAL" + + def __init__(self, day_precision=None, second_precision=None): + """Construct an INTERVAL. + + Note that only DAY TO SECOND intervals are currently supported. + This is due to a lack of support for YEAR TO MONTH intervals + within available DBAPIs. + + :param day_precision: the day precision value. this is the number of + digits to store for the day field. Defaults to "2" + :param second_precision: the second precision value. this is the + number of digits to store for the fractional seconds field. + Defaults to "6". + + """ + self.day_precision = day_precision + self.second_precision = second_precision + + @classmethod + def _adapt_from_generic_interval(cls, interval): + return INTERVAL( + day_precision=interval.day_precision, + second_precision=interval.second_precision, + ) + + @property + def _type_affinity(self): + return sqltypes.Interval + + def as_generic(self, allow_nulltype=False): + return sqltypes.Interval( + native=True, + second_precision=self.second_precision, + day_precision=self.day_precision, + ) + + def coerce_compared_value(self, op, value): + return self + + +class ROWID(sqltypes.TypeEngine): + """Oracle ROWID type. + + When used in a cast() or similar, generates ROWID. + + """ + + __visit_name__ = "ROWID" + + +class _OracleBoolean(sqltypes.Boolean): + def get_dbapi_type(self, dbapi): + return dbapi.NUMBER + + +colspecs = { + sqltypes.Boolean: _OracleBoolean, + sqltypes.Interval: INTERVAL, + sqltypes.DateTime: DATE, +} + +ischema_names = { + "VARCHAR2": VARCHAR, + "NVARCHAR2": NVARCHAR, + "CHAR": CHAR, + "NCHAR": NCHAR, + "DATE": DATE, + "NUMBER": NUMBER, + "BLOB": BLOB, + "BFILE": BFILE, + "CLOB": CLOB, + "NCLOB": NCLOB, + "TIMESTAMP": TIMESTAMP, + "TIMESTAMP WITH TIME ZONE": TIMESTAMP, + "INTERVAL DAY TO SECOND": INTERVAL, + "RAW": RAW, + "FLOAT": FLOAT, + "DOUBLE PRECISION": DOUBLE_PRECISION, + "LONG": LONG, + "BINARY_DOUBLE": BINARY_DOUBLE, + "BINARY_FLOAT": BINARY_FLOAT, +} + + +class OracleTypeCompiler(compiler.GenericTypeCompiler): + # Note: + # Oracle DATE == DATETIME + # Oracle does not allow milliseconds in DATE + # Oracle does not support TIME columns + + def visit_datetime(self, type_, **kw): + return self.visit_DATE(type_, **kw) + + def visit_float(self, type_, **kw): + return self.visit_FLOAT(type_, **kw) + + def visit_unicode(self, type_, **kw): + if self.dialect._use_nchar_for_unicode: + return self.visit_NVARCHAR2(type_, **kw) + else: + return self.visit_VARCHAR2(type_, **kw) + + def visit_INTERVAL(self, type_, **kw): + return "INTERVAL DAY%s TO SECOND%s" % ( + type_.day_precision is not None + and "(%d)" % type_.day_precision + or "", + type_.second_precision is not None + and "(%d)" % type_.second_precision + or "", + ) + + def visit_LONG(self, type_, **kw): + return "LONG" + + def visit_TIMESTAMP(self, type_, **kw): + if type_.timezone: + return "TIMESTAMP WITH TIME ZONE" + else: + return "TIMESTAMP" + + def visit_DOUBLE_PRECISION(self, type_, **kw): + return self._generate_numeric(type_, "DOUBLE PRECISION", **kw) + + def visit_BINARY_DOUBLE(self, type_, **kw): + return self._generate_numeric(type_, "BINARY_DOUBLE", **kw) + + def visit_BINARY_FLOAT(self, type_, **kw): + return self._generate_numeric(type_, "BINARY_FLOAT", **kw) + + def visit_FLOAT(self, type_, **kw): + # don't support conversion between decimal/binary + # precision yet + kw["no_precision"] = True + return self._generate_numeric(type_, "FLOAT", **kw) + + def visit_NUMBER(self, type_, **kw): + return self._generate_numeric(type_, "NUMBER", **kw) + + def _generate_numeric( + self, type_, name, precision=None, scale=None, no_precision=False, **kw + ): + if precision is None: + precision = type_.precision + + if scale is None: + scale = getattr(type_, "scale", None) + + if no_precision or precision is None: + return name + elif scale is None: + n = "%(name)s(%(precision)s)" + return n % {"name": name, "precision": precision} + else: + n = "%(name)s(%(precision)s, %(scale)s)" + return n % {"name": name, "precision": precision, "scale": scale} + + def visit_string(self, type_, **kw): + return self.visit_VARCHAR2(type_, **kw) + + def visit_VARCHAR2(self, type_, **kw): + return self._visit_varchar(type_, "", "2") + + def visit_NVARCHAR2(self, type_, **kw): + return self._visit_varchar(type_, "N", "2") + + visit_NVARCHAR = visit_NVARCHAR2 + + def visit_VARCHAR(self, type_, **kw): + return self._visit_varchar(type_, "", "") + + def _visit_varchar(self, type_, n, num): + if not type_.length: + return "%(n)sVARCHAR%(two)s" % {"two": num, "n": n} + elif not n and self.dialect._supports_char_length: + varchar = "VARCHAR%(two)s(%(length)s CHAR)" + return varchar % {"length": type_.length, "two": num} + else: + varchar = "%(n)sVARCHAR%(two)s(%(length)s)" + return varchar % {"length": type_.length, "two": num, "n": n} + + def visit_text(self, type_, **kw): + return self.visit_CLOB(type_, **kw) + + def visit_unicode_text(self, type_, **kw): + if self.dialect._use_nchar_for_unicode: + return self.visit_NCLOB(type_, **kw) + else: + return self.visit_CLOB(type_, **kw) + + def visit_large_binary(self, type_, **kw): + return self.visit_BLOB(type_, **kw) + + def visit_big_integer(self, type_, **kw): + return self.visit_NUMBER(type_, precision=19, **kw) + + def visit_boolean(self, type_, **kw): + return self.visit_SMALLINT(type_, **kw) + + def visit_RAW(self, type_, **kw): + if type_.length: + return "RAW(%(length)s)" % {"length": type_.length} + else: + return "RAW" + + def visit_ROWID(self, type_, **kw): + return "ROWID" + + +class OracleCompiler(compiler.SQLCompiler): + """Oracle compiler modifies the lexical structure of Select + statements to work under non-ANSI configured Oracle databases, if + the use_ansi flag is False. + """ + + compound_keywords = util.update_copy( + compiler.SQLCompiler.compound_keywords, + {expression.CompoundSelect.EXCEPT: "MINUS"}, + ) + + def __init__(self, *args, **kwargs): + self.__wheres = {} + super(OracleCompiler, self).__init__(*args, **kwargs) + + def visit_mod_binary(self, binary, operator, **kw): + return "mod(%s, %s)" % ( + self.process(binary.left, **kw), + self.process(binary.right, **kw), + ) + + def visit_now_func(self, fn, **kw): + return "CURRENT_TIMESTAMP" + + def visit_char_length_func(self, fn, **kw): + return "LENGTH" + self.function_argspec(fn, **kw) + + def visit_match_op_binary(self, binary, operator, **kw): + return "CONTAINS (%s, %s)" % ( + self.process(binary.left), + self.process(binary.right), + ) + + def visit_true(self, expr, **kw): + return "1" + + def visit_false(self, expr, **kw): + return "0" + + def get_cte_preamble(self, recursive): + return "WITH" + + def get_select_hint_text(self, byfroms): + return " ".join("/*+ %s */" % text for table, text in byfroms.items()) + + def function_argspec(self, fn, **kw): + if len(fn.clauses) > 0 or fn.name.upper() not in NO_ARG_FNS: + return compiler.SQLCompiler.function_argspec(self, fn, **kw) + else: + return "" + + def visit_function(self, func, **kw): + text = super(OracleCompiler, self).visit_function(func, **kw) + if kw.get("asfrom", False): + text = "TABLE (%s)" % func + return text + + def visit_table_valued_column(self, element, **kw): + text = super(OracleCompiler, self).visit_table_valued_column( + element, **kw + ) + text = "COLUMN_VALUE " + text + return text + + def default_from(self): + """Called when a ``SELECT`` statement has no froms, + and no ``FROM`` clause is to be appended. + + The Oracle compiler tacks a "FROM DUAL" to the statement. + """ + + return " FROM DUAL" + + def visit_join(self, join, from_linter=None, **kwargs): + if self.dialect.use_ansi: + return compiler.SQLCompiler.visit_join( + self, join, from_linter=from_linter, **kwargs + ) + else: + if from_linter: + from_linter.edges.add((join.left, join.right)) + + kwargs["asfrom"] = True + if isinstance(join.right, expression.FromGrouping): + right = join.right.element + else: + right = join.right + return ( + self.process(join.left, from_linter=from_linter, **kwargs) + + ", " + + self.process(right, from_linter=from_linter, **kwargs) + ) + + def _get_nonansi_join_whereclause(self, froms): + clauses = [] + + def visit_join(join): + if join.isouter: + # https://docs.oracle.com/database/121/SQLRF/queries006.htm#SQLRF52354 + # "apply the outer join operator (+) to all columns of B in + # the join condition in the WHERE clause" - that is, + # unconditionally regardless of operator or the other side + def visit_binary(binary): + if isinstance( + binary.left, expression.ColumnClause + ) and join.right.is_derived_from(binary.left.table): + binary.left = _OuterJoinColumn(binary.left) + elif isinstance( + binary.right, expression.ColumnClause + ) and join.right.is_derived_from(binary.right.table): + binary.right = _OuterJoinColumn(binary.right) + + clauses.append( + visitors.cloned_traverse( + join.onclause, {}, {"binary": visit_binary} + ) + ) + else: + clauses.append(join.onclause) + + for j in join.left, join.right: + if isinstance(j, expression.Join): + visit_join(j) + elif isinstance(j, expression.FromGrouping): + visit_join(j.element) + + for f in froms: + if isinstance(f, expression.Join): + visit_join(f) + + if not clauses: + return None + else: + return sql.and_(*clauses) + + def visit_outer_join_column(self, vc, **kw): + return self.process(vc.column, **kw) + "(+)" + + def visit_sequence(self, seq, **kw): + return self.preparer.format_sequence(seq) + ".nextval" + + def get_render_as_alias_suffix(self, alias_name_text): + """Oracle doesn't like ``FROM table AS alias``""" + + return " " + alias_name_text + + def returning_clause(self, stmt, returning_cols): + columns = [] + binds = [] + + for i, column in enumerate( + expression._select_iterables(returning_cols) + ): + if ( + self.isupdate + and isinstance(column, sa_schema.Column) + and isinstance(column.server_default, Computed) + and not self.dialect._supports_update_returning_computed_cols + ): + util.warn( + "Computed columns don't work with Oracle UPDATE " + "statements that use RETURNING; the value of the column " + "*before* the UPDATE takes place is returned. It is " + "advised to not use RETURNING with an Oracle computed " + "column. Consider setting implicit_returning to False on " + "the Table object in order to avoid implicit RETURNING " + "clauses from being generated for this Table." + ) + if column.type._has_column_expression: + col_expr = column.type.column_expression(column) + else: + col_expr = column + + outparam = sql.outparam("ret_%d" % i, type_=column.type) + self.binds[outparam.key] = outparam + binds.append( + self.bindparam_string(self._truncate_bindparam(outparam)) + ) + + # ensure the ExecutionContext.get_out_parameters() method is + # *not* called; the cx_Oracle dialect wants to handle these + # parameters separately + self.has_out_parameters = False + + columns.append(self.process(col_expr, within_columns_clause=False)) + + self._add_to_result_map( + getattr(col_expr, "name", col_expr._anon_name_label), + getattr(col_expr, "name", col_expr._anon_name_label), + ( + column, + getattr(column, "name", None), + getattr(column, "key", None), + ), + column.type, + ) + + return "RETURNING " + ", ".join(columns) + " INTO " + ", ".join(binds) + + def translate_select_structure(self, select_stmt, **kwargs): + select = select_stmt + + if not getattr(select, "_oracle_visit", None): + if not self.dialect.use_ansi: + froms = self._display_froms_for_select( + select, kwargs.get("asfrom", False) + ) + whereclause = self._get_nonansi_join_whereclause(froms) + if whereclause is not None: + select = select.where(whereclause) + select._oracle_visit = True + + # if fetch is used this is not needed + if ( + select._has_row_limiting_clause + and select._fetch_clause is None + ): + limit_clause = select._limit_clause + offset_clause = select._offset_clause + + if select._simple_int_clause(limit_clause): + limit_clause = limit_clause.render_literal_execute() + + if select._simple_int_clause(offset_clause): + offset_clause = offset_clause.render_literal_execute() + + # currently using form at: + # https://blogs.oracle.com/oraclemagazine/\ + # on-rownum-and-limiting-results + + orig_select = select + select = select._generate() + select._oracle_visit = True + + # add expressions to accommodate FOR UPDATE OF + for_update = select._for_update_arg + if for_update is not None and for_update.of: + for_update = for_update._clone() + for_update._copy_internals() + + for elem in for_update.of: + if not select.selected_columns.contains_column(elem): + select = select.add_columns(elem) + + # Wrap the middle select and add the hint + inner_subquery = select.alias() + limitselect = sql.select( + *[ + c + for c in inner_subquery.c + if orig_select.selected_columns.corresponding_column(c) + is not None + ] + ) + + if ( + limit_clause is not None + and self.dialect.optimize_limits + and select._simple_int_clause(limit_clause) + ): + limitselect = limitselect.prefix_with( + expression.text( + "/*+ FIRST_ROWS(%s) */" + % self.process(limit_clause, **kwargs) + ) + ) + + limitselect._oracle_visit = True + limitselect._is_wrapper = True + + # add expressions to accommodate FOR UPDATE OF + if for_update is not None and for_update.of: + + adapter = sql_util.ClauseAdapter(inner_subquery) + for_update.of = [ + adapter.traverse(elem) for elem in for_update.of + ] + + # If needed, add the limiting clause + if limit_clause is not None: + if select._simple_int_clause(limit_clause) and ( + offset_clause is None + or select._simple_int_clause(offset_clause) + ): + max_row = limit_clause + + if offset_clause is not None: + max_row = max_row + offset_clause + + else: + max_row = limit_clause + + if offset_clause is not None: + max_row = max_row + offset_clause + limitselect = limitselect.where( + sql.literal_column("ROWNUM") <= max_row + ) + + # If needed, add the ora_rn, and wrap again with offset. + if offset_clause is None: + limitselect._for_update_arg = for_update + select = limitselect + else: + limitselect = limitselect.add_columns( + sql.literal_column("ROWNUM").label("ora_rn") + ) + limitselect._oracle_visit = True + limitselect._is_wrapper = True + + if for_update is not None and for_update.of: + limitselect_cols = limitselect.selected_columns + for elem in for_update.of: + if ( + limitselect_cols.corresponding_column(elem) + is None + ): + limitselect = limitselect.add_columns(elem) + + limit_subquery = limitselect.alias() + origselect_cols = orig_select.selected_columns + offsetselect = sql.select( + *[ + c + for c in limit_subquery.c + if origselect_cols.corresponding_column(c) + is not None + ] + ) + + offsetselect._oracle_visit = True + offsetselect._is_wrapper = True + + if for_update is not None and for_update.of: + adapter = sql_util.ClauseAdapter(limit_subquery) + for_update.of = [ + adapter.traverse(elem) for elem in for_update.of + ] + + offsetselect = offsetselect.where( + sql.literal_column("ora_rn") > offset_clause + ) + + offsetselect._for_update_arg = for_update + select = offsetselect + + return select + + def limit_clause(self, select, **kw): + return "" + + def visit_empty_set_expr(self, type_): + return "SELECT 1 FROM DUAL WHERE 1!=1" + + def for_update_clause(self, select, **kw): + if self.is_subquery(): + return "" + + tmp = " FOR UPDATE" + + if select._for_update_arg.of: + tmp += " OF " + ", ".join( + self.process(elem, **kw) for elem in select._for_update_arg.of + ) + + if select._for_update_arg.nowait: + tmp += " NOWAIT" + if select._for_update_arg.skip_locked: + tmp += " SKIP LOCKED" + + return tmp + + def visit_is_distinct_from_binary(self, binary, operator, **kw): + return "DECODE(%s, %s, 0, 1) = 1" % ( + self.process(binary.left), + self.process(binary.right), + ) + + def visit_is_not_distinct_from_binary(self, binary, operator, **kw): + return "DECODE(%s, %s, 0, 1) = 0" % ( + self.process(binary.left), + self.process(binary.right), + ) + + def _get_regexp_args(self, binary, kw): + string = self.process(binary.left, **kw) + pattern = self.process(binary.right, **kw) + flags = binary.modifiers["flags"] + if flags is not None: + flags = self.process(flags, **kw) + return string, pattern, flags + + def visit_regexp_match_op_binary(self, binary, operator, **kw): + string, pattern, flags = self._get_regexp_args(binary, kw) + if flags is None: + return "REGEXP_LIKE(%s, %s)" % (string, pattern) + else: + return "REGEXP_LIKE(%s, %s, %s)" % (string, pattern, flags) + + def visit_not_regexp_match_op_binary(self, binary, operator, **kw): + return "NOT %s" % self.visit_regexp_match_op_binary( + binary, operator, **kw + ) + + def visit_regexp_replace_op_binary(self, binary, operator, **kw): + string, pattern, flags = self._get_regexp_args(binary, kw) + replacement = self.process(binary.modifiers["replacement"], **kw) + if flags is None: + return "REGEXP_REPLACE(%s, %s, %s)" % ( + string, + pattern, + replacement, + ) + else: + return "REGEXP_REPLACE(%s, %s, %s, %s)" % ( + string, + pattern, + replacement, + flags, + ) + + +class OracleDDLCompiler(compiler.DDLCompiler): + def define_constraint_cascades(self, constraint): + text = "" + if constraint.ondelete is not None: + text += " ON DELETE %s" % constraint.ondelete + + # oracle has no ON UPDATE CASCADE - + # its only available via triggers + # https://asktom.oracle.com/tkyte/update_cascade/index.html + if constraint.onupdate is not None: + util.warn( + "Oracle does not contain native UPDATE CASCADE " + "functionality - onupdates will not be rendered for foreign " + "keys. Consider using deferrable=True, initially='deferred' " + "or triggers." + ) + + return text + + def visit_drop_table_comment(self, drop): + return "COMMENT ON TABLE %s IS ''" % self.preparer.format_table( + drop.element + ) + + def visit_create_index(self, create): + index = create.element + self._verify_index_table(index) + preparer = self.preparer + text = "CREATE " + if index.unique: + text += "UNIQUE " + if index.dialect_options["oracle"]["bitmap"]: + text += "BITMAP " + text += "INDEX %s ON %s (%s)" % ( + self._prepared_index_name(index, include_schema=True), + preparer.format_table(index.table, use_schema=True), + ", ".join( + self.sql_compiler.process( + expr, include_table=False, literal_binds=True + ) + for expr in index.expressions + ), + ) + if index.dialect_options["oracle"]["compress"] is not False: + if index.dialect_options["oracle"]["compress"] is True: + text += " COMPRESS" + else: + text += " COMPRESS %d" % ( + index.dialect_options["oracle"]["compress"] + ) + return text + + def post_create_table(self, table): + table_opts = [] + opts = table.dialect_options["oracle"] + + if opts["on_commit"]: + on_commit_options = opts["on_commit"].replace("_", " ").upper() + table_opts.append("\n ON COMMIT %s" % on_commit_options) + + if opts["compress"]: + if opts["compress"] is True: + table_opts.append("\n COMPRESS") + else: + table_opts.append("\n COMPRESS FOR %s" % (opts["compress"])) + + return "".join(table_opts) + + def get_identity_options(self, identity_options): + text = super(OracleDDLCompiler, self).get_identity_options( + identity_options + ) + text = text.replace("NO MINVALUE", "NOMINVALUE") + text = text.replace("NO MAXVALUE", "NOMAXVALUE") + text = text.replace("NO CYCLE", "NOCYCLE") + text = text.replace("NO ORDER", "NOORDER") + return text + + def visit_computed_column(self, generated): + text = "GENERATED ALWAYS AS (%s)" % self.sql_compiler.process( + generated.sqltext, include_table=False, literal_binds=True + ) + if generated.persisted is True: + raise exc.CompileError( + "Oracle computed columns do not support 'stored' persistence; " + "set the 'persisted' flag to None or False for Oracle support." + ) + elif generated.persisted is False: + text += " VIRTUAL" + return text + + def visit_identity_column(self, identity, **kw): + if identity.always is None: + kind = "" + else: + kind = "ALWAYS" if identity.always else "BY DEFAULT" + text = "GENERATED %s" % kind + if identity.on_null: + text += " ON NULL" + text += " AS IDENTITY" + options = self.get_identity_options(identity) + if options: + text += " (%s)" % options + return text + + +class OracleIdentifierPreparer(compiler.IdentifierPreparer): + + reserved_words = {x.lower() for x in RESERVED_WORDS} + illegal_initial_characters = {str(dig) for dig in range(0, 10)}.union( + ["_", "$"] + ) + + def _bindparam_requires_quotes(self, value): + """Return True if the given identifier requires quoting.""" + lc_value = value.lower() + return ( + lc_value in self.reserved_words + or value[0] in self.illegal_initial_characters + or not self.legal_characters.match(util.text_type(value)) + ) + + def format_savepoint(self, savepoint): + name = savepoint.ident.lstrip("_") + return super(OracleIdentifierPreparer, self).format_savepoint( + savepoint, name + ) + + +class OracleExecutionContext(default.DefaultExecutionContext): + def fire_sequence(self, seq, type_): + return self._execute_scalar( + "SELECT " + + self.identifier_preparer.format_sequence(seq) + + ".nextval FROM DUAL", + type_, + ) + + +class OracleDialect(default.DefaultDialect): + name = "oracle" + supports_statement_cache = True + supports_alter = True + supports_unicode_statements = False + supports_unicode_binds = False + max_identifier_length = 128 + + supports_simple_order_by_label = False + cte_follows_insert = True + + supports_sequences = True + sequences_optional = False + postfetch_lastrowid = False + + default_paramstyle = "named" + colspecs = colspecs + ischema_names = ischema_names + requires_name_normalize = True + + supports_comments = True + + supports_default_values = False + supports_default_metavalue = True + supports_empty_insert = False + supports_identity_columns = True + + statement_compiler = OracleCompiler + ddl_compiler = OracleDDLCompiler + type_compiler = OracleTypeCompiler + preparer = OracleIdentifierPreparer + execution_ctx_cls = OracleExecutionContext + + reflection_options = ("oracle_resolve_synonyms",) + + _use_nchar_for_unicode = False + + construct_arguments = [ + ( + sa_schema.Table, + {"resolve_synonyms": False, "on_commit": None, "compress": False}, + ), + (sa_schema.Index, {"bitmap": False, "compress": False}), + ] + + @util.deprecated_params( + use_binds_for_limits=( + "1.4", + "The ``use_binds_for_limits`` Oracle dialect parameter is " + "deprecated. The dialect now renders LIMIT /OFFSET integers " + "inline in all cases using a post-compilation hook, so that the " + "value is still represented by a 'bound parameter' on the Core " + "Expression side.", + ) + ) + def __init__( + self, + use_ansi=True, + optimize_limits=False, + use_binds_for_limits=None, + use_nchar_for_unicode=False, + exclude_tablespaces=("SYSTEM", "SYSAUX"), + **kwargs + ): + default.DefaultDialect.__init__(self, **kwargs) + self._use_nchar_for_unicode = use_nchar_for_unicode + self.use_ansi = use_ansi + self.optimize_limits = optimize_limits + self.exclude_tablespaces = exclude_tablespaces + + def initialize(self, connection): + super(OracleDialect, self).initialize(connection) + + self.implicit_returning = self.__dict__.get( + "implicit_returning", self.server_version_info > (10,) + ) + + if self._is_oracle_8: + self.colspecs = self.colspecs.copy() + self.colspecs.pop(sqltypes.Interval) + self.use_ansi = False + + self.supports_identity_columns = self.server_version_info >= (12,) + + def _get_effective_compat_server_version_info(self, connection): + # dialect does not need compat levels below 12.2, so don't query + # in those cases + + if self.server_version_info < (12, 2): + return self.server_version_info + try: + compat = connection.exec_driver_sql( + "SELECT value FROM v$parameter WHERE name = 'compatible'" + ).scalar() + except exc.DBAPIError: + compat = None + + if compat: + try: + return tuple(int(x) for x in compat.split(".")) + except: + return self.server_version_info + else: + return self.server_version_info + + @property + def _is_oracle_8(self): + return self.server_version_info and self.server_version_info < (9,) + + @property + def _supports_table_compression(self): + return self.server_version_info and self.server_version_info >= (10, 1) + + @property + def _supports_table_compress_for(self): + return self.server_version_info and self.server_version_info >= (11,) + + @property + def _supports_char_length(self): + return not self._is_oracle_8 + + @property + def _supports_update_returning_computed_cols(self): + # on version 18 this error is no longet present while it happens on 11 + # it may work also on versions before the 18 + return self.server_version_info and self.server_version_info >= (18,) + + def do_release_savepoint(self, connection, name): + # Oracle does not support RELEASE SAVEPOINT + pass + + def _check_max_identifier_length(self, connection): + if self._get_effective_compat_server_version_info(connection) < ( + 12, + 2, + ): + return 30 + else: + # use the default + return None + + def _check_unicode_returns(self, connection): + additional_tests = [ + expression.cast( + expression.literal_column("'test nvarchar2 returns'"), + sqltypes.NVARCHAR(60), + ) + ] + return super(OracleDialect, self)._check_unicode_returns( + connection, additional_tests + ) + + _isolation_lookup = ["READ COMMITTED", "SERIALIZABLE"] + + def get_isolation_level(self, connection): + raise NotImplementedError("implemented by cx_Oracle dialect") + + def get_default_isolation_level(self, dbapi_conn): + try: + return self.get_isolation_level(dbapi_conn) + except NotImplementedError: + raise + except: + return "READ COMMITTED" + + def set_isolation_level(self, connection, level): + raise NotImplementedError("implemented by cx_Oracle dialect") + + def has_table(self, connection, table_name, schema=None): + self._ensure_has_table_connection(connection) + + if not schema: + schema = self.default_schema_name + + cursor = connection.execute( + sql.text( + "SELECT table_name FROM all_tables " + "WHERE table_name = CAST(:name AS VARCHAR2(128)) " + "AND owner = CAST(:schema_name AS VARCHAR2(128))" + ), + dict( + name=self.denormalize_name(table_name), + schema_name=self.denormalize_name(schema), + ), + ) + return cursor.first() is not None + + def has_sequence(self, connection, sequence_name, schema=None): + if not schema: + schema = self.default_schema_name + cursor = connection.execute( + sql.text( + "SELECT sequence_name FROM all_sequences " + "WHERE sequence_name = :name AND " + "sequence_owner = :schema_name" + ), + dict( + name=self.denormalize_name(sequence_name), + schema_name=self.denormalize_name(schema), + ), + ) + return cursor.first() is not None + + def _get_default_schema_name(self, connection): + return self.normalize_name( + connection.exec_driver_sql( + "select sys_context( 'userenv', 'current_schema' ) from dual" + ).scalar() + ) + + def _resolve_synonym( + self, + connection, + desired_owner=None, + desired_synonym=None, + desired_table=None, + ): + """search for a local synonym matching the given desired owner/name. + + if desired_owner is None, attempts to locate a distinct owner. + + returns the actual name, owner, dblink name, and synonym name if + found. + """ + + q = ( + "SELECT owner, table_owner, table_name, db_link, " + "synonym_name FROM all_synonyms WHERE " + ) + clauses = [] + params = {} + if desired_synonym: + clauses.append( + "synonym_name = CAST(:synonym_name AS VARCHAR2(128))" + ) + params["synonym_name"] = desired_synonym + if desired_owner: + clauses.append("owner = CAST(:desired_owner AS VARCHAR2(128))") + params["desired_owner"] = desired_owner + if desired_table: + clauses.append("table_name = CAST(:tname AS VARCHAR2(128))") + params["tname"] = desired_table + + q += " AND ".join(clauses) + + result = connection.execution_options(future_result=True).execute( + sql.text(q), params + ) + if desired_owner: + row = result.mappings().first() + if row: + return ( + row["table_name"], + row["table_owner"], + row["db_link"], + row["synonym_name"], + ) + else: + return None, None, None, None + else: + rows = result.mappings().all() + if len(rows) > 1: + raise AssertionError( + "There are multiple tables visible to the schema, you " + "must specify owner" + ) + elif len(rows) == 1: + row = rows[0] + return ( + row["table_name"], + row["table_owner"], + row["db_link"], + row["synonym_name"], + ) + else: + return None, None, None, None + + @reflection.cache + def _prepare_reflection_args( + self, + connection, + table_name, + schema=None, + resolve_synonyms=False, + dblink="", + **kw + ): + + if resolve_synonyms: + actual_name, owner, dblink, synonym = self._resolve_synonym( + connection, + desired_owner=self.denormalize_name(schema), + desired_synonym=self.denormalize_name(table_name), + ) + else: + actual_name, owner, dblink, synonym = None, None, None, None + if not actual_name: + actual_name = self.denormalize_name(table_name) + + if dblink: + # using user_db_links here since all_db_links appears + # to have more restricted permissions. + # https://docs.oracle.com/cd/B28359_01/server.111/b28310/ds_admin005.htm + # will need to hear from more users if we are doing + # the right thing here. See [ticket:2619] + owner = connection.scalar( + sql.text( + "SELECT username FROM user_db_links " "WHERE db_link=:link" + ), + dict(link=dblink), + ) + dblink = "@" + dblink + elif not owner: + owner = self.denormalize_name(schema or self.default_schema_name) + + return (actual_name, owner, dblink or "", synonym) + + @reflection.cache + def get_schema_names(self, connection, **kw): + s = "SELECT username FROM all_users ORDER BY username" + cursor = connection.exec_driver_sql(s) + return [self.normalize_name(row[0]) for row in cursor] + + @reflection.cache + def get_table_names(self, connection, schema=None, **kw): + schema = self.denormalize_name(schema or self.default_schema_name) + + # note that table_names() isn't loading DBLINKed or synonym'ed tables + if schema is None: + schema = self.default_schema_name + + sql_str = "SELECT table_name FROM all_tables WHERE " + if self.exclude_tablespaces: + sql_str += ( + "nvl(tablespace_name, 'no tablespace') " + "NOT IN (%s) AND " + % (", ".join(["'%s'" % ts for ts in self.exclude_tablespaces])) + ) + sql_str += ( + "OWNER = :owner " "AND IOT_NAME IS NULL " "AND DURATION IS NULL" + ) + + cursor = connection.execute(sql.text(sql_str), dict(owner=schema)) + return [self.normalize_name(row[0]) for row in cursor] + + @reflection.cache + def get_temp_table_names(self, connection, **kw): + schema = self.denormalize_name(self.default_schema_name) + + sql_str = "SELECT table_name FROM all_tables WHERE " + if self.exclude_tablespaces: + sql_str += ( + "nvl(tablespace_name, 'no tablespace') " + "NOT IN (%s) AND " + % (", ".join(["'%s'" % ts for ts in self.exclude_tablespaces])) + ) + sql_str += ( + "OWNER = :owner " + "AND IOT_NAME IS NULL " + "AND DURATION IS NOT NULL" + ) + + cursor = connection.execute(sql.text(sql_str), dict(owner=schema)) + return [self.normalize_name(row[0]) for row in cursor] + + @reflection.cache + def get_view_names(self, connection, schema=None, **kw): + schema = self.denormalize_name(schema or self.default_schema_name) + s = sql.text("SELECT view_name FROM all_views WHERE owner = :owner") + cursor = connection.execute( + s, dict(owner=self.denormalize_name(schema)) + ) + return [self.normalize_name(row[0]) for row in cursor] + + @reflection.cache + def get_sequence_names(self, connection, schema=None, **kw): + if not schema: + schema = self.default_schema_name + cursor = connection.execute( + sql.text( + "SELECT sequence_name FROM all_sequences " + "WHERE sequence_owner = :schema_name" + ), + dict(schema_name=self.denormalize_name(schema)), + ) + return [self.normalize_name(row[0]) for row in cursor] + + @reflection.cache + def get_table_options(self, connection, table_name, schema=None, **kw): + options = {} + + resolve_synonyms = kw.get("oracle_resolve_synonyms", False) + dblink = kw.get("dblink", "") + info_cache = kw.get("info_cache") + + (table_name, schema, dblink, synonym) = self._prepare_reflection_args( + connection, + table_name, + schema, + resolve_synonyms, + dblink, + info_cache=info_cache, + ) + + params = {"table_name": table_name} + + columns = ["table_name"] + if self._supports_table_compression: + columns.append("compression") + if self._supports_table_compress_for: + columns.append("compress_for") + + text = ( + "SELECT %(columns)s " + "FROM ALL_TABLES%(dblink)s " + "WHERE table_name = CAST(:table_name AS VARCHAR(128))" + ) + + if schema is not None: + params["owner"] = schema + text += " AND owner = CAST(:owner AS VARCHAR(128)) " + text = text % {"dblink": dblink, "columns": ", ".join(columns)} + + result = connection.execute(sql.text(text), params) + + enabled = dict(DISABLED=False, ENABLED=True) + + row = result.first() + if row: + if "compression" in row._fields and enabled.get( + row.compression, False + ): + if "compress_for" in row._fields: + options["oracle_compress"] = row.compress_for + else: + options["oracle_compress"] = True + + return options + + @reflection.cache + def get_columns(self, connection, table_name, schema=None, **kw): + """ + + kw arguments can be: + + oracle_resolve_synonyms + + dblink + + """ + + resolve_synonyms = kw.get("oracle_resolve_synonyms", False) + dblink = kw.get("dblink", "") + info_cache = kw.get("info_cache") + + (table_name, schema, dblink, synonym) = self._prepare_reflection_args( + connection, + table_name, + schema, + resolve_synonyms, + dblink, + info_cache=info_cache, + ) + columns = [] + if self._supports_char_length: + char_length_col = "char_length" + else: + char_length_col = "data_length" + + if self.server_version_info >= (12,): + identity_cols = """\ + col.default_on_null, + ( + SELECT id.generation_type || ',' || id.IDENTITY_OPTIONS + FROM ALL_TAB_IDENTITY_COLS%(dblink)s id + WHERE col.table_name = id.table_name + AND col.column_name = id.column_name + AND col.owner = id.owner + ) AS identity_options""" % { + "dblink": dblink + } + else: + identity_cols = "NULL as default_on_null, NULL as identity_options" + + params = {"table_name": table_name} + + text = """ + SELECT + col.column_name, + col.data_type, + col.%(char_length_col)s, + col.data_precision, + col.data_scale, + col.nullable, + col.data_default, + com.comments, + col.virtual_column, + %(identity_cols)s + FROM all_tab_cols%(dblink)s col + LEFT JOIN all_col_comments%(dblink)s com + ON col.table_name = com.table_name + AND col.column_name = com.column_name + AND col.owner = com.owner + WHERE col.table_name = CAST(:table_name AS VARCHAR2(128)) + AND col.hidden_column = 'NO' + """ + if schema is not None: + params["owner"] = schema + text += " AND col.owner = :owner " + text += " ORDER BY col.column_id" + text = text % { + "dblink": dblink, + "char_length_col": char_length_col, + "identity_cols": identity_cols, + } + + c = connection.execute(sql.text(text), params) + + for row in c: + colname = self.normalize_name(row[0]) + orig_colname = row[0] + coltype = row[1] + length = row[2] + precision = row[3] + scale = row[4] + nullable = row[5] == "Y" + default = row[6] + comment = row[7] + generated = row[8] + default_on_nul = row[9] + identity_options = row[10] + + if coltype == "NUMBER": + if precision is None and scale == 0: + coltype = INTEGER() + else: + coltype = NUMBER(precision, scale) + elif coltype == "FLOAT": + # TODO: support "precision" here as "binary_precision" + coltype = FLOAT() + elif coltype in ("VARCHAR2", "NVARCHAR2", "CHAR", "NCHAR"): + coltype = self.ischema_names.get(coltype)(length) + elif "WITH TIME ZONE" in coltype: + coltype = TIMESTAMP(timezone=True) + else: + coltype = re.sub(r"\(\d+\)", "", coltype) + try: + coltype = self.ischema_names[coltype] + except KeyError: + util.warn( + "Did not recognize type '%s' of column '%s'" + % (coltype, colname) + ) + coltype = sqltypes.NULLTYPE + + if generated == "YES": + computed = dict(sqltext=default) + default = None + else: + computed = None + + if identity_options is not None: + identity = self._parse_identity_options( + identity_options, default_on_nul + ) + default = None + else: + identity = None + + cdict = { + "name": colname, + "type": coltype, + "nullable": nullable, + "default": default, + "autoincrement": "auto", + "comment": comment, + } + if orig_colname.lower() == orig_colname: + cdict["quote"] = True + if computed is not None: + cdict["computed"] = computed + if identity is not None: + cdict["identity"] = identity + + columns.append(cdict) + return columns + + def _parse_identity_options(self, identity_options, default_on_nul): + # identity_options is a string that starts with 'ALWAYS,' or + # 'BY DEFAULT,' and continues with + # START WITH: 1, INCREMENT BY: 1, MAX_VALUE: 123, MIN_VALUE: 1, + # CYCLE_FLAG: N, CACHE_SIZE: 1, ORDER_FLAG: N, SCALE_FLAG: N, + # EXTEND_FLAG: N, SESSION_FLAG: N, KEEP_VALUE: N + parts = [p.strip() for p in identity_options.split(",")] + identity = { + "always": parts[0] == "ALWAYS", + "on_null": default_on_nul == "YES", + } + + for part in parts[1:]: + option, value = part.split(":") + value = value.strip() + + if "START WITH" in option: + identity["start"] = compat.long_type(value) + elif "INCREMENT BY" in option: + identity["increment"] = compat.long_type(value) + elif "MAX_VALUE" in option: + identity["maxvalue"] = compat.long_type(value) + elif "MIN_VALUE" in option: + identity["minvalue"] = compat.long_type(value) + elif "CYCLE_FLAG" in option: + identity["cycle"] = value == "Y" + elif "CACHE_SIZE" in option: + identity["cache"] = compat.long_type(value) + elif "ORDER_FLAG" in option: + identity["order"] = value == "Y" + return identity + + @reflection.cache + def get_table_comment( + self, + connection, + table_name, + schema=None, + resolve_synonyms=False, + dblink="", + **kw + ): + + info_cache = kw.get("info_cache") + (table_name, schema, dblink, synonym) = self._prepare_reflection_args( + connection, + table_name, + schema, + resolve_synonyms, + dblink, + info_cache=info_cache, + ) + + if not schema: + schema = self.default_schema_name + + COMMENT_SQL = """ + SELECT comments + FROM all_tab_comments + WHERE table_name = CAST(:table_name AS VARCHAR(128)) + AND owner = CAST(:schema_name AS VARCHAR(128)) + """ + + c = connection.execute( + sql.text(COMMENT_SQL), + dict(table_name=table_name, schema_name=schema), + ) + return {"text": c.scalar()} + + @reflection.cache + def get_indexes( + self, + connection, + table_name, + schema=None, + resolve_synonyms=False, + dblink="", + **kw + ): + + info_cache = kw.get("info_cache") + (table_name, schema, dblink, synonym) = self._prepare_reflection_args( + connection, + table_name, + schema, + resolve_synonyms, + dblink, + info_cache=info_cache, + ) + indexes = [] + + params = {"table_name": table_name} + text = ( + "SELECT a.index_name, a.column_name, " + "\nb.index_type, b.uniqueness, b.compression, b.prefix_length " + "\nFROM ALL_IND_COLUMNS%(dblink)s a, " + "\nALL_INDEXES%(dblink)s b " + "\nWHERE " + "\na.index_name = b.index_name " + "\nAND a.table_owner = b.table_owner " + "\nAND a.table_name = b.table_name " + "\nAND a.table_name = CAST(:table_name AS VARCHAR(128))" + ) + + if schema is not None: + params["schema"] = schema + text += "AND a.table_owner = :schema " + + text += "ORDER BY a.index_name, a.column_position" + + text = text % {"dblink": dblink} + + q = sql.text(text) + rp = connection.execute(q, params) + indexes = [] + last_index_name = None + pk_constraint = self.get_pk_constraint( + connection, + table_name, + schema, + resolve_synonyms=resolve_synonyms, + dblink=dblink, + info_cache=kw.get("info_cache"), + ) + + uniqueness = dict(NONUNIQUE=False, UNIQUE=True) + enabled = dict(DISABLED=False, ENABLED=True) + + oracle_sys_col = re.compile(r"SYS_NC\d+\$", re.IGNORECASE) + + index = None + for rset in rp: + index_name_normalized = self.normalize_name(rset.index_name) + + # skip primary key index. This is refined as of + # [ticket:5421]. Note that ALL_INDEXES.GENERATED will by "Y" + # if the name of this index was generated by Oracle, however + # if a named primary key constraint was created then this flag + # is false. + if ( + pk_constraint + and index_name_normalized == pk_constraint["name"] + ): + continue + + if rset.index_name != last_index_name: + index = dict( + name=index_name_normalized, + column_names=[], + dialect_options={}, + ) + indexes.append(index) + index["unique"] = uniqueness.get(rset.uniqueness, False) + + if rset.index_type in ("BITMAP", "FUNCTION-BASED BITMAP"): + index["dialect_options"]["oracle_bitmap"] = True + if enabled.get(rset.compression, False): + index["dialect_options"][ + "oracle_compress" + ] = rset.prefix_length + + # filter out Oracle SYS_NC names. could also do an outer join + # to the all_tab_columns table and check for real col names there. + if not oracle_sys_col.match(rset.column_name): + index["column_names"].append( + self.normalize_name(rset.column_name) + ) + last_index_name = rset.index_name + + return indexes + + @reflection.cache + def _get_constraint_data( + self, connection, table_name, schema=None, dblink="", **kw + ): + + params = {"table_name": table_name} + + text = ( + "SELECT" + "\nac.constraint_name," # 0 + "\nac.constraint_type," # 1 + "\nloc.column_name AS local_column," # 2 + "\nrem.table_name AS remote_table," # 3 + "\nrem.column_name AS remote_column," # 4 + "\nrem.owner AS remote_owner," # 5 + "\nloc.position as loc_pos," # 6 + "\nrem.position as rem_pos," # 7 + "\nac.search_condition," # 8 + "\nac.delete_rule" # 9 + "\nFROM all_constraints%(dblink)s ac," + "\nall_cons_columns%(dblink)s loc," + "\nall_cons_columns%(dblink)s rem" + "\nWHERE ac.table_name = CAST(:table_name AS VARCHAR2(128))" + "\nAND ac.constraint_type IN ('R','P', 'U', 'C')" + ) + + if schema is not None: + params["owner"] = schema + text += "\nAND ac.owner = CAST(:owner AS VARCHAR2(128))" + + text += ( + "\nAND ac.owner = loc.owner" + "\nAND ac.constraint_name = loc.constraint_name" + "\nAND ac.r_owner = rem.owner(+)" + "\nAND ac.r_constraint_name = rem.constraint_name(+)" + "\nAND (rem.position IS NULL or loc.position=rem.position)" + "\nORDER BY ac.constraint_name, loc.position" + ) + + text = text % {"dblink": dblink} + rp = connection.execute(sql.text(text), params) + constraint_data = rp.fetchall() + return constraint_data + + @reflection.cache + def get_pk_constraint(self, connection, table_name, schema=None, **kw): + resolve_synonyms = kw.get("oracle_resolve_synonyms", False) + dblink = kw.get("dblink", "") + info_cache = kw.get("info_cache") + + (table_name, schema, dblink, synonym) = self._prepare_reflection_args( + connection, + table_name, + schema, + resolve_synonyms, + dblink, + info_cache=info_cache, + ) + pkeys = [] + constraint_name = None + constraint_data = self._get_constraint_data( + connection, + table_name, + schema, + dblink, + info_cache=kw.get("info_cache"), + ) + + for row in constraint_data: + ( + cons_name, + cons_type, + local_column, + remote_table, + remote_column, + remote_owner, + ) = row[0:2] + tuple([self.normalize_name(x) for x in row[2:6]]) + if cons_type == "P": + if constraint_name is None: + constraint_name = self.normalize_name(cons_name) + pkeys.append(local_column) + return {"constrained_columns": pkeys, "name": constraint_name} + + @reflection.cache + def get_foreign_keys(self, connection, table_name, schema=None, **kw): + """ + + kw arguments can be: + + oracle_resolve_synonyms + + dblink + + """ + requested_schema = schema # to check later on + resolve_synonyms = kw.get("oracle_resolve_synonyms", False) + dblink = kw.get("dblink", "") + info_cache = kw.get("info_cache") + + (table_name, schema, dblink, synonym) = self._prepare_reflection_args( + connection, + table_name, + schema, + resolve_synonyms, + dblink, + info_cache=info_cache, + ) + + constraint_data = self._get_constraint_data( + connection, + table_name, + schema, + dblink, + info_cache=kw.get("info_cache"), + ) + + def fkey_rec(): + return { + "name": None, + "constrained_columns": [], + "referred_schema": None, + "referred_table": None, + "referred_columns": [], + "options": {}, + } + + fkeys = util.defaultdict(fkey_rec) + + for row in constraint_data: + ( + cons_name, + cons_type, + local_column, + remote_table, + remote_column, + remote_owner, + ) = row[0:2] + tuple([self.normalize_name(x) for x in row[2:6]]) + + cons_name = self.normalize_name(cons_name) + + if cons_type == "R": + if remote_table is None: + # ticket 363 + util.warn( + ( + "Got 'None' querying 'table_name' from " + "all_cons_columns%(dblink)s - does the user have " + "proper rights to the table?" + ) + % {"dblink": dblink} + ) + continue + + rec = fkeys[cons_name] + rec["name"] = cons_name + local_cols, remote_cols = ( + rec["constrained_columns"], + rec["referred_columns"], + ) + + if not rec["referred_table"]: + if resolve_synonyms: + ( + ref_remote_name, + ref_remote_owner, + ref_dblink, + ref_synonym, + ) = self._resolve_synonym( + connection, + desired_owner=self.denormalize_name(remote_owner), + desired_table=self.denormalize_name(remote_table), + ) + if ref_synonym: + remote_table = self.normalize_name(ref_synonym) + remote_owner = self.normalize_name( + ref_remote_owner + ) + + rec["referred_table"] = remote_table + + if ( + requested_schema is not None + or self.denormalize_name(remote_owner) != schema + ): + rec["referred_schema"] = remote_owner + + if row[9] != "NO ACTION": + rec["options"]["ondelete"] = row[9] + + local_cols.append(local_column) + remote_cols.append(remote_column) + + return list(fkeys.values()) + + @reflection.cache + def get_unique_constraints( + self, connection, table_name, schema=None, **kw + ): + resolve_synonyms = kw.get("oracle_resolve_synonyms", False) + dblink = kw.get("dblink", "") + info_cache = kw.get("info_cache") + + (table_name, schema, dblink, synonym) = self._prepare_reflection_args( + connection, + table_name, + schema, + resolve_synonyms, + dblink, + info_cache=info_cache, + ) + + constraint_data = self._get_constraint_data( + connection, + table_name, + schema, + dblink, + info_cache=kw.get("info_cache"), + ) + + unique_keys = filter(lambda x: x[1] == "U", constraint_data) + uniques_group = groupby(unique_keys, lambda x: x[0]) + + index_names = { + ix["name"] + for ix in self.get_indexes(connection, table_name, schema=schema) + } + return [ + { + "name": name, + "column_names": cols, + "duplicates_index": name if name in index_names else None, + } + for name, cols in [ + [ + self.normalize_name(i[0]), + [self.normalize_name(x[2]) for x in i[1]], + ] + for i in uniques_group + ] + ] + + @reflection.cache + def get_view_definition( + self, + connection, + view_name, + schema=None, + resolve_synonyms=False, + dblink="", + **kw + ): + info_cache = kw.get("info_cache") + (view_name, schema, dblink, synonym) = self._prepare_reflection_args( + connection, + view_name, + schema, + resolve_synonyms, + dblink, + info_cache=info_cache, + ) + + params = {"view_name": view_name} + text = "SELECT text FROM all_views WHERE view_name=:view_name" + + if schema is not None: + text += " AND owner = :schema" + params["schema"] = schema + + rp = connection.execute(sql.text(text), params).scalar() + if rp: + if util.py2k: + rp = rp.decode(self.encoding) + return rp + else: + return None + + @reflection.cache + def get_check_constraints( + self, connection, table_name, schema=None, include_all=False, **kw + ): + resolve_synonyms = kw.get("oracle_resolve_synonyms", False) + dblink = kw.get("dblink", "") + info_cache = kw.get("info_cache") + + (table_name, schema, dblink, synonym) = self._prepare_reflection_args( + connection, + table_name, + schema, + resolve_synonyms, + dblink, + info_cache=info_cache, + ) + + constraint_data = self._get_constraint_data( + connection, + table_name, + schema, + dblink, + info_cache=kw.get("info_cache"), + ) + + check_constraints = filter(lambda x: x[1] == "C", constraint_data) + + return [ + {"name": self.normalize_name(cons[0]), "sqltext": cons[8]} + for cons in check_constraints + if include_all or not re.match(r"..+?. IS NOT NULL$", cons[8]) + ] + + +class _OuterJoinColumn(sql.ClauseElement): + __visit_name__ = "outer_join_column" + + def __init__(self, column): + self.column = column diff --git a/lib/sqlalchemy/dialects/oracle/cx_oracle.py b/lib/sqlalchemy/dialects/oracle/cx_oracle.py new file mode 100644 index 0000000..64029a4 --- /dev/null +++ b/lib/sqlalchemy/dialects/oracle/cx_oracle.py @@ -0,0 +1,1424 @@ +# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php + +r""" +.. dialect:: oracle+cx_oracle + :name: cx-Oracle + :dbapi: cx_oracle + :connectstring: oracle+cx_oracle://user:pass@hostname:port[/dbname][?service_name=<service>[&key=value&key=value...]] + :url: https://oracle.github.io/python-cx_Oracle/ + +DSN vs. Hostname connections +----------------------------- + +cx_Oracle provides several methods of indicating the target database. The +dialect translates from a series of different URL forms. + +Hostname Connections with Easy Connect Syntax +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Given a hostname, port and service name of the target Oracle Database, for +example from Oracle's `Easy Connect syntax +<https://cx-oracle.readthedocs.io/en/latest/user_guide/connection_handling.html#easy-connect-syntax-for-connection-strings>`_, +then connect in SQLAlchemy using the ``service_name`` query string parameter:: + + engine = create_engine("oracle+cx_oracle://scott:tiger@hostname:port/?service_name=myservice&encoding=UTF-8&nencoding=UTF-8") + +The `full Easy Connect syntax +<https://www.oracle.com/pls/topic/lookup?ctx=dblatest&id=GUID-B0437826-43C1-49EC-A94D-B650B6A4A6EE>`_ +is not supported. Instead, use a ``tnsnames.ora`` file and connect using a +DSN. + +Connections with tnsnames.ora or Oracle Cloud +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Alternatively, if no port, database name, or ``service_name`` is provided, the +dialect will use an Oracle DSN "connection string". This takes the "hostname" +portion of the URL as the data source name. For example, if the +``tnsnames.ora`` file contains a `Net Service Name +<https://cx-oracle.readthedocs.io/en/latest/user_guide/connection_handling.html#net-service-names-for-connection-strings>`_ +of ``myalias`` as below:: + + myalias = + (DESCRIPTION = + (ADDRESS = (PROTOCOL = TCP)(HOST = mymachine.example.com)(PORT = 1521)) + (CONNECT_DATA = + (SERVER = DEDICATED) + (SERVICE_NAME = orclpdb1) + ) + ) + +The cx_Oracle dialect connects to this database service when ``myalias`` is the +hostname portion of the URL, without specifying a port, database name or +``service_name``:: + + engine = create_engine("oracle+cx_oracle://scott:tiger@myalias/?encoding=UTF-8&nencoding=UTF-8") + +Users of Oracle Cloud should use this syntax and also configure the cloud +wallet as shown in cx_Oracle documentation `Connecting to Autononmous Databases +<https://cx-oracle.readthedocs.io/en/latest/user_guide/connection_handling.html#connecting-to-autononmous-databases>`_. + +SID Connections +^^^^^^^^^^^^^^^ + +To use Oracle's obsolete SID connection syntax, the SID can be passed in a +"database name" portion of the URL as below:: + + engine = create_engine("oracle+cx_oracle://scott:tiger@hostname:1521/dbname?encoding=UTF-8&nencoding=UTF-8") + +Above, the DSN passed to cx_Oracle is created by ``cx_Oracle.makedsn()`` as +follows:: + + >>> import cx_Oracle + >>> cx_Oracle.makedsn("hostname", 1521, sid="dbname") + '(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=hostname)(PORT=1521))(CONNECT_DATA=(SID=dbname)))' + +Passing cx_Oracle connect arguments +----------------------------------- + +Additional connection arguments can usually be passed via the URL +query string; particular symbols like ``cx_Oracle.SYSDBA`` are intercepted +and converted to the correct symbol:: + + e = create_engine( + "oracle+cx_oracle://user:pass@dsn?encoding=UTF-8&nencoding=UTF-8&mode=SYSDBA&events=true") + +.. versionchanged:: 1.3 the cx_oracle dialect now accepts all argument names + within the URL string itself, to be passed to the cx_Oracle DBAPI. As + was the case earlier but not correctly documented, the + :paramref:`_sa.create_engine.connect_args` parameter also accepts all + cx_Oracle DBAPI connect arguments. + +To pass arguments directly to ``.connect()`` without using the query +string, use the :paramref:`_sa.create_engine.connect_args` dictionary. +Any cx_Oracle parameter value and/or constant may be passed, such as:: + + import cx_Oracle + e = create_engine( + "oracle+cx_oracle://user:pass@dsn", + connect_args={ + "encoding": "UTF-8", + "nencoding": "UTF-8", + "mode": cx_Oracle.SYSDBA, + "events": True + } + ) + +Note that the default value for ``encoding`` and ``nencoding`` was changed to +"UTF-8" in cx_Oracle 8.0 so these parameters can be omitted when using that +version, or later. + +Options consumed by the SQLAlchemy cx_Oracle dialect outside of the driver +-------------------------------------------------------------------------- + +There are also options that are consumed by the SQLAlchemy cx_oracle dialect +itself. These options are always passed directly to :func:`_sa.create_engine` +, such as:: + + e = create_engine( + "oracle+cx_oracle://user:pass@dsn", coerce_to_unicode=False) + +The parameters accepted by the cx_oracle dialect are as follows: + +* ``arraysize`` - set the cx_oracle.arraysize value on cursors, defaulted + to 50. This setting is significant with cx_Oracle as the contents of LOB + objects are only readable within a "live" row (e.g. within a batch of + 50 rows). + +* ``auto_convert_lobs`` - defaults to True; See :ref:`cx_oracle_lob`. + +* ``coerce_to_unicode`` - see :ref:`cx_oracle_unicode` for detail. + +* ``coerce_to_decimal`` - see :ref:`cx_oracle_numeric` for detail. + +* ``encoding_errors`` - see :ref:`cx_oracle_unicode_encoding_errors` for detail. + +.. _cx_oracle_sessionpool: + +Using cx_Oracle SessionPool +--------------------------- + +The cx_Oracle library provides its own connection pool implementation that may +be used in place of SQLAlchemy's pooling functionality. This can be achieved +by using the :paramref:`_sa.create_engine.creator` parameter to provide a +function that returns a new connection, along with setting +:paramref:`_sa.create_engine.pool_class` to ``NullPool`` to disable +SQLAlchemy's pooling:: + + import cx_Oracle + from sqlalchemy import create_engine + from sqlalchemy.pool import NullPool + + pool = cx_Oracle.SessionPool( + user="scott", password="tiger", dsn="orclpdb", + min=2, max=5, increment=1, threaded=True, + encoding="UTF-8", nencoding="UTF-8" + ) + + engine = create_engine("oracle://", creator=pool.acquire, poolclass=NullPool) + +The above engine may then be used normally where cx_Oracle's pool handles +connection pooling:: + + with engine.connect() as conn: + print(conn.scalar("select 1 FROM dual")) + + +As well as providing a scalable solution for multi-user applications, the +cx_Oracle session pool supports some Oracle features such as DRCP and +`Application Continuity +<https://cx-oracle.readthedocs.io/en/latest/user_guide/ha.html#application-continuity-ac>`_. + +Using Oracle Database Resident Connection Pooling (DRCP) +-------------------------------------------------------- + +When using Oracle's `DRCP +<https://www.oracle.com/pls/topic/lookup?ctx=dblatest&id=GUID-015CA8C1-2386-4626-855D-CC546DDC1086>`_, +the best practice is to pass a connection class and "purity" when acquiring a +connection from the SessionPool. Refer to the `cx_Oracle DRCP documentation +<https://cx-oracle.readthedocs.io/en/latest/user_guide/connection_handling.html#database-resident-connection-pooling-drcp>`_. + +This can be achieved by wrapping ``pool.acquire()``:: + + import cx_Oracle + from sqlalchemy import create_engine + from sqlalchemy.pool import NullPool + + pool = cx_Oracle.SessionPool( + user="scott", password="tiger", dsn="orclpdb", + min=2, max=5, increment=1, threaded=True, + encoding="UTF-8", nencoding="UTF-8" + ) + + def creator(): + return pool.acquire(cclass="MYCLASS", purity=cx_Oracle.ATTR_PURITY_SELF) + + engine = create_engine("oracle://", creator=creator, poolclass=NullPool) + +The above engine may then be used normally where cx_Oracle handles session +pooling and Oracle Database additionally uses DRCP:: + + with engine.connect() as conn: + print(conn.scalar("select 1 FROM dual")) + +.. _cx_oracle_unicode: + +Unicode +------- + +As is the case for all DBAPIs under Python 3, all strings are inherently +Unicode strings. Under Python 2, cx_Oracle also supports Python Unicode +objects directly. In all cases however, the driver requires an explicit +encoding configuration. + +Ensuring the Correct Client Encoding +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The long accepted standard for establishing client encoding for nearly all +Oracle related software is via the `NLS_LANG <https://www.oracle.com/database/technologies/faq-nls-lang.html>`_ +environment variable. cx_Oracle like most other Oracle drivers will use +this environment variable as the source of its encoding configuration. The +format of this variable is idiosyncratic; a typical value would be +``AMERICAN_AMERICA.AL32UTF8``. + +The cx_Oracle driver also supports a programmatic alternative which is to +pass the ``encoding`` and ``nencoding`` parameters directly to its +``.connect()`` function. These can be present in the URL as follows:: + + engine = create_engine("oracle+cx_oracle://scott:tiger@orclpdb/?encoding=UTF-8&nencoding=UTF-8") + +For the meaning of the ``encoding`` and ``nencoding`` parameters, please +consult +`Characters Sets and National Language Support (NLS) <https://cx-oracle.readthedocs.io/en/latest/user_guide/globalization.html#globalization>`_. + +.. seealso:: + + `Characters Sets and National Language Support (NLS) <https://cx-oracle.readthedocs.io/en/latest/user_guide/globalization.html#globalization>`_ + - in the cx_Oracle documentation. + + +Unicode-specific Column datatypes +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The Core expression language handles unicode data by use of the :class:`.Unicode` +and :class:`.UnicodeText` +datatypes. These types correspond to the VARCHAR2 and CLOB Oracle datatypes by +default. When using these datatypes with Unicode data, it is expected that +the Oracle database is configured with a Unicode-aware character set, as well +as that the ``NLS_LANG`` environment variable is set appropriately, so that +the VARCHAR2 and CLOB datatypes can accommodate the data. + +In the case that the Oracle database is not configured with a Unicode character +set, the two options are to use the :class:`_types.NCHAR` and +:class:`_oracle.NCLOB` datatypes explicitly, or to pass the flag +``use_nchar_for_unicode=True`` to :func:`_sa.create_engine`, +which will cause the +SQLAlchemy dialect to use NCHAR/NCLOB for the :class:`.Unicode` / +:class:`.UnicodeText` datatypes instead of VARCHAR/CLOB. + +.. versionchanged:: 1.3 The :class:`.Unicode` and :class:`.UnicodeText` + datatypes now correspond to the ``VARCHAR2`` and ``CLOB`` Oracle datatypes + unless the ``use_nchar_for_unicode=True`` is passed to the dialect + when :func:`_sa.create_engine` is called. + +Unicode Coercion of result rows under Python 2 +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +When result sets are fetched that include strings, under Python 3 the cx_Oracle +DBAPI returns all strings as Python Unicode objects, since Python 3 only has a +Unicode string type. This occurs for data fetched from datatypes such as +VARCHAR2, CHAR, CLOB, NCHAR, NCLOB, etc. In order to provide cross- +compatibility under Python 2, the SQLAlchemy cx_Oracle dialect will add +Unicode-conversion to string data under Python 2 as well. Historically, this +made use of converters that were supplied by cx_Oracle but were found to be +non-performant; SQLAlchemy's own converters are used for the string to Unicode +conversion under Python 2. To disable the Python 2 Unicode conversion for +VARCHAR2, CHAR, and CLOB, the flag ``coerce_to_unicode=False`` can be passed to +:func:`_sa.create_engine`. + +.. versionchanged:: 1.3 Unicode conversion is applied to all string values + by default under python 2. The ``coerce_to_unicode`` now defaults to True + and can be set to False to disable the Unicode coercion of strings that are + delivered as VARCHAR2/CHAR/CLOB data. + +.. _cx_oracle_unicode_encoding_errors: + +Encoding Errors +^^^^^^^^^^^^^^^ + +For the unusual case that data in the Oracle database is present with a broken +encoding, the dialect accepts a parameter ``encoding_errors`` which will be +passed to Unicode decoding functions in order to affect how decoding errors are +handled. The value is ultimately consumed by the Python `decode +<https://docs.python.org/3/library/stdtypes.html#bytes.decode>`_ function, and +is passed both via cx_Oracle's ``encodingErrors`` parameter consumed by +``Cursor.var()``, as well as SQLAlchemy's own decoding function, as the +cx_Oracle dialect makes use of both under different circumstances. + +.. versionadded:: 1.3.11 + + +.. _cx_oracle_setinputsizes: + +Fine grained control over cx_Oracle data binding performance with setinputsizes +------------------------------------------------------------------------------- + +The cx_Oracle DBAPI has a deep and fundamental reliance upon the usage of the +DBAPI ``setinputsizes()`` call. The purpose of this call is to establish the +datatypes that are bound to a SQL statement for Python values being passed as +parameters. While virtually no other DBAPI assigns any use to the +``setinputsizes()`` call, the cx_Oracle DBAPI relies upon it heavily in its +interactions with the Oracle client interface, and in some scenarios it is not +possible for SQLAlchemy to know exactly how data should be bound, as some +settings can cause profoundly different performance characteristics, while +altering the type coercion behavior at the same time. + +Users of the cx_Oracle dialect are **strongly encouraged** to read through +cx_Oracle's list of built-in datatype symbols at +https://cx-oracle.readthedocs.io/en/latest/api_manual/module.html#database-types. +Note that in some cases, significant performance degradation can occur when +using these types vs. not, in particular when specifying ``cx_Oracle.CLOB``. + +On the SQLAlchemy side, the :meth:`.DialectEvents.do_setinputsizes` event can +be used both for runtime visibility (e.g. logging) of the setinputsizes step as +well as to fully control how ``setinputsizes()`` is used on a per-statement +basis. + +.. versionadded:: 1.2.9 Added :meth:`.DialectEvents.setinputsizes` + + +Example 1 - logging all setinputsizes calls +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The following example illustrates how to log the intermediary values from a +SQLAlchemy perspective before they are converted to the raw ``setinputsizes()`` +parameter dictionary. The keys of the dictionary are :class:`.BindParameter` +objects which have a ``.key`` and a ``.type`` attribute:: + + from sqlalchemy import create_engine, event + + engine = create_engine("oracle+cx_oracle://scott:tiger@host/xe") + + @event.listens_for(engine, "do_setinputsizes") + def _log_setinputsizes(inputsizes, cursor, statement, parameters, context): + for bindparam, dbapitype in inputsizes.items(): + log.info( + "Bound parameter name: %s SQLAlchemy type: %r " + "DBAPI object: %s", + bindparam.key, bindparam.type, dbapitype) + +Example 2 - remove all bindings to CLOB +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The ``CLOB`` datatype in cx_Oracle incurs a significant performance overhead, +however is set by default for the ``Text`` type within the SQLAlchemy 1.2 +series. This setting can be modified as follows:: + + from sqlalchemy import create_engine, event + from cx_Oracle import CLOB + + engine = create_engine("oracle+cx_oracle://scott:tiger@host/xe") + + @event.listens_for(engine, "do_setinputsizes") + def _remove_clob(inputsizes, cursor, statement, parameters, context): + for bindparam, dbapitype in list(inputsizes.items()): + if dbapitype is CLOB: + del inputsizes[bindparam] + +.. _cx_oracle_returning: + +RETURNING Support +----------------- + +The cx_Oracle dialect implements RETURNING using OUT parameters. +The dialect supports RETURNING fully, however cx_Oracle 6 is recommended +for complete support. + +.. _cx_oracle_lob: + +LOB Objects +----------- + +cx_oracle returns oracle LOBs using the cx_oracle.LOB object. SQLAlchemy +converts these to strings so that the interface of the Binary type is +consistent with that of other backends, which takes place within a cx_Oracle +outputtypehandler. + +cx_Oracle prior to version 6 would require that LOB objects be read before +a new batch of rows would be read, as determined by the ``cursor.arraysize``. +As of the 6 series, this limitation has been lifted. Nevertheless, because +SQLAlchemy pre-reads these LOBs up front, this issue is avoided in any case. + +To disable the auto "read()" feature of the dialect, the flag +``auto_convert_lobs=False`` may be passed to :func:`_sa.create_engine`. Under +the cx_Oracle 5 series, having this flag turned off means there is the chance +of reading from a stale LOB object if not read as it is fetched. With +cx_Oracle 6, this issue is resolved. + +.. versionchanged:: 1.2 the LOB handling system has been greatly simplified + internally to make use of outputtypehandlers, and no longer makes use + of alternate "buffered" result set objects. + +Two Phase Transactions Not Supported +------------------------------------- + +Two phase transactions are **not supported** under cx_Oracle due to poor +driver support. As of cx_Oracle 6.0b1, the interface for +two phase transactions has been changed to be more of a direct pass-through +to the underlying OCI layer with less automation. The additional logic +to support this system is not implemented in SQLAlchemy. + +.. _cx_oracle_numeric: + +Precision Numerics +------------------ + +SQLAlchemy's numeric types can handle receiving and returning values as Python +``Decimal`` objects or float objects. When a :class:`.Numeric` object, or a +subclass such as :class:`.Float`, :class:`_oracle.DOUBLE_PRECISION` etc. is in +use, the :paramref:`.Numeric.asdecimal` flag determines if values should be +coerced to ``Decimal`` upon return, or returned as float objects. To make +matters more complicated under Oracle, Oracle's ``NUMBER`` type can also +represent integer values if the "scale" is zero, so the Oracle-specific +:class:`_oracle.NUMBER` type takes this into account as well. + +The cx_Oracle dialect makes extensive use of connection- and cursor-level +"outputtypehandler" callables in order to coerce numeric values as requested. +These callables are specific to the specific flavor of :class:`.Numeric` in +use, as well as if no SQLAlchemy typing objects are present. There are +observed scenarios where Oracle may sends incomplete or ambiguous information +about the numeric types being returned, such as a query where the numeric types +are buried under multiple levels of subquery. The type handlers do their best +to make the right decision in all cases, deferring to the underlying cx_Oracle +DBAPI for all those cases where the driver can make the best decision. + +When no typing objects are present, as when executing plain SQL strings, a +default "outputtypehandler" is present which will generally return numeric +values which specify precision and scale as Python ``Decimal`` objects. To +disable this coercion to decimal for performance reasons, pass the flag +``coerce_to_decimal=False`` to :func:`_sa.create_engine`:: + + engine = create_engine("oracle+cx_oracle://dsn", coerce_to_decimal=False) + +The ``coerce_to_decimal`` flag only impacts the results of plain string +SQL statements that are not otherwise associated with a :class:`.Numeric` +SQLAlchemy type (or a subclass of such). + +.. versionchanged:: 1.2 The numeric handling system for cx_Oracle has been + reworked to take advantage of newer cx_Oracle features as well + as better integration of outputtypehandlers. + +""" # noqa + +from __future__ import absolute_import + +import decimal +import random +import re + +from . import base as oracle +from .base import OracleCompiler +from .base import OracleDialect +from .base import OracleExecutionContext +from ... import exc +from ... import processors +from ... import types as sqltypes +from ... import util +from ...engine import cursor as _cursor +from ...util import compat + + +class _OracleInteger(sqltypes.Integer): + def get_dbapi_type(self, dbapi): + # see https://github.com/oracle/python-cx_Oracle/issues/ + # 208#issuecomment-409715955 + return int + + def _cx_oracle_var(self, dialect, cursor): + cx_Oracle = dialect.dbapi + return cursor.var( + cx_Oracle.STRING, 255, arraysize=cursor.arraysize, outconverter=int + ) + + def _cx_oracle_outputtypehandler(self, dialect): + def handler(cursor, name, default_type, size, precision, scale): + return self._cx_oracle_var(dialect, cursor) + + return handler + + +class _OracleNumeric(sqltypes.Numeric): + is_number = False + + def bind_processor(self, dialect): + if self.scale == 0: + return None + elif self.asdecimal: + processor = processors.to_decimal_processor_factory( + decimal.Decimal, self._effective_decimal_return_scale + ) + + def process(value): + if isinstance(value, (int, float)): + return processor(value) + elif value is not None and value.is_infinite(): + return float(value) + else: + return value + + return process + else: + return processors.to_float + + def result_processor(self, dialect, coltype): + return None + + def _cx_oracle_outputtypehandler(self, dialect): + cx_Oracle = dialect.dbapi + + is_cx_oracle_6 = dialect._is_cx_oracle_6 + + def handler(cursor, name, default_type, size, precision, scale): + outconverter = None + + if precision: + if self.asdecimal: + if default_type == cx_Oracle.NATIVE_FLOAT: + # receiving float and doing Decimal after the fact + # allows for float("inf") to be handled + type_ = default_type + outconverter = decimal.Decimal + elif is_cx_oracle_6: + type_ = decimal.Decimal + else: + type_ = cx_Oracle.STRING + outconverter = dialect._to_decimal + else: + if self.is_number and scale == 0: + # integer. cx_Oracle is observed to handle the widest + # variety of ints when no directives are passed, + # from 5.2 to 7.0. See [ticket:4457] + return None + else: + type_ = cx_Oracle.NATIVE_FLOAT + + else: + if self.asdecimal: + if default_type == cx_Oracle.NATIVE_FLOAT: + type_ = default_type + outconverter = decimal.Decimal + elif is_cx_oracle_6: + type_ = decimal.Decimal + else: + type_ = cx_Oracle.STRING + outconverter = dialect._to_decimal + else: + if self.is_number and scale == 0: + # integer. cx_Oracle is observed to handle the widest + # variety of ints when no directives are passed, + # from 5.2 to 7.0. See [ticket:4457] + return None + else: + type_ = cx_Oracle.NATIVE_FLOAT + + return cursor.var( + type_, + 255, + arraysize=cursor.arraysize, + outconverter=outconverter, + ) + + return handler + + +class _OracleBinaryFloat(_OracleNumeric): + def get_dbapi_type(self, dbapi): + return dbapi.NATIVE_FLOAT + + +class _OracleBINARY_FLOAT(_OracleBinaryFloat, oracle.BINARY_FLOAT): + pass + + +class _OracleBINARY_DOUBLE(_OracleBinaryFloat, oracle.BINARY_DOUBLE): + pass + + +class _OracleNUMBER(_OracleNumeric): + is_number = True + + +class _OracleDate(sqltypes.Date): + def bind_processor(self, dialect): + return None + + def result_processor(self, dialect, coltype): + def process(value): + if value is not None: + return value.date() + else: + return value + + return process + + +# TODO: the names used across CHAR / VARCHAR / NCHAR / NVARCHAR +# here are inconsistent and not very good +class _OracleChar(sqltypes.CHAR): + def get_dbapi_type(self, dbapi): + return dbapi.FIXED_CHAR + + +class _OracleNChar(sqltypes.NCHAR): + def get_dbapi_type(self, dbapi): + return dbapi.FIXED_NCHAR + + +class _OracleUnicodeStringNCHAR(oracle.NVARCHAR2): + def get_dbapi_type(self, dbapi): + return dbapi.NCHAR + + +class _OracleUnicodeStringCHAR(sqltypes.Unicode): + def get_dbapi_type(self, dbapi): + return dbapi.LONG_STRING + + +class _OracleUnicodeTextNCLOB(oracle.NCLOB): + def get_dbapi_type(self, dbapi): + return dbapi.NCLOB + + +class _OracleUnicodeTextCLOB(sqltypes.UnicodeText): + def get_dbapi_type(self, dbapi): + return dbapi.CLOB + + +class _OracleText(sqltypes.Text): + def get_dbapi_type(self, dbapi): + return dbapi.CLOB + + +class _OracleLong(oracle.LONG): + def get_dbapi_type(self, dbapi): + return dbapi.LONG_STRING + + +class _OracleString(sqltypes.String): + pass + + +class _OracleEnum(sqltypes.Enum): + def bind_processor(self, dialect): + enum_proc = sqltypes.Enum.bind_processor(self, dialect) + + def process(value): + raw_str = enum_proc(value) + return raw_str + + return process + + +class _OracleBinary(sqltypes.LargeBinary): + def get_dbapi_type(self, dbapi): + return dbapi.BLOB + + def bind_processor(self, dialect): + return None + + def result_processor(self, dialect, coltype): + if not dialect.auto_convert_lobs: + return None + else: + return super(_OracleBinary, self).result_processor( + dialect, coltype + ) + + +class _OracleInterval(oracle.INTERVAL): + def get_dbapi_type(self, dbapi): + return dbapi.INTERVAL + + +class _OracleRaw(oracle.RAW): + pass + + +class _OracleRowid(oracle.ROWID): + def get_dbapi_type(self, dbapi): + return dbapi.ROWID + + +class OracleCompiler_cx_oracle(OracleCompiler): + _oracle_cx_sql_compiler = True + + def bindparam_string(self, name, **kw): + quote = getattr(name, "quote", None) + if ( + quote is True + or quote is not False + and self.preparer._bindparam_requires_quotes(name) + and not kw.get("post_compile", False) + ): + # interesting to note about expanding parameters - since the + # new parameters take the form <paramname>_<int>, at least if + # they are originally formed from reserved words, they no longer + # need quoting :). names that include illegal characters + # won't work however. + quoted_name = '"%s"' % name + kw["escaped_from"] = name + name = quoted_name + + return OracleCompiler.bindparam_string(self, name, **kw) + + +class OracleExecutionContext_cx_oracle(OracleExecutionContext): + out_parameters = None + + def _generate_out_parameter_vars(self): + # check for has_out_parameters or RETURNING, create cx_Oracle.var + # objects if so + if self.compiled.returning or self.compiled.has_out_parameters: + quoted_bind_names = self.compiled.escaped_bind_names + for bindparam in self.compiled.binds.values(): + if bindparam.isoutparam: + name = self.compiled.bind_names[bindparam] + type_impl = bindparam.type.dialect_impl(self.dialect) + + if hasattr(type_impl, "_cx_oracle_var"): + self.out_parameters[name] = type_impl._cx_oracle_var( + self.dialect, self.cursor + ) + else: + dbtype = type_impl.get_dbapi_type(self.dialect.dbapi) + + cx_Oracle = self.dialect.dbapi + + if dbtype is None: + raise exc.InvalidRequestError( + "Cannot create out parameter for " + "parameter " + "%r - its type %r is not supported by" + " cx_oracle" % (bindparam.key, bindparam.type) + ) + + if compat.py2k and dbtype in ( + cx_Oracle.CLOB, + cx_Oracle.NCLOB, + ): + outconverter = ( + processors.to_unicode_processor_factory( + self.dialect.encoding, + errors=self.dialect.encoding_errors, + ) + ) + self.out_parameters[name] = self.cursor.var( + dbtype, + outconverter=lambda value: outconverter( + value.read() + ), + ) + + elif dbtype in ( + cx_Oracle.BLOB, + cx_Oracle.CLOB, + cx_Oracle.NCLOB, + ): + self.out_parameters[name] = self.cursor.var( + dbtype, outconverter=lambda value: value.read() + ) + elif compat.py2k and isinstance( + type_impl, sqltypes.Unicode + ): + outconverter = ( + processors.to_unicode_processor_factory( + self.dialect.encoding, + errors=self.dialect.encoding_errors, + ) + ) + self.out_parameters[name] = self.cursor.var( + dbtype, outconverter=outconverter + ) + else: + self.out_parameters[name] = self.cursor.var(dbtype) + self.parameters[0][ + quoted_bind_names.get(name, name) + ] = self.out_parameters[name] + + def _generate_cursor_outputtype_handler(self): + output_handlers = {} + + for (keyname, name, objects, type_) in self.compiled._result_columns: + handler = type_._cached_custom_processor( + self.dialect, + "cx_oracle_outputtypehandler", + self._get_cx_oracle_type_handler, + ) + + if handler: + denormalized_name = self.dialect.denormalize_name(keyname) + output_handlers[denormalized_name] = handler + + if output_handlers: + default_handler = self._dbapi_connection.outputtypehandler + + def output_type_handler( + cursor, name, default_type, size, precision, scale + ): + if name in output_handlers: + return output_handlers[name]( + cursor, name, default_type, size, precision, scale + ) + else: + return default_handler( + cursor, name, default_type, size, precision, scale + ) + + self.cursor.outputtypehandler = output_type_handler + + def _get_cx_oracle_type_handler(self, impl): + if hasattr(impl, "_cx_oracle_outputtypehandler"): + return impl._cx_oracle_outputtypehandler(self.dialect) + else: + return None + + def pre_exec(self): + if not getattr(self.compiled, "_oracle_cx_sql_compiler", False): + return + + self.out_parameters = {} + + self._generate_out_parameter_vars() + + self._generate_cursor_outputtype_handler() + + self.include_set_input_sizes = self.dialect._include_setinputsizes + + def post_exec(self): + if self.compiled and self.out_parameters and self.compiled.returning: + # create a fake cursor result from the out parameters. unlike + # get_out_parameter_values(), the result-row handlers here will be + # applied at the Result level + returning_params = [ + self.dialect._returningval(self.out_parameters["ret_%d" % i]) + for i in range(len(self.out_parameters)) + ] + + fetch_strategy = _cursor.FullyBufferedCursorFetchStrategy( + self.cursor, + [ + (getattr(col, "name", col._anon_name_label), None) + for col in self.compiled.returning + ], + initial_buffer=[tuple(returning_params)], + ) + + self.cursor_fetch_strategy = fetch_strategy + + def create_cursor(self): + c = self._dbapi_connection.cursor() + if self.dialect.arraysize: + c.arraysize = self.dialect.arraysize + + return c + + def get_out_parameter_values(self, out_param_names): + # this method should not be called when the compiler has + # RETURNING as we've turned the has_out_parameters flag set to + # False. + assert not self.compiled.returning + + return [ + self.dialect._paramval(self.out_parameters[name]) + for name in out_param_names + ] + + +class OracleDialect_cx_oracle(OracleDialect): + supports_statement_cache = True + execution_ctx_cls = OracleExecutionContext_cx_oracle + statement_compiler = OracleCompiler_cx_oracle + + supports_sane_rowcount = True + supports_sane_multi_rowcount = True + + supports_unicode_statements = True + supports_unicode_binds = True + + use_setinputsizes = True + + driver = "cx_oracle" + + colspecs = { + sqltypes.Numeric: _OracleNumeric, + sqltypes.Float: _OracleNumeric, + oracle.BINARY_FLOAT: _OracleBINARY_FLOAT, + oracle.BINARY_DOUBLE: _OracleBINARY_DOUBLE, + sqltypes.Integer: _OracleInteger, + oracle.NUMBER: _OracleNUMBER, + sqltypes.Date: _OracleDate, + sqltypes.LargeBinary: _OracleBinary, + sqltypes.Boolean: oracle._OracleBoolean, + sqltypes.Interval: _OracleInterval, + oracle.INTERVAL: _OracleInterval, + sqltypes.Text: _OracleText, + sqltypes.String: _OracleString, + sqltypes.UnicodeText: _OracleUnicodeTextCLOB, + sqltypes.CHAR: _OracleChar, + sqltypes.NCHAR: _OracleNChar, + sqltypes.Enum: _OracleEnum, + oracle.LONG: _OracleLong, + oracle.RAW: _OracleRaw, + sqltypes.Unicode: _OracleUnicodeStringCHAR, + sqltypes.NVARCHAR: _OracleUnicodeStringNCHAR, + oracle.NCLOB: _OracleUnicodeTextNCLOB, + oracle.ROWID: _OracleRowid, + } + + execute_sequence_format = list + + _cx_oracle_threaded = None + + @util.deprecated_params( + threaded=( + "1.3", + "The 'threaded' parameter to the cx_oracle dialect " + "is deprecated as a dialect-level argument, and will be removed " + "in a future release. As of version 1.3, it defaults to False " + "rather than True. The 'threaded' option can be passed to " + "cx_Oracle directly in the URL query string passed to " + ":func:`_sa.create_engine`.", + ) + ) + def __init__( + self, + auto_convert_lobs=True, + coerce_to_unicode=True, + coerce_to_decimal=True, + arraysize=50, + encoding_errors=None, + threaded=None, + **kwargs + ): + + OracleDialect.__init__(self, **kwargs) + self.arraysize = arraysize + self.encoding_errors = encoding_errors + if threaded is not None: + self._cx_oracle_threaded = threaded + self.auto_convert_lobs = auto_convert_lobs + self.coerce_to_unicode = coerce_to_unicode + self.coerce_to_decimal = coerce_to_decimal + if self._use_nchar_for_unicode: + self.colspecs = self.colspecs.copy() + self.colspecs[sqltypes.Unicode] = _OracleUnicodeStringNCHAR + self.colspecs[sqltypes.UnicodeText] = _OracleUnicodeTextNCLOB + + cx_Oracle = self.dbapi + + if cx_Oracle is None: + self._include_setinputsizes = {} + self.cx_oracle_ver = (0, 0, 0) + else: + self.cx_oracle_ver = self._parse_cx_oracle_ver(cx_Oracle.version) + if self.cx_oracle_ver < (5, 2) and self.cx_oracle_ver > (0, 0, 0): + raise exc.InvalidRequestError( + "cx_Oracle version 5.2 and above are supported" + ) + + self._include_setinputsizes = { + cx_Oracle.DATETIME, + cx_Oracle.NCLOB, + cx_Oracle.CLOB, + cx_Oracle.LOB, + cx_Oracle.NCHAR, + cx_Oracle.FIXED_NCHAR, + cx_Oracle.BLOB, + cx_Oracle.FIXED_CHAR, + cx_Oracle.TIMESTAMP, + _OracleInteger, + _OracleBINARY_FLOAT, + _OracleBINARY_DOUBLE, + } + + self._paramval = lambda value: value.getvalue() + + # https://github.com/oracle/python-cx_Oracle/issues/176#issuecomment-386821291 + # https://github.com/oracle/python-cx_Oracle/issues/224 + self._values_are_lists = self.cx_oracle_ver >= (6, 3) + if self._values_are_lists: + cx_Oracle.__future__.dml_ret_array_val = True + + def _returningval(value): + try: + return value.values[0][0] + except IndexError: + return None + + self._returningval = _returningval + else: + self._returningval = self._paramval + + self._is_cx_oracle_6 = self.cx_oracle_ver >= (6,) + + @property + def _cursor_var_unicode_kwargs(self): + if self.encoding_errors: + if self.cx_oracle_ver >= (6, 4): + return {"encodingErrors": self.encoding_errors} + else: + util.warn( + "cx_oracle version %r does not support encodingErrors" + % (self.cx_oracle_ver,) + ) + + return {} + + def _parse_cx_oracle_ver(self, version): + m = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", version) + if m: + return tuple(int(x) for x in m.group(1, 2, 3) if x is not None) + else: + return (0, 0, 0) + + @classmethod + def dbapi(cls): + import cx_Oracle + + return cx_Oracle + + def initialize(self, connection): + super(OracleDialect_cx_oracle, self).initialize(connection) + if self._is_oracle_8: + self.supports_unicode_binds = False + + self._detect_decimal_char(connection) + + def get_isolation_level(self, connection): + # sources: + + # general idea of transaction id, have to start one, etc. + # https://stackoverflow.com/questions/10711204/how-to-check-isoloation-level + + # how to decode xid cols from v$transaction to match + # https://asktom.oracle.com/pls/apex/f?p=100:11:0::::P11_QUESTION_ID:9532779900346079444 + + # Oracle tuple comparison without using IN: + # https://www.sql-workbench.eu/comparison/tuple_comparison.html + + with connection.cursor() as cursor: + # this is the only way to ensure a transaction is started without + # actually running DML. There's no way to see the configured + # isolation level without getting it from v$transaction which + # means transaction has to be started. + outval = cursor.var(str) + cursor.execute( + """ + begin + :trans_id := dbms_transaction.local_transaction_id( TRUE ); + end; + """, + {"trans_id": outval}, + ) + trans_id = outval.getvalue() + xidusn, xidslot, xidsqn = trans_id.split(".", 2) + + cursor.execute( + "SELECT CASE BITAND(t.flag, POWER(2, 28)) " + "WHEN 0 THEN 'READ COMMITTED' " + "ELSE 'SERIALIZABLE' END AS isolation_level " + "FROM v$transaction t WHERE " + "(t.xidusn, t.xidslot, t.xidsqn) = " + "((:xidusn, :xidslot, :xidsqn))", + {"xidusn": xidusn, "xidslot": xidslot, "xidsqn": xidsqn}, + ) + row = cursor.fetchone() + if row is None: + raise exc.InvalidRequestError( + "could not retrieve isolation level" + ) + result = row[0] + + return result + + def set_isolation_level(self, connection, level): + if hasattr(connection, "dbapi_connection"): + dbapi_connection = connection.dbapi_connection + else: + dbapi_connection = connection + if level == "AUTOCOMMIT": + dbapi_connection.autocommit = True + else: + dbapi_connection.autocommit = False + connection.rollback() + with connection.cursor() as cursor: + cursor.execute("ALTER SESSION SET ISOLATION_LEVEL=%s" % level) + + def _detect_decimal_char(self, connection): + # we have the option to change this setting upon connect, + # or just look at what it is upon connect and convert. + # to minimize the chance of interference with changes to + # NLS_TERRITORY or formatting behavior of the DB, we opt + # to just look at it + + self._decimal_char = connection.exec_driver_sql( + "select value from nls_session_parameters " + "where parameter = 'NLS_NUMERIC_CHARACTERS'" + ).scalar()[0] + if self._decimal_char != ".": + _detect_decimal = self._detect_decimal + _to_decimal = self._to_decimal + + self._detect_decimal = lambda value: _detect_decimal( + value.replace(self._decimal_char, ".") + ) + self._to_decimal = lambda value: _to_decimal( + value.replace(self._decimal_char, ".") + ) + + def _detect_decimal(self, value): + if "." in value: + return self._to_decimal(value) + else: + return int(value) + + _to_decimal = decimal.Decimal + + def _generate_connection_outputtype_handler(self): + """establish the default outputtypehandler established at the + connection level. + + """ + + dialect = self + cx_Oracle = dialect.dbapi + + number_handler = _OracleNUMBER( + asdecimal=True + )._cx_oracle_outputtypehandler(dialect) + float_handler = _OracleNUMBER( + asdecimal=False + )._cx_oracle_outputtypehandler(dialect) + + def output_type_handler( + cursor, name, default_type, size, precision, scale + ): + + if ( + default_type == cx_Oracle.NUMBER + and default_type is not cx_Oracle.NATIVE_FLOAT + ): + if not dialect.coerce_to_decimal: + return None + elif precision == 0 and scale in (0, -127): + # ambiguous type, this occurs when selecting + # numbers from deep subqueries + return cursor.var( + cx_Oracle.STRING, + 255, + outconverter=dialect._detect_decimal, + arraysize=cursor.arraysize, + ) + elif precision and scale > 0: + return number_handler( + cursor, name, default_type, size, precision, scale + ) + else: + return float_handler( + cursor, name, default_type, size, precision, scale + ) + + # allow all strings to come back natively as Unicode + elif ( + dialect.coerce_to_unicode + and default_type + in ( + cx_Oracle.STRING, + cx_Oracle.FIXED_CHAR, + ) + and default_type is not cx_Oracle.CLOB + and default_type is not cx_Oracle.NCLOB + ): + if compat.py2k: + outconverter = processors.to_unicode_processor_factory( + dialect.encoding, errors=dialect.encoding_errors + ) + return cursor.var( + cx_Oracle.STRING, + size, + cursor.arraysize, + outconverter=outconverter, + ) + else: + return cursor.var( + util.text_type, + size, + cursor.arraysize, + **dialect._cursor_var_unicode_kwargs + ) + + elif dialect.auto_convert_lobs and default_type in ( + cx_Oracle.CLOB, + cx_Oracle.NCLOB, + ): + if compat.py2k: + outconverter = processors.to_unicode_processor_factory( + dialect.encoding, errors=dialect.encoding_errors + ) + return cursor.var( + cx_Oracle.LONG_STRING, + size, + cursor.arraysize, + outconverter=outconverter, + ) + else: + return cursor.var( + cx_Oracle.LONG_STRING, + size, + cursor.arraysize, + **dialect._cursor_var_unicode_kwargs + ) + + elif dialect.auto_convert_lobs and default_type in ( + cx_Oracle.BLOB, + ): + return cursor.var( + cx_Oracle.LONG_BINARY, + size, + cursor.arraysize, + ) + + return output_type_handler + + def on_connect(self): + + output_type_handler = self._generate_connection_outputtype_handler() + + def on_connect(conn): + conn.outputtypehandler = output_type_handler + + return on_connect + + def create_connect_args(self, url): + opts = dict(url.query) + + for opt in ("use_ansi", "auto_convert_lobs"): + if opt in opts: + util.warn_deprecated( + "cx_oracle dialect option %r should only be passed to " + "create_engine directly, not within the URL string" % opt, + version="1.3", + ) + util.coerce_kw_type(opts, opt, bool) + setattr(self, opt, opts.pop(opt)) + + database = url.database + service_name = opts.pop("service_name", None) + if database or service_name: + # if we have a database, then we have a remote host + port = url.port + if port: + port = int(port) + else: + port = 1521 + + if database and service_name: + raise exc.InvalidRequestError( + '"service_name" option shouldn\'t ' + 'be used with a "database" part of the url' + ) + if database: + makedsn_kwargs = {"sid": database} + if service_name: + makedsn_kwargs = {"service_name": service_name} + + dsn = self.dbapi.makedsn(url.host, port, **makedsn_kwargs) + else: + # we have a local tnsname + dsn = url.host + + if dsn is not None: + opts["dsn"] = dsn + if url.password is not None: + opts["password"] = url.password + if url.username is not None: + opts["user"] = url.username + + if self._cx_oracle_threaded is not None: + opts.setdefault("threaded", self._cx_oracle_threaded) + + def convert_cx_oracle_constant(value): + if isinstance(value, util.string_types): + try: + int_val = int(value) + except ValueError: + value = value.upper() + return getattr(self.dbapi, value) + else: + return int_val + else: + return value + + util.coerce_kw_type(opts, "mode", convert_cx_oracle_constant) + util.coerce_kw_type(opts, "threaded", bool) + util.coerce_kw_type(opts, "events", bool) + util.coerce_kw_type(opts, "purity", convert_cx_oracle_constant) + return ([], opts) + + def _get_server_version_info(self, connection): + return tuple(int(x) for x in connection.connection.version.split(".")) + + def is_disconnect(self, e, connection, cursor): + (error,) = e.args + if isinstance( + e, (self.dbapi.InterfaceError, self.dbapi.DatabaseError) + ) and "not connected" in str(e): + return True + + if hasattr(error, "code") and error.code in { + 28, + 3114, + 3113, + 3135, + 1033, + 2396, + }: + # ORA-00028: your session has been killed + # ORA-03114: not connected to ORACLE + # ORA-03113: end-of-file on communication channel + # ORA-03135: connection lost contact + # ORA-01033: ORACLE initialization or shutdown in progress + # ORA-02396: exceeded maximum idle time, please connect again + # TODO: Others ? + return True + + if re.match(r"^(?:DPI-1010|DPI-1080|DPY-1001|DPY-4011)", str(e)): + # DPI-1010: not connected + # DPI-1080: connection was closed by ORA-3113 + # python-oracledb's DPY-1001: not connected to database + # python-oracledb's DPY-4011: the database or network closed the + # connection + # TODO: others? + return True + + return False + + def create_xid(self): + """create a two-phase transaction ID. + + this id will be passed to do_begin_twophase(), do_rollback_twophase(), + do_commit_twophase(). its format is unspecified. + + """ + + id_ = random.randint(0, 2 ** 128) + return (0x1234, "%032x" % id_, "%032x" % 9) + + def do_executemany(self, cursor, statement, parameters, context=None): + if isinstance(parameters, tuple): + parameters = list(parameters) + cursor.executemany(statement, parameters) + + def do_begin_twophase(self, connection, xid): + connection.connection.begin(*xid) + connection.connection.info["cx_oracle_xid"] = xid + + def do_prepare_twophase(self, connection, xid): + result = connection.connection.prepare() + connection.info["cx_oracle_prepared"] = result + + def do_rollback_twophase( + self, connection, xid, is_prepared=True, recover=False + ): + self.do_rollback(connection.connection) + # TODO: need to end XA state here + + def do_commit_twophase( + self, connection, xid, is_prepared=True, recover=False + ): + + if not is_prepared: + self.do_commit(connection.connection) + else: + if recover: + raise NotImplementedError( + "2pc recovery not implemented for cx_Oracle" + ) + oci_prepared = connection.info["cx_oracle_prepared"] + if oci_prepared: + self.do_commit(connection.connection) + # TODO: need to end XA state here + + def do_set_input_sizes(self, cursor, list_of_tuples, context): + if self.positional: + # not usually used, here to support if someone is modifying + # the dialect to use positional style + cursor.setinputsizes( + *[dbtype for key, dbtype, sqltype in list_of_tuples] + ) + else: + collection = ( + (key, dbtype) + for key, dbtype, sqltype in list_of_tuples + if dbtype + ) + + if not self.supports_unicode_binds: + # oracle 8 only + collection = ( + (self.dialect._encoder(key)[0], dbtype) + for key, dbtype in collection + ) + + cursor.setinputsizes(**{key: dbtype for key, dbtype in collection}) + + def do_recover_twophase(self, connection): + raise NotImplementedError( + "recover two phase query for cx_Oracle not implemented" + ) + + +dialect = OracleDialect_cx_oracle diff --git a/lib/sqlalchemy/dialects/oracle/provision.py b/lib/sqlalchemy/dialects/oracle/provision.py new file mode 100644 index 0000000..74ad1f2 --- /dev/null +++ b/lib/sqlalchemy/dialects/oracle/provision.py @@ -0,0 +1,160 @@ +from ... import create_engine +from ... import exc +from ...engine import url as sa_url +from ...testing.provision import configure_follower +from ...testing.provision import create_db +from ...testing.provision import drop_db +from ...testing.provision import follower_url_from_main +from ...testing.provision import log +from ...testing.provision import post_configure_engine +from ...testing.provision import run_reap_dbs +from ...testing.provision import set_default_schema_on_connection +from ...testing.provision import stop_test_class_outside_fixtures +from ...testing.provision import temp_table_keyword_args + + +@create_db.for_db("oracle") +def _oracle_create_db(cfg, eng, ident): + # NOTE: make sure you've run "ALTER DATABASE default tablespace users" or + # similar, so that the default tablespace is not "system"; reflection will + # fail otherwise + with eng.begin() as conn: + conn.exec_driver_sql("create user %s identified by xe" % ident) + conn.exec_driver_sql("create user %s_ts1 identified by xe" % ident) + conn.exec_driver_sql("create user %s_ts2 identified by xe" % ident) + conn.exec_driver_sql("grant dba to %s" % (ident,)) + conn.exec_driver_sql("grant unlimited tablespace to %s" % ident) + conn.exec_driver_sql("grant unlimited tablespace to %s_ts1" % ident) + conn.exec_driver_sql("grant unlimited tablespace to %s_ts2" % ident) + + +@configure_follower.for_db("oracle") +def _oracle_configure_follower(config, ident): + config.test_schema = "%s_ts1" % ident + config.test_schema_2 = "%s_ts2" % ident + + +def _ora_drop_ignore(conn, dbname): + try: + conn.exec_driver_sql("drop user %s cascade" % dbname) + log.info("Reaped db: %s", dbname) + return True + except exc.DatabaseError as err: + log.warning("couldn't drop db: %s", err) + return False + + +@drop_db.for_db("oracle") +def _oracle_drop_db(cfg, eng, ident): + with eng.begin() as conn: + # cx_Oracle seems to occasionally leak open connections when a large + # suite it run, even if we confirm we have zero references to + # connection objects. + # while there is a "kill session" command in Oracle, + # it unfortunately does not release the connection sufficiently. + _ora_drop_ignore(conn, ident) + _ora_drop_ignore(conn, "%s_ts1" % ident) + _ora_drop_ignore(conn, "%s_ts2" % ident) + + +@stop_test_class_outside_fixtures.for_db("oracle") +def stop_test_class_outside_fixtures(config, db, cls): + + try: + with db.begin() as conn: + # run magic command to get rid of identity sequences + # https://floo.bar/2019/11/29/drop-the-underlying-sequence-of-an-identity-column/ # noqa: E501 + conn.exec_driver_sql("purge recyclebin") + except exc.DatabaseError as err: + log.warning("purge recyclebin command failed: %s", err) + + # clear statement cache on all connections that were used + # https://github.com/oracle/python-cx_Oracle/issues/519 + + for cx_oracle_conn in _all_conns: + try: + sc = cx_oracle_conn.stmtcachesize + except db.dialect.dbapi.InterfaceError: + # connection closed + pass + else: + cx_oracle_conn.stmtcachesize = 0 + cx_oracle_conn.stmtcachesize = sc + _all_conns.clear() + + +_all_conns = set() + + +@post_configure_engine.for_db("oracle") +def _oracle_post_configure_engine(url, engine, follower_ident): + from sqlalchemy import event + + @event.listens_for(engine, "checkout") + def checkout(dbapi_con, con_record, con_proxy): + _all_conns.add(dbapi_con) + + @event.listens_for(engine, "checkin") + def checkin(dbapi_connection, connection_record): + # work around cx_Oracle issue: + # https://github.com/oracle/python-cx_Oracle/issues/530 + # invalidate oracle connections that had 2pc set up + if "cx_oracle_xid" in connection_record.info: + connection_record.invalidate() + + +@run_reap_dbs.for_db("oracle") +def _reap_oracle_dbs(url, idents): + log.info("db reaper connecting to %r", url) + eng = create_engine(url) + with eng.begin() as conn: + + log.info("identifiers in file: %s", ", ".join(idents)) + + to_reap = conn.exec_driver_sql( + "select u.username from all_users u where username " + "like 'TEST_%' and not exists (select username " + "from v$session where username=u.username)" + ) + all_names = {username.lower() for (username,) in to_reap} + to_drop = set() + for name in all_names: + if name.endswith("_ts1") or name.endswith("_ts2"): + continue + elif name in idents: + to_drop.add(name) + if "%s_ts1" % name in all_names: + to_drop.add("%s_ts1" % name) + if "%s_ts2" % name in all_names: + to_drop.add("%s_ts2" % name) + + dropped = total = 0 + for total, username in enumerate(to_drop, 1): + if _ora_drop_ignore(conn, username): + dropped += 1 + log.info( + "Dropped %d out of %d stale databases detected", dropped, total + ) + + +@follower_url_from_main.for_db("oracle") +def _oracle_follower_url_from_main(url, ident): + url = sa_url.make_url(url) + return url.set(username=ident, password="xe") + + +@temp_table_keyword_args.for_db("oracle") +def _oracle_temp_table_keyword_args(cfg, eng): + return { + "prefixes": ["GLOBAL TEMPORARY"], + "oracle_on_commit": "PRESERVE ROWS", + } + + +@set_default_schema_on_connection.for_db("oracle") +def _oracle_set_default_schema_on_connection( + cfg, dbapi_connection, schema_name +): + cursor = dbapi_connection.cursor() + cursor.execute("ALTER SESSION SET CURRENT_SCHEMA=%s" % schema_name) + cursor.close() |