| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403 |
- # orm/util.py
- # Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
- # <see AUTHORS file>
- #
- # This module is part of SQLAlchemy and is released under
- # the MIT License: https://www.opensource.org/licenses/mit-license.php
- # mypy: allow-untyped-defs, allow-untyped-calls
- from __future__ import annotations
- import enum
- import functools
- import re
- import types
- import typing
- from typing import AbstractSet
- from typing import Any
- from typing import Callable
- from typing import cast
- from typing import Dict
- from typing import FrozenSet
- from typing import Generic
- from typing import Iterable
- from typing import Iterator
- from typing import List
- from typing import Match
- from typing import Optional
- from typing import Sequence
- from typing import Tuple
- from typing import Type
- from typing import TYPE_CHECKING
- from typing import TypeVar
- from typing import Union
- import weakref
- from . import attributes # noqa
- from . import exc
- from . import exc as orm_exc
- from ._typing import _O
- from ._typing import insp_is_aliased_class
- from ._typing import insp_is_mapper
- from ._typing import prop_is_relationship
- from .base import _class_to_mapper as _class_to_mapper
- from .base import _MappedAnnotationBase
- from .base import _never_set as _never_set # noqa: F401
- from .base import _none_only_set as _none_only_set # noqa: F401
- from .base import _none_set as _none_set # noqa: F401
- from .base import attribute_str as attribute_str # noqa: F401
- from .base import class_mapper as class_mapper
- from .base import DynamicMapped
- from .base import InspectionAttr as InspectionAttr
- from .base import instance_str as instance_str # noqa: F401
- from .base import Mapped
- from .base import object_mapper as object_mapper
- from .base import object_state as object_state # noqa: F401
- from .base import opt_manager_of_class
- from .base import ORMDescriptor
- from .base import state_attribute_str as state_attribute_str # noqa: F401
- from .base import state_class_str as state_class_str # noqa: F401
- from .base import state_str as state_str # noqa: F401
- from .base import WriteOnlyMapped
- from .interfaces import CriteriaOption
- from .interfaces import MapperProperty as MapperProperty
- from .interfaces import ORMColumnsClauseRole
- from .interfaces import ORMEntityColumnsClauseRole
- from .interfaces import ORMFromClauseRole
- from .path_registry import PathRegistry as PathRegistry
- from .. import event
- from .. import exc as sa_exc
- from .. import inspection
- from .. import sql
- from .. import util
- from ..engine.result import result_tuple
- from ..sql import coercions
- from ..sql import expression
- from ..sql import lambdas
- from ..sql import roles
- from ..sql import util as sql_util
- from ..sql import visitors
- from ..sql._typing import is_selectable
- from ..sql.annotation import SupportsCloneAnnotations
- from ..sql.base import ColumnCollection
- from ..sql.cache_key import HasCacheKey
- from ..sql.cache_key import MemoizedHasCacheKey
- from ..sql.elements import ColumnElement
- from ..sql.elements import KeyedColumnElement
- from ..sql.selectable import FromClause
- from ..util.langhelpers import MemoizedSlots
- from ..util.typing import de_stringify_annotation as _de_stringify_annotation
- from ..util.typing import eval_name_only as _eval_name_only
- from ..util.typing import fixup_container_fwd_refs
- from ..util.typing import get_origin
- from ..util.typing import is_origin_of_cls
- from ..util.typing import Literal
- from ..util.typing import Protocol
- if typing.TYPE_CHECKING:
- from ._typing import _EntityType
- from ._typing import _IdentityKeyType
- from ._typing import _InternalEntityType
- from ._typing import _ORMCOLEXPR
- from .context import _MapperEntity
- from .context import ORMCompileState
- from .mapper import Mapper
- from .path_registry import AbstractEntityRegistry
- from .query import Query
- from .relationships import RelationshipProperty
- from ..engine import Row
- from ..engine import RowMapping
- from ..sql._typing import _CE
- from ..sql._typing import _ColumnExpressionArgument
- from ..sql._typing import _EquivalentColumnMap
- from ..sql._typing import _FromClauseArgument
- from ..sql._typing import _OnClauseArgument
- from ..sql._typing import _PropagateAttrsType
- from ..sql.annotation import _SA
- from ..sql.base import ReadOnlyColumnCollection
- from ..sql.elements import BindParameter
- from ..sql.selectable import _ColumnsClauseElement
- from ..sql.selectable import Select
- from ..sql.selectable import Selectable
- from ..sql.visitors import anon_map
- from ..util.typing import _AnnotationScanType
- _T = TypeVar("_T", bound=Any)
- all_cascades = frozenset(
- (
- "delete",
- "delete-orphan",
- "all",
- "merge",
- "expunge",
- "save-update",
- "refresh-expire",
- "none",
- )
- )
- _de_stringify_partial = functools.partial(
- functools.partial,
- locals_=util.immutabledict(
- {
- "Mapped": Mapped,
- "WriteOnlyMapped": WriteOnlyMapped,
- "DynamicMapped": DynamicMapped,
- }
- ),
- )
- # partial is practically useless as we have to write out the whole
- # function and maintain the signature anyway
- class _DeStringifyAnnotation(Protocol):
- def __call__(
- self,
- cls: Type[Any],
- annotation: _AnnotationScanType,
- originating_module: str,
- *,
- str_cleanup_fn: Optional[Callable[[str, str], str]] = None,
- include_generic: bool = False,
- ) -> Type[Any]: ...
- de_stringify_annotation = cast(
- _DeStringifyAnnotation, _de_stringify_partial(_de_stringify_annotation)
- )
- class _EvalNameOnly(Protocol):
- def __call__(self, name: str, module_name: str) -> Any: ...
- eval_name_only = cast(_EvalNameOnly, _de_stringify_partial(_eval_name_only))
- class CascadeOptions(FrozenSet[str]):
- """Keeps track of the options sent to
- :paramref:`.relationship.cascade`"""
- _add_w_all_cascades = all_cascades.difference(
- ["all", "none", "delete-orphan"]
- )
- _allowed_cascades = all_cascades
- _viewonly_cascades = ["expunge", "all", "none", "refresh-expire", "merge"]
- __slots__ = (
- "save_update",
- "delete",
- "refresh_expire",
- "merge",
- "expunge",
- "delete_orphan",
- )
- save_update: bool
- delete: bool
- refresh_expire: bool
- merge: bool
- expunge: bool
- delete_orphan: bool
- def __new__(
- cls, value_list: Optional[Union[Iterable[str], str]]
- ) -> CascadeOptions:
- if isinstance(value_list, str) or value_list is None:
- return cls.from_string(value_list) # type: ignore
- values = set(value_list)
- if values.difference(cls._allowed_cascades):
- raise sa_exc.ArgumentError(
- "Invalid cascade option(s): %s"
- % ", ".join(
- [
- repr(x)
- for x in sorted(
- values.difference(cls._allowed_cascades)
- )
- ]
- )
- )
- if "all" in values:
- values.update(cls._add_w_all_cascades)
- if "none" in values:
- values.clear()
- values.discard("all")
- self = super().__new__(cls, values)
- self.save_update = "save-update" in values
- self.delete = "delete" in values
- self.refresh_expire = "refresh-expire" in values
- self.merge = "merge" in values
- self.expunge = "expunge" in values
- self.delete_orphan = "delete-orphan" in values
- if self.delete_orphan and not self.delete:
- util.warn("The 'delete-orphan' cascade option requires 'delete'.")
- return self
- def __repr__(self):
- return "CascadeOptions(%r)" % (",".join([x for x in sorted(self)]))
- @classmethod
- def from_string(cls, arg):
- values = [c for c in re.split(r"\s*,\s*", arg or "") if c]
- return cls(values)
- def _validator_events(desc, key, validator, include_removes, include_backrefs):
- """Runs a validation method on an attribute value to be set or
- appended.
- """
- if not include_backrefs:
- def detect_is_backref(state, initiator):
- impl = state.manager[key].impl
- return initiator.impl is not impl
- if include_removes:
- def append(state, value, initiator):
- if initiator.op is not attributes.OP_BULK_REPLACE and (
- include_backrefs or not detect_is_backref(state, initiator)
- ):
- return validator(state.obj(), key, value, False)
- else:
- return value
- def bulk_set(state, values, initiator):
- if include_backrefs or not detect_is_backref(state, initiator):
- obj = state.obj()
- values[:] = [
- validator(obj, key, value, False) for value in values
- ]
- def set_(state, value, oldvalue, initiator):
- if include_backrefs or not detect_is_backref(state, initiator):
- return validator(state.obj(), key, value, False)
- else:
- return value
- def remove(state, value, initiator):
- if include_backrefs or not detect_is_backref(state, initiator):
- validator(state.obj(), key, value, True)
- else:
- def append(state, value, initiator):
- if initiator.op is not attributes.OP_BULK_REPLACE and (
- include_backrefs or not detect_is_backref(state, initiator)
- ):
- return validator(state.obj(), key, value)
- else:
- return value
- def bulk_set(state, values, initiator):
- if include_backrefs or not detect_is_backref(state, initiator):
- obj = state.obj()
- values[:] = [validator(obj, key, value) for value in values]
- def set_(state, value, oldvalue, initiator):
- if include_backrefs or not detect_is_backref(state, initiator):
- return validator(state.obj(), key, value)
- else:
- return value
- event.listen(desc, "append", append, raw=True, retval=True)
- event.listen(desc, "bulk_replace", bulk_set, raw=True)
- event.listen(desc, "set", set_, raw=True, retval=True)
- if include_removes:
- event.listen(desc, "remove", remove, raw=True, retval=True)
- def polymorphic_union(
- table_map, typecolname, aliasname="p_union", cast_nulls=True
- ):
- """Create a ``UNION`` statement used by a polymorphic mapper.
- See :ref:`concrete_inheritance` for an example of how
- this is used.
- :param table_map: mapping of polymorphic identities to
- :class:`_schema.Table` objects.
- :param typecolname: string name of a "discriminator" column, which will be
- derived from the query, producing the polymorphic identity for
- each row. If ``None``, no polymorphic discriminator is generated.
- :param aliasname: name of the :func:`~sqlalchemy.sql.expression.alias()`
- construct generated.
- :param cast_nulls: if True, non-existent columns, which are represented
- as labeled NULLs, will be passed into CAST. This is a legacy behavior
- that is problematic on some backends such as Oracle - in which case it
- can be set to False.
- """
- colnames: util.OrderedSet[str] = util.OrderedSet()
- colnamemaps = {}
- types = {}
- for key in table_map:
- table = table_map[key]
- table = coercions.expect(
- roles.StrictFromClauseRole, table, allow_select=True
- )
- table_map[key] = table
- m = {}
- for c in table.c:
- if c.key == typecolname:
- raise sa_exc.InvalidRequestError(
- "Polymorphic union can't use '%s' as the discriminator "
- "column due to mapped column %r; please apply the "
- "'typecolname' "
- "argument; this is available on "
- "ConcreteBase as '_concrete_discriminator_name'"
- % (typecolname, c)
- )
- colnames.add(c.key)
- m[c.key] = c
- types[c.key] = c.type
- colnamemaps[table] = m
- def col(name, table):
- try:
- return colnamemaps[table][name]
- except KeyError:
- if cast_nulls:
- return sql.cast(sql.null(), types[name]).label(name)
- else:
- return sql.type_coerce(sql.null(), types[name]).label(name)
- result = []
- for type_, table in table_map.items():
- if typecolname is not None:
- result.append(
- sql.select(
- *(
- [col(name, table) for name in colnames]
- + [
- sql.literal_column(
- sql_util._quote_ddl_expr(type_)
- ).label(typecolname)
- ]
- )
- ).select_from(table)
- )
- else:
- result.append(
- sql.select(
- *[col(name, table) for name in colnames]
- ).select_from(table)
- )
- return sql.union_all(*result).alias(aliasname)
- def identity_key(
- class_: Optional[Type[_T]] = None,
- ident: Union[Any, Tuple[Any, ...]] = None,
- *,
- instance: Optional[_T] = None,
- row: Optional[Union[Row[Any], RowMapping]] = None,
- identity_token: Optional[Any] = None,
- ) -> _IdentityKeyType[_T]:
- r"""Generate "identity key" tuples, as are used as keys in the
- :attr:`.Session.identity_map` dictionary.
- This function has several call styles:
- * ``identity_key(class, ident, identity_token=token)``
- This form receives a mapped class and a primary key scalar or
- tuple as an argument.
- E.g.::
- >>> identity_key(MyClass, (1, 2))
- (<class '__main__.MyClass'>, (1, 2), None)
- :param class: mapped class (must be a positional argument)
- :param ident: primary key, may be a scalar or tuple argument.
- :param identity_token: optional identity token
- .. versionadded:: 1.2 added identity_token
- * ``identity_key(instance=instance)``
- This form will produce the identity key for a given instance. The
- instance need not be persistent, only that its primary key attributes
- are populated (else the key will contain ``None`` for those missing
- values).
- E.g.::
- >>> instance = MyClass(1, 2)
- >>> identity_key(instance=instance)
- (<class '__main__.MyClass'>, (1, 2), None)
- In this form, the given instance is ultimately run though
- :meth:`_orm.Mapper.identity_key_from_instance`, which will have the
- effect of performing a database check for the corresponding row
- if the object is expired.
- :param instance: object instance (must be given as a keyword arg)
- * ``identity_key(class, row=row, identity_token=token)``
- This form is similar to the class/tuple form, except is passed a
- database result row as a :class:`.Row` or :class:`.RowMapping` object.
- E.g.::
- >>> row = engine.execute(text("select * from table where a=1 and b=2")).first()
- >>> identity_key(MyClass, row=row)
- (<class '__main__.MyClass'>, (1, 2), None)
- :param class: mapped class (must be a positional argument)
- :param row: :class:`.Row` row returned by a :class:`_engine.CursorResult`
- (must be given as a keyword arg)
- :param identity_token: optional identity token
- .. versionadded:: 1.2 added identity_token
- """ # noqa: E501
- if class_ is not None:
- mapper = class_mapper(class_)
- if row is None:
- if ident is None:
- raise sa_exc.ArgumentError("ident or row is required")
- return mapper.identity_key_from_primary_key(
- tuple(util.to_list(ident)), identity_token=identity_token
- )
- else:
- return mapper.identity_key_from_row(
- row, identity_token=identity_token
- )
- elif instance is not None:
- mapper = object_mapper(instance)
- return mapper.identity_key_from_instance(instance)
- else:
- raise sa_exc.ArgumentError("class or instance is required")
- class _TraceAdaptRole(enum.Enum):
- """Enumeration of all the use cases for ORMAdapter.
- ORMAdapter remains one of the most complicated aspects of the ORM, as it is
- used for in-place adaption of column expressions to be applied to a SELECT,
- replacing :class:`.Table` and other objects that are mapped to classes with
- aliases of those tables in the case of joined eager loading, or in the case
- of polymorphic loading as used with concrete mappings or other custom "with
- polymorphic" parameters, with whole user-defined subqueries. The
- enumerations provide an overview of all the use cases used by ORMAdapter, a
- layer of formality as to the introduction of new ORMAdapter use cases (of
- which none are anticipated), as well as a means to trace the origins of a
- particular ORMAdapter within runtime debugging.
- SQLAlchemy 2.0 has greatly scaled back ORM features which relied heavily on
- open-ended statement adaption, including the ``Query.with_polymorphic()``
- method and the ``Query.select_from_entity()`` methods, favoring
- user-explicit aliasing schemes using the ``aliased()`` and
- ``with_polymorphic()`` standalone constructs; these still use adaption,
- however the adaption is applied in a narrower scope.
- """
- # aliased() use that is used to adapt individual attributes at query
- # construction time
- ALIASED_INSP = enum.auto()
- # joinedload cases; typically adapt an ON clause of a relationship
- # join
- JOINEDLOAD_USER_DEFINED_ALIAS = enum.auto()
- JOINEDLOAD_PATH_WITH_POLYMORPHIC = enum.auto()
- JOINEDLOAD_MEMOIZED_ADAPTER = enum.auto()
- # polymorphic cases - these are complex ones that replace FROM
- # clauses, replacing tables with subqueries
- MAPPER_POLYMORPHIC_ADAPTER = enum.auto()
- WITH_POLYMORPHIC_ADAPTER = enum.auto()
- WITH_POLYMORPHIC_ADAPTER_RIGHT_JOIN = enum.auto()
- DEPRECATED_JOIN_ADAPT_RIGHT_SIDE = enum.auto()
- # the from_statement() case, used only to adapt individual attributes
- # from a given statement to local ORM attributes at result fetching
- # time. assigned to ORMCompileState._from_obj_alias
- ADAPT_FROM_STATEMENT = enum.auto()
- # the joinedload for queries that have LIMIT/OFFSET/DISTINCT case;
- # the query is placed inside of a subquery with the LIMIT/OFFSET/etc.,
- # joinedloads are then placed on the outside.
- # assigned to ORMCompileState.compound_eager_adapter
- COMPOUND_EAGER_STATEMENT = enum.auto()
- # the legacy Query._set_select_from() case.
- # this is needed for Query's set operations (i.e. UNION, etc. )
- # as well as "legacy from_self()", which while removed from 2.0 as
- # public API, is used for the Query.count() method. this one
- # still does full statement traversal
- # assigned to ORMCompileState._from_obj_alias
- LEGACY_SELECT_FROM_ALIAS = enum.auto()
- class ORMStatementAdapter(sql_util.ColumnAdapter):
- """ColumnAdapter which includes a role attribute."""
- __slots__ = ("role",)
- def __init__(
- self,
- role: _TraceAdaptRole,
- selectable: Selectable,
- *,
- equivalents: Optional[_EquivalentColumnMap] = None,
- adapt_required: bool = False,
- allow_label_resolve: bool = True,
- anonymize_labels: bool = False,
- adapt_on_names: bool = False,
- adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
- ):
- self.role = role
- super().__init__(
- selectable,
- equivalents=equivalents,
- adapt_required=adapt_required,
- allow_label_resolve=allow_label_resolve,
- anonymize_labels=anonymize_labels,
- adapt_on_names=adapt_on_names,
- adapt_from_selectables=adapt_from_selectables,
- )
- class ORMAdapter(sql_util.ColumnAdapter):
- """ColumnAdapter subclass which excludes adaptation of entities from
- non-matching mappers.
- """
- __slots__ = ("role", "mapper", "is_aliased_class", "aliased_insp")
- is_aliased_class: bool
- aliased_insp: Optional[AliasedInsp[Any]]
- def __init__(
- self,
- role: _TraceAdaptRole,
- entity: _InternalEntityType[Any],
- *,
- equivalents: Optional[_EquivalentColumnMap] = None,
- adapt_required: bool = False,
- allow_label_resolve: bool = True,
- anonymize_labels: bool = False,
- selectable: Optional[Selectable] = None,
- limit_on_entity: bool = True,
- adapt_on_names: bool = False,
- adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
- ):
- self.role = role
- self.mapper = entity.mapper
- if selectable is None:
- selectable = entity.selectable
- if insp_is_aliased_class(entity):
- self.is_aliased_class = True
- self.aliased_insp = entity
- else:
- self.is_aliased_class = False
- self.aliased_insp = None
- super().__init__(
- selectable,
- equivalents,
- adapt_required=adapt_required,
- allow_label_resolve=allow_label_resolve,
- anonymize_labels=anonymize_labels,
- include_fn=self._include_fn if limit_on_entity else None,
- adapt_on_names=adapt_on_names,
- adapt_from_selectables=adapt_from_selectables,
- )
- def _include_fn(self, elem):
- entity = elem._annotations.get("parentmapper", None)
- return not entity or entity.isa(self.mapper) or self.mapper.isa(entity)
- class AliasedClass(
- inspection.Inspectable["AliasedInsp[_O]"], ORMColumnsClauseRole[_O]
- ):
- r"""Represents an "aliased" form of a mapped class for usage with Query.
- The ORM equivalent of a :func:`~sqlalchemy.sql.expression.alias`
- construct, this object mimics the mapped class using a
- ``__getattr__`` scheme and maintains a reference to a
- real :class:`~sqlalchemy.sql.expression.Alias` object.
- A primary purpose of :class:`.AliasedClass` is to serve as an alternate
- within a SQL statement generated by the ORM, such that an existing
- mapped entity can be used in multiple contexts. A simple example::
- # find all pairs of users with the same name
- user_alias = aliased(User)
- session.query(User, user_alias).join(
- (user_alias, User.id > user_alias.id)
- ).filter(User.name == user_alias.name)
- :class:`.AliasedClass` is also capable of mapping an existing mapped
- class to an entirely new selectable, provided this selectable is column-
- compatible with the existing mapped selectable, and it can also be
- configured in a mapping as the target of a :func:`_orm.relationship`.
- See the links below for examples.
- The :class:`.AliasedClass` object is constructed typically using the
- :func:`_orm.aliased` function. It also is produced with additional
- configuration when using the :func:`_orm.with_polymorphic` function.
- The resulting object is an instance of :class:`.AliasedClass`.
- This object implements an attribute scheme which produces the
- same attribute and method interface as the original mapped
- class, allowing :class:`.AliasedClass` to be compatible
- with any attribute technique which works on the original class,
- including hybrid attributes (see :ref:`hybrids_toplevel`).
- The :class:`.AliasedClass` can be inspected for its underlying
- :class:`_orm.Mapper`, aliased selectable, and other information
- using :func:`_sa.inspect`::
- from sqlalchemy import inspect
- my_alias = aliased(MyClass)
- insp = inspect(my_alias)
- The resulting inspection object is an instance of :class:`.AliasedInsp`.
- .. seealso::
- :func:`.aliased`
- :func:`.with_polymorphic`
- :ref:`relationship_aliased_class`
- :ref:`relationship_to_window_function`
- """
- __name__: str
- def __init__(
- self,
- mapped_class_or_ac: _EntityType[_O],
- alias: Optional[FromClause] = None,
- name: Optional[str] = None,
- flat: bool = False,
- adapt_on_names: bool = False,
- with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]] = None,
- with_polymorphic_discriminator: Optional[ColumnElement[Any]] = None,
- base_alias: Optional[AliasedInsp[Any]] = None,
- use_mapper_path: bool = False,
- represents_outer_join: bool = False,
- ):
- insp = cast(
- "_InternalEntityType[_O]", inspection.inspect(mapped_class_or_ac)
- )
- mapper = insp.mapper
- nest_adapters = False
- if alias is None:
- if insp.is_aliased_class and insp.selectable._is_subquery:
- alias = insp.selectable.alias()
- else:
- alias = (
- mapper._with_polymorphic_selectable._anonymous_fromclause(
- name=name,
- flat=flat,
- )
- )
- elif insp.is_aliased_class:
- nest_adapters = True
- assert alias is not None
- self._aliased_insp = AliasedInsp(
- self,
- insp,
- alias,
- name,
- (
- with_polymorphic_mappers
- if with_polymorphic_mappers
- else mapper.with_polymorphic_mappers
- ),
- (
- with_polymorphic_discriminator
- if with_polymorphic_discriminator is not None
- else mapper.polymorphic_on
- ),
- base_alias,
- use_mapper_path,
- adapt_on_names,
- represents_outer_join,
- nest_adapters,
- )
- self.__name__ = f"aliased({mapper.class_.__name__})"
- @classmethod
- def _reconstitute_from_aliased_insp(
- cls, aliased_insp: AliasedInsp[_O]
- ) -> AliasedClass[_O]:
- obj = cls.__new__(cls)
- obj.__name__ = f"aliased({aliased_insp.mapper.class_.__name__})"
- obj._aliased_insp = aliased_insp
- if aliased_insp._is_with_polymorphic:
- for sub_aliased_insp in aliased_insp._with_polymorphic_entities:
- if sub_aliased_insp is not aliased_insp:
- ent = AliasedClass._reconstitute_from_aliased_insp(
- sub_aliased_insp
- )
- setattr(obj, sub_aliased_insp.class_.__name__, ent)
- return obj
- def __getattr__(self, key: str) -> Any:
- try:
- _aliased_insp = self.__dict__["_aliased_insp"]
- except KeyError:
- raise AttributeError()
- else:
- target = _aliased_insp._target
- # maintain all getattr mechanics
- attr = getattr(target, key)
- # attribute is a method, that will be invoked against a
- # "self"; so just return a new method with the same function and
- # new self
- if hasattr(attr, "__call__") and hasattr(attr, "__self__"):
- return types.MethodType(attr.__func__, self)
- # attribute is a descriptor, that will be invoked against a
- # "self"; so invoke the descriptor against this self
- if hasattr(attr, "__get__"):
- attr = attr.__get__(None, self)
- # attributes within the QueryableAttribute system will want this
- # to be invoked so the object can be adapted
- if hasattr(attr, "adapt_to_entity"):
- attr = attr.adapt_to_entity(_aliased_insp)
- setattr(self, key, attr)
- return attr
- def _get_from_serialized(
- self, key: str, mapped_class: _O, aliased_insp: AliasedInsp[_O]
- ) -> Any:
- # this method is only used in terms of the
- # sqlalchemy.ext.serializer extension
- attr = getattr(mapped_class, key)
- if hasattr(attr, "__call__") and hasattr(attr, "__self__"):
- return types.MethodType(attr.__func__, self)
- # attribute is a descriptor, that will be invoked against a
- # "self"; so invoke the descriptor against this self
- if hasattr(attr, "__get__"):
- attr = attr.__get__(None, self)
- # attributes within the QueryableAttribute system will want this
- # to be invoked so the object can be adapted
- if hasattr(attr, "adapt_to_entity"):
- aliased_insp._weak_entity = weakref.ref(self)
- attr = attr.adapt_to_entity(aliased_insp)
- setattr(self, key, attr)
- return attr
- def __repr__(self) -> str:
- return "<AliasedClass at 0x%x; %s>" % (
- id(self),
- self._aliased_insp._target.__name__,
- )
- def __str__(self) -> str:
- return str(self._aliased_insp)
- @inspection._self_inspects
- class AliasedInsp(
- ORMEntityColumnsClauseRole[_O],
- ORMFromClauseRole,
- HasCacheKey,
- InspectionAttr,
- MemoizedSlots,
- inspection.Inspectable["AliasedInsp[_O]"],
- Generic[_O],
- ):
- """Provide an inspection interface for an
- :class:`.AliasedClass` object.
- The :class:`.AliasedInsp` object is returned
- given an :class:`.AliasedClass` using the
- :func:`_sa.inspect` function::
- from sqlalchemy import inspect
- from sqlalchemy.orm import aliased
- my_alias = aliased(MyMappedClass)
- insp = inspect(my_alias)
- Attributes on :class:`.AliasedInsp`
- include:
- * ``entity`` - the :class:`.AliasedClass` represented.
- * ``mapper`` - the :class:`_orm.Mapper` mapping the underlying class.
- * ``selectable`` - the :class:`_expression.Alias`
- construct which ultimately
- represents an aliased :class:`_schema.Table` or
- :class:`_expression.Select`
- construct.
- * ``name`` - the name of the alias. Also is used as the attribute
- name when returned in a result tuple from :class:`_query.Query`.
- * ``with_polymorphic_mappers`` - collection of :class:`_orm.Mapper`
- objects
- indicating all those mappers expressed in the select construct
- for the :class:`.AliasedClass`.
- * ``polymorphic_on`` - an alternate column or SQL expression which
- will be used as the "discriminator" for a polymorphic load.
- .. seealso::
- :ref:`inspection_toplevel`
- """
- __slots__ = (
- "__weakref__",
- "_weak_entity",
- "mapper",
- "selectable",
- "name",
- "_adapt_on_names",
- "with_polymorphic_mappers",
- "polymorphic_on",
- "_use_mapper_path",
- "_base_alias",
- "represents_outer_join",
- "persist_selectable",
- "local_table",
- "_is_with_polymorphic",
- "_with_polymorphic_entities",
- "_adapter",
- "_target",
- "__clause_element__",
- "_memoized_values",
- "_all_column_expressions",
- "_nest_adapters",
- )
- _cache_key_traversal = [
- ("name", visitors.ExtendedInternalTraversal.dp_string),
- ("_adapt_on_names", visitors.ExtendedInternalTraversal.dp_boolean),
- ("_use_mapper_path", visitors.ExtendedInternalTraversal.dp_boolean),
- ("_target", visitors.ExtendedInternalTraversal.dp_inspectable),
- ("selectable", visitors.ExtendedInternalTraversal.dp_clauseelement),
- (
- "with_polymorphic_mappers",
- visitors.InternalTraversal.dp_has_cache_key_list,
- ),
- ("polymorphic_on", visitors.InternalTraversal.dp_clauseelement),
- ]
- mapper: Mapper[_O]
- selectable: FromClause
- _adapter: ORMAdapter
- with_polymorphic_mappers: Sequence[Mapper[Any]]
- _with_polymorphic_entities: Sequence[AliasedInsp[Any]]
- _weak_entity: weakref.ref[AliasedClass[_O]]
- """the AliasedClass that refers to this AliasedInsp"""
- _target: Union[Type[_O], AliasedClass[_O]]
- """the thing referenced by the AliasedClass/AliasedInsp.
- In the vast majority of cases, this is the mapped class. However
- it may also be another AliasedClass (alias of alias).
- """
- def __init__(
- self,
- entity: AliasedClass[_O],
- inspected: _InternalEntityType[_O],
- selectable: FromClause,
- name: Optional[str],
- with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]],
- polymorphic_on: Optional[ColumnElement[Any]],
- _base_alias: Optional[AliasedInsp[Any]],
- _use_mapper_path: bool,
- adapt_on_names: bool,
- represents_outer_join: bool,
- nest_adapters: bool,
- ):
- mapped_class_or_ac = inspected.entity
- mapper = inspected.mapper
- self._weak_entity = weakref.ref(entity)
- self.mapper = mapper
- self.selectable = self.persist_selectable = self.local_table = (
- selectable
- )
- self.name = name
- self.polymorphic_on = polymorphic_on
- self._base_alias = weakref.ref(_base_alias or self)
- self._use_mapper_path = _use_mapper_path
- self.represents_outer_join = represents_outer_join
- self._nest_adapters = nest_adapters
- if with_polymorphic_mappers:
- self._is_with_polymorphic = True
- self.with_polymorphic_mappers = with_polymorphic_mappers
- self._with_polymorphic_entities = []
- for poly in self.with_polymorphic_mappers:
- if poly is not mapper:
- ent = AliasedClass(
- poly.class_,
- selectable,
- base_alias=self,
- adapt_on_names=adapt_on_names,
- use_mapper_path=_use_mapper_path,
- )
- setattr(self.entity, poly.class_.__name__, ent)
- self._with_polymorphic_entities.append(ent._aliased_insp)
- else:
- self._is_with_polymorphic = False
- self.with_polymorphic_mappers = [mapper]
- self._adapter = ORMAdapter(
- _TraceAdaptRole.ALIASED_INSP,
- mapper,
- selectable=selectable,
- equivalents=mapper._equivalent_columns,
- adapt_on_names=adapt_on_names,
- anonymize_labels=True,
- # make sure the adapter doesn't try to grab other tables that
- # are not even the thing we are mapping, such as embedded
- # selectables in subqueries or CTEs. See issue #6060
- adapt_from_selectables={
- m.selectable
- for m in self.with_polymorphic_mappers
- if not adapt_on_names
- },
- limit_on_entity=False,
- )
- if nest_adapters:
- # supports "aliased class of aliased class" use case
- assert isinstance(inspected, AliasedInsp)
- self._adapter = inspected._adapter.wrap(self._adapter)
- self._adapt_on_names = adapt_on_names
- self._target = mapped_class_or_ac
- @classmethod
- def _alias_factory(
- cls,
- element: Union[_EntityType[_O], FromClause],
- alias: Optional[FromClause] = None,
- name: Optional[str] = None,
- flat: bool = False,
- adapt_on_names: bool = False,
- ) -> Union[AliasedClass[_O], FromClause]:
- if isinstance(element, FromClause):
- if adapt_on_names:
- raise sa_exc.ArgumentError(
- "adapt_on_names only applies to ORM elements"
- )
- if name:
- return element.alias(name=name, flat=flat)
- else:
- return coercions.expect(
- roles.AnonymizedFromClauseRole, element, flat=flat
- )
- else:
- return AliasedClass(
- element,
- alias=alias,
- flat=flat,
- name=name,
- adapt_on_names=adapt_on_names,
- )
- @classmethod
- def _with_polymorphic_factory(
- cls,
- base: Union[Type[_O], Mapper[_O]],
- classes: Union[Literal["*"], Iterable[_EntityType[Any]]],
- selectable: Union[Literal[False, None], FromClause] = False,
- flat: bool = False,
- polymorphic_on: Optional[ColumnElement[Any]] = None,
- aliased: bool = False,
- innerjoin: bool = False,
- adapt_on_names: bool = False,
- name: Optional[str] = None,
- _use_mapper_path: bool = False,
- ) -> AliasedClass[_O]:
- primary_mapper = _class_to_mapper(base)
- if selectable not in (None, False) and flat:
- raise sa_exc.ArgumentError(
- "the 'flat' and 'selectable' arguments cannot be passed "
- "simultaneously to with_polymorphic()"
- )
- mappers, selectable = primary_mapper._with_polymorphic_args(
- classes, selectable, innerjoin=innerjoin
- )
- if aliased or flat:
- assert selectable is not None
- selectable = selectable._anonymous_fromclause(flat=flat)
- return AliasedClass(
- base,
- selectable,
- name=name,
- with_polymorphic_mappers=mappers,
- adapt_on_names=adapt_on_names,
- with_polymorphic_discriminator=polymorphic_on,
- use_mapper_path=_use_mapper_path,
- represents_outer_join=not innerjoin,
- )
- @property
- def entity(self) -> AliasedClass[_O]:
- # to eliminate reference cycles, the AliasedClass is held weakly.
- # this produces some situations where the AliasedClass gets lost,
- # particularly when one is created internally and only the AliasedInsp
- # is passed around.
- # to work around this case, we just generate a new one when we need
- # it, as it is a simple class with very little initial state on it.
- ent = self._weak_entity()
- if ent is None:
- ent = AliasedClass._reconstitute_from_aliased_insp(self)
- self._weak_entity = weakref.ref(ent)
- return ent
- is_aliased_class = True
- "always returns True"
- def _memoized_method___clause_element__(self) -> FromClause:
- return self.selectable._annotate(
- {
- "parentmapper": self.mapper,
- "parententity": self,
- "entity_namespace": self,
- }
- )._set_propagate_attrs(
- {"compile_state_plugin": "orm", "plugin_subject": self}
- )
- @property
- def entity_namespace(self) -> AliasedClass[_O]:
- return self.entity
- @property
- def class_(self) -> Type[_O]:
- """Return the mapped class ultimately represented by this
- :class:`.AliasedInsp`."""
- return self.mapper.class_
- @property
- def _path_registry(self) -> AbstractEntityRegistry:
- if self._use_mapper_path:
- return self.mapper._path_registry
- else:
- return PathRegistry.per_mapper(self)
- def __getstate__(self) -> Dict[str, Any]:
- return {
- "entity": self.entity,
- "mapper": self.mapper,
- "alias": self.selectable,
- "name": self.name,
- "adapt_on_names": self._adapt_on_names,
- "with_polymorphic_mappers": self.with_polymorphic_mappers,
- "with_polymorphic_discriminator": self.polymorphic_on,
- "base_alias": self._base_alias(),
- "use_mapper_path": self._use_mapper_path,
- "represents_outer_join": self.represents_outer_join,
- "nest_adapters": self._nest_adapters,
- }
- def __setstate__(self, state: Dict[str, Any]) -> None:
- self.__init__( # type: ignore
- state["entity"],
- state["mapper"],
- state["alias"],
- state["name"],
- state["with_polymorphic_mappers"],
- state["with_polymorphic_discriminator"],
- state["base_alias"],
- state["use_mapper_path"],
- state["adapt_on_names"],
- state["represents_outer_join"],
- state["nest_adapters"],
- )
- def _merge_with(self, other: AliasedInsp[_O]) -> AliasedInsp[_O]:
- # assert self._is_with_polymorphic
- # assert other._is_with_polymorphic
- primary_mapper = other.mapper
- assert self.mapper is primary_mapper
- our_classes = util.to_set(
- mp.class_ for mp in self.with_polymorphic_mappers
- )
- new_classes = {mp.class_ for mp in other.with_polymorphic_mappers}
- if our_classes == new_classes:
- return other
- else:
- classes = our_classes.union(new_classes)
- mappers, selectable = primary_mapper._with_polymorphic_args(
- classes, None, innerjoin=not other.represents_outer_join
- )
- selectable = selectable._anonymous_fromclause(flat=True)
- return AliasedClass(
- primary_mapper,
- selectable,
- with_polymorphic_mappers=mappers,
- with_polymorphic_discriminator=other.polymorphic_on,
- use_mapper_path=other._use_mapper_path,
- represents_outer_join=other.represents_outer_join,
- )._aliased_insp
- def _adapt_element(
- self, expr: _ORMCOLEXPR, key: Optional[str] = None
- ) -> _ORMCOLEXPR:
- assert isinstance(expr, ColumnElement)
- d: Dict[str, Any] = {
- "parententity": self,
- "parentmapper": self.mapper,
- }
- if key:
- d["proxy_key"] = key
- # IMO mypy should see this one also as returning the same type
- # we put into it, but it's not
- return (
- self._adapter.traverse(expr)
- ._annotate(d)
- ._set_propagate_attrs(
- {"compile_state_plugin": "orm", "plugin_subject": self}
- )
- )
- if TYPE_CHECKING:
- # establish compatibility with the _ORMAdapterProto protocol,
- # which in turn is compatible with _CoreAdapterProto.
- def _orm_adapt_element(
- self,
- obj: _CE,
- key: Optional[str] = None,
- ) -> _CE: ...
- else:
- _orm_adapt_element = _adapt_element
- def _entity_for_mapper(self, mapper):
- self_poly = self.with_polymorphic_mappers
- if mapper in self_poly:
- if mapper is self.mapper:
- return self
- else:
- return getattr(
- self.entity, mapper.class_.__name__
- )._aliased_insp
- elif mapper.isa(self.mapper):
- return self
- else:
- assert False, "mapper %s doesn't correspond to %s" % (mapper, self)
- def _memoized_attr__get_clause(self):
- onclause, replacemap = self.mapper._get_clause
- return (
- self._adapter.traverse(onclause),
- {
- self._adapter.traverse(col): param
- for col, param in replacemap.items()
- },
- )
- def _memoized_attr__memoized_values(self):
- return {}
- def _memoized_attr__all_column_expressions(self):
- if self._is_with_polymorphic:
- cols_plus_keys = self.mapper._columns_plus_keys(
- [ent.mapper for ent in self._with_polymorphic_entities]
- )
- else:
- cols_plus_keys = self.mapper._columns_plus_keys()
- cols_plus_keys = [
- (key, self._adapt_element(col)) for key, col in cols_plus_keys
- ]
- return ColumnCollection(cols_plus_keys)
- def _memo(self, key, callable_, *args, **kw):
- if key in self._memoized_values:
- return self._memoized_values[key]
- else:
- self._memoized_values[key] = value = callable_(*args, **kw)
- return value
- def __repr__(self):
- if self.with_polymorphic_mappers:
- with_poly = "(%s)" % ", ".join(
- mp.class_.__name__ for mp in self.with_polymorphic_mappers
- )
- else:
- with_poly = ""
- return "<AliasedInsp at 0x%x; %s%s>" % (
- id(self),
- self.class_.__name__,
- with_poly,
- )
- def __str__(self):
- if self._is_with_polymorphic:
- return "with_polymorphic(%s, [%s])" % (
- self._target.__name__,
- ", ".join(
- mp.class_.__name__
- for mp in self.with_polymorphic_mappers
- if mp is not self.mapper
- ),
- )
- else:
- return "aliased(%s)" % (self._target.__name__,)
- class _WrapUserEntity:
- """A wrapper used within the loader_criteria lambda caller so that
- we can bypass declared_attr descriptors on unmapped mixins, which
- normally emit a warning for such use.
- might also be useful for other per-lambda instrumentations should
- the need arise.
- """
- __slots__ = ("subject",)
- def __init__(self, subject):
- self.subject = subject
- @util.preload_module("sqlalchemy.orm.decl_api")
- def __getattribute__(self, name):
- decl_api = util.preloaded.orm.decl_api
- subject = object.__getattribute__(self, "subject")
- if name in subject.__dict__ and isinstance(
- subject.__dict__[name], decl_api.declared_attr
- ):
- return subject.__dict__[name].fget(subject)
- else:
- return getattr(subject, name)
- class LoaderCriteriaOption(CriteriaOption):
- """Add additional WHERE criteria to the load for all occurrences of
- a particular entity.
- :class:`_orm.LoaderCriteriaOption` is invoked using the
- :func:`_orm.with_loader_criteria` function; see that function for
- details.
- .. versionadded:: 1.4
- """
- __slots__ = (
- "root_entity",
- "entity",
- "deferred_where_criteria",
- "where_criteria",
- "_where_crit_orig",
- "include_aliases",
- "propagate_to_loaders",
- )
- _traverse_internals = [
- ("root_entity", visitors.ExtendedInternalTraversal.dp_plain_obj),
- ("entity", visitors.ExtendedInternalTraversal.dp_has_cache_key),
- ("where_criteria", visitors.InternalTraversal.dp_clauseelement),
- ("include_aliases", visitors.InternalTraversal.dp_boolean),
- ("propagate_to_loaders", visitors.InternalTraversal.dp_boolean),
- ]
- root_entity: Optional[Type[Any]]
- entity: Optional[_InternalEntityType[Any]]
- where_criteria: Union[ColumnElement[bool], lambdas.DeferredLambdaElement]
- deferred_where_criteria: bool
- include_aliases: bool
- propagate_to_loaders: bool
- _where_crit_orig: Any
- def __init__(
- self,
- entity_or_base: _EntityType[Any],
- where_criteria: Union[
- _ColumnExpressionArgument[bool],
- Callable[[Any], _ColumnExpressionArgument[bool]],
- ],
- loader_only: bool = False,
- include_aliases: bool = False,
- propagate_to_loaders: bool = True,
- track_closure_variables: bool = True,
- ):
- entity = cast(
- "_InternalEntityType[Any]",
- inspection.inspect(entity_or_base, False),
- )
- if entity is None:
- self.root_entity = cast("Type[Any]", entity_or_base)
- self.entity = None
- else:
- self.root_entity = None
- self.entity = entity
- self._where_crit_orig = where_criteria
- if callable(where_criteria):
- if self.root_entity is not None:
- wrap_entity = self.root_entity
- else:
- assert entity is not None
- wrap_entity = entity.entity
- self.deferred_where_criteria = True
- self.where_criteria = lambdas.DeferredLambdaElement(
- where_criteria,
- roles.WhereHavingRole,
- lambda_args=(_WrapUserEntity(wrap_entity),),
- opts=lambdas.LambdaOptions(
- track_closure_variables=track_closure_variables
- ),
- )
- else:
- self.deferred_where_criteria = False
- self.where_criteria = coercions.expect(
- roles.WhereHavingRole, where_criteria
- )
- self.include_aliases = include_aliases
- self.propagate_to_loaders = propagate_to_loaders
- @classmethod
- def _unreduce(
- cls, entity, where_criteria, include_aliases, propagate_to_loaders
- ):
- return LoaderCriteriaOption(
- entity,
- where_criteria,
- include_aliases=include_aliases,
- propagate_to_loaders=propagate_to_loaders,
- )
- def __reduce__(self):
- return (
- LoaderCriteriaOption._unreduce,
- (
- self.entity.class_ if self.entity else self.root_entity,
- self._where_crit_orig,
- self.include_aliases,
- self.propagate_to_loaders,
- ),
- )
- def _all_mappers(self) -> Iterator[Mapper[Any]]:
- if self.entity:
- yield from self.entity.mapper.self_and_descendants
- else:
- assert self.root_entity
- stack = list(self.root_entity.__subclasses__())
- while stack:
- subclass = stack.pop(0)
- ent = cast(
- "_InternalEntityType[Any]",
- inspection.inspect(subclass, raiseerr=False),
- )
- if ent:
- yield from ent.mapper.self_and_descendants
- else:
- stack.extend(subclass.__subclasses__())
- def _should_include(self, compile_state: ORMCompileState) -> bool:
- if (
- compile_state.select_statement._annotations.get(
- "for_loader_criteria", None
- )
- is self
- ):
- return False
- return True
- def _resolve_where_criteria(
- self, ext_info: _InternalEntityType[Any]
- ) -> ColumnElement[bool]:
- if self.deferred_where_criteria:
- crit = cast(
- "ColumnElement[bool]",
- self.where_criteria._resolve_with_args(ext_info.entity),
- )
- else:
- crit = self.where_criteria # type: ignore
- assert isinstance(crit, ColumnElement)
- return sql_util._deep_annotate(
- crit,
- {"for_loader_criteria": self},
- detect_subquery_cols=True,
- ind_cols_on_fromclause=True,
- )
- def process_compile_state_replaced_entities(
- self,
- compile_state: ORMCompileState,
- mapper_entities: Iterable[_MapperEntity],
- ) -> None:
- self.process_compile_state(compile_state)
- def process_compile_state(self, compile_state: ORMCompileState) -> None:
- """Apply a modification to a given :class:`.CompileState`."""
- # if options to limit the criteria to immediate query only,
- # use compile_state.attributes instead
- self.get_global_criteria(compile_state.global_attributes)
- def get_global_criteria(self, attributes: Dict[Any, Any]) -> None:
- for mp in self._all_mappers():
- load_criteria = attributes.setdefault(
- ("additional_entity_criteria", mp), []
- )
- load_criteria.append(self)
- inspection._inspects(AliasedClass)(lambda target: target._aliased_insp)
- @inspection._inspects(type)
- def _inspect_mc(
- class_: Type[_O],
- ) -> Optional[Mapper[_O]]:
- try:
- class_manager = opt_manager_of_class(class_)
- if class_manager is None or not class_manager.is_mapped:
- return None
- mapper = class_manager.mapper
- except exc.NO_STATE:
- return None
- else:
- return mapper
- GenericAlias = type(List[Any])
- @inspection._inspects(GenericAlias)
- def _inspect_generic_alias(
- class_: Type[_O],
- ) -> Optional[Mapper[_O]]:
- origin = cast("Type[_O]", get_origin(class_))
- return _inspect_mc(origin)
- @inspection._self_inspects
- class Bundle(
- ORMColumnsClauseRole[_T],
- SupportsCloneAnnotations,
- MemoizedHasCacheKey,
- inspection.Inspectable["Bundle[_T]"],
- InspectionAttr,
- ):
- """A grouping of SQL expressions that are returned by a :class:`.Query`
- under one namespace.
- The :class:`.Bundle` essentially allows nesting of the tuple-based
- results returned by a column-oriented :class:`_query.Query` object.
- It also
- is extensible via simple subclassing, where the primary capability
- to override is that of how the set of expressions should be returned,
- allowing post-processing as well as custom return types, without
- involving ORM identity-mapped classes.
- .. seealso::
- :ref:`bundles`
- """
- single_entity = False
- """If True, queries for a single Bundle will be returned as a single
- entity, rather than an element within a keyed tuple."""
- is_clause_element = False
- is_mapper = False
- is_aliased_class = False
- is_bundle = True
- _propagate_attrs: _PropagateAttrsType = util.immutabledict()
- proxy_set = util.EMPTY_SET
- exprs: List[_ColumnsClauseElement]
- def __init__(
- self, name: str, *exprs: _ColumnExpressionArgument[Any], **kw: Any
- ):
- r"""Construct a new :class:`.Bundle`.
- e.g.::
- bn = Bundle("mybundle", MyClass.x, MyClass.y)
- for row in session.query(bn).filter(bn.c.x == 5).filter(bn.c.y == 4):
- print(row.mybundle.x, row.mybundle.y)
- :param name: name of the bundle.
- :param \*exprs: columns or SQL expressions comprising the bundle.
- :param single_entity=False: if True, rows for this :class:`.Bundle`
- can be returned as a "single entity" outside of any enclosing tuple
- in the same manner as a mapped entity.
- """ # noqa: E501
- self.name = self._label = name
- coerced_exprs = [
- coercions.expect(
- roles.ColumnsClauseRole, expr, apply_propagate_attrs=self
- )
- for expr in exprs
- ]
- self.exprs = coerced_exprs
- self.c = self.columns = ColumnCollection(
- (getattr(col, "key", col._label), col)
- for col in [e._annotations.get("bundle", e) for e in coerced_exprs]
- ).as_readonly()
- self.single_entity = kw.pop("single_entity", self.single_entity)
- def _gen_cache_key(
- self, anon_map: anon_map, bindparams: List[BindParameter[Any]]
- ) -> Tuple[Any, ...]:
- return (self.__class__, self.name, self.single_entity) + tuple(
- [expr._gen_cache_key(anon_map, bindparams) for expr in self.exprs]
- )
- @property
- def mapper(self) -> Optional[Mapper[Any]]:
- mp: Optional[Mapper[Any]] = self.exprs[0]._annotations.get(
- "parentmapper", None
- )
- return mp
- @property
- def entity(self) -> Optional[_InternalEntityType[Any]]:
- ie: Optional[_InternalEntityType[Any]] = self.exprs[
- 0
- ]._annotations.get("parententity", None)
- return ie
- @property
- def entity_namespace(
- self,
- ) -> ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]:
- return self.c
- columns: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]
- """A namespace of SQL expressions referred to by this :class:`.Bundle`.
- e.g.::
- bn = Bundle("mybundle", MyClass.x, MyClass.y)
- q = sess.query(bn).filter(bn.c.x == 5)
- Nesting of bundles is also supported::
- b1 = Bundle(
- "b1",
- Bundle("b2", MyClass.a, MyClass.b),
- Bundle("b3", MyClass.x, MyClass.y),
- )
- q = sess.query(b1).filter(b1.c.b2.c.a == 5).filter(b1.c.b3.c.y == 9)
- .. seealso::
- :attr:`.Bundle.c`
- """ # noqa: E501
- c: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]
- """An alias for :attr:`.Bundle.columns`."""
- def _clone(self, **kw):
- cloned = self.__class__.__new__(self.__class__)
- cloned.__dict__.update(self.__dict__)
- return cloned
- def __clause_element__(self):
- # ensure existing entity_namespace remains
- annotations = {"bundle": self, "entity_namespace": self}
- annotations.update(self._annotations)
- plugin_subject = self.exprs[0]._propagate_attrs.get(
- "plugin_subject", self.entity
- )
- return (
- expression.ClauseList(
- _literal_as_text_role=roles.ColumnsClauseRole,
- group=False,
- *[e._annotations.get("bundle", e) for e in self.exprs],
- )
- ._annotate(annotations)
- ._set_propagate_attrs(
- # the Bundle *must* use the orm plugin no matter what. the
- # subject can be None but it's much better if it's not.
- {
- "compile_state_plugin": "orm",
- "plugin_subject": plugin_subject,
- }
- )
- )
- @property
- def clauses(self):
- return self.__clause_element__().clauses
- def label(self, name):
- """Provide a copy of this :class:`.Bundle` passing a new label."""
- cloned = self._clone()
- cloned.name = name
- return cloned
- def create_row_processor(
- self,
- query: Select[Any],
- procs: Sequence[Callable[[Row[Any]], Any]],
- labels: Sequence[str],
- ) -> Callable[[Row[Any]], Any]:
- """Produce the "row processing" function for this :class:`.Bundle`.
- May be overridden by subclasses to provide custom behaviors when
- results are fetched. The method is passed the statement object and a
- set of "row processor" functions at query execution time; these
- processor functions when given a result row will return the individual
- attribute value, which can then be adapted into any kind of return data
- structure.
- The example below illustrates replacing the usual :class:`.Row`
- return structure with a straight Python dictionary::
- from sqlalchemy.orm import Bundle
- class DictBundle(Bundle):
- def create_row_processor(self, query, procs, labels):
- "Override create_row_processor to return values as dictionaries"
- def proc(row):
- return dict(zip(labels, (proc(row) for proc in procs)))
- return proc
- A result from the above :class:`_orm.Bundle` will return dictionary
- values::
- bn = DictBundle("mybundle", MyClass.data1, MyClass.data2)
- for row in session.execute(select(bn)).where(bn.c.data1 == "d1"):
- print(row.mybundle["data1"], row.mybundle["data2"])
- """ # noqa: E501
- keyed_tuple = result_tuple(labels, [() for l in labels])
- def proc(row: Row[Any]) -> Any:
- return keyed_tuple([proc(row) for proc in procs])
- return proc
- def _orm_annotate(element: _SA, exclude: Optional[Any] = None) -> _SA:
- """Deep copy the given ClauseElement, annotating each element with the
- "_orm_adapt" flag.
- Elements within the exclude collection will be cloned but not annotated.
- """
- return sql_util._deep_annotate(element, {"_orm_adapt": True}, exclude)
- def _orm_deannotate(element: _SA) -> _SA:
- """Remove annotations that link a column to a particular mapping.
- Note this doesn't affect "remote" and "foreign" annotations
- passed by the :func:`_orm.foreign` and :func:`_orm.remote`
- annotators.
- """
- return sql_util._deep_deannotate(
- element, values=("_orm_adapt", "parententity")
- )
- def _orm_full_deannotate(element: _SA) -> _SA:
- return sql_util._deep_deannotate(element)
- class _ORMJoin(expression.Join):
- """Extend Join to support ORM constructs as input."""
- __visit_name__ = expression.Join.__visit_name__
- inherit_cache = True
- def __init__(
- self,
- left: _FromClauseArgument,
- right: _FromClauseArgument,
- onclause: Optional[_OnClauseArgument] = None,
- isouter: bool = False,
- full: bool = False,
- _left_memo: Optional[Any] = None,
- _right_memo: Optional[Any] = None,
- _extra_criteria: Tuple[ColumnElement[bool], ...] = (),
- ):
- left_info = cast(
- "Union[FromClause, _InternalEntityType[Any]]",
- inspection.inspect(left),
- )
- right_info = cast(
- "Union[FromClause, _InternalEntityType[Any]]",
- inspection.inspect(right),
- )
- adapt_to = right_info.selectable
- # used by joined eager loader
- self._left_memo = _left_memo
- self._right_memo = _right_memo
- if isinstance(onclause, attributes.QueryableAttribute):
- if TYPE_CHECKING:
- assert isinstance(
- onclause.comparator, RelationshipProperty.Comparator
- )
- on_selectable = onclause.comparator._source_selectable()
- prop = onclause.property
- _extra_criteria += onclause._extra_criteria
- elif isinstance(onclause, MapperProperty):
- # used internally by joined eager loader...possibly not ideal
- prop = onclause
- on_selectable = prop.parent.selectable
- else:
- prop = None
- on_selectable = None
- left_selectable = left_info.selectable
- if prop:
- adapt_from: Optional[FromClause]
- if sql_util.clause_is_present(on_selectable, left_selectable):
- adapt_from = on_selectable
- else:
- assert isinstance(left_selectable, FromClause)
- adapt_from = left_selectable
- (
- pj,
- sj,
- source,
- dest,
- secondary,
- target_adapter,
- ) = prop._create_joins(
- source_selectable=adapt_from,
- dest_selectable=adapt_to,
- source_polymorphic=True,
- of_type_entity=right_info,
- alias_secondary=True,
- extra_criteria=_extra_criteria,
- )
- if sj is not None:
- if isouter:
- # note this is an inner join from secondary->right
- right = sql.join(secondary, right, sj)
- onclause = pj
- else:
- left = sql.join(left, secondary, pj, isouter)
- onclause = sj
- else:
- onclause = pj
- self._target_adapter = target_adapter
- # we don't use the normal coercions logic for _ORMJoin
- # (probably should), so do some gymnastics to get the entity.
- # logic here is for #8721, which was a major bug in 1.4
- # for almost two years, not reported/fixed until 1.4.43 (!)
- if is_selectable(left_info):
- parententity = left_selectable._annotations.get(
- "parententity", None
- )
- elif insp_is_mapper(left_info) or insp_is_aliased_class(left_info):
- parententity = left_info
- else:
- parententity = None
- if parententity is not None:
- self._annotations = self._annotations.union(
- {"parententity": parententity}
- )
- augment_onclause = bool(_extra_criteria) and not prop
- expression.Join.__init__(self, left, right, onclause, isouter, full)
- assert self.onclause is not None
- if augment_onclause:
- self.onclause &= sql.and_(*_extra_criteria)
- if (
- not prop
- and getattr(right_info, "mapper", None)
- and right_info.mapper.single # type: ignore
- ):
- right_info = cast("_InternalEntityType[Any]", right_info)
- # if single inheritance target and we are using a manual
- # or implicit ON clause, augment it the same way we'd augment the
- # WHERE.
- single_crit = right_info.mapper._single_table_criterion
- if single_crit is not None:
- if insp_is_aliased_class(right_info):
- single_crit = right_info._adapter.traverse(single_crit)
- self.onclause = self.onclause & single_crit
- def _splice_into_center(self, other):
- """Splice a join into the center.
- Given join(a, b) and join(b, c), return join(a, b).join(c)
- """
- leftmost = other
- while isinstance(leftmost, sql.Join):
- leftmost = leftmost.left
- assert self.right is leftmost
- left = _ORMJoin(
- self.left,
- other.left,
- self.onclause,
- isouter=self.isouter,
- _left_memo=self._left_memo,
- _right_memo=other._left_memo._path_registry,
- )
- return _ORMJoin(
- left,
- other.right,
- other.onclause,
- isouter=other.isouter,
- _right_memo=other._right_memo,
- )
- def join(
- self,
- right: _FromClauseArgument,
- onclause: Optional[_OnClauseArgument] = None,
- isouter: bool = False,
- full: bool = False,
- ) -> _ORMJoin:
- return _ORMJoin(self, right, onclause, full=full, isouter=isouter)
- def outerjoin(
- self,
- right: _FromClauseArgument,
- onclause: Optional[_OnClauseArgument] = None,
- full: bool = False,
- ) -> _ORMJoin:
- return _ORMJoin(self, right, onclause, isouter=True, full=full)
- def with_parent(
- instance: object,
- prop: attributes.QueryableAttribute[Any],
- from_entity: Optional[_EntityType[Any]] = None,
- ) -> ColumnElement[bool]:
- """Create filtering criterion that relates this query's primary entity
- to the given related instance, using established
- :func:`_orm.relationship()`
- configuration.
- E.g.::
- stmt = select(Address).where(with_parent(some_user, User.addresses))
- The SQL rendered is the same as that rendered when a lazy loader
- would fire off from the given parent on that attribute, meaning
- that the appropriate state is taken from the parent object in
- Python without the need to render joins to the parent table
- in the rendered statement.
- The given property may also make use of :meth:`_orm.PropComparator.of_type`
- to indicate the left side of the criteria::
- a1 = aliased(Address)
- a2 = aliased(Address)
- stmt = select(a1, a2).where(with_parent(u1, User.addresses.of_type(a2)))
- The above use is equivalent to using the
- :func:`_orm.with_parent.from_entity` argument::
- a1 = aliased(Address)
- a2 = aliased(Address)
- stmt = select(a1, a2).where(
- with_parent(u1, User.addresses, from_entity=a2)
- )
- :param instance:
- An instance which has some :func:`_orm.relationship`.
- :param property:
- Class-bound attribute, which indicates
- what relationship from the instance should be used to reconcile the
- parent/child relationship.
- :param from_entity:
- Entity in which to consider as the left side. This defaults to the
- "zero" entity of the :class:`_query.Query` itself.
- .. versionadded:: 1.2
- """ # noqa: E501
- prop_t: RelationshipProperty[Any]
- if isinstance(prop, str):
- raise sa_exc.ArgumentError(
- "with_parent() accepts class-bound mapped attributes, not strings"
- )
- elif isinstance(prop, attributes.QueryableAttribute):
- if prop._of_type:
- from_entity = prop._of_type
- mapper_property = prop.property
- if mapper_property is None or not prop_is_relationship(
- mapper_property
- ):
- raise sa_exc.ArgumentError(
- f"Expected relationship property for with_parent(), "
- f"got {mapper_property}"
- )
- prop_t = mapper_property
- else:
- prop_t = prop
- return prop_t._with_parent(instance, from_entity=from_entity)
- def has_identity(object_: object) -> bool:
- """Return True if the given object has a database
- identity.
- This typically corresponds to the object being
- in either the persistent or detached state.
- .. seealso::
- :func:`.was_deleted`
- """
- state = attributes.instance_state(object_)
- return state.has_identity
- def was_deleted(object_: object) -> bool:
- """Return True if the given object was deleted
- within a session flush.
- This is regardless of whether or not the object is
- persistent or detached.
- .. seealso::
- :attr:`.InstanceState.was_deleted`
- """
- state = attributes.instance_state(object_)
- return state.was_deleted
- def _entity_corresponds_to(
- given: _InternalEntityType[Any], entity: _InternalEntityType[Any]
- ) -> bool:
- """determine if 'given' corresponds to 'entity', in terms
- of an entity passed to Query that would match the same entity
- being referred to elsewhere in the query.
- """
- if insp_is_aliased_class(entity):
- if insp_is_aliased_class(given):
- if entity._base_alias() is given._base_alias():
- return True
- return False
- elif insp_is_aliased_class(given):
- if given._use_mapper_path:
- return entity in given.with_polymorphic_mappers
- else:
- return entity is given
- assert insp_is_mapper(given)
- return entity.common_parent(given)
- def _entity_corresponds_to_use_path_impl(
- given: _InternalEntityType[Any], entity: _InternalEntityType[Any]
- ) -> bool:
- """determine if 'given' corresponds to 'entity', in terms
- of a path of loader options where a mapped attribute is taken to
- be a member of a parent entity.
- e.g.::
- someoption(A).someoption(A.b) # -> fn(A, A) -> True
- someoption(A).someoption(C.d) # -> fn(A, C) -> False
- a1 = aliased(A)
- someoption(a1).someoption(A.b) # -> fn(a1, A) -> False
- someoption(a1).someoption(a1.b) # -> fn(a1, a1) -> True
- wp = with_polymorphic(A, [A1, A2])
- someoption(wp).someoption(A1.foo) # -> fn(wp, A1) -> False
- someoption(wp).someoption(wp.A1.foo) # -> fn(wp, wp.A1) -> True
- """
- if insp_is_aliased_class(given):
- return (
- insp_is_aliased_class(entity)
- and not entity._use_mapper_path
- and (given is entity or entity in given._with_polymorphic_entities)
- )
- elif not insp_is_aliased_class(entity):
- return given.isa(entity.mapper)
- else:
- return (
- entity._use_mapper_path
- and given in entity.with_polymorphic_mappers
- )
- def _entity_isa(given: _InternalEntityType[Any], mapper: Mapper[Any]) -> bool:
- """determine if 'given' "is a" mapper, in terms of the given
- would load rows of type 'mapper'.
- """
- if given.is_aliased_class:
- return mapper in given.with_polymorphic_mappers or given.mapper.isa(
- mapper
- )
- elif given.with_polymorphic_mappers:
- return mapper in given.with_polymorphic_mappers or given.isa(mapper)
- else:
- return given.isa(mapper)
- def _getitem(iterable_query: Query[Any], item: Any) -> Any:
- """calculate __getitem__ in terms of an iterable query object
- that also has a slice() method.
- """
- def _no_negative_indexes():
- raise IndexError(
- "negative indexes are not accepted by SQL "
- "index / slice operators"
- )
- if isinstance(item, slice):
- start, stop, step = util.decode_slice(item)
- if (
- isinstance(stop, int)
- and isinstance(start, int)
- and stop - start <= 0
- ):
- return []
- elif (isinstance(start, int) and start < 0) or (
- isinstance(stop, int) and stop < 0
- ):
- _no_negative_indexes()
- res = iterable_query.slice(start, stop)
- if step is not None:
- return list(res)[None : None : item.step]
- else:
- return list(res)
- else:
- if item == -1:
- _no_negative_indexes()
- else:
- return list(iterable_query[item : item + 1])[0]
- def _is_mapped_annotation(
- raw_annotation: _AnnotationScanType,
- cls: Type[Any],
- originating_cls: Type[Any],
- ) -> bool:
- try:
- annotated = de_stringify_annotation(
- cls, raw_annotation, originating_cls.__module__
- )
- except NameError:
- # in most cases, at least within our own tests, we can raise
- # here, which is more accurate as it prevents us from returning
- # false negatives. However, in the real world, try to avoid getting
- # involved with end-user annotations that have nothing to do with us.
- # see issue #8888 where we bypass using this function in the case
- # that we want to detect an unresolvable Mapped[] type.
- return False
- else:
- return is_origin_of_cls(annotated, _MappedAnnotationBase)
- class _CleanupError(Exception):
- pass
- def _cleanup_mapped_str_annotation(
- annotation: str, originating_module: str
- ) -> str:
- # fix up an annotation that comes in as the form:
- # 'Mapped[List[Address]]' so that it instead looks like:
- # 'Mapped[List["Address"]]' , which will allow us to get
- # "Address" as a string
- # additionally, resolve symbols for these names since this is where
- # we'd have to do it
- inner: Optional[Match[str]]
- mm = re.match(r"^([^ \|]+?)\[(.+)\]$", annotation)
- if not mm:
- return annotation
- # ticket #8759. Resolve the Mapped name to a real symbol.
- # originally this just checked the name.
- try:
- obj = eval_name_only(mm.group(1), originating_module)
- except NameError as ne:
- raise _CleanupError(
- f'For annotation "{annotation}", could not resolve '
- f'container type "{mm.group(1)}". '
- "Please ensure this type is imported at the module level "
- "outside of TYPE_CHECKING blocks"
- ) from ne
- if obj is typing.ClassVar:
- real_symbol = "ClassVar"
- else:
- try:
- if issubclass(obj, _MappedAnnotationBase):
- real_symbol = obj.__name__
- else:
- return annotation
- except TypeError:
- # avoid isinstance(obj, type) check, just catch TypeError
- return annotation
- # note: if one of the codepaths above didn't define real_symbol and
- # then didn't return, real_symbol raises UnboundLocalError
- # which is actually a NameError, and the calling routines don't
- # notice this since they are catching NameError anyway. Just in case
- # this is being modified in the future, something to be aware of.
- stack = []
- inner = mm
- while True:
- stack.append(real_symbol if mm is inner else inner.group(1))
- g2 = inner.group(2)
- inner = re.match(r"^([^ \|]+?)\[(.+)\]$", g2)
- if inner is None:
- stack.append(g2)
- break
- # stacks we want to rewrite, that is, quote the last entry which
- # we think is a relationship class name:
- #
- # ['Mapped', 'List', 'Address']
- # ['Mapped', 'A']
- #
- # stacks we dont want to rewrite, which are generally MappedColumn
- # use cases:
- #
- # ['Mapped', "'Optional[Dict[str, str]]'"]
- # ['Mapped', 'dict[str, str] | None']
- if (
- # avoid already quoted symbols such as
- # ['Mapped', "'Optional[Dict[str, str]]'"]
- not re.match(r"""^["'].*["']$""", stack[-1])
- # avoid further generics like Dict[] such as
- # ['Mapped', 'dict[str, str] | None'],
- # ['Mapped', 'list[int] | list[str]'],
- # ['Mapped', 'Union[list[int], list[str]]'],
- and not re.search(r"[\[\]]", stack[-1])
- ):
- stripchars = "\"' "
- stack[-1] = ", ".join(
- f'"{elem.strip(stripchars)}"' for elem in stack[-1].split(",")
- )
- annotation = "[".join(stack) + ("]" * (len(stack) - 1))
- return annotation
- def _extract_mapped_subtype(
- raw_annotation: Optional[_AnnotationScanType],
- cls: type,
- originating_module: str,
- key: str,
- attr_cls: Type[Any],
- required: bool,
- is_dataclass_field: bool,
- expect_mapped: bool = True,
- raiseerr: bool = True,
- ) -> Optional[Tuple[Union[_AnnotationScanType, str], Optional[type]]]:
- """given an annotation, figure out if it's ``Mapped[something]`` and if
- so, return the ``something`` part.
- Includes error raise scenarios and other options.
- """
- if raw_annotation is None:
- if required:
- raise orm_exc.MappedAnnotationError(
- f"Python typing annotation is required for attribute "
- f'"{cls.__name__}.{key}" when primary argument(s) for '
- f'"{attr_cls.__name__}" construct are None or not present'
- )
- return None
- try:
- # destringify the "outside" of the annotation. note we are not
- # adding include_generic so it will *not* dig into generic contents,
- # which will remain as ForwardRef or plain str under future annotations
- # mode. The full destringify happens later when mapped_column goes
- # to do a full lookup in the registry type_annotations_map.
- annotated = de_stringify_annotation(
- cls,
- raw_annotation,
- originating_module,
- str_cleanup_fn=_cleanup_mapped_str_annotation,
- )
- except _CleanupError as ce:
- raise orm_exc.MappedAnnotationError(
- f"Could not interpret annotation {raw_annotation}. "
- "Check that it uses names that are correctly imported at the "
- "module level. See chained stack trace for more hints."
- ) from ce
- except NameError as ne:
- if raiseerr and "Mapped[" in raw_annotation: # type: ignore
- raise orm_exc.MappedAnnotationError(
- f"Could not interpret annotation {raw_annotation}. "
- "Check that it uses names that are correctly imported at the "
- "module level. See chained stack trace for more hints."
- ) from ne
- annotated = raw_annotation # type: ignore
- if is_dataclass_field:
- return annotated, None
- else:
- if not hasattr(annotated, "__origin__") or not is_origin_of_cls(
- annotated, _MappedAnnotationBase
- ):
- if expect_mapped:
- if not raiseerr:
- return None
- origin = getattr(annotated, "__origin__", None)
- if origin is typing.ClassVar:
- return None
- # check for other kind of ORM descriptor like AssociationProxy,
- # don't raise for that (issue #9957)
- elif isinstance(origin, type) and issubclass(
- origin, ORMDescriptor
- ):
- return None
- raise orm_exc.MappedAnnotationError(
- f'Type annotation for "{cls.__name__}.{key}" '
- "can't be correctly interpreted for "
- "Annotated Declarative Table form. ORM annotations "
- "should normally make use of the ``Mapped[]`` generic "
- "type, or other ORM-compatible generic type, as a "
- "container for the actual type, which indicates the "
- "intent that the attribute is mapped. "
- "Class variables that are not intended to be mapped "
- "by the ORM should use ClassVar[]. "
- "To allow Annotated Declarative to disregard legacy "
- "annotations which don't use Mapped[] to pass, set "
- '"__allow_unmapped__ = True" on the class or a '
- "superclass this class.",
- code="zlpr",
- )
- else:
- return annotated, None
- if len(annotated.__args__) != 1:
- raise orm_exc.MappedAnnotationError(
- "Expected sub-type for Mapped[] annotation"
- )
- return (
- # fix dict/list/set args to be ForwardRef, see #11814
- fixup_container_fwd_refs(annotated.__args__[0]),
- annotated.__origin__,
- )
- def _mapper_property_as_plain_name(prop: Type[Any]) -> str:
- if hasattr(prop, "_mapper_property_name"):
- name = prop._mapper_property_name()
- else:
- name = None
- return util.clsname_as_plain_name(prop, name)
|