| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585 |
- # sql/annotation.py
- # Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
- # <see AUTHORS file>
- #
- # This module is part of SQLAlchemy and is released under
- # the MIT License: https://www.opensource.org/licenses/mit-license.php
- """The :class:`.Annotated` class and related routines; creates hash-equivalent
- copies of SQL constructs which contain context-specific markers and
- associations.
- Note that the :class:`.Annotated` concept as implemented in this module is not
- related in any way to the pep-593 concept of "Annotated".
- """
- from __future__ import annotations
- import typing
- from typing import Any
- from typing import Callable
- from typing import cast
- from typing import Dict
- from typing import FrozenSet
- from typing import Mapping
- from typing import Optional
- from typing import overload
- from typing import Sequence
- from typing import Tuple
- from typing import Type
- from typing import TYPE_CHECKING
- from typing import TypeVar
- from . import operators
- from .cache_key import HasCacheKey
- from .visitors import anon_map
- from .visitors import ExternallyTraversible
- from .visitors import InternalTraversal
- from .. import util
- from ..util.typing import Literal
- from ..util.typing import Self
- if TYPE_CHECKING:
- from .base import _EntityNamespace
- from .visitors import _TraverseInternalsType
# Type alias for the read-only mapping of annotation names to values that
# the annotation methods in this module accept.
_AnnotationDict = Mapping[str, Any]

# Default value of ``SupportsAnnotations._annotations``.
EMPTY_ANNOTATIONS: util.immutabledict[str, Any] = util.EMPTY_DICT
class SupportsAnnotations(ExternallyTraversible):
    """Interface for traversible constructs that carry an ``_annotations``
    mapping.

    ``_annotate()`` and ``_deannotate()`` raise ``NotImplementedError`` at
    this level; the class itself supplies only the helpers that render the
    annotations as part of a cache key.

    """

    __slots__ = ()

    _annotations: util.immutabledict[str, Any] = EMPTY_ANNOTATIONS

    proxy_set: util.generic_fn_descriptor[FrozenSet[Any]]

    _is_immutable: bool

    def _annotate(self, values: _AnnotationDict) -> Self:
        """Not implemented at this level."""
        raise NotImplementedError()

    @overload
    def _deannotate(
        self,
        values: Literal[None] = ...,
        clone: bool = ...,
    ) -> Self: ...

    @overload
    def _deannotate(
        self,
        values: Sequence[str] = ...,
        clone: bool = ...,
    ) -> SupportsAnnotations: ...

    def _deannotate(
        self,
        values: Optional[Sequence[str]] = None,
        clone: bool = False,
    ) -> SupportsAnnotations:
        """Not implemented at this level."""
        raise NotImplementedError()

    @util.memoized_property
    def _annotations_cache_key(self) -> Tuple[Any, ...]:
        """Cache key tuple for the annotations, generated against a new
        ``anon_map``.
        """
        return self._gen_annotations_cache_key(anon_map())

    def _gen_annotations_cache_key(
        self, anon_map: anon_map
    ) -> Tuple[Any, ...]:
        """Return ``("_annotations", ((key, value), ...))`` with the pairs
        ordered by sorted key.

        A value that is a :class:`.HasCacheKey` is replaced by the result
        of its ``_gen_cache_key()``; any other value is used as is.

        """
        annotations = self._annotations

        # collect all values in sorted-key order before converting any
        ordered = [(name, annotations[name]) for name in sorted(annotations)]

        entries = []
        for name, value in ordered:
            if isinstance(value, HasCacheKey):
                value = value._gen_cache_key(anon_map, [])
            entries.append((name, value))

        return ("_annotations", tuple(entries))
