annotation.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585
  1. # sql/annotation.py
  2. # Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: https://www.opensource.org/licenses/mit-license.php
  7. """The :class:`.Annotated` class and related routines; creates hash-equivalent
  8. copies of SQL constructs which contain context-specific markers and
  9. associations.
  10. Note that the :class:`.Annotated` concept as implemented in this module is not
  11. related in any way to the pep-593 concept of "Annotated".
  12. """
  13. from __future__ import annotations
  14. import typing
  15. from typing import Any
  16. from typing import Callable
  17. from typing import cast
  18. from typing import Dict
  19. from typing import FrozenSet
  20. from typing import Mapping
  21. from typing import Optional
  22. from typing import overload
  23. from typing import Sequence
  24. from typing import Tuple
  25. from typing import Type
  26. from typing import TYPE_CHECKING
  27. from typing import TypeVar
  28. from . import operators
  29. from .cache_key import HasCacheKey
  30. from .visitors import anon_map
  31. from .visitors import ExternallyTraversible
  32. from .visitors import InternalTraversal
  33. from .. import util
  34. from ..util.typing import Literal
  35. from ..util.typing import Self
  36. if TYPE_CHECKING:
  37. from .base import _EntityNamespace
  38. from .visitors import _TraverseInternalsType
  39. _AnnotationDict = Mapping[str, Any]
  40. EMPTY_ANNOTATIONS: util.immutabledict[str, Any] = util.EMPTY_DICT
  41. class SupportsAnnotations(ExternallyTraversible):
  42. __slots__ = ()
  43. _annotations: util.immutabledict[str, Any] = EMPTY_ANNOTATIONS
  44. proxy_set: util.generic_fn_descriptor[FrozenSet[Any]]
  45. _is_immutable: bool
  46. def _annotate(self, values: _AnnotationDict) -> Self:
  47. raise NotImplementedError()
  48. @overload
  49. def _deannotate(
  50. self,
  51. values: Literal[None] = ...,
  52. clone: bool = ...,
  53. ) -> Self: ...
  54. @overload
  55. def _deannotate(
  56. self,
  57. values: Sequence[str] = ...,
  58. clone: bool = ...,
  59. ) -> SupportsAnnotations: ...
  60. def _deannotate(
  61. self,
  62. values: Optional[Sequence[str]] = None,
  63. clone: bool = False,
  64. ) -> SupportsAnnotations:
  65. raise NotImplementedError()
  66. @util.memoized_property
  67. def _annotations_cache_key(self) -> Tuple[Any, ...]:
  68. anon_map_ = anon_map()
  69. return self._gen_annotations_cache_key(anon_map_)
  70. def _gen_annotations_cache_key(
  71. self, anon_map: anon_map
  72. ) -> Tuple[Any, ...]:
  73. return (
  74. "_annotations",
  75. tuple(
  76. (
  77. key,
  78. (
  79. value._gen_cache_key(anon_map, [])
  80. if isinstance(value, HasCacheKey)
  81. else value
  82. ),
  83. )
  84. for key, value in [
  85. (key, self._annotations[key])
  86. for key in sorted(self._annotations)
  87. ]
  88. ),
  89. )
  90. class SupportsWrappingAnnotations(SupportsAnnotations):
  91. __slots__ = ()
  92. _constructor: Callable[..., SupportsWrappingAnnotations]
  93. if TYPE_CHECKING:
  94. @util.ro_non_memoized_property
  95. def entity_namespace(self) -> _EntityNamespace: ...
  96. def _annotate(self, values: _AnnotationDict) -> Self:
  97. """return a copy of this ClauseElement with annotations
  98. updated by the given dictionary.
  99. """
  100. return Annotated._as_annotated_instance(self, values) # type: ignore
  101. def _with_annotations(self, values: _AnnotationDict) -> Self:
  102. """return a copy of this ClauseElement with annotations
  103. replaced by the given dictionary.
  104. """
  105. return Annotated._as_annotated_instance(self, values) # type: ignore
  106. @overload
  107. def _deannotate(
  108. self,
  109. values: Literal[None] = ...,
  110. clone: bool = ...,
  111. ) -> Self: ...
  112. @overload
  113. def _deannotate(
  114. self,
  115. values: Sequence[str] = ...,
  116. clone: bool = ...,
  117. ) -> SupportsAnnotations: ...
  118. def _deannotate(
  119. self,
  120. values: Optional[Sequence[str]] = None,
  121. clone: bool = False,
  122. ) -> SupportsAnnotations:
  123. """return a copy of this :class:`_expression.ClauseElement`
  124. with annotations
  125. removed.
  126. :param values: optional tuple of individual values
  127. to remove.
  128. """
  129. if clone:
  130. s = self._clone()
  131. return s
  132. else:
  133. return self
  134. class SupportsCloneAnnotations(SupportsWrappingAnnotations):
  135. # SupportsCloneAnnotations extends from SupportsWrappingAnnotations
  136. # to support the structure of having the base ClauseElement
  137. # be a subclass of SupportsWrappingAnnotations. Any ClauseElement
  138. # subclass that wants to extend from SupportsCloneAnnotations
  139. # will inherently also be subclassing SupportsWrappingAnnotations, so
  140. # make that specific here.
  141. if not typing.TYPE_CHECKING:
  142. __slots__ = ()
  143. _clone_annotations_traverse_internals: _TraverseInternalsType = [
  144. ("_annotations", InternalTraversal.dp_annotations_key)
  145. ]
  146. def _annotate(self, values: _AnnotationDict) -> Self:
  147. """return a copy of this ClauseElement with annotations
  148. updated by the given dictionary.
  149. """
  150. new = self._clone()
  151. new._annotations = new._annotations.union(values)
  152. new.__dict__.pop("_annotations_cache_key", None)
  153. new.__dict__.pop("_generate_cache_key", None)
  154. return new
  155. def _with_annotations(self, values: _AnnotationDict) -> Self:
  156. """return a copy of this ClauseElement with annotations
  157. replaced by the given dictionary.
  158. """
  159. new = self._clone()
  160. new._annotations = util.immutabledict(values)
  161. new.__dict__.pop("_annotations_cache_key", None)
  162. new.__dict__.pop("_generate_cache_key", None)
  163. return new
  164. @overload
  165. def _deannotate(
  166. self,
  167. values: Literal[None] = ...,
  168. clone: bool = ...,
  169. ) -> Self: ...
  170. @overload
  171. def _deannotate(
  172. self,
  173. values: Sequence[str] = ...,
  174. clone: bool = ...,
  175. ) -> SupportsAnnotations: ...
  176. def _deannotate(
  177. self,
  178. values: Optional[Sequence[str]] = None,
  179. clone: bool = False,
  180. ) -> SupportsAnnotations:
  181. """return a copy of this :class:`_expression.ClauseElement`
  182. with annotations
  183. removed.
  184. :param values: optional tuple of individual values
  185. to remove.
  186. """
  187. if clone or self._annotations:
  188. # clone is used when we are also copying
  189. # the expression for a deep deannotation
  190. new = self._clone()
  191. new._annotations = util.immutabledict()
  192. new.__dict__.pop("_annotations_cache_key", None)
  193. return new
  194. else:
  195. return self
  196. class Annotated(SupportsAnnotations):
  197. """clones a SupportsAnnotations and applies an 'annotations' dictionary.
  198. Unlike regular clones, this clone also mimics __hash__() and
  199. __eq__() of the original element so that it takes its place
  200. in hashed collections.
  201. A reference to the original element is maintained, for the important
  202. reason of keeping its hash value current. When GC'ed, the
  203. hash value may be reused, causing conflicts.
  204. .. note:: The rationale for Annotated producing a brand new class,
  205. rather than placing the functionality directly within ClauseElement,
  206. is **performance**. The __hash__() method is absent on plain
  207. ClauseElement which leads to significantly reduced function call
  208. overhead, as the use of sets and dictionaries against ClauseElement
  209. objects is prevalent, but most are not "annotated".
  210. """
  211. _is_column_operators = False
  212. @classmethod
  213. def _as_annotated_instance(
  214. cls, element: SupportsWrappingAnnotations, values: _AnnotationDict
  215. ) -> Annotated:
  216. try:
  217. cls = annotated_classes[element.__class__]
  218. except KeyError:
  219. cls = _new_annotation_type(element.__class__, cls)
  220. return cls(element, values)
  221. _annotations: util.immutabledict[str, Any]
  222. __element: SupportsWrappingAnnotations
  223. _hash: int
  224. def __new__(cls: Type[Self], *args: Any) -> Self:
  225. return object.__new__(cls)
  226. def __init__(
  227. self, element: SupportsWrappingAnnotations, values: _AnnotationDict
  228. ):
  229. self.__dict__ = element.__dict__.copy()
  230. self.__dict__.pop("_annotations_cache_key", None)
  231. self.__dict__.pop("_generate_cache_key", None)
  232. self.__element = element
  233. self._annotations = util.immutabledict(values)
  234. self._hash = hash(element)
  235. def _annotate(self, values: _AnnotationDict) -> Self:
  236. _values = self._annotations.union(values)
  237. new = self._with_annotations(_values)
  238. return new
  239. def _with_annotations(self, values: _AnnotationDict) -> Self:
  240. clone = self.__class__.__new__(self.__class__)
  241. clone.__dict__ = self.__dict__.copy()
  242. clone.__dict__.pop("_annotations_cache_key", None)
  243. clone.__dict__.pop("_generate_cache_key", None)
  244. clone._annotations = util.immutabledict(values)
  245. return clone
  246. @overload
  247. def _deannotate(
  248. self,
  249. values: Literal[None] = ...,
  250. clone: bool = ...,
  251. ) -> Self: ...
  252. @overload
  253. def _deannotate(
  254. self,
  255. values: Sequence[str] = ...,
  256. clone: bool = ...,
  257. ) -> Annotated: ...
  258. def _deannotate(
  259. self,
  260. values: Optional[Sequence[str]] = None,
  261. clone: bool = True,
  262. ) -> SupportsAnnotations:
  263. if values is None:
  264. return self.__element
  265. else:
  266. return self._with_annotations(
  267. util.immutabledict(
  268. {
  269. key: value
  270. for key, value in self._annotations.items()
  271. if key not in values
  272. }
  273. )
  274. )
  275. if not typing.TYPE_CHECKING:
  276. # manually proxy some methods that need extra attention
  277. def _compiler_dispatch(self, visitor: Any, **kw: Any) -> Any:
  278. return self.__element.__class__._compiler_dispatch(
  279. self, visitor, **kw
  280. )
  281. @property
  282. def _constructor(self):
  283. return self.__element._constructor
  284. def _clone(self, **kw: Any) -> Self:
  285. clone = self.__element._clone(**kw)
  286. if clone is self.__element:
  287. # detect immutable, don't change anything
  288. return self
  289. else:
  290. # update the clone with any changes that have occurred
  291. # to this object's __dict__.
  292. clone.__dict__.update(self.__dict__)
  293. return self.__class__(clone, self._annotations)
  294. def __reduce__(self) -> Tuple[Type[Annotated], Tuple[Any, ...]]:
  295. return self.__class__, (self.__element, self._annotations)
  296. def __hash__(self) -> int:
  297. return self._hash
  298. def __eq__(self, other: Any) -> bool:
  299. if self._is_column_operators:
  300. return self.__element.__class__.__eq__(self, other)
  301. else:
  302. return hash(other) == hash(self)
  303. @util.ro_non_memoized_property
  304. def entity_namespace(self) -> _EntityNamespace:
  305. if "entity_namespace" in self._annotations:
  306. return cast(
  307. SupportsWrappingAnnotations,
  308. self._annotations["entity_namespace"],
  309. ).entity_namespace
  310. else:
  311. return self.__element.entity_namespace
  312. # hard-generate Annotated subclasses. this technique
  313. # is used instead of on-the-fly types (i.e. type.__new__())
  314. # so that the resulting objects are pickleable; additionally, other
  315. # decisions can be made up front about the type of object being annotated
  316. # just once per class rather than per-instance.
  317. annotated_classes: Dict[Type[SupportsWrappingAnnotations], Type[Annotated]] = (
  318. {}
  319. )
  320. _SA = TypeVar("_SA", bound="SupportsAnnotations")
  321. def _safe_annotate(to_annotate: _SA, annotations: _AnnotationDict) -> _SA:
  322. try:
  323. _annotate = to_annotate._annotate
  324. except AttributeError:
  325. # skip objects that don't actually have an `_annotate`
  326. # attribute, namely QueryableAttribute inside of a join
  327. # condition
  328. return to_annotate
  329. else:
  330. return _annotate(annotations)
  331. def _deep_annotate(
  332. element: _SA,
  333. annotations: _AnnotationDict,
  334. exclude: Optional[Sequence[SupportsAnnotations]] = None,
  335. *,
  336. detect_subquery_cols: bool = False,
  337. ind_cols_on_fromclause: bool = False,
  338. annotate_callable: Optional[
  339. Callable[[SupportsAnnotations, _AnnotationDict], SupportsAnnotations]
  340. ] = None,
  341. ) -> _SA:
  342. """Deep copy the given ClauseElement, annotating each element
  343. with the given annotations dictionary.
  344. Elements within the exclude collection will be cloned but not annotated.
  345. """
  346. # annotated objects hack the __hash__() method so if we want to
  347. # uniquely process them we have to use id()
  348. cloned_ids: Dict[int, SupportsAnnotations] = {}
  349. def clone(elem: SupportsAnnotations, **kw: Any) -> SupportsAnnotations:
  350. # ind_cols_on_fromclause means make sure an AnnotatedFromClause
  351. # has its own .c collection independent of that which its proxying.
  352. # this is used specifically by orm.LoaderCriteriaOption to break
  353. # a reference cycle that it's otherwise prone to building,
  354. # see test_relationship_criteria->
  355. # test_loader_criteria_subquery_w_same_entity. logic here was
  356. # changed for #8796 and made explicit; previously it occurred
  357. # by accident
  358. kw["detect_subquery_cols"] = detect_subquery_cols
  359. id_ = id(elem)
  360. if id_ in cloned_ids:
  361. return cloned_ids[id_]
  362. if (
  363. exclude
  364. and hasattr(elem, "proxy_set")
  365. and elem.proxy_set.intersection(exclude)
  366. ):
  367. newelem = elem._clone(clone=clone, **kw)
  368. elif annotations != elem._annotations:
  369. if detect_subquery_cols and elem._is_immutable:
  370. to_annotate = elem._clone(clone=clone, **kw)
  371. else:
  372. to_annotate = elem
  373. if annotate_callable:
  374. newelem = annotate_callable(to_annotate, annotations)
  375. else:
  376. newelem = _safe_annotate(to_annotate, annotations)
  377. else:
  378. newelem = elem
  379. newelem._copy_internals(
  380. clone=clone, ind_cols_on_fromclause=ind_cols_on_fromclause
  381. )
  382. cloned_ids[id_] = newelem
  383. return newelem
  384. if element is not None:
  385. element = cast(_SA, clone(element))
  386. clone = None # type: ignore # remove gc cycles
  387. return element
  388. @overload
  389. def _deep_deannotate(
  390. element: Literal[None], values: Optional[Sequence[str]] = None
  391. ) -> Literal[None]: ...
  392. @overload
  393. def _deep_deannotate(
  394. element: _SA, values: Optional[Sequence[str]] = None
  395. ) -> _SA: ...
  396. def _deep_deannotate(
  397. element: Optional[_SA], values: Optional[Sequence[str]] = None
  398. ) -> Optional[_SA]:
  399. """Deep copy the given element, removing annotations."""
  400. cloned: Dict[Any, SupportsAnnotations] = {}
  401. def clone(elem: SupportsAnnotations, **kw: Any) -> SupportsAnnotations:
  402. key: Any
  403. if values:
  404. key = id(elem)
  405. else:
  406. key = elem
  407. if key not in cloned:
  408. newelem = elem._deannotate(values=values, clone=True)
  409. newelem._copy_internals(clone=clone)
  410. cloned[key] = newelem
  411. return newelem
  412. else:
  413. return cloned[key]
  414. if element is not None:
  415. element = cast(_SA, clone(element))
  416. clone = None # type: ignore # remove gc cycles
  417. return element
  418. def _shallow_annotate(element: _SA, annotations: _AnnotationDict) -> _SA:
  419. """Annotate the given ClauseElement and copy its internals so that
  420. internal objects refer to the new annotated object.
  421. Basically used to apply a "don't traverse" annotation to a
  422. selectable, without digging throughout the whole
  423. structure wasting time.
  424. """
  425. element = element._annotate(annotations)
  426. element._copy_internals()
  427. return element
  428. def _new_annotation_type(
  429. cls: Type[SupportsWrappingAnnotations], base_cls: Type[Annotated]
  430. ) -> Type[Annotated]:
  431. """Generates a new class that subclasses Annotated and proxies a given
  432. element type.
  433. """
  434. if issubclass(cls, Annotated):
  435. return cls
  436. elif cls in annotated_classes:
  437. return annotated_classes[cls]
  438. for super_ in cls.__mro__:
  439. # check if an Annotated subclass more specific than
  440. # the given base_cls is already registered, such
  441. # as AnnotatedColumnElement.
  442. if super_ in annotated_classes:
  443. base_cls = annotated_classes[super_]
  444. break
  445. annotated_classes[cls] = anno_cls = cast(
  446. Type[Annotated],
  447. type("Annotated%s" % cls.__name__, (base_cls, cls), {}),
  448. )
  449. globals()["Annotated%s" % cls.__name__] = anno_cls
  450. if "_traverse_internals" in cls.__dict__:
  451. anno_cls._traverse_internals = list(cls._traverse_internals) + [
  452. ("_annotations", InternalTraversal.dp_annotations_key)
  453. ]
  454. elif cls.__dict__.get("inherit_cache", False):
  455. anno_cls._traverse_internals = list(cls._traverse_internals) + [
  456. ("_annotations", InternalTraversal.dp_annotations_key)
  457. ]
  458. # some classes include this even if they have traverse_internals
  459. # e.g. BindParameter, add it if present.
  460. if cls.__dict__.get("inherit_cache", False):
  461. anno_cls.inherit_cache = True # type: ignore
  462. elif "inherit_cache" in cls.__dict__:
  463. anno_cls.inherit_cache = cls.__dict__["inherit_cache"] # type: ignore
  464. anno_cls._is_column_operators = issubclass(cls, operators.ColumnOperators)
  465. return anno_cls
  466. def _prepare_annotations(
  467. target_hierarchy: Type[SupportsWrappingAnnotations],
  468. base_cls: Type[Annotated],
  469. ) -> None:
  470. for cls in util.walk_subclasses(target_hierarchy):
  471. _new_annotation_type(cls, base_cls)