From 1dac2263372df2b85db5d029a45721fa158a5c9d Mon Sep 17 00:00:00 2001 From: xiubuzhe Date: Sun, 8 Oct 2023 20:59:00 +0800 Subject: first add files --- lib/sqlalchemy/orm/decl_base.py | 1210 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 1210 insertions(+) create mode 100644 lib/sqlalchemy/orm/decl_base.py (limited to 'lib/sqlalchemy/orm/decl_base.py') diff --git a/lib/sqlalchemy/orm/decl_base.py b/lib/sqlalchemy/orm/decl_base.py new file mode 100644 index 0000000..6e1c797 --- /dev/null +++ b/lib/sqlalchemy/orm/decl_base.py @@ -0,0 +1,1210 @@ +# ext/declarative/base.py +# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors +# +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +"""Internal implementation for declarative.""" +from __future__ import absolute_import + +import collections +import weakref + +from sqlalchemy.orm import attributes +from sqlalchemy.orm import instrumentation +from . import clsregistry +from . import exc as orm_exc +from . import mapper as mapperlib +from .attributes import InstrumentedAttribute +from .attributes import QueryableAttribute +from .base import _is_mapped_class +from .base import InspectionAttr +from .descriptor_props import CompositeProperty +from .descriptor_props import SynonymProperty +from .interfaces import MapperProperty +from .mapper import Mapper as mapper +from .properties import ColumnProperty +from .util import class_mapper +from .. import event +from .. import exc +from .. import util +from ..sql import expression +from ..sql.schema import Column +from ..sql.schema import Table +from ..util import topological + + +def _declared_mapping_info(cls): + # deferred mapping + if _DeferredMapperConfig.has_cls(cls): + return _DeferredMapperConfig.config_for_cls(cls) + # regular mapping + elif _is_mapped_class(cls): + return class_mapper(cls, configure=False) + else: + return None + + +def _resolve_for_abstract_or_classical(cls): + if cls is object: + return None + + if cls.__dict__.get("__abstract__", False): + for sup in cls.__bases__: + sup = _resolve_for_abstract_or_classical(sup) + if sup is not None: + return sup + else: + return None + else: + clsmanager = _dive_for_cls_manager(cls) + + if clsmanager: + return clsmanager.class_ + else: + return cls + + +def _get_immediate_cls_attr(cls, attrname, strict=False): + """return an attribute of the class that is either present directly + on the class, e.g. not on a superclass, or is from a superclass but + this superclass is a non-mapped mixin, that is, not a descendant of + the declarative base and is also not classically mapped. + + This is used to detect attributes that indicate something about + a mapped class independently from any mapped classes that it may + inherit from. + + """ + + # the rules are different for this name than others, + # make sure we've moved it out. transitional + assert attrname != "__abstract__" + + if not issubclass(cls, object): + return None + + if attrname in cls.__dict__: + return getattr(cls, attrname) + + for base in cls.__mro__[1:]: + _is_classicial_inherits = _dive_for_cls_manager(base) + + if attrname in base.__dict__ and ( + base is cls + or ( + (base in cls.__bases__ if strict else True) + and not _is_classicial_inherits + ) + ): + return getattr(base, attrname) + else: + return None + + +def _dive_for_cls_manager(cls): + # because the class manager registration is pluggable, + # we need to do the search for every class in the hierarchy, + # rather than just a simple "cls._sa_class_manager" + + # python 2 old style class + if not hasattr(cls, "__mro__"): + return None + + for base in cls.__mro__: + manager = attributes.manager_of_class(base) + if manager: + return manager + return None + + +def _as_declarative(registry, cls, dict_): + + # declarative scans the class for attributes. no table or mapper + # args passed separately. + + return _MapperConfig.setup_mapping(registry, cls, dict_, None, {}) + + +def _mapper(registry, cls, table, mapper_kw): + _ImperativeMapperConfig(registry, cls, table, mapper_kw) + return cls.__mapper__ + + +@util.preload_module("sqlalchemy.orm.decl_api") +def _is_declarative_props(obj): + declared_attr = util.preloaded.orm_decl_api.declared_attr + + return isinstance(obj, (declared_attr, util.classproperty)) + + +def _check_declared_props_nocascade(obj, name, cls): + if _is_declarative_props(obj): + if getattr(obj, "_cascading", False): + util.warn( + "@declared_attr.cascading is not supported on the %s " + "attribute on class %s. This attribute invokes for " + "subclasses in any case." % (name, cls) + ) + return True + else: + return False + + +class _MapperConfig(object): + __slots__ = ( + "cls", + "classname", + "properties", + "declared_attr_reg", + "__weakref__", + ) + + @classmethod + def setup_mapping(cls, registry, cls_, dict_, table, mapper_kw): + manager = attributes.manager_of_class(cls) + if manager and manager.class_ is cls_: + raise exc.InvalidRequestError( + "Class %r already has been " "instrumented declaratively" % cls + ) + + if cls_.__dict__.get("__abstract__", False): + return + + defer_map = _get_immediate_cls_attr( + cls_, "_sa_decl_prepare_nocascade", strict=True + ) or hasattr(cls_, "_sa_decl_prepare") + + if defer_map: + cfg_cls = _DeferredMapperConfig + else: + cfg_cls = _ClassScanMapperConfig + + return cfg_cls(registry, cls_, dict_, table, mapper_kw) + + def __init__(self, registry, cls_, mapper_kw): + self.cls = util.assert_arg_type(cls_, type, "cls_") + self.classname = cls_.__name__ + self.properties = util.OrderedDict() + self.declared_attr_reg = {} + + if not mapper_kw.get("non_primary", False): + instrumentation.register_class( + self.cls, + finalize=False, + registry=registry, + declarative_scan=self, + init_method=registry.constructor, + ) + else: + manager = attributes.manager_of_class(self.cls) + if not manager or not manager.is_mapped: + raise exc.InvalidRequestError( + "Class %s has no primary mapper configured. Configure " + "a primary mapper first before setting up a non primary " + "Mapper." % self.cls + ) + + def set_cls_attribute(self, attrname, value): + + manager = instrumentation.manager_of_class(self.cls) + manager.install_member(attrname, value) + return value + + def _early_mapping(self, mapper_kw): + self.map(mapper_kw) + + +class _ImperativeMapperConfig(_MapperConfig): + __slots__ = ("dict_", "local_table", "inherits") + + def __init__( + self, + registry, + cls_, + table, + mapper_kw, + ): + super(_ImperativeMapperConfig, self).__init__( + registry, cls_, mapper_kw + ) + + self.dict_ = {} + self.local_table = self.set_cls_attribute("__table__", table) + + with mapperlib._CONFIGURE_MUTEX: + if not mapper_kw.get("non_primary", False): + clsregistry.add_class( + self.classname, self.cls, registry._class_registry + ) + + self._setup_inheritance(mapper_kw) + + self._early_mapping(mapper_kw) + + def map(self, mapper_kw=util.EMPTY_DICT): + mapper_cls = mapper + + return self.set_cls_attribute( + "__mapper__", + mapper_cls(self.cls, self.local_table, **mapper_kw), + ) + + def _setup_inheritance(self, mapper_kw): + cls = self.cls + + inherits = mapper_kw.get("inherits", None) + + if inherits is None: + # since we search for classical mappings now, search for + # multiple mapped bases as well and raise an error. + inherits_search = [] + for c in cls.__bases__: + c = _resolve_for_abstract_or_classical(c) + if c is None: + continue + if _declared_mapping_info( + c + ) is not None and not _get_immediate_cls_attr( + c, "_sa_decl_prepare_nocascade", strict=True + ): + inherits_search.append(c) + + if inherits_search: + if len(inherits_search) > 1: + raise exc.InvalidRequestError( + "Class %s has multiple mapped bases: %r" + % (cls, inherits_search) + ) + inherits = inherits_search[0] + elif isinstance(inherits, mapper): + inherits = inherits.class_ + + self.inherits = inherits + + +class _ClassScanMapperConfig(_MapperConfig): + __slots__ = ( + "dict_", + "local_table", + "persist_selectable", + "declared_columns", + "column_copies", + "table_args", + "tablename", + "mapper_args", + "mapper_args_fn", + "inherits", + ) + + def __init__( + self, + registry, + cls_, + dict_, + table, + mapper_kw, + ): + + # grab class dict before the instrumentation manager has been added. + # reduces cycles + self.dict_ = dict(dict_) if dict_ else {} + + super(_ClassScanMapperConfig, self).__init__(registry, cls_, mapper_kw) + + self.persist_selectable = None + self.declared_columns = set() + self.column_copies = {} + self._setup_declared_events() + + self._scan_attributes() + + with mapperlib._CONFIGURE_MUTEX: + clsregistry.add_class( + self.classname, self.cls, registry._class_registry + ) + + self._extract_mappable_attributes() + + self._extract_declared_columns() + + self._setup_table(table) + + self._setup_inheritance(mapper_kw) + + self._early_mapping(mapper_kw) + + def _setup_declared_events(self): + if _get_immediate_cls_attr(self.cls, "__declare_last__"): + + @event.listens_for(mapper, "after_configured") + def after_configured(): + self.cls.__declare_last__() + + if _get_immediate_cls_attr(self.cls, "__declare_first__"): + + @event.listens_for(mapper, "before_configured") + def before_configured(): + self.cls.__declare_first__() + + def _cls_attr_override_checker(self, cls): + """Produce a function that checks if a class has overridden an + attribute, taking SQLAlchemy-enabled dataclass fields into account. + + """ + sa_dataclass_metadata_key = _get_immediate_cls_attr( + cls, "__sa_dataclass_metadata_key__", None + ) + + if sa_dataclass_metadata_key is None: + + def attribute_is_overridden(key, obj): + return getattr(cls, key) is not obj + + else: + + all_datacls_fields = { + f.name: f.metadata[sa_dataclass_metadata_key] + for f in util.dataclass_fields(cls) + if sa_dataclass_metadata_key in f.metadata + } + local_datacls_fields = { + f.name: f.metadata[sa_dataclass_metadata_key] + for f in util.local_dataclass_fields(cls) + if sa_dataclass_metadata_key in f.metadata + } + + absent = object() + + def attribute_is_overridden(key, obj): + if _is_declarative_props(obj): + obj = obj.fget + + # this function likely has some failure modes still if + # someone is doing a deep mixing of the same attribute + # name as plain Python attribute vs. dataclass field. + + ret = local_datacls_fields.get(key, absent) + if _is_declarative_props(ret): + ret = ret.fget + + if ret is obj: + return False + elif ret is not absent: + return True + + all_field = all_datacls_fields.get(key, absent) + + ret = getattr(cls, key, obj) + + if ret is obj: + return False + + # for dataclasses, this could be the + # 'default' of the field. so filter more specifically + # for an already-mapped InstrumentedAttribute + if ret is not absent and isinstance( + ret, InstrumentedAttribute + ): + return True + + if all_field is obj: + return False + elif all_field is not absent: + return True + + # can't find another attribute + return False + + return attribute_is_overridden + + def _cls_attr_resolver(self, cls): + """produce a function to iterate the "attributes" of a class, + adjusting for SQLAlchemy fields embedded in dataclass fields. + + """ + sa_dataclass_metadata_key = _get_immediate_cls_attr( + cls, "__sa_dataclass_metadata_key__", None + ) + + if sa_dataclass_metadata_key is None: + + def local_attributes_for_class(): + for name, obj in vars(cls).items(): + yield name, obj, False + + else: + field_names = set() + + def local_attributes_for_class(): + for field in util.local_dataclass_fields(cls): + if sa_dataclass_metadata_key in field.metadata: + field_names.add(field.name) + yield field.name, _as_dc_declaredattr( + field.metadata, sa_dataclass_metadata_key + ), True + for name, obj in vars(cls).items(): + if name not in field_names: + yield name, obj, False + + return local_attributes_for_class + + def _scan_attributes(self): + cls = self.cls + dict_ = self.dict_ + column_copies = self.column_copies + mapper_args_fn = None + table_args = inherited_table_args = None + tablename = None + + attribute_is_overridden = self._cls_attr_override_checker(self.cls) + + bases = [] + + for base in cls.__mro__: + # collect bases and make sure standalone columns are copied + # to be the column they will ultimately be on the class, + # so that declared_attr functions use the right columns. + # need to do this all the way up the hierarchy first + # (see #8190) + + class_mapped = ( + base is not cls + and _declared_mapping_info(base) is not None + and not _get_immediate_cls_attr( + base, "_sa_decl_prepare_nocascade", strict=True + ) + ) + + local_attributes_for_class = self._cls_attr_resolver(base) + + if not class_mapped and base is not cls: + locally_collected_columns = self._produce_column_copies( + local_attributes_for_class, + attribute_is_overridden, + ) + else: + locally_collected_columns = {} + + bases.append( + ( + base, + class_mapped, + local_attributes_for_class, + locally_collected_columns, + ) + ) + + for ( + base, + class_mapped, + local_attributes_for_class, + locally_collected_columns, + ) in bases: + + # this transfer can also take place as we scan each name + # for finer-grained control of how collected_attributes is + # populated, as this is what impacts column ordering. + # however it's simpler to get it out of the way here. + dict_.update(locally_collected_columns) + + for name, obj, is_dataclass in local_attributes_for_class(): + if name == "__mapper_args__": + check_decl = _check_declared_props_nocascade( + obj, name, cls + ) + if not mapper_args_fn and (not class_mapped or check_decl): + # don't even invoke __mapper_args__ until + # after we've determined everything about the + # mapped table. + # make a copy of it so a class-level dictionary + # is not overwritten when we update column-based + # arguments. + def mapper_args_fn(): + return dict(cls.__mapper_args__) + + elif name == "__tablename__": + check_decl = _check_declared_props_nocascade( + obj, name, cls + ) + if not tablename and (not class_mapped or check_decl): + tablename = cls.__tablename__ + elif name == "__table_args__": + check_decl = _check_declared_props_nocascade( + obj, name, cls + ) + if not table_args and (not class_mapped or check_decl): + table_args = cls.__table_args__ + if not isinstance( + table_args, (tuple, dict, type(None)) + ): + raise exc.ArgumentError( + "__table_args__ value must be a tuple, " + "dict, or None" + ) + if base is not cls: + inherited_table_args = True + elif class_mapped: + if _is_declarative_props(obj): + util.warn( + "Regular (i.e. not __special__) " + "attribute '%s.%s' uses @declared_attr, " + "but owning class %s is mapped - " + "not applying to subclass %s." + % (base.__name__, name, base, cls) + ) + continue + elif base is not cls: + # we're a mixin, abstract base, or something that is + # acting like that for now. + if isinstance(obj, Column): + # already copied columns to the mapped class. + continue + elif isinstance(obj, MapperProperty): + raise exc.InvalidRequestError( + "Mapper properties (i.e. deferred," + "column_property(), relationship(), etc.) must " + "be declared as @declared_attr callables " + "on declarative mixin classes. For dataclass " + "field() objects, use a lambda:" + ) + elif _is_declarative_props(obj): + if obj._cascading: + if name in dict_: + # unfortunately, while we can use the user- + # defined attribute here to allow a clean + # override, if there's another + # subclass below then it still tries to use + # this. not sure if there is enough + # information here to add this as a feature + # later on. + util.warn( + "Attribute '%s' on class %s cannot be " + "processed due to " + "@declared_attr.cascading; " + "skipping" % (name, cls) + ) + dict_[name] = column_copies[ + obj + ] = ret = obj.__get__(obj, cls) + setattr(cls, name, ret) + else: + if is_dataclass: + # access attribute using normal class access + # first, to see if it's been mapped on a + # superclass. note if the dataclasses.field() + # has "default", this value can be anything. + ret = getattr(cls, name, None) + + # so, if it's anything that's not ORM + # mapped, assume we should invoke the + # declared_attr + if not isinstance(ret, InspectionAttr): + ret = obj.fget() + else: + # access attribute using normal class access. + # if the declared attr already took place + # on a superclass that is mapped, then + # this is no longer a declared_attr, it will + # be the InstrumentedAttribute + ret = getattr(cls, name) + + # correct for proxies created from hybrid_property + # or similar. note there is no known case that + # produces nested proxies, so we are only + # looking one level deep right now. + if ( + isinstance(ret, InspectionAttr) + and ret._is_internal_proxy + and not isinstance( + ret.original_property, MapperProperty + ) + ): + ret = ret.descriptor + + dict_[name] = column_copies[obj] = ret + if ( + isinstance(ret, (Column, MapperProperty)) + and ret.doc is None + ): + ret.doc = obj.__doc__ + # here, the attribute is some other kind of property that + # we assume is not part of the declarative mapping. + # however, check for some more common mistakes + else: + self._warn_for_decl_attributes(base, name, obj) + elif is_dataclass and ( + name not in dict_ or dict_[name] is not obj + ): + # here, we are definitely looking at the target class + # and not a superclass. this is currently a + # dataclass-only path. if the name is only + # a dataclass field and isn't in local cls.__dict__, + # put the object there. + # assert that the dataclass-enabled resolver agrees + # with what we are seeing + + assert not attribute_is_overridden(name, obj) + + if _is_declarative_props(obj): + obj = obj.fget() + + dict_[name] = obj + + if inherited_table_args and not tablename: + table_args = None + + self.table_args = table_args + self.tablename = tablename + self.mapper_args_fn = mapper_args_fn + + def _warn_for_decl_attributes(self, cls, key, c): + if isinstance(c, expression.ColumnClause): + util.warn( + "Attribute '%s' on class %s appears to be a non-schema " + "'sqlalchemy.sql.column()' " + "object; this won't be part of the declarative mapping" + % (key, cls) + ) + + def _produce_column_copies( + self, attributes_for_class, attribute_is_overridden + ): + cls = self.cls + dict_ = self.dict_ + locally_collected_attributes = {} + column_copies = self.column_copies + # copy mixin columns to the mapped class + + for name, obj, is_dataclass in attributes_for_class(): + if isinstance(obj, Column): + if attribute_is_overridden(name, obj): + # if column has been overridden + # (like by the InstrumentedAttribute of the + # superclass), skip + continue + elif obj.foreign_keys: + raise exc.InvalidRequestError( + "Columns with foreign keys to other columns " + "must be declared as @declared_attr callables " + "on declarative mixin classes. For dataclass " + "field() objects, use a lambda:." + ) + elif name not in dict_ and not ( + "__table__" in dict_ + and (obj.name or name) in dict_["__table__"].c + ): + column_copies[obj] = copy_ = obj._copy() + copy_._creation_order = obj._creation_order + setattr(cls, name, copy_) + locally_collected_attributes[name] = copy_ + return locally_collected_attributes + + def _extract_mappable_attributes(self): + cls = self.cls + dict_ = self.dict_ + + our_stuff = self.properties + + late_mapped = _get_immediate_cls_attr( + cls, "_sa_decl_prepare_nocascade", strict=True + ) + + for k in list(dict_): + + if k in ("__table__", "__tablename__", "__mapper_args__"): + continue + + value = dict_[k] + if _is_declarative_props(value): + if value._cascading: + util.warn( + "Use of @declared_attr.cascading only applies to " + "Declarative 'mixin' and 'abstract' classes. " + "Currently, this flag is ignored on mapped class " + "%s" % self.cls + ) + + value = getattr(cls, k) + + elif ( + isinstance(value, QueryableAttribute) + and value.class_ is not cls + and value.key != k + ): + # detect a QueryableAttribute that's already mapped being + # assigned elsewhere in userland, turn into a synonym() + value = SynonymProperty(value.key) + setattr(cls, k, value) + + if ( + isinstance(value, tuple) + and len(value) == 1 + and isinstance(value[0], (Column, MapperProperty)) + ): + util.warn( + "Ignoring declarative-like tuple value of attribute " + "'%s': possibly a copy-and-paste error with a comma " + "accidentally placed at the end of the line?" % k + ) + continue + elif not isinstance(value, (Column, MapperProperty)): + # using @declared_attr for some object that + # isn't Column/MapperProperty; remove from the dict_ + # and place the evaluated value onto the class. + if not k.startswith("__"): + dict_.pop(k) + self._warn_for_decl_attributes(cls, k, value) + if not late_mapped: + setattr(cls, k, value) + continue + # we expect to see the name 'metadata' in some valid cases; + # however at this point we see it's assigned to something trying + # to be mapped, so raise for that. + elif k == "metadata": + raise exc.InvalidRequestError( + "Attribute name 'metadata' is reserved " + "for the MetaData instance when using a " + "declarative base class." + ) + our_stuff[k] = value + + def _extract_declared_columns(self): + our_stuff = self.properties + + # set up attributes in the order they were created + util.sort_dictionary( + our_stuff, key=lambda key: our_stuff[key]._creation_order + ) + + # extract columns from the class dict + declared_columns = self.declared_columns + name_to_prop_key = collections.defaultdict(set) + for key, c in list(our_stuff.items()): + if isinstance(c, (ColumnProperty, CompositeProperty)): + for col in c.columns: + if isinstance(col, Column) and col.table is None: + _undefer_column_name(key, col) + if not isinstance(c, CompositeProperty): + name_to_prop_key[col.name].add(key) + declared_columns.add(col) + elif isinstance(c, Column): + _undefer_column_name(key, c) + name_to_prop_key[c.name].add(key) + declared_columns.add(c) + # if the column is the same name as the key, + # remove it from the explicit properties dict. + # the normal rules for assigning column-based properties + # will take over, including precedence of columns + # in multi-column ColumnProperties. + if key == c.key: + del our_stuff[key] + + for name, keys in name_to_prop_key.items(): + if len(keys) > 1: + util.warn( + "On class %r, Column object %r named " + "directly multiple times, " + "only one will be used: %s. " + "Consider using orm.synonym instead" + % (self.classname, name, (", ".join(sorted(keys)))) + ) + + def _setup_table(self, table=None): + cls = self.cls + tablename = self.tablename + table_args = self.table_args + dict_ = self.dict_ + declared_columns = self.declared_columns + + manager = attributes.manager_of_class(cls) + + declared_columns = self.declared_columns = sorted( + declared_columns, key=lambda c: c._creation_order + ) + + if "__table__" not in dict_ and table is None: + if hasattr(cls, "__table_cls__"): + table_cls = util.unbound_method_to_callable(cls.__table_cls__) + else: + table_cls = Table + + if tablename is not None: + + args, table_kw = (), {} + if table_args: + if isinstance(table_args, dict): + table_kw = table_args + elif isinstance(table_args, tuple): + if isinstance(table_args[-1], dict): + args, table_kw = table_args[0:-1], table_args[-1] + else: + args = table_args + + autoload_with = dict_.get("__autoload_with__") + if autoload_with: + table_kw["autoload_with"] = autoload_with + + autoload = dict_.get("__autoload__") + if autoload: + table_kw["autoload"] = True + + table = self.set_cls_attribute( + "__table__", + table_cls( + tablename, + self._metadata_for_cls(manager), + *(tuple(declared_columns) + tuple(args)), + **table_kw + ), + ) + else: + if table is None: + table = cls.__table__ + if declared_columns: + for c in declared_columns: + if not table.c.contains_column(c): + raise exc.ArgumentError( + "Can't add additional column %r when " + "specifying __table__" % c.key + ) + self.local_table = table + + def _metadata_for_cls(self, manager): + if hasattr(self.cls, "metadata"): + return self.cls.metadata + else: + return manager.registry.metadata + + def _setup_inheritance(self, mapper_kw): + table = self.local_table + cls = self.cls + table_args = self.table_args + declared_columns = self.declared_columns + + inherits = mapper_kw.get("inherits", None) + + if inherits is None: + # since we search for classical mappings now, search for + # multiple mapped bases as well and raise an error. + inherits_search = [] + for c in cls.__bases__: + c = _resolve_for_abstract_or_classical(c) + if c is None: + continue + if _declared_mapping_info( + c + ) is not None and not _get_immediate_cls_attr( + c, "_sa_decl_prepare_nocascade", strict=True + ): + if c not in inherits_search: + inherits_search.append(c) + + if inherits_search: + if len(inherits_search) > 1: + raise exc.InvalidRequestError( + "Class %s has multiple mapped bases: %r" + % (cls, inherits_search) + ) + inherits = inherits_search[0] + elif isinstance(inherits, mapper): + inherits = inherits.class_ + + self.inherits = inherits + + if ( + table is None + and self.inherits is None + and not _get_immediate_cls_attr(cls, "__no_table__") + ): + + raise exc.InvalidRequestError( + "Class %r does not have a __table__ or __tablename__ " + "specified and does not inherit from an existing " + "table-mapped class." % cls + ) + elif self.inherits: + inherited_mapper = _declared_mapping_info(self.inherits) + inherited_table = inherited_mapper.local_table + inherited_persist_selectable = inherited_mapper.persist_selectable + + if table is None: + # single table inheritance. + # ensure no table args + if table_args: + raise exc.ArgumentError( + "Can't place __table_args__ on an inherited class " + "with no table." + ) + # add any columns declared here to the inherited table. + for c in declared_columns: + if c.name in inherited_table.c: + if inherited_table.c[c.name] is c: + continue + raise exc.ArgumentError( + "Column '%s' on class %s conflicts with " + "existing column '%s'" + % (c, cls, inherited_table.c[c.name]) + ) + if c.primary_key: + raise exc.ArgumentError( + "Can't place primary key columns on an inherited " + "class with no table." + ) + inherited_table.append_column(c) + if ( + inherited_persist_selectable is not None + and inherited_persist_selectable is not inherited_table + ): + inherited_persist_selectable._refresh_for_new_column(c) + + def _prepare_mapper_arguments(self, mapper_kw): + properties = self.properties + + if self.mapper_args_fn: + mapper_args = self.mapper_args_fn() + else: + mapper_args = {} + + if mapper_kw: + mapper_args.update(mapper_kw) + + if "properties" in mapper_args: + properties = dict(properties) + properties.update(mapper_args["properties"]) + + # make sure that column copies are used rather + # than the original columns from any mixins + for k in ("version_id_col", "polymorphic_on"): + if k in mapper_args: + v = mapper_args[k] + mapper_args[k] = self.column_copies.get(v, v) + + if "inherits" in mapper_args: + inherits_arg = mapper_args["inherits"] + if isinstance(inherits_arg, mapper): + inherits_arg = inherits_arg.class_ + + if inherits_arg is not self.inherits: + raise exc.InvalidRequestError( + "mapper inherits argument given for non-inheriting " + "class %s" % (mapper_args["inherits"]) + ) + + if self.inherits: + mapper_args["inherits"] = self.inherits + + if self.inherits and not mapper_args.get("concrete", False): + # single or joined inheritance + # exclude any cols on the inherited table which are + # not mapped on the parent class, to avoid + # mapping columns specific to sibling/nephew classes + inherited_mapper = _declared_mapping_info(self.inherits) + inherited_table = inherited_mapper.local_table + + if "exclude_properties" not in mapper_args: + mapper_args["exclude_properties"] = exclude_properties = set( + [ + c.key + for c in inherited_table.c + if c not in inherited_mapper._columntoproperty + ] + ).union(inherited_mapper.exclude_properties or ()) + exclude_properties.difference_update( + [c.key for c in self.declared_columns] + ) + + # look through columns in the current mapper that + # are keyed to a propname different than the colname + # (if names were the same, we'd have popped it out above, + # in which case the mapper makes this combination). + # See if the superclass has a similar column property. + # If so, join them together. + for k, col in list(properties.items()): + if not isinstance(col, expression.ColumnElement): + continue + if k in inherited_mapper._props: + p = inherited_mapper._props[k] + if isinstance(p, ColumnProperty): + # note here we place the subclass column + # first. See [ticket:1892] for background. + properties[k] = [col] + p.columns + result_mapper_args = mapper_args.copy() + result_mapper_args["properties"] = properties + self.mapper_args = result_mapper_args + + def map(self, mapper_kw=util.EMPTY_DICT): + self._prepare_mapper_arguments(mapper_kw) + if hasattr(self.cls, "__mapper_cls__"): + mapper_cls = util.unbound_method_to_callable( + self.cls.__mapper_cls__ + ) + else: + mapper_cls = mapper + + return self.set_cls_attribute( + "__mapper__", + mapper_cls(self.cls, self.local_table, **self.mapper_args), + ) + + +@util.preload_module("sqlalchemy.orm.decl_api") +def _as_dc_declaredattr(field_metadata, sa_dataclass_metadata_key): + # wrap lambdas inside dataclass fields inside an ad-hoc declared_attr. + # we can't write it because field.metadata is immutable :( so we have + # to go through extra trouble to compare these + decl_api = util.preloaded.orm_decl_api + obj = field_metadata[sa_dataclass_metadata_key] + if callable(obj) and not isinstance(obj, decl_api.declared_attr): + return decl_api.declared_attr(obj) + else: + return obj + + +class _DeferredMapperConfig(_ClassScanMapperConfig): + _configs = util.OrderedDict() + + def _early_mapping(self, mapper_kw): + pass + + @property + def cls(self): + return self._cls() + + @cls.setter + def cls(self, class_): + self._cls = weakref.ref(class_, self._remove_config_cls) + self._configs[self._cls] = self + + @classmethod + def _remove_config_cls(cls, ref): + cls._configs.pop(ref, None) + + @classmethod + def has_cls(cls, class_): + # 2.6 fails on weakref if class_ is an old style class + return isinstance(class_, type) and weakref.ref(class_) in cls._configs + + @classmethod + def raise_unmapped_for_cls(cls, class_): + if hasattr(class_, "_sa_raise_deferred_config"): + class_._sa_raise_deferred_config() + + raise orm_exc.UnmappedClassError( + class_, + msg="Class %s has a deferred mapping on it. It is not yet " + "usable as a mapped class." % orm_exc._safe_cls_name(class_), + ) + + @classmethod + def config_for_cls(cls, class_): + return cls._configs[weakref.ref(class_)] + + @classmethod + def classes_for_base(cls, base_cls, sort=True): + classes_for_base = [ + m + for m, cls_ in [(m, m.cls) for m in cls._configs.values()] + if cls_ is not None and issubclass(cls_, base_cls) + ] + + if not sort: + return classes_for_base + + all_m_by_cls = dict((m.cls, m) for m in classes_for_base) + + tuples = [] + for m_cls in all_m_by_cls: + tuples.extend( + (all_m_by_cls[base_cls], all_m_by_cls[m_cls]) + for base_cls in m_cls.__bases__ + if base_cls in all_m_by_cls + ) + return list(topological.sort(tuples, classes_for_base)) + + def map(self, mapper_kw=util.EMPTY_DICT): + self._configs.pop(self._cls, None) + return super(_DeferredMapperConfig, self).map(mapper_kw) + + +def _add_attribute(cls, key, value): + """add an attribute to an existing declarative class. + + This runs through the logic to determine MapperProperty, + adds it to the Mapper, adds a column to the mapped Table, etc. + + """ + + if "__mapper__" in cls.__dict__: + if isinstance(value, Column): + _undefer_column_name(key, value) + cls.__table__.append_column(value, replace_existing=True) + cls.__mapper__.add_property(key, value) + elif isinstance(value, ColumnProperty): + for col in value.columns: + if isinstance(col, Column) and col.table is None: + _undefer_column_name(key, col) + cls.__table__.append_column(col, replace_existing=True) + cls.__mapper__.add_property(key, value) + elif isinstance(value, MapperProperty): + cls.__mapper__.add_property(key, value) + elif isinstance(value, QueryableAttribute) and value.key != key: + # detect a QueryableAttribute that's already mapped being + # assigned elsewhere in userland, turn into a synonym() + value = SynonymProperty(value.key) + cls.__mapper__.add_property(key, value) + else: + type.__setattr__(cls, key, value) + cls.__mapper__._expire_memoizations() + else: + type.__setattr__(cls, key, value) + + +def _del_attribute(cls, key): + + if ( + "__mapper__" in cls.__dict__ + and key in cls.__dict__ + and not cls.__mapper__._dispose_called + ): + value = cls.__dict__[key] + if isinstance( + value, (Column, ColumnProperty, MapperProperty, QueryableAttribute) + ): + raise NotImplementedError( + "Can't un-map individual mapped attributes on a mapped class." + ) + else: + type.__delattr__(cls, key) + cls.__mapper__._expire_memoizations() + else: + type.__delattr__(cls, key) + + +def _declarative_constructor(self, **kwargs): + """A simple constructor that allows initialization from kwargs. + + Sets attributes on the constructed instance using the names and + values in ``kwargs``. + + Only keys that are present as + attributes of the instance's class are allowed. These could be, + for example, any mapped columns or relationships. + """ + cls_ = type(self) + for k in kwargs: + if not hasattr(cls_, k): + raise TypeError( + "%r is an invalid keyword argument for %s" % (k, cls_.__name__) + ) + setattr(self, k, kwargs[k]) + + +_declarative_constructor.__name__ = "__init__" + + +def _undefer_column_name(key, column): + if column.key is None: + column.key = key + if column.name is None: + column.name = key -- cgit v1.2.3