| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192 |
- # orm/decl_base.py
- # Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
- # <see AUTHORS file>
- #
- # This module is part of SQLAlchemy and is released under
- # the MIT License: https://www.opensource.org/licenses/mit-license.php
- """Internal implementation for declarative."""
- from __future__ import annotations
- import collections
- import dataclasses
- import re
- from typing import Any
- from typing import Callable
- from typing import cast
- from typing import Dict
- from typing import Iterable
- from typing import List
- from typing import Mapping
- from typing import NamedTuple
- from typing import NoReturn
- from typing import Optional
- from typing import Sequence
- from typing import Tuple
- from typing import Type
- from typing import TYPE_CHECKING
- from typing import TypeVar
- from typing import Union
- import weakref
- from . import attributes
- from . import clsregistry
- from . import exc as orm_exc
- from . import instrumentation
- from . import mapperlib
- from ._typing import _O
- from ._typing import attr_is_internal_proxy
- from .attributes import InstrumentedAttribute
- from .attributes import QueryableAttribute
- from .base import _is_mapped_class
- from .base import InspectionAttr
- from .descriptor_props import CompositeProperty
- from .descriptor_props import SynonymProperty
- from .interfaces import _AttributeOptions
- from .interfaces import _DCAttributeOptions
- from .interfaces import _IntrospectsAnnotations
- from .interfaces import _MappedAttribute
- from .interfaces import _MapsColumns
- from .interfaces import MapperProperty
- from .mapper import Mapper
- from .properties import ColumnProperty
- from .properties import MappedColumn
- from .util import _extract_mapped_subtype
- from .util import _is_mapped_annotation
- from .util import class_mapper
- from .util import de_stringify_annotation
- from .. import event
- from .. import exc
- from .. import util
- from ..sql import expression
- from ..sql.base import _NoArg
- from ..sql.schema import Column
- from ..sql.schema import Table
- from ..util import topological
- from ..util.typing import _AnnotationScanType
- from ..util.typing import get_args
- from ..util.typing import is_fwd_ref
- from ..util.typing import is_literal
- from ..util.typing import Protocol
- from ..util.typing import TypedDict
- if TYPE_CHECKING:
- from ._typing import _ClassDict
- from ._typing import _RegistryType
- from .base import Mapped
- from .decl_api import declared_attr
- from .instrumentation import ClassManager
- from ..sql.elements import NamedColumn
- from ..sql.schema import MetaData
- from ..sql.selectable import FromClause
- _T = TypeVar("_T", bound=Any)
- _MapperKwArgs = Mapping[str, Any]
- _TableArgsType = Union[Tuple[Any, ...], Dict[str, Any]]
- class MappedClassProtocol(Protocol[_O]):
- """A protocol representing a SQLAlchemy mapped class.
- The protocol is generic on the type of class, use
- ``MappedClassProtocol[Any]`` to allow any mapped class.
- """
- __name__: str
- __mapper__: Mapper[_O]
- __table__: FromClause
- def __call__(self, **kw: Any) -> _O: ...
- class _DeclMappedClassProtocol(MappedClassProtocol[_O], Protocol):
- "Internal more detailed version of ``MappedClassProtocol``."
- metadata: MetaData
- __tablename__: str
- __mapper_args__: _MapperKwArgs
- __table_args__: Optional[_TableArgsType]
- _sa_apply_dc_transforms: Optional[_DataclassArguments]
- def __declare_first__(self) -> None: ...
- def __declare_last__(self) -> None: ...
- class _DataclassArguments(TypedDict):
- init: Union[_NoArg, bool]
- repr: Union[_NoArg, bool]
- eq: Union[_NoArg, bool]
- order: Union[_NoArg, bool]
- unsafe_hash: Union[_NoArg, bool]
- match_args: Union[_NoArg, bool]
- kw_only: Union[_NoArg, bool]
- dataclass_callable: Union[_NoArg, Callable[..., Type[Any]]]
- def _declared_mapping_info(
- cls: Type[Any],
- ) -> Optional[Union[_DeferredMapperConfig, Mapper[Any]]]:
- # deferred mapping
- if _DeferredMapperConfig.has_cls(cls):
- return _DeferredMapperConfig.config_for_cls(cls)
- # regular mapping
- elif _is_mapped_class(cls):
- return class_mapper(cls, configure=False)
- else:
- return None
- def _is_supercls_for_inherits(cls: Type[Any]) -> bool:
- """return True if this class will be used as a superclass to set in
- 'inherits'.
- This includes deferred mapper configs that aren't mapped yet, however does
- not include classes with _sa_decl_prepare_nocascade (e.g.
- ``AbstractConcreteBase``); these concrete-only classes are not set up as
- "inherits" until after mappers are configured using
- mapper._set_concrete_base()
- """
- if _DeferredMapperConfig.has_cls(cls):
- return not _get_immediate_cls_attr(
- cls, "_sa_decl_prepare_nocascade", strict=True
- )
- # regular mapping
- elif _is_mapped_class(cls):
- return True
- else:
- return False
- def _resolve_for_abstract_or_classical(cls: Type[Any]) -> Optional[Type[Any]]:
- if cls is object:
- return None
- sup: Optional[Type[Any]]
- if cls.__dict__.get("__abstract__", False):
- for base_ in cls.__bases__:
- sup = _resolve_for_abstract_or_classical(base_)
- if sup is not None:
- return sup
- else:
- return None
- else:
- clsmanager = _dive_for_cls_manager(cls)
- if clsmanager:
- return clsmanager.class_
- else:
- return cls
- def _get_immediate_cls_attr(
- cls: Type[Any], attrname: str, strict: bool = False
- ) -> Optional[Any]:
- """return an attribute of the class that is either present directly
- on the class, e.g. not on a superclass, or is from a superclass but
- this superclass is a non-mapped mixin, that is, not a descendant of
- the declarative base and is also not classically mapped.
- This is used to detect attributes that indicate something about
- a mapped class independently from any mapped classes that it may
- inherit from.
- """
- # the rules are different for this name than others,
- # make sure we've moved it out. transitional
- assert attrname != "__abstract__"
- if not issubclass(cls, object):
- return None
- if attrname in cls.__dict__:
- return getattr(cls, attrname)
- for base in cls.__mro__[1:]:
- _is_classical_inherits = _dive_for_cls_manager(base) is not None
- if attrname in base.__dict__ and (
- base is cls
- or (
- (base in cls.__bases__ if strict else True)
- and not _is_classical_inherits
- )
- ):
- return getattr(base, attrname)
- else:
- return None
- def _dive_for_cls_manager(cls: Type[_O]) -> Optional[ClassManager[_O]]:
- # because the class manager registration is pluggable,
- # we need to do the search for every class in the hierarchy,
- # rather than just a simple "cls._sa_class_manager"
- for base in cls.__mro__:
- manager: Optional[ClassManager[_O]] = attributes.opt_manager_of_class(
- base
- )
- if manager:
- return manager
- return None
- def _as_declarative(
- registry: _RegistryType, cls: Type[Any], dict_: _ClassDict
- ) -> Optional[_MapperConfig]:
- # declarative scans the class for attributes. no table or mapper
- # args passed separately.
- return _MapperConfig.setup_mapping(registry, cls, dict_, None, {})
- def _mapper(
- registry: _RegistryType,
- cls: Type[_O],
- table: Optional[FromClause],
- mapper_kw: _MapperKwArgs,
- ) -> Mapper[_O]:
- _ImperativeMapperConfig(registry, cls, table, mapper_kw)
- return cast("MappedClassProtocol[_O]", cls).__mapper__
- @util.preload_module("sqlalchemy.orm.decl_api")
- def _is_declarative_props(obj: Any) -> bool:
- _declared_attr_common = util.preloaded.orm_decl_api._declared_attr_common
- return isinstance(obj, (_declared_attr_common, util.classproperty))
- def _check_declared_props_nocascade(
- obj: Any, name: str, cls: Type[_O]
- ) -> bool:
- if _is_declarative_props(obj):
- if getattr(obj, "_cascading", False):
- util.warn(
- "@declared_attr.cascading is not supported on the %s "
- "attribute on class %s. This attribute invokes for "
- "subclasses in any case." % (name, cls)
- )
- return True
- else:
- return False
- class _MapperConfig:
- __slots__ = (
- "cls",
- "classname",
- "properties",
- "declared_attr_reg",
- "__weakref__",
- )
- cls: Type[Any]
- classname: str
- properties: util.OrderedDict[
- str,
- Union[
- Sequence[NamedColumn[Any]], NamedColumn[Any], MapperProperty[Any]
- ],
- ]
- declared_attr_reg: Dict[declared_attr[Any], Any]
- @classmethod
- def setup_mapping(
- cls,
- registry: _RegistryType,
- cls_: Type[_O],
- dict_: _ClassDict,
- table: Optional[FromClause],
- mapper_kw: _MapperKwArgs,
- ) -> Optional[_MapperConfig]:
- manager = attributes.opt_manager_of_class(cls)
- if manager and manager.class_ is cls_:
- raise exc.InvalidRequestError(
- f"Class {cls!r} already has been instrumented declaratively"
- )
- if cls_.__dict__.get("__abstract__", False):
- return None
- defer_map = _get_immediate_cls_attr(
- cls_, "_sa_decl_prepare_nocascade", strict=True
- ) or hasattr(cls_, "_sa_decl_prepare")
- if defer_map:
- return _DeferredMapperConfig(
- registry, cls_, dict_, table, mapper_kw
- )
- else:
- return _ClassScanMapperConfig(
- registry, cls_, dict_, table, mapper_kw
- )
- def __init__(
- self,
- registry: _RegistryType,
- cls_: Type[Any],
- mapper_kw: _MapperKwArgs,
- ):
- self.cls = util.assert_arg_type(cls_, type, "cls_")
- self.classname = cls_.__name__
- self.properties = util.OrderedDict()
- self.declared_attr_reg = {}
- if not mapper_kw.get("non_primary", False):
- instrumentation.register_class(
- self.cls,
- finalize=False,
- registry=registry,
- declarative_scan=self,
- init_method=registry.constructor,
- )
- else:
- manager = attributes.opt_manager_of_class(self.cls)
- if not manager or not manager.is_mapped:
- raise exc.InvalidRequestError(
- "Class %s has no primary mapper configured. Configure "
- "a primary mapper first before setting up a non primary "
- "Mapper." % self.cls
- )
- def set_cls_attribute(self, attrname: str, value: _T) -> _T:
- manager = instrumentation.manager_of_class(self.cls)
- manager.install_member(attrname, value)
- return value
- def map(self, mapper_kw: _MapperKwArgs = ...) -> Mapper[Any]:
- raise NotImplementedError()
- def _early_mapping(self, mapper_kw: _MapperKwArgs) -> None:
- self.map(mapper_kw)
- class _ImperativeMapperConfig(_MapperConfig):
- __slots__ = ("local_table", "inherits")
- def __init__(
- self,
- registry: _RegistryType,
- cls_: Type[_O],
- table: Optional[FromClause],
- mapper_kw: _MapperKwArgs,
- ):
- super().__init__(registry, cls_, mapper_kw)
- self.local_table = self.set_cls_attribute("__table__", table)
- with mapperlib._CONFIGURE_MUTEX:
- if not mapper_kw.get("non_primary", False):
- clsregistry.add_class(
- self.classname, self.cls, registry._class_registry
- )
- self._setup_inheritance(mapper_kw)
- self._early_mapping(mapper_kw)
- def map(self, mapper_kw: _MapperKwArgs = util.EMPTY_DICT) -> Mapper[Any]:
- mapper_cls = Mapper
- return self.set_cls_attribute(
- "__mapper__",
- mapper_cls(self.cls, self.local_table, **mapper_kw),
- )
- def _setup_inheritance(self, mapper_kw: _MapperKwArgs) -> None:
- cls = self.cls
- inherits = mapper_kw.get("inherits", None)
- if inherits is None:
- # since we search for classical mappings now, search for
- # multiple mapped bases as well and raise an error.
- inherits_search = []
- for base_ in cls.__bases__:
- c = _resolve_for_abstract_or_classical(base_)
- if c is None:
- continue
- if _is_supercls_for_inherits(c) and c not in inherits_search:
- inherits_search.append(c)
- if inherits_search:
- if len(inherits_search) > 1:
- raise exc.InvalidRequestError(
- "Class %s has multiple mapped bases: %r"
- % (cls, inherits_search)
- )
- inherits = inherits_search[0]
- elif isinstance(inherits, Mapper):
- inherits = inherits.class_
- self.inherits = inherits
- class _CollectedAnnotation(NamedTuple):
- raw_annotation: _AnnotationScanType
- mapped_container: Optional[Type[Mapped[Any]]]
- extracted_mapped_annotation: Union[_AnnotationScanType, str]
- is_dataclass: bool
- attr_value: Any
- originating_module: str
- originating_class: Type[Any]
- class _ClassScanMapperConfig(_MapperConfig):
- __slots__ = (
- "registry",
- "clsdict_view",
- "collected_attributes",
- "collected_annotations",
- "local_table",
- "persist_selectable",
- "declared_columns",
- "column_ordering",
- "column_copies",
- "table_args",
- "tablename",
- "mapper_args",
- "mapper_args_fn",
- "table_fn",
- "inherits",
- "single",
- "allow_dataclass_fields",
- "dataclass_setup_arguments",
- "is_dataclass_prior_to_mapping",
- "allow_unmapped_annotations",
- )
- is_deferred = False
- registry: _RegistryType
- clsdict_view: _ClassDict
- collected_annotations: Dict[str, _CollectedAnnotation]
- collected_attributes: Dict[str, Any]
- local_table: Optional[FromClause]
- persist_selectable: Optional[FromClause]
- declared_columns: util.OrderedSet[Column[Any]]
- column_ordering: Dict[Column[Any], int]
- column_copies: Dict[
- Union[MappedColumn[Any], Column[Any]],
- Union[MappedColumn[Any], Column[Any]],
- ]
- tablename: Optional[str]
- mapper_args: Mapping[str, Any]
- table_args: Optional[_TableArgsType]
- mapper_args_fn: Optional[Callable[[], Dict[str, Any]]]
- inherits: Optional[Type[Any]]
- single: bool
- is_dataclass_prior_to_mapping: bool
- allow_unmapped_annotations: bool
- dataclass_setup_arguments: Optional[_DataclassArguments]
- """if the class has SQLAlchemy native dataclass parameters, where
- we will turn the class into a dataclass within the declarative mapping
- process.
- """
- allow_dataclass_fields: bool
- """if true, look for dataclass-processed Field objects on the target
- class as well as superclasses and extract ORM mapping directives from
- the "metadata" attribute of each Field.
- if False, dataclass fields can still be used, however they won't be
- mapped.
- """
- def __init__(
- self,
- registry: _RegistryType,
- cls_: Type[_O],
- dict_: _ClassDict,
- table: Optional[FromClause],
- mapper_kw: _MapperKwArgs,
- ):
- # grab class dict before the instrumentation manager has been added.
- # reduces cycles
- self.clsdict_view = (
- util.immutabledict(dict_) if dict_ else util.EMPTY_DICT
- )
- super().__init__(registry, cls_, mapper_kw)
- self.registry = registry
- self.persist_selectable = None
- self.collected_attributes = {}
- self.collected_annotations = {}
- self.declared_columns = util.OrderedSet()
- self.column_ordering = {}
- self.column_copies = {}
- self.single = False
- self.dataclass_setup_arguments = dca = getattr(
- self.cls, "_sa_apply_dc_transforms", None
- )
- self.allow_unmapped_annotations = getattr(
- self.cls, "__allow_unmapped__", False
- ) or bool(self.dataclass_setup_arguments)
- self.is_dataclass_prior_to_mapping = cld = dataclasses.is_dataclass(
- cls_
- )
- sdk = _get_immediate_cls_attr(cls_, "__sa_dataclass_metadata_key__")
- # we don't want to consume Field objects from a not-already-dataclass.
- # the Field objects won't have their "name" or "type" populated,
- # and while it seems like we could just set these on Field as we
- # read them, Field is documented as "user read only" and we need to
- # stay far away from any off-label use of dataclasses APIs.
- if (not cld or dca) and sdk:
- raise exc.InvalidRequestError(
- "SQLAlchemy mapped dataclasses can't consume mapping "
- "information from dataclass.Field() objects if the immediate "
- "class is not already a dataclass."
- )
- # if already a dataclass, and __sa_dataclass_metadata_key__ present,
- # then also look inside of dataclass.Field() objects yielded by
- # dataclasses.get_fields(cls) when scanning for attributes
- self.allow_dataclass_fields = bool(sdk and cld)
- self._setup_declared_events()
- self._scan_attributes()
- self._setup_dataclasses_transforms()
- with mapperlib._CONFIGURE_MUTEX:
- clsregistry.add_class(
- self.classname, self.cls, registry._class_registry
- )
- self._setup_inheriting_mapper(mapper_kw)
- self._extract_mappable_attributes()
- self._extract_declared_columns()
- self._setup_table(table)
- self._setup_inheriting_columns(mapper_kw)
- self._early_mapping(mapper_kw)
- def _setup_declared_events(self) -> None:
- if _get_immediate_cls_attr(self.cls, "__declare_last__"):
- @event.listens_for(Mapper, "after_configured")
- def after_configured() -> None:
- cast(
- "_DeclMappedClassProtocol[Any]", self.cls
- ).__declare_last__()
- if _get_immediate_cls_attr(self.cls, "__declare_first__"):
- @event.listens_for(Mapper, "before_configured")
- def before_configured() -> None:
- cast(
- "_DeclMappedClassProtocol[Any]", self.cls
- ).__declare_first__()
- def _cls_attr_override_checker(
- self, cls: Type[_O]
- ) -> Callable[[str, Any], bool]:
- """Produce a function that checks if a class has overridden an
- attribute, taking SQLAlchemy-enabled dataclass fields into account.
- """
- if self.allow_dataclass_fields:
- sa_dataclass_metadata_key = _get_immediate_cls_attr(
- cls, "__sa_dataclass_metadata_key__"
- )
- else:
- sa_dataclass_metadata_key = None
- if not sa_dataclass_metadata_key:
- def attribute_is_overridden(key: str, obj: Any) -> bool:
- return getattr(cls, key, obj) is not obj
- else:
- all_datacls_fields = {
- f.name: f.metadata[sa_dataclass_metadata_key]
- for f in util.dataclass_fields(cls)
- if sa_dataclass_metadata_key in f.metadata
- }
- local_datacls_fields = {
- f.name: f.metadata[sa_dataclass_metadata_key]
- for f in util.local_dataclass_fields(cls)
- if sa_dataclass_metadata_key in f.metadata
- }
- absent = object()
- def attribute_is_overridden(key: str, obj: Any) -> bool:
- if _is_declarative_props(obj):
- obj = obj.fget
- # this function likely has some failure modes still if
- # someone is doing a deep mixing of the same attribute
- # name as plain Python attribute vs. dataclass field.
- ret = local_datacls_fields.get(key, absent)
- if _is_declarative_props(ret):
- ret = ret.fget
- if ret is obj:
- return False
- elif ret is not absent:
- return True
- all_field = all_datacls_fields.get(key, absent)
- ret = getattr(cls, key, obj)
- if ret is obj:
- return False
- # for dataclasses, this could be the
- # 'default' of the field. so filter more specifically
- # for an already-mapped InstrumentedAttribute
- if ret is not absent and isinstance(
- ret, InstrumentedAttribute
- ):
- return True
- if all_field is obj:
- return False
- elif all_field is not absent:
- return True
- # can't find another attribute
- return False
- return attribute_is_overridden
- _include_dunders = {
- "__table__",
- "__mapper_args__",
- "__tablename__",
- "__table_args__",
- }
- _match_exclude_dunders = re.compile(r"^(?:_sa_|__)")
- def _cls_attr_resolver(
- self, cls: Type[Any]
- ) -> Callable[[], Iterable[Tuple[str, Any, Any, bool]]]:
- """produce a function to iterate the "attributes" of a class
- which we want to consider for mapping, adjusting for SQLAlchemy fields
- embedded in dataclass fields.
- """
- cls_annotations = util.get_annotations(cls)
- cls_vars = vars(cls)
- _include_dunders = self._include_dunders
- _match_exclude_dunders = self._match_exclude_dunders
- names = [
- n
- for n in util.merge_lists_w_ordering(
- list(cls_vars), list(cls_annotations)
- )
- if not _match_exclude_dunders.match(n) or n in _include_dunders
- ]
- if self.allow_dataclass_fields:
- sa_dataclass_metadata_key: Optional[str] = _get_immediate_cls_attr(
- cls, "__sa_dataclass_metadata_key__"
- )
- else:
- sa_dataclass_metadata_key = None
- if not sa_dataclass_metadata_key:
- def local_attributes_for_class() -> (
- Iterable[Tuple[str, Any, Any, bool]]
- ):
- return (
- (
- name,
- cls_vars.get(name),
- cls_annotations.get(name),
- False,
- )
- for name in names
- )
- else:
- dataclass_fields = {
- field.name: field for field in util.local_dataclass_fields(cls)
- }
- fixed_sa_dataclass_metadata_key = sa_dataclass_metadata_key
- def local_attributes_for_class() -> (
- Iterable[Tuple[str, Any, Any, bool]]
- ):
- for name in names:
- field = dataclass_fields.get(name, None)
- if field and sa_dataclass_metadata_key in field.metadata:
- yield field.name, _as_dc_declaredattr(
- field.metadata, fixed_sa_dataclass_metadata_key
- ), cls_annotations.get(field.name), True
- else:
- yield name, cls_vars.get(name), cls_annotations.get(
- name
- ), False
- return local_attributes_for_class
- def _scan_attributes(self) -> None:
- cls = self.cls
- cls_as_Decl = cast("_DeclMappedClassProtocol[Any]", cls)
- clsdict_view = self.clsdict_view
- collected_attributes = self.collected_attributes
- column_copies = self.column_copies
- _include_dunders = self._include_dunders
- mapper_args_fn = None
- table_args = inherited_table_args = None
- table_fn = None
- tablename = None
- fixed_table = "__table__" in clsdict_view
- attribute_is_overridden = self._cls_attr_override_checker(self.cls)
- bases = []
- for base in cls.__mro__:
- # collect bases and make sure standalone columns are copied
- # to be the column they will ultimately be on the class,
- # so that declared_attr functions use the right columns.
- # need to do this all the way up the hierarchy first
- # (see #8190)
- class_mapped = base is not cls and _is_supercls_for_inherits(base)
- local_attributes_for_class = self._cls_attr_resolver(base)
- if not class_mapped and base is not cls:
- locally_collected_columns = self._produce_column_copies(
- local_attributes_for_class,
- attribute_is_overridden,
- fixed_table,
- base,
- )
- else:
- locally_collected_columns = {}
- bases.append(
- (
- base,
- class_mapped,
- local_attributes_for_class,
- locally_collected_columns,
- )
- )
- for (
- base,
- class_mapped,
- local_attributes_for_class,
- locally_collected_columns,
- ) in bases:
- # this transfer can also take place as we scan each name
- # for finer-grained control of how collected_attributes is
- # populated, as this is what impacts column ordering.
- # however it's simpler to get it out of the way here.
- collected_attributes.update(locally_collected_columns)
- for (
- name,
- obj,
- annotation,
- is_dataclass_field,
- ) in local_attributes_for_class():
- if name in _include_dunders:
- if name == "__mapper_args__":
- check_decl = _check_declared_props_nocascade(
- obj, name, cls
- )
- if not mapper_args_fn and (
- not class_mapped or check_decl
- ):
- # don't even invoke __mapper_args__ until
- # after we've determined everything about the
- # mapped table.
- # make a copy of it so a class-level dictionary
- # is not overwritten when we update column-based
- # arguments.
- def _mapper_args_fn() -> Dict[str, Any]:
- return dict(cls_as_Decl.__mapper_args__)
- mapper_args_fn = _mapper_args_fn
- elif name == "__tablename__":
- check_decl = _check_declared_props_nocascade(
- obj, name, cls
- )
- if not tablename and (not class_mapped or check_decl):
- tablename = cls_as_Decl.__tablename__
- elif name == "__table__":
- check_decl = _check_declared_props_nocascade(
- obj, name, cls
- )
- # if a @declared_attr using "__table__" is detected,
- # wrap up a callable to look for "__table__" from
- # the final concrete class when we set up a table.
- # this was fixed by
- # #11509, regression in 2.0 from version 1.4.
- if check_decl and not table_fn:
- # don't even invoke __table__ until we're ready
- def _table_fn() -> FromClause:
- return cls_as_Decl.__table__
- table_fn = _table_fn
- elif name == "__table_args__":
- check_decl = _check_declared_props_nocascade(
- obj, name, cls
- )
- if not table_args and (not class_mapped or check_decl):
- table_args = cls_as_Decl.__table_args__
- if not isinstance(
- table_args, (tuple, dict, type(None))
- ):
- raise exc.ArgumentError(
- "__table_args__ value must be a tuple, "
- "dict, or None"
- )
- if base is not cls:
- inherited_table_args = True
- else:
- # any other dunder names; should not be here
- # as we have tested for all four names in
- # _include_dunders
- assert False
- elif class_mapped:
- if _is_declarative_props(obj) and not obj._quiet:
- util.warn(
- "Regular (i.e. not __special__) "
- "attribute '%s.%s' uses @declared_attr, "
- "but owning class %s is mapped - "
- "not applying to subclass %s."
- % (base.__name__, name, base, cls)
- )
- continue
- elif base is not cls:
- # we're a mixin, abstract base, or something that is
- # acting like that for now.
- if isinstance(obj, (Column, MappedColumn)):
- # already copied columns to the mapped class.
- continue
- elif isinstance(obj, MapperProperty):
- raise exc.InvalidRequestError(
- "Mapper properties (i.e. deferred,"
- "column_property(), relationship(), etc.) must "
- "be declared as @declared_attr callables "
- "on declarative mixin classes. For dataclass "
- "field() objects, use a lambda:"
- )
- elif _is_declarative_props(obj):
- # tried to get overloads to tell this to
- # pylance, no luck
- assert obj is not None
- if obj._cascading:
- if name in clsdict_view:
- # unfortunately, while we can use the user-
- # defined attribute here to allow a clean
- # override, if there's another
- # subclass below then it still tries to use
- # this. not sure if there is enough
- # information here to add this as a feature
- # later on.
- util.warn(
- "Attribute '%s' on class %s cannot be "
- "processed due to "
- "@declared_attr.cascading; "
- "skipping" % (name, cls)
- )
- collected_attributes[name] = column_copies[obj] = (
- ret
- ) = obj.__get__(obj, cls)
- setattr(cls, name, ret)
- else:
- if is_dataclass_field:
- # access attribute using normal class access
- # first, to see if it's been mapped on a
- # superclass. note if the dataclasses.field()
- # has "default", this value can be anything.
- ret = getattr(cls, name, None)
- # so, if it's anything that's not ORM
- # mapped, assume we should invoke the
- # declared_attr
- if not isinstance(ret, InspectionAttr):
- ret = obj.fget()
- else:
- # access attribute using normal class access.
- # if the declared attr already took place
- # on a superclass that is mapped, then
- # this is no longer a declared_attr, it will
- # be the InstrumentedAttribute
- ret = getattr(cls, name)
- # correct for proxies created from hybrid_property
- # or similar. note there is no known case that
- # produces nested proxies, so we are only
- # looking one level deep right now.
- if (
- isinstance(ret, InspectionAttr)
- and attr_is_internal_proxy(ret)
- and not isinstance(
- ret.original_property, MapperProperty
- )
- ):
- ret = ret.descriptor
- collected_attributes[name] = column_copies[obj] = (
- ret
- )
- if (
- isinstance(ret, (Column, MapperProperty))
- and ret.doc is None
- ):
- ret.doc = obj.__doc__
- self._collect_annotation(
- name,
- obj._collect_return_annotation(),
- base,
- True,
- obj,
- )
- elif _is_mapped_annotation(annotation, cls, base):
- # Mapped annotation without any object.
- # product_column_copies should have handled this.
- # if future support for other MapperProperty,
- # then test if this name is already handled and
- # otherwise proceed to generate.
- if not fixed_table:
- assert (
- name in collected_attributes
- or attribute_is_overridden(name, None)
- )
- continue
- else:
- # here, the attribute is some other kind of
- # property that we assume is not part of the
- # declarative mapping. however, check for some
- # more common mistakes
- self._warn_for_decl_attributes(base, name, obj)
- elif is_dataclass_field and (
- name not in clsdict_view or clsdict_view[name] is not obj
- ):
- # here, we are definitely looking at the target class
- # and not a superclass. this is currently a
- # dataclass-only path. if the name is only
- # a dataclass field and isn't in local cls.__dict__,
- # put the object there.
- # assert that the dataclass-enabled resolver agrees
- # with what we are seeing
- assert not attribute_is_overridden(name, obj)
- if _is_declarative_props(obj):
- obj = obj.fget()
- collected_attributes[name] = obj
- self._collect_annotation(
- name, annotation, base, False, obj
- )
- else:
- collected_annotation = self._collect_annotation(
- name, annotation, base, None, obj
- )
- is_mapped = (
- collected_annotation is not None
- and collected_annotation.mapped_container is not None
- )
- generated_obj = (
- collected_annotation.attr_value
- if collected_annotation is not None
- else obj
- )
- if obj is None and not fixed_table and is_mapped:
- collected_attributes[name] = (
- generated_obj
- if generated_obj is not None
- else MappedColumn()
- )
- elif name in clsdict_view:
- collected_attributes[name] = obj
- # else if the name is not in the cls.__dict__,
- # don't collect it as an attribute.
- # we will see the annotation only, which is meaningful
- # both for mapping and dataclasses setup
- if inherited_table_args and not tablename:
- table_args = None
- self.table_args = table_args
- self.tablename = tablename
- self.mapper_args_fn = mapper_args_fn
- self.table_fn = table_fn
- def _setup_dataclasses_transforms(self) -> None:
- dataclass_setup_arguments = self.dataclass_setup_arguments
- if not dataclass_setup_arguments:
- return
- # can't use is_dataclass since it uses hasattr
- if "__dataclass_fields__" in self.cls.__dict__:
- raise exc.InvalidRequestError(
- f"Class {self.cls} is already a dataclass; ensure that "
- "base classes / decorator styles of establishing dataclasses "
- "are not being mixed. "
- "This can happen if a class that inherits from "
- "'MappedAsDataclass', even indirectly, is been mapped with "
- "'@registry.mapped_as_dataclass'"
- )
- # can't create a dataclass if __table__ is already there. This would
- # fail an assertion when calling _get_arguments_for_make_dataclass:
- # assert False, "Mapped[] received without a mapping declaration"
- if "__table__" in self.cls.__dict__:
- raise exc.InvalidRequestError(
- f"Class {self.cls} already defines a '__table__'. "
- "ORM Annotated Dataclasses do not support a pre-existing "
- "'__table__' element"
- )
- warn_for_non_dc_attrs = collections.defaultdict(list)
- def _allow_dataclass_field(
- key: str, originating_class: Type[Any]
- ) -> bool:
- if (
- originating_class is not self.cls
- and "__dataclass_fields__" not in originating_class.__dict__
- ):
- warn_for_non_dc_attrs[originating_class].append(key)
- return True
- manager = instrumentation.manager_of_class(self.cls)
- assert manager is not None
- field_list = [
- _AttributeOptions._get_arguments_for_make_dataclass(
- key,
- anno,
- mapped_container,
- self.collected_attributes.get(key, _NoArg.NO_ARG),
- )
- for key, anno, mapped_container in (
- (
- key,
- mapped_anno if mapped_anno else raw_anno,
- mapped_container,
- )
- for key, (
- raw_anno,
- mapped_container,
- mapped_anno,
- is_dc,
- attr_value,
- originating_module,
- originating_class,
- ) in self.collected_annotations.items()
- if _allow_dataclass_field(key, originating_class)
- and (
- key not in self.collected_attributes
- # issue #9226; check for attributes that we've collected
- # which are already instrumented, which we would assume
- # mean we are in an ORM inheritance mapping and this
- # attribute is already mapped on the superclass. Under
- # no circumstance should any QueryableAttribute be sent to
- # the dataclass() function; anything that's mapped should
- # be Field and that's it
- or not isinstance(
- self.collected_attributes[key], QueryableAttribute
- )
- )
- )
- ]
- if warn_for_non_dc_attrs:
- for (
- originating_class,
- non_dc_attrs,
- ) in warn_for_non_dc_attrs.items():
- util.warn_deprecated(
- f"When transforming {self.cls} to a dataclass, "
- f"attribute(s) "
- f"{', '.join(repr(key) for key in non_dc_attrs)} "
- f"originates from superclass "
- f"{originating_class}, which is not a dataclass. This "
- f"usage is deprecated and will raise an error in "
- f"SQLAlchemy 2.1. When declaring SQLAlchemy Declarative "
- f"Dataclasses, ensure that all mixin classes and other "
- f"superclasses which include attributes are also a "
- f"subclass of MappedAsDataclass.",
- "2.0",
- code="dcmx",
- )
- annotations = {}
- defaults = {}
- for item in field_list:
- if len(item) == 2:
- name, tp = item
- elif len(item) == 3:
- name, tp, spec = item
- defaults[name] = spec
- else:
- assert False
- annotations[name] = tp
- for k, v in defaults.items():
- setattr(self.cls, k, v)
- self._apply_dataclasses_to_any_class(
- dataclass_setup_arguments, self.cls, annotations
- )
- @classmethod
- def _update_annotations_for_non_mapped_class(
- cls, klass: Type[_O]
- ) -> Mapping[str, _AnnotationScanType]:
- cls_annotations = util.get_annotations(klass)
- new_anno = {}
- for name, annotation in cls_annotations.items():
- if _is_mapped_annotation(annotation, klass, klass):
- extracted = _extract_mapped_subtype(
- annotation,
- klass,
- klass.__module__,
- name,
- type(None),
- required=False,
- is_dataclass_field=False,
- expect_mapped=False,
- )
- if extracted:
- inner, _ = extracted
- new_anno[name] = inner
- else:
- new_anno[name] = annotation
- return new_anno
- @classmethod
- def _apply_dataclasses_to_any_class(
- cls,
- dataclass_setup_arguments: _DataclassArguments,
- klass: Type[_O],
- use_annotations: Mapping[str, _AnnotationScanType],
- ) -> None:
- cls._assert_dc_arguments(dataclass_setup_arguments)
- dataclass_callable = dataclass_setup_arguments["dataclass_callable"]
- if dataclass_callable is _NoArg.NO_ARG:
- dataclass_callable = dataclasses.dataclass
- restored: Optional[Any]
- if use_annotations:
- # apply constructed annotations that should look "normal" to a
- # dataclasses callable, based on the fields present. This
- # means remove the Mapped[] container and ensure all Field
- # entries have an annotation
- restored = getattr(klass, "__annotations__", None)
- klass.__annotations__ = cast("Dict[str, Any]", use_annotations)
- else:
- restored = None
- try:
- dataclass_callable( # type: ignore[call-overload]
- klass,
- **{ # type: ignore[call-overload,unused-ignore]
- k: v
- for k, v in dataclass_setup_arguments.items()
- if v is not _NoArg.NO_ARG and k != "dataclass_callable"
- },
- )
- except (TypeError, ValueError) as ex:
- raise exc.InvalidRequestError(
- f"Python dataclasses error encountered when creating "
- f"dataclass for {klass.__name__!r}: "
- f"{ex!r}. Please refer to Python dataclasses "
- "documentation for additional information.",
- code="dcte",
- ) from ex
- finally:
- # restore original annotations outside of the dataclasses
- # process; for mixins and __abstract__ superclasses, SQLAlchemy
- # Declarative will need to see the Mapped[] container inside the
- # annotations in order to map subclasses
- if use_annotations:
- if restored is None:
- del klass.__annotations__
- else:
- klass.__annotations__ = restored
- @classmethod
- def _assert_dc_arguments(cls, arguments: _DataclassArguments) -> None:
- allowed = {
- "init",
- "repr",
- "order",
- "eq",
- "unsafe_hash",
- "kw_only",
- "match_args",
- "dataclass_callable",
- }
- disallowed_args = set(arguments).difference(allowed)
- if disallowed_args:
- msg = ", ".join(f"{arg!r}" for arg in sorted(disallowed_args))
- raise exc.ArgumentError(
- f"Dataclass argument(s) {msg} are not accepted"
- )
- def _collect_annotation(
- self,
- name: str,
- raw_annotation: _AnnotationScanType,
- originating_class: Type[Any],
- expect_mapped: Optional[bool],
- attr_value: Any,
- ) -> Optional[_CollectedAnnotation]:
- if name in self.collected_annotations:
- return self.collected_annotations[name]
- if raw_annotation is None:
- return None
- is_dataclass = self.is_dataclass_prior_to_mapping
- allow_unmapped = self.allow_unmapped_annotations
- if expect_mapped is None:
- is_dataclass_field = isinstance(attr_value, dataclasses.Field)
- expect_mapped = (
- not is_dataclass_field
- and not allow_unmapped
- and (
- attr_value is None
- or isinstance(attr_value, _MappedAttribute)
- )
- )
- is_dataclass_field = False
- extracted = _extract_mapped_subtype(
- raw_annotation,
- self.cls,
- originating_class.__module__,
- name,
- type(attr_value),
- required=False,
- is_dataclass_field=is_dataclass_field,
- expect_mapped=expect_mapped and not is_dataclass,
- )
- if extracted is None:
- # ClassVar can come out here
- return None
- extracted_mapped_annotation, mapped_container = extracted
- if attr_value is None and not is_literal(extracted_mapped_annotation):
- for elem in get_args(extracted_mapped_annotation):
- if is_fwd_ref(
- elem, check_generic=True, check_for_plain_string=True
- ):
- elem = de_stringify_annotation(
- self.cls,
- elem,
- originating_class.__module__,
- include_generic=True,
- )
- # look in Annotated[...] for an ORM construct,
- # such as Annotated[int, mapped_column(primary_key=True)]
- if isinstance(elem, _IntrospectsAnnotations):
- attr_value = elem.found_in_pep593_annotated()
- self.collected_annotations[name] = ca = _CollectedAnnotation(
- raw_annotation,
- mapped_container,
- extracted_mapped_annotation,
- is_dataclass,
- attr_value,
- originating_class.__module__,
- originating_class,
- )
- return ca
- def _warn_for_decl_attributes(
- self, cls: Type[Any], key: str, c: Any
- ) -> None:
- if isinstance(c, expression.ColumnElement):
- util.warn(
- f"Attribute '{key}' on class {cls} appears to "
- "be a non-schema SQLAlchemy expression "
- "object; this won't be part of the declarative mapping. "
- "To map arbitrary expressions, use ``column_property()`` "
- "or a similar function such as ``deferred()``, "
- "``query_expression()`` etc. "
- )
- def _produce_column_copies(
- self,
- attributes_for_class: Callable[
- [], Iterable[Tuple[str, Any, Any, bool]]
- ],
- attribute_is_overridden: Callable[[str, Any], bool],
- fixed_table: bool,
- originating_class: Type[Any],
- ) -> Dict[str, Union[Column[Any], MappedColumn[Any]]]:
- cls = self.cls
- dict_ = self.clsdict_view
- locally_collected_attributes = {}
- column_copies = self.column_copies
- # copy mixin columns to the mapped class
- for name, obj, annotation, is_dataclass in attributes_for_class():
- if (
- not fixed_table
- and obj is None
- and _is_mapped_annotation(annotation, cls, originating_class)
- ):
- # obj is None means this is the annotation only path
- if attribute_is_overridden(name, obj):
- # perform same "overridden" check as we do for
- # Column/MappedColumn, this is how a mixin col is not
- # applied to an inherited subclass that does not have
- # the mixin. the anno-only path added here for
- # #9564
- continue
- collected_annotation = self._collect_annotation(
- name, annotation, originating_class, True, obj
- )
- obj = (
- collected_annotation.attr_value
- if collected_annotation is not None
- else obj
- )
- if obj is None:
- obj = MappedColumn()
- locally_collected_attributes[name] = obj
- setattr(cls, name, obj)
- elif isinstance(obj, (Column, MappedColumn)):
- if attribute_is_overridden(name, obj):
- # if column has been overridden
- # (like by the InstrumentedAttribute of the
- # superclass), skip. don't collect the annotation
- # either (issue #8718)
- continue
- collected_annotation = self._collect_annotation(
- name, annotation, originating_class, True, obj
- )
- obj = (
- collected_annotation.attr_value
- if collected_annotation is not None
- else obj
- )
- if name not in dict_ and not (
- "__table__" in dict_
- and (getattr(obj, "name", None) or name)
- in dict_["__table__"].c
- ):
- if obj.foreign_keys:
- for fk in obj.foreign_keys:
- if (
- fk._table_column is not None
- and fk._table_column.table is None
- ):
- raise exc.InvalidRequestError(
- "Columns with foreign keys to "
- "non-table-bound "
- "columns must be declared as "
- "@declared_attr callables "
- "on declarative mixin classes. "
- "For dataclass "
- "field() objects, use a lambda:."
- )
- column_copies[obj] = copy_ = obj._copy()
- locally_collected_attributes[name] = copy_
- setattr(cls, name, copy_)
- return locally_collected_attributes
- def _extract_mappable_attributes(self) -> None:
- cls = self.cls
- collected_attributes = self.collected_attributes
- our_stuff = self.properties
- _include_dunders = self._include_dunders
- late_mapped = _get_immediate_cls_attr(
- cls, "_sa_decl_prepare_nocascade", strict=True
- )
- allow_unmapped_annotations = self.allow_unmapped_annotations
- expect_annotations_wo_mapped = (
- allow_unmapped_annotations or self.is_dataclass_prior_to_mapping
- )
- look_for_dataclass_things = bool(self.dataclass_setup_arguments)
- for k in list(collected_attributes):
- if k in _include_dunders:
- continue
- value = collected_attributes[k]
- if _is_declarative_props(value):
- # @declared_attr in collected_attributes only occurs here for a
- # @declared_attr that's directly on the mapped class;
- # for a mixin, these have already been evaluated
- if value._cascading:
- util.warn(
- "Use of @declared_attr.cascading only applies to "
- "Declarative 'mixin' and 'abstract' classes. "
- "Currently, this flag is ignored on mapped class "
- "%s" % self.cls
- )
- value = getattr(cls, k)
- elif (
- isinstance(value, QueryableAttribute)
- and value.class_ is not cls
- and value.key != k
- ):
- # detect a QueryableAttribute that's already mapped being
- # assigned elsewhere in userland, turn into a synonym()
- value = SynonymProperty(value.key)
- setattr(cls, k, value)
- if (
- isinstance(value, tuple)
- and len(value) == 1
- and isinstance(value[0], (Column, _MappedAttribute))
- ):
- util.warn(
- "Ignoring declarative-like tuple value of attribute "
- "'%s': possibly a copy-and-paste error with a comma "
- "accidentally placed at the end of the line?" % k
- )
- continue
- elif look_for_dataclass_things and isinstance(
- value, dataclasses.Field
- ):
- # we collected a dataclass Field; dataclasses would have
- # set up the correct state on the class
- continue
- elif not isinstance(value, (Column, _DCAttributeOptions)):
- # using @declared_attr for some object that
- # isn't Column/MapperProperty/_DCAttributeOptions; remove
- # from the clsdict_view
- # and place the evaluated value onto the class.
- collected_attributes.pop(k)
- self._warn_for_decl_attributes(cls, k, value)
- if not late_mapped:
- setattr(cls, k, value)
- continue
- # we expect to see the name 'metadata' in some valid cases;
- # however at this point we see it's assigned to something trying
- # to be mapped, so raise for that.
- # TODO: should "registry" here be also? might be too late
- # to change that now (2.0 betas)
- elif k in ("metadata",):
- raise exc.InvalidRequestError(
- f"Attribute name '{k}' is reserved when using the "
- "Declarative API."
- )
- elif isinstance(value, Column):
- _undefer_column_name(
- k, self.column_copies.get(value, value) # type: ignore
- )
- else:
- if isinstance(value, _IntrospectsAnnotations):
- (
- annotation,
- mapped_container,
- extracted_mapped_annotation,
- is_dataclass,
- attr_value,
- originating_module,
- originating_class,
- ) = self.collected_annotations.get(
- k, (None, None, None, False, None, None, None)
- )
- # issue #8692 - don't do any annotation interpretation if
- # an annotation were present and a container such as
- # Mapped[] etc. were not used. If annotation is None,
- # do declarative_scan so that the property can raise
- # for required
- if (
- mapped_container is not None
- or annotation is None
- # issue #10516: need to do declarative_scan even with
- # a non-Mapped annotation if we are doing
- # __allow_unmapped__, for things like col.name
- # assignment
- or allow_unmapped_annotations
- ):
- try:
- value.declarative_scan(
- self,
- self.registry,
- cls,
- originating_module,
- k,
- mapped_container,
- annotation,
- extracted_mapped_annotation,
- is_dataclass,
- )
- except NameError as ne:
- raise orm_exc.MappedAnnotationError(
- f"Could not resolve all types within mapped "
- f'annotation: "{annotation}". Ensure all '
- f"types are written correctly and are "
- f"imported within the module in use."
- ) from ne
- else:
- # assert that we were expecting annotations
- # without Mapped[] were going to be passed.
- # otherwise an error should have been raised
- # by util._extract_mapped_subtype before we got here.
- assert expect_annotations_wo_mapped
- if isinstance(value, _DCAttributeOptions):
- if (
- value._has_dataclass_arguments
- and not look_for_dataclass_things
- ):
- if isinstance(value, MapperProperty):
- argnames = [
- "init",
- "default_factory",
- "repr",
- "default",
- "dataclass_metadata",
- ]
- else:
- argnames = [
- "init",
- "default_factory",
- "repr",
- "dataclass_metadata",
- ]
- args = {
- a
- for a in argnames
- if getattr(
- value._attribute_options, f"dataclasses_{a}"
- )
- is not _NoArg.NO_ARG
- }
- raise exc.ArgumentError(
- f"Attribute '{k}' on class {cls} includes "
- f"dataclasses argument(s): "
- f"{', '.join(sorted(repr(a) for a in args))} but "
- f"class does not specify "
- "SQLAlchemy native dataclass configuration."
- )
- if not isinstance(value, (MapperProperty, _MapsColumns)):
- # filter for _DCAttributeOptions objects that aren't
- # MapperProperty / mapped_column(). Currently this
- # includes AssociationProxy. pop it from the things
- # we're going to map and set it up as a descriptor
- # on the class.
- collected_attributes.pop(k)
- # Assoc Prox (or other descriptor object that may
- # use _DCAttributeOptions) is usually here, except if
- # 1. we're a
- # dataclass, dataclasses would have removed the
- # attr here or 2. assoc proxy is coming from a
- # superclass, we want it to be direct here so it
- # tracks state or 3. assoc prox comes from
- # declared_attr, uncommon case
- setattr(cls, k, value)
- continue
- our_stuff[k] = value
- def _extract_declared_columns(self) -> None:
- our_stuff = self.properties
- # extract columns from the class dict
- declared_columns = self.declared_columns
- column_ordering = self.column_ordering
- name_to_prop_key = collections.defaultdict(set)
- for key, c in list(our_stuff.items()):
- if isinstance(c, _MapsColumns):
- mp_to_assign = c.mapper_property_to_assign
- if mp_to_assign:
- our_stuff[key] = mp_to_assign
- else:
- # if no mapper property to assign, this currently means
- # this is a MappedColumn that will produce a Column for us
- del our_stuff[key]
- for col, sort_order in c.columns_to_assign:
- if not isinstance(c, CompositeProperty):
- name_to_prop_key[col.name].add(key)
- declared_columns.add(col)
- # we would assert this, however we want the below
- # warning to take effect instead. See #9630
- # assert col not in column_ordering
- column_ordering[col] = sort_order
- # if this is a MappedColumn and the attribute key we
- # have is not what the column has for its key, map the
- # Column explicitly under the attribute key name.
- # otherwise, Mapper will map it under the column key.
- if mp_to_assign is None and key != col.key:
- our_stuff[key] = col
- elif isinstance(c, Column):
- # undefer previously occurred here, and now occurs earlier.
- # ensure every column we get here has been named
- assert c.name is not None
- name_to_prop_key[c.name].add(key)
- declared_columns.add(c)
- # if the column is the same name as the key,
- # remove it from the explicit properties dict.
- # the normal rules for assigning column-based properties
- # will take over, including precedence of columns
- # in multi-column ColumnProperties.
- if key == c.key:
- del our_stuff[key]
- for name, keys in name_to_prop_key.items():
- if len(keys) > 1:
- util.warn(
- "On class %r, Column object %r named "
- "directly multiple times, "
- "only one will be used: %s. "
- "Consider using orm.synonym instead"
- % (self.classname, name, (", ".join(sorted(keys))))
- )
- def _setup_table(self, table: Optional[FromClause] = None) -> None:
- cls = self.cls
- cls_as_Decl = cast("MappedClassProtocol[Any]", cls)
- tablename = self.tablename
- table_args = self.table_args
- clsdict_view = self.clsdict_view
- declared_columns = self.declared_columns
- column_ordering = self.column_ordering
- manager = attributes.manager_of_class(cls)
- if (
- self.table_fn is None
- and "__table__" not in clsdict_view
- and table is None
- ):
- if hasattr(cls, "__table_cls__"):
- table_cls = cast(
- Type[Table],
- util.unbound_method_to_callable(cls.__table_cls__), # type: ignore # noqa: E501
- )
- else:
- table_cls = Table
- if tablename is not None:
- args: Tuple[Any, ...] = ()
- table_kw: Dict[str, Any] = {}
- if table_args:
- if isinstance(table_args, dict):
- table_kw = table_args
- elif isinstance(table_args, tuple):
- if isinstance(table_args[-1], dict):
- args, table_kw = table_args[0:-1], table_args[-1]
- else:
- args = table_args
- autoload_with = clsdict_view.get("__autoload_with__")
- if autoload_with:
- table_kw["autoload_with"] = autoload_with
- autoload = clsdict_view.get("__autoload__")
- if autoload:
- table_kw["autoload"] = True
- sorted_columns = sorted(
- declared_columns,
- key=lambda c: column_ordering.get(c, 0),
- )
- table = self.set_cls_attribute(
- "__table__",
- table_cls(
- tablename,
- self._metadata_for_cls(manager),
- *sorted_columns,
- *args,
- **table_kw,
- ),
- )
- else:
- if table is None:
- if self.table_fn:
- table = self.set_cls_attribute(
- "__table__", self.table_fn()
- )
- else:
- table = cls_as_Decl.__table__
- if declared_columns:
- for c in declared_columns:
- if not table.c.contains_column(c):
- raise exc.ArgumentError(
- "Can't add additional column %r when "
- "specifying __table__" % c.key
- )
- self.local_table = table
- def _metadata_for_cls(self, manager: ClassManager[Any]) -> MetaData:
- meta: Optional[MetaData] = getattr(self.cls, "metadata", None)
- if meta is not None:
- return meta
- else:
- return manager.registry.metadata
- def _setup_inheriting_mapper(self, mapper_kw: _MapperKwArgs) -> None:
- cls = self.cls
- inherits = mapper_kw.get("inherits", None)
- if inherits is None:
- # since we search for classical mappings now, search for
- # multiple mapped bases as well and raise an error.
- inherits_search = []
- for base_ in cls.__bases__:
- c = _resolve_for_abstract_or_classical(base_)
- if c is None:
- continue
- if _is_supercls_for_inherits(c) and c not in inherits_search:
- inherits_search.append(c)
- if inherits_search:
- if len(inherits_search) > 1:
- raise exc.InvalidRequestError(
- "Class %s has multiple mapped bases: %r"
- % (cls, inherits_search)
- )
- inherits = inherits_search[0]
- elif isinstance(inherits, Mapper):
- inherits = inherits.class_
- self.inherits = inherits
- clsdict_view = self.clsdict_view
- if "__table__" not in clsdict_view and self.tablename is None:
- self.single = True
- def _setup_inheriting_columns(self, mapper_kw: _MapperKwArgs) -> None:
- table = self.local_table
- cls = self.cls
- table_args = self.table_args
- declared_columns = self.declared_columns
- if (
- table is None
- and self.inherits is None
- and not _get_immediate_cls_attr(cls, "__no_table__")
- ):
- raise exc.InvalidRequestError(
- "Class %r does not have a __table__ or __tablename__ "
- "specified and does not inherit from an existing "
- "table-mapped class." % cls
- )
- elif self.inherits:
- inherited_mapper_or_config = _declared_mapping_info(self.inherits)
- assert inherited_mapper_or_config is not None
- inherited_table = inherited_mapper_or_config.local_table
- inherited_persist_selectable = (
- inherited_mapper_or_config.persist_selectable
- )
- if table is None:
- # single table inheritance.
- # ensure no table args
- if table_args:
- raise exc.ArgumentError(
- "Can't place __table_args__ on an inherited class "
- "with no table."
- )
- # add any columns declared here to the inherited table.
- if declared_columns and not isinstance(inherited_table, Table):
- raise exc.ArgumentError(
- f"Can't declare columns on single-table-inherited "
- f"subclass {self.cls}; superclass {self.inherits} "
- "is not mapped to a Table"
- )
- for col in declared_columns:
- assert inherited_table is not None
- if col.name in inherited_table.c:
- if inherited_table.c[col.name] is col:
- continue
- raise exc.ArgumentError(
- f"Column '{col}' on class {cls.__name__} "
- f"conflicts with existing column "
- f"'{inherited_table.c[col.name]}'. If using "
- f"Declarative, consider using the "
- "use_existing_column parameter of mapped_column() "
- "to resolve conflicts."
- )
- if col.primary_key:
- raise exc.ArgumentError(
- "Can't place primary key columns on an inherited "
- "class with no table."
- )
- if TYPE_CHECKING:
- assert isinstance(inherited_table, Table)
- inherited_table.append_column(col)
- if (
- inherited_persist_selectable is not None
- and inherited_persist_selectable is not inherited_table
- ):
- inherited_persist_selectable._refresh_for_new_column(
- col
- )
- def _prepare_mapper_arguments(self, mapper_kw: _MapperKwArgs) -> None:
- properties = self.properties
- if self.mapper_args_fn:
- mapper_args = self.mapper_args_fn()
- else:
- mapper_args = {}
- if mapper_kw:
- mapper_args.update(mapper_kw)
- if "properties" in mapper_args:
- properties = dict(properties)
- properties.update(mapper_args["properties"])
- # make sure that column copies are used rather
- # than the original columns from any mixins
- for k in ("version_id_col", "polymorphic_on"):
- if k in mapper_args:
- v = mapper_args[k]
- mapper_args[k] = self.column_copies.get(v, v)
- if "primary_key" in mapper_args:
- mapper_args["primary_key"] = [
- self.column_copies.get(v, v)
- for v in util.to_list(mapper_args["primary_key"])
- ]
- if "inherits" in mapper_args:
- inherits_arg = mapper_args["inherits"]
- if isinstance(inherits_arg, Mapper):
- inherits_arg = inherits_arg.class_
- if inherits_arg is not self.inherits:
- raise exc.InvalidRequestError(
- "mapper inherits argument given for non-inheriting "
- "class %s" % (mapper_args["inherits"])
- )
- if self.inherits:
- mapper_args["inherits"] = self.inherits
- if self.inherits and not mapper_args.get("concrete", False):
- # note the superclass is expected to have a Mapper assigned and
- # not be a deferred config, as this is called within map()
- inherited_mapper = class_mapper(self.inherits, False)
- inherited_table = inherited_mapper.local_table
- # single or joined inheritance
- # exclude any cols on the inherited table which are
- # not mapped on the parent class, to avoid
- # mapping columns specific to sibling/nephew classes
- if "exclude_properties" not in mapper_args:
- mapper_args["exclude_properties"] = exclude_properties = {
- c.key
- for c in inherited_table.c
- if c not in inherited_mapper._columntoproperty
- }.union(inherited_mapper.exclude_properties or ())
- exclude_properties.difference_update(
- [c.key for c in self.declared_columns]
- )
- # look through columns in the current mapper that
- # are keyed to a propname different than the colname
- # (if names were the same, we'd have popped it out above,
- # in which case the mapper makes this combination).
- # See if the superclass has a similar column property.
- # If so, join them together.
- for k, col in list(properties.items()):
- if not isinstance(col, expression.ColumnElement):
- continue
- if k in inherited_mapper._props:
- p = inherited_mapper._props[k]
- if isinstance(p, ColumnProperty):
- # note here we place the subclass column
- # first. See [ticket:1892] for background.
- properties[k] = [col] + p.columns
- result_mapper_args = mapper_args.copy()
- result_mapper_args["properties"] = properties
- self.mapper_args = result_mapper_args
- def map(self, mapper_kw: _MapperKwArgs = util.EMPTY_DICT) -> Mapper[Any]:
- self._prepare_mapper_arguments(mapper_kw)
- if hasattr(self.cls, "__mapper_cls__"):
- mapper_cls = cast(
- "Type[Mapper[Any]]",
- util.unbound_method_to_callable(
- self.cls.__mapper_cls__ # type: ignore
- ),
- )
- else:
- mapper_cls = Mapper
- return self.set_cls_attribute(
- "__mapper__",
- mapper_cls(self.cls, self.local_table, **self.mapper_args),
- )
- @util.preload_module("sqlalchemy.orm.decl_api")
- def _as_dc_declaredattr(
- field_metadata: Mapping[str, Any], sa_dataclass_metadata_key: str
- ) -> Any:
- # wrap lambdas inside dataclass fields inside an ad-hoc declared_attr.
- # we can't write it because field.metadata is immutable :( so we have
- # to go through extra trouble to compare these
- decl_api = util.preloaded.orm_decl_api
- obj = field_metadata[sa_dataclass_metadata_key]
- if callable(obj) and not isinstance(obj, decl_api.declared_attr):
- return decl_api.declared_attr(obj)
- else:
- return obj
- class _DeferredMapperConfig(_ClassScanMapperConfig):
- _cls: weakref.ref[Type[Any]]
- is_deferred = True
- _configs: util.OrderedDict[
- weakref.ref[Type[Any]], _DeferredMapperConfig
- ] = util.OrderedDict()
- def _early_mapping(self, mapper_kw: _MapperKwArgs) -> None:
- pass
- @property
- def cls(self) -> Type[Any]:
- return self._cls() # type: ignore
- @cls.setter
- def cls(self, class_: Type[Any]) -> None:
- self._cls = weakref.ref(class_, self._remove_config_cls)
- self._configs[self._cls] = self
- @classmethod
- def _remove_config_cls(cls, ref: weakref.ref[Type[Any]]) -> None:
- cls._configs.pop(ref, None)
- @classmethod
- def has_cls(cls, class_: Type[Any]) -> bool:
- # 2.6 fails on weakref if class_ is an old style class
- return isinstance(class_, type) and weakref.ref(class_) in cls._configs
- @classmethod
- def raise_unmapped_for_cls(cls, class_: Type[Any]) -> NoReturn:
- if hasattr(class_, "_sa_raise_deferred_config"):
- class_._sa_raise_deferred_config()
- raise orm_exc.UnmappedClassError(
- class_,
- msg=(
- f"Class {orm_exc._safe_cls_name(class_)} has a deferred "
- "mapping on it. It is not yet usable as a mapped class."
- ),
- )
- @classmethod
- def config_for_cls(cls, class_: Type[Any]) -> _DeferredMapperConfig:
- return cls._configs[weakref.ref(class_)]
- @classmethod
- def classes_for_base(
- cls, base_cls: Type[Any], sort: bool = True
- ) -> List[_DeferredMapperConfig]:
- classes_for_base = [
- m
- for m, cls_ in [(m, m.cls) for m in cls._configs.values()]
- if cls_ is not None and issubclass(cls_, base_cls)
- ]
- if not sort:
- return classes_for_base
- all_m_by_cls = {m.cls: m for m in classes_for_base}
- tuples: List[Tuple[_DeferredMapperConfig, _DeferredMapperConfig]] = []
- for m_cls in all_m_by_cls:
- tuples.extend(
- (all_m_by_cls[base_cls], all_m_by_cls[m_cls])
- for base_cls in m_cls.__bases__
- if base_cls in all_m_by_cls
- )
- return list(topological.sort(tuples, classes_for_base))
- def map(self, mapper_kw: _MapperKwArgs = util.EMPTY_DICT) -> Mapper[Any]:
- self._configs.pop(self._cls, None)
- return super().map(mapper_kw)
- def _add_attribute(
- cls: Type[Any], key: str, value: MapperProperty[Any]
- ) -> None:
- """add an attribute to an existing declarative class.
- This runs through the logic to determine MapperProperty,
- adds it to the Mapper, adds a column to the mapped Table, etc.
- """
- if "__mapper__" in cls.__dict__:
- mapped_cls = cast("MappedClassProtocol[Any]", cls)
- def _table_or_raise(mc: MappedClassProtocol[Any]) -> Table:
- if isinstance(mc.__table__, Table):
- return mc.__table__
- raise exc.InvalidRequestError(
- f"Cannot add a new attribute to mapped class {mc.__name__!r} "
- "because it's not mapped against a table."
- )
- if isinstance(value, Column):
- _undefer_column_name(key, value)
- _table_or_raise(mapped_cls).append_column(
- value, replace_existing=True
- )
- mapped_cls.__mapper__.add_property(key, value)
- elif isinstance(value, _MapsColumns):
- mp = value.mapper_property_to_assign
- for col, _ in value.columns_to_assign:
- _undefer_column_name(key, col)
- _table_or_raise(mapped_cls).append_column(
- col, replace_existing=True
- )
- if not mp:
- mapped_cls.__mapper__.add_property(key, col)
- if mp:
- mapped_cls.__mapper__.add_property(key, mp)
- elif isinstance(value, MapperProperty):
- mapped_cls.__mapper__.add_property(key, value)
- elif isinstance(value, QueryableAttribute) and value.key != key:
- # detect a QueryableAttribute that's already mapped being
- # assigned elsewhere in userland, turn into a synonym()
- value = SynonymProperty(value.key)
- mapped_cls.__mapper__.add_property(key, value)
- else:
- type.__setattr__(cls, key, value)
- mapped_cls.__mapper__._expire_memoizations()
- else:
- type.__setattr__(cls, key, value)
- def _del_attribute(cls: Type[Any], key: str) -> None:
- if (
- "__mapper__" in cls.__dict__
- and key in cls.__dict__
- and not cast(
- "MappedClassProtocol[Any]", cls
- ).__mapper__._dispose_called
- ):
- value = cls.__dict__[key]
- if isinstance(
- value, (Column, _MapsColumns, MapperProperty, QueryableAttribute)
- ):
- raise NotImplementedError(
- "Can't un-map individual mapped attributes on a mapped class."
- )
- else:
- type.__delattr__(cls, key)
- cast(
- "MappedClassProtocol[Any]", cls
- ).__mapper__._expire_memoizations()
- else:
- type.__delattr__(cls, key)
- def _declarative_constructor(self: Any, **kwargs: Any) -> None:
- """A simple constructor that allows initialization from kwargs.
- Sets attributes on the constructed instance using the names and
- values in ``kwargs``.
- Only keys that are present as
- attributes of the instance's class are allowed. These could be,
- for example, any mapped columns or relationships.
- """
- cls_ = type(self)
- for k in kwargs:
- if not hasattr(cls_, k):
- raise TypeError(
- "%r is an invalid keyword argument for %s" % (k, cls_.__name__)
- )
- setattr(self, k, kwargs[k])
- _declarative_constructor.__name__ = "__init__"
- def _undefer_column_name(key: str, column: Column[Any]) -> None:
- if column.key is None:
- column.key = key
- if column.name is None:
- column.name = key
|