diff options
Diffstat (limited to 'lib/sqlalchemy/util/_compat_py3k.py')
-rw-r--r-- | lib/sqlalchemy/util/_compat_py3k.py | 67 |
1 files changed, 67 insertions, 0 deletions
diff --git a/lib/sqlalchemy/util/_compat_py3k.py b/lib/sqlalchemy/util/_compat_py3k.py new file mode 100644 index 0000000..ce659a4 --- /dev/null +++ b/lib/sqlalchemy/util/_compat_py3k.py @@ -0,0 +1,67 @@ +# util/_compat_py3k.py +# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php + +from functools import wraps + +# vendored from py3.7 + + +class _AsyncGeneratorContextManager: + """Helper for @asynccontextmanager.""" + + def __init__(self, func, args, kwds): + self.gen = func(*args, **kwds) + self.func, self.args, self.kwds = func, args, kwds + doc = getattr(func, "__doc__", None) + if doc is None: + doc = type(self).__doc__ + self.__doc__ = doc + + async def __aenter__(self): + try: + return await self.gen.__anext__() + except StopAsyncIteration: + raise RuntimeError("generator didn't yield") from None + + async def __aexit__(self, typ, value, traceback): + if typ is None: + try: + await self.gen.__anext__() + except StopAsyncIteration: + return + else: + raise RuntimeError("generator didn't stop") + else: + if value is None: + value = typ() + # See _GeneratorContextManager.__exit__ for comments on subtleties + # in this implementation + try: + await self.gen.athrow(typ, value, traceback) + raise RuntimeError("generator didn't stop after athrow()") + except StopAsyncIteration as exc: + return exc is not value + except RuntimeError as exc: + if exc is value: + return False + if isinstance(value, (StopIteration, StopAsyncIteration)): + if exc.__cause__ is value: + return False + raise + except BaseException as exc: + if exc is not value: + raise + + +# using the vendored version in all cases at the moment to establish +# full test coverage +def asynccontextmanager(func): + @wraps(func) + def helper(*args, **kwds): + return _AsyncGeneratorContextManager(func, args, kwds) + + return helper |