summaryrefslogtreecommitdiffstats
path: root/lib/sqlalchemy/ext/asyncio
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy/ext/asyncio')
-rw-r--r--lib/sqlalchemy/ext/asyncio/__init__.py22
-rw-r--r--lib/sqlalchemy/ext/asyncio/base.py89
-rw-r--r--lib/sqlalchemy/ext/asyncio/engine.py828
-rw-r--r--lib/sqlalchemy/ext/asyncio/events.py44
-rw-r--r--lib/sqlalchemy/ext/asyncio/exc.py21
-rw-r--r--lib/sqlalchemy/ext/asyncio/result.py671
-rw-r--r--lib/sqlalchemy/ext/asyncio/scoping.py107
-rw-r--r--lib/sqlalchemy/ext/asyncio/session.py759
8 files changed, 2541 insertions, 0 deletions
diff --git a/lib/sqlalchemy/ext/asyncio/__init__.py b/lib/sqlalchemy/ext/asyncio/__init__.py
new file mode 100644
index 0000000..15b2cb0
--- /dev/null
+++ b/lib/sqlalchemy/ext/asyncio/__init__.py
@@ -0,0 +1,22 @@
+# ext/asyncio/__init__.py
+# Copyright (C) 2020-2022 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+
+from .engine import async_engine_from_config
+from .engine import AsyncConnection
+from .engine import AsyncEngine
+from .engine import AsyncTransaction
+from .engine import create_async_engine
+from .events import AsyncConnectionEvents
+from .events import AsyncSessionEvents
+from .result import AsyncMappingResult
+from .result import AsyncResult
+from .result import AsyncScalarResult
+from .scoping import async_scoped_session
+from .session import async_object_session
+from .session import async_session
+from .session import AsyncSession
+from .session import AsyncSessionTransaction
diff --git a/lib/sqlalchemy/ext/asyncio/base.py b/lib/sqlalchemy/ext/asyncio/base.py
new file mode 100644
index 0000000..3f77f55
--- /dev/null
+++ b/lib/sqlalchemy/ext/asyncio/base.py
@@ -0,0 +1,89 @@
+import abc
+import functools
+import weakref
+
+from . import exc as async_exc
+
+
+class ReversibleProxy:
+ # weakref.ref(async proxy object) -> weakref.ref(sync proxied object)
+ _proxy_objects = {}
+ __slots__ = ("__weakref__",)
+
+ def _assign_proxied(self, target):
+ if target is not None:
+ target_ref = weakref.ref(target, ReversibleProxy._target_gced)
+ proxy_ref = weakref.ref(
+ self,
+ functools.partial(ReversibleProxy._target_gced, target_ref),
+ )
+ ReversibleProxy._proxy_objects[target_ref] = proxy_ref
+
+ return target
+
+ @classmethod
+ def _target_gced(cls, ref, proxy_ref=None):
+ cls._proxy_objects.pop(ref, None)
+
+ @classmethod
+ def _regenerate_proxy_for_target(cls, target):
+ raise NotImplementedError()
+
+ @classmethod
+ def _retrieve_proxy_for_target(cls, target, regenerate=True):
+ try:
+ proxy_ref = cls._proxy_objects[weakref.ref(target)]
+ except KeyError:
+ pass
+ else:
+ proxy = proxy_ref()
+ if proxy is not None:
+ return proxy
+
+ if regenerate:
+ return cls._regenerate_proxy_for_target(target)
+ else:
+ return None
+
+
+class StartableContext(abc.ABC):
+ __slots__ = ()
+
+ @abc.abstractmethod
+ async def start(self, is_ctxmanager=False):
+ pass
+
+ def __await__(self):
+ return self.start().__await__()
+
+ async def __aenter__(self):
+ return await self.start(is_ctxmanager=True)
+
+ @abc.abstractmethod
+ async def __aexit__(self, type_, value, traceback):
+ pass
+
+ def _raise_for_not_started(self):
+ raise async_exc.AsyncContextNotStarted(
+ "%s context has not been started and object has not been awaited."
+ % (self.__class__.__name__)
+ )
+
+
+class ProxyComparable(ReversibleProxy):
+ __slots__ = ()
+
+ def __hash__(self):
+ return id(self)
+
+ def __eq__(self, other):
+ return (
+ isinstance(other, self.__class__)
+ and self._proxied == other._proxied
+ )
+
+ def __ne__(self, other):
+ return (
+ not isinstance(other, self.__class__)
+ or self._proxied != other._proxied
+ )
diff --git a/lib/sqlalchemy/ext/asyncio/engine.py b/lib/sqlalchemy/ext/asyncio/engine.py
new file mode 100644
index 0000000..4fbe4f7
--- /dev/null
+++ b/lib/sqlalchemy/ext/asyncio/engine.py
@@ -0,0 +1,828 @@
+# ext/asyncio/engine.py
+# Copyright (C) 2020-2022 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+import asyncio
+
+from . import exc as async_exc
+from .base import ProxyComparable
+from .base import StartableContext
+from .result import _ensure_sync_result
+from .result import AsyncResult
+from ... import exc
+from ... import inspection
+from ... import util
+from ...engine import create_engine as _create_engine
+from ...engine.base import NestedTransaction
+from ...future import Connection
+from ...future import Engine
+from ...util.concurrency import greenlet_spawn
+
+
+def create_async_engine(*arg, **kw):
+ """Create a new async engine instance.
+
+ Arguments passed to :func:`_asyncio.create_async_engine` are mostly
+ identical to those passed to the :func:`_sa.create_engine` function.
+ The specified dialect must be an asyncio-compatible dialect
+ such as :ref:`dialect-postgresql-asyncpg`.
+
+ .. versionadded:: 1.4
+
+ """
+
+ if kw.get("server_side_cursors", False):
+ raise async_exc.AsyncMethodRequired(
+ "Can't set server_side_cursors for async engine globally; "
+ "use the connection.stream() method for an async "
+ "streaming result set"
+ )
+ kw["future"] = True
+ sync_engine = _create_engine(*arg, **kw)
+ return AsyncEngine(sync_engine)
+
+
+def async_engine_from_config(configuration, prefix="sqlalchemy.", **kwargs):
+ """Create a new AsyncEngine instance using a configuration dictionary.
+
+ This function is analogous to the :func:`_sa.engine_from_config` function
+ in SQLAlchemy Core, except that the requested dialect must be an
+ asyncio-compatible dialect such as :ref:`dialect-postgresql-asyncpg`.
+ The argument signature of the function is identical to that
+ of :func:`_sa.engine_from_config`.
+
+ .. versionadded:: 1.4.29
+
+ """
+ options = {
+ key[len(prefix) :]: value
+ for key, value in configuration.items()
+ if key.startswith(prefix)
+ }
+ options["_coerce_config"] = True
+ options.update(kwargs)
+ url = options.pop("url")
+ return create_async_engine(url, **options)
+
+
+class AsyncConnectable:
+ __slots__ = "_slots_dispatch", "__weakref__"
+
+
+@util.create_proxy_methods(
+ Connection,
+ ":class:`_future.Connection`",
+ ":class:`_asyncio.AsyncConnection`",
+ classmethods=[],
+ methods=[],
+ attributes=[
+ "closed",
+ "invalidated",
+ "dialect",
+ "default_isolation_level",
+ ],
+)
+class AsyncConnection(ProxyComparable, StartableContext, AsyncConnectable):
+ """An asyncio proxy for a :class:`_engine.Connection`.
+
+ :class:`_asyncio.AsyncConnection` is acquired using the
+ :meth:`_asyncio.AsyncEngine.connect`
+ method of :class:`_asyncio.AsyncEngine`::
+
+ from sqlalchemy.ext.asyncio import create_async_engine
+ engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname")
+
+ async with engine.connect() as conn:
+ result = await conn.execute(select(table))
+
+ .. versionadded:: 1.4
+
+ """ # noqa
+
+ # AsyncConnection is a thin proxy; no state should be added here
+ # that is not retrievable from the "sync" engine / connection, e.g.
+ # current transaction, info, etc. It should be possible to
+ # create a new AsyncConnection that matches this one given only the
+ # "sync" elements.
+ __slots__ = (
+ "engine",
+ "sync_engine",
+ "sync_connection",
+ )
+
+ def __init__(self, async_engine, sync_connection=None):
+ self.engine = async_engine
+ self.sync_engine = async_engine.sync_engine
+ self.sync_connection = self._assign_proxied(sync_connection)
+
+ sync_connection: Connection
+ """Reference to the sync-style :class:`_engine.Connection` this
+ :class:`_asyncio.AsyncConnection` proxies requests towards.
+
+ This instance can be used as an event target.
+
+ .. seealso::
+
+ :ref:`asyncio_events`
+ """
+
+ sync_engine: Engine
+ """Reference to the sync-style :class:`_engine.Engine` this
+ :class:`_asyncio.AsyncConnection` is associated with via its underlying
+ :class:`_engine.Connection`.
+
+ This instance can be used as an event target.
+
+ .. seealso::
+
+ :ref:`asyncio_events`
+ """
+
+ @classmethod
+ def _regenerate_proxy_for_target(cls, target):
+ return AsyncConnection(
+ AsyncEngine._retrieve_proxy_for_target(target.engine), target
+ )
+
+ async def start(self, is_ctxmanager=False):
+ """Start this :class:`_asyncio.AsyncConnection` object's context
+ outside of using a Python ``with:`` block.
+
+ """
+ if self.sync_connection:
+ raise exc.InvalidRequestError("connection is already started")
+ self.sync_connection = self._assign_proxied(
+ await (greenlet_spawn(self.sync_engine.connect))
+ )
+ return self
+
+ @property
+ def connection(self):
+ """Not implemented for async; call
+ :meth:`_asyncio.AsyncConnection.get_raw_connection`.
+ """
+ raise exc.InvalidRequestError(
+ "AsyncConnection.connection accessor is not implemented as the "
+ "attribute may need to reconnect on an invalidated connection. "
+ "Use the get_raw_connection() method."
+ )
+
+ async def get_raw_connection(self):
+ """Return the pooled DBAPI-level connection in use by this
+ :class:`_asyncio.AsyncConnection`.
+
+ This is a SQLAlchemy connection-pool proxied connection
+ which then has the attribute
+ :attr:`_pool._ConnectionFairy.driver_connection` that refers to the
+ actual driver connection. Its
+ :attr:`_pool._ConnectionFairy.dbapi_connection` refers instead
+ to an :class:`_engine.AdaptedConnection` instance that
+ adapts the driver connection to the DBAPI protocol.
+
+ """
+ conn = self._sync_connection()
+
+ return await greenlet_spawn(getattr, conn, "connection")
+
+ @property
+ def _proxied(self):
+ return self.sync_connection
+
+ @property
+ def info(self):
+ """Return the :attr:`_engine.Connection.info` dictionary of the
+ underlying :class:`_engine.Connection`.
+
+ This dictionary is freely writable for user-defined state to be
+ associated with the database connection.
+
+ This attribute is only available if the :class:`.AsyncConnection` is
+ currently connected. If the :attr:`.AsyncConnection.closed` attribute
+ is ``True``, then accessing this attribute will raise
+ :class:`.ResourceClosedError`.
+
+ .. versionadded:: 1.4.0b2
+
+ """
+ return self.sync_connection.info
+
+ def _sync_connection(self):
+ if not self.sync_connection:
+ self._raise_for_not_started()
+ return self.sync_connection
+
+ def begin(self):
+ """Begin a transaction prior to autobegin occurring."""
+ self._sync_connection()
+ return AsyncTransaction(self)
+
+ def begin_nested(self):
+ """Begin a nested transaction and return a transaction handle."""
+ self._sync_connection()
+ return AsyncTransaction(self, nested=True)
+
+ async def invalidate(self, exception=None):
+ """Invalidate the underlying DBAPI connection associated with
+ this :class:`_engine.Connection`.
+
+ See the method :meth:`_engine.Connection.invalidate` for full
+ detail on this method.
+
+ """
+
+ conn = self._sync_connection()
+ return await greenlet_spawn(conn.invalidate, exception=exception)
+
+ async def get_isolation_level(self):
+ conn = self._sync_connection()
+ return await greenlet_spawn(conn.get_isolation_level)
+
+ async def set_isolation_level(self):
+ conn = self._sync_connection()
+ return await greenlet_spawn(conn.get_isolation_level)
+
+ def in_transaction(self):
+ """Return True if a transaction is in progress.
+
+ .. versionadded:: 1.4.0b2
+
+ """
+
+ conn = self._sync_connection()
+
+ return conn.in_transaction()
+
+ def in_nested_transaction(self):
+ """Return True if a transaction is in progress.
+
+ .. versionadded:: 1.4.0b2
+
+ """
+ conn = self._sync_connection()
+
+ return conn.in_nested_transaction()
+
+ def get_transaction(self):
+ """Return an :class:`.AsyncTransaction` representing the current
+ transaction, if any.
+
+ This makes use of the underlying synchronous connection's
+ :meth:`_engine.Connection.get_transaction` method to get the current
+ :class:`_engine.Transaction`, which is then proxied in a new
+ :class:`.AsyncTransaction` object.
+
+ .. versionadded:: 1.4.0b2
+
+ """
+ conn = self._sync_connection()
+
+ trans = conn.get_transaction()
+ if trans is not None:
+ return AsyncTransaction._retrieve_proxy_for_target(trans)
+ else:
+ return None
+
+ def get_nested_transaction(self):
+ """Return an :class:`.AsyncTransaction` representing the current
+ nested (savepoint) transaction, if any.
+
+ This makes use of the underlying synchronous connection's
+ :meth:`_engine.Connection.get_nested_transaction` method to get the
+ current :class:`_engine.Transaction`, which is then proxied in a new
+ :class:`.AsyncTransaction` object.
+
+ .. versionadded:: 1.4.0b2
+
+ """
+ conn = self._sync_connection()
+
+ trans = conn.get_nested_transaction()
+ if trans is not None:
+ return AsyncTransaction._retrieve_proxy_for_target(trans)
+ else:
+ return None
+
+ async def execution_options(self, **opt):
+ r"""Set non-SQL options for the connection which take effect
+ during execution.
+
+ This returns this :class:`_asyncio.AsyncConnection` object with
+ the new options added.
+
+ See :meth:`_future.Connection.execution_options` for full details
+ on this method.
+
+ """
+
+ conn = self._sync_connection()
+ c2 = await greenlet_spawn(conn.execution_options, **opt)
+ assert c2 is conn
+ return self
+
+ async def commit(self):
+ """Commit the transaction that is currently in progress.
+
+ This method commits the current transaction if one has been started.
+ If no transaction was started, the method has no effect, assuming
+ the connection is in a non-invalidated state.
+
+ A transaction is begun on a :class:`_future.Connection` automatically
+ whenever a statement is first executed, or when the
+ :meth:`_future.Connection.begin` method is called.
+
+ """
+ conn = self._sync_connection()
+ await greenlet_spawn(conn.commit)
+
+ async def rollback(self):
+ """Roll back the transaction that is currently in progress.
+
+ This method rolls back the current transaction if one has been started.
+ If no transaction was started, the method has no effect. If a
+ transaction was started and the connection is in an invalidated state,
+ the transaction is cleared using this method.
+
+ A transaction is begun on a :class:`_future.Connection` automatically
+ whenever a statement is first executed, or when the
+ :meth:`_future.Connection.begin` method is called.
+
+
+ """
+ conn = self._sync_connection()
+ await greenlet_spawn(conn.rollback)
+
+ async def close(self):
+ """Close this :class:`_asyncio.AsyncConnection`.
+
+ This has the effect of also rolling back the transaction if one
+ is in place.
+
+ """
+ conn = self._sync_connection()
+ await greenlet_spawn(conn.close)
+
+ async def exec_driver_sql(
+ self,
+ statement,
+ parameters=None,
+ execution_options=util.EMPTY_DICT,
+ ):
+ r"""Executes a driver-level SQL string and return buffered
+ :class:`_engine.Result`.
+
+ """
+
+ conn = self._sync_connection()
+
+ result = await greenlet_spawn(
+ conn.exec_driver_sql,
+ statement,
+ parameters,
+ execution_options,
+ _require_await=True,
+ )
+
+ return await _ensure_sync_result(result, self.exec_driver_sql)
+
+ async def stream(
+ self,
+ statement,
+ parameters=None,
+ execution_options=util.EMPTY_DICT,
+ ):
+ """Execute a statement and return a streaming
+ :class:`_asyncio.AsyncResult` object."""
+
+ conn = self._sync_connection()
+
+ result = await greenlet_spawn(
+ conn._execute_20,
+ statement,
+ parameters,
+ util.EMPTY_DICT.merge_with(
+ execution_options, {"stream_results": True}
+ ),
+ _require_await=True,
+ )
+ if not result.context._is_server_side:
+ # TODO: real exception here
+ assert False, "server side result expected"
+ return AsyncResult(result)
+
+ async def execute(
+ self,
+ statement,
+ parameters=None,
+ execution_options=util.EMPTY_DICT,
+ ):
+ r"""Executes a SQL statement construct and return a buffered
+ :class:`_engine.Result`.
+
+ :param object: The statement to be executed. This is always
+ an object that is in both the :class:`_expression.ClauseElement` and
+ :class:`_expression.Executable` hierarchies, including:
+
+ * :class:`_expression.Select`
+ * :class:`_expression.Insert`, :class:`_expression.Update`,
+ :class:`_expression.Delete`
+ * :class:`_expression.TextClause` and
+ :class:`_expression.TextualSelect`
+ * :class:`_schema.DDL` and objects which inherit from
+ :class:`_schema.DDLElement`
+
+ :param parameters: parameters which will be bound into the statement.
+ This may be either a dictionary of parameter names to values,
+ or a mutable sequence (e.g. a list) of dictionaries. When a
+ list of dictionaries is passed, the underlying statement execution
+ will make use of the DBAPI ``cursor.executemany()`` method.
+ When a single dictionary is passed, the DBAPI ``cursor.execute()``
+ method will be used.
+
+ :param execution_options: optional dictionary of execution options,
+ which will be associated with the statement execution. This
+ dictionary can provide a subset of the options that are accepted
+ by :meth:`_future.Connection.execution_options`.
+
+ :return: a :class:`_engine.Result` object.
+
+ """
+ conn = self._sync_connection()
+
+ result = await greenlet_spawn(
+ conn._execute_20,
+ statement,
+ parameters,
+ execution_options,
+ _require_await=True,
+ )
+ return await _ensure_sync_result(result, self.execute)
+
+ async def scalar(
+ self,
+ statement,
+ parameters=None,
+ execution_options=util.EMPTY_DICT,
+ ):
+ r"""Executes a SQL statement construct and returns a scalar object.
+
+ This method is shorthand for invoking the
+ :meth:`_engine.Result.scalar` method after invoking the
+ :meth:`_future.Connection.execute` method. Parameters are equivalent.
+
+ :return: a scalar Python value representing the first column of the
+ first row returned.
+
+ """
+ result = await self.execute(statement, parameters, execution_options)
+ return result.scalar()
+
+ async def scalars(
+ self,
+ statement,
+ parameters=None,
+ execution_options=util.EMPTY_DICT,
+ ):
+ r"""Executes a SQL statement construct and returns a scalar objects.
+
+ This method is shorthand for invoking the
+ :meth:`_engine.Result.scalars` method after invoking the
+ :meth:`_future.Connection.execute` method. Parameters are equivalent.
+
+ :return: a :class:`_engine.ScalarResult` object.
+
+ .. versionadded:: 1.4.24
+
+ """
+ result = await self.execute(statement, parameters, execution_options)
+ return result.scalars()
+
+ async def stream_scalars(
+ self,
+ statement,
+ parameters=None,
+ execution_options=util.EMPTY_DICT,
+ ):
+ r"""Executes a SQL statement and returns a streaming scalar result
+ object.
+
+ This method is shorthand for invoking the
+ :meth:`_engine.AsyncResult.scalars` method after invoking the
+ :meth:`_future.Connection.stream` method. Parameters are equivalent.
+
+ :return: an :class:`_asyncio.AsyncScalarResult` object.
+
+ .. versionadded:: 1.4.24
+
+ """
+ result = await self.stream(statement, parameters, execution_options)
+ return result.scalars()
+
+ async def run_sync(self, fn, *arg, **kw):
+ """Invoke the given sync callable passing self as the first argument.
+
+ This method maintains the asyncio event loop all the way through
+ to the database connection by running the given callable in a
+ specially instrumented greenlet.
+
+ E.g.::
+
+ with async_engine.begin() as conn:
+ await conn.run_sync(metadata.create_all)
+
+ .. note::
+
+ The provided callable is invoked inline within the asyncio event
+ loop, and will block on traditional IO calls. IO within this
+ callable should only call into SQLAlchemy's asyncio database
+ APIs which will be properly adapted to the greenlet context.
+
+ .. seealso::
+
+ :ref:`session_run_sync`
+ """
+
+ conn = self._sync_connection()
+
+ return await greenlet_spawn(fn, conn, *arg, **kw)
+
+ def __await__(self):
+ return self.start().__await__()
+
+ async def __aexit__(self, type_, value, traceback):
+ await asyncio.shield(self.close())
+
+
+@util.create_proxy_methods(
+ Engine,
+ ":class:`_future.Engine`",
+ ":class:`_asyncio.AsyncEngine`",
+ classmethods=[],
+ methods=[
+ "clear_compiled_cache",
+ "update_execution_options",
+ "get_execution_options",
+ ],
+ attributes=["url", "pool", "dialect", "engine", "name", "driver", "echo"],
+)
+class AsyncEngine(ProxyComparable, AsyncConnectable):
+ """An asyncio proxy for a :class:`_engine.Engine`.
+
+ :class:`_asyncio.AsyncEngine` is acquired using the
+ :func:`_asyncio.create_async_engine` function::
+
+ from sqlalchemy.ext.asyncio import create_async_engine
+ engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname")
+
+ .. versionadded:: 1.4
+
+ """ # noqa
+
+ # AsyncEngine is a thin proxy; no state should be added here
+ # that is not retrievable from the "sync" engine / connection, e.g.
+ # current transaction, info, etc. It should be possible to
+ # create a new AsyncEngine that matches this one given only the
+ # "sync" elements.
+ __slots__ = ("sync_engine", "_proxied")
+
+ _connection_cls = AsyncConnection
+
+ _option_cls: type
+
+ class _trans_ctx(StartableContext):
+ def __init__(self, conn):
+ self.conn = conn
+
+ async def start(self, is_ctxmanager=False):
+ await self.conn.start(is_ctxmanager=is_ctxmanager)
+ self.transaction = self.conn.begin()
+ await self.transaction.__aenter__()
+
+ return self.conn
+
+ async def __aexit__(self, type_, value, traceback):
+ async def go():
+ await self.transaction.__aexit__(type_, value, traceback)
+ await self.conn.close()
+
+ await asyncio.shield(go())
+
+ def __init__(self, sync_engine):
+ if not sync_engine.dialect.is_async:
+ raise exc.InvalidRequestError(
+ "The asyncio extension requires an async driver to be used. "
+ f"The loaded {sync_engine.dialect.driver!r} is not async."
+ )
+ self.sync_engine = self._proxied = self._assign_proxied(sync_engine)
+
+ sync_engine: Engine
+ """Reference to the sync-style :class:`_engine.Engine` this
+ :class:`_asyncio.AsyncEngine` proxies requests towards.
+
+ This instance can be used as an event target.
+
+ .. seealso::
+
+ :ref:`asyncio_events`
+ """
+
+ @classmethod
+ def _regenerate_proxy_for_target(cls, target):
+ return AsyncEngine(target)
+
+ def begin(self):
+ """Return a context manager which when entered will deliver an
+ :class:`_asyncio.AsyncConnection` with an
+ :class:`_asyncio.AsyncTransaction` established.
+
+ E.g.::
+
+ async with async_engine.begin() as conn:
+ await conn.execute(
+ text("insert into table (x, y, z) values (1, 2, 3)")
+ )
+ await conn.execute(text("my_special_procedure(5)"))
+
+
+ """
+ conn = self.connect()
+ return self._trans_ctx(conn)
+
+ def connect(self):
+ """Return an :class:`_asyncio.AsyncConnection` object.
+
+ The :class:`_asyncio.AsyncConnection` will procure a database
+ connection from the underlying connection pool when it is entered
+ as an async context manager::
+
+ async with async_engine.connect() as conn:
+ result = await conn.execute(select(user_table))
+
+ The :class:`_asyncio.AsyncConnection` may also be started outside of a
+ context manager by invoking its :meth:`_asyncio.AsyncConnection.start`
+ method.
+
+ """
+
+ return self._connection_cls(self)
+
+ async def raw_connection(self):
+ """Return a "raw" DBAPI connection from the connection pool.
+
+ .. seealso::
+
+ :ref:`dbapi_connections`
+
+ """
+ return await greenlet_spawn(self.sync_engine.raw_connection)
+
+ def execution_options(self, **opt):
+ """Return a new :class:`_asyncio.AsyncEngine` that will provide
+ :class:`_asyncio.AsyncConnection` objects with the given execution
+ options.
+
+ Proxied from :meth:`_future.Engine.execution_options`. See that
+ method for details.
+
+ """
+
+ return AsyncEngine(self.sync_engine.execution_options(**opt))
+
+ async def dispose(self):
+ """Dispose of the connection pool used by this
+ :class:`_asyncio.AsyncEngine`.
+
+ This will close all connection pool connections that are
+ **currently checked in**. See the documentation for the underlying
+ :meth:`_future.Engine.dispose` method for further notes.
+
+ .. seealso::
+
+ :meth:`_future.Engine.dispose`
+
+ """
+
+ await greenlet_spawn(self.sync_engine.dispose)
+
+
+class AsyncTransaction(ProxyComparable, StartableContext):
+ """An asyncio proxy for a :class:`_engine.Transaction`."""
+
+ __slots__ = ("connection", "sync_transaction", "nested")
+
+ def __init__(self, connection, nested=False):
+ self.connection = connection # AsyncConnection
+ self.sync_transaction = None # sqlalchemy.engine.Transaction
+ self.nested = nested
+
+ @classmethod
+ def _regenerate_proxy_for_target(cls, target):
+ sync_connection = target.connection
+ sync_transaction = target
+ nested = isinstance(target, NestedTransaction)
+
+ async_connection = AsyncConnection._retrieve_proxy_for_target(
+ sync_connection
+ )
+ assert async_connection is not None
+
+ obj = cls.__new__(cls)
+ obj.connection = async_connection
+ obj.sync_transaction = obj._assign_proxied(sync_transaction)
+ obj.nested = nested
+ return obj
+
+ def _sync_transaction(self):
+ if not self.sync_transaction:
+ self._raise_for_not_started()
+ return self.sync_transaction
+
+ @property
+ def _proxied(self):
+ return self.sync_transaction
+
+ @property
+ def is_valid(self):
+ return self._sync_transaction().is_valid
+
+ @property
+ def is_active(self):
+ return self._sync_transaction().is_active
+
+ async def close(self):
+ """Close this :class:`.Transaction`.
+
+ If this transaction is the base transaction in a begin/commit
+ nesting, the transaction will rollback(). Otherwise, the
+ method returns.
+
+ This is used to cancel a Transaction without affecting the scope of
+ an enclosing transaction.
+
+ """
+ await greenlet_spawn(self._sync_transaction().close)
+
+ async def rollback(self):
+ """Roll back this :class:`.Transaction`."""
+ await greenlet_spawn(self._sync_transaction().rollback)
+
+ async def commit(self):
+ """Commit this :class:`.Transaction`."""
+
+ await greenlet_spawn(self._sync_transaction().commit)
+
+ async def start(self, is_ctxmanager=False):
+ """Start this :class:`_asyncio.AsyncTransaction` object's context
+ outside of using a Python ``with:`` block.
+
+ """
+
+ self.sync_transaction = self._assign_proxied(
+ await greenlet_spawn(
+ self.connection._sync_connection().begin_nested
+ if self.nested
+ else self.connection._sync_connection().begin
+ )
+ )
+ if is_ctxmanager:
+ self.sync_transaction.__enter__()
+ return self
+
+ async def __aexit__(self, type_, value, traceback):
+ await greenlet_spawn(
+ self._sync_transaction().__exit__, type_, value, traceback
+ )
+
+
+def _get_sync_engine_or_connection(async_engine):
+ if isinstance(async_engine, AsyncConnection):
+ return async_engine.sync_connection
+
+ try:
+ return async_engine.sync_engine
+ except AttributeError as e:
+ raise exc.ArgumentError(
+ "AsyncEngine expected, got %r" % async_engine
+ ) from e
+
+
+@inspection._inspects(AsyncConnection)
+def _no_insp_for_async_conn_yet(subject):
+ raise exc.NoInspectionAvailable(
+ "Inspection on an AsyncConnection is currently not supported. "
+ "Please use ``run_sync`` to pass a callable where it's possible "
+ "to call ``inspect`` on the passed connection.",
+ code="xd3s",
+ )
+
+
+@inspection._inspects(AsyncEngine)
+def _no_insp_for_async_engine_xyet(subject):
+ raise exc.NoInspectionAvailable(
+ "Inspection on an AsyncEngine is currently not supported. "
+ "Please obtain a connection then use ``conn.run_sync`` to pass a "
+ "callable where it's possible to call ``inspect`` on the "
+ "passed connection.",
+ code="xd3s",
+ )
diff --git a/lib/sqlalchemy/ext/asyncio/events.py b/lib/sqlalchemy/ext/asyncio/events.py
new file mode 100644
index 0000000..c5d5e01
--- /dev/null
+++ b/lib/sqlalchemy/ext/asyncio/events.py
@@ -0,0 +1,44 @@
+# ext/asyncio/events.py
+# Copyright (C) 2020-2022 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+
+from .engine import AsyncConnectable
+from .session import AsyncSession
+from ...engine import events as engine_event
+from ...orm import events as orm_event
+
+
+class AsyncConnectionEvents(engine_event.ConnectionEvents):
+ _target_class_doc = "SomeEngine"
+ _dispatch_target = AsyncConnectable
+
+ @classmethod
+ def _no_async_engine_events(cls):
+ raise NotImplementedError(
+ "asynchronous events are not implemented at this time. Apply "
+ "synchronous listeners to the AsyncEngine.sync_engine or "
+ "AsyncConnection.sync_connection attributes."
+ )
+
+ @classmethod
+ def _listen(cls, event_key, retval=False):
+ cls._no_async_engine_events()
+
+
+class AsyncSessionEvents(orm_event.SessionEvents):
+ _target_class_doc = "SomeSession"
+ _dispatch_target = AsyncSession
+
+ @classmethod
+ def _no_async_engine_events(cls):
+ raise NotImplementedError(
+ "asynchronous events are not implemented at this time. Apply "
+ "synchronous listeners to the AsyncSession.sync_session."
+ )
+
+ @classmethod
+ def _listen(cls, event_key, retval=False):
+ cls._no_async_engine_events()
diff --git a/lib/sqlalchemy/ext/asyncio/exc.py b/lib/sqlalchemy/ext/asyncio/exc.py
new file mode 100644
index 0000000..cf0d9a8
--- /dev/null
+++ b/lib/sqlalchemy/ext/asyncio/exc.py
@@ -0,0 +1,21 @@
+# ext/asyncio/exc.py
+# Copyright (C) 2020-2022 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+
+from ... import exc
+
+
+class AsyncMethodRequired(exc.InvalidRequestError):
+ """an API can't be used because its result would not be
+ compatible with async"""
+
+
+class AsyncContextNotStarted(exc.InvalidRequestError):
+ """a startable context manager has not been started."""
+
+
+class AsyncContextAlreadyStarted(exc.InvalidRequestError):
+ """a startable context manager is already started."""
diff --git a/lib/sqlalchemy/ext/asyncio/result.py b/lib/sqlalchemy/ext/asyncio/result.py
new file mode 100644
index 0000000..a77b6a8
--- /dev/null
+++ b/lib/sqlalchemy/ext/asyncio/result.py
@@ -0,0 +1,671 @@
+# ext/asyncio/result.py
+# Copyright (C) 2020-2022 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+
+import operator
+
+from . import exc as async_exc
+from ...engine.result import _NO_ROW
+from ...engine.result import FilterResult
+from ...engine.result import FrozenResult
+from ...engine.result import MergedResult
+from ...sql.base import _generative
+from ...util.concurrency import greenlet_spawn
+
+
+class AsyncCommon(FilterResult):
+ async def close(self):
+ """Close this result."""
+
+ await greenlet_spawn(self._real_result.close)
+
+
+class AsyncResult(AsyncCommon):
+ """An asyncio wrapper around a :class:`_result.Result` object.
+
+ The :class:`_asyncio.AsyncResult` only applies to statement executions that
+ use a server-side cursor. It is returned only from the
+ :meth:`_asyncio.AsyncConnection.stream` and
+ :meth:`_asyncio.AsyncSession.stream` methods.
+
+ .. note:: As is the case with :class:`_engine.Result`, this object is
+ used for ORM results returned by :meth:`_asyncio.AsyncSession.execute`,
+ which can yield instances of ORM mapped objects either individually or
+ within tuple-like rows. Note that these result objects do not
+ deduplicate instances or rows automatically as is the case with the
+ legacy :class:`_orm.Query` object. For in-Python de-duplication of
+ instances or rows, use the :meth:`_asyncio.AsyncResult.unique` modifier
+ method.
+
+ .. versionadded:: 1.4
+
+ """
+
+ def __init__(self, real_result):
+ self._real_result = real_result
+
+ self._metadata = real_result._metadata
+ self._unique_filter_state = real_result._unique_filter_state
+
+ # BaseCursorResult pre-generates the "_row_getter". Use that
+ # if available rather than building a second one
+ if "_row_getter" in real_result.__dict__:
+ self._set_memoized_attribute(
+ "_row_getter", real_result.__dict__["_row_getter"]
+ )
+
+ def keys(self):
+ """Return the :meth:`_engine.Result.keys` collection from the
+ underlying :class:`_engine.Result`.
+
+ """
+ return self._metadata.keys
+
+ @_generative
+ def unique(self, strategy=None):
+ """Apply unique filtering to the objects returned by this
+ :class:`_asyncio.AsyncResult`.
+
+ Refer to :meth:`_engine.Result.unique` in the synchronous
+ SQLAlchemy API for a complete behavioral description.
+
+
+ """
+ self._unique_filter_state = (set(), strategy)
+
+ def columns(self, *col_expressions):
+ r"""Establish the columns that should be returned in each row.
+
+ Refer to :meth:`_engine.Result.columns` in the synchronous
+ SQLAlchemy API for a complete behavioral description.
+
+
+ """
+ return self._column_slices(col_expressions)
+
+ async def partitions(self, size=None):
+ """Iterate through sub-lists of rows of the size given.
+
+ An async iterator is returned::
+
+ async def scroll_results(connection):
+ result = await connection.stream(select(users_table))
+
+ async for partition in result.partitions(100):
+ print("list of rows: %s" % partition)
+
+ .. seealso::
+
+ :meth:`_engine.Result.partitions`
+
+ """
+
+ getter = self._manyrow_getter
+
+ while True:
+ partition = await greenlet_spawn(getter, self, size)
+ if partition:
+ yield partition
+ else:
+ break
+
+ async def fetchone(self):
+ """Fetch one row.
+
+ When all rows are exhausted, returns None.
+
+ This method is provided for backwards compatibility with
+ SQLAlchemy 1.x.x.
+
+ To fetch the first row of a result only, use the
+ :meth:`_engine.Result.first` method. To iterate through all
+ rows, iterate the :class:`_engine.Result` object directly.
+
+ :return: a :class:`.Row` object if no filters are applied, or None
+ if no rows remain.
+
+ """
+ row = await greenlet_spawn(self._onerow_getter, self)
+ if row is _NO_ROW:
+ return None
+ else:
+ return row
+
+ async def fetchmany(self, size=None):
+ """Fetch many rows.
+
+ When all rows are exhausted, returns an empty list.
+
+ This method is provided for backwards compatibility with
+ SQLAlchemy 1.x.x.
+
+ To fetch rows in groups, use the
+ :meth:`._asyncio.AsyncResult.partitions` method.
+
+ :return: a list of :class:`.Row` objects.
+
+ .. seealso::
+
+ :meth:`_asyncio.AsyncResult.partitions`
+
+ """
+
+ return await greenlet_spawn(self._manyrow_getter, self, size)
+
+ async def all(self):
+ """Return all rows in a list.
+
+ Closes the result set after invocation. Subsequent invocations
+ will return an empty list.
+
+ :return: a list of :class:`.Row` objects.
+
+ """
+
+ return await greenlet_spawn(self._allrows)
+
+ def __aiter__(self):
+ return self
+
+ async def __anext__(self):
+ row = await greenlet_spawn(self._onerow_getter, self)
+ if row is _NO_ROW:
+ raise StopAsyncIteration()
+ else:
+ return row
+
+ async def first(self):
+ """Fetch the first row or None if no row is present.
+
+ Closes the result set and discards remaining rows.
+
+ .. note:: This method returns one **row**, e.g. tuple, by default. To
+ return exactly one single scalar value, that is, the first column of
+ the first row, use the :meth:`_asyncio.AsyncResult.scalar` method,
+ or combine :meth:`_asyncio.AsyncResult.scalars` and
+ :meth:`_asyncio.AsyncResult.first`.
+
+ :return: a :class:`.Row` object, or None
+ if no rows remain.
+
+ .. seealso::
+
+ :meth:`_asyncio.AsyncResult.scalar`
+
+ :meth:`_asyncio.AsyncResult.one`
+
+ """
+ return await greenlet_spawn(self._only_one_row, False, False, False)
+
+ async def one_or_none(self):
+ """Return at most one result or raise an exception.
+
+ Returns ``None`` if the result has no rows.
+ Raises :class:`.MultipleResultsFound`
+ if multiple rows are returned.
+
+ .. versionadded:: 1.4
+
+ :return: The first :class:`.Row` or None if no row is available.
+
+ :raises: :class:`.MultipleResultsFound`
+
+ .. seealso::
+
+ :meth:`_asyncio.AsyncResult.first`
+
+ :meth:`_asyncio.AsyncResult.one`
+
+ """
+ return await greenlet_spawn(self._only_one_row, True, False, False)
+
+ async def scalar_one(self):
+ """Return exactly one scalar result or raise an exception.
+
+ This is equivalent to calling :meth:`_asyncio.AsyncResult.scalars` and
+ then :meth:`_asyncio.AsyncResult.one`.
+
+ .. seealso::
+
+ :meth:`_asyncio.AsyncResult.one`
+
+ :meth:`_asyncio.AsyncResult.scalars`
+
+ """
+ return await greenlet_spawn(self._only_one_row, True, True, True)
+
+ async def scalar_one_or_none(self):
+ """Return exactly one or no scalar result.
+
+ This is equivalent to calling :meth:`_asyncio.AsyncResult.scalars` and
+ then :meth:`_asyncio.AsyncResult.one_or_none`.
+
+ .. seealso::
+
+ :meth:`_asyncio.AsyncResult.one_or_none`
+
+ :meth:`_asyncio.AsyncResult.scalars`
+
+ """
+ return await greenlet_spawn(self._only_one_row, True, False, True)
+
+ async def one(self):
+ """Return exactly one row or raise an exception.
+
+ Raises :class:`.NoResultFound` if the result returns no
+ rows, or :class:`.MultipleResultsFound` if multiple rows
+ would be returned.
+
+ .. note:: This method returns one **row**, e.g. tuple, by default.
+ To return exactly one single scalar value, that is, the first
+ column of the first row, use the
+ :meth:`_asyncio.AsyncResult.scalar_one` method, or combine
+ :meth:`_asyncio.AsyncResult.scalars` and
+ :meth:`_asyncio.AsyncResult.one`.
+
+ .. versionadded:: 1.4
+
+ :return: The first :class:`.Row`.
+
+ :raises: :class:`.MultipleResultsFound`, :class:`.NoResultFound`
+
+ .. seealso::
+
+ :meth:`_asyncio.AsyncResult.first`
+
+ :meth:`_asyncio.AsyncResult.one_or_none`
+
+ :meth:`_asyncio.AsyncResult.scalar_one`
+
+ """
+ return await greenlet_spawn(self._only_one_row, True, True, False)
+
+ async def scalar(self):
+ """Fetch the first column of the first row, and close the result set.
+
+ Returns None if there are no rows to fetch.
+
+ No validation is performed to test if additional rows remain.
+
+ After calling this method, the object is fully closed,
+ e.g. the :meth:`_engine.CursorResult.close`
+ method will have been called.
+
+ :return: a Python scalar value , or None if no rows remain.
+
+ """
+ return await greenlet_spawn(self._only_one_row, False, False, True)
+
+ async def freeze(self):
+ """Return a callable object that will produce copies of this
+ :class:`_asyncio.AsyncResult` when invoked.
+
+ The callable object returned is an instance of
+ :class:`_engine.FrozenResult`.
+
+ This is used for result set caching. The method must be called
+ on the result when it has been unconsumed, and calling the method
+ will consume the result fully. When the :class:`_engine.FrozenResult`
+ is retrieved from a cache, it can be called any number of times where
+ it will produce a new :class:`_engine.Result` object each time
+ against its stored set of rows.
+
+ .. seealso::
+
+ :ref:`do_orm_execute_re_executing` - example usage within the
+ ORM to implement a result-set cache.
+
+ """
+
+ return await greenlet_spawn(FrozenResult, self)
+
+ def merge(self, *others):
+ """Merge this :class:`_asyncio.AsyncResult` with other compatible
+ result objects.
+
+ The object returned is an instance of :class:`_engine.MergedResult`,
+ which will be composed of iterators from the given result
+ objects.
+
+ The new result will use the metadata from this result object.
+ The subsequent result objects must be against an identical
+ set of result / cursor metadata, otherwise the behavior is
+ undefined.
+
+ """
+ return MergedResult(self._metadata, (self,) + others)
+
+ def scalars(self, index=0):
+ """Return an :class:`_asyncio.AsyncScalarResult` filtering object which
+ will return single elements rather than :class:`_row.Row` objects.
+
+ Refer to :meth:`_result.Result.scalars` in the synchronous
+ SQLAlchemy API for a complete behavioral description.
+
+ :param index: integer or row key indicating the column to be fetched
+ from each row, defaults to ``0`` indicating the first column.
+
+ :return: a new :class:`_asyncio.AsyncScalarResult` filtering object
+ referring to this :class:`_asyncio.AsyncResult` object.
+
+ """
+ return AsyncScalarResult(self._real_result, index)
+
+ def mappings(self):
+ """Apply a mappings filter to returned rows, returning an instance of
+ :class:`_asyncio.AsyncMappingResult`.
+
+ When this filter is applied, fetching rows will return
+ :class:`.RowMapping` objects instead of :class:`.Row` objects.
+
+ Refer to :meth:`_result.Result.mappings` in the synchronous
+ SQLAlchemy API for a complete behavioral description.
+
+ :return: a new :class:`_asyncio.AsyncMappingResult` filtering object
+ referring to the underlying :class:`_result.Result` object.
+
+ """
+
+ return AsyncMappingResult(self._real_result)
+
+
+class AsyncScalarResult(AsyncCommon):
+ """A wrapper for a :class:`_asyncio.AsyncResult` that returns scalar values
+ rather than :class:`_row.Row` values.
+
+ The :class:`_asyncio.AsyncScalarResult` object is acquired by calling the
+ :meth:`_asyncio.AsyncResult.scalars` method.
+
+ Refer to the :class:`_result.ScalarResult` object in the synchronous
+ SQLAlchemy API for a complete behavioral description.
+
+ .. versionadded:: 1.4
+
+ """
+
+ _generate_rows = False
+
+ def __init__(self, real_result, index):
+ self._real_result = real_result
+
+ if real_result._source_supports_scalars:
+ self._metadata = real_result._metadata
+ self._post_creational_filter = None
+ else:
+ self._metadata = real_result._metadata._reduce([index])
+ self._post_creational_filter = operator.itemgetter(0)
+
+ self._unique_filter_state = real_result._unique_filter_state
+
+ def unique(self, strategy=None):
+ """Apply unique filtering to the objects returned by this
+ :class:`_asyncio.AsyncScalarResult`.
+
+ See :meth:`_asyncio.AsyncResult.unique` for usage details.
+
+ """
+ self._unique_filter_state = (set(), strategy)
+ return self
+
+ async def partitions(self, size=None):
+ """Iterate through sub-lists of elements of the size given.
+
+ Equivalent to :meth:`_asyncio.AsyncResult.partitions` except that
+ scalar values, rather than :class:`_result.Row` objects,
+ are returned.
+
+ """
+
+ getter = self._manyrow_getter
+
+ while True:
+ partition = await greenlet_spawn(getter, self, size)
+ if partition:
+ yield partition
+ else:
+ break
+
+ async def fetchall(self):
+ """A synonym for the :meth:`_asyncio.AsyncScalarResult.all` method."""
+
+ return await greenlet_spawn(self._allrows)
+
+ async def fetchmany(self, size=None):
+ """Fetch many objects.
+
+ Equivalent to :meth:`_asyncio.AsyncResult.fetchmany` except that
+ scalar values, rather than :class:`_result.Row` objects,
+ are returned.
+
+ """
+ return await greenlet_spawn(self._manyrow_getter, self, size)
+
+ async def all(self):
+ """Return all scalar values in a list.
+
+ Equivalent to :meth:`_asyncio.AsyncResult.all` except that
+ scalar values, rather than :class:`_result.Row` objects,
+ are returned.
+
+ """
+ return await greenlet_spawn(self._allrows)
+
+ def __aiter__(self):
+ return self
+
+ async def __anext__(self):
+ row = await greenlet_spawn(self._onerow_getter, self)
+ if row is _NO_ROW:
+ raise StopAsyncIteration()
+ else:
+ return row
+
+ async def first(self):
+ """Fetch the first object or None if no object is present.
+
+ Equivalent to :meth:`_asyncio.AsyncResult.first` except that
+ scalar values, rather than :class:`_result.Row` objects,
+ are returned.
+
+ """
+ return await greenlet_spawn(self._only_one_row, False, False, False)
+
+ async def one_or_none(self):
+ """Return at most one object or raise an exception.
+
+ Equivalent to :meth:`_asyncio.AsyncResult.one_or_none` except that
+ scalar values, rather than :class:`_result.Row` objects,
+ are returned.
+
+ """
+ return await greenlet_spawn(self._only_one_row, True, False, False)
+
+ async def one(self):
+ """Return exactly one object or raise an exception.
+
+ Equivalent to :meth:`_asyncio.AsyncResult.one` except that
+ scalar values, rather than :class:`_result.Row` objects,
+ are returned.
+
+ """
+ return await greenlet_spawn(self._only_one_row, True, True, False)
+
+
+class AsyncMappingResult(AsyncCommon):
+ """A wrapper for a :class:`_asyncio.AsyncResult` that returns dictionary
+ values rather than :class:`_engine.Row` values.
+
+ The :class:`_asyncio.AsyncMappingResult` object is acquired by calling the
+ :meth:`_asyncio.AsyncResult.mappings` method.
+
+ Refer to the :class:`_result.MappingResult` object in the synchronous
+ SQLAlchemy API for a complete behavioral description.
+
+ .. versionadded:: 1.4
+
+ """
+
+ _generate_rows = True
+
+ _post_creational_filter = operator.attrgetter("_mapping")
+
+ def __init__(self, result):
+ self._real_result = result
+ self._unique_filter_state = result._unique_filter_state
+ self._metadata = result._metadata
+ if result._source_supports_scalars:
+ self._metadata = self._metadata._reduce([0])
+
+ def keys(self):
+ """Return an iterable view which yields the string keys that would
+ be represented by each :class:`.Row`.
+
+ The view also can be tested for key containment using the Python
+ ``in`` operator, which will test both for the string keys represented
+ in the view, as well as for alternate keys such as column objects.
+
+ .. versionchanged:: 1.4 a key view object is returned rather than a
+ plain list.
+
+
+ """
+ return self._metadata.keys
+
+ def unique(self, strategy=None):
+ """Apply unique filtering to the objects returned by this
+ :class:`_asyncio.AsyncMappingResult`.
+
+ See :meth:`_asyncio.AsyncResult.unique` for usage details.
+
+ """
+ self._unique_filter_state = (set(), strategy)
+ return self
+
+ def columns(self, *col_expressions):
+ r"""Establish the columns that should be returned in each row."""
+ return self._column_slices(col_expressions)
+
+ async def partitions(self, size=None):
+ """Iterate through sub-lists of elements of the size given.
+
+ Equivalent to :meth:`_asyncio.AsyncResult.partitions` except that
+ mapping values, rather than :class:`_result.Row` objects,
+ are returned.
+
+ """
+
+ getter = self._manyrow_getter
+
+ while True:
+ partition = await greenlet_spawn(getter, self, size)
+ if partition:
+ yield partition
+ else:
+ break
+
+ async def fetchall(self):
+ """A synonym for the :meth:`_asyncio.AsyncMappingResult.all` method."""
+
+ return await greenlet_spawn(self._allrows)
+
+ async def fetchone(self):
+ """Fetch one object.
+
+ Equivalent to :meth:`_asyncio.AsyncResult.fetchone` except that
+ mapping values, rather than :class:`_result.Row` objects,
+ are returned.
+
+ """
+
+ row = await greenlet_spawn(self._onerow_getter, self)
+ if row is _NO_ROW:
+ return None
+ else:
+ return row
+
+ async def fetchmany(self, size=None):
+ """Fetch many objects.
+
+ Equivalent to :meth:`_asyncio.AsyncResult.fetchmany` except that
+ mapping values, rather than :class:`_result.Row` objects,
+ are returned.
+
+ """
+
+ return await greenlet_spawn(self._manyrow_getter, self, size)
+
+ async def all(self):
+ """Return all scalar values in a list.
+
+ Equivalent to :meth:`_asyncio.AsyncResult.all` except that
+ mapping values, rather than :class:`_result.Row` objects,
+ are returned.
+
+ """
+
+ return await greenlet_spawn(self._allrows)
+
+ def __aiter__(self):
+ return self
+
+ async def __anext__(self):
+ row = await greenlet_spawn(self._onerow_getter, self)
+ if row is _NO_ROW:
+ raise StopAsyncIteration()
+ else:
+ return row
+
+ async def first(self):
+ """Fetch the first object or None if no object is present.
+
+ Equivalent to :meth:`_asyncio.AsyncResult.first` except that
+ mapping values, rather than :class:`_result.Row` objects,
+ are returned.
+
+
+ """
+ return await greenlet_spawn(self._only_one_row, False, False, False)
+
+ async def one_or_none(self):
+ """Return at most one object or raise an exception.
+
+ Equivalent to :meth:`_asyncio.AsyncResult.one_or_none` except that
+ mapping values, rather than :class:`_result.Row` objects,
+ are returned.
+
+ """
+ return await greenlet_spawn(self._only_one_row, True, False, False)
+
+ async def one(self):
+ """Return exactly one object or raise an exception.
+
+ Equivalent to :meth:`_asyncio.AsyncResult.one` except that
+ mapping values, rather than :class:`_result.Row` objects,
+ are returned.
+
+ """
+ return await greenlet_spawn(self._only_one_row, True, True, False)
+
+
+async def _ensure_sync_result(result, calling_method):
+ if not result._is_cursor:
+ cursor_result = getattr(result, "raw", None)
+ else:
+ cursor_result = result
+ if cursor_result and cursor_result.context._is_server_side:
+ await greenlet_spawn(cursor_result.close)
+ raise async_exc.AsyncMethodRequired(
+ "Can't use the %s.%s() method with a "
+ "server-side cursor. "
+ "Use the %s.stream() method for an async "
+ "streaming result set."
+ % (
+ calling_method.__self__.__class__.__name__,
+ calling_method.__name__,
+ calling_method.__self__.__class__.__name__,
+ )
+ )
+ return result
diff --git a/lib/sqlalchemy/ext/asyncio/scoping.py b/lib/sqlalchemy/ext/asyncio/scoping.py
new file mode 100644
index 0000000..8eca8c5
--- /dev/null
+++ b/lib/sqlalchemy/ext/asyncio/scoping.py
@@ -0,0 +1,107 @@
+# ext/asyncio/scoping.py
+# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+
+from .session import AsyncSession
+from ...orm.scoping import ScopedSessionMixin
+from ...util import create_proxy_methods
+from ...util import ScopedRegistry
+
+
+@create_proxy_methods(
+ AsyncSession,
+ ":class:`_asyncio.AsyncSession`",
+ ":class:`_asyncio.scoping.async_scoped_session`",
+ classmethods=["close_all", "object_session", "identity_key"],
+ methods=[
+ "__contains__",
+ "__iter__",
+ "add",
+ "add_all",
+ "begin",
+ "begin_nested",
+ "close",
+ "commit",
+ "connection",
+ "delete",
+ "execute",
+ "expire",
+ "expire_all",
+ "expunge",
+ "expunge_all",
+ "flush",
+ "get",
+ "get_bind",
+ "is_modified",
+ "invalidate",
+ "merge",
+ "refresh",
+ "rollback",
+ "scalar",
+ "scalars",
+ "stream",
+ "stream_scalars",
+ ],
+ attributes=[
+ "bind",
+ "dirty",
+ "deleted",
+ "new",
+ "identity_map",
+ "is_active",
+ "autoflush",
+ "no_autoflush",
+ "info",
+ ],
+)
+class async_scoped_session(ScopedSessionMixin):
+ """Provides scoped management of :class:`.AsyncSession` objects.
+
+ See the section :ref:`asyncio_scoped_session` for usage details.
+
+ .. versionadded:: 1.4.19
+
+
+ """
+
+ _support_async = True
+
+ def __init__(self, session_factory, scopefunc):
+ """Construct a new :class:`_asyncio.async_scoped_session`.
+
+ :param session_factory: a factory to create new :class:`_asyncio.AsyncSession`
+ instances. This is usually, but not necessarily, an instance
+ of :class:`_orm.sessionmaker` which itself was passed the
+ :class:`_asyncio.AsyncSession` to its :paramref:`_orm.sessionmaker.class_`
+ parameter::
+
+ async_session_factory = sessionmaker(some_async_engine, class_= AsyncSession)
+ AsyncSession = async_scoped_session(async_session_factory, scopefunc=current_task)
+
+ :param scopefunc: function which defines
+ the current scope. A function such as ``asyncio.current_task``
+ may be useful here.
+
+ """ # noqa: E501
+
+ self.session_factory = session_factory
+ self.registry = ScopedRegistry(session_factory, scopefunc)
+
+ @property
+ def _proxied(self):
+ return self.registry()
+
+ async def remove(self):
+ """Dispose of the current :class:`.AsyncSession`, if present.
+
+ Different from scoped_session's remove method, this method would use
+ await to wait for the close method of AsyncSession.
+
+ """
+
+ if self.registry.has():
+ await self.registry().close()
+ self.registry.clear()
diff --git a/lib/sqlalchemy/ext/asyncio/session.py b/lib/sqlalchemy/ext/asyncio/session.py
new file mode 100644
index 0000000..378cbcb
--- /dev/null
+++ b/lib/sqlalchemy/ext/asyncio/session.py
@@ -0,0 +1,759 @@
+# ext/asyncio/session.py
+# Copyright (C) 2020-2022 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+
+import asyncio
+
+from . import engine
+from . import result as _result
+from .base import ReversibleProxy
+from .base import StartableContext
+from .result import _ensure_sync_result
+from ... import util
+from ...orm import object_session
+from ...orm import Session
+from ...orm import state as _instance_state
+from ...util.concurrency import greenlet_spawn
+
+_EXECUTE_OPTIONS = util.immutabledict({"prebuffer_rows": True})
+_STREAM_OPTIONS = util.immutabledict({"stream_results": True})
+
+
+@util.create_proxy_methods(
+ Session,
+ ":class:`_orm.Session`",
+ ":class:`_asyncio.AsyncSession`",
+ classmethods=["object_session", "identity_key"],
+ methods=[
+ "__contains__",
+ "__iter__",
+ "add",
+ "add_all",
+ "expire",
+ "expire_all",
+ "expunge",
+ "expunge_all",
+ "is_modified",
+ "in_transaction",
+ "in_nested_transaction",
+ ],
+ attributes=[
+ "dirty",
+ "deleted",
+ "new",
+ "identity_map",
+ "is_active",
+ "autoflush",
+ "no_autoflush",
+ "info",
+ ],
+)
+class AsyncSession(ReversibleProxy):
+ """Asyncio version of :class:`_orm.Session`.
+
+ The :class:`_asyncio.AsyncSession` is a proxy for a traditional
+ :class:`_orm.Session` instance.
+
+ .. versionadded:: 1.4
+
+ To use an :class:`_asyncio.AsyncSession` with custom :class:`_orm.Session`
+ implementations, see the
+ :paramref:`_asyncio.AsyncSession.sync_session_class` parameter.
+
+
+ """
+
+ _is_asyncio = True
+
+ dispatch = None
+
+ def __init__(self, bind=None, binds=None, sync_session_class=None, **kw):
+ r"""Construct a new :class:`_asyncio.AsyncSession`.
+
+ All parameters other than ``sync_session_class`` are passed to the
+ ``sync_session_class`` callable directly to instantiate a new
+ :class:`_orm.Session`. Refer to :meth:`_orm.Session.__init__` for
+ parameter documentation.
+
+ :param sync_session_class:
+ A :class:`_orm.Session` subclass or other callable which will be used
+ to construct the :class:`_orm.Session` which will be proxied. This
+ parameter may be used to provide custom :class:`_orm.Session`
+ subclasses. Defaults to the
+ :attr:`_asyncio.AsyncSession.sync_session_class` class-level
+ attribute.
+
+ .. versionadded:: 1.4.24
+
+ """
+ kw["future"] = True
+ if bind:
+ self.bind = bind
+ bind = engine._get_sync_engine_or_connection(bind)
+
+ if binds:
+ self.binds = binds
+ binds = {
+ key: engine._get_sync_engine_or_connection(b)
+ for key, b in binds.items()
+ }
+
+ if sync_session_class:
+ self.sync_session_class = sync_session_class
+
+ self.sync_session = self._proxied = self._assign_proxied(
+ self.sync_session_class(bind=bind, binds=binds, **kw)
+ )
+
+ sync_session_class = Session
+ """The class or callable that provides the
+ underlying :class:`_orm.Session` instance for a particular
+ :class:`_asyncio.AsyncSession`.
+
+ At the class level, this attribute is the default value for the
+ :paramref:`_asyncio.AsyncSession.sync_session_class` parameter. Custom
+ subclasses of :class:`_asyncio.AsyncSession` can override this.
+
+ At the instance level, this attribute indicates the current class or
+ callable that was used to provide the :class:`_orm.Session` instance for
+ this :class:`_asyncio.AsyncSession` instance.
+
+ .. versionadded:: 1.4.24
+
+ """
+
+ sync_session: Session
+ """Reference to the underlying :class:`_orm.Session` this
+ :class:`_asyncio.AsyncSession` proxies requests towards.
+
+ This instance can be used as an event target.
+
+ .. seealso::
+
+ :ref:`asyncio_events`
+
+ """
+
+ async def refresh(
+ self, instance, attribute_names=None, with_for_update=None
+ ):
+ """Expire and refresh the attributes on the given instance.
+
+ A query will be issued to the database and all attributes will be
+ refreshed with their current database value.
+
+ This is the async version of the :meth:`_orm.Session.refresh` method.
+ See that method for a complete description of all options.
+
+ .. seealso::
+
+ :meth:`_orm.Session.refresh` - main documentation for refresh
+
+ """
+
+ return await greenlet_spawn(
+ self.sync_session.refresh,
+ instance,
+ attribute_names=attribute_names,
+ with_for_update=with_for_update,
+ )
+
+ async def run_sync(self, fn, *arg, **kw):
+ """Invoke the given sync callable passing sync self as the first
+ argument.
+
+ This method maintains the asyncio event loop all the way through
+ to the database connection by running the given callable in a
+ specially instrumented greenlet.
+
+ E.g.::
+
+ with AsyncSession(async_engine) as session:
+ await session.run_sync(some_business_method)
+
+ .. note::
+
+ The provided callable is invoked inline within the asyncio event
+ loop, and will block on traditional IO calls. IO within this
+ callable should only call into SQLAlchemy's asyncio database
+ APIs which will be properly adapted to the greenlet context.
+
+ .. seealso::
+
+ :ref:`session_run_sync`
+ """
+
+ return await greenlet_spawn(fn, self.sync_session, *arg, **kw)
+
+ async def execute(
+ self,
+ statement,
+ params=None,
+ execution_options=util.EMPTY_DICT,
+ bind_arguments=None,
+ **kw
+ ):
+ """Execute a statement and return a buffered
+ :class:`_engine.Result` object.
+
+ .. seealso::
+
+ :meth:`_orm.Session.execute` - main documentation for execute
+
+ """
+
+ if execution_options:
+ execution_options = util.immutabledict(execution_options).union(
+ _EXECUTE_OPTIONS
+ )
+ else:
+ execution_options = _EXECUTE_OPTIONS
+
+ result = await greenlet_spawn(
+ self.sync_session.execute,
+ statement,
+ params=params,
+ execution_options=execution_options,
+ bind_arguments=bind_arguments,
+ **kw
+ )
+ return await _ensure_sync_result(result, self.execute)
+
+ async def scalar(
+ self,
+ statement,
+ params=None,
+ execution_options=util.EMPTY_DICT,
+ bind_arguments=None,
+ **kw
+ ):
+ """Execute a statement and return a scalar result.
+
+ .. seealso::
+
+ :meth:`_orm.Session.scalar` - main documentation for scalar
+
+ """
+
+ result = await self.execute(
+ statement,
+ params=params,
+ execution_options=execution_options,
+ bind_arguments=bind_arguments,
+ **kw
+ )
+ return result.scalar()
+
+ async def scalars(
+ self,
+ statement,
+ params=None,
+ execution_options=util.EMPTY_DICT,
+ bind_arguments=None,
+ **kw
+ ):
+ """Execute a statement and return scalar results.
+
+ :return: a :class:`_result.ScalarResult` object
+
+ .. versionadded:: 1.4.24
+
+ .. seealso::
+
+ :meth:`_orm.Session.scalars` - main documentation for scalars
+
+ :meth:`_asyncio.AsyncSession.stream_scalars` - streaming version
+
+ """
+
+ result = await self.execute(
+ statement,
+ params=params,
+ execution_options=execution_options,
+ bind_arguments=bind_arguments,
+ **kw
+ )
+ return result.scalars()
+
+ async def get(
+ self,
+ entity,
+ ident,
+ options=None,
+ populate_existing=False,
+ with_for_update=None,
+ identity_token=None,
+ ):
+ """Return an instance based on the given primary key identifier,
+ or ``None`` if not found.
+
+ .. seealso::
+
+ :meth:`_orm.Session.get` - main documentation for get
+
+
+ """
+ return await greenlet_spawn(
+ self.sync_session.get,
+ entity,
+ ident,
+ options=options,
+ populate_existing=populate_existing,
+ with_for_update=with_for_update,
+ identity_token=identity_token,
+ )
+
+ async def stream(
+ self,
+ statement,
+ params=None,
+ execution_options=util.EMPTY_DICT,
+ bind_arguments=None,
+ **kw
+ ):
+ """Execute a statement and return a streaming
+ :class:`_asyncio.AsyncResult` object.
+
+ """
+
+ if execution_options:
+ execution_options = util.immutabledict(execution_options).union(
+ _STREAM_OPTIONS
+ )
+ else:
+ execution_options = _STREAM_OPTIONS
+
+ result = await greenlet_spawn(
+ self.sync_session.execute,
+ statement,
+ params=params,
+ execution_options=execution_options,
+ bind_arguments=bind_arguments,
+ **kw
+ )
+ return _result.AsyncResult(result)
+
+ async def stream_scalars(
+ self,
+ statement,
+ params=None,
+ execution_options=util.EMPTY_DICT,
+ bind_arguments=None,
+ **kw
+ ):
+ """Execute a statement and return a stream of scalar results.
+
+ :return: an :class:`_asyncio.AsyncScalarResult` object
+
+ .. versionadded:: 1.4.24
+
+ .. seealso::
+
+ :meth:`_orm.Session.scalars` - main documentation for scalars
+
+ :meth:`_asyncio.AsyncSession.scalars` - non streaming version
+
+ """
+
+ result = await self.stream(
+ statement,
+ params=params,
+ execution_options=execution_options,
+ bind_arguments=bind_arguments,
+ **kw
+ )
+ return result.scalars()
+
+ async def delete(self, instance):
+ """Mark an instance as deleted.
+
+ The database delete operation occurs upon ``flush()``.
+
+ As this operation may need to cascade along unloaded relationships,
+ it is awaitable to allow for those queries to take place.
+
+ .. seealso::
+
+ :meth:`_orm.Session.delete` - main documentation for delete
+
+ """
+ return await greenlet_spawn(self.sync_session.delete, instance)
+
+ async def merge(self, instance, load=True, options=None):
+ """Copy the state of a given instance into a corresponding instance
+ within this :class:`_asyncio.AsyncSession`.
+
+ .. seealso::
+
+ :meth:`_orm.Session.merge` - main documentation for merge
+
+ """
+ return await greenlet_spawn(
+ self.sync_session.merge, instance, load=load, options=options
+ )
+
+ async def flush(self, objects=None):
+ """Flush all the object changes to the database.
+
+ .. seealso::
+
+ :meth:`_orm.Session.flush` - main documentation for flush
+
+ """
+ await greenlet_spawn(self.sync_session.flush, objects=objects)
+
+ def get_transaction(self):
+ """Return the current root transaction in progress, if any.
+
+ :return: an :class:`_asyncio.AsyncSessionTransaction` object, or
+ ``None``.
+
+ .. versionadded:: 1.4.18
+
+ """
+ trans = self.sync_session.get_transaction()
+ if trans is not None:
+ return AsyncSessionTransaction._retrieve_proxy_for_target(trans)
+ else:
+ return None
+
+ def get_nested_transaction(self):
+ """Return the current nested transaction in progress, if any.
+
+ :return: an :class:`_asyncio.AsyncSessionTransaction` object, or
+ ``None``.
+
+ .. versionadded:: 1.4.18
+
+ """
+
+ trans = self.sync_session.get_nested_transaction()
+ if trans is not None:
+ return AsyncSessionTransaction._retrieve_proxy_for_target(trans)
+ else:
+ return None
+
+ def get_bind(self, mapper=None, clause=None, bind=None, **kw):
+ """Return a "bind" to which the synchronous proxied :class:`_orm.Session`
+ is bound.
+
+ Unlike the :meth:`_orm.Session.get_bind` method, this method is
+ currently **not** used by this :class:`.AsyncSession` in any way
+ in order to resolve engines for requests.
+
+ .. note::
+
+ This method proxies directly to the :meth:`_orm.Session.get_bind`
+ method, however is currently **not** useful as an override target,
+ in contrast to that of the :meth:`_orm.Session.get_bind` method.
+ The example below illustrates how to implement custom
+ :meth:`_orm.Session.get_bind` schemes that work with
+ :class:`.AsyncSession` and :class:`.AsyncEngine`.
+
+ The pattern introduced at :ref:`session_custom_partitioning`
+ illustrates how to apply a custom bind-lookup scheme to a
+ :class:`_orm.Session` given a set of :class:`_engine.Engine` objects.
+ To apply a corresponding :meth:`_orm.Session.get_bind` implementation
+ for use with a :class:`.AsyncSession` and :class:`.AsyncEngine`
+ objects, continue to subclass :class:`_orm.Session` and apply it to
+ :class:`.AsyncSession` using
+ :paramref:`.AsyncSession.sync_session_class`. The inner method must
+ continue to return :class:`_engine.Engine` instances, which can be
+ acquired from a :class:`_asyncio.AsyncEngine` using the
+ :attr:`_asyncio.AsyncEngine.sync_engine` attribute::
+
+ # using example from "Custom Vertical Partitioning"
+
+
+ import random
+
+ from sqlalchemy.ext.asyncio import AsyncSession
+ from sqlalchemy.ext.asyncio import create_async_engine
+ from sqlalchemy.orm import Session, sessionmaker
+
+ # construct async engines w/ async drivers
+ engines = {
+ 'leader':create_async_engine("sqlite+aiosqlite:///leader.db"),
+ 'other':create_async_engine("sqlite+aiosqlite:///other.db"),
+ 'follower1':create_async_engine("sqlite+aiosqlite:///follower1.db"),
+ 'follower2':create_async_engine("sqlite+aiosqlite:///follower2.db"),
+ }
+
+ class RoutingSession(Session):
+ def get_bind(self, mapper=None, clause=None, **kw):
+ # within get_bind(), return sync engines
+ if mapper and issubclass(mapper.class_, MyOtherClass):
+ return engines['other'].sync_engine
+ elif self._flushing or isinstance(clause, (Update, Delete)):
+ return engines['leader'].sync_engine
+ else:
+ return engines[
+ random.choice(['follower1','follower2'])
+ ].sync_engine
+
+ # apply to AsyncSession using sync_session_class
+ AsyncSessionMaker = sessionmaker(
+ class_=AsyncSession,
+ sync_session_class=RoutingSession
+ )
+
+ The :meth:`_orm.Session.get_bind` method is called in a non-asyncio,
+ implicitly non-blocking context in the same manner as ORM event hooks
+ and functions that are invoked via :meth:`.AsyncSession.run_sync`, so
+ routines that wish to run SQL commands inside of
+ :meth:`_orm.Session.get_bind` can continue to do so using
+ blocking-style code, which will be translated to implicitly async calls
+ at the point of invoking IO on the database drivers.
+
+ """ # noqa: E501
+
+ return self.sync_session.get_bind(
+ mapper=mapper, clause=clause, bind=bind, **kw
+ )
+
+ async def connection(self, **kw):
+ r"""Return a :class:`_asyncio.AsyncConnection` object corresponding to
+ this :class:`.Session` object's transactional state.
+
+ This method may also be used to establish execution options for the
+ database connection used by the current transaction.
+
+ .. versionadded:: 1.4.24 Added \**kw arguments which are passed
+ through to the underlying :meth:`_orm.Session.connection` method.
+
+ .. seealso::
+
+ :meth:`_orm.Session.connection` - main documentation for
+ "connection"
+
+ """
+
+ sync_connection = await greenlet_spawn(
+ self.sync_session.connection, **kw
+ )
+ return engine.AsyncConnection._retrieve_proxy_for_target(
+ sync_connection
+ )
+
+ def begin(self, **kw):
+ """Return an :class:`_asyncio.AsyncSessionTransaction` object.
+
+ The underlying :class:`_orm.Session` will perform the
+ "begin" action when the :class:`_asyncio.AsyncSessionTransaction`
+ object is entered::
+
+ async with async_session.begin():
+ # .. ORM transaction is begun
+
+ Note that database IO will not normally occur when the session-level
+ transaction is begun, as database transactions begin on an
+ on-demand basis. However, the begin block is async to accommodate
+ for a :meth:`_orm.SessionEvents.after_transaction_create`
+ event hook that may perform IO.
+
+ For a general description of ORM begin, see
+ :meth:`_orm.Session.begin`.
+
+ """
+
+ return AsyncSessionTransaction(self)
+
+ def begin_nested(self, **kw):
+ """Return an :class:`_asyncio.AsyncSessionTransaction` object
+ which will begin a "nested" transaction, e.g. SAVEPOINT.
+
+ Behavior is the same as that of :meth:`_asyncio.AsyncSession.begin`.
+
+ For a general description of ORM begin nested, see
+ :meth:`_orm.Session.begin_nested`.
+
+ """
+
+ return AsyncSessionTransaction(self, nested=True)
+
+ async def rollback(self):
+ """Rollback the current transaction in progress."""
+ return await greenlet_spawn(self.sync_session.rollback)
+
+ async def commit(self):
+ """Commit the current transaction in progress."""
+ return await greenlet_spawn(self.sync_session.commit)
+
+ async def close(self):
+ """Close out the transactional resources and ORM objects used by this
+ :class:`_asyncio.AsyncSession`.
+
+ This expunges all ORM objects associated with this
+ :class:`_asyncio.AsyncSession`, ends any transaction in progress and
+ :term:`releases` any :class:`_asyncio.AsyncConnection` objects which
+ this :class:`_asyncio.AsyncSession` itself has checked out from
+ associated :class:`_asyncio.AsyncEngine` objects. The operation then
+ leaves the :class:`_asyncio.AsyncSession` in a state which it may be
+ used again.
+
+ .. tip::
+
+ The :meth:`_asyncio.AsyncSession.close` method **does not prevent
+ the Session from being used again**. The
+ :class:`_asyncio.AsyncSession` itself does not actually have a
+ distinct "closed" state; it merely means the
+ :class:`_asyncio.AsyncSession` will release all database
+ connections and ORM objects.
+
+
+ .. seealso::
+
+ :ref:`session_closing` - detail on the semantics of
+ :meth:`_asyncio.AsyncSession.close`
+
+ """
+ await greenlet_spawn(self.sync_session.close)
+
+ async def invalidate(self):
+ """Close this Session, using connection invalidation.
+
+ For a complete description, see :meth:`_orm.Session.invalidate`.
+ """
+ return await greenlet_spawn(self.sync_session.invalidate)
+
+ @classmethod
+ async def close_all(self):
+ """Close all :class:`_asyncio.AsyncSession` sessions."""
+ return await greenlet_spawn(self.sync_session.close_all)
+
+ async def __aenter__(self):
+ return self
+
+ async def __aexit__(self, type_, value, traceback):
+ await asyncio.shield(self.close())
+
+ def _maker_context_manager(self):
+ # no @contextlib.asynccontextmanager until python3.7, gr
+ return _AsyncSessionContextManager(self)
+
+
+class _AsyncSessionContextManager:
+ def __init__(self, async_session):
+ self.async_session = async_session
+
+ async def __aenter__(self):
+ self.trans = self.async_session.begin()
+ await self.trans.__aenter__()
+ return self.async_session
+
+ async def __aexit__(self, type_, value, traceback):
+ async def go():
+ await self.trans.__aexit__(type_, value, traceback)
+ await self.async_session.__aexit__(type_, value, traceback)
+
+ await asyncio.shield(go())
+
+
+class AsyncSessionTransaction(ReversibleProxy, StartableContext):
+ """A wrapper for the ORM :class:`_orm.SessionTransaction` object.
+
+ This object is provided so that a transaction-holding object
+ for the :meth:`_asyncio.AsyncSession.begin` may be returned.
+
+ The object supports both explicit calls to
+ :meth:`_asyncio.AsyncSessionTransaction.commit` and
+ :meth:`_asyncio.AsyncSessionTransaction.rollback`, as well as use as an
+ async context manager.
+
+
+ .. versionadded:: 1.4
+
+ """
+
+ __slots__ = ("session", "sync_transaction", "nested")
+
+ def __init__(self, session, nested=False):
+ self.session = session
+ self.nested = nested
+ self.sync_transaction = None
+
+ @property
+ def is_active(self):
+ return (
+ self._sync_transaction() is not None
+ and self._sync_transaction().is_active
+ )
+
+ def _sync_transaction(self):
+ if not self.sync_transaction:
+ self._raise_for_not_started()
+ return self.sync_transaction
+
+ async def rollback(self):
+ """Roll back this :class:`_asyncio.AsyncTransaction`."""
+ await greenlet_spawn(self._sync_transaction().rollback)
+
+ async def commit(self):
+ """Commit this :class:`_asyncio.AsyncTransaction`."""
+
+ await greenlet_spawn(self._sync_transaction().commit)
+
+ async def start(self, is_ctxmanager=False):
+ self.sync_transaction = self._assign_proxied(
+ await greenlet_spawn(
+ self.session.sync_session.begin_nested
+ if self.nested
+ else self.session.sync_session.begin
+ )
+ )
+ if is_ctxmanager:
+ self.sync_transaction.__enter__()
+ return self
+
+ async def __aexit__(self, type_, value, traceback):
+ await greenlet_spawn(
+ self._sync_transaction().__exit__, type_, value, traceback
+ )
+
+
+def async_object_session(instance):
+ """Return the :class:`_asyncio.AsyncSession` to which the given instance
+ belongs.
+
+ This function makes use of the sync-API function
+ :class:`_orm.object_session` to retrieve the :class:`_orm.Session` which
+ refers to the given instance, and from there links it to the original
+ :class:`_asyncio.AsyncSession`.
+
+ If the :class:`_asyncio.AsyncSession` has been garbage collected, the
+ return value is ``None``.
+
+ This functionality is also available from the
+ :attr:`_orm.InstanceState.async_session` accessor.
+
+ :param instance: an ORM mapped instance
+ :return: an :class:`_asyncio.AsyncSession` object, or ``None``.
+
+ .. versionadded:: 1.4.18
+
+ """
+
+ session = object_session(instance)
+ if session is not None:
+ return async_session(session)
+ else:
+ return None
+
+
+def async_session(session):
+ """Return the :class:`_asyncio.AsyncSession` which is proxying the given
+ :class:`_orm.Session` object, if any.
+
+ :param session: a :class:`_orm.Session` instance.
+ :return: a :class:`_asyncio.AsyncSession` instance, or ``None``.
+
+ .. versionadded:: 1.4.18
+
+ """
+ return AsyncSession._retrieve_proxy_for_target(session, regenerate=False)
+
+
+_instance_state._async_provider = async_session