| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106310731083109311031113112311331143115311631173118311931203121312231233124312531263127312831293130313131323133313431353136313731383139314031413142314331443145314631473148314931503151315231533154315531563157315831593160316131623163316431653166316731683169317031713172317331743175317631773178317931803181318231833184318531863187318831893190319131923193319431953196319731983199320032013202320332043205320632073208320932103211321232133214321532163217321832193220322132223223322432253226322732283229323032313232323332343235323632373238323932403241324232433244324532463247324832493250325132523253325432553256325732583259326032613262326332643265326632673268326932703271327232733274327532763277327832793280328132823283328432853286328732883289329032913292329332943295329632973298329933003301330233033304330533063307330833093310331133123313331433153316331733183319332033213322332333243325332633273328332933303331333233333334333533363337333833393340334133423343334433453346334733483349335033513352335333543355335633573358335933603361336233633364336533663367336833693370337133723373337433753376337733783379338033813382338333843385338633873388338933903391339233933394339533963397339833993400340134023403340434053406340734083409341034113412341334143415341634173418341934203421342234233424342534263427342834293430343134323433343434353436343734383439344034413442344334443445344634473448344934503451345234533454345534563457345834593460346134623463346434653466346734683469347034713472347334743475347634773478347934803481348234833484348534863487348834893490349134923493349434953496349734983499350035013502350335043505350635073508 |
- # orm/relationships.py
- # Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
- # <see AUTHORS file>
- #
- # This module is part of SQLAlchemy and is released under
- # the MIT License: https://www.opensource.org/licenses/mit-license.php
- """Heuristics related to join conditions as used in
- :func:`_orm.relationship`.
- Provides the :class:`.JoinCondition` object, which encapsulates
- SQL annotation and aliasing behavior focused on the `primaryjoin`
- and `secondaryjoin` aspects of :func:`_orm.relationship`.
- """
- from __future__ import annotations
- import collections
- from collections import abc
- import dataclasses
- import inspect as _py_inspect
- import itertools
- import re
- import typing
- from typing import Any
- from typing import Callable
- from typing import cast
- from typing import Collection
- from typing import Dict
- from typing import FrozenSet
- from typing import Generic
- from typing import Iterable
- from typing import Iterator
- from typing import List
- from typing import NamedTuple
- from typing import NoReturn
- from typing import Optional
- from typing import Sequence
- from typing import Set
- from typing import Tuple
- from typing import Type
- from typing import TypeVar
- from typing import Union
- import weakref
- from . import attributes
- from . import strategy_options
- from ._typing import insp_is_aliased_class
- from ._typing import is_has_collection_adapter
- from .base import _DeclarativeMapped
- from .base import _is_mapped_class
- from .base import class_mapper
- from .base import DynamicMapped
- from .base import LoaderCallableStatus
- from .base import PassiveFlag
- from .base import state_str
- from .base import WriteOnlyMapped
- from .interfaces import _AttributeOptions
- from .interfaces import _IntrospectsAnnotations
- from .interfaces import MANYTOMANY
- from .interfaces import MANYTOONE
- from .interfaces import ONETOMANY
- from .interfaces import PropComparator
- from .interfaces import RelationshipDirection
- from .interfaces import StrategizedProperty
- from .util import _orm_annotate
- from .util import _orm_deannotate
- from .util import CascadeOptions
- from .. import exc as sa_exc
- from .. import Exists
- from .. import log
- from .. import schema
- from .. import sql
- from .. import util
- from ..inspection import inspect
- from ..sql import coercions
- from ..sql import expression
- from ..sql import operators
- from ..sql import roles
- from ..sql import visitors
- from ..sql._typing import _ColumnExpressionArgument
- from ..sql._typing import _HasClauseElement
- from ..sql.annotation import _safe_annotate
- from ..sql.elements import ColumnClause
- from ..sql.elements import ColumnElement
- from ..sql.util import _deep_annotate
- from ..sql.util import _deep_deannotate
- from ..sql.util import _shallow_annotate
- from ..sql.util import adapt_criterion_to_null
- from ..sql.util import ClauseAdapter
- from ..sql.util import join_condition
- from ..sql.util import selectables_overlap
- from ..sql.util import visit_binary_product
- from ..util.typing import de_optionalize_union_types
- from ..util.typing import Literal
- from ..util.typing import resolve_name_to_real_class_name
- if typing.TYPE_CHECKING:
- from ._typing import _EntityType
- from ._typing import _ExternalEntityType
- from ._typing import _IdentityKeyType
- from ._typing import _InstanceDict
- from ._typing import _InternalEntityType
- from ._typing import _O
- from ._typing import _RegistryType
- from .base import Mapped
- from .clsregistry import _class_resolver
- from .clsregistry import _ModNS
- from .decl_base import _ClassScanMapperConfig
- from .dependency import DependencyProcessor
- from .mapper import Mapper
- from .query import Query
- from .session import Session
- from .state import InstanceState
- from .strategies import LazyLoader
- from .util import AliasedClass
- from .util import AliasedInsp
- from ..sql._typing import _CoreAdapterProto
- from ..sql._typing import _EquivalentColumnMap
- from ..sql._typing import _InfoType
- from ..sql.annotation import _AnnotationDict
- from ..sql.annotation import SupportsAnnotations
- from ..sql.elements import BinaryExpression
- from ..sql.elements import BindParameter
- from ..sql.elements import ClauseElement
- from ..sql.schema import Table
- from ..sql.selectable import FromClause
- from ..util.typing import _AnnotationScanType
- from ..util.typing import RODescriptorReference
- _T = TypeVar("_T", bound=Any)
- _T1 = TypeVar("_T1", bound=Any)
- _T2 = TypeVar("_T2", bound=Any)
- _PT = TypeVar("_PT", bound=Any)
- _PT2 = TypeVar("_PT2", bound=Any)
- _RelationshipArgumentType = Union[
- str,
- Type[_T],
- Callable[[], Type[_T]],
- "Mapper[_T]",
- "AliasedClass[_T]",
- Callable[[], "Mapper[_T]"],
- Callable[[], "AliasedClass[_T]"],
- ]
- _LazyLoadArgumentType = Literal[
- "select",
- "joined",
- "selectin",
- "subquery",
- "raise",
- "raise_on_sql",
- "noload",
- "immediate",
- "write_only",
- "dynamic",
- True,
- False,
- None,
- ]
- _RelationshipJoinConditionArgument = Union[
- str, _ColumnExpressionArgument[bool]
- ]
- _RelationshipSecondaryArgument = Union[
- "FromClause", str, Callable[[], "FromClause"]
- ]
- _ORMOrderByArgument = Union[
- Literal[False],
- str,
- _ColumnExpressionArgument[Any],
- Callable[[], _ColumnExpressionArgument[Any]],
- Callable[[], Iterable[_ColumnExpressionArgument[Any]]],
- Iterable[Union[str, _ColumnExpressionArgument[Any]]],
- ]
- ORMBackrefArgument = Union[str, Tuple[str, Dict[str, Any]]]
- _ORMColCollectionElement = Union[
- ColumnClause[Any],
- _HasClauseElement[Any],
- roles.DMLColumnRole,
- "Mapped[Any]",
- ]
- _ORMColCollectionArgument = Union[
- str,
- Sequence[_ORMColCollectionElement],
- Callable[[], Sequence[_ORMColCollectionElement]],
- Callable[[], _ORMColCollectionElement],
- _ORMColCollectionElement,
- ]
- _CEA = TypeVar("_CEA", bound=_ColumnExpressionArgument[Any])
- _CE = TypeVar("_CE", bound="ColumnElement[Any]")
- _ColumnPairIterable = Iterable[Tuple[ColumnElement[Any], ColumnElement[Any]]]
- _ColumnPairs = Sequence[Tuple[ColumnElement[Any], ColumnElement[Any]]]
- _MutableColumnPairs = List[Tuple[ColumnElement[Any], ColumnElement[Any]]]
- def remote(expr: _CEA) -> _CEA:
- """Annotate a portion of a primaryjoin expression
- with a 'remote' annotation.
- See the section :ref:`relationship_custom_foreign` for a
- description of use.
- .. seealso::
- :ref:`relationship_custom_foreign`
- :func:`.foreign`
- """
- return _annotate_columns( # type: ignore
- coercions.expect(roles.ColumnArgumentRole, expr), {"remote": True}
- )
- def foreign(expr: _CEA) -> _CEA:
- """Annotate a portion of a primaryjoin expression
- with a 'foreign' annotation.
- See the section :ref:`relationship_custom_foreign` for a
- description of use.
- .. seealso::
- :ref:`relationship_custom_foreign`
- :func:`.remote`
- """
- return _annotate_columns( # type: ignore
- coercions.expect(roles.ColumnArgumentRole, expr), {"foreign": True}
- )
- @dataclasses.dataclass
- class _RelationshipArg(Generic[_T1, _T2]):
- """stores a user-defined parameter value that must be resolved and
- parsed later at mapper configuration time.
- """
- __slots__ = "name", "argument", "resolved"
- name: str
- argument: _T1
- resolved: Optional[_T2]
- def _is_populated(self) -> bool:
- return self.argument is not None
- def _resolve_against_registry(
- self, clsregistry_resolver: Callable[[str, bool], _class_resolver]
- ) -> None:
- attr_value = self.argument
- if isinstance(attr_value, str):
- self.resolved = clsregistry_resolver(
- attr_value, self.name == "secondary"
- )()
- elif callable(attr_value) and not _is_mapped_class(attr_value):
- self.resolved = attr_value()
- else:
- self.resolved = attr_value
- _RelationshipOrderByArg = Union[Literal[False], Tuple[ColumnElement[Any], ...]]
- class _RelationshipArgs(NamedTuple):
- """stores user-passed parameters that are resolved at mapper configuration
- time.
- """
- secondary: _RelationshipArg[
- Optional[_RelationshipSecondaryArgument],
- Optional[FromClause],
- ]
- primaryjoin: _RelationshipArg[
- Optional[_RelationshipJoinConditionArgument],
- Optional[ColumnElement[Any]],
- ]
- secondaryjoin: _RelationshipArg[
- Optional[_RelationshipJoinConditionArgument],
- Optional[ColumnElement[Any]],
- ]
- order_by: _RelationshipArg[_ORMOrderByArgument, _RelationshipOrderByArg]
- foreign_keys: _RelationshipArg[
- Optional[_ORMColCollectionArgument], Set[ColumnElement[Any]]
- ]
- remote_side: _RelationshipArg[
- Optional[_ORMColCollectionArgument], Set[ColumnElement[Any]]
- ]
- @log.class_logger
- class RelationshipProperty(
- _IntrospectsAnnotations, StrategizedProperty[_T], log.Identified
- ):
- """Describes an object property that holds a single item or list
- of items that correspond to a related database table.
- Public constructor is the :func:`_orm.relationship` function.
- .. seealso::
- :ref:`relationship_config_toplevel`
- """
- strategy_wildcard_key = strategy_options._RELATIONSHIP_TOKEN
- inherit_cache = True
- """:meta private:"""
- _links_to_entity = True
- _is_relationship = True
- _overlaps: Sequence[str]
- _lazy_strategy: LazyLoader
- _persistence_only = dict(
- passive_deletes=False,
- passive_updates=True,
- enable_typechecks=True,
- active_history=False,
- cascade_backrefs=False,
- )
- _dependency_processor: Optional[DependencyProcessor] = None
- primaryjoin: ColumnElement[bool]
- secondaryjoin: Optional[ColumnElement[bool]]
- secondary: Optional[FromClause]
- _join_condition: JoinCondition
- order_by: _RelationshipOrderByArg
- _user_defined_foreign_keys: Set[ColumnElement[Any]]
- _calculated_foreign_keys: Set[ColumnElement[Any]]
- remote_side: Set[ColumnElement[Any]]
- local_columns: Set[ColumnElement[Any]]
- synchronize_pairs: _ColumnPairs
- secondary_synchronize_pairs: Optional[_ColumnPairs]
- local_remote_pairs: Optional[_ColumnPairs]
- direction: RelationshipDirection
- _init_args: _RelationshipArgs
- def __init__(
- self,
- argument: Optional[_RelationshipArgumentType[_T]] = None,
- secondary: Optional[_RelationshipSecondaryArgument] = None,
- *,
- uselist: Optional[bool] = None,
- collection_class: Optional[
- Union[Type[Collection[Any]], Callable[[], Collection[Any]]]
- ] = None,
- primaryjoin: Optional[_RelationshipJoinConditionArgument] = None,
- secondaryjoin: Optional[_RelationshipJoinConditionArgument] = None,
- back_populates: Optional[str] = None,
- order_by: _ORMOrderByArgument = False,
- backref: Optional[ORMBackrefArgument] = None,
- overlaps: Optional[str] = None,
- post_update: bool = False,
- cascade: str = "save-update, merge",
- viewonly: bool = False,
- attribute_options: Optional[_AttributeOptions] = None,
- lazy: _LazyLoadArgumentType = "select",
- passive_deletes: Union[Literal["all"], bool] = False,
- passive_updates: bool = True,
- active_history: bool = False,
- enable_typechecks: bool = True,
- foreign_keys: Optional[_ORMColCollectionArgument] = None,
- remote_side: Optional[_ORMColCollectionArgument] = None,
- join_depth: Optional[int] = None,
- comparator_factory: Optional[
- Type[RelationshipProperty.Comparator[Any]]
- ] = None,
- single_parent: bool = False,
- innerjoin: bool = False,
- distinct_target_key: Optional[bool] = None,
- load_on_pending: bool = False,
- query_class: Optional[Type[Query[Any]]] = None,
- info: Optional[_InfoType] = None,
- omit_join: Literal[None, False] = None,
- sync_backref: Optional[bool] = None,
- doc: Optional[str] = None,
- bake_queries: Literal[True] = True,
- cascade_backrefs: Literal[False] = False,
- _local_remote_pairs: Optional[_ColumnPairs] = None,
- _legacy_inactive_history_style: bool = False,
- ):
- super().__init__(attribute_options=attribute_options)
- self.uselist = uselist
- self.argument = argument
- self._init_args = _RelationshipArgs(
- _RelationshipArg("secondary", secondary, None),
- _RelationshipArg("primaryjoin", primaryjoin, None),
- _RelationshipArg("secondaryjoin", secondaryjoin, None),
- _RelationshipArg("order_by", order_by, None),
- _RelationshipArg("foreign_keys", foreign_keys, None),
- _RelationshipArg("remote_side", remote_side, None),
- )
- self.post_update = post_update
- self.viewonly = viewonly
- if viewonly:
- self._warn_for_persistence_only_flags(
- passive_deletes=passive_deletes,
- passive_updates=passive_updates,
- enable_typechecks=enable_typechecks,
- active_history=active_history,
- cascade_backrefs=cascade_backrefs,
- )
- if viewonly and sync_backref:
- raise sa_exc.ArgumentError(
- "sync_backref and viewonly cannot both be True"
- )
- self.sync_backref = sync_backref
- self.lazy = lazy
- self.single_parent = single_parent
- self.collection_class = collection_class
- self.passive_deletes = passive_deletes
- if cascade_backrefs:
- raise sa_exc.ArgumentError(
- "The 'cascade_backrefs' parameter passed to "
- "relationship() may only be set to False."
- )
- self.passive_updates = passive_updates
- self.enable_typechecks = enable_typechecks
- self.query_class = query_class
- self.innerjoin = innerjoin
- self.distinct_target_key = distinct_target_key
- self.doc = doc
- self.active_history = active_history
- self._legacy_inactive_history_style = _legacy_inactive_history_style
- self.join_depth = join_depth
- if omit_join:
- util.warn(
- "setting omit_join to True is not supported; selectin "
- "loading of this relationship may not work correctly if this "
- "flag is set explicitly. omit_join optimization is "
- "automatically detected for conditions under which it is "
- "supported."
- )
- self.omit_join = omit_join
- self.local_remote_pairs = _local_remote_pairs
- self.load_on_pending = load_on_pending
- self.comparator_factory = (
- comparator_factory or RelationshipProperty.Comparator
- )
- util.set_creation_order(self)
- if info is not None:
- self.info.update(info)
- self.strategy_key = (("lazy", self.lazy),)
- self._reverse_property: Set[RelationshipProperty[Any]] = set()
- if overlaps:
- self._overlaps = set(re.split(r"\s*,\s*", overlaps)) # type: ignore # noqa: E501
- else:
- self._overlaps = ()
- self.cascade = cascade
- self.back_populates = back_populates
- if self.back_populates:
- if backref:
- raise sa_exc.ArgumentError(
- "backref and back_populates keyword arguments "
- "are mutually exclusive"
- )
- self.backref = None
- else:
- self.backref = backref
- def _warn_for_persistence_only_flags(self, **kw: Any) -> None:
- for k, v in kw.items():
- if v != self._persistence_only[k]:
- # we are warning here rather than warn deprecated as this is a
- # configuration mistake, and Python shows regular warnings more
- # aggressively than deprecation warnings by default. Unlike the
- # case of setting viewonly with cascade, the settings being
- # warned about here are not actively doing the wrong thing
- # against viewonly=True, so it is not as urgent to have these
- # raise an error.
- util.warn(
- "Setting %s on relationship() while also "
- "setting viewonly=True does not make sense, as a "
- "viewonly=True relationship does not perform persistence "
- "operations. This configuration may raise an error "
- "in a future release." % (k,)
- )
- def instrument_class(self, mapper: Mapper[Any]) -> None:
- attributes.register_descriptor(
- mapper.class_,
- self.key,
- comparator=self.comparator_factory(self, mapper),
- parententity=mapper,
- doc=self.doc,
- )
- class Comparator(util.MemoizedSlots, PropComparator[_PT]):
- """Produce boolean, comparison, and other operators for
- :class:`.RelationshipProperty` attributes.
- See the documentation for :class:`.PropComparator` for a brief
- overview of ORM level operator definition.
- .. seealso::
- :class:`.PropComparator`
- :class:`.ColumnProperty.Comparator`
- :class:`.ColumnOperators`
- :ref:`types_operators`
- :attr:`.TypeEngine.comparator_factory`
- """
- __slots__ = (
- "entity",
- "mapper",
- "property",
- "_of_type",
- "_extra_criteria",
- )
- prop: RODescriptorReference[RelationshipProperty[_PT]]
- _of_type: Optional[_EntityType[_PT]]
- def __init__(
- self,
- prop: RelationshipProperty[_PT],
- parentmapper: _InternalEntityType[Any],
- adapt_to_entity: Optional[AliasedInsp[Any]] = None,
- of_type: Optional[_EntityType[_PT]] = None,
- extra_criteria: Tuple[ColumnElement[bool], ...] = (),
- ):
- """Construction of :class:`.RelationshipProperty.Comparator`
- is internal to the ORM's attribute mechanics.
- """
- self.prop = prop
- self._parententity = parentmapper
- self._adapt_to_entity = adapt_to_entity
- if of_type:
- self._of_type = of_type
- else:
- self._of_type = None
- self._extra_criteria = extra_criteria
- def adapt_to_entity(
- self, adapt_to_entity: AliasedInsp[Any]
- ) -> RelationshipProperty.Comparator[Any]:
- return self.__class__(
- self.prop,
- self._parententity,
- adapt_to_entity=adapt_to_entity,
- of_type=self._of_type,
- )
- entity: _InternalEntityType[_PT]
- """The target entity referred to by this
- :class:`.RelationshipProperty.Comparator`.
- This is either a :class:`_orm.Mapper` or :class:`.AliasedInsp`
- object.
- This is the "target" or "remote" side of the
- :func:`_orm.relationship`.
- """
- mapper: Mapper[_PT]
- """The target :class:`_orm.Mapper` referred to by this
- :class:`.RelationshipProperty.Comparator`.
- This is the "target" or "remote" side of the
- :func:`_orm.relationship`.
- """
- def _memoized_attr_entity(self) -> _InternalEntityType[_PT]:
- if self._of_type:
- return inspect(self._of_type) # type: ignore
- else:
- return self.prop.entity
- def _memoized_attr_mapper(self) -> Mapper[_PT]:
- return self.entity.mapper
- def _source_selectable(self) -> FromClause:
- if self._adapt_to_entity:
- return self._adapt_to_entity.selectable
- else:
- return self.property.parent._with_polymorphic_selectable
- def __clause_element__(self) -> ColumnElement[bool]:
- adapt_from = self._source_selectable()
- if self._of_type:
- of_type_entity = inspect(self._of_type)
- else:
- of_type_entity = None
- (
- pj,
- sj,
- source,
- dest,
- secondary,
- target_adapter,
- ) = self.prop._create_joins(
- source_selectable=adapt_from,
- source_polymorphic=True,
- of_type_entity=of_type_entity,
- alias_secondary=True,
- extra_criteria=self._extra_criteria,
- )
- if sj is not None:
- return pj & sj
- else:
- return pj
- def of_type(self, class_: _EntityType[Any]) -> PropComparator[_PT]:
- r"""Redefine this object in terms of a polymorphic subclass.
- See :meth:`.PropComparator.of_type` for an example.
- """
- return RelationshipProperty.Comparator(
- self.prop,
- self._parententity,
- adapt_to_entity=self._adapt_to_entity,
- of_type=class_,
- extra_criteria=self._extra_criteria,
- )
- def and_(
- self, *criteria: _ColumnExpressionArgument[bool]
- ) -> PropComparator[Any]:
- """Add AND criteria.
- See :meth:`.PropComparator.and_` for an example.
- .. versionadded:: 1.4
- """
- exprs = tuple(
- coercions.expect(roles.WhereHavingRole, clause)
- for clause in util.coerce_generator_arg(criteria)
- )
- return RelationshipProperty.Comparator(
- self.prop,
- self._parententity,
- adapt_to_entity=self._adapt_to_entity,
- of_type=self._of_type,
- extra_criteria=self._extra_criteria + exprs,
- )
- def in_(self, other: Any) -> NoReturn:
- """Produce an IN clause - this is not implemented
- for :func:`_orm.relationship`-based attributes at this time.
- """
- raise NotImplementedError(
- "in_() not yet supported for "
- "relationships. For a simple "
- "many-to-one, use in_() against "
- "the set of foreign key values."
- )
- # https://github.com/python/mypy/issues/4266
- __hash__ = None # type: ignore
- def __eq__(self, other: Any) -> ColumnElement[bool]: # type: ignore[override] # noqa: E501
- """Implement the ``==`` operator.
- In a many-to-one context, such as:
- .. sourcecode:: text
- MyClass.some_prop == <some object>
- this will typically produce a
- clause such as:
- .. sourcecode:: text
- mytable.related_id == <some id>
- Where ``<some id>`` is the primary key of the given
- object.
- The ``==`` operator provides partial functionality for non-
- many-to-one comparisons:
- * Comparisons against collections are not supported.
- Use :meth:`~.Relationship.Comparator.contains`.
- * Compared to a scalar one-to-many, will produce a
- clause that compares the target columns in the parent to
- the given target.
- * Compared to a scalar many-to-many, an alias
- of the association table will be rendered as
- well, forming a natural join that is part of the
- main body of the query. This will not work for
- queries that go beyond simple AND conjunctions of
- comparisons, such as those which use OR. Use
- explicit joins, outerjoins, or
- :meth:`~.Relationship.Comparator.has` for
- more comprehensive non-many-to-one scalar
- membership tests.
- * Comparisons against ``None`` given in a one-to-many
- or many-to-many context produce a NOT EXISTS clause.
- """
- if other is None or isinstance(other, expression.Null):
- if self.property.direction in [ONETOMANY, MANYTOMANY]:
- return ~self._criterion_exists()
- else:
- return _orm_annotate(
- self.property._optimized_compare(
- None, adapt_source=self.adapter
- )
- )
- elif self.property.uselist:
- raise sa_exc.InvalidRequestError(
- "Can't compare a collection to an object or collection; "
- "use contains() to test for membership."
- )
- else:
- return _orm_annotate(
- self.property._optimized_compare(
- other, adapt_source=self.adapter
- )
- )
- def _criterion_exists(
- self,
- criterion: Optional[_ColumnExpressionArgument[bool]] = None,
- **kwargs: Any,
- ) -> Exists:
- where_criteria = (
- coercions.expect(roles.WhereHavingRole, criterion)
- if criterion is not None
- else None
- )
- if getattr(self, "_of_type", None):
- info: Optional[_InternalEntityType[Any]] = inspect(
- self._of_type
- )
- assert info is not None
- target_mapper, to_selectable, is_aliased_class = (
- info.mapper,
- info.selectable,
- info.is_aliased_class,
- )
- if self.property._is_self_referential and not is_aliased_class:
- to_selectable = to_selectable._anonymous_fromclause()
- single_crit = target_mapper._single_table_criterion
- if single_crit is not None:
- if where_criteria is not None:
- where_criteria = single_crit & where_criteria
- else:
- where_criteria = single_crit
- else:
- is_aliased_class = False
- to_selectable = None
- if self.adapter:
- source_selectable = self._source_selectable()
- else:
- source_selectable = None
- (
- pj,
- sj,
- source,
- dest,
- secondary,
- target_adapter,
- ) = self.property._create_joins(
- dest_selectable=to_selectable,
- source_selectable=source_selectable,
- )
- for k in kwargs:
- crit = getattr(self.property.mapper.class_, k) == kwargs[k]
- if where_criteria is None:
- where_criteria = crit
- else:
- where_criteria = where_criteria & crit
- # annotate the *local* side of the join condition, in the case
- # of pj + sj this is the full primaryjoin, in the case of just
- # pj its the local side of the primaryjoin.
- if sj is not None:
- j = _orm_annotate(pj) & sj
- else:
- j = _orm_annotate(pj, exclude=self.property.remote_side)
- if (
- where_criteria is not None
- and target_adapter
- and not is_aliased_class
- ):
- # limit this adapter to annotated only?
- where_criteria = target_adapter.traverse(where_criteria)
- # only have the "joined left side" of what we
- # return be subject to Query adaption. The right
- # side of it is used for an exists() subquery and
- # should not correlate or otherwise reach out
- # to anything in the enclosing query.
- if where_criteria is not None:
- where_criteria = where_criteria._annotate(
- {"no_replacement_traverse": True}
- )
- crit = j & sql.True_._ifnone(where_criteria)
- if secondary is not None:
- ex = (
- sql.exists(1)
- .where(crit)
- .select_from(dest, secondary)
- .correlate_except(dest, secondary)
- )
- else:
- ex = (
- sql.exists(1)
- .where(crit)
- .select_from(dest)
- .correlate_except(dest)
- )
- return ex
- def any(
- self,
- criterion: Optional[_ColumnExpressionArgument[bool]] = None,
- **kwargs: Any,
- ) -> ColumnElement[bool]:
- """Produce an expression that tests a collection against
- particular criterion, using EXISTS.
- An expression like::
- session.query(MyClass).filter(
- MyClass.somereference.any(SomeRelated.x == 2)
- )
- Will produce a query like:
- .. sourcecode:: sql
- SELECT * FROM my_table WHERE
- EXISTS (SELECT 1 FROM related WHERE related.my_id=my_table.id
- AND related.x=2)
- Because :meth:`~.Relationship.Comparator.any` uses
- a correlated subquery, its performance is not nearly as
- good when compared against large target tables as that of
- using a join.
- :meth:`~.Relationship.Comparator.any` is particularly
- useful for testing for empty collections::
- session.query(MyClass).filter(~MyClass.somereference.any())
- will produce:
- .. sourcecode:: sql
- SELECT * FROM my_table WHERE
- NOT (EXISTS (SELECT 1 FROM related WHERE
- related.my_id=my_table.id))
- :meth:`~.Relationship.Comparator.any` is only
- valid for collections, i.e. a :func:`_orm.relationship`
- that has ``uselist=True``. For scalar references,
- use :meth:`~.Relationship.Comparator.has`.
- """
- if not self.property.uselist:
- raise sa_exc.InvalidRequestError(
- "'any()' not implemented for scalar "
- "attributes. Use has()."
- )
- return self._criterion_exists(criterion, **kwargs)
- def has(
- self,
- criterion: Optional[_ColumnExpressionArgument[bool]] = None,
- **kwargs: Any,
- ) -> ColumnElement[bool]:
- """Produce an expression that tests a scalar reference against
- particular criterion, using EXISTS.
- An expression like::
- session.query(MyClass).filter(
- MyClass.somereference.has(SomeRelated.x == 2)
- )
- Will produce a query like:
- .. sourcecode:: sql
- SELECT * FROM my_table WHERE
- EXISTS (SELECT 1 FROM related WHERE
- related.id==my_table.related_id AND related.x=2)
- Because :meth:`~.Relationship.Comparator.has` uses
- a correlated subquery, its performance is not nearly as
- good when compared against large target tables as that of
- using a join.
- :meth:`~.Relationship.Comparator.has` is only
- valid for scalar references, i.e. a :func:`_orm.relationship`
- that has ``uselist=False``. For collection references,
- use :meth:`~.Relationship.Comparator.any`.
- """
- if self.property.uselist:
- raise sa_exc.InvalidRequestError(
- "'has()' not implemented for collections. Use any()."
- )
- return self._criterion_exists(criterion, **kwargs)
- def contains(
- self, other: _ColumnExpressionArgument[Any], **kwargs: Any
- ) -> ColumnElement[bool]:
- """Return a simple expression that tests a collection for
- containment of a particular item.
- :meth:`~.Relationship.Comparator.contains` is
- only valid for a collection, i.e. a
- :func:`_orm.relationship` that implements
- one-to-many or many-to-many with ``uselist=True``.
- When used in a simple one-to-many context, an
- expression like::
- MyClass.contains(other)
- Produces a clause like:
- .. sourcecode:: sql
- mytable.id == <some id>
- Where ``<some id>`` is the value of the foreign key
- attribute on ``other`` which refers to the primary
- key of its parent object. From this it follows that
- :meth:`~.Relationship.Comparator.contains` is
- very useful when used with simple one-to-many
- operations.
- For many-to-many operations, the behavior of
- :meth:`~.Relationship.Comparator.contains`
- has more caveats. The association table will be
- rendered in the statement, producing an "implicit"
- join, that is, includes multiple tables in the FROM
- clause which are equated in the WHERE clause::
- query(MyClass).filter(MyClass.contains(other))
- Produces a query like:
- .. sourcecode:: sql
- SELECT * FROM my_table, my_association_table AS
- my_association_table_1 WHERE
- my_table.id = my_association_table_1.parent_id
- AND my_association_table_1.child_id = <some id>
- Where ``<some id>`` would be the primary key of
- ``other``. From the above, it is clear that
- :meth:`~.Relationship.Comparator.contains`
- will **not** work with many-to-many collections when
- used in queries that move beyond simple AND
- conjunctions, such as multiple
- :meth:`~.Relationship.Comparator.contains`
- expressions joined by OR. In such cases subqueries or
- explicit "outer joins" will need to be used instead.
- See :meth:`~.Relationship.Comparator.any` for
- a less-performant alternative using EXISTS, or refer
- to :meth:`_query.Query.outerjoin`
- as well as :ref:`orm_queryguide_joins`
- for more details on constructing outer joins.
- kwargs may be ignored by this operator but are required for API
- conformance.
- """
- if not self.prop.uselist:
- raise sa_exc.InvalidRequestError(
- "'contains' not implemented for scalar "
- "attributes. Use =="
- )
- clause = self.prop._optimized_compare(
- other, adapt_source=self.adapter
- )
- if self.prop.secondaryjoin is not None:
- clause.negation_clause = self.__negated_contains_or_equals(
- other
- )
- return clause
- def __negated_contains_or_equals(
- self, other: Any
- ) -> ColumnElement[bool]:
- if self.prop.direction == MANYTOONE:
- state = attributes.instance_state(other)
- def state_bindparam(
- local_col: ColumnElement[Any],
- state: InstanceState[Any],
- remote_col: ColumnElement[Any],
- ) -> BindParameter[Any]:
- dict_ = state.dict
- return sql.bindparam(
- local_col.key,
- type_=local_col.type,
- unique=True,
- callable_=self.prop._get_attr_w_warn_on_none(
- self.prop.mapper, state, dict_, remote_col
- ),
- )
- def adapt(col: _CE) -> _CE:
- if self.adapter:
- return self.adapter(col)
- else:
- return col
- if self.property._use_get:
- return sql.and_(
- *[
- sql.or_(
- adapt(x)
- != state_bindparam(adapt(x), state, y),
- adapt(x) == None,
- )
- for (x, y) in self.property.local_remote_pairs
- ]
- )
- criterion = sql.and_(
- *[
- x == y
- for (x, y) in zip(
- self.property.mapper.primary_key,
- self.property.mapper.primary_key_from_instance(other),
- )
- ]
- )
- return ~self._criterion_exists(criterion)
- def __ne__(self, other: Any) -> ColumnElement[bool]: # type: ignore[override] # noqa: E501
- """Implement the ``!=`` operator.
- In a many-to-one context, such as:
- .. sourcecode:: text
- MyClass.some_prop != <some object>
- This will typically produce a clause such as:
- .. sourcecode:: sql
- mytable.related_id != <some id>
- Where ``<some id>`` is the primary key of the
- given object.
- The ``!=`` operator provides partial functionality for non-
- many-to-one comparisons:
- * Comparisons against collections are not supported.
- Use
- :meth:`~.Relationship.Comparator.contains`
- in conjunction with :func:`_expression.not_`.
- * Compared to a scalar one-to-many, will produce a
- clause that compares the target columns in the parent to
- the given target.
- * Compared to a scalar many-to-many, an alias
- of the association table will be rendered as
- well, forming a natural join that is part of the
- main body of the query. This will not work for
- queries that go beyond simple AND conjunctions of
- comparisons, such as those which use OR. Use
- explicit joins, outerjoins, or
- :meth:`~.Relationship.Comparator.has` in
- conjunction with :func:`_expression.not_` for
- more comprehensive non-many-to-one scalar
- membership tests.
- * Comparisons against ``None`` given in a one-to-many
- or many-to-many context produce an EXISTS clause.
- """
- if other is None or isinstance(other, expression.Null):
- if self.property.direction == MANYTOONE:
- return _orm_annotate(
- ~self.property._optimized_compare(
- None, adapt_source=self.adapter
- )
- )
- else:
- return self._criterion_exists()
- elif self.property.uselist:
- raise sa_exc.InvalidRequestError(
- "Can't compare a collection"
- " to an object or collection; use "
- "contains() to test for membership."
- )
- else:
- return _orm_annotate(self.__negated_contains_or_equals(other))
- def _memoized_attr_property(self) -> RelationshipProperty[_PT]:
- self.prop.parent._check_configure()
- return self.prop
- def _with_parent(
- self,
- instance: object,
- alias_secondary: bool = True,
- from_entity: Optional[_EntityType[Any]] = None,
- ) -> ColumnElement[bool]:
- assert instance is not None
- adapt_source: Optional[_CoreAdapterProto] = None
- if from_entity is not None:
- insp: Optional[_InternalEntityType[Any]] = inspect(from_entity)
- assert insp is not None
- if insp_is_aliased_class(insp):
- adapt_source = insp._adapter.adapt_clause
- return self._optimized_compare(
- instance,
- value_is_parent=True,
- adapt_source=adapt_source,
- alias_secondary=alias_secondary,
- )
- def _optimized_compare(
- self,
- state: Any,
- value_is_parent: bool = False,
- adapt_source: Optional[_CoreAdapterProto] = None,
- alias_secondary: bool = True,
- ) -> ColumnElement[bool]:
- if state is not None:
- try:
- state = inspect(state)
- except sa_exc.NoInspectionAvailable:
- state = None
- if state is None or not getattr(state, "is_instance", False):
- raise sa_exc.ArgumentError(
- "Mapped instance expected for relationship "
- "comparison to object. Classes, queries and other "
- "SQL elements are not accepted in this context; for "
- "comparison with a subquery, "
- "use %s.has(**criteria)." % self
- )
- reverse_direction = not value_is_parent
- if state is None:
- return self._lazy_none_clause(
- reverse_direction, adapt_source=adapt_source
- )
- if not reverse_direction:
- criterion, bind_to_col = (
- self._lazy_strategy._lazywhere,
- self._lazy_strategy._bind_to_col,
- )
- else:
- criterion, bind_to_col = (
- self._lazy_strategy._rev_lazywhere,
- self._lazy_strategy._rev_bind_to_col,
- )
- if reverse_direction:
- mapper = self.mapper
- else:
- mapper = self.parent
- dict_ = attributes.instance_dict(state.obj())
- def visit_bindparam(bindparam: BindParameter[Any]) -> None:
- if bindparam._identifying_key in bind_to_col:
- bindparam.callable = self._get_attr_w_warn_on_none(
- mapper,
- state,
- dict_,
- bind_to_col[bindparam._identifying_key],
- )
- if self.secondary is not None and alias_secondary:
- criterion = ClauseAdapter(
- self.secondary._anonymous_fromclause()
- ).traverse(criterion)
- criterion = visitors.cloned_traverse(
- criterion, {}, {"bindparam": visit_bindparam}
- )
- if adapt_source:
- criterion = adapt_source(criterion)
- return criterion
- def _get_attr_w_warn_on_none(
- self,
- mapper: Mapper[Any],
- state: InstanceState[Any],
- dict_: _InstanceDict,
- column: ColumnElement[Any],
- ) -> Callable[[], Any]:
- """Create the callable that is used in a many-to-one expression.
- E.g.::
- u1 = s.query(User).get(5)
- expr = Address.user == u1
- Above, the SQL should be "address.user_id = 5". The callable
- returned by this method produces the value "5" based on the identity
- of ``u1``.
- """
- # in this callable, we're trying to thread the needle through
- # a wide variety of scenarios, including:
- #
- # * the object hasn't been flushed yet and there's no value for
- # the attribute as of yet
- #
- # * the object hasn't been flushed yet but it has a user-defined
- # value
- #
- # * the object has a value but it's expired and not locally present
- #
- # * the object has a value but it's expired and not locally present,
- # and the object is also detached
- #
- # * The object hadn't been flushed yet, there was no value, but
- # later, the object has been expired and detached, and *now*
- # they're trying to evaluate it
- #
- # * the object had a value, but it was changed to a new value, and
- # then expired
- #
- # * the object had a value, but it was changed to a new value, and
- # then expired, then the object was detached
- #
- # * the object has a user-set value, but it's None and we don't do
- # the comparison correctly for that so warn
- #
- prop = mapper.get_property_by_column(column)
- # by invoking this method, InstanceState will track the last known
- # value for this key each time the attribute is to be expired.
- # this feature was added explicitly for use in this method.
- state._track_last_known_value(prop.key)
- lkv_fixed = state._last_known_values
- def _go() -> Any:
- assert lkv_fixed is not None
- last_known = to_return = lkv_fixed[prop.key]
- existing_is_available = (
- last_known is not LoaderCallableStatus.NO_VALUE
- )
- # we support that the value may have changed. so here we
- # try to get the most recent value including re-fetching.
- # only if we can't get a value now due to detachment do we return
- # the last known value
- current_value = mapper._get_state_attr_by_column(
- state,
- dict_,
- column,
- passive=(
- PassiveFlag.PASSIVE_OFF
- if state.persistent
- else PassiveFlag.PASSIVE_NO_FETCH ^ PassiveFlag.INIT_OK
- ),
- )
- if current_value is LoaderCallableStatus.NEVER_SET:
- if not existing_is_available:
- raise sa_exc.InvalidRequestError(
- "Can't resolve value for column %s on object "
- "%s; no value has been set for this column"
- % (column, state_str(state))
- )
- elif current_value is LoaderCallableStatus.PASSIVE_NO_RESULT:
- if not existing_is_available:
- raise sa_exc.InvalidRequestError(
- "Can't resolve value for column %s on object "
- "%s; the object is detached and the value was "
- "expired" % (column, state_str(state))
- )
- else:
- to_return = current_value
- if to_return is None:
- util.warn(
- "Got None for value of column %s; this is unsupported "
- "for a relationship comparison and will not "
- "currently produce an IS comparison "
- "(but may in a future release)" % column
- )
- return to_return
- return _go
- def _lazy_none_clause(
- self,
- reverse_direction: bool = False,
- adapt_source: Optional[_CoreAdapterProto] = None,
- ) -> ColumnElement[bool]:
- if not reverse_direction:
- criterion, bind_to_col = (
- self._lazy_strategy._lazywhere,
- self._lazy_strategy._bind_to_col,
- )
- else:
- criterion, bind_to_col = (
- self._lazy_strategy._rev_lazywhere,
- self._lazy_strategy._rev_bind_to_col,
- )
- criterion = adapt_criterion_to_null(criterion, bind_to_col)
- if adapt_source:
- criterion = adapt_source(criterion)
- return criterion
- def __str__(self) -> str:
- return str(self.parent.class_.__name__) + "." + self.key
- def merge(
- self,
- session: Session,
- source_state: InstanceState[Any],
- source_dict: _InstanceDict,
- dest_state: InstanceState[Any],
- dest_dict: _InstanceDict,
- load: bool,
- _recursive: Dict[Any, object],
- _resolve_conflict_map: Dict[_IdentityKeyType[Any], object],
- ) -> None:
- if load:
- for r in self._reverse_property:
- if (source_state, r) in _recursive:
- return
- if "merge" not in self._cascade:
- return
- if self.key not in source_dict:
- return
- if self.uselist:
- impl = source_state.get_impl(self.key)
- assert is_has_collection_adapter(impl)
- instances_iterable = impl.get_collection(source_state, source_dict)
- # if this is a CollectionAttributeImpl, then empty should
- # be False, otherwise "self.key in source_dict" should not be
- # True
- assert not instances_iterable.empty if impl.collection else True
- if load:
- # for a full merge, pre-load the destination collection,
- # so that individual _merge of each item pulls from identity
- # map for those already present.
- # also assumes CollectionAttributeImpl behavior of loading
- # "old" list in any case
- dest_state.get_impl(self.key).get(
- dest_state, dest_dict, passive=PassiveFlag.PASSIVE_MERGE
- )
- dest_list = []
- for current in instances_iterable:
- current_state = attributes.instance_state(current)
- current_dict = attributes.instance_dict(current)
- _recursive[(current_state, self)] = True
- obj = session._merge(
- current_state,
- current_dict,
- load=load,
- _recursive=_recursive,
- _resolve_conflict_map=_resolve_conflict_map,
- )
- if obj is not None:
- dest_list.append(obj)
- if not load:
- coll = attributes.init_state_collection(
- dest_state, dest_dict, self.key
- )
- for c in dest_list:
- coll.append_without_event(c)
- else:
- dest_impl = dest_state.get_impl(self.key)
- assert is_has_collection_adapter(dest_impl)
- dest_impl.set(
- dest_state,
- dest_dict,
- dest_list,
- _adapt=False,
- passive=PassiveFlag.PASSIVE_MERGE,
- )
- else:
- current = source_dict[self.key]
- if current is not None:
- current_state = attributes.instance_state(current)
- current_dict = attributes.instance_dict(current)
- _recursive[(current_state, self)] = True
- obj = session._merge(
- current_state,
- current_dict,
- load=load,
- _recursive=_recursive,
- _resolve_conflict_map=_resolve_conflict_map,
- )
- else:
- obj = None
- if not load:
- dest_dict[self.key] = obj
- else:
- dest_state.get_impl(self.key).set(
- dest_state, dest_dict, obj, None
- )
- def _value_as_iterable(
- self,
- state: InstanceState[_O],
- dict_: _InstanceDict,
- key: str,
- passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
- ) -> Sequence[Tuple[InstanceState[_O], _O]]:
- """Return a list of tuples (state, obj) for the given
- key.
- returns an empty list if the value is None/empty/PASSIVE_NO_RESULT
- """
- impl = state.manager[key].impl
- x = impl.get(state, dict_, passive=passive)
- if x is LoaderCallableStatus.PASSIVE_NO_RESULT or x is None:
- return []
- elif is_has_collection_adapter(impl):
- return [
- (attributes.instance_state(o), o)
- for o in impl.get_collection(state, dict_, x, passive=passive)
- ]
- else:
- return [(attributes.instance_state(x), x)]
- def cascade_iterator(
- self,
- type_: str,
- state: InstanceState[Any],
- dict_: _InstanceDict,
- visited_states: Set[InstanceState[Any]],
- halt_on: Optional[Callable[[InstanceState[Any]], bool]] = None,
- ) -> Iterator[Tuple[Any, Mapper[Any], InstanceState[Any], _InstanceDict]]:
- # assert type_ in self._cascade
- # only actively lazy load on the 'delete' cascade
- if type_ != "delete" or self.passive_deletes:
- passive = PassiveFlag.PASSIVE_NO_INITIALIZE
- else:
- passive = PassiveFlag.PASSIVE_OFF | PassiveFlag.NO_RAISE
- if type_ == "save-update":
- tuples = state.manager[self.key].impl.get_all_pending(state, dict_)
- else:
- tuples = self._value_as_iterable(
- state, dict_, self.key, passive=passive
- )
- skip_pending = (
- type_ == "refresh-expire" and "delete-orphan" not in self._cascade
- )
- for instance_state, c in tuples:
- if instance_state in visited_states:
- continue
- if c is None:
- # would like to emit a warning here, but
- # would not be consistent with collection.append(None)
- # current behavior of silently skipping.
- # see [ticket:2229]
- continue
- assert instance_state is not None
- instance_dict = attributes.instance_dict(c)
- if halt_on and halt_on(instance_state):
- continue
- if skip_pending and not instance_state.key:
- continue
- instance_mapper = instance_state.manager.mapper
- if not instance_mapper.isa(self.mapper.class_manager.mapper):
- raise AssertionError(
- "Attribute '%s' on class '%s' "
- "doesn't handle objects "
- "of type '%s'"
- % (self.key, self.parent.class_, c.__class__)
- )
- visited_states.add(instance_state)
- yield c, instance_mapper, instance_state, instance_dict
- @property
- def _effective_sync_backref(self) -> bool:
- if self.viewonly:
- return False
- else:
- return self.sync_backref is not False
- @staticmethod
- def _check_sync_backref(
- rel_a: RelationshipProperty[Any], rel_b: RelationshipProperty[Any]
- ) -> None:
- if rel_a.viewonly and rel_b.sync_backref:
- raise sa_exc.InvalidRequestError(
- "Relationship %s cannot specify sync_backref=True since %s "
- "includes viewonly=True." % (rel_b, rel_a)
- )
- if (
- rel_a.viewonly
- and not rel_b.viewonly
- and rel_b.sync_backref is not False
- ):
- rel_b.sync_backref = False
- def _add_reverse_property(self, key: str) -> None:
- other = self.mapper.get_property(key, _configure_mappers=False)
- if not isinstance(other, RelationshipProperty):
- raise sa_exc.InvalidRequestError(
- "back_populates on relationship '%s' refers to attribute '%s' "
- "that is not a relationship. The back_populates parameter "
- "should refer to the name of a relationship on the target "
- "class." % (self, other)
- )
- # viewonly and sync_backref cases
- # 1. self.viewonly==True and other.sync_backref==True -> error
- # 2. self.viewonly==True and other.viewonly==False and
- # other.sync_backref==None -> warn sync_backref=False, set to False
- self._check_sync_backref(self, other)
- # 3. other.viewonly==True and self.sync_backref==True -> error
- # 4. other.viewonly==True and self.viewonly==False and
- # self.sync_backref==None -> warn sync_backref=False, set to False
- self._check_sync_backref(other, self)
- self._reverse_property.add(other)
- other._reverse_property.add(self)
- other._setup_entity()
- if not other.mapper.common_parent(self.parent):
- raise sa_exc.ArgumentError(
- "reverse_property %r on "
- "relationship %s references relationship %s, which "
- "does not reference mapper %s"
- % (key, self, other, self.parent)
- )
- if (
- other._configure_started
- and self.direction in (ONETOMANY, MANYTOONE)
- and self.direction == other.direction
- ):
- raise sa_exc.ArgumentError(
- "%s and back-reference %s are "
- "both of the same direction %r. Did you mean to "
- "set remote_side on the many-to-one side ?"
- % (other, self, self.direction)
- )
- @util.memoized_property
- def entity(self) -> _InternalEntityType[_T]:
- """Return the target mapped entity, which is an inspect() of the
- class or aliased class that is referenced by this
- :class:`.RelationshipProperty`.
- """
- self.parent._check_configure()
- return self.entity
- @util.memoized_property
- def mapper(self) -> Mapper[_T]:
- """Return the targeted :class:`_orm.Mapper` for this
- :class:`.RelationshipProperty`.
- """
- return self.entity.mapper
- def do_init(self) -> None:
- self._check_conflicts()
- self._process_dependent_arguments()
- self._setup_entity()
- self._setup_registry_dependencies()
- self._setup_join_conditions()
- self._check_cascade_settings(self._cascade)
- self._post_init()
- self._generate_backref()
- self._join_condition._warn_for_conflicting_sync_targets()
- super().do_init()
- self._lazy_strategy = cast(
- "LazyLoader", self._get_strategy((("lazy", "select"),))
- )
- def _setup_registry_dependencies(self) -> None:
- self.parent.mapper.registry._set_depends_on(
- self.entity.mapper.registry
- )
- def _process_dependent_arguments(self) -> None:
- """Convert incoming configuration arguments to their
- proper form.
- Callables are resolved, ORM annotations removed.
- """
- # accept callables for other attributes which may require
- # deferred initialization. This technique is used
- # by declarative "string configs" and some recipes.
- init_args = self._init_args
- for attr in (
- "order_by",
- "primaryjoin",
- "secondaryjoin",
- "secondary",
- "foreign_keys",
- "remote_side",
- ):
- rel_arg = getattr(init_args, attr)
- rel_arg._resolve_against_registry(self._clsregistry_resolvers[1])
- # remove "annotations" which are present if mapped class
- # descriptors are used to create the join expression.
- for attr in "primaryjoin", "secondaryjoin":
- rel_arg = getattr(init_args, attr)
- val = rel_arg.resolved
- if val is not None:
- rel_arg.resolved = _orm_deannotate(
- coercions.expect(
- roles.ColumnArgumentRole, val, argname=attr
- )
- )
- secondary = init_args.secondary.resolved
- if secondary is not None and _is_mapped_class(secondary):
- raise sa_exc.ArgumentError(
- "secondary argument %s passed to to relationship() %s must "
- "be a Table object or other FROM clause; can't send a mapped "
- "class directly as rows in 'secondary' are persisted "
- "independently of a class that is mapped "
- "to that same table." % (secondary, self)
- )
- # ensure expressions in self.order_by, foreign_keys,
- # remote_side are all columns, not strings.
- if (
- init_args.order_by.resolved is not False
- and init_args.order_by.resolved is not None
- ):
- self.order_by = tuple(
- coercions.expect(
- roles.ColumnArgumentRole, x, argname="order_by"
- )
- for x in util.to_list(init_args.order_by.resolved)
- )
- else:
- self.order_by = False
- self._user_defined_foreign_keys = util.column_set(
- coercions.expect(
- roles.ColumnArgumentRole, x, argname="foreign_keys"
- )
- for x in util.to_column_set(init_args.foreign_keys.resolved)
- )
- self.remote_side = util.column_set(
- coercions.expect(
- roles.ColumnArgumentRole, x, argname="remote_side"
- )
- for x in util.to_column_set(init_args.remote_side.resolved)
- )
- def declarative_scan(
- self,
- decl_scan: _ClassScanMapperConfig,
- registry: _RegistryType,
- cls: Type[Any],
- originating_module: Optional[str],
- key: str,
- mapped_container: Optional[Type[Mapped[Any]]],
- annotation: Optional[_AnnotationScanType],
- extracted_mapped_annotation: Optional[_AnnotationScanType],
- is_dataclass_field: bool,
- ) -> None:
- if extracted_mapped_annotation is None:
- if self.argument is None:
- self._raise_for_required(key, cls)
- else:
- return
- argument = extracted_mapped_annotation
- assert originating_module is not None
- if mapped_container is not None:
- is_write_only = issubclass(mapped_container, WriteOnlyMapped)
- is_dynamic = issubclass(mapped_container, DynamicMapped)
- if is_write_only:
- self.lazy = "write_only"
- self.strategy_key = (("lazy", self.lazy),)
- elif is_dynamic:
- self.lazy = "dynamic"
- self.strategy_key = (("lazy", self.lazy),)
- else:
- is_write_only = is_dynamic = False
- argument = de_optionalize_union_types(argument)
- if hasattr(argument, "__origin__"):
- arg_origin = argument.__origin__
- if isinstance(arg_origin, type) and issubclass(
- arg_origin, abc.Collection
- ):
- if self.collection_class is None:
- if _py_inspect.isabstract(arg_origin):
- raise sa_exc.ArgumentError(
- f"Collection annotation type {arg_origin} cannot "
- "be instantiated; please provide an explicit "
- "'collection_class' parameter "
- "(e.g. list, set, etc.) to the "
- "relationship() function to accompany this "
- "annotation"
- )
- self.collection_class = arg_origin
- elif not is_write_only and not is_dynamic:
- self.uselist = False
- if argument.__args__: # type: ignore
- if isinstance(arg_origin, type) and issubclass(
- arg_origin, typing.Mapping
- ):
- type_arg = argument.__args__[-1] # type: ignore
- else:
- type_arg = argument.__args__[0] # type: ignore
- if hasattr(type_arg, "__forward_arg__"):
- str_argument = type_arg.__forward_arg__
- argument = resolve_name_to_real_class_name(
- str_argument, originating_module
- )
- else:
- argument = type_arg
- else:
- raise sa_exc.ArgumentError(
- f"Generic alias {argument} requires an argument"
- )
- elif hasattr(argument, "__forward_arg__"):
- argument = argument.__forward_arg__
- argument = resolve_name_to_real_class_name(
- argument, originating_module
- )
- if (
- self.collection_class is None
- and not is_write_only
- and not is_dynamic
- ):
- self.uselist = False
- # ticket #8759
- # if a lead argument was given to relationship(), like
- # `relationship("B")`, use that, don't replace it with class we
- # found in the annotation. The declarative_scan() method call here is
- # still useful, as we continue to derive collection type and do
- # checking of the annotation in any case.
- if self.argument is None:
- self.argument = cast("_RelationshipArgumentType[_T]", argument)
- @util.preload_module("sqlalchemy.orm.mapper")
- def _setup_entity(self, __argument: Any = None) -> None:
- if "entity" in self.__dict__:
- return
- mapperlib = util.preloaded.orm_mapper
- if __argument:
- argument = __argument
- else:
- argument = self.argument
- resolved_argument: _ExternalEntityType[Any]
- if isinstance(argument, str):
- # we might want to cleanup clsregistry API to make this
- # more straightforward
- resolved_argument = cast(
- "_ExternalEntityType[Any]",
- self._clsregistry_resolve_name(argument)(),
- )
- elif callable(argument) and not isinstance(
- argument, (type, mapperlib.Mapper)
- ):
- resolved_argument = argument()
- else:
- resolved_argument = argument
- entity: _InternalEntityType[Any]
- if isinstance(resolved_argument, type):
- entity = class_mapper(resolved_argument, configure=False)
- else:
- try:
- entity = inspect(resolved_argument)
- except sa_exc.NoInspectionAvailable:
- entity = None # type: ignore
- if not hasattr(entity, "mapper"):
- raise sa_exc.ArgumentError(
- "relationship '%s' expects "
- "a class or a mapper argument (received: %s)"
- % (self.key, type(resolved_argument))
- )
- self.entity = entity
- self.target = self.entity.persist_selectable
- def _setup_join_conditions(self) -> None:
- self._join_condition = jc = JoinCondition(
- parent_persist_selectable=self.parent.persist_selectable,
- child_persist_selectable=self.entity.persist_selectable,
- parent_local_selectable=self.parent.local_table,
- child_local_selectable=self.entity.local_table,
- primaryjoin=self._init_args.primaryjoin.resolved,
- secondary=self._init_args.secondary.resolved,
- secondaryjoin=self._init_args.secondaryjoin.resolved,
- parent_equivalents=self.parent._equivalent_columns,
- child_equivalents=self.mapper._equivalent_columns,
- consider_as_foreign_keys=self._user_defined_foreign_keys,
- local_remote_pairs=self.local_remote_pairs,
- remote_side=self.remote_side,
- self_referential=self._is_self_referential,
- prop=self,
- support_sync=not self.viewonly,
- can_be_synced_fn=self._columns_are_mapped,
- )
- self.primaryjoin = jc.primaryjoin
- self.secondaryjoin = jc.secondaryjoin
- self.secondary = jc.secondary
- self.direction = jc.direction
- self.local_remote_pairs = jc.local_remote_pairs
- self.remote_side = jc.remote_columns
- self.local_columns = jc.local_columns
- self.synchronize_pairs = jc.synchronize_pairs
- self._calculated_foreign_keys = jc.foreign_key_columns
- self.secondary_synchronize_pairs = jc.secondary_synchronize_pairs
- @property
- def _clsregistry_resolve_arg(
- self,
- ) -> Callable[[str, bool], _class_resolver]:
- return self._clsregistry_resolvers[1]
- @property
- def _clsregistry_resolve_name(
- self,
- ) -> Callable[[str], Callable[[], Union[Type[Any], Table, _ModNS]]]:
- return self._clsregistry_resolvers[0]
- @util.memoized_property
- @util.preload_module("sqlalchemy.orm.clsregistry")
- def _clsregistry_resolvers(
- self,
- ) -> Tuple[
- Callable[[str], Callable[[], Union[Type[Any], Table, _ModNS]]],
- Callable[[str, bool], _class_resolver],
- ]:
- _resolver = util.preloaded.orm_clsregistry._resolver
- return _resolver(self.parent.class_, self)
- def _check_conflicts(self) -> None:
- """Test that this relationship is legal, warn about
- inheritance conflicts."""
- if self.parent.non_primary and not class_mapper(
- self.parent.class_, configure=False
- ).has_property(self.key):
- raise sa_exc.ArgumentError(
- "Attempting to assign a new "
- "relationship '%s' to a non-primary mapper on "
- "class '%s'. New relationships can only be added "
- "to the primary mapper, i.e. the very first mapper "
- "created for class '%s' "
- % (
- self.key,
- self.parent.class_.__name__,
- self.parent.class_.__name__,
- )
- )
- @property
- def cascade(self) -> CascadeOptions:
- """Return the current cascade setting for this
- :class:`.RelationshipProperty`.
- """
- return self._cascade
- @cascade.setter
- def cascade(self, cascade: Union[str, CascadeOptions]) -> None:
- self._set_cascade(cascade)
- def _set_cascade(self, cascade_arg: Union[str, CascadeOptions]) -> None:
- cascade = CascadeOptions(cascade_arg)
- if self.viewonly:
- cascade = CascadeOptions(
- cascade.intersection(CascadeOptions._viewonly_cascades)
- )
- if "mapper" in self.__dict__:
- self._check_cascade_settings(cascade)
- self._cascade = cascade
- if self._dependency_processor:
- self._dependency_processor.cascade = cascade
- def _check_cascade_settings(self, cascade: CascadeOptions) -> None:
- if (
- cascade.delete_orphan
- and not self.single_parent
- and (self.direction is MANYTOMANY or self.direction is MANYTOONE)
- ):
- raise sa_exc.ArgumentError(
- "For %(direction)s relationship %(rel)s, delete-orphan "
- "cascade is normally "
- 'configured only on the "one" side of a one-to-many '
- "relationship, "
- 'and not on the "many" side of a many-to-one or many-to-many '
- "relationship. "
- "To force this relationship to allow a particular "
- '"%(relatedcls)s" object to be referenced by only '
- 'a single "%(clsname)s" object at a time via the '
- "%(rel)s relationship, which "
- "would allow "
- "delete-orphan cascade to take place in this direction, set "
- "the single_parent=True flag."
- % {
- "rel": self,
- "direction": (
- "many-to-one"
- if self.direction is MANYTOONE
- else "many-to-many"
- ),
- "clsname": self.parent.class_.__name__,
- "relatedcls": self.mapper.class_.__name__,
- },
- code="bbf0",
- )
- if self.passive_deletes == "all" and (
- "delete" in cascade or "delete-orphan" in cascade
- ):
- raise sa_exc.ArgumentError(
- "On %s, can't set passive_deletes='all' in conjunction "
- "with 'delete' or 'delete-orphan' cascade" % self
- )
- if cascade.delete_orphan:
- self.mapper.primary_mapper()._delete_orphans.append(
- (self.key, self.parent.class_)
- )
- def _persists_for(self, mapper: Mapper[Any]) -> bool:
- """Return True if this property will persist values on behalf
- of the given mapper.
- """
- return (
- self.key in mapper.relationships
- and mapper.relationships[self.key] is self
- )
- def _columns_are_mapped(self, *cols: ColumnElement[Any]) -> bool:
- """Return True if all columns in the given collection are
- mapped by the tables referenced by this :class:`.RelationshipProperty`.
- """
- secondary = self._init_args.secondary.resolved
- for c in cols:
- if secondary is not None and secondary.c.contains_column(c):
- continue
- if not self.parent.persist_selectable.c.contains_column(
- c
- ) and not self.target.c.contains_column(c):
- return False
- return True
- def _generate_backref(self) -> None:
- """Interpret the 'backref' instruction to create a
- :func:`_orm.relationship` complementary to this one."""
- if self.parent.non_primary:
- return
- if self.backref is not None and not self.back_populates:
- kwargs: Dict[str, Any]
- if isinstance(self.backref, str):
- backref_key, kwargs = self.backref, {}
- else:
- backref_key, kwargs = self.backref
- mapper = self.mapper.primary_mapper()
- if not mapper.concrete:
- check = set(mapper.iterate_to_root()).union(
- mapper.self_and_descendants
- )
- for m in check:
- if m.has_property(backref_key) and not m.concrete:
- raise sa_exc.ArgumentError(
- "Error creating backref "
- "'%s' on relationship '%s': property of that "
- "name exists on mapper '%s'"
- % (backref_key, self, m)
- )
- # determine primaryjoin/secondaryjoin for the
- # backref. Use the one we had, so that
- # a custom join doesn't have to be specified in
- # both directions.
- if self.secondary is not None:
- # for many to many, just switch primaryjoin/
- # secondaryjoin. use the annotated
- # pj/sj on the _join_condition.
- pj = kwargs.pop(
- "primaryjoin",
- self._join_condition.secondaryjoin_minus_local,
- )
- sj = kwargs.pop(
- "secondaryjoin",
- self._join_condition.primaryjoin_minus_local,
- )
- else:
- pj = kwargs.pop(
- "primaryjoin",
- self._join_condition.primaryjoin_reverse_remote,
- )
- sj = kwargs.pop("secondaryjoin", None)
- if sj:
- raise sa_exc.InvalidRequestError(
- "Can't assign 'secondaryjoin' on a backref "
- "against a non-secondary relationship."
- )
- foreign_keys = kwargs.pop(
- "foreign_keys", self._user_defined_foreign_keys
- )
- parent = self.parent.primary_mapper()
- kwargs.setdefault("viewonly", self.viewonly)
- kwargs.setdefault("post_update", self.post_update)
- kwargs.setdefault("passive_updates", self.passive_updates)
- kwargs.setdefault("sync_backref", self.sync_backref)
- self.back_populates = backref_key
- relationship = RelationshipProperty(
- parent,
- self.secondary,
- primaryjoin=pj,
- secondaryjoin=sj,
- foreign_keys=foreign_keys,
- back_populates=self.key,
- **kwargs,
- )
- mapper._configure_property(
- backref_key, relationship, warn_for_existing=True
- )
- if self.back_populates:
- self._add_reverse_property(self.back_populates)
- @util.preload_module("sqlalchemy.orm.dependency")
- def _post_init(self) -> None:
- dependency = util.preloaded.orm_dependency
- if self.uselist is None:
- self.uselist = self.direction is not MANYTOONE
- if not self.viewonly:
- self._dependency_processor = ( # type: ignore
- dependency.DependencyProcessor.from_relationship
- )(self)
- @util.memoized_property
- def _use_get(self) -> bool:
- """memoize the 'use_get' attribute of this RelationshipLoader's
- lazyloader."""
- strategy = self._lazy_strategy
- return strategy.use_get
- @util.memoized_property
- def _is_self_referential(self) -> bool:
- return self.mapper.common_parent(self.parent)
- def _create_joins(
- self,
- source_polymorphic: bool = False,
- source_selectable: Optional[FromClause] = None,
- dest_selectable: Optional[FromClause] = None,
- of_type_entity: Optional[_InternalEntityType[Any]] = None,
- alias_secondary: bool = False,
- extra_criteria: Tuple[ColumnElement[bool], ...] = (),
- ) -> Tuple[
- ColumnElement[bool],
- Optional[ColumnElement[bool]],
- FromClause,
- FromClause,
- Optional[FromClause],
- Optional[ClauseAdapter],
- ]:
- aliased = False
- if alias_secondary and self.secondary is not None:
- aliased = True
- if source_selectable is None:
- if source_polymorphic and self.parent.with_polymorphic:
- source_selectable = self.parent._with_polymorphic_selectable
- if of_type_entity:
- dest_mapper = of_type_entity.mapper
- if dest_selectable is None:
- dest_selectable = of_type_entity.selectable
- aliased = True
- else:
- dest_mapper = self.mapper
- if dest_selectable is None:
- dest_selectable = self.entity.selectable
- if self.mapper.with_polymorphic:
- aliased = True
- if self._is_self_referential and source_selectable is None:
- dest_selectable = dest_selectable._anonymous_fromclause()
- aliased = True
- elif (
- dest_selectable is not self.mapper._with_polymorphic_selectable
- or self.mapper.with_polymorphic
- ):
- aliased = True
- single_crit = dest_mapper._single_table_criterion
- aliased = aliased or (
- source_selectable is not None
- and (
- source_selectable
- is not self.parent._with_polymorphic_selectable
- or source_selectable._is_subquery
- )
- )
- (
- primaryjoin,
- secondaryjoin,
- secondary,
- target_adapter,
- dest_selectable,
- ) = self._join_condition.join_targets(
- source_selectable,
- dest_selectable,
- aliased,
- single_crit,
- extra_criteria,
- )
- if source_selectable is None:
- source_selectable = self.parent.local_table
- if dest_selectable is None:
- dest_selectable = self.entity.local_table
- return (
- primaryjoin,
- secondaryjoin,
- source_selectable,
- dest_selectable,
- secondary,
- target_adapter,
- )
- def _annotate_columns(element: _CE, annotations: _AnnotationDict) -> _CE:
- def clone(elem: _CE) -> _CE:
- if isinstance(elem, expression.ColumnClause):
- elem = elem._annotate(annotations.copy()) # type: ignore
- elem._copy_internals(clone=clone)
- return elem
- if element is not None:
- element = clone(element)
- clone = None # type: ignore # remove gc cycles
- return element
- class JoinCondition:
- primaryjoin_initial: Optional[ColumnElement[bool]]
- primaryjoin: ColumnElement[bool]
- secondaryjoin: Optional[ColumnElement[bool]]
- secondary: Optional[FromClause]
- prop: RelationshipProperty[Any]
- synchronize_pairs: _ColumnPairs
- secondary_synchronize_pairs: _ColumnPairs
- direction: RelationshipDirection
- parent_persist_selectable: FromClause
- child_persist_selectable: FromClause
- parent_local_selectable: FromClause
- child_local_selectable: FromClause
- _local_remote_pairs: Optional[_ColumnPairs]
- def __init__(
- self,
- parent_persist_selectable: FromClause,
- child_persist_selectable: FromClause,
- parent_local_selectable: FromClause,
- child_local_selectable: FromClause,
- *,
- primaryjoin: Optional[ColumnElement[bool]] = None,
- secondary: Optional[FromClause] = None,
- secondaryjoin: Optional[ColumnElement[bool]] = None,
- parent_equivalents: Optional[_EquivalentColumnMap] = None,
- child_equivalents: Optional[_EquivalentColumnMap] = None,
- consider_as_foreign_keys: Any = None,
- local_remote_pairs: Optional[_ColumnPairs] = None,
- remote_side: Any = None,
- self_referential: Any = False,
- prop: RelationshipProperty[Any],
- support_sync: bool = True,
- can_be_synced_fn: Callable[..., bool] = lambda *c: True,
- ):
- self.parent_persist_selectable = parent_persist_selectable
- self.parent_local_selectable = parent_local_selectable
- self.child_persist_selectable = child_persist_selectable
- self.child_local_selectable = child_local_selectable
- self.parent_equivalents = parent_equivalents
- self.child_equivalents = child_equivalents
- self.primaryjoin_initial = primaryjoin
- self.secondaryjoin = secondaryjoin
- self.secondary = secondary
- self.consider_as_foreign_keys = consider_as_foreign_keys
- self._local_remote_pairs = local_remote_pairs
- self._remote_side = remote_side
- self.prop = prop
- self.self_referential = self_referential
- self.support_sync = support_sync
- self.can_be_synced_fn = can_be_synced_fn
- self._determine_joins()
- assert self.primaryjoin is not None
- self._sanitize_joins()
- self._annotate_fks()
- self._annotate_remote()
- self._annotate_local()
- self._annotate_parentmapper()
- self._setup_pairs()
- self._check_foreign_cols(self.primaryjoin, True)
- if self.secondaryjoin is not None:
- self._check_foreign_cols(self.secondaryjoin, False)
- self._determine_direction()
- self._check_remote_side()
- self._log_joins()
- def _log_joins(self) -> None:
- log = self.prop.logger
- log.info("%s setup primary join %s", self.prop, self.primaryjoin)
- log.info("%s setup secondary join %s", self.prop, self.secondaryjoin)
- log.info(
- "%s synchronize pairs [%s]",
- self.prop,
- ",".join(
- "(%s => %s)" % (l, r) for (l, r) in self.synchronize_pairs
- ),
- )
- log.info(
- "%s secondary synchronize pairs [%s]",
- self.prop,
- ",".join(
- "(%s => %s)" % (l, r)
- for (l, r) in self.secondary_synchronize_pairs or []
- ),
- )
- log.info(
- "%s local/remote pairs [%s]",
- self.prop,
- ",".join(
- "(%s / %s)" % (l, r) for (l, r) in self.local_remote_pairs
- ),
- )
- log.info(
- "%s remote columns [%s]",
- self.prop,
- ",".join("%s" % col for col in self.remote_columns),
- )
- log.info(
- "%s local columns [%s]",
- self.prop,
- ",".join("%s" % col for col in self.local_columns),
- )
- log.info("%s relationship direction %s", self.prop, self.direction)
- def _sanitize_joins(self) -> None:
- """remove the parententity annotation from our join conditions which
- can leak in here based on some declarative patterns and maybe others.
- "parentmapper" is relied upon both by the ORM evaluator as well as
- the use case in _join_fixture_inh_selfref_w_entity
- that relies upon it being present, see :ticket:`3364`.
- """
- self.primaryjoin = _deep_deannotate(
- self.primaryjoin, values=("parententity", "proxy_key")
- )
- if self.secondaryjoin is not None:
- self.secondaryjoin = _deep_deannotate(
- self.secondaryjoin, values=("parententity", "proxy_key")
- )
- def _determine_joins(self) -> None:
- """Determine the 'primaryjoin' and 'secondaryjoin' attributes,
- if not passed to the constructor already.
- This is based on analysis of the foreign key relationships
- between the parent and target mapped selectables.
- """
- if self.secondaryjoin is not None and self.secondary is None:
- raise sa_exc.ArgumentError(
- "Property %s specified with secondary "
- "join condition but "
- "no secondary argument" % self.prop
- )
- # find a join between the given mapper's mapped table and
- # the given table. will try the mapper's local table first
- # for more specificity, then if not found will try the more
- # general mapped table, which in the case of inheritance is
- # a join.
- try:
- consider_as_foreign_keys = self.consider_as_foreign_keys or None
- if self.secondary is not None:
- if self.secondaryjoin is None:
- self.secondaryjoin = join_condition(
- self.child_persist_selectable,
- self.secondary,
- a_subset=self.child_local_selectable,
- consider_as_foreign_keys=consider_as_foreign_keys,
- )
- if self.primaryjoin_initial is None:
- self.primaryjoin = join_condition(
- self.parent_persist_selectable,
- self.secondary,
- a_subset=self.parent_local_selectable,
- consider_as_foreign_keys=consider_as_foreign_keys,
- )
- else:
- self.primaryjoin = self.primaryjoin_initial
- else:
- if self.primaryjoin_initial is None:
- self.primaryjoin = join_condition(
- self.parent_persist_selectable,
- self.child_persist_selectable,
- a_subset=self.parent_local_selectable,
- consider_as_foreign_keys=consider_as_foreign_keys,
- )
- else:
- self.primaryjoin = self.primaryjoin_initial
- except sa_exc.NoForeignKeysError as nfe:
- if self.secondary is not None:
- raise sa_exc.NoForeignKeysError(
- "Could not determine join "
- "condition between parent/child tables on "
- "relationship %s - there are no foreign keys "
- "linking these tables via secondary table '%s'. "
- "Ensure that referencing columns are associated "
- "with a ForeignKey or ForeignKeyConstraint, or "
- "specify 'primaryjoin' and 'secondaryjoin' "
- "expressions." % (self.prop, self.secondary)
- ) from nfe
- else:
- raise sa_exc.NoForeignKeysError(
- "Could not determine join "
- "condition between parent/child tables on "
- "relationship %s - there are no foreign keys "
- "linking these tables. "
- "Ensure that referencing columns are associated "
- "with a ForeignKey or ForeignKeyConstraint, or "
- "specify a 'primaryjoin' expression." % self.prop
- ) from nfe
- except sa_exc.AmbiguousForeignKeysError as afe:
- if self.secondary is not None:
- raise sa_exc.AmbiguousForeignKeysError(
- "Could not determine join "
- "condition between parent/child tables on "
- "relationship %s - there are multiple foreign key "
- "paths linking the tables via secondary table '%s'. "
- "Specify the 'foreign_keys' "
- "argument, providing a list of those columns which "
- "should be counted as containing a foreign key "
- "reference from the secondary table to each of the "
- "parent and child tables." % (self.prop, self.secondary)
- ) from afe
- else:
- raise sa_exc.AmbiguousForeignKeysError(
- "Could not determine join "
- "condition between parent/child tables on "
- "relationship %s - there are multiple foreign key "
- "paths linking the tables. Specify the "
- "'foreign_keys' argument, providing a list of those "
- "columns which should be counted as containing a "
- "foreign key reference to the parent table." % self.prop
- ) from afe
- @property
- def primaryjoin_minus_local(self) -> ColumnElement[bool]:
- return _deep_deannotate(self.primaryjoin, values=("local", "remote"))
- @property
- def secondaryjoin_minus_local(self) -> ColumnElement[bool]:
- assert self.secondaryjoin is not None
- return _deep_deannotate(self.secondaryjoin, values=("local", "remote"))
- @util.memoized_property
- def primaryjoin_reverse_remote(self) -> ColumnElement[bool]:
- """Return the primaryjoin condition suitable for the
- "reverse" direction.
- If the primaryjoin was delivered here with pre-existing
- "remote" annotations, the local/remote annotations
- are reversed. Otherwise, the local/remote annotations
- are removed.
- """
- if self._has_remote_annotations:
- def replace(element: _CE, **kw: Any) -> Optional[_CE]:
- if "remote" in element._annotations:
- v = dict(element._annotations)
- del v["remote"]
- v["local"] = True
- return element._with_annotations(v)
- elif "local" in element._annotations:
- v = dict(element._annotations)
- del v["local"]
- v["remote"] = True
- return element._with_annotations(v)
- return None
- return visitors.replacement_traverse(self.primaryjoin, {}, replace)
- else:
- if self._has_foreign_annotations:
- # TODO: coverage
- return _deep_deannotate(
- self.primaryjoin, values=("local", "remote")
- )
- else:
- return _deep_deannotate(self.primaryjoin)
- def _has_annotation(self, clause: ClauseElement, annotation: str) -> bool:
- for col in visitors.iterate(clause, {}):
- if annotation in col._annotations:
- return True
- else:
- return False
- @util.memoized_property
- def _has_foreign_annotations(self) -> bool:
- return self._has_annotation(self.primaryjoin, "foreign")
- @util.memoized_property
- def _has_remote_annotations(self) -> bool:
- return self._has_annotation(self.primaryjoin, "remote")
- def _annotate_fks(self) -> None:
- """Annotate the primaryjoin and secondaryjoin
- structures with 'foreign' annotations marking columns
- considered as foreign.
- """
- if self._has_foreign_annotations:
- return
- if self.consider_as_foreign_keys:
- self._annotate_from_fk_list()
- else:
- self._annotate_present_fks()
- def _annotate_from_fk_list(self) -> None:
- def check_fk(element: _CE, **kw: Any) -> Optional[_CE]:
- if element in self.consider_as_foreign_keys:
- return element._annotate({"foreign": True})
- return None
- self.primaryjoin = visitors.replacement_traverse(
- self.primaryjoin, {}, check_fk
- )
- if self.secondaryjoin is not None:
- self.secondaryjoin = visitors.replacement_traverse(
- self.secondaryjoin, {}, check_fk
- )
- def _annotate_present_fks(self) -> None:
- if self.secondary is not None:
- secondarycols = util.column_set(self.secondary.c)
- else:
- secondarycols = set()
- def is_foreign(
- a: ColumnElement[Any], b: ColumnElement[Any]
- ) -> Optional[ColumnElement[Any]]:
- if isinstance(a, schema.Column) and isinstance(b, schema.Column):
- if a.references(b):
- return a
- elif b.references(a):
- return b
- if secondarycols:
- if a in secondarycols and b not in secondarycols:
- return a
- elif b in secondarycols and a not in secondarycols:
- return b
- return None
- def visit_binary(binary: BinaryExpression[Any]) -> None:
- if not isinstance(
- binary.left, sql.ColumnElement
- ) or not isinstance(binary.right, sql.ColumnElement):
- return
- if (
- "foreign" not in binary.left._annotations
- and "foreign" not in binary.right._annotations
- ):
- col = is_foreign(binary.left, binary.right)
- if col is not None:
- if col.compare(binary.left):
- binary.left = binary.left._annotate({"foreign": True})
- elif col.compare(binary.right):
- binary.right = binary.right._annotate(
- {"foreign": True}
- )
- self.primaryjoin = visitors.cloned_traverse(
- self.primaryjoin, {}, {"binary": visit_binary}
- )
- if self.secondaryjoin is not None:
- self.secondaryjoin = visitors.cloned_traverse(
- self.secondaryjoin, {}, {"binary": visit_binary}
- )
- def _refers_to_parent_table(self) -> bool:
- """Return True if the join condition contains column
- comparisons where both columns are in both tables.
- """
- pt = self.parent_persist_selectable
- mt = self.child_persist_selectable
- result = False
- def visit_binary(binary: BinaryExpression[Any]) -> None:
- nonlocal result
- c, f = binary.left, binary.right
- if (
- isinstance(c, expression.ColumnClause)
- and isinstance(f, expression.ColumnClause)
- and pt.is_derived_from(c.table)
- and pt.is_derived_from(f.table)
- and mt.is_derived_from(c.table)
- and mt.is_derived_from(f.table)
- ):
- result = True
- visitors.traverse(self.primaryjoin, {}, {"binary": visit_binary})
- return result
- def _tables_overlap(self) -> bool:
- """Return True if parent/child tables have some overlap."""
- return selectables_overlap(
- self.parent_persist_selectable, self.child_persist_selectable
- )
- def _annotate_remote(self) -> None:
- """Annotate the primaryjoin and secondaryjoin
- structures with 'remote' annotations marking columns
- considered as part of the 'remote' side.
- """
- if self._has_remote_annotations:
- return
- if self.secondary is not None:
- self._annotate_remote_secondary()
- elif self._local_remote_pairs or self._remote_side:
- self._annotate_remote_from_args()
- elif self._refers_to_parent_table():
- self._annotate_selfref(
- lambda col: "foreign" in col._annotations, False
- )
- elif self._tables_overlap():
- self._annotate_remote_with_overlap()
- else:
- self._annotate_remote_distinct_selectables()
- def _annotate_remote_secondary(self) -> None:
- """annotate 'remote' in primaryjoin, secondaryjoin
- when 'secondary' is present.
- """
- assert self.secondary is not None
- fixed_secondary = self.secondary
- def repl(element: _CE, **kw: Any) -> Optional[_CE]:
- if fixed_secondary.c.contains_column(element):
- return element._annotate({"remote": True})
- return None
- self.primaryjoin = visitors.replacement_traverse(
- self.primaryjoin, {}, repl
- )
- assert self.secondaryjoin is not None
- self.secondaryjoin = visitors.replacement_traverse(
- self.secondaryjoin, {}, repl
- )
- def _annotate_selfref(
- self, fn: Callable[[ColumnElement[Any]], bool], remote_side_given: bool
- ) -> None:
- """annotate 'remote' in primaryjoin, secondaryjoin
- when the relationship is detected as self-referential.
- """
- def visit_binary(binary: BinaryExpression[Any]) -> None:
- equated = binary.left.compare(binary.right)
- if isinstance(binary.left, expression.ColumnClause) and isinstance(
- binary.right, expression.ColumnClause
- ):
- # assume one to many - FKs are "remote"
- if fn(binary.left):
- binary.left = binary.left._annotate({"remote": True})
- if fn(binary.right) and not equated:
- binary.right = binary.right._annotate({"remote": True})
- elif not remote_side_given:
- self._warn_non_column_elements()
- self.primaryjoin = visitors.cloned_traverse(
- self.primaryjoin, {}, {"binary": visit_binary}
- )
- def _annotate_remote_from_args(self) -> None:
- """annotate 'remote' in primaryjoin, secondaryjoin
- when the 'remote_side' or '_local_remote_pairs'
- arguments are used.
- """
- if self._local_remote_pairs:
- if self._remote_side:
- raise sa_exc.ArgumentError(
- "remote_side argument is redundant "
- "against more detailed _local_remote_side "
- "argument."
- )
- remote_side = [r for (l, r) in self._local_remote_pairs]
- else:
- remote_side = self._remote_side
- if self._refers_to_parent_table():
- self._annotate_selfref(lambda col: col in remote_side, True)
- else:
- def repl(element: _CE, **kw: Any) -> Optional[_CE]:
- # use set() to avoid generating ``__eq__()`` expressions
- # against each element
- if element in set(remote_side):
- return element._annotate({"remote": True})
- return None
- self.primaryjoin = visitors.replacement_traverse(
- self.primaryjoin, {}, repl
- )
- def _annotate_remote_with_overlap(self) -> None:
- """annotate 'remote' in primaryjoin, secondaryjoin
- when the parent/child tables have some set of
- tables in common, though is not a fully self-referential
- relationship.
- """
- def visit_binary(binary: BinaryExpression[Any]) -> None:
- binary.left, binary.right = proc_left_right(
- binary.left, binary.right
- )
- binary.right, binary.left = proc_left_right(
- binary.right, binary.left
- )
- check_entities = (
- self.prop is not None and self.prop.mapper is not self.prop.parent
- )
- def proc_left_right(
- left: ColumnElement[Any], right: ColumnElement[Any]
- ) -> Tuple[ColumnElement[Any], ColumnElement[Any]]:
- if isinstance(left, expression.ColumnClause) and isinstance(
- right, expression.ColumnClause
- ):
- if self.child_persist_selectable.c.contains_column(
- right
- ) and self.parent_persist_selectable.c.contains_column(left):
- right = right._annotate({"remote": True})
- elif (
- check_entities
- and right._annotations.get("parentmapper") is self.prop.mapper
- ):
- right = right._annotate({"remote": True})
- elif (
- check_entities
- and left._annotations.get("parentmapper") is self.prop.mapper
- ):
- left = left._annotate({"remote": True})
- else:
- self._warn_non_column_elements()
- return left, right
- self.primaryjoin = visitors.cloned_traverse(
- self.primaryjoin, {}, {"binary": visit_binary}
- )
- def _annotate_remote_distinct_selectables(self) -> None:
- """annotate 'remote' in primaryjoin, secondaryjoin
- when the parent/child tables are entirely
- separate.
- """
- def repl(element: _CE, **kw: Any) -> Optional[_CE]:
- if self.child_persist_selectable.c.contains_column(element) and (
- not self.parent_local_selectable.c.contains_column(element)
- or self.child_local_selectable.c.contains_column(element)
- ):
- return element._annotate({"remote": True})
- return None
- self.primaryjoin = visitors.replacement_traverse(
- self.primaryjoin, {}, repl
- )
- def _warn_non_column_elements(self) -> None:
- util.warn(
- "Non-simple column elements in primary "
- "join condition for property %s - consider using "
- "remote() annotations to mark the remote side." % self.prop
- )
- def _annotate_local(self) -> None:
- """Annotate the primaryjoin and secondaryjoin
- structures with 'local' annotations.
- This annotates all column elements found
- simultaneously in the parent table
- and the join condition that don't have a
- 'remote' annotation set up from
- _annotate_remote() or user-defined.
- """
- if self._has_annotation(self.primaryjoin, "local"):
- return
- if self._local_remote_pairs:
- local_side = util.column_set(
- [l for (l, r) in self._local_remote_pairs]
- )
- else:
- local_side = util.column_set(self.parent_persist_selectable.c)
- def locals_(element: _CE, **kw: Any) -> Optional[_CE]:
- if "remote" not in element._annotations and element in local_side:
- return element._annotate({"local": True})
- return None
- self.primaryjoin = visitors.replacement_traverse(
- self.primaryjoin, {}, locals_
- )
- def _annotate_parentmapper(self) -> None:
- def parentmappers_(element: _CE, **kw: Any) -> Optional[_CE]:
- if "remote" in element._annotations:
- return element._annotate({"parentmapper": self.prop.mapper})
- elif "local" in element._annotations:
- return element._annotate({"parentmapper": self.prop.parent})
- return None
- self.primaryjoin = visitors.replacement_traverse(
- self.primaryjoin, {}, parentmappers_
- )
- def _check_remote_side(self) -> None:
- if not self.local_remote_pairs:
- raise sa_exc.ArgumentError(
- "Relationship %s could "
- "not determine any unambiguous local/remote column "
- "pairs based on join condition and remote_side "
- "arguments. "
- "Consider using the remote() annotation to "
- "accurately mark those elements of the join "
- "condition that are on the remote side of "
- "the relationship." % (self.prop,)
- )
- else:
- not_target = util.column_set(
- self.parent_persist_selectable.c
- ).difference(self.child_persist_selectable.c)
- for _, rmt in self.local_remote_pairs:
- if rmt in not_target:
- util.warn(
- "Expression %s is marked as 'remote', but these "
- "column(s) are local to the local side. The "
- "remote() annotation is needed only for a "
- "self-referential relationship where both sides "
- "of the relationship refer to the same tables."
- % (rmt,)
- )
- def _check_foreign_cols(
- self, join_condition: ColumnElement[bool], primary: bool
- ) -> None:
- """Check the foreign key columns collected and emit error
- messages."""
- foreign_cols = self._gather_columns_with_annotation(
- join_condition, "foreign"
- )
- has_foreign = bool(foreign_cols)
- if primary:
- can_sync = bool(self.synchronize_pairs)
- else:
- can_sync = bool(self.secondary_synchronize_pairs)
- if (
- self.support_sync
- and can_sync
- or (not self.support_sync and has_foreign)
- ):
- return
- # from here below is just determining the best error message
- # to report. Check for a join condition using any operator
- # (not just ==), perhaps they need to turn on "viewonly=True".
- if self.support_sync and has_foreign and not can_sync:
- err = (
- "Could not locate any simple equality expressions "
- "involving locally mapped foreign key columns for "
- "%s join condition "
- "'%s' on relationship %s."
- % (
- primary and "primary" or "secondary",
- join_condition,
- self.prop,
- )
- )
- err += (
- " Ensure that referencing columns are associated "
- "with a ForeignKey or ForeignKeyConstraint, or are "
- "annotated in the join condition with the foreign() "
- "annotation. To allow comparison operators other than "
- "'==', the relationship can be marked as viewonly=True."
- )
- raise sa_exc.ArgumentError(err)
- else:
- err = (
- "Could not locate any relevant foreign key columns "
- "for %s join condition '%s' on relationship %s."
- % (
- primary and "primary" or "secondary",
- join_condition,
- self.prop,
- )
- )
- err += (
- " Ensure that referencing columns are associated "
- "with a ForeignKey or ForeignKeyConstraint, or are "
- "annotated in the join condition with the foreign() "
- "annotation."
- )
- raise sa_exc.ArgumentError(err)
- def _determine_direction(self) -> None:
- """Determine if this relationship is one to many, many to one,
- many to many.
- """
- if self.secondaryjoin is not None:
- self.direction = MANYTOMANY
- else:
- parentcols = util.column_set(self.parent_persist_selectable.c)
- targetcols = util.column_set(self.child_persist_selectable.c)
- # fk collection which suggests ONETOMANY.
- onetomany_fk = targetcols.intersection(self.foreign_key_columns)
- # fk collection which suggests MANYTOONE.
- manytoone_fk = parentcols.intersection(self.foreign_key_columns)
- if onetomany_fk and manytoone_fk:
- # fks on both sides. test for overlap of local/remote
- # with foreign key.
- # we will gather columns directly from their annotations
- # without deannotating, so that we can distinguish on a column
- # that refers to itself.
- # 1. columns that are both remote and FK suggest
- # onetomany.
- onetomany_local = self._gather_columns_with_annotation(
- self.primaryjoin, "remote", "foreign"
- )
- # 2. columns that are FK but are not remote (e.g. local)
- # suggest manytoone.
- manytoone_local = {
- c
- for c in self._gather_columns_with_annotation(
- self.primaryjoin, "foreign"
- )
- if "remote" not in c._annotations
- }
- # 3. if both collections are present, remove columns that
- # refer to themselves. This is for the case of
- # and_(Me.id == Me.remote_id, Me.version == Me.version)
- if onetomany_local and manytoone_local:
- self_equated = self.remote_columns.intersection(
- self.local_columns
- )
- onetomany_local = onetomany_local.difference(self_equated)
- manytoone_local = manytoone_local.difference(self_equated)
- # at this point, if only one or the other collection is
- # present, we know the direction, otherwise it's still
- # ambiguous.
- if onetomany_local and not manytoone_local:
- self.direction = ONETOMANY
- elif manytoone_local and not onetomany_local:
- self.direction = MANYTOONE
- else:
- raise sa_exc.ArgumentError(
- "Can't determine relationship"
- " direction for relationship '%s' - foreign "
- "key columns within the join condition are present "
- "in both the parent and the child's mapped tables. "
- "Ensure that only those columns referring "
- "to a parent column are marked as foreign, "
- "either via the foreign() annotation or "
- "via the foreign_keys argument." % self.prop
- )
- elif onetomany_fk:
- self.direction = ONETOMANY
- elif manytoone_fk:
- self.direction = MANYTOONE
- else:
- raise sa_exc.ArgumentError(
- "Can't determine relationship "
- "direction for relationship '%s' - foreign "
- "key columns are present in neither the parent "
- "nor the child's mapped tables" % self.prop
- )
- def _deannotate_pairs(
- self, collection: _ColumnPairIterable
- ) -> _MutableColumnPairs:
- """provide deannotation for the various lists of
- pairs, so that using them in hashes doesn't incur
- high-overhead __eq__() comparisons against
- original columns mapped.
- """
- return [(x._deannotate(), y._deannotate()) for x, y in collection]
- def _setup_pairs(self) -> None:
- sync_pairs: _MutableColumnPairs = []
- lrp: util.OrderedSet[Tuple[ColumnElement[Any], ColumnElement[Any]]] = (
- util.OrderedSet([])
- )
- secondary_sync_pairs: _MutableColumnPairs = []
- def go(
- joincond: ColumnElement[bool],
- collection: _MutableColumnPairs,
- ) -> None:
- def visit_binary(
- binary: BinaryExpression[Any],
- left: ColumnElement[Any],
- right: ColumnElement[Any],
- ) -> None:
- if (
- "remote" in right._annotations
- and "remote" not in left._annotations
- and self.can_be_synced_fn(left)
- ):
- lrp.add((left, right))
- elif (
- "remote" in left._annotations
- and "remote" not in right._annotations
- and self.can_be_synced_fn(right)
- ):
- lrp.add((right, left))
- if binary.operator is operators.eq and self.can_be_synced_fn(
- left, right
- ):
- if "foreign" in right._annotations:
- collection.append((left, right))
- elif "foreign" in left._annotations:
- collection.append((right, left))
- visit_binary_product(visit_binary, joincond)
- for joincond, collection in [
- (self.primaryjoin, sync_pairs),
- (self.secondaryjoin, secondary_sync_pairs),
- ]:
- if joincond is None:
- continue
- go(joincond, collection)
- self.local_remote_pairs = self._deannotate_pairs(lrp)
- self.synchronize_pairs = self._deannotate_pairs(sync_pairs)
- self.secondary_synchronize_pairs = self._deannotate_pairs(
- secondary_sync_pairs
- )
- _track_overlapping_sync_targets: weakref.WeakKeyDictionary[
- ColumnElement[Any],
- weakref.WeakKeyDictionary[
- RelationshipProperty[Any], ColumnElement[Any]
- ],
- ] = weakref.WeakKeyDictionary()
- def _warn_for_conflicting_sync_targets(self) -> None:
- if not self.support_sync:
- return
- # we would like to detect if we are synchronizing any column
- # pairs in conflict with another relationship that wishes to sync
- # an entirely different column to the same target. This is a
- # very rare edge case so we will try to minimize the memory/overhead
- # impact of this check
- for from_, to_ in [
- (from_, to_) for (from_, to_) in self.synchronize_pairs
- ] + [
- (from_, to_) for (from_, to_) in self.secondary_synchronize_pairs
- ]:
- # save ourselves a ton of memory and overhead by only
- # considering columns that are subject to a overlapping
- # FK constraints at the core level. This condition can arise
- # if multiple relationships overlap foreign() directly, but
- # we're going to assume it's typically a ForeignKeyConstraint-
- # level configuration that benefits from this warning.
- if to_ not in self._track_overlapping_sync_targets:
- self._track_overlapping_sync_targets[to_] = (
- weakref.WeakKeyDictionary({self.prop: from_})
- )
- else:
- other_props = []
- prop_to_from = self._track_overlapping_sync_targets[to_]
- for pr, fr_ in prop_to_from.items():
- if (
- not pr.mapper._dispose_called
- and pr not in self.prop._reverse_property
- and pr.key not in self.prop._overlaps
- and self.prop.key not in pr._overlaps
- # note: the "__*" symbol is used internally by
- # SQLAlchemy as a general means of suppressing the
- # overlaps warning for some extension cases, however
- # this is not currently
- # a publicly supported symbol and may change at
- # any time.
- and "__*" not in self.prop._overlaps
- and "__*" not in pr._overlaps
- and not self.prop.parent.is_sibling(pr.parent)
- and not self.prop.mapper.is_sibling(pr.mapper)
- and not self.prop.parent.is_sibling(pr.mapper)
- and not self.prop.mapper.is_sibling(pr.parent)
- and (
- self.prop.key != pr.key
- or not self.prop.parent.common_parent(pr.parent)
- )
- ):
- other_props.append((pr, fr_))
- if other_props:
- util.warn(
- "relationship '%s' will copy column %s to column %s, "
- "which conflicts with relationship(s): %s. "
- "If this is not the intention, consider if these "
- "relationships should be linked with "
- "back_populates, or if viewonly=True should be "
- "applied to one or more if they are read-only. "
- "For the less common case that foreign key "
- "constraints are partially overlapping, the "
- "orm.foreign() "
- "annotation can be used to isolate the columns that "
- "should be written towards. To silence this "
- "warning, add the parameter 'overlaps=\"%s\"' to the "
- "'%s' relationship."
- % (
- self.prop,
- from_,
- to_,
- ", ".join(
- sorted(
- "'%s' (copies %s to %s)" % (pr, fr_, to_)
- for (pr, fr_) in other_props
- )
- ),
- ",".join(sorted(pr.key for pr, fr in other_props)),
- self.prop,
- ),
- code="qzyx",
- )
- self._track_overlapping_sync_targets[to_][self.prop] = from_
- @util.memoized_property
- def remote_columns(self) -> Set[ColumnElement[Any]]:
- return self._gather_join_annotations("remote")
- @util.memoized_property
- def local_columns(self) -> Set[ColumnElement[Any]]:
- return self._gather_join_annotations("local")
- @util.memoized_property
- def foreign_key_columns(self) -> Set[ColumnElement[Any]]:
- return self._gather_join_annotations("foreign")
- def _gather_join_annotations(
- self, annotation: str
- ) -> Set[ColumnElement[Any]]:
- s = set(
- self._gather_columns_with_annotation(self.primaryjoin, annotation)
- )
- if self.secondaryjoin is not None:
- s.update(
- self._gather_columns_with_annotation(
- self.secondaryjoin, annotation
- )
- )
- return {x._deannotate() for x in s}
- def _gather_columns_with_annotation(
- self, clause: ColumnElement[Any], *annotation: Iterable[str]
- ) -> Set[ColumnElement[Any]]:
- annotation_set = set(annotation)
- return {
- cast(ColumnElement[Any], col)
- for col in visitors.iterate(clause, {})
- if annotation_set.issubset(col._annotations)
- }
- @util.memoized_property
- def _secondary_lineage_set(self) -> FrozenSet[ColumnElement[Any]]:
- if self.secondary is not None:
- return frozenset(
- itertools.chain(*[c.proxy_set for c in self.secondary.c])
- )
- else:
- return util.EMPTY_SET
- def join_targets(
- self,
- source_selectable: Optional[FromClause],
- dest_selectable: FromClause,
- aliased: bool,
- single_crit: Optional[ColumnElement[bool]] = None,
- extra_criteria: Tuple[ColumnElement[bool], ...] = (),
- ) -> Tuple[
- ColumnElement[bool],
- Optional[ColumnElement[bool]],
- Optional[FromClause],
- Optional[ClauseAdapter],
- FromClause,
- ]:
- """Given a source and destination selectable, create a
- join between them.
- This takes into account aliasing the join clause
- to reference the appropriate corresponding columns
- in the target objects, as well as the extra child
- criterion, equivalent column sets, etc.
- """
- # place a barrier on the destination such that
- # replacement traversals won't ever dig into it.
- # its internal structure remains fixed
- # regardless of context.
- dest_selectable = _shallow_annotate(
- dest_selectable, {"no_replacement_traverse": True}
- )
- primaryjoin, secondaryjoin, secondary = (
- self.primaryjoin,
- self.secondaryjoin,
- self.secondary,
- )
- # adjust the join condition for single table inheritance,
- # in the case that the join is to a subclass
- # this is analogous to the
- # "_adjust_for_single_table_inheritance()" method in Query.
- if single_crit is not None:
- if secondaryjoin is not None:
- secondaryjoin = secondaryjoin & single_crit
- else:
- primaryjoin = primaryjoin & single_crit
- if extra_criteria:
- def mark_exclude_cols(
- elem: SupportsAnnotations, annotations: _AnnotationDict
- ) -> SupportsAnnotations:
- """note unrelated columns in the "extra criteria" as either
- should be adapted or not adapted, even though they are not
- part of our "local" or "remote" side.
- see #9779 for this case, as well as #11010 for a follow up
- """
- parentmapper_for_element = elem._annotations.get(
- "parentmapper", None
- )
- if (
- parentmapper_for_element is not self.prop.parent
- and parentmapper_for_element is not self.prop.mapper
- and elem not in self._secondary_lineage_set
- ):
- return _safe_annotate(elem, annotations)
- else:
- return elem
- extra_criteria = tuple(
- _deep_annotate(
- elem,
- {"should_not_adapt": True},
- annotate_callable=mark_exclude_cols,
- )
- for elem in extra_criteria
- )
- if secondaryjoin is not None:
- secondaryjoin = secondaryjoin & sql.and_(*extra_criteria)
- else:
- primaryjoin = primaryjoin & sql.and_(*extra_criteria)
- if aliased:
- if secondary is not None:
- secondary = secondary._anonymous_fromclause(flat=True)
- primary_aliasizer = ClauseAdapter(
- secondary,
- exclude_fn=_local_col_exclude,
- )
- secondary_aliasizer = ClauseAdapter(
- dest_selectable, equivalents=self.child_equivalents
- ).chain(primary_aliasizer)
- if source_selectable is not None:
- primary_aliasizer = ClauseAdapter(
- secondary,
- exclude_fn=_local_col_exclude,
- ).chain(
- ClauseAdapter(
- source_selectable,
- equivalents=self.parent_equivalents,
- )
- )
- secondaryjoin = secondary_aliasizer.traverse(secondaryjoin)
- else:
- primary_aliasizer = ClauseAdapter(
- dest_selectable,
- exclude_fn=_local_col_exclude,
- equivalents=self.child_equivalents,
- )
- if source_selectable is not None:
- primary_aliasizer.chain(
- ClauseAdapter(
- source_selectable,
- exclude_fn=_remote_col_exclude,
- equivalents=self.parent_equivalents,
- )
- )
- secondary_aliasizer = None
- primaryjoin = primary_aliasizer.traverse(primaryjoin)
- target_adapter = secondary_aliasizer or primary_aliasizer
- target_adapter.exclude_fn = None
- else:
- target_adapter = None
- return (
- primaryjoin,
- secondaryjoin,
- secondary,
- target_adapter,
- dest_selectable,
- )
- def create_lazy_clause(self, reverse_direction: bool = False) -> Tuple[
- ColumnElement[bool],
- Dict[str, ColumnElement[Any]],
- Dict[ColumnElement[Any], ColumnElement[Any]],
- ]:
- binds: Dict[ColumnElement[Any], BindParameter[Any]] = {}
- equated_columns: Dict[ColumnElement[Any], ColumnElement[Any]] = {}
- has_secondary = self.secondaryjoin is not None
- if has_secondary:
- lookup = collections.defaultdict(list)
- for l, r in self.local_remote_pairs:
- lookup[l].append((l, r))
- equated_columns[r] = l
- elif not reverse_direction:
- for l, r in self.local_remote_pairs:
- equated_columns[r] = l
- else:
- for l, r in self.local_remote_pairs:
- equated_columns[l] = r
- def col_to_bind(
- element: ColumnElement[Any], **kw: Any
- ) -> Optional[BindParameter[Any]]:
- if (
- (not reverse_direction and "local" in element._annotations)
- or reverse_direction
- and (
- (has_secondary and element in lookup)
- or (not has_secondary and "remote" in element._annotations)
- )
- ):
- if element not in binds:
- binds[element] = sql.bindparam(
- None, None, type_=element.type, unique=True
- )
- return binds[element]
- return None
- lazywhere = self.primaryjoin
- if self.secondaryjoin is None or not reverse_direction:
- lazywhere = visitors.replacement_traverse(
- lazywhere, {}, col_to_bind
- )
- if self.secondaryjoin is not None:
- secondaryjoin = self.secondaryjoin
- if reverse_direction:
- secondaryjoin = visitors.replacement_traverse(
- secondaryjoin, {}, col_to_bind
- )
- lazywhere = sql.and_(lazywhere, secondaryjoin)
- bind_to_col = {binds[col].key: col for col in binds}
- return lazywhere, bind_to_col, equated_columns
- class _ColInAnnotations:
- """Serializable object that tests for names in c._annotations.
- TODO: does this need to be serializable anymore? can we find what the
- use case was for that?
- """
- __slots__ = ("names",)
- def __init__(self, *names: str):
- self.names = frozenset(names)
- def __call__(self, c: ClauseElement) -> bool:
- return bool(self.names.intersection(c._annotations))
- _local_col_exclude = _ColInAnnotations("local", "should_not_adapt")
- _remote_col_exclude = _ColInAnnotations("remote", "should_not_adapt")
- class Relationship(
- RelationshipProperty[_T],
- _DeclarativeMapped[_T],
- ):
- """Describes an object property that holds a single item or list
- of items that correspond to a related database table.
- Public constructor is the :func:`_orm.relationship` function.
- .. seealso::
- :ref:`relationship_config_toplevel`
- .. versionchanged:: 2.0 Added :class:`_orm.Relationship` as a Declarative
- compatible subclass for :class:`_orm.RelationshipProperty`.
- """
- inherit_cache = True
- """:meta private:"""
- class _RelationshipDeclared( # type: ignore[misc]
- Relationship[_T],
- WriteOnlyMapped[_T], # not compatible with Mapped[_T]
- DynamicMapped[_T], # not compatible with Mapped[_T]
- ):
- """Relationship subclass used implicitly for declarative mapping."""
- inherit_cache = True
- """:meta private:"""
- @classmethod
- def _mapper_property_name(cls) -> str:
- return "Relationship"
|