summaryrefslogtreecommitdiffstats
path: root/lib/sqlalchemy/sql/coercions.py
diff options
context:
space:
mode:
authorxiubuzhe <xiubuzhe@sina.com>2023-10-08 20:59:00 +0800
committerxiubuzhe <xiubuzhe@sina.com>2023-10-08 20:59:00 +0800
commit1dac2263372df2b85db5d029a45721fa158a5c9d (patch)
tree0365f9c57df04178a726d7584ca6a6b955a7ce6a /lib/sqlalchemy/sql/coercions.py
parentb494be364bb39e1de128ada7dc576a729d99907e (diff)
downloadsunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.tar.gz
sunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.tar.bz2
sunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.zip
first add files
Diffstat (limited to 'lib/sqlalchemy/sql/coercions.py')
-rw-r--r--lib/sqlalchemy/sql/coercions.py1096
1 files changed, 1096 insertions, 0 deletions
diff --git a/lib/sqlalchemy/sql/coercions.py b/lib/sqlalchemy/sql/coercions.py
new file mode 100644
index 0000000..8cc73cb
--- /dev/null
+++ b/lib/sqlalchemy/sql/coercions.py
@@ -0,0 +1,1096 @@
+# sql/coercions.py
+# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+
+import numbers
+import re
+
+from . import operators
+from . import roles
+from . import visitors
+from .base import ExecutableOption
+from .base import Options
+from .traversals import HasCacheKey
+from .visitors import Visitable
+from .. import exc
+from .. import inspection
+from .. import util
+from ..util import collections_abc
+
+
+elements = None
+lambdas = None
+schema = None
+selectable = None
+sqltypes = None
+traversals = None
+
+
+def _is_literal(element):
+ """Return whether or not the element is a "literal" in the context
+ of a SQL expression construct.
+
+ """
+
+ return (
+ not isinstance(
+ element,
+ (Visitable, schema.SchemaEventTarget),
+ )
+ and not hasattr(element, "__clause_element__")
+ )
+
+
+def _deep_is_literal(element):
+ """Return whether or not the element is a "literal" in the context
+ of a SQL expression construct.
+
+ does a deeper more esoteric check than _is_literal. is used
+ for lambda elements that have to distinguish values that would
+ be bound vs. not without any context.
+
+ """
+
+ if isinstance(element, collections_abc.Sequence) and not isinstance(
+ element, str
+ ):
+ for elem in element:
+ if not _deep_is_literal(elem):
+ return False
+ else:
+ return True
+
+ return (
+ not isinstance(
+ element,
+ (
+ Visitable,
+ schema.SchemaEventTarget,
+ HasCacheKey,
+ Options,
+ util.langhelpers._symbol,
+ ),
+ )
+ and not hasattr(element, "__clause_element__")
+ and (
+ not isinstance(element, type)
+ or not issubclass(element, HasCacheKey)
+ )
+ )
+
+
+def _document_text_coercion(paramname, meth_rst, param_rst):
+ return util.add_parameter_text(
+ paramname,
+ (
+ ".. warning:: "
+ "The %s argument to %s can be passed as a Python string argument, "
+ "which will be treated "
+ "as **trusted SQL text** and rendered as given. **DO NOT PASS "
+ "UNTRUSTED INPUT TO THIS PARAMETER**."
+ )
+ % (param_rst, meth_rst),
+ )
+
+
+def _expression_collection_was_a_list(attrname, fnname, args):
+ if args and isinstance(args[0], (list, set, dict)) and len(args) == 1:
+ if isinstance(args[0], list):
+ util.warn_deprecated_20(
+ 'The "%s" argument to %s(), when referring to a sequence '
+ "of items, is now passed as a series of positional "
+ "elements, rather than as a list. " % (attrname, fnname)
+ )
+ return args[0]
+ else:
+ return args
+
+
+def expect(
+ role,
+ element,
+ apply_propagate_attrs=None,
+ argname=None,
+ post_inspect=False,
+ **kw
+):
+ if (
+ role.allows_lambda
+ # note callable() will not invoke a __getattr__() method, whereas
+ # hasattr(obj, "__call__") will. by keeping the callable() check here
+ # we prevent most needless calls to hasattr() and therefore
+ # __getattr__(), which is present on ColumnElement.
+ and callable(element)
+ and hasattr(element, "__code__")
+ ):
+ return lambdas.LambdaElement(
+ element,
+ role,
+ lambdas.LambdaOptions(**kw),
+ apply_propagate_attrs=apply_propagate_attrs,
+ )
+
+ # major case is that we are given a ClauseElement already, skip more
+ # elaborate logic up front if possible
+ impl = _impl_lookup[role]
+
+ original_element = element
+
+ if not isinstance(
+ element,
+ (elements.ClauseElement, schema.SchemaItem, schema.FetchedValue),
+ ):
+ resolved = None
+
+ if impl._resolve_literal_only:
+ resolved = impl._literal_coercion(element, **kw)
+ else:
+
+ original_element = element
+
+ is_clause_element = False
+
+ # this is a special performance optimization for ORM
+ # joins used by JoinTargetImpl that we don't go through the
+ # work of creating __clause_element__() when we only need the
+ # original QueryableAttribute, as the former will do clause
+ # adaption and all that which is just thrown away here.
+ if (
+ impl._skip_clauseelement_for_target_match
+ and isinstance(element, role)
+ and hasattr(element, "__clause_element__")
+ ):
+ is_clause_element = True
+ else:
+ while hasattr(element, "__clause_element__"):
+ is_clause_element = True
+
+ if not getattr(element, "is_clause_element", False):
+ element = element.__clause_element__()
+ else:
+ break
+
+ if not is_clause_element:
+ if impl._use_inspection:
+ insp = inspection.inspect(element, raiseerr=False)
+ if insp is not None:
+ if post_inspect:
+ insp._post_inspect
+ try:
+ resolved = insp.__clause_element__()
+ except AttributeError:
+ impl._raise_for_expected(original_element, argname)
+
+ if resolved is None:
+ resolved = impl._literal_coercion(
+ element, argname=argname, **kw
+ )
+ else:
+ resolved = element
+ else:
+ resolved = element
+ if (
+ apply_propagate_attrs is not None
+ and not apply_propagate_attrs._propagate_attrs
+ and resolved._propagate_attrs
+ ):
+ apply_propagate_attrs._propagate_attrs = resolved._propagate_attrs
+
+ if impl._role_class in resolved.__class__.__mro__:
+ if impl._post_coercion:
+ resolved = impl._post_coercion(
+ resolved,
+ argname=argname,
+ original_element=original_element,
+ **kw
+ )
+ return resolved
+ else:
+ return impl._implicit_coercions(
+ original_element, resolved, argname=argname, **kw
+ )
+
+
+def expect_as_key(role, element, **kw):
+ kw["as_key"] = True
+ return expect(role, element, **kw)
+
+
+def expect_col_expression_collection(role, expressions):
+ for expr in expressions:
+ strname = None
+ column = None
+
+ resolved = expect(role, expr)
+ if isinstance(resolved, util.string_types):
+ strname = resolved = expr
+ else:
+ cols = []
+ visitors.traverse(resolved, {}, {"column": cols.append})
+ if cols:
+ column = cols[0]
+ add_element = column if column is not None else strname
+ yield resolved, column, strname, add_element
+
+
+class RoleImpl(object):
+ __slots__ = ("_role_class", "name", "_use_inspection")
+
+ def _literal_coercion(self, element, **kw):
+ raise NotImplementedError()
+
+ _post_coercion = None
+ _resolve_literal_only = False
+ _skip_clauseelement_for_target_match = False
+
+ def __init__(self, role_class):
+ self._role_class = role_class
+ self.name = role_class._role_name
+ self._use_inspection = issubclass(role_class, roles.UsesInspection)
+
+ def _implicit_coercions(self, element, resolved, argname=None, **kw):
+ self._raise_for_expected(element, argname, resolved)
+
+ def _raise_for_expected(
+ self,
+ element,
+ argname=None,
+ resolved=None,
+ advice=None,
+ code=None,
+ err=None,
+ ):
+ if resolved is not None and resolved is not element:
+ got = "%r object resolved from %r object" % (resolved, element)
+ else:
+ got = repr(element)
+
+ if argname:
+ msg = "%s expected for argument %r; got %s." % (
+ self.name,
+ argname,
+ got,
+ )
+ else:
+ msg = "%s expected, got %s." % (self.name, got)
+
+ if advice:
+ msg += " " + advice
+
+ util.raise_(exc.ArgumentError(msg, code=code), replace_context=err)
+
+
+class _Deannotate(object):
+ __slots__ = ()
+
+ def _post_coercion(self, resolved, **kw):
+ from .util import _deep_deannotate
+
+ return _deep_deannotate(resolved)
+
+
+class _StringOnly(object):
+ __slots__ = ()
+
+ _resolve_literal_only = True
+
+
+class _ReturnsStringKey(object):
+ __slots__ = ()
+
+ def _implicit_coercions(
+ self, original_element, resolved, argname=None, **kw
+ ):
+ if isinstance(original_element, util.string_types):
+ return original_element
+ else:
+ self._raise_for_expected(original_element, argname, resolved)
+
+ def _literal_coercion(self, element, **kw):
+ return element
+
+
+class _ColumnCoercions(object):
+ __slots__ = ()
+
+ def _warn_for_scalar_subquery_coercion(self):
+ util.warn(
+ "implicitly coercing SELECT object to scalar subquery; "
+ "please use the .scalar_subquery() method to produce a scalar "
+ "subquery.",
+ )
+
+ def _implicit_coercions(
+ self, original_element, resolved, argname=None, **kw
+ ):
+ if not getattr(resolved, "is_clause_element", False):
+ self._raise_for_expected(original_element, argname, resolved)
+ elif resolved._is_select_statement:
+ self._warn_for_scalar_subquery_coercion()
+ return resolved.scalar_subquery()
+ elif resolved._is_from_clause and isinstance(
+ resolved, selectable.Subquery
+ ):
+ self._warn_for_scalar_subquery_coercion()
+ return resolved.element.scalar_subquery()
+ elif self._role_class.allows_lambda and resolved._is_lambda_element:
+ return resolved
+ else:
+ self._raise_for_expected(original_element, argname, resolved)
+
+
+def _no_text_coercion(
+ element, argname=None, exc_cls=exc.ArgumentError, extra=None, err=None
+):
+ util.raise_(
+ exc_cls(
+ "%(extra)sTextual SQL expression %(expr)r %(argname)sshould be "
+ "explicitly declared as text(%(expr)r)"
+ % {
+ "expr": util.ellipses_string(element),
+ "argname": "for argument %s" % (argname,) if argname else "",
+ "extra": "%s " % extra if extra else "",
+ }
+ ),
+ replace_context=err,
+ )
+
+
+class _NoTextCoercion(object):
+ __slots__ = ()
+
+ def _literal_coercion(self, element, argname=None, **kw):
+ if isinstance(element, util.string_types) and issubclass(
+ elements.TextClause, self._role_class
+ ):
+ _no_text_coercion(element, argname)
+ else:
+ self._raise_for_expected(element, argname)
+
+
+class _CoerceLiterals(object):
+ __slots__ = ()
+ _coerce_consts = False
+ _coerce_star = False
+ _coerce_numerics = False
+
+ def _text_coercion(self, element, argname=None):
+ return _no_text_coercion(element, argname)
+
+ def _literal_coercion(self, element, argname=None, **kw):
+ if isinstance(element, util.string_types):
+ if self._coerce_star and element == "*":
+ return elements.ColumnClause("*", is_literal=True)
+ else:
+ return self._text_coercion(element, argname, **kw)
+
+ if self._coerce_consts:
+ if element is None:
+ return elements.Null()
+ elif element is False:
+ return elements.False_()
+ elif element is True:
+ return elements.True_()
+
+ if self._coerce_numerics and isinstance(element, (numbers.Number)):
+ return elements.ColumnClause(str(element), is_literal=True)
+
+ self._raise_for_expected(element, argname)
+
+
+class LiteralValueImpl(RoleImpl):
+ _resolve_literal_only = True
+
+ def _implicit_coercions(
+ self, element, resolved, argname, type_=None, **kw
+ ):
+ if not _is_literal(resolved):
+ self._raise_for_expected(
+ element, resolved=resolved, argname=argname, **kw
+ )
+
+ return elements.BindParameter(None, element, type_=type_, unique=True)
+
+ def _literal_coercion(self, element, argname=None, type_=None, **kw):
+ return element
+
+
+class _SelectIsNotFrom(object):
+ __slots__ = ()
+
+ def _raise_for_expected(self, element, argname=None, resolved=None, **kw):
+ if isinstance(element, roles.SelectStatementRole) or isinstance(
+ resolved, roles.SelectStatementRole
+ ):
+ advice = (
+ "To create a "
+ "FROM clause from a %s object, use the .subquery() method."
+ % (resolved.__class__ if resolved is not None else element,)
+ )
+ code = "89ve"
+ else:
+ advice = code = None
+
+ return super(_SelectIsNotFrom, self)._raise_for_expected(
+ element,
+ argname=argname,
+ resolved=resolved,
+ advice=advice,
+ code=code,
+ **kw
+ )
+
+
+class HasCacheKeyImpl(RoleImpl):
+ __slots__ = ()
+
+ def _implicit_coercions(
+ self, original_element, resolved, argname=None, **kw
+ ):
+ if isinstance(original_element, traversals.HasCacheKey):
+ return original_element
+ else:
+ self._raise_for_expected(original_element, argname, resolved)
+
+ def _literal_coercion(self, element, **kw):
+ return element
+
+
+class ExecutableOptionImpl(RoleImpl):
+ __slots__ = ()
+
+ def _implicit_coercions(
+ self, original_element, resolved, argname=None, **kw
+ ):
+ if isinstance(original_element, ExecutableOption):
+ return original_element
+ else:
+ self._raise_for_expected(original_element, argname, resolved)
+
+ def _literal_coercion(self, element, **kw):
+ return element
+
+
+class ExpressionElementImpl(_ColumnCoercions, RoleImpl):
+ __slots__ = ()
+
+ def _literal_coercion(
+ self, element, name=None, type_=None, argname=None, is_crud=False, **kw
+ ):
+ if (
+ element is None
+ and not is_crud
+ and (type_ is None or not type_.should_evaluate_none)
+ ):
+ # TODO: there's no test coverage now for the
+ # "should_evaluate_none" part of this, as outside of "crud" this
+ # codepath is not normally used except in some special cases
+ return elements.Null()
+ else:
+ try:
+ return elements.BindParameter(
+ name, element, type_, unique=True, _is_crud=is_crud
+ )
+ except exc.ArgumentError as err:
+ self._raise_for_expected(element, err=err)
+
+ def _raise_for_expected(self, element, argname=None, resolved=None, **kw):
+ if isinstance(element, roles.AnonymizedFromClauseRole):
+ advice = (
+ "To create a "
+ "column expression from a FROM clause row "
+ "as a whole, use the .table_valued() method."
+ )
+ else:
+ advice = None
+
+ return super(ExpressionElementImpl, self)._raise_for_expected(
+ element, argname=argname, resolved=resolved, advice=advice, **kw
+ )
+
+
+class BinaryElementImpl(ExpressionElementImpl, RoleImpl):
+
+ __slots__ = ()
+
+ def _literal_coercion(
+ self, element, expr, operator, bindparam_type=None, argname=None, **kw
+ ):
+ try:
+ return expr._bind_param(operator, element, type_=bindparam_type)
+ except exc.ArgumentError as err:
+ self._raise_for_expected(element, err=err)
+
+ def _post_coercion(self, resolved, expr, bindparam_type=None, **kw):
+ if resolved.type._isnull and not expr.type._isnull:
+ resolved = resolved._with_binary_element_type(
+ bindparam_type if bindparam_type is not None else expr.type
+ )
+ return resolved
+
+
+class InElementImpl(RoleImpl):
+ __slots__ = ()
+
+ def _implicit_coercions(
+ self, original_element, resolved, argname=None, **kw
+ ):
+ if resolved._is_from_clause:
+ if (
+ isinstance(resolved, selectable.Alias)
+ and resolved.element._is_select_statement
+ ):
+ self._warn_for_implicit_coercion(resolved)
+ return self._post_coercion(resolved.element, **kw)
+ else:
+ self._warn_for_implicit_coercion(resolved)
+ return self._post_coercion(resolved.select(), **kw)
+ else:
+ self._raise_for_expected(original_element, argname, resolved)
+
+ def _warn_for_implicit_coercion(self, elem):
+ util.warn(
+ "Coercing %s object into a select() for use in IN(); "
+ "please pass a select() construct explicitly"
+ % (elem.__class__.__name__)
+ )
+
+ def _literal_coercion(self, element, expr, operator, **kw):
+ if isinstance(element, collections_abc.Iterable) and not isinstance(
+ element, util.string_types
+ ):
+ non_literal_expressions = {}
+ element = list(element)
+ for o in element:
+ if not _is_literal(o):
+ if not isinstance(o, operators.ColumnOperators):
+ self._raise_for_expected(element, **kw)
+ else:
+ non_literal_expressions[o] = o
+ elif o is None:
+ non_literal_expressions[o] = elements.Null()
+
+ if non_literal_expressions:
+ return elements.ClauseList(
+ *[
+ non_literal_expressions[o]
+ if o in non_literal_expressions
+ else expr._bind_param(operator, o)
+ for o in element
+ ]
+ )
+ else:
+ return expr._bind_param(operator, element, expanding=True)
+
+ else:
+ self._raise_for_expected(element, **kw)
+
+ def _post_coercion(self, element, expr, operator, **kw):
+ if element._is_select_statement:
+ # for IN, we are doing scalar_subquery() coercion without
+ # a warning
+ return element.scalar_subquery()
+ elif isinstance(element, elements.ClauseList):
+ assert not len(element.clauses) == 0
+ return element.self_group(against=operator)
+
+ elif isinstance(element, elements.BindParameter):
+ element = element._clone(maintain_key=True)
+ element.expanding = True
+ element.expand_op = operator
+
+ return element
+ else:
+ return element
+
+
+class OnClauseImpl(_CoerceLiterals, _ColumnCoercions, RoleImpl):
+ __slots__ = ()
+
+ _coerce_consts = True
+
+ def _implicit_coercions(
+ self, original_element, resolved, argname=None, legacy=False, **kw
+ ):
+ if legacy and isinstance(resolved, str):
+ return resolved
+ else:
+ return super(OnClauseImpl, self)._implicit_coercions(
+ original_element,
+ resolved,
+ argname=argname,
+ legacy=legacy,
+ **kw
+ )
+
+ def _text_coercion(self, element, argname=None, legacy=False):
+ if legacy and isinstance(element, str):
+ util.warn_deprecated_20(
+ "Using strings to indicate relationship names in "
+ "Query.join() is deprecated and will be removed in "
+ "SQLAlchemy 2.0. Please use the class-bound attribute "
+ "directly."
+ )
+ return element
+
+ return super(OnClauseImpl, self)._text_coercion(element, argname)
+
+ def _post_coercion(self, resolved, original_element=None, **kw):
+ # this is a hack right now as we want to use coercion on an
+ # ORM InstrumentedAttribute, but we want to return the object
+ # itself if it is one, not its clause element.
+ # ORM context _join and _legacy_join() would need to be improved
+ # to look for annotations in a clause element form.
+ if isinstance(original_element, roles.JoinTargetRole):
+ return original_element
+ return resolved
+
+
+class WhereHavingImpl(_CoerceLiterals, _ColumnCoercions, RoleImpl):
+ __slots__ = ()
+
+ _coerce_consts = True
+
+ def _text_coercion(self, element, argname=None):
+ return _no_text_coercion(element, argname)
+
+
+class StatementOptionImpl(_CoerceLiterals, RoleImpl):
+ __slots__ = ()
+
+ _coerce_consts = True
+
+ def _text_coercion(self, element, argname=None):
+ return elements.TextClause(element)
+
+
+class ColumnArgumentImpl(_NoTextCoercion, RoleImpl):
+ __slots__ = ()
+
+
+class ColumnArgumentOrKeyImpl(_ReturnsStringKey, RoleImpl):
+ __slots__ = ()
+
+
+class StrAsPlainColumnImpl(_CoerceLiterals, RoleImpl):
+ __slots__ = ()
+
+ def _text_coercion(self, element, argname=None):
+ return elements.ColumnClause(element)
+
+
+class ByOfImpl(_CoerceLiterals, _ColumnCoercions, RoleImpl, roles.ByOfRole):
+
+ __slots__ = ()
+
+ _coerce_consts = True
+
+ def _text_coercion(self, element, argname=None):
+ return elements._textual_label_reference(element)
+
+
+class OrderByImpl(ByOfImpl, RoleImpl):
+ __slots__ = ()
+
+ def _post_coercion(self, resolved, **kw):
+ if (
+ isinstance(resolved, self._role_class)
+ and resolved._order_by_label_element is not None
+ ):
+ return elements._label_reference(resolved)
+ else:
+ return resolved
+
+
+class GroupByImpl(ByOfImpl, RoleImpl):
+ __slots__ = ()
+
+ def _implicit_coercions(
+ self, original_element, resolved, argname=None, **kw
+ ):
+ if isinstance(resolved, roles.StrictFromClauseRole):
+ return elements.ClauseList(*resolved.c)
+ else:
+ return resolved
+
+
+class DMLColumnImpl(_ReturnsStringKey, RoleImpl):
+ __slots__ = ()
+
+ def _post_coercion(self, element, as_key=False, **kw):
+ if as_key:
+ return element.key
+ else:
+ return element
+
+
+class ConstExprImpl(RoleImpl):
+ __slots__ = ()
+
+ def _literal_coercion(self, element, argname=None, **kw):
+ if element is None:
+ return elements.Null()
+ elif element is False:
+ return elements.False_()
+ elif element is True:
+ return elements.True_()
+ else:
+ self._raise_for_expected(element, argname)
+
+
+class TruncatedLabelImpl(_StringOnly, RoleImpl):
+ __slots__ = ()
+
+ def _implicit_coercions(
+ self, original_element, resolved, argname=None, **kw
+ ):
+ if isinstance(original_element, util.string_types):
+ return resolved
+ else:
+ self._raise_for_expected(original_element, argname, resolved)
+
+ def _literal_coercion(self, element, argname=None, **kw):
+ """coerce the given value to :class:`._truncated_label`.
+
+ Existing :class:`._truncated_label` and
+ :class:`._anonymous_label` objects are passed
+ unchanged.
+ """
+
+ if isinstance(element, elements._truncated_label):
+ return element
+ else:
+ return elements._truncated_label(element)
+
+
+class DDLExpressionImpl(_Deannotate, _CoerceLiterals, RoleImpl):
+
+ __slots__ = ()
+
+ _coerce_consts = True
+
+ def _text_coercion(self, element, argname=None):
+ # see #5754 for why we can't easily deprecate this coercion.
+ # essentially expressions like postgresql_where would have to be
+ # text() as they come back from reflection and we don't want to
+ # have text() elements wired into the inspection dictionaries.
+ return elements.TextClause(element)
+
+
+class DDLConstraintColumnImpl(_Deannotate, _ReturnsStringKey, RoleImpl):
+ __slots__ = ()
+
+
+class DDLReferredColumnImpl(DDLConstraintColumnImpl):
+ __slots__ = ()
+
+
+class LimitOffsetImpl(RoleImpl):
+ __slots__ = ()
+
+ def _implicit_coercions(self, element, resolved, argname=None, **kw):
+ if resolved is None:
+ return None
+ else:
+ self._raise_for_expected(element, argname, resolved)
+
+ def _literal_coercion(self, element, name, type_, **kw):
+ if element is None:
+ return None
+ else:
+ value = util.asint(element)
+ return selectable._OffsetLimitParam(
+ name, value, type_=type_, unique=True
+ )
+
+
+class LabeledColumnExprImpl(ExpressionElementImpl):
+ __slots__ = ()
+
+ def _implicit_coercions(
+ self, original_element, resolved, argname=None, **kw
+ ):
+ if isinstance(resolved, roles.ExpressionElementRole):
+ return resolved.label(None)
+ else:
+ new = super(LabeledColumnExprImpl, self)._implicit_coercions(
+ original_element, resolved, argname=argname, **kw
+ )
+ if isinstance(new, roles.ExpressionElementRole):
+ return new.label(None)
+ else:
+ self._raise_for_expected(original_element, argname, resolved)
+
+
+class ColumnsClauseImpl(_SelectIsNotFrom, _CoerceLiterals, RoleImpl):
+ __slots__ = ()
+
+ _coerce_consts = True
+ _coerce_numerics = True
+ _coerce_star = True
+
+ _guess_straight_column = re.compile(r"^\w\S*$", re.I)
+
+ def _text_coercion(self, element, argname=None):
+ element = str(element)
+
+ guess_is_literal = not self._guess_straight_column.match(element)
+ raise exc.ArgumentError(
+ "Textual column expression %(column)r %(argname)sshould be "
+ "explicitly declared with text(%(column)r), "
+ "or use %(literal_column)s(%(column)r) "
+ "for more specificity"
+ % {
+ "column": util.ellipses_string(element),
+ "argname": "for argument %s" % (argname,) if argname else "",
+ "literal_column": "literal_column"
+ if guess_is_literal
+ else "column",
+ }
+ )
+
+
+class ReturnsRowsImpl(RoleImpl):
+ __slots__ = ()
+
+
+class StatementImpl(_CoerceLiterals, RoleImpl):
+ __slots__ = ()
+
+ def _post_coercion(self, resolved, original_element, argname=None, **kw):
+ if resolved is not original_element and not isinstance(
+ original_element, util.string_types
+ ):
+ # use same method as Connection uses; this will later raise
+ # ObjectNotExecutableError
+ try:
+ original_element._execute_on_connection
+ except AttributeError:
+ util.warn_deprecated(
+ "Object %r should not be used directly in a SQL statement "
+ "context, such as passing to methods such as "
+ "session.execute(). This usage will be disallowed in a "
+ "future release. "
+ "Please use Core select() / update() / delete() etc. "
+ "with Session.execute() and other statement execution "
+ "methods." % original_element,
+ "1.4",
+ )
+
+ return resolved
+
+ def _implicit_coercions(
+ self, original_element, resolved, argname=None, **kw
+ ):
+ if resolved._is_lambda_element:
+ return resolved
+ else:
+ return super(StatementImpl, self)._implicit_coercions(
+ original_element, resolved, argname=argname, **kw
+ )
+
+ def _text_coercion(self, element, argname=None):
+ util.warn_deprecated_20(
+ "Using plain strings to indicate SQL statements without using "
+ "the text() construct is "
+ "deprecated and will be removed in version 2.0. Ensure plain "
+ "SQL statements are passed using the text() construct."
+ )
+ return elements.TextClause(element)
+
+
+class SelectStatementImpl(_NoTextCoercion, RoleImpl):
+ __slots__ = ()
+
+ def _implicit_coercions(
+ self, original_element, resolved, argname=None, **kw
+ ):
+ if resolved._is_text_clause:
+ return resolved.columns()
+ else:
+ self._raise_for_expected(original_element, argname, resolved)
+
+
+class HasCTEImpl(ReturnsRowsImpl):
+ __slots__ = ()
+
+
+class IsCTEImpl(RoleImpl):
+ __slots__ = ()
+
+
+class JoinTargetImpl(RoleImpl):
+ __slots__ = ()
+
+ _skip_clauseelement_for_target_match = True
+
+ def _literal_coercion(self, element, legacy=False, **kw):
+ if isinstance(element, str):
+ return element
+
+ def _implicit_coercions(
+ self, original_element, resolved, argname=None, legacy=False, **kw
+ ):
+ if isinstance(original_element, roles.JoinTargetRole):
+ # note that this codepath no longer occurs as of
+ # #6550, unless JoinTargetImpl._skip_clauseelement_for_target_match
+ # were set to False.
+ return original_element
+ elif legacy and isinstance(resolved, str):
+ util.warn_deprecated_20(
+ "Using strings to indicate relationship names in "
+ "Query.join() is deprecated and will be removed in "
+ "SQLAlchemy 2.0. Please use the class-bound attribute "
+ "directly."
+ )
+ return resolved
+ elif legacy and isinstance(resolved, roles.WhereHavingRole):
+ return resolved
+ elif legacy and resolved._is_select_statement:
+ util.warn_deprecated(
+ "Implicit coercion of SELECT and textual SELECT "
+ "constructs into FROM clauses is deprecated; please call "
+ ".subquery() on any Core select or ORM Query object in "
+ "order to produce a subquery object.",
+ version="1.4",
+ )
+ # TODO: doing _implicit_subquery here causes tests to fail,
+ # how was this working before? probably that ORM
+ # join logic treated it as a select and subquery would happen
+ # in _ORMJoin->Join
+ return resolved
+ else:
+ self._raise_for_expected(original_element, argname, resolved)
+
+
+class FromClauseImpl(_SelectIsNotFrom, _NoTextCoercion, RoleImpl):
+ __slots__ = ()
+
+ def _implicit_coercions(
+ self,
+ original_element,
+ resolved,
+ argname=None,
+ explicit_subquery=False,
+ allow_select=True,
+ **kw
+ ):
+ if resolved._is_select_statement:
+ if explicit_subquery:
+ return resolved.subquery()
+ elif allow_select:
+ util.warn_deprecated(
+ "Implicit coercion of SELECT and textual SELECT "
+ "constructs into FROM clauses is deprecated; please call "
+ ".subquery() on any Core select or ORM Query object in "
+ "order to produce a subquery object.",
+ version="1.4",
+ )
+ return resolved._implicit_subquery
+ elif resolved._is_text_clause:
+ return resolved
+ else:
+ self._raise_for_expected(original_element, argname, resolved)
+
+ def _post_coercion(self, element, deannotate=False, **kw):
+ if deannotate:
+ return element._deannotate()
+ else:
+ return element
+
+
+class StrictFromClauseImpl(FromClauseImpl):
+ __slots__ = ()
+
+ def _implicit_coercions(
+ self,
+ original_element,
+ resolved,
+ argname=None,
+ allow_select=False,
+ **kw
+ ):
+ if resolved._is_select_statement and allow_select:
+ util.warn_deprecated(
+ "Implicit coercion of SELECT and textual SELECT constructs "
+ "into FROM clauses is deprecated; please call .subquery() "
+ "on any Core select or ORM Query object in order to produce a "
+ "subquery object.",
+ version="1.4",
+ )
+ return resolved._implicit_subquery
+ else:
+ self._raise_for_expected(original_element, argname, resolved)
+
+
+class AnonymizedFromClauseImpl(StrictFromClauseImpl):
+ __slots__ = ()
+
+ def _post_coercion(self, element, flat=False, name=None, **kw):
+ assert name is None
+
+ return element._anonymous_fromclause(flat=flat)
+
+
+class DMLTableImpl(_SelectIsNotFrom, _NoTextCoercion, RoleImpl):
+ __slots__ = ()
+
+ def _post_coercion(self, element, **kw):
+ if "dml_table" in element._annotations:
+ return element._annotations["dml_table"]
+ else:
+ return element
+
+
+class DMLSelectImpl(_NoTextCoercion, RoleImpl):
+ __slots__ = ()
+
+ def _implicit_coercions(
+ self, original_element, resolved, argname=None, **kw
+ ):
+ if resolved._is_from_clause:
+ if (
+ isinstance(resolved, selectable.Alias)
+ and resolved.element._is_select_statement
+ ):
+ return resolved.element
+ else:
+ return resolved.select()
+ else:
+ self._raise_for_expected(original_element, argname, resolved)
+
+
+class CompoundElementImpl(_NoTextCoercion, RoleImpl):
+ __slots__ = ()
+
+ def _raise_for_expected(self, element, argname=None, resolved=None, **kw):
+ if isinstance(element, roles.FromClauseRole):
+ if element._is_subquery:
+ advice = (
+ "Use the plain select() object without "
+ "calling .subquery() or .alias()."
+ )
+ else:
+ advice = (
+ "To SELECT from any FROM clause, use the .select() method."
+ )
+ else:
+ advice = None
+ return super(CompoundElementImpl, self)._raise_for_expected(
+ element, argname=argname, resolved=resolved, advice=advice, **kw
+ )
+
+
+_impl_lookup = {}
+
+
+for name in dir(roles):
+ cls = getattr(roles, name)
+ if name.endswith("Role"):
+ name = name.replace("Role", "Impl")
+ if name in globals():
+ impl = globals()[name](cls)
+ _impl_lookup[cls] = impl