class SupportsWrappingAnnotations(SupportsAnnotations):
    """Annotation support implemented by wrapping the element in an
    :class:`.Annotated` instance.
    """

    __slots__ = ()

    _constructor: Callable[..., SupportsWrappingAnnotations]

    if TYPE_CHECKING:

        @util.ro_non_memoized_property
        def entity_namespace(self) -> _EntityNamespace: ...

    def _annotate(self, values: _AnnotationDict) -> Self:
        """produce an :class:`.Annotated` version of this ClauseElement
        that has its annotations updated from the given dictionary.

        """
        wrapped = Annotated._as_annotated_instance(self, values)
        return wrapped  # type: ignore

    def _with_annotations(self, values: _AnnotationDict) -> Self:
        """produce an :class:`.Annotated` version of this ClauseElement
        that has the given dictionary as its annotations.

        """
        wrapped = Annotated._as_annotated_instance(self, values)
        return wrapped  # type: ignore

    @overload
    def _deannotate(
        self,
        values: Literal[None] = ...,
        clone: bool = ...,
    ) -> Self: ...

    @overload
    def _deannotate(
        self,
        values: Sequence[str] = ...,
        clone: bool = ...,
    ) -> SupportsAnnotations: ...

    def _deannotate(
        self,
        values: Optional[Sequence[str]] = None,
        clone: bool = False,
    ) -> SupportsAnnotations:
        """return this :class:`_expression.ClauseElement` without
        annotations.

        :param values: optional tuple of individual values
         to remove.  Not consulted by this implementation.

        :param clone: when True, the result of ``_clone()`` is returned;
         otherwise the element itself is returned.

        """
        return self._clone() if clone else self
class SupportsCloneAnnotations(SupportsWrappingAnnotations):
    # SupportsCloneAnnotations extends from SupportsWrappingAnnotations
    # to support the structure of having the base ClauseElement
    # be a subclass of SupportsWrappingAnnotations.  Any ClauseElement
    # subclass that wants to extend from SupportsCloneAnnotations
    # will inherently also be subclassing SupportsWrappingAnnotations, so
    # make that specific here.

    if not typing.TYPE_CHECKING:
        __slots__ = ()

    _clone_annotations_traverse_internals: _TraverseInternalsType = [
        ("_annotations", InternalTraversal.dp_annotations_key)
    ]

    def _annotate(self, values: _AnnotationDict) -> Self:
        """return a copy of this ClauseElement with annotations
        updated by the given dictionary.

        The copy is made with ``_clone()``; memoized cache keys carried
        over into the copy's ``__dict__`` are discarded since they no
        longer match its annotations.

        """
        new = self._clone()
        new._annotations = new._annotations.union(values)
        new.__dict__.pop("_annotations_cache_key", None)
        new.__dict__.pop("_generate_cache_key", None)
        return new

    def _with_annotations(self, values: _AnnotationDict) -> Self:
        """return a copy of this ClauseElement with annotations
        replaced by the given dictionary.

        The copy is made with ``_clone()``; memoized cache keys carried
        over into the copy's ``__dict__`` are discarded since they no
        longer match its annotations.

        """
        new = self._clone()
        new._annotations = util.immutabledict(values)
        new.__dict__.pop("_annotations_cache_key", None)
        new.__dict__.pop("_generate_cache_key", None)
        return new

    @overload
    def _deannotate(
        self,
        values: Literal[None] = ...,
        clone: bool = ...,
    ) -> Self: ...

    @overload
    def _deannotate(
        self,
        values: Sequence[str] = ...,
        clone: bool = ...,
    ) -> SupportsAnnotations: ...

    def _deannotate(
        self,
        values: Optional[Sequence[str]] = None,
        clone: bool = False,
    ) -> SupportsAnnotations:
        """return a copy of this :class:`_expression.ClauseElement`
        with annotations
        removed.

        :param values: optional tuple of individual values
         to remove.  Note that this implementation does not consult
         ``values``; all annotations are removed from the copy.

        :param clone: when True, a copy is returned even if this element
         has no annotations.  When False, the element itself is returned
         if it has no annotations.

        """
        if clone or self._annotations:
            # clone is used when we are also copying
            # the expression for a deep deannotation
            new = self._clone()
            new._annotations = util.immutabledict()
            # discard both memoized cache keys, as _annotate() and
            # _with_annotations() do, so that the copy can't report a key
            # generated against the previous annotations.
            new.__dict__.pop("_annotations_cache_key", None)
            new.__dict__.pop("_generate_cache_key", None)
            return new
        else:
            return self
