diff options
author | xiubuzhe <xiubuzhe@sina.com> | 2023-10-08 20:59:00 +0800 |
---|---|---|
committer | xiubuzhe <xiubuzhe@sina.com> | 2023-10-08 20:59:00 +0800 |
commit | 1dac2263372df2b85db5d029a45721fa158a5c9d (patch) | |
tree | 0365f9c57df04178a726d7584ca6a6b955a7ce6a /lib/sqlalchemy/engine/base.py | |
parent | b494be364bb39e1de128ada7dc576a729d99907e (diff) | |
download | sunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.tar.gz sunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.tar.bz2 sunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.zip |
first add files
Diffstat (limited to 'lib/sqlalchemy/engine/base.py')
-rw-r--r-- | lib/sqlalchemy/engine/base.py | 3450 |
1 files changed, 3450 insertions, 0 deletions
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py new file mode 100644 index 0000000..f126eb0 --- /dev/null +++ b/lib/sqlalchemy/engine/base.py @@ -0,0 +1,3450 @@ +# engine/base.py +# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +from __future__ import with_statement + +import contextlib +import sys + +from .interfaces import Connectable +from .interfaces import ExceptionContext +from .util import _distill_params +from .util import _distill_params_20 +from .util import TransactionalContext +from .. import exc +from .. import inspection +from .. import log +from .. import util +from ..sql import compiler +from ..sql import util as sql_util + + +"""Defines :class:`_engine.Connection` and :class:`_engine.Engine`. + +""" + +_EMPTY_EXECUTION_OPTS = util.immutabledict() + + +class Connection(Connectable): + """Provides high-level functionality for a wrapped DB-API connection. + + **This is the SQLAlchemy 1.x.x version** of the :class:`_engine.Connection` + class. For the :term:`2.0 style` version, which features some API + differences, see :class:`_future.Connection`. + + The :class:`_engine.Connection` object is procured by calling + the :meth:`_engine.Engine.connect` method of the :class:`_engine.Engine` + object, and provides services for execution of SQL statements as well + as transaction control. + + The Connection object is **not** thread-safe. While a Connection can be + shared among threads using properly synchronized access, it is still + possible that the underlying DBAPI connection may not support shared + access between threads. Check the DBAPI documentation for details. + + The Connection object represents a single DBAPI connection checked out + from the connection pool. In this state, the connection pool has no affect + upon the connection, including its expiration or timeout state. For the + connection pool to properly manage connections, connections should be + returned to the connection pool (i.e. ``connection.close()``) whenever the + connection is not in use. + + .. index:: + single: thread safety; Connection + + """ + + _is_future = False + _sqla_logger_namespace = "sqlalchemy.engine.Connection" + + # used by sqlalchemy.engine.util.TransactionalContext + _trans_context_manager = None + + def __init__( + self, + engine, + connection=None, + close_with_result=False, + _branch_from=None, + _execution_options=None, + _dispatch=None, + _has_events=None, + _allow_revalidate=True, + ): + """Construct a new Connection.""" + self.engine = engine + self.dialect = engine.dialect + self.__branch_from = _branch_from + + if _branch_from: + # branching is always "from" the root connection + assert _branch_from.__branch_from is None + self._dbapi_connection = connection + self._execution_options = _execution_options + self._echo = _branch_from._echo + self.should_close_with_result = False + self.dispatch = _dispatch + self._has_events = _branch_from._has_events + else: + self._dbapi_connection = ( + connection + if connection is not None + else engine.raw_connection() + ) + + self._transaction = self._nested_transaction = None + self.__savepoint_seq = 0 + self.__in_begin = False + self.should_close_with_result = close_with_result + + self.__can_reconnect = _allow_revalidate + self._echo = self.engine._should_log_info() + + if _has_events is None: + # if _has_events is sent explicitly as False, + # then don't join the dispatch of the engine; we don't + # want to handle any of the engine's events in that case. + self.dispatch = self.dispatch._join(engine.dispatch) + self._has_events = _has_events or ( + _has_events is None and engine._has_events + ) + + assert not _execution_options + self._execution_options = engine._execution_options + + if self._has_events or self.engine._has_events: + self.dispatch.engine_connect(self, _branch_from is not None) + + @util.memoized_property + def _message_formatter(self): + if "logging_token" in self._execution_options: + token = self._execution_options["logging_token"] + return lambda msg: "[%s] %s" % (token, msg) + else: + return None + + def _log_info(self, message, *arg, **kw): + fmt = self._message_formatter + + if fmt: + message = fmt(message) + + if log.STACKLEVEL: + kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET + + self.engine.logger.info(message, *arg, **kw) + + def _log_debug(self, message, *arg, **kw): + fmt = self._message_formatter + + if fmt: + message = fmt(message) + + if log.STACKLEVEL: + kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET + + self.engine.logger.debug(message, *arg, **kw) + + @property + def _schema_translate_map(self): + return self._execution_options.get("schema_translate_map", None) + + def schema_for_object(self, obj): + """Return the schema name for the given schema item taking into + account current schema translate map. + + """ + + name = obj.schema + schema_translate_map = self._execution_options.get( + "schema_translate_map", None + ) + + if ( + schema_translate_map + and name in schema_translate_map + and obj._use_schema_map + ): + return schema_translate_map[name] + else: + return name + + def _branch(self): + """Return a new Connection which references this Connection's + engine and connection; but does not have close_with_result enabled, + and also whose close() method does nothing. + + .. deprecated:: 1.4 the "branching" concept will be removed in + SQLAlchemy 2.0 as well as the "Connection.connect()" method which + is the only consumer for this. + + The Core uses this very sparingly, only in the case of + custom SQL default functions that are to be INSERTed as the + primary key of a row where we need to get the value back, so we have + to invoke it distinctly - this is a very uncommon case. + + Userland code accesses _branch() when the connect() + method is called. The branched connection + acts as much as possible like the parent, except that it stays + connected when a close() event occurs. + + """ + return self.engine._connection_cls( + self.engine, + self._dbapi_connection, + _branch_from=self.__branch_from if self.__branch_from else self, + _execution_options=self._execution_options, + _has_events=self._has_events, + _dispatch=self.dispatch, + ) + + def _generate_for_options(self): + """define connection method chaining behavior for execution_options""" + + if self._is_future: + return self + else: + c = self.__class__.__new__(self.__class__) + c.__dict__ = self.__dict__.copy() + return c + + def __enter__(self): + return self + + def __exit__(self, type_, value, traceback): + self.close() + + def execution_options(self, **opt): + r""" Set non-SQL options for the connection which take effect + during execution. + + For a "future" style connection, this method returns this same + :class:`_future.Connection` object with the new options added. + + For a legacy connection, this method returns a copy of this + :class:`_engine.Connection` which references the same underlying DBAPI + connection, but also defines the given execution options which will + take effect for a call to + :meth:`execute`. As the new :class:`_engine.Connection` references the + same underlying resource, it's usually a good idea to ensure that + the copies will be discarded immediately, which is implicit if used + as in:: + + result = connection.execution_options(stream_results=True).\ + execute(stmt) + + Note that any key/value can be passed to + :meth:`_engine.Connection.execution_options`, + and it will be stored in the + ``_execution_options`` dictionary of the :class:`_engine.Connection`. + It + is suitable for usage by end-user schemes to communicate with + event listeners, for example. + + The keywords that are currently recognized by SQLAlchemy itself + include all those listed under :meth:`.Executable.execution_options`, + as well as others that are specific to :class:`_engine.Connection`. + + :param autocommit: Available on: Connection, statement. + When True, a COMMIT will be invoked after execution + when executed in 'autocommit' mode, i.e. when an explicit + transaction is not begun on the connection. Note that this + is **library level, not DBAPI level autocommit**. The DBAPI + connection will remain in a real transaction unless the + "AUTOCOMMIT" isolation level is used. + + .. deprecated:: 1.4 The "autocommit" execution option is deprecated + and will be removed in SQLAlchemy 2.0. See + :ref:`migration_20_autocommit` for discussion. + + :param compiled_cache: Available on: Connection. + A dictionary where :class:`.Compiled` objects + will be cached when the :class:`_engine.Connection` + compiles a clause + expression into a :class:`.Compiled` object. This dictionary will + supersede the statement cache that may be configured on the + :class:`_engine.Engine` itself. If set to None, caching + is disabled, even if the engine has a configured cache size. + + Note that the ORM makes use of its own "compiled" caches for + some operations, including flush operations. The caching + used by the ORM internally supersedes a cache dictionary + specified here. + + :param logging_token: Available on: :class:`_engine.Connection`, + :class:`_engine.Engine`. + + Adds the specified string token surrounded by brackets in log + messages logged by the connection, i.e. the logging that's enabled + either via the :paramref:`_sa.create_engine.echo` flag or via the + ``logging.getLogger("sqlalchemy.engine")`` logger. This allows a + per-connection or per-sub-engine token to be available which is + useful for debugging concurrent connection scenarios. + + .. versionadded:: 1.4.0b2 + + .. seealso:: + + :ref:`dbengine_logging_tokens` - usage example + + :paramref:`_sa.create_engine.logging_name` - adds a name to the + name used by the Python logger object itself. + + :param isolation_level: Available on: :class:`_engine.Connection`. + + Set the transaction isolation level for the lifespan of this + :class:`_engine.Connection` object. + Valid values include those string + values accepted by the :paramref:`_sa.create_engine.isolation_level` + parameter passed to :func:`_sa.create_engine`. These levels are + semi-database specific; see individual dialect documentation for + valid levels. + + The isolation level option applies the isolation level by emitting + statements on the DBAPI connection, and **necessarily affects the + original Connection object overall**, not just the copy that is + returned by the call to :meth:`_engine.Connection.execution_options` + method. The isolation level will remain at the given setting until + the DBAPI connection itself is returned to the connection pool, i.e. + the :meth:`_engine.Connection.close` method on the original + :class:`_engine.Connection` is called, + where an event handler will emit + additional statements on the DBAPI connection in order to revert the + isolation level change. + + .. warning:: The ``isolation_level`` execution option should + **not** be used when a transaction is already established, that + is, the :meth:`_engine.Connection.begin` + method or similar has been + called. A database cannot change the isolation level on a + transaction in progress, and different DBAPIs and/or + SQLAlchemy dialects may implicitly roll back or commit + the transaction, or not affect the connection at all. + + .. note:: The ``isolation_level`` execution option is implicitly + reset if the :class:`_engine.Connection` is invalidated, e.g. via + the :meth:`_engine.Connection.invalidate` method, or if a + disconnection error occurs. The new connection produced after + the invalidation will not have the isolation level re-applied + to it automatically. + + .. seealso:: + + :paramref:`_sa.create_engine.isolation_level` + - set per :class:`_engine.Engine` isolation level + + :meth:`_engine.Connection.get_isolation_level` + - view current level + + :ref:`SQLite Transaction Isolation <sqlite_isolation_level>` + + :ref:`PostgreSQL Transaction Isolation <postgresql_isolation_level>` + + :ref:`MySQL Transaction Isolation <mysql_isolation_level>` + + :ref:`SQL Server Transaction Isolation <mssql_isolation_level>` + + :ref:`session_transaction_isolation` - for the ORM + + :param no_parameters: When ``True``, if the final parameter + list or dictionary is totally empty, will invoke the + statement on the cursor as ``cursor.execute(statement)``, + not passing the parameter collection at all. + Some DBAPIs such as psycopg2 and mysql-python consider + percent signs as significant only when parameters are + present; this option allows code to generate SQL + containing percent signs (and possibly other characters) + that is neutral regarding whether it's executed by the DBAPI + or piped into a script that's later invoked by + command line tools. + + :param stream_results: Available on: Connection, statement. + Indicate to the dialect that results should be + "streamed" and not pre-buffered, if possible. For backends + such as PostgreSQL, MySQL and MariaDB, this indicates the use of + a "server side cursor" as opposed to a client side cursor. + Other backends such as that of Oracle may already use server + side cursors by default. + + The usage of + :paramref:`_engine.Connection.execution_options.stream_results` is + usually combined with setting a fixed number of rows to to be fetched + in batches, to allow for efficient iteration of database rows while + at the same time not loading all result rows into memory at once; + this can be configured on a :class:`_engine.Result` object using the + :meth:`_engine.Result.yield_per` method, after execution has + returned a new :class:`_engine.Result`. If + :meth:`_engine.Result.yield_per` is not used, + the :paramref:`_engine.Connection.execution_options.stream_results` + mode of operation will instead use a dynamically sized buffer + which buffers sets of rows at a time, growing on each batch + based on a fixed growth size up until a limit which may + be configured using the + :paramref:`_engine.Connection.execution_options.max_row_buffer` + parameter. + + When using the ORM to fetch ORM mapped objects from a result, + :meth:`_engine.Result.yield_per` should always be used with + :paramref:`_engine.Connection.execution_options.stream_results`, + so that the ORM does not fetch all rows into new ORM objects at once. + + For typical use, the + :paramref:`_engine.Connection.execution_options.yield_per` execution + option should be preferred, which sets up both + :paramref:`_engine.Connection.execution_options.stream_results` and + :meth:`_engine.Result.yield_per` at once. This option is supported + both at a core level by :class:`_engine.Connection` as well as by the + ORM :class:`_engine.Session`; the latter is described at + :ref:`orm_queryguide_yield_per`. + + .. seealso:: + + :ref:`engine_stream_results` - background on + :paramref:`_engine.Connection.execution_options.stream_results` + + :paramref:`_engine.Connection.execution_options.max_row_buffer` + + :paramref:`_engine.Connection.execution_options.yield_per` + + :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel` + describing the ORM version of ``yield_per`` + + :param max_row_buffer: Available on: :class:`_engine.Connection`, + :class:`_sql.Executable`. Sets a maximum + buffer size to use when the + :paramref:`_engine.Connection.execution_options.stream_results` + execution option is used on a backend that supports server side + cursors. The default value if not specified is 1000. + + .. seealso:: + + :paramref:`_engine.Connection.execution_options.stream_results` + + :ref:`engine_stream_results` + + + :param yield_per: Available on: :class:`_engine.Connection`, + :class:`_sql.Executable`. Integer value applied which will + set the :paramref:`_engine.Connection.execution_options.stream_results` + execution option and invoke :meth:`_engine.Result.yield_per` + automatically at once. Allows equivalent functionality as + is present when using this parameter with the ORM. + + .. versionadded:: 1.4.40 + + .. seealso:: + + :ref:`engine_stream_results` - background and examples + on using server side cursors with Core. + + :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel` + describing the ORM version of ``yield_per`` + + :param schema_translate_map: Available on: :class:`_engine.Connection`, + :class:`_engine.Engine`, :class:`_sql.Executable`. + + :param schema_translate_map: Available on: Connection, Engine. + A dictionary mapping schema names to schema names, that will be + applied to the :paramref:`_schema.Table.schema` element of each + :class:`_schema.Table` + encountered when SQL or DDL expression elements + are compiled into strings; the resulting schema name will be + converted based on presence in the map of the original name. + + .. versionadded:: 1.1 + + .. seealso:: + + :ref:`schema_translating` + + .. seealso:: + + :meth:`_engine.Engine.execution_options` + + :meth:`.Executable.execution_options` + + :meth:`_engine.Connection.get_execution_options` + + + """ # noqa + c = self._generate_for_options() + c._execution_options = c._execution_options.union(opt) + if self._has_events or self.engine._has_events: + self.dispatch.set_connection_execution_options(c, opt) + self.dialect.set_connection_execution_options(c, opt) + return c + + def get_execution_options(self): + """Get the non-SQL options which will take effect during execution. + + .. versionadded:: 1.3 + + .. seealso:: + + :meth:`_engine.Connection.execution_options` + """ + return self._execution_options + + @property + def closed(self): + """Return True if this connection is closed.""" + + # note this is independent for a "branched" connection vs. + # the base + + return self._dbapi_connection is None and not self.__can_reconnect + + @property + def invalidated(self): + """Return True if this connection was invalidated.""" + + # prior to 1.4, "invalid" was stored as a state independent of + # "closed", meaning an invalidated connection could be "closed", + # the _dbapi_connection would be None and closed=True, yet the + # "invalid" flag would stay True. This meant that there were + # three separate states (open/valid, closed/valid, closed/invalid) + # when there is really no reason for that; a connection that's + # "closed" does not need to be "invalid". So the state is now + # represented by the two facts alone. + + if self.__branch_from: + return self.__branch_from.invalidated + + return self._dbapi_connection is None and not self.closed + + @property + def connection(self): + """The underlying DB-API connection managed by this Connection. + + This is a SQLAlchemy connection-pool proxied connection + which then has the attribute + :attr:`_pool._ConnectionFairy.dbapi_connection` that refers to the + actual driver connection. + + .. seealso:: + + + :ref:`dbapi_connections` + + """ + + if self._dbapi_connection is None: + try: + return self._revalidate_connection() + except (exc.PendingRollbackError, exc.ResourceClosedError): + raise + except BaseException as e: + self._handle_dbapi_exception(e, None, None, None, None) + else: + return self._dbapi_connection + + def get_isolation_level(self): + """Return the current isolation level assigned to this + :class:`_engine.Connection`. + + This will typically be the default isolation level as determined + by the dialect, unless if the + :paramref:`.Connection.execution_options.isolation_level` + feature has been used to alter the isolation level on a + per-:class:`_engine.Connection` basis. + + This attribute will typically perform a live SQL operation in order + to procure the current isolation level, so the value returned is the + actual level on the underlying DBAPI connection regardless of how + this state was set. Compare to the + :attr:`_engine.Connection.default_isolation_level` accessor + which returns the dialect-level setting without performing a SQL + query. + + .. versionadded:: 0.9.9 + + .. seealso:: + + :attr:`_engine.Connection.default_isolation_level` + - view default level + + :paramref:`_sa.create_engine.isolation_level` + - set per :class:`_engine.Engine` isolation level + + :paramref:`.Connection.execution_options.isolation_level` + - set per :class:`_engine.Connection` isolation level + + """ + try: + return self.dialect.get_isolation_level(self.connection) + except BaseException as e: + self._handle_dbapi_exception(e, None, None, None, None) + + @property + def default_isolation_level(self): + """The default isolation level assigned to this + :class:`_engine.Connection`. + + This is the isolation level setting that the + :class:`_engine.Connection` + has when first procured via the :meth:`_engine.Engine.connect` method. + This level stays in place until the + :paramref:`.Connection.execution_options.isolation_level` is used + to change the setting on a per-:class:`_engine.Connection` basis. + + Unlike :meth:`_engine.Connection.get_isolation_level`, + this attribute is set + ahead of time from the first connection procured by the dialect, + so SQL query is not invoked when this accessor is called. + + .. versionadded:: 0.9.9 + + .. seealso:: + + :meth:`_engine.Connection.get_isolation_level` + - view current level + + :paramref:`_sa.create_engine.isolation_level` + - set per :class:`_engine.Engine` isolation level + + :paramref:`.Connection.execution_options.isolation_level` + - set per :class:`_engine.Connection` isolation level + + """ + return self.dialect.default_isolation_level + + def _invalid_transaction(self): + if self.invalidated: + raise exc.PendingRollbackError( + "Can't reconnect until invalid %stransaction is rolled " + "back." + % ( + "savepoint " + if self._nested_transaction is not None + else "" + ), + code="8s2b", + ) + else: + assert not self._is_future + raise exc.PendingRollbackError( + "This connection is on an inactive %stransaction. " + "Please rollback() fully before proceeding." + % ( + "savepoint " + if self._nested_transaction is not None + else "" + ), + code="8s2a", + ) + + def _revalidate_connection(self): + if self.__branch_from: + return self.__branch_from._revalidate_connection() + if self.__can_reconnect and self.invalidated: + if self._transaction is not None: + self._invalid_transaction() + self._dbapi_connection = self.engine.raw_connection( + _connection=self + ) + return self._dbapi_connection + raise exc.ResourceClosedError("This Connection is closed") + + @property + def _still_open_and_dbapi_connection_is_valid(self): + return self._dbapi_connection is not None and getattr( + self._dbapi_connection, "is_valid", False + ) + + @property + def info(self): + """Info dictionary associated with the underlying DBAPI connection + referred to by this :class:`_engine.Connection`, allowing user-defined + data to be associated with the connection. + + The data here will follow along with the DBAPI connection including + after it is returned to the connection pool and used again + in subsequent instances of :class:`_engine.Connection`. + + """ + + return self.connection.info + + @util.deprecated_20(":meth:`.Connection.connect`") + def connect(self, close_with_result=False): + """Returns a branched version of this :class:`_engine.Connection`. + + The :meth:`_engine.Connection.close` method on the returned + :class:`_engine.Connection` can be called and this + :class:`_engine.Connection` will remain open. + + This method provides usage symmetry with + :meth:`_engine.Engine.connect`, including for usage + with context managers. + + """ + + return self._branch() + + def invalidate(self, exception=None): + """Invalidate the underlying DBAPI connection associated with + this :class:`_engine.Connection`. + + An attempt will be made to close the underlying DBAPI connection + immediately; however if this operation fails, the error is logged + but not raised. The connection is then discarded whether or not + close() succeeded. + + Upon the next use (where "use" typically means using the + :meth:`_engine.Connection.execute` method or similar), + this :class:`_engine.Connection` will attempt to + procure a new DBAPI connection using the services of the + :class:`_pool.Pool` as a source of connectivity (e.g. + a "reconnection"). + + If a transaction was in progress (e.g. the + :meth:`_engine.Connection.begin` method has been called) when + :meth:`_engine.Connection.invalidate` method is called, at the DBAPI + level all state associated with this transaction is lost, as + the DBAPI connection is closed. The :class:`_engine.Connection` + will not allow a reconnection to proceed until the + :class:`.Transaction` object is ended, by calling the + :meth:`.Transaction.rollback` method; until that point, any attempt at + continuing to use the :class:`_engine.Connection` will raise an + :class:`~sqlalchemy.exc.InvalidRequestError`. + This is to prevent applications from accidentally + continuing an ongoing transactional operations despite the + fact that the transaction has been lost due to an + invalidation. + + The :meth:`_engine.Connection.invalidate` method, + just like auto-invalidation, + will at the connection pool level invoke the + :meth:`_events.PoolEvents.invalidate` event. + + :param exception: an optional ``Exception`` instance that's the + reason for the invalidation. is passed along to event handlers + and logging functions. + + .. seealso:: + + :ref:`pool_connection_invalidation` + + """ + + if self.__branch_from: + return self.__branch_from.invalidate(exception=exception) + + if self.invalidated: + return + + if self.closed: + raise exc.ResourceClosedError("This Connection is closed") + + if self._still_open_and_dbapi_connection_is_valid: + self._dbapi_connection.invalidate(exception) + self._dbapi_connection = None + + def detach(self): + """Detach the underlying DB-API connection from its connection pool. + + E.g.:: + + with engine.connect() as conn: + conn.detach() + conn.execute(text("SET search_path TO schema1, schema2")) + + # work with connection + + # connection is fully closed (since we used "with:", can + # also call .close()) + + This :class:`_engine.Connection` instance will remain usable. + When closed + (or exited from a context manager context as above), + the DB-API connection will be literally closed and not + returned to its originating pool. + + This method can be used to insulate the rest of an application + from a modified state on a connection (such as a transaction + isolation level or similar). + + """ + + self._dbapi_connection.detach() + + def _autobegin(self): + self.begin() + + def begin(self): + """Begin a transaction and return a transaction handle. + + The returned object is an instance of :class:`.Transaction`. + This object represents the "scope" of the transaction, + which completes when either the :meth:`.Transaction.rollback` + or :meth:`.Transaction.commit` method is called. + + .. tip:: + + The :meth:`_engine.Connection.begin` method is invoked when using + the :meth:`_engine.Engine.begin` context manager method as well. + All documentation that refers to behaviors specific to the + :meth:`_engine.Connection.begin` method also apply to use of the + :meth:`_engine.Engine.begin` method. + + Legacy use: nested calls to :meth:`.begin` on the same + :class:`_engine.Connection` will return new :class:`.Transaction` + objects that represent an emulated transaction within the scope of the + enclosing transaction, that is:: + + trans = conn.begin() # outermost transaction + trans2 = conn.begin() # "nested" + trans2.commit() # does nothing + trans.commit() # actually commits + + Calls to :meth:`.Transaction.commit` only have an effect + when invoked via the outermost :class:`.Transaction` object, though the + :meth:`.Transaction.rollback` method of any of the + :class:`.Transaction` objects will roll back the + transaction. + + .. tip:: + + The above "nesting" behavior is a legacy behavior specific to + :term:`1.x style` use and will be removed in SQLAlchemy 2.0. For + notes on :term:`2.0 style` use, see + :meth:`_future.Connection.begin`. + + + .. seealso:: + + :meth:`_engine.Connection.begin_nested` - use a SAVEPOINT + + :meth:`_engine.Connection.begin_twophase` - + use a two phase /XID transaction + + :meth:`_engine.Engine.begin` - context manager available from + :class:`_engine.Engine` + + """ + if self._is_future: + assert not self.__branch_from + elif self.__branch_from: + return self.__branch_from.begin() + + if self.__in_begin: + # for dialects that emit SQL within the process of + # dialect.do_begin() or dialect.do_begin_twophase(), this + # flag prevents "autobegin" from being emitted within that + # process, while allowing self._transaction to remain at None + # until it's complete. + return + elif self._transaction is None: + self._transaction = RootTransaction(self) + return self._transaction + else: + if self._is_future: + raise exc.InvalidRequestError( + "This connection has already initialized a SQLAlchemy " + "Transaction() object via begin() or autobegin; can't " + "call begin() here unless rollback() or commit() " + "is called first." + ) + else: + return MarkerTransaction(self) + + def begin_nested(self): + """Begin a nested transaction (i.e. SAVEPOINT) and return a + transaction handle, assuming an outer transaction is already + established. + + Nested transactions require SAVEPOINT support in the + underlying database. Any transaction in the hierarchy may + ``commit`` and ``rollback``, however the outermost transaction + still controls the overall ``commit`` or ``rollback`` of the + transaction of a whole. + + The legacy form of :meth:`_engine.Connection.begin_nested` method has + alternate behaviors based on whether or not the + :meth:`_engine.Connection.begin` method was called previously. If + :meth:`_engine.Connection.begin` was not called, then this method will + behave the same as the :meth:`_engine.Connection.begin` method and + return a :class:`.RootTransaction` object that begins and commits a + real transaction - **no savepoint is invoked**. If + :meth:`_engine.Connection.begin` **has** been called, and a + :class:`.RootTransaction` is already established, then this method + returns an instance of :class:`.NestedTransaction` which will invoke + and manage the scope of a SAVEPOINT. + + .. tip:: + + The above mentioned behavior of + :meth:`_engine.Connection.begin_nested` is a legacy behavior + specific to :term:`1.x style` use. In :term:`2.0 style` use, the + :meth:`_future.Connection.begin_nested` method instead autobegins + the outer transaction that can be committed using + "commit-as-you-go" style; see + :meth:`_future.Connection.begin_nested` for migration details. + + .. versionchanged:: 1.4.13 The behavior of + :meth:`_engine.Connection.begin_nested` + as returning a :class:`.RootTransaction` if + :meth:`_engine.Connection.begin` were not called has been restored + as was the case in 1.3.x versions; in previous 1.4.x versions, an + outer transaction would be "autobegun" but would not be committed. + + + .. seealso:: + + :meth:`_engine.Connection.begin` + + :ref:`session_begin_nested` - ORM support for SAVEPOINT + + """ + if self._is_future: + assert not self.__branch_from + elif self.__branch_from: + return self.__branch_from.begin_nested() + + if self._transaction is None: + if not self._is_future: + util.warn_deprecated_20( + "Calling Connection.begin_nested() in 2.0 style use will " + "return a NestedTransaction (SAVEPOINT) in all cases, " + "that will not commit the outer transaction. For code " + "that is cross-compatible between 1.x and 2.0 style use, " + "ensure Connection.begin() is called before calling " + "Connection.begin_nested()." + ) + return self.begin() + else: + self._autobegin() + + return NestedTransaction(self) + + def begin_twophase(self, xid=None): + """Begin a two-phase or XA transaction and return a transaction + handle. + + The returned object is an instance of :class:`.TwoPhaseTransaction`, + which in addition to the methods provided by + :class:`.Transaction`, also provides a + :meth:`~.TwoPhaseTransaction.prepare` method. + + :param xid: the two phase transaction id. If not supplied, a + random id will be generated. + + .. seealso:: + + :meth:`_engine.Connection.begin` + + :meth:`_engine.Connection.begin_twophase` + + """ + + if self.__branch_from: + return self.__branch_from.begin_twophase(xid=xid) + + if self._transaction is not None: + raise exc.InvalidRequestError( + "Cannot start a two phase transaction when a transaction " + "is already in progress." + ) + if xid is None: + xid = self.engine.dialect.create_xid() + return TwoPhaseTransaction(self, xid) + + def recover_twophase(self): + return self.engine.dialect.do_recover_twophase(self) + + def rollback_prepared(self, xid, recover=False): + self.engine.dialect.do_rollback_twophase(self, xid, recover=recover) + + def commit_prepared(self, xid, recover=False): + self.engine.dialect.do_commit_twophase(self, xid, recover=recover) + + def in_transaction(self): + """Return True if a transaction is in progress.""" + if self.__branch_from is not None: + return self.__branch_from.in_transaction() + + return self._transaction is not None and self._transaction.is_active + + def in_nested_transaction(self): + """Return True if a transaction is in progress.""" + if self.__branch_from is not None: + return self.__branch_from.in_nested_transaction() + + return ( + self._nested_transaction is not None + and self._nested_transaction.is_active + ) + + def _is_autocommit_isolation(self): + opt_iso = self._execution_options.get("isolation_level", None) + return bool( + opt_iso == "AUTOCOMMIT" + or ( + opt_iso is None + and getattr(self.engine.dialect, "isolation_level", None) + == "AUTOCOMMIT" + ) + ) + + def get_transaction(self): + """Return the current root transaction in progress, if any. + + .. versionadded:: 1.4 + + """ + + if self.__branch_from is not None: + return self.__branch_from.get_transaction() + + return self._transaction + + def get_nested_transaction(self): + """Return the current nested transaction in progress, if any. + + .. versionadded:: 1.4 + + """ + if self.__branch_from is not None: + + return self.__branch_from.get_nested_transaction() + + return self._nested_transaction + + def _begin_impl(self, transaction): + assert not self.__branch_from + + if self._echo: + if self._is_autocommit_isolation(): + self._log_info( + "BEGIN (implicit; DBAPI should not BEGIN due to " + "autocommit mode)" + ) + else: + self._log_info("BEGIN (implicit)") + + self.__in_begin = True + + if self._has_events or self.engine._has_events: + self.dispatch.begin(self) + + try: + self.engine.dialect.do_begin(self.connection) + except BaseException as e: + self._handle_dbapi_exception(e, None, None, None, None) + finally: + self.__in_begin = False + + def _rollback_impl(self): + assert not self.__branch_from + + if self._has_events or self.engine._has_events: + self.dispatch.rollback(self) + + if self._still_open_and_dbapi_connection_is_valid: + if self._echo: + if self._is_autocommit_isolation(): + self._log_info( + "ROLLBACK using DBAPI connection.rollback(), " + "DBAPI should ignore due to autocommit mode" + ) + else: + self._log_info("ROLLBACK") + try: + self.engine.dialect.do_rollback(self.connection) + except BaseException as e: + self._handle_dbapi_exception(e, None, None, None, None) + + def _commit_impl(self, autocommit=False): + assert not self.__branch_from + + # AUTOCOMMIT isolation-level is a dialect-specific concept, however + # if a connection has this set as the isolation level, we can skip + # the "autocommit" warning as the operation will do "autocommit" + # in any case + if autocommit and not self._is_autocommit_isolation(): + util.warn_deprecated_20( + "The current statement is being autocommitted using " + "implicit autocommit, which will be removed in " + "SQLAlchemy 2.0. " + "Use the .begin() method of Engine or Connection in order to " + "use an explicit transaction for DML and DDL statements." + ) + + if self._has_events or self.engine._has_events: + self.dispatch.commit(self) + + if self._echo: + if self._is_autocommit_isolation(): + self._log_info( + "COMMIT using DBAPI connection.commit(), " + "DBAPI should ignore due to autocommit mode" + ) + else: + self._log_info("COMMIT") + try: + self.engine.dialect.do_commit(self.connection) + except BaseException as e: + self._handle_dbapi_exception(e, None, None, None, None) + + def _savepoint_impl(self, name=None): + assert not self.__branch_from + + if self._has_events or self.engine._has_events: + self.dispatch.savepoint(self, name) + + if name is None: + self.__savepoint_seq += 1 + name = "sa_savepoint_%s" % self.__savepoint_seq + if self._still_open_and_dbapi_connection_is_valid: + self.engine.dialect.do_savepoint(self, name) + return name + + def _rollback_to_savepoint_impl(self, name): + assert not self.__branch_from + + if self._has_events or self.engine._has_events: + self.dispatch.rollback_savepoint(self, name, None) + + if self._still_open_and_dbapi_connection_is_valid: + self.engine.dialect.do_rollback_to_savepoint(self, name) + + def _release_savepoint_impl(self, name): + assert not self.__branch_from + + if self._has_events or self.engine._has_events: + self.dispatch.release_savepoint(self, name, None) + + if self._still_open_and_dbapi_connection_is_valid: + self.engine.dialect.do_release_savepoint(self, name) + + def _begin_twophase_impl(self, transaction): + assert not self.__branch_from + + if self._echo: + self._log_info("BEGIN TWOPHASE (implicit)") + if self._has_events or self.engine._has_events: + self.dispatch.begin_twophase(self, transaction.xid) + + if self._still_open_and_dbapi_connection_is_valid: + self.__in_begin = True + try: + self.engine.dialect.do_begin_twophase(self, transaction.xid) + except BaseException as e: + self._handle_dbapi_exception(e, None, None, None, None) + finally: + self.__in_begin = False + + def _prepare_twophase_impl(self, xid): + assert not self.__branch_from + + if self._has_events or self.engine._has_events: + self.dispatch.prepare_twophase(self, xid) + + if self._still_open_and_dbapi_connection_is_valid: + assert isinstance(self._transaction, TwoPhaseTransaction) + try: + self.engine.dialect.do_prepare_twophase(self, xid) + except BaseException as e: + self._handle_dbapi_exception(e, None, None, None, None) + + def _rollback_twophase_impl(self, xid, is_prepared): + assert not self.__branch_from + + if self._has_events or self.engine._has_events: + self.dispatch.rollback_twophase(self, xid, is_prepared) + + if self._still_open_and_dbapi_connection_is_valid: + assert isinstance(self._transaction, TwoPhaseTransaction) + try: + self.engine.dialect.do_rollback_twophase( + self, xid, is_prepared + ) + except BaseException as e: + self._handle_dbapi_exception(e, None, None, None, None) + + def _commit_twophase_impl(self, xid, is_prepared): + assert not self.__branch_from + + if self._has_events or self.engine._has_events: + self.dispatch.commit_twophase(self, xid, is_prepared) + + if self._still_open_and_dbapi_connection_is_valid: + assert isinstance(self._transaction, TwoPhaseTransaction) + try: + self.engine.dialect.do_commit_twophase(self, xid, is_prepared) + except BaseException as e: + self._handle_dbapi_exception(e, None, None, None, None) + + def _autorollback(self): + if self.__branch_from: + self.__branch_from._autorollback() + + if not self.in_transaction(): + self._rollback_impl() + + def _warn_for_legacy_exec_format(self): + util.warn_deprecated_20( + "The connection.execute() method in " + "SQLAlchemy 2.0 will accept parameters as a single " + "dictionary or a " + "single sequence of dictionaries only. " + "Parameters passed as keyword arguments, tuples or positionally " + "oriented dictionaries and/or tuples " + "will no longer be accepted." + ) + + def close(self): + """Close this :class:`_engine.Connection`. + + This results in a release of the underlying database + resources, that is, the DBAPI connection referenced + internally. The DBAPI connection is typically restored + back to the connection-holding :class:`_pool.Pool` referenced + by the :class:`_engine.Engine` that produced this + :class:`_engine.Connection`. Any transactional state present on + the DBAPI connection is also unconditionally released via + the DBAPI connection's ``rollback()`` method, regardless + of any :class:`.Transaction` object that may be + outstanding with regards to this :class:`_engine.Connection`. + + After :meth:`_engine.Connection.close` is called, the + :class:`_engine.Connection` is permanently in a closed state, + and will allow no further operations. + + """ + + if self.__branch_from: + assert not self._is_future + util.warn_deprecated_20( + "The .close() method on a so-called 'branched' connection is " + "deprecated as of 1.4, as are 'branched' connections overall, " + "and will be removed in a future release. If this is a " + "default-handling function, don't close the connection." + ) + self._dbapi_connection = None + self.__can_reconnect = False + return + + if self._transaction: + self._transaction.close() + skip_reset = True + else: + skip_reset = False + + if self._dbapi_connection is not None: + conn = self._dbapi_connection + + # as we just closed the transaction, close the connection + # pool connection without doing an additional reset + if skip_reset: + conn._close_no_reset() + else: + conn.close() + + # There is a slight chance that conn.close() may have + # triggered an invalidation here in which case + # _dbapi_connection would already be None, however usually + # it will be non-None here and in a "closed" state. + self._dbapi_connection = None + self.__can_reconnect = False + + def scalar(self, object_, *multiparams, **params): + """Executes and returns the first column of the first row. + + The underlying result/cursor is closed after execution. + + """ + + return self.execute(object_, *multiparams, **params).scalar() + + def scalars(self, object_, *multiparams, **params): + """Executes and returns a scalar result set, which yields scalar values + from the first column of each row. + + This method is equivalent to calling :meth:`_engine.Connection.execute` + to receive a :class:`_result.Result` object, then invoking the + :meth:`_result.Result.scalars` method to produce a + :class:`_result.ScalarResult` instance. + + :return: a :class:`_result.ScalarResult` + + .. versionadded:: 1.4.24 + + """ + + return self.execute(object_, *multiparams, **params).scalars() + + def execute(self, statement, *multiparams, **params): + r"""Executes a SQL statement construct and returns a + :class:`_engine.CursorResult`. + + :param statement: The statement to be executed. May be + one of: + + * a plain string (deprecated) + * any :class:`_expression.ClauseElement` construct that is also + a subclass of :class:`.Executable`, such as a + :func:`_expression.select` construct + * a :class:`.FunctionElement`, such as that generated + by :data:`.func`, will be automatically wrapped in + a SELECT statement, which is then executed. + * a :class:`.DDLElement` object + * a :class:`.DefaultGenerator` object + * a :class:`.Compiled` object + + .. deprecated:: 2.0 passing a string to + :meth:`_engine.Connection.execute` is + deprecated and will be removed in version 2.0. Use the + :func:`_expression.text` construct with + :meth:`_engine.Connection.execute`, or the + :meth:`_engine.Connection.exec_driver_sql` + method to invoke a driver-level + SQL string. + + :param \*multiparams/\**params: represent bound parameter + values to be used in the execution. Typically, + the format is either a collection of one or more + dictionaries passed to \*multiparams:: + + conn.execute( + table.insert(), + {"id":1, "value":"v1"}, + {"id":2, "value":"v2"} + ) + + ...or individual key/values interpreted by \**params:: + + conn.execute( + table.insert(), id=1, value="v1" + ) + + In the case that a plain SQL string is passed, and the underlying + DBAPI accepts positional bind parameters, a collection of tuples + or individual values in \*multiparams may be passed:: + + conn.execute( + "INSERT INTO table (id, value) VALUES (?, ?)", + (1, "v1"), (2, "v2") + ) + + conn.execute( + "INSERT INTO table (id, value) VALUES (?, ?)", + 1, "v1" + ) + + Note above, the usage of a question mark "?" or other + symbol is contingent upon the "paramstyle" accepted by the DBAPI + in use, which may be any of "qmark", "named", "pyformat", "format", + "numeric". See `pep-249 + <https://www.python.org/dev/peps/pep-0249/>`_ for details on + paramstyle. + + To execute a textual SQL statement which uses bound parameters in a + DBAPI-agnostic way, use the :func:`_expression.text` construct. + + .. deprecated:: 2.0 use of tuple or scalar positional parameters + is deprecated. All params should be dicts or sequences of dicts. + Use :meth:`.exec_driver_sql` to execute a plain string with + tuple or scalar positional parameters. + + """ + + if isinstance(statement, util.string_types): + util.warn_deprecated_20( + "Passing a string to Connection.execute() is " + "deprecated and will be removed in version 2.0. Use the " + "text() construct, " + "or the Connection.exec_driver_sql() method to invoke a " + "driver-level SQL string." + ) + + return self._exec_driver_sql( + statement, + multiparams, + params, + _EMPTY_EXECUTION_OPTS, + future=False, + ) + + try: + meth = statement._execute_on_connection + except AttributeError as err: + util.raise_( + exc.ObjectNotExecutableError(statement), replace_context=err + ) + else: + return meth(self, multiparams, params, _EMPTY_EXECUTION_OPTS) + + def _execute_function(self, func, multiparams, params, execution_options): + """Execute a sql.FunctionElement object.""" + + return self._execute_clauseelement( + func.select(), multiparams, params, execution_options + ) + + def _execute_default( + self, + default, + multiparams, + params, + # migrate is calling this directly :( + execution_options=_EMPTY_EXECUTION_OPTS, + ): + """Execute a schema.ColumnDefault object.""" + + execution_options = self._execution_options.merge_with( + execution_options + ) + + distilled_parameters = _distill_params(self, multiparams, params) + + if self._has_events or self.engine._has_events: + ( + default, + distilled_params, + event_multiparams, + event_params, + ) = self._invoke_before_exec_event( + default, distilled_parameters, execution_options + ) + + try: + conn = self._dbapi_connection + if conn is None: + conn = self._revalidate_connection() + + dialect = self.dialect + ctx = dialect.execution_ctx_cls._init_default( + dialect, self, conn, execution_options + ) + except (exc.PendingRollbackError, exc.ResourceClosedError): + raise + except BaseException as e: + self._handle_dbapi_exception(e, None, None, None, None) + + ret = ctx._exec_default(None, default, None) + if self.should_close_with_result: + self.close() + + if self._has_events or self.engine._has_events: + self.dispatch.after_execute( + self, + default, + event_multiparams, + event_params, + execution_options, + ret, + ) + + return ret + + def _execute_ddl(self, ddl, multiparams, params, execution_options): + """Execute a schema.DDL object.""" + + execution_options = ddl._execution_options.merge_with( + self._execution_options, execution_options + ) + + distilled_parameters = _distill_params(self, multiparams, params) + + if self._has_events or self.engine._has_events: + ( + ddl, + distilled_params, + event_multiparams, + event_params, + ) = self._invoke_before_exec_event( + ddl, distilled_parameters, execution_options + ) + + exec_opts = self._execution_options.merge_with(execution_options) + schema_translate_map = exec_opts.get("schema_translate_map", None) + + dialect = self.dialect + + compiled = ddl.compile( + dialect=dialect, schema_translate_map=schema_translate_map + ) + ret = self._execute_context( + dialect, + dialect.execution_ctx_cls._init_ddl, + compiled, + None, + execution_options, + compiled, + ) + if self._has_events or self.engine._has_events: + self.dispatch.after_execute( + self, + ddl, + event_multiparams, + event_params, + execution_options, + ret, + ) + return ret + + def _invoke_before_exec_event( + self, elem, distilled_params, execution_options + ): + + if len(distilled_params) == 1: + event_multiparams, event_params = [], distilled_params[0] + else: + event_multiparams, event_params = distilled_params, {} + + for fn in self.dispatch.before_execute: + elem, event_multiparams, event_params = fn( + self, + elem, + event_multiparams, + event_params, + execution_options, + ) + + if event_multiparams: + distilled_params = list(event_multiparams) + if event_params: + raise exc.InvalidRequestError( + "Event handler can't return non-empty multiparams " + "and params at the same time" + ) + elif event_params: + distilled_params = [event_params] + else: + distilled_params = [] + + return elem, distilled_params, event_multiparams, event_params + + def _execute_clauseelement( + self, elem, multiparams, params, execution_options + ): + """Execute a sql.ClauseElement object.""" + + execution_options = elem._execution_options.merge_with( + self._execution_options, execution_options + ) + + distilled_params = _distill_params(self, multiparams, params) + + has_events = self._has_events or self.engine._has_events + if has_events: + ( + elem, + distilled_params, + event_multiparams, + event_params, + ) = self._invoke_before_exec_event( + elem, distilled_params, execution_options + ) + + if distilled_params: + # ensure we don't retain a link to the view object for keys() + # which links to the values, which we don't want to cache + keys = sorted(distilled_params[0]) + for_executemany = len(distilled_params) > 1 + else: + keys = [] + for_executemany = False + + dialect = self.dialect + + schema_translate_map = execution_options.get( + "schema_translate_map", None + ) + + compiled_cache = execution_options.get( + "compiled_cache", self.engine._compiled_cache + ) + + compiled_sql, extracted_params, cache_hit = elem._compile_w_cache( + dialect=dialect, + compiled_cache=compiled_cache, + column_keys=keys, + for_executemany=for_executemany, + schema_translate_map=schema_translate_map, + linting=self.dialect.compiler_linting | compiler.WARN_LINTING, + ) + ret = self._execute_context( + dialect, + dialect.execution_ctx_cls._init_compiled, + compiled_sql, + distilled_params, + execution_options, + compiled_sql, + distilled_params, + elem, + extracted_params, + cache_hit=cache_hit, + ) + if has_events: + self.dispatch.after_execute( + self, + elem, + event_multiparams, + event_params, + execution_options, + ret, + ) + return ret + + def _execute_compiled( + self, + compiled, + multiparams, + params, + execution_options=_EMPTY_EXECUTION_OPTS, + ): + """Execute a sql.Compiled object. + + TODO: why do we have this? likely deprecate or remove + + """ + + execution_options = compiled.execution_options.merge_with( + self._execution_options, execution_options + ) + distilled_parameters = _distill_params(self, multiparams, params) + + if self._has_events or self.engine._has_events: + ( + compiled, + distilled_params, + event_multiparams, + event_params, + ) = self._invoke_before_exec_event( + compiled, distilled_parameters, execution_options + ) + + dialect = self.dialect + + ret = self._execute_context( + dialect, + dialect.execution_ctx_cls._init_compiled, + compiled, + distilled_parameters, + execution_options, + compiled, + distilled_parameters, + None, + None, + ) + if self._has_events or self.engine._has_events: + self.dispatch.after_execute( + self, + compiled, + event_multiparams, + event_params, + execution_options, + ret, + ) + return ret + + def _exec_driver_sql( + self, statement, multiparams, params, execution_options, future + ): + + execution_options = self._execution_options.merge_with( + execution_options + ) + + distilled_parameters = _distill_params(self, multiparams, params) + + if not future: + if self._has_events or self.engine._has_events: + ( + statement, + distilled_params, + event_multiparams, + event_params, + ) = self._invoke_before_exec_event( + statement, distilled_parameters, execution_options + ) + + dialect = self.dialect + ret = self._execute_context( + dialect, + dialect.execution_ctx_cls._init_statement, + statement, + distilled_parameters, + execution_options, + statement, + distilled_parameters, + ) + + if not future: + if self._has_events or self.engine._has_events: + self.dispatch.after_execute( + self, + statement, + event_multiparams, + event_params, + execution_options, + ret, + ) + return ret + + def _execute_20( + self, + statement, + parameters=None, + execution_options=_EMPTY_EXECUTION_OPTS, + ): + args_10style, kwargs_10style = _distill_params_20(parameters) + try: + meth = statement._execute_on_connection + except AttributeError as err: + util.raise_( + exc.ObjectNotExecutableError(statement), replace_context=err + ) + else: + return meth(self, args_10style, kwargs_10style, execution_options) + + def exec_driver_sql( + self, statement, parameters=None, execution_options=None + ): + r"""Executes a SQL statement construct and returns a + :class:`_engine.CursorResult`. + + :param statement: The statement str to be executed. Bound parameters + must use the underlying DBAPI's paramstyle, such as "qmark", + "pyformat", "format", etc. + + :param parameters: represent bound parameter values to be used in the + execution. The format is one of: a dictionary of named parameters, + a tuple of positional parameters, or a list containing either + dictionaries or tuples for multiple-execute support. + + E.g. multiple dictionaries:: + + + conn.exec_driver_sql( + "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)", + [{"id":1, "value":"v1"}, {"id":2, "value":"v2"}] + ) + + Single dictionary:: + + conn.exec_driver_sql( + "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)", + dict(id=1, value="v1") + ) + + Single tuple:: + + conn.exec_driver_sql( + "INSERT INTO table (id, value) VALUES (?, ?)", + (1, 'v1') + ) + + .. note:: The :meth:`_engine.Connection.exec_driver_sql` method does + not participate in the + :meth:`_events.ConnectionEvents.before_execute` and + :meth:`_events.ConnectionEvents.after_execute` events. To + intercept calls to :meth:`_engine.Connection.exec_driver_sql`, use + :meth:`_events.ConnectionEvents.before_cursor_execute` and + :meth:`_events.ConnectionEvents.after_cursor_execute`. + + .. seealso:: + + :pep:`249` + + """ + + args_10style, kwargs_10style = _distill_params_20(parameters) + + return self._exec_driver_sql( + statement, + args_10style, + kwargs_10style, + execution_options, + future=True, + ) + + def _execute_context( + self, + dialect, + constructor, + statement, + parameters, + execution_options, + *args, + **kw + ): + """Create an :class:`.ExecutionContext` and execute, returning + a :class:`_engine.CursorResult`.""" + + branched = self + if self.__branch_from: + # if this is a "branched" connection, do everything in terms + # of the "root" connection, *except* for .close(), which is + # the only feature that branching provides + self = self.__branch_from + + if execution_options: + yp = execution_options.get("yield_per", None) + if yp: + execution_options = execution_options.union( + {"stream_results": True, "max_row_buffer": yp} + ) + + try: + conn = self._dbapi_connection + if conn is None: + conn = self._revalidate_connection() + + context = constructor( + dialect, self, conn, execution_options, *args, **kw + ) + except (exc.PendingRollbackError, exc.ResourceClosedError): + raise + except BaseException as e: + self._handle_dbapi_exception( + e, util.text_type(statement), parameters, None, None + ) + + if ( + self._transaction + and not self._transaction.is_active + or ( + self._nested_transaction + and not self._nested_transaction.is_active + ) + ): + self._invalid_transaction() + + elif self._trans_context_manager: + TransactionalContext._trans_ctx_check(self) + + if self._is_future and self._transaction is None: + self._autobegin() + + context.pre_exec() + + if dialect.use_setinputsizes: + context._set_input_sizes() + + cursor, statement, parameters = ( + context.cursor, + context.statement, + context.parameters, + ) + + if not context.executemany: + parameters = parameters[0] + + if self._has_events or self.engine._has_events: + for fn in self.dispatch.before_cursor_execute: + statement, parameters = fn( + self, + cursor, + statement, + parameters, + context, + context.executemany, + ) + + if self._echo: + + self._log_info(statement) + + stats = context._get_cache_stats() + + if not self.engine.hide_parameters: + self._log_info( + "[%s] %r", + stats, + sql_util._repr_params( + parameters, batches=10, ismulti=context.executemany + ), + ) + else: + self._log_info( + "[%s] [SQL parameters hidden due to hide_parameters=True]" + % (stats,) + ) + + evt_handled = False + try: + if context.executemany: + if self.dialect._has_events: + for fn in self.dialect.dispatch.do_executemany: + if fn(cursor, statement, parameters, context): + evt_handled = True + break + if not evt_handled: + self.dialect.do_executemany( + cursor, statement, parameters, context + ) + elif not parameters and context.no_parameters: + if self.dialect._has_events: + for fn in self.dialect.dispatch.do_execute_no_params: + if fn(cursor, statement, context): + evt_handled = True + break + if not evt_handled: + self.dialect.do_execute_no_params( + cursor, statement, context + ) + else: + if self.dialect._has_events: + for fn in self.dialect.dispatch.do_execute: + if fn(cursor, statement, parameters, context): + evt_handled = True + break + if not evt_handled: + self.dialect.do_execute( + cursor, statement, parameters, context + ) + + if self._has_events or self.engine._has_events: + self.dispatch.after_cursor_execute( + self, + cursor, + statement, + parameters, + context, + context.executemany, + ) + + context.post_exec() + + result = context._setup_result_proxy() + + if not self._is_future: + should_close_with_result = branched.should_close_with_result + + if not result._soft_closed and should_close_with_result: + result._autoclose_connection = True + + if ( + # usually we're in a transaction so avoid relatively + # expensive / legacy should_autocommit call + self._transaction is None + and context.should_autocommit + ): + self._commit_impl(autocommit=True) + + # for "connectionless" execution, we have to close this + # Connection after the statement is complete. + # legacy stuff. + if should_close_with_result and context._soft_closed: + assert not self._is_future + + # CursorResult already exhausted rows / has no rows. + # close us now + branched.close() + + except BaseException as e: + self._handle_dbapi_exception( + e, statement, parameters, cursor, context + ) + + return result + + def _cursor_execute(self, cursor, statement, parameters, context=None): + """Execute a statement + params on the given cursor. + + Adds appropriate logging and exception handling. + + This method is used by DefaultDialect for special-case + executions, such as for sequences and column defaults. + The path of statement execution in the majority of cases + terminates at _execute_context(). + + """ + if self._has_events or self.engine._has_events: + for fn in self.dispatch.before_cursor_execute: + statement, parameters = fn( + self, cursor, statement, parameters, context, False + ) + + if self._echo: + self._log_info(statement) + self._log_info("[raw sql] %r", parameters) + try: + for fn in ( + () + if not self.dialect._has_events + else self.dialect.dispatch.do_execute + ): + if fn(cursor, statement, parameters, context): + break + else: + self.dialect.do_execute(cursor, statement, parameters, context) + except BaseException as e: + self._handle_dbapi_exception( + e, statement, parameters, cursor, context + ) + + if self._has_events or self.engine._has_events: + self.dispatch.after_cursor_execute( + self, cursor, statement, parameters, context, False + ) + + def _safe_close_cursor(self, cursor): + """Close the given cursor, catching exceptions + and turning into log warnings. + + """ + try: + cursor.close() + except Exception: + # log the error through the connection pool's logger. + self.engine.pool.logger.error( + "Error closing cursor", exc_info=True + ) + + _reentrant_error = False + _is_disconnect = False + + def _handle_dbapi_exception( + self, e, statement, parameters, cursor, context + ): + exc_info = sys.exc_info() + + is_exit_exception = util.is_exit_exception(e) + + if not self._is_disconnect: + self._is_disconnect = ( + isinstance(e, self.dialect.dbapi.Error) + and not self.closed + and self.dialect.is_disconnect( + e, + self._dbapi_connection if not self.invalidated else None, + cursor, + ) + ) or (is_exit_exception and not self.closed) + + invalidate_pool_on_disconnect = not is_exit_exception + + if self._reentrant_error: + util.raise_( + exc.DBAPIError.instance( + statement, + parameters, + e, + self.dialect.dbapi.Error, + hide_parameters=self.engine.hide_parameters, + dialect=self.dialect, + ismulti=context.executemany + if context is not None + else None, + ), + with_traceback=exc_info[2], + from_=e, + ) + self._reentrant_error = True + try: + # non-DBAPI error - if we already got a context, + # or there's no string statement, don't wrap it + should_wrap = isinstance(e, self.dialect.dbapi.Error) or ( + statement is not None + and context is None + and not is_exit_exception + ) + + if should_wrap: + sqlalchemy_exception = exc.DBAPIError.instance( + statement, + parameters, + e, + self.dialect.dbapi.Error, + hide_parameters=self.engine.hide_parameters, + connection_invalidated=self._is_disconnect, + dialect=self.dialect, + ismulti=context.executemany + if context is not None + else None, + ) + else: + sqlalchemy_exception = None + + newraise = None + + if ( + self._has_events or self.engine._has_events + ) and not self._execution_options.get( + "skip_user_error_events", False + ): + ctx = ExceptionContextImpl( + e, + sqlalchemy_exception, + self.engine, + self, + cursor, + statement, + parameters, + context, + self._is_disconnect, + invalidate_pool_on_disconnect, + ) + + for fn in self.dispatch.handle_error: + try: + # handler returns an exception; + # call next handler in a chain + per_fn = fn(ctx) + if per_fn is not None: + ctx.chained_exception = newraise = per_fn + except Exception as _raised: + # handler raises an exception - stop processing + newraise = _raised + break + + if self._is_disconnect != ctx.is_disconnect: + self._is_disconnect = ctx.is_disconnect + if sqlalchemy_exception: + sqlalchemy_exception.connection_invalidated = ( + ctx.is_disconnect + ) + + # set up potentially user-defined value for + # invalidate pool. + invalidate_pool_on_disconnect = ( + ctx.invalidate_pool_on_disconnect + ) + + if should_wrap and context: + context.handle_dbapi_exception(e) + + if not self._is_disconnect: + if cursor: + self._safe_close_cursor(cursor) + with util.safe_reraise(warn_only=True): + self._autorollback() + + if newraise: + util.raise_(newraise, with_traceback=exc_info[2], from_=e) + elif should_wrap: + util.raise_( + sqlalchemy_exception, with_traceback=exc_info[2], from_=e + ) + else: + util.raise_(exc_info[1], with_traceback=exc_info[2]) + + finally: + del self._reentrant_error + if self._is_disconnect: + del self._is_disconnect + if not self.invalidated: + dbapi_conn_wrapper = self._dbapi_connection + if invalidate_pool_on_disconnect: + self.engine.pool._invalidate(dbapi_conn_wrapper, e) + self.invalidate(e) + if self.should_close_with_result: + assert not self._is_future + self.close() + + @classmethod + def _handle_dbapi_exception_noconnection(cls, e, dialect, engine): + exc_info = sys.exc_info() + + is_disconnect = dialect.is_disconnect(e, None, None) + + should_wrap = isinstance(e, dialect.dbapi.Error) + + if should_wrap: + sqlalchemy_exception = exc.DBAPIError.instance( + None, + None, + e, + dialect.dbapi.Error, + hide_parameters=engine.hide_parameters, + connection_invalidated=is_disconnect, + ) + else: + sqlalchemy_exception = None + + newraise = None + + if engine._has_events: + ctx = ExceptionContextImpl( + e, + sqlalchemy_exception, + engine, + None, + None, + None, + None, + None, + is_disconnect, + True, + ) + for fn in engine.dispatch.handle_error: + try: + # handler returns an exception; + # call next handler in a chain + per_fn = fn(ctx) + if per_fn is not None: + ctx.chained_exception = newraise = per_fn + except Exception as _raised: + # handler raises an exception - stop processing + newraise = _raised + break + + if sqlalchemy_exception and is_disconnect != ctx.is_disconnect: + sqlalchemy_exception.connection_invalidated = ( + is_disconnect + ) = ctx.is_disconnect + + if newraise: + util.raise_(newraise, with_traceback=exc_info[2], from_=e) + elif should_wrap: + util.raise_( + sqlalchemy_exception, with_traceback=exc_info[2], from_=e + ) + else: + util.raise_(exc_info[1], with_traceback=exc_info[2]) + + def _run_ddl_visitor(self, visitorcallable, element, **kwargs): + """run a DDL visitor. + + This method is only here so that the MockConnection can change the + options given to the visitor so that "checkfirst" is skipped. + + """ + visitorcallable(self.dialect, self, **kwargs).traverse_single(element) + + @util.deprecated( + "1.4", + "The :meth:`_engine.Connection.transaction` " + "method is deprecated and will be " + "removed in a future release. Use the :meth:`_engine.Engine.begin` " + "context manager instead.", + ) + def transaction(self, callable_, *args, **kwargs): + r"""Execute the given function within a transaction boundary. + + The function is passed this :class:`_engine.Connection` + as the first argument, followed by the given \*args and \**kwargs, + e.g.:: + + def do_something(conn, x, y): + conn.execute(text("some statement"), {'x':x, 'y':y}) + + conn.transaction(do_something, 5, 10) + + The operations inside the function are all invoked within the + context of a single :class:`.Transaction`. + Upon success, the transaction is committed. If an + exception is raised, the transaction is rolled back + before propagating the exception. + + .. note:: + + The :meth:`.transaction` method is superseded by + the usage of the Python ``with:`` statement, which can + be used with :meth:`_engine.Connection.begin`:: + + with conn.begin(): + conn.execute(text("some statement"), {'x':5, 'y':10}) + + As well as with :meth:`_engine.Engine.begin`:: + + with engine.begin() as conn: + conn.execute(text("some statement"), {'x':5, 'y':10}) + + .. seealso:: + + :meth:`_engine.Engine.begin` - engine-level transactional + context + + :meth:`_engine.Engine.transaction` - engine-level version of + :meth:`_engine.Connection.transaction` + + """ + + kwargs["_sa_skip_warning"] = True + trans = self.begin() + try: + ret = self.run_callable(callable_, *args, **kwargs) + trans.commit() + return ret + except: + with util.safe_reraise(): + trans.rollback() + + @util.deprecated( + "1.4", + "The :meth:`_engine.Connection.run_callable` " + "method is deprecated and will " + "be removed in a future release. Invoke the callable function " + "directly, passing the Connection.", + ) + def run_callable(self, callable_, *args, **kwargs): + r"""Given a callable object or function, execute it, passing + a :class:`_engine.Connection` as the first argument. + + The given \*args and \**kwargs are passed subsequent + to the :class:`_engine.Connection` argument. + + This function, along with :meth:`_engine.Engine.run_callable`, + allows a function to be run with a :class:`_engine.Connection` + or :class:`_engine.Engine` object without the need to know + which one is being dealt with. + + """ + return callable_(self, *args, **kwargs) + + +class ExceptionContextImpl(ExceptionContext): + """Implement the :class:`.ExceptionContext` interface.""" + + def __init__( + self, + exception, + sqlalchemy_exception, + engine, + connection, + cursor, + statement, + parameters, + context, + is_disconnect, + invalidate_pool_on_disconnect, + ): + self.engine = engine + self.connection = connection + self.sqlalchemy_exception = sqlalchemy_exception + self.original_exception = exception + self.execution_context = context + self.statement = statement + self.parameters = parameters + self.is_disconnect = is_disconnect + self.invalidate_pool_on_disconnect = invalidate_pool_on_disconnect + + +class Transaction(TransactionalContext): + """Represent a database transaction in progress. + + The :class:`.Transaction` object is procured by + calling the :meth:`_engine.Connection.begin` method of + :class:`_engine.Connection`:: + + from sqlalchemy import create_engine + engine = create_engine("postgresql://scott:tiger@localhost/test") + connection = engine.connect() + trans = connection.begin() + connection.execute(text("insert into x (a, b) values (1, 2)")) + trans.commit() + + The object provides :meth:`.rollback` and :meth:`.commit` + methods in order to control transaction boundaries. It + also implements a context manager interface so that + the Python ``with`` statement can be used with the + :meth:`_engine.Connection.begin` method:: + + with connection.begin(): + connection.execute(text("insert into x (a, b) values (1, 2)")) + + The Transaction object is **not** threadsafe. + + .. seealso:: + + :meth:`_engine.Connection.begin` + + :meth:`_engine.Connection.begin_twophase` + + :meth:`_engine.Connection.begin_nested` + + .. index:: + single: thread safety; Transaction + """ + + __slots__ = () + + _is_root = False + + def __init__(self, connection): + raise NotImplementedError() + + def _do_deactivate(self): + """do whatever steps are necessary to set this transaction as + "deactive", however leave this transaction object in place as far + as the connection's state. + + for a "real" transaction this should roll back the transaction + and ensure this transaction is no longer a reset agent. + + this is used for nesting of marker transactions where the marker + can set the "real" transaction as rolled back, however it stays + in place. + + for 2.0 we hope to remove this nesting feature. + + """ + raise NotImplementedError() + + @property + def _deactivated_from_connection(self): + """True if this transaction is totally deactivated from the connection + and therefore can no longer affect its state. + + """ + raise NotImplementedError() + + def _do_close(self): + raise NotImplementedError() + + def _do_rollback(self): + raise NotImplementedError() + + def _do_commit(self): + raise NotImplementedError() + + @property + def is_valid(self): + return self.is_active and not self.connection.invalidated + + def close(self): + """Close this :class:`.Transaction`. + + If this transaction is the base transaction in a begin/commit + nesting, the transaction will rollback(). Otherwise, the + method returns. + + This is used to cancel a Transaction without affecting the scope of + an enclosing transaction. + + """ + try: + self._do_close() + finally: + assert not self.is_active + + def rollback(self): + """Roll back this :class:`.Transaction`. + + The implementation of this may vary based on the type of transaction in + use: + + * For a simple database transaction (e.g. :class:`.RootTransaction`), + it corresponds to a ROLLBACK. + + * For a :class:`.NestedTransaction`, it corresponds to a + "ROLLBACK TO SAVEPOINT" operation. + + * For a :class:`.TwoPhaseTransaction`, DBAPI-specific methods for two + phase transactions may be used. + + + """ + try: + self._do_rollback() + finally: + assert not self.is_active + + def commit(self): + """Commit this :class:`.Transaction`. + + The implementation of this may vary based on the type of transaction in + use: + + * For a simple database transaction (e.g. :class:`.RootTransaction`), + it corresponds to a COMMIT. + + * For a :class:`.NestedTransaction`, it corresponds to a + "RELEASE SAVEPOINT" operation. + + * For a :class:`.TwoPhaseTransaction`, DBAPI-specific methods for two + phase transactions may be used. + + """ + try: + self._do_commit() + finally: + assert not self.is_active + + def _get_subject(self): + return self.connection + + def _transaction_is_active(self): + return self.is_active + + def _transaction_is_closed(self): + return not self._deactivated_from_connection + + def _rollback_can_be_called(self): + # for RootTransaction / NestedTransaction, it's safe to call + # rollback() even if the transaction is deactive and no warnings + # will be emitted. tested in + # test_transaction.py -> test_no_rollback_in_deactive(?:_savepoint)? + return True + + +class MarkerTransaction(Transaction): + """A 'marker' transaction that is used for nested begin() calls. + + .. deprecated:: 1.4 future connection for 2.0 won't support this pattern. + + """ + + __slots__ = ("connection", "_is_active", "_transaction") + + def __init__(self, connection): + assert connection._transaction is not None + if not connection._transaction.is_active: + raise exc.InvalidRequestError( + "the current transaction on this connection is inactive. " + "Please issue a rollback first." + ) + + assert not connection._is_future + util.warn_deprecated_20( + "Calling .begin() when a transaction is already begun, creating " + "a 'sub' transaction, is deprecated " + "and will be removed in 2.0. See the documentation section " + "'Migrating from the nesting pattern' for background on how " + "to migrate from this pattern." + ) + + self.connection = connection + + if connection._trans_context_manager: + TransactionalContext._trans_ctx_check(connection) + + if connection._nested_transaction is not None: + self._transaction = connection._nested_transaction + else: + self._transaction = connection._transaction + self._is_active = True + + @property + def _deactivated_from_connection(self): + return not self.is_active + + @property + def is_active(self): + return self._is_active and self._transaction.is_active + + def _deactivate(self): + self._is_active = False + + def _do_close(self): + # does not actually roll back the root + self._deactivate() + + def _do_rollback(self): + # does roll back the root + if self._is_active: + try: + self._transaction._do_deactivate() + finally: + self._deactivate() + + def _do_commit(self): + self._deactivate() + + +class RootTransaction(Transaction): + """Represent the "root" transaction on a :class:`_engine.Connection`. + + This corresponds to the current "BEGIN/COMMIT/ROLLBACK" that's occurring + for the :class:`_engine.Connection`. The :class:`_engine.RootTransaction` + is created by calling upon the :meth:`_engine.Connection.begin` method, and + remains associated with the :class:`_engine.Connection` throughout its + active span. The current :class:`_engine.RootTransaction` in use is + accessible via the :attr:`_engine.Connection.get_transaction` method of + :class:`_engine.Connection`. + + In :term:`2.0 style` use, the :class:`_future.Connection` also employs + "autobegin" behavior that will create a new + :class:`_engine.RootTransaction` whenever a connection in a + non-transactional state is used to emit commands on the DBAPI connection. + The scope of the :class:`_engine.RootTransaction` in 2.0 style + use can be controlled using the :meth:`_future.Connection.commit` and + :meth:`_future.Connection.rollback` methods. + + + """ + + _is_root = True + + __slots__ = ("connection", "is_active") + + def __init__(self, connection): + assert connection._transaction is None + if connection._trans_context_manager: + TransactionalContext._trans_ctx_check(connection) + self.connection = connection + self._connection_begin_impl() + connection._transaction = self + + self.is_active = True + + def _deactivate_from_connection(self): + if self.is_active: + assert self.connection._transaction is self + self.is_active = False + + elif self.connection._transaction is not self: + util.warn("transaction already deassociated from connection") + + @property + def _deactivated_from_connection(self): + return self.connection._transaction is not self + + def _do_deactivate(self): + # called from a MarkerTransaction to cancel this root transaction. + # the transaction stays in place as connection._transaction, but + # is no longer active and is no longer the reset agent for the + # pooled connection. the connection won't support a new begin() + # until this transaction is explicitly closed, rolled back, + # or committed. + + assert self.connection._transaction is self + + if self.is_active: + self._connection_rollback_impl() + + # handle case where a savepoint was created inside of a marker + # transaction that refers to a root. nested has to be cancelled + # also. + if self.connection._nested_transaction: + self.connection._nested_transaction._cancel() + + self._deactivate_from_connection() + + def _connection_begin_impl(self): + self.connection._begin_impl(self) + + def _connection_rollback_impl(self): + self.connection._rollback_impl() + + def _connection_commit_impl(self): + self.connection._commit_impl() + + def _close_impl(self, try_deactivate=False): + try: + if self.is_active: + self._connection_rollback_impl() + + if self.connection._nested_transaction: + self.connection._nested_transaction._cancel() + finally: + if self.is_active or try_deactivate: + self._deactivate_from_connection() + if self.connection._transaction is self: + self.connection._transaction = None + + assert not self.is_active + assert self.connection._transaction is not self + + def _do_close(self): + self._close_impl() + + def _do_rollback(self): + self._close_impl(try_deactivate=True) + + def _do_commit(self): + if self.is_active: + assert self.connection._transaction is self + + try: + self._connection_commit_impl() + finally: + # whether or not commit succeeds, cancel any + # nested transactions, make this transaction "inactive" + # and remove it as a reset agent + if self.connection._nested_transaction: + self.connection._nested_transaction._cancel() + + self._deactivate_from_connection() + + # ...however only remove as the connection's current transaction + # if commit succeeded. otherwise it stays on so that a rollback + # needs to occur. + self.connection._transaction = None + else: + if self.connection._transaction is self: + self.connection._invalid_transaction() + else: + raise exc.InvalidRequestError("This transaction is inactive") + + assert not self.is_active + assert self.connection._transaction is not self + + +class NestedTransaction(Transaction): + """Represent a 'nested', or SAVEPOINT transaction. + + The :class:`.NestedTransaction` object is created by calling the + :meth:`_engine.Connection.begin_nested` method of + :class:`_engine.Connection`. + + When using :class:`.NestedTransaction`, the semantics of "begin" / + "commit" / "rollback" are as follows: + + * the "begin" operation corresponds to the "BEGIN SAVEPOINT" command, where + the savepoint is given an explicit name that is part of the state + of this object. + + * The :meth:`.NestedTransaction.commit` method corresponds to a + "RELEASE SAVEPOINT" operation, using the savepoint identifier associated + with this :class:`.NestedTransaction`. + + * The :meth:`.NestedTransaction.rollback` method corresponds to a + "ROLLBACK TO SAVEPOINT" operation, using the savepoint identifier + associated with this :class:`.NestedTransaction`. + + The rationale for mimicking the semantics of an outer transaction in + terms of savepoints so that code may deal with a "savepoint" transaction + and an "outer" transaction in an agnostic way. + + .. seealso:: + + :ref:`session_begin_nested` - ORM version of the SAVEPOINT API. + + """ + + __slots__ = ("connection", "is_active", "_savepoint", "_previous_nested") + + def __init__(self, connection): + assert connection._transaction is not None + if connection._trans_context_manager: + TransactionalContext._trans_ctx_check(connection) + self.connection = connection + self._savepoint = self.connection._savepoint_impl() + self.is_active = True + self._previous_nested = connection._nested_transaction + connection._nested_transaction = self + + def _deactivate_from_connection(self, warn=True): + if self.connection._nested_transaction is self: + self.connection._nested_transaction = self._previous_nested + elif warn: + util.warn( + "nested transaction already deassociated from connection" + ) + + @property + def _deactivated_from_connection(self): + return self.connection._nested_transaction is not self + + def _cancel(self): + # called by RootTransaction when the outer transaction is + # committed, rolled back, or closed to cancel all savepoints + # without any action being taken + self.is_active = False + self._deactivate_from_connection() + if self._previous_nested: + self._previous_nested._cancel() + + def _close_impl(self, deactivate_from_connection, warn_already_deactive): + try: + if self.is_active and self.connection._transaction.is_active: + self.connection._rollback_to_savepoint_impl(self._savepoint) + finally: + self.is_active = False + + if deactivate_from_connection: + self._deactivate_from_connection(warn=warn_already_deactive) + + assert not self.is_active + if deactivate_from_connection: + assert self.connection._nested_transaction is not self + + def _do_deactivate(self): + self._close_impl(False, False) + + def _do_close(self): + self._close_impl(True, False) + + def _do_rollback(self): + self._close_impl(True, True) + + def _do_commit(self): + if self.is_active: + try: + self.connection._release_savepoint_impl(self._savepoint) + finally: + # nested trans becomes inactive on failed release + # unconditionally. this prevents it from trying to + # emit SQL when it rolls back. + self.is_active = False + + # but only de-associate from connection if it succeeded + self._deactivate_from_connection() + else: + if self.connection._nested_transaction is self: + self.connection._invalid_transaction() + else: + raise exc.InvalidRequestError( + "This nested transaction is inactive" + ) + + +class TwoPhaseTransaction(RootTransaction): + """Represent a two-phase transaction. + + A new :class:`.TwoPhaseTransaction` object may be procured + using the :meth:`_engine.Connection.begin_twophase` method. + + The interface is the same as that of :class:`.Transaction` + with the addition of the :meth:`prepare` method. + + """ + + __slots__ = ("connection", "is_active", "xid", "_is_prepared") + + def __init__(self, connection, xid): + self._is_prepared = False + self.xid = xid + super(TwoPhaseTransaction, self).__init__(connection) + + def prepare(self): + """Prepare this :class:`.TwoPhaseTransaction`. + + After a PREPARE, the transaction can be committed. + + """ + if not self.is_active: + raise exc.InvalidRequestError("This transaction is inactive") + self.connection._prepare_twophase_impl(self.xid) + self._is_prepared = True + + def _connection_begin_impl(self): + self.connection._begin_twophase_impl(self) + + def _connection_rollback_impl(self): + self.connection._rollback_twophase_impl(self.xid, self._is_prepared) + + def _connection_commit_impl(self): + self.connection._commit_twophase_impl(self.xid, self._is_prepared) + + +class Engine(Connectable, log.Identified): + """ + Connects a :class:`~sqlalchemy.pool.Pool` and + :class:`~sqlalchemy.engine.interfaces.Dialect` together to provide a + source of database connectivity and behavior. + + This is the **SQLAlchemy 1.x version** of :class:`_engine.Engine`. For + the :term:`2.0 style` version, which includes some API differences, + see :class:`_future.Engine`. + + An :class:`_engine.Engine` object is instantiated publicly using the + :func:`~sqlalchemy.create_engine` function. + + .. seealso:: + + :doc:`/core/engines` + + :ref:`connections_toplevel` + + """ + + _execution_options = _EMPTY_EXECUTION_OPTS + _has_events = False + _connection_cls = Connection + _sqla_logger_namespace = "sqlalchemy.engine.Engine" + _is_future = False + + _schema_translate_map = None + + def __init__( + self, + pool, + dialect, + url, + logging_name=None, + echo=None, + query_cache_size=500, + execution_options=None, + hide_parameters=False, + ): + self.pool = pool + self.url = url + self.dialect = dialect + if logging_name: + self.logging_name = logging_name + self.echo = echo + self.hide_parameters = hide_parameters + if query_cache_size != 0: + self._compiled_cache = util.LRUCache( + query_cache_size, size_alert=self._lru_size_alert + ) + else: + self._compiled_cache = None + log.instance_logger(self, echoflag=echo) + if execution_options: + self.update_execution_options(**execution_options) + + def _lru_size_alert(self, cache): + if self._should_log_info: + self.logger.info( + "Compiled cache size pruning from %d items to %d. " + "Increase cache size to reduce the frequency of pruning.", + len(cache), + cache.capacity, + ) + + @property + def engine(self): + return self + + def clear_compiled_cache(self): + """Clear the compiled cache associated with the dialect. + + This applies **only** to the built-in cache that is established + via the :paramref:`_engine.create_engine.query_cache_size` parameter. + It will not impact any dictionary caches that were passed via the + :paramref:`.Connection.execution_options.query_cache` parameter. + + .. versionadded:: 1.4 + + """ + if self._compiled_cache: + self._compiled_cache.clear() + + def update_execution_options(self, **opt): + r"""Update the default execution_options dictionary + of this :class:`_engine.Engine`. + + The given keys/values in \**opt are added to the + default execution options that will be used for + all connections. The initial contents of this dictionary + can be sent via the ``execution_options`` parameter + to :func:`_sa.create_engine`. + + .. seealso:: + + :meth:`_engine.Connection.execution_options` + + :meth:`_engine.Engine.execution_options` + + """ + self._execution_options = self._execution_options.union(opt) + self.dispatch.set_engine_execution_options(self, opt) + self.dialect.set_engine_execution_options(self, opt) + + def execution_options(self, **opt): + """Return a new :class:`_engine.Engine` that will provide + :class:`_engine.Connection` objects with the given execution options. + + The returned :class:`_engine.Engine` remains related to the original + :class:`_engine.Engine` in that it shares the same connection pool and + other state: + + * The :class:`_pool.Pool` used by the new :class:`_engine.Engine` + is the + same instance. The :meth:`_engine.Engine.dispose` + method will replace + the connection pool instance for the parent engine as well + as this one. + * Event listeners are "cascaded" - meaning, the new + :class:`_engine.Engine` + inherits the events of the parent, and new events can be associated + with the new :class:`_engine.Engine` individually. + * The logging configuration and logging_name is copied from the parent + :class:`_engine.Engine`. + + The intent of the :meth:`_engine.Engine.execution_options` method is + to implement "sharding" schemes where multiple :class:`_engine.Engine` + objects refer to the same connection pool, but are differentiated + by options that would be consumed by a custom event:: + + primary_engine = create_engine("mysql://") + shard1 = primary_engine.execution_options(shard_id="shard1") + shard2 = primary_engine.execution_options(shard_id="shard2") + + Above, the ``shard1`` engine serves as a factory for + :class:`_engine.Connection` + objects that will contain the execution option + ``shard_id=shard1``, and ``shard2`` will produce + :class:`_engine.Connection` + objects that contain the execution option ``shard_id=shard2``. + + An event handler can consume the above execution option to perform + a schema switch or other operation, given a connection. Below + we emit a MySQL ``use`` statement to switch databases, at the same + time keeping track of which database we've established using the + :attr:`_engine.Connection.info` dictionary, + which gives us a persistent + storage space that follows the DBAPI connection:: + + from sqlalchemy import event + from sqlalchemy.engine import Engine + + shards = {"default": "base", shard_1: "db1", "shard_2": "db2"} + + @event.listens_for(Engine, "before_cursor_execute") + def _switch_shard(conn, cursor, stmt, + params, context, executemany): + shard_id = conn._execution_options.get('shard_id', "default") + current_shard = conn.info.get("current_shard", None) + + if current_shard != shard_id: + cursor.execute("use %s" % shards[shard_id]) + conn.info["current_shard"] = shard_id + + .. seealso:: + + :meth:`_engine.Connection.execution_options` + - update execution options + on a :class:`_engine.Connection` object. + + :meth:`_engine.Engine.update_execution_options` + - update the execution + options for a given :class:`_engine.Engine` in place. + + :meth:`_engine.Engine.get_execution_options` + + + """ + return self._option_cls(self, opt) + + def get_execution_options(self): + """Get the non-SQL options which will take effect during execution. + + .. versionadded: 1.3 + + .. seealso:: + + :meth:`_engine.Engine.execution_options` + """ + return self._execution_options + + @property + def name(self): + """String name of the :class:`~sqlalchemy.engine.interfaces.Dialect` + in use by this :class:`Engine`.""" + + return self.dialect.name + + @property + def driver(self): + """Driver name of the :class:`~sqlalchemy.engine.interfaces.Dialect` + in use by this :class:`Engine`.""" + + return self.dialect.driver + + echo = log.echo_property() + + def __repr__(self): + return "Engine(%r)" % (self.url,) + + def dispose(self, close=True): + """Dispose of the connection pool used by this + :class:`_engine.Engine`. + + A new connection pool is created immediately after the old one has been + disposed. The previous connection pool is disposed either actively, by + closing out all currently checked-in connections in that pool, or + passively, by losing references to it but otherwise not closing any + connections. The latter strategy is more appropriate for an initializer + in a forked Python process. + + :param close: if left at its default of ``True``, has the + effect of fully closing all **currently checked in** + database connections. Connections that are still checked out + will **not** be closed, however they will no longer be associated + with this :class:`_engine.Engine`, + so when they are closed individually, eventually the + :class:`_pool.Pool` which they are associated with will + be garbage collected and they will be closed out fully, if + not already closed on checkin. + + If set to ``False``, the previous connection pool is de-referenced, + and otherwise not touched in any way. + + .. versionadded:: 1.4.33 Added the :paramref:`.Engine.dispose.close` + parameter to allow the replacement of a connection pool in a child + process without interfering with the connections used by the parent + process. + + + .. seealso:: + + :ref:`engine_disposal` + + :ref:`pooling_multiprocessing` + + """ + if close: + self.pool.dispose() + self.pool = self.pool.recreate() + self.dispatch.engine_disposed(self) + + def _execute_default( + self, default, multiparams=(), params=util.EMPTY_DICT + ): + with self.connect() as conn: + return conn._execute_default(default, multiparams, params) + + @contextlib.contextmanager + def _optional_conn_ctx_manager(self, connection=None): + if connection is None: + with self.connect() as conn: + yield conn + else: + yield connection + + class _trans_ctx(object): + def __init__(self, conn, transaction, close_with_result): + self.conn = conn + self.transaction = transaction + self.close_with_result = close_with_result + + def __enter__(self): + self.transaction.__enter__() + return self.conn + + def __exit__(self, type_, value, traceback): + try: + self.transaction.__exit__(type_, value, traceback) + finally: + if not self.close_with_result: + self.conn.close() + + def begin(self, close_with_result=False): + """Return a context manager delivering a :class:`_engine.Connection` + with a :class:`.Transaction` established. + + E.g.:: + + with engine.begin() as conn: + conn.execute( + text("insert into table (x, y, z) values (1, 2, 3)") + ) + conn.execute(text("my_special_procedure(5)")) + + Upon successful operation, the :class:`.Transaction` + is committed. If an error is raised, the :class:`.Transaction` + is rolled back. + + Legacy use only: the ``close_with_result`` flag is normally ``False``, + and indicates that the :class:`_engine.Connection` will be closed when + the operation is complete. When set to ``True``, it indicates the + :class:`_engine.Connection` is in "single use" mode, where the + :class:`_engine.CursorResult` returned by the first call to + :meth:`_engine.Connection.execute` will close the + :class:`_engine.Connection` when that :class:`_engine.CursorResult` has + exhausted all result rows. + + .. seealso:: + + :meth:`_engine.Engine.connect` - procure a + :class:`_engine.Connection` from + an :class:`_engine.Engine`. + + :meth:`_engine.Connection.begin` - start a :class:`.Transaction` + for a particular :class:`_engine.Connection`. + + """ + if self._connection_cls._is_future: + conn = self.connect() + else: + conn = self.connect(close_with_result=close_with_result) + try: + trans = conn.begin() + except: + with util.safe_reraise(): + conn.close() + return Engine._trans_ctx(conn, trans, close_with_result) + + @util.deprecated( + "1.4", + "The :meth:`_engine.Engine.transaction` " + "method is deprecated and will be " + "removed in a future release. Use the :meth:`_engine.Engine.begin` " + "context " + "manager instead.", + ) + def transaction(self, callable_, *args, **kwargs): + r"""Execute the given function within a transaction boundary. + + The function is passed a :class:`_engine.Connection` newly procured + from :meth:`_engine.Engine.connect` as the first argument, + followed by the given \*args and \**kwargs. + + e.g.:: + + def do_something(conn, x, y): + conn.execute(text("some statement"), {'x':x, 'y':y}) + + engine.transaction(do_something, 5, 10) + + The operations inside the function are all invoked within the + context of a single :class:`.Transaction`. + Upon success, the transaction is committed. If an + exception is raised, the transaction is rolled back + before propagating the exception. + + .. note:: + + The :meth:`.transaction` method is superseded by + the usage of the Python ``with:`` statement, which can + be used with :meth:`_engine.Engine.begin`:: + + with engine.begin() as conn: + conn.execute(text("some statement"), {'x':5, 'y':10}) + + .. seealso:: + + :meth:`_engine.Engine.begin` - engine-level transactional + context + + :meth:`_engine.Connection.transaction` + - connection-level version of + :meth:`_engine.Engine.transaction` + + """ + kwargs["_sa_skip_warning"] = True + with self.connect() as conn: + return conn.transaction(callable_, *args, **kwargs) + + @util.deprecated( + "1.4", + "The :meth:`_engine.Engine.run_callable` " + "method is deprecated and will be " + "removed in a future release. Use the :meth:`_engine.Engine.begin` " + "context manager instead.", + ) + def run_callable(self, callable_, *args, **kwargs): + r"""Given a callable object or function, execute it, passing + a :class:`_engine.Connection` as the first argument. + + The given \*args and \**kwargs are passed subsequent + to the :class:`_engine.Connection` argument. + + This function, along with :meth:`_engine.Connection.run_callable`, + allows a function to be run with a :class:`_engine.Connection` + or :class:`_engine.Engine` object without the need to know + which one is being dealt with. + + """ + kwargs["_sa_skip_warning"] = True + with self.connect() as conn: + return conn.run_callable(callable_, *args, **kwargs) + + def _run_ddl_visitor(self, visitorcallable, element, **kwargs): + with self.begin() as conn: + conn._run_ddl_visitor(visitorcallable, element, **kwargs) + + @util.deprecated_20( + ":meth:`_engine.Engine.execute`", + alternative="All statement execution in SQLAlchemy 2.0 is performed " + "by the :meth:`_engine.Connection.execute` method of " + ":class:`_engine.Connection`, " + "or in the ORM by the :meth:`.Session.execute` method of " + ":class:`.Session`.", + ) + def execute(self, statement, *multiparams, **params): + """Executes the given construct and returns a + :class:`_engine.CursorResult`. + + The arguments are the same as those used by + :meth:`_engine.Connection.execute`. + + Here, a :class:`_engine.Connection` is acquired using the + :meth:`_engine.Engine.connect` method, and the statement executed + with that connection. The returned :class:`_engine.CursorResult` + is flagged + such that when the :class:`_engine.CursorResult` is exhausted and its + underlying cursor is closed, the :class:`_engine.Connection` + created here + will also be closed, which allows its associated DBAPI connection + resource to be returned to the connection pool. + + """ + connection = self.connect(close_with_result=True) + return connection.execute(statement, *multiparams, **params) + + @util.deprecated_20( + ":meth:`_engine.Engine.scalar`", + alternative="All statement execution in SQLAlchemy 2.0 is performed " + "by the :meth:`_engine.Connection.execute` method of " + ":class:`_engine.Connection`, " + "or in the ORM by the :meth:`.Session.execute` method of " + ":class:`.Session`; the :meth:`_future.Result.scalar` " + "method can then be " + "used to return a scalar result.", + ) + def scalar(self, statement, *multiparams, **params): + """Executes and returns the first column of the first row. + + The underlying result/cursor is closed after execution. + """ + return self.execute(statement, *multiparams, **params).scalar() + + def _execute_clauseelement( + self, + elem, + multiparams=None, + params=None, + execution_options=_EMPTY_EXECUTION_OPTS, + ): + connection = self.connect(close_with_result=True) + return connection._execute_clauseelement( + elem, multiparams, params, execution_options + ) + + def _execute_compiled( + self, + compiled, + multiparams, + params, + execution_options=_EMPTY_EXECUTION_OPTS, + ): + connection = self.connect(close_with_result=True) + return connection._execute_compiled( + compiled, multiparams, params, execution_options + ) + + def connect(self, close_with_result=False): + """Return a new :class:`_engine.Connection` object. + + The :class:`_engine.Connection` object is a facade that uses a DBAPI + connection internally in order to communicate with the database. This + connection is procured from the connection-holding :class:`_pool.Pool` + referenced by this :class:`_engine.Engine`. When the + :meth:`_engine.Connection.close` method of the + :class:`_engine.Connection` object + is called, the underlying DBAPI connection is then returned to the + connection pool, where it may be used again in a subsequent call to + :meth:`_engine.Engine.connect`. + + """ + + return self._connection_cls(self, close_with_result=close_with_result) + + @util.deprecated( + "1.4", + "The :meth:`_engine.Engine.table_names` " + "method is deprecated and will be " + "removed in a future release. Please refer to " + ":meth:`_reflection.Inspector.get_table_names`.", + ) + def table_names(self, schema=None, connection=None): + """Return a list of all table names available in the database. + + :param schema: Optional, retrieve names from a non-default schema. + + :param connection: Optional, use a specified connection. + """ + with self._optional_conn_ctx_manager(connection) as conn: + insp = inspection.inspect(conn) + return insp.get_table_names(schema) + + @util.deprecated( + "1.4", + "The :meth:`_engine.Engine.has_table` " + "method is deprecated and will be " + "removed in a future release. Please refer to " + ":meth:`_reflection.Inspector.has_table`.", + ) + def has_table(self, table_name, schema=None): + """Return True if the given backend has a table of the given name. + + .. seealso:: + + :ref:`metadata_reflection_inspector` - detailed schema inspection + using the :class:`_reflection.Inspector` interface. + + :class:`.quoted_name` - used to pass quoting information along + with a schema identifier. + + """ + with self._optional_conn_ctx_manager(None) as conn: + insp = inspection.inspect(conn) + return insp.has_table(table_name, schema=schema) + + def _wrap_pool_connect(self, fn, connection): + dialect = self.dialect + try: + return fn() + except dialect.dbapi.Error as e: + if connection is None: + Connection._handle_dbapi_exception_noconnection( + e, dialect, self + ) + else: + util.raise_( + sys.exc_info()[1], with_traceback=sys.exc_info()[2] + ) + + def raw_connection(self, _connection=None): + """Return a "raw" DBAPI connection from the connection pool. + + The returned object is a proxied version of the DBAPI + connection object used by the underlying driver in use. + The object will have all the same behavior as the real DBAPI + connection, except that its ``close()`` method will result in the + connection being returned to the pool, rather than being closed + for real. + + This method provides direct DBAPI connection access for + special situations when the API provided by + :class:`_engine.Connection` + is not needed. When a :class:`_engine.Connection` object is already + present, the DBAPI connection is available using + the :attr:`_engine.Connection.connection` accessor. + + .. seealso:: + + :ref:`dbapi_connections` + + """ + return self._wrap_pool_connect(self.pool.connect, _connection) + + +class OptionEngineMixin(object): + _sa_propagate_class_events = False + + def __init__(self, proxied, execution_options): + self._proxied = proxied + self.url = proxied.url + self.dialect = proxied.dialect + self.logging_name = proxied.logging_name + self.echo = proxied.echo + self._compiled_cache = proxied._compiled_cache + self.hide_parameters = proxied.hide_parameters + log.instance_logger(self, echoflag=self.echo) + + # note: this will propagate events that are assigned to the parent + # engine after this OptionEngine is created. Since we share + # the events of the parent we also disallow class-level events + # to apply to the OptionEngine class directly. + # + # the other way this can work would be to transfer existing + # events only, using: + # self.dispatch._update(proxied.dispatch) + # + # that might be more appropriate however it would be a behavioral + # change for logic that assigns events to the parent engine and + # would like it to take effect for the already-created sub-engine. + self.dispatch = self.dispatch._join(proxied.dispatch) + + self._execution_options = proxied._execution_options + self.update_execution_options(**execution_options) + + def _get_pool(self): + return self._proxied.pool + + def _set_pool(self, pool): + self._proxied.pool = pool + + pool = property(_get_pool, _set_pool) + + def _get_has_events(self): + return self._proxied._has_events or self.__dict__.get( + "_has_events", False + ) + + def _set_has_events(self, value): + self.__dict__["_has_events"] = value + + _has_events = property(_get_has_events, _set_has_events) + + +class OptionEngine(OptionEngineMixin, Engine): + pass + + +Engine._option_cls = OptionEngine |