class Annotated(SupportsAnnotations):
    """Wrap a SupportsAnnotations element in a clone that carries its own
    'annotations' dictionary.

    In contrast to an ordinary clone, this one also mimics the
    ``__hash__()`` and ``__eq__()`` of the element it was made from, so
    that it can stand in for that element within hashed collections.

    The original element stays referenced by the clone, for the important
    reason of keeping its hash value current.  Were the original to be
    GC'ed, its hash value could be reused, causing conflicts.

    .. note::  Annotated is generated as a brand new class, instead of the
       functionality living directly within ClauseElement, for
       **performance**.  Plain ClauseElement has no ``__hash__()`` method,
       which significantly reduces function call overhead; ClauseElement
       objects are used in sets and dictionaries pervasively, yet most of
       them are not "annotated".

    """

    _annotations: util.immutabledict[str, Any]
    __element: SupportsWrappingAnnotations
    _hash: int

    _is_column_operators = False

    @classmethod
    def _as_annotated_instance(
        cls, element: SupportsWrappingAnnotations, values: _AnnotationDict
    ) -> Annotated:
        """Construct the Annotated subclass that corresponds to the class
        of ``element``, generating that subclass if it is not registered
        yet.
        """
        element_cls = element.__class__
        anno_cls = annotated_classes.get(element_cls)
        if anno_cls is None:
            anno_cls = _new_annotation_type(element_cls, cls)
        return anno_cls(element, values)

    def __new__(cls: Type[Self], *args: Any) -> Self:
        return object.__new__(cls)

    def __init__(
        self, element: SupportsWrappingAnnotations, values: _AnnotationDict
    ):
        # start from a copy of the element's state, minus the memoized
        # cache keys, which don't apply to the new annotations
        state = element.__dict__.copy()
        state.pop("_annotations_cache_key", None)
        state.pop("_generate_cache_key", None)
        self.__dict__ = state

        self.__element = element
        self._annotations = util.immutabledict(values)
        self._hash = hash(element)

    def _annotate(self, values: _AnnotationDict) -> Self:
        """return a copy with the given values merged into the current
        annotations.
        """
        return self._with_annotations(self._annotations.union(values))

    def _with_annotations(self, values: _AnnotationDict) -> Self:
        """return a copy whose annotations are exactly the given values."""
        state = self.__dict__.copy()
        state.pop("_annotations_cache_key", None)
        state.pop("_generate_cache_key", None)

        copied = self.__class__.__new__(self.__class__)
        copied.__dict__ = state
        copied._annotations = util.immutabledict(values)
        return copied

    @overload
    def _deannotate(
        self,
        values: Literal[None] = ...,
        clone: bool = ...,
    ) -> Self: ...

    @overload
    def _deannotate(
        self,
        values: Sequence[str] = ...,
        clone: bool = ...,
    ) -> Annotated: ...

    def _deannotate(
        self,
        values: Optional[Sequence[str]] = None,
        clone: bool = True,
    ) -> SupportsAnnotations:
        """remove annotations.

        :param values: when None, the wrapped element is returned.
         Otherwise a copy of this object is returned that keeps only the
         annotations whose keys are not in ``values``.

        :param clone: not consulted by this implementation.

        """
        if values is None:
            return self.__element

        remaining = {
            name: value
            for name, value in self._annotations.items()
            if name not in values
        }
        return self._with_annotations(util.immutabledict(remaining))

    if not typing.TYPE_CHECKING:
        # manually proxy some methods that need extra attention
        def _compiler_dispatch(self, visitor: Any, **kw: Any) -> Any:
            element_cls = self.__element.__class__
            return element_cls._compiler_dispatch(self, visitor, **kw)

        @property
        def _constructor(self):
            return self.__element._constructor

    def _clone(self, **kw: Any) -> Self:
        """clone the wrapped element and re-wrap it with the current
        annotations.
        """
        cloned = self.__element._clone(**kw)
        if cloned is self.__element:
            # detect immutable, don't change anything
            return self

        # update the clone with any changes that have occurred
        # to this object's __dict__.
        cloned.__dict__.update(self.__dict__)
        return self.__class__(cloned, self._annotations)

    def __reduce__(self) -> Tuple[Type[Annotated], Tuple[Any, ...]]:
        ctor_args = (self.__element, self._annotations)
        return self.__class__, ctor_args

    def __hash__(self) -> int:
        return self._hash

    def __eq__(self, other: Any) -> bool:
        if not self._is_column_operators:
            return hash(other) == hash(self)
        return self.__element.__class__.__eq__(self, other)

    @util.ro_non_memoized_property
    def entity_namespace(self) -> _EntityNamespace:
        """The ``entity_namespace`` of the object stored under the
        "entity_namespace" annotation if there is one, else that of the
        wrapped element.
        """
        annotations = self._annotations
        if "entity_namespace" not in annotations:
            return self.__element.entity_namespace

        target = cast(
            SupportsWrappingAnnotations, annotations["entity_namespace"]
        )
        return target.entity_namespace
# Annotated subclasses are hard-generated and kept in this registry, keyed
# by the class they annotate.  this technique is used instead of
# on-the-fly types (i.e. type.__new__()) so that the resulting objects are
# pickleable; it also lets decisions about the type of object being
# annotated be made up front, just once per class rather than
# per-instance.
annotated_classes: Dict[
    Type[SupportsWrappingAnnotations], Type[Annotated]
] = {}

_SA = TypeVar("_SA", bound="SupportsAnnotations")
def _safe_annotate(to_annotate: _SA, annotations: _AnnotationDict) -> _SA:
    """Annotate ``to_annotate`` if it has an ``_annotate`` attribute,
    otherwise return it unchanged.

    Objects lacking ``_annotate`` are skipped; namely QueryableAttribute
    inside of a join condition.

    """
    try:
        annotate = to_annotate._annotate
    except AttributeError:
        return to_annotate

    return annotate(annotations)
def _deep_annotate(
    element: _SA,
    annotations: _AnnotationDict,
    exclude: Optional[Sequence[SupportsAnnotations]] = None,
    *,
    detect_subquery_cols: bool = False,
    ind_cols_on_fromclause: bool = False,
    annotate_callable: Optional[
        Callable[[SupportsAnnotations, _AnnotationDict], SupportsAnnotations]
    ] = None,
) -> _SA:
    """Deep copy the given ClauseElement, annotating each element
    with the given annotations dictionary.

    Elements within the exclude collection will be cloned but not annotated.

    :param annotate_callable: optional callable invoked as
     ``annotate_callable(elem, annotations)`` in place of the default
     annotation step.

    """

    # annotated objects hack the __hash__() method so if we want to
    # uniquely process them we have to use id()
    seen: Dict[int, SupportsAnnotations] = {}

    def visit(elem: SupportsAnnotations, **kw: Any) -> SupportsAnnotations:
        # ind_cols_on_fromclause means make sure an AnnotatedFromClause
        # has its own .c collection independent of that which its proxying.
        # this is used specifically by orm.LoaderCriteriaOption to break
        # a reference cycle that it's otherwise prone to building,
        # see test_relationship_criteria->
        # test_loader_criteria_subquery_w_same_entity.  logic here was
        # changed for #8796 and made explicit; previously it occurred
        # by accident

        kw["detect_subquery_cols"] = detect_subquery_cols
        elem_id = id(elem)

        previous = seen.get(elem_id)
        if previous is not None:
            return previous

        if (
            exclude
            and hasattr(elem, "proxy_set")
            and elem.proxy_set.intersection(exclude)
        ):
            # excluded elements are copied, not annotated
            result = elem._clone(clone=visit, **kw)
        elif annotations != elem._annotations:
            target = (
                elem._clone(clone=visit, **kw)
                if detect_subquery_cols and elem._is_immutable
                else elem
            )
            annotate = annotate_callable or _safe_annotate
            result = annotate(target, annotations)
        else:
            # already carries exactly these annotations
            result = elem

        result._copy_internals(
            clone=visit, ind_cols_on_fromclause=ind_cols_on_fromclause
        )

        seen[elem_id] = result
        return result

    if element is not None:
        element = cast(_SA, visit(element))
    visit = None  # type: ignore  # remove gc cycles
    return element
@overload
def _deep_deannotate(
    element: Literal[None], values: Optional[Sequence[str]] = None
) -> Literal[None]: ...


@overload
def _deep_deannotate(
    element: _SA, values: Optional[Sequence[str]] = None
) -> _SA: ...


def _deep_deannotate(
    element: Optional[_SA], values: Optional[Sequence[str]] = None
) -> Optional[_SA]:
    """Deep copy the given element, removing annotations.

    :param element: the element to copy; ``None`` is returned as is.

    :param values: optional sequence of annotation keys, passed along to
     each element's ``_deannotate()``.

    """
    memo: Dict[Any, SupportsAnnotations] = {}

    def visit(elem: SupportsAnnotations, **kw: Any) -> SupportsAnnotations:
        # when specific values are given, elements are tracked by id();
        # otherwise they are tracked by the element itself
        memo_key: Any = id(elem) if values else elem

        if memo_key in memo:
            return memo[memo_key]

        stripped = elem._deannotate(values=values, clone=True)
        stripped._copy_internals(clone=visit)
        memo[memo_key] = stripped
        return stripped

    if element is not None:
        element = cast(_SA, visit(element))
    visit = None  # type: ignore  # remove gc cycles
    return element
def _shallow_annotate(element: _SA, annotations: _AnnotationDict) -> _SA:
    """Annotate the given ClauseElement, then copy the internals of the
    annotated result so that internal objects refer to the new annotated
    object.

    Basically used to apply a "don't traverse" annotation to a
    selectable, without digging throughout the whole
    structure wasting time.

    """
    annotated = element._annotate(annotations)
    annotated._copy_internals()
    return annotated
def _new_annotation_type(
    cls: Type[SupportsWrappingAnnotations], base_cls: Type[Annotated]
) -> Type[Annotated]:
    """Generates a new class that subclasses Annotated and proxies a given
    element type.

    If ``cls`` is already an Annotated subclass it is returned as is, and
    if a generated class is already registered for ``cls`` that class is
    returned.  Otherwise the new class is registered in
    ``annotated_classes`` and placed in this module's globals under the
    name ``Annotated<cls name>``.

    """
    if issubclass(cls, Annotated):
        return cls

    existing = annotated_classes.get(cls)
    if existing is not None:
        return existing

    # check if an Annotated subclass more specific than
    # the given base_cls is already registered, such
    # as AnnotatedColumnElement; the first match along the MRO wins.
    base_cls = next(
        (
            annotated_classes[super_]
            for super_ in cls.__mro__
            if super_ in annotated_classes
        ),
        base_cls,
    )

    anno_name = "Annotated%s" % cls.__name__
    anno_cls = cast(Type[Annotated], type(anno_name, (base_cls, cls), {}))
    annotated_classes[cls] = anno_cls
    globals()[anno_name] = anno_cls

    declared = cls.__dict__
    inherits_cache = declared.get("inherit_cache", False)

    if "_traverse_internals" in declared or inherits_cache:
        anno_cls._traverse_internals = list(cls._traverse_internals) + [
            ("_annotations", InternalTraversal.dp_annotations_key)
        ]

    # some classes include this even if they have traverse_internals
    # e.g. BindParameter, add it if present.
    if inherits_cache:
        anno_cls.inherit_cache = True  # type: ignore
    elif "inherit_cache" in declared:
        anno_cls.inherit_cache = declared["inherit_cache"]  # type: ignore

    anno_cls._is_column_operators = issubclass(cls, operators.ColumnOperators)

    return anno_cls
def _prepare_annotations(
    target_hierarchy: Type[SupportsWrappingAnnotations],
    base_cls: Type[Annotated],
) -> None:
    """Call ``_new_annotation_type()`` for each class produced by
    ``util.walk_subclasses()`` for ``target_hierarchy``, using ``base_cls``
    as the Annotated base.
    """
    for target_cls in util.walk_subclasses(target_hierarchy):
        _new_annotation_type(target_cls, base_cls)
|