attr.py 20 KB


  1. # event/attr.py
  2. # Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: https://www.opensource.org/licenses/mit-license.php
  7. """Attribute implementation for _Dispatch classes.
  8. The various listener targets for a particular event class are represented
  9. as attributes, which refer to collections of listeners to be fired off.
  10. These collections can exist at the class level as well as at the instance
  11. level. An event is fired off using code like this::
  12. some_object.dispatch.first_connect(arg1, arg2)
  13. Above, ``some_object.dispatch`` would be an instance of ``_Dispatch`` and
  14. ``first_connect`` is typically an instance of ``_ListenerCollection``
  15. if event listeners are present, or ``_EmptyListener`` if none are present.
  16. The attribute mechanics here spend effort trying to ensure listener functions
  17. are available with a minimum of function call overhead, that unnecessary
  18. objects aren't created (i.e. many empty per-instance listener collections),
  19. as well as that everything is garbage collectable when owning references are
  20. lost. Other features such as "propagation" of listener functions across
  21. many ``_Dispatch`` instances, "joining" of multiple ``_Dispatch`` instances,
  22. as well as support for subclass propagation (e.g. events assigned to
  23. ``Pool`` vs. ``QueuePool``) are all implemented here.
  24. """
  25. from __future__ import annotations
  26. import collections
  27. from itertools import chain
  28. import threading
  29. from types import TracebackType
  30. import typing
  31. from typing import Any
  32. from typing import cast
  33. from typing import Collection
  34. from typing import Deque
  35. from typing import FrozenSet
  36. from typing import Generic
  37. from typing import Iterator
  38. from typing import MutableMapping
  39. from typing import MutableSequence
  40. from typing import NoReturn
  41. from typing import Optional
  42. from typing import Sequence
  43. from typing import Set
  44. from typing import Tuple
  45. from typing import Type
  46. from typing import TypeVar
  47. from typing import Union
  48. import weakref
  49. from . import legacy
  50. from . import registry
  51. from .registry import _ET
  52. from .registry import _EventKey
  53. from .registry import _ListenerFnType
  54. from .. import exc
  55. from .. import util
  56. from ..util.concurrency import AsyncAdaptedLock
  57. from ..util.typing import Protocol
  58. _T = TypeVar("_T", bound=Any)
  59. if typing.TYPE_CHECKING:
  60. from .base import _Dispatch
  61. from .base import _DispatchCommon
  62. from .base import _HasEventsDispatch
  63. class RefCollection(util.MemoizedSlots, Generic[_ET]):
  64. __slots__ = ("ref",)
  65. ref: weakref.ref[RefCollection[_ET]]
  66. def _memoized_attr_ref(self) -> weakref.ref[RefCollection[_ET]]:
  67. return weakref.ref(self, registry._collection_gced)
  68. class _empty_collection(Collection[_T]):
  69. def append(self, element: _T) -> None:
  70. pass
  71. def appendleft(self, element: _T) -> None:
  72. pass
  73. def extend(self, other: Sequence[_T]) -> None:
  74. pass
  75. def remove(self, element: _T) -> None:
  76. pass
  77. def __contains__(self, element: Any) -> bool:
  78. return False
  79. def __iter__(self) -> Iterator[_T]:
  80. return iter([])
  81. def clear(self) -> None:
  82. pass
  83. def __len__(self) -> int:
  84. return 0
  85. _ListenerFnSequenceType = Union[Deque[_T], _empty_collection[_T]]
  86. class _ClsLevelDispatch(RefCollection[_ET]):
  87. """Class-level events on :class:`._Dispatch` classes."""
  88. __slots__ = (
  89. "clsname",
  90. "name",
  91. "arg_names",
  92. "has_kw",
  93. "legacy_signatures",
  94. "_clslevel",
  95. "__weakref__",
  96. )
  97. clsname: str
  98. name: str
  99. arg_names: Sequence[str]
  100. has_kw: bool
  101. legacy_signatures: MutableSequence[legacy._LegacySignatureType]
  102. _clslevel: MutableMapping[
  103. Type[_ET], _ListenerFnSequenceType[_ListenerFnType]
  104. ]
  105. def __init__(
  106. self,
  107. parent_dispatch_cls: Type[_HasEventsDispatch[_ET]],
  108. fn: _ListenerFnType,
  109. ):
  110. self.name = fn.__name__
  111. self.clsname = parent_dispatch_cls.__name__
  112. argspec = util.inspect_getfullargspec(fn)
  113. self.arg_names = argspec.args[1:]
  114. self.has_kw = bool(argspec.varkw)
  115. self.legacy_signatures = list(
  116. reversed(
  117. sorted(
  118. getattr(fn, "_legacy_signatures", []), key=lambda s: s[0]
  119. )
  120. )
  121. )
  122. fn.__doc__ = legacy._augment_fn_docs(self, parent_dispatch_cls, fn)
  123. self._clslevel = weakref.WeakKeyDictionary()
  124. def _adjust_fn_spec(
  125. self, fn: _ListenerFnType, named: bool
  126. ) -> _ListenerFnType:
  127. if named:
  128. fn = self._wrap_fn_for_kw(fn)
  129. if self.legacy_signatures:
  130. try:
  131. argspec = util.get_callable_argspec(fn, no_self=True)
  132. except TypeError:
  133. pass
  134. else:
  135. fn = legacy._wrap_fn_for_legacy(self, fn, argspec)
  136. return fn
  137. def _wrap_fn_for_kw(self, fn: _ListenerFnType) -> _ListenerFnType:
  138. def wrap_kw(*args: Any, **kw: Any) -> Any:
  139. argdict = dict(zip(self.arg_names, args))
  140. argdict.update(kw)
  141. return fn(**argdict)
  142. return wrap_kw
  143. def _do_insert_or_append(
  144. self, event_key: _EventKey[_ET], is_append: bool
  145. ) -> None:
  146. target = event_key.dispatch_target
  147. assert isinstance(
  148. target, type
  149. ), "Class-level Event targets must be classes."
  150. if not getattr(target, "_sa_propagate_class_events", True):
  151. raise exc.InvalidRequestError(
  152. f"Can't assign an event directly to the {target} class"
  153. )
  154. cls: Type[_ET]
  155. for cls in util.walk_subclasses(target):
  156. if cls is not target and cls not in self._clslevel:
  157. self.update_subclass(cls)
  158. else:
  159. if cls not in self._clslevel:
  160. self.update_subclass(cls)
  161. if is_append:
  162. self._clslevel[cls].append(event_key._listen_fn)
  163. else:
  164. self._clslevel[cls].appendleft(event_key._listen_fn)
  165. registry._stored_in_collection(event_key, self)
  166. def insert(self, event_key: _EventKey[_ET], propagate: bool) -> None:
  167. self._do_insert_or_append(event_key, is_append=False)
  168. def append(self, event_key: _EventKey[_ET], propagate: bool) -> None:
  169. self._do_insert_or_append(event_key, is_append=True)
  170. def update_subclass(self, target: Type[_ET]) -> None:
  171. if target not in self._clslevel:
  172. if getattr(target, "_sa_propagate_class_events", True):
  173. self._clslevel[target] = collections.deque()
  174. else:
  175. self._clslevel[target] = _empty_collection()
  176. clslevel = self._clslevel[target]
  177. cls: Type[_ET]
  178. for cls in target.__mro__[1:]:
  179. if cls in self._clslevel:
  180. clslevel.extend(
  181. [fn for fn in self._clslevel[cls] if fn not in clslevel]
  182. )
  183. def remove(self, event_key: _EventKey[_ET]) -> None:
  184. target = event_key.dispatch_target
  185. cls: Type[_ET]
  186. for cls in util.walk_subclasses(target):
  187. if cls in self._clslevel:
  188. self._clslevel[cls].remove(event_key._listen_fn)
  189. registry._removed_from_collection(event_key, self)
  190. def clear(self) -> None:
  191. """Clear all class level listeners"""
  192. to_clear: Set[_ListenerFnType] = set()
  193. for dispatcher in self._clslevel.values():
  194. to_clear.update(dispatcher)
  195. dispatcher.clear()
  196. registry._clear(self, to_clear)
  197. def for_modify(self, obj: _Dispatch[_ET]) -> _ClsLevelDispatch[_ET]:
  198. """Return an event collection which can be modified.
  199. For _ClsLevelDispatch at the class level of
  200. a dispatcher, this returns self.
  201. """
  202. return self
  203. class _InstanceLevelDispatch(RefCollection[_ET], Collection[_ListenerFnType]):
  204. __slots__ = ()
  205. parent: _ClsLevelDispatch[_ET]
  206. def _adjust_fn_spec(
  207. self, fn: _ListenerFnType, named: bool
  208. ) -> _ListenerFnType:
  209. return self.parent._adjust_fn_spec(fn, named)
  210. def __contains__(self, item: Any) -> bool:
  211. raise NotImplementedError()
  212. def __len__(self) -> int:
  213. raise NotImplementedError()
  214. def __iter__(self) -> Iterator[_ListenerFnType]:
  215. raise NotImplementedError()
  216. def __bool__(self) -> bool:
  217. raise NotImplementedError()
  218. def exec_once(self, *args: Any, **kw: Any) -> None:
  219. raise NotImplementedError()
  220. def exec_once_unless_exception(self, *args: Any, **kw: Any) -> None:
  221. raise NotImplementedError()
  222. def _exec_w_sync_on_first_run(self, *args: Any, **kw: Any) -> None:
  223. raise NotImplementedError()
  224. def __call__(self, *args: Any, **kw: Any) -> None:
  225. raise NotImplementedError()
  226. def insert(self, event_key: _EventKey[_ET], propagate: bool) -> None:
  227. raise NotImplementedError()
  228. def append(self, event_key: _EventKey[_ET], propagate: bool) -> None:
  229. raise NotImplementedError()
  230. def remove(self, event_key: _EventKey[_ET]) -> None:
  231. raise NotImplementedError()
  232. def for_modify(
  233. self, obj: _DispatchCommon[_ET]
  234. ) -> _InstanceLevelDispatch[_ET]:
  235. """Return an event collection which can be modified.
  236. For _ClsLevelDispatch at the class level of
  237. a dispatcher, this returns self.
  238. """
  239. return self
  240. class _EmptyListener(_InstanceLevelDispatch[_ET]):
  241. """Serves as a proxy interface to the events
  242. served by a _ClsLevelDispatch, when there are no
  243. instance-level events present.
  244. Is replaced by _ListenerCollection when instance-level
  245. events are added.
  246. """
  247. __slots__ = "parent", "parent_listeners", "name"
  248. propagate: FrozenSet[_ListenerFnType] = frozenset()
  249. listeners: Tuple[()] = ()
  250. parent: _ClsLevelDispatch[_ET]
  251. parent_listeners: _ListenerFnSequenceType[_ListenerFnType]
  252. name: str
  253. def __init__(self, parent: _ClsLevelDispatch[_ET], target_cls: Type[_ET]):
  254. if target_cls not in parent._clslevel:
  255. parent.update_subclass(target_cls)
  256. self.parent = parent
  257. self.parent_listeners = parent._clslevel[target_cls]
  258. self.name = parent.name
  259. def for_modify(
  260. self, obj: _DispatchCommon[_ET]
  261. ) -> _ListenerCollection[_ET]:
  262. """Return an event collection which can be modified.
  263. For _EmptyListener at the instance level of
  264. a dispatcher, this generates a new
  265. _ListenerCollection, applies it to the instance,
  266. and returns it.
  267. """
  268. obj = cast("_Dispatch[_ET]", obj)
  269. assert obj._instance_cls is not None
  270. result = _ListenerCollection(self.parent, obj._instance_cls)
  271. if getattr(obj, self.name) is self:
  272. setattr(obj, self.name, result)
  273. else:
  274. assert isinstance(getattr(obj, self.name), _JoinedListener)
  275. return result
  276. def _needs_modify(self, *args: Any, **kw: Any) -> NoReturn:
  277. raise NotImplementedError("need to call for_modify()")
  278. def exec_once(self, *args: Any, **kw: Any) -> NoReturn:
  279. self._needs_modify(*args, **kw)
  280. def exec_once_unless_exception(self, *args: Any, **kw: Any) -> NoReturn:
  281. self._needs_modify(*args, **kw)
  282. def insert(self, *args: Any, **kw: Any) -> NoReturn:
  283. self._needs_modify(*args, **kw)
  284. def append(self, *args: Any, **kw: Any) -> NoReturn:
  285. self._needs_modify(*args, **kw)
  286. def remove(self, *args: Any, **kw: Any) -> NoReturn:
  287. self._needs_modify(*args, **kw)
  288. def clear(self, *args: Any, **kw: Any) -> NoReturn:
  289. self._needs_modify(*args, **kw)
  290. def __call__(self, *args: Any, **kw: Any) -> None:
  291. """Execute this event."""
  292. for fn in self.parent_listeners:
  293. fn(*args, **kw)
  294. def __contains__(self, item: Any) -> bool:
  295. return item in self.parent_listeners
  296. def __len__(self) -> int:
  297. return len(self.parent_listeners)
  298. def __iter__(self) -> Iterator[_ListenerFnType]:
  299. return iter(self.parent_listeners)
  300. def __bool__(self) -> bool:
  301. return bool(self.parent_listeners)
  302. class _MutexProtocol(Protocol):
  303. def __enter__(self) -> bool: ...
  304. def __exit__(
  305. self,
  306. exc_type: Optional[Type[BaseException]],
  307. exc_val: Optional[BaseException],
  308. exc_tb: Optional[TracebackType],
  309. ) -> Optional[bool]: ...
  310. class _CompoundListener(_InstanceLevelDispatch[_ET]):
  311. __slots__ = (
  312. "_exec_once_mutex",
  313. "_exec_once",
  314. "_exec_w_sync_once",
  315. "_is_asyncio",
  316. )
  317. _exec_once_mutex: _MutexProtocol
  318. parent_listeners: Collection[_ListenerFnType]
  319. listeners: Collection[_ListenerFnType]
  320. _exec_once: bool
  321. _exec_w_sync_once: bool
  322. def __init__(self, *arg: Any, **kw: Any):
  323. super().__init__(*arg, **kw)
  324. self._is_asyncio = False
  325. def _set_asyncio(self) -> None:
  326. self._is_asyncio = True
  327. def _memoized_attr__exec_once_mutex(self) -> _MutexProtocol:
  328. if self._is_asyncio:
  329. return AsyncAdaptedLock()
  330. else:
  331. return threading.Lock()
  332. def _exec_once_impl(
  333. self, retry_on_exception: bool, *args: Any, **kw: Any
  334. ) -> None:
  335. with self._exec_once_mutex:
  336. if not self._exec_once:
  337. try:
  338. self(*args, **kw)
  339. exception = False
  340. except:
  341. exception = True
  342. raise
  343. finally:
  344. if not exception or not retry_on_exception:
  345. self._exec_once = True
  346. def exec_once(self, *args: Any, **kw: Any) -> None:
  347. """Execute this event, but only if it has not been
  348. executed already for this collection."""
  349. if not self._exec_once:
  350. self._exec_once_impl(False, *args, **kw)
  351. def exec_once_unless_exception(self, *args: Any, **kw: Any) -> None:
  352. """Execute this event, but only if it has not been
  353. executed already for this collection, or was called
  354. by a previous exec_once_unless_exception call and
  355. raised an exception.
  356. If exec_once was already called, then this method will never run
  357. the callable regardless of whether it raised or not.
  358. .. versionadded:: 1.3.8
  359. """
  360. if not self._exec_once:
  361. self._exec_once_impl(True, *args, **kw)
  362. def _exec_w_sync_on_first_run(self, *args: Any, **kw: Any) -> None:
  363. """Execute this event, and use a mutex if it has not been
  364. executed already for this collection, or was called
  365. by a previous _exec_w_sync_on_first_run call and
  366. raised an exception.
  367. If _exec_w_sync_on_first_run was already called and didn't raise an
  368. exception, then a mutex is not used.
  369. .. versionadded:: 1.4.11
  370. """
  371. if not self._exec_w_sync_once:
  372. with self._exec_once_mutex:
  373. try:
  374. self(*args, **kw)
  375. except:
  376. raise
  377. else:
  378. self._exec_w_sync_once = True
  379. else:
  380. self(*args, **kw)
  381. def __call__(self, *args: Any, **kw: Any) -> None:
  382. """Execute this event."""
  383. for fn in self.parent_listeners:
  384. fn(*args, **kw)
  385. for fn in self.listeners:
  386. fn(*args, **kw)
  387. def __contains__(self, item: Any) -> bool:
  388. return item in self.parent_listeners or item in self.listeners
  389. def __len__(self) -> int:
  390. return len(self.parent_listeners) + len(self.listeners)
  391. def __iter__(self) -> Iterator[_ListenerFnType]:
  392. return chain(self.parent_listeners, self.listeners)
  393. def __bool__(self) -> bool:
  394. return bool(self.listeners or self.parent_listeners)
  395. class _ListenerCollection(_CompoundListener[_ET]):
  396. """Instance-level attributes on instances of :class:`._Dispatch`.
  397. Represents a collection of listeners.
  398. As of 0.7.9, _ListenerCollection is only first
  399. created via the _EmptyListener.for_modify() method.
  400. """
  401. __slots__ = (
  402. "parent_listeners",
  403. "parent",
  404. "name",
  405. "listeners",
  406. "propagate",
  407. "__weakref__",
  408. )
  409. parent_listeners: Collection[_ListenerFnType]
  410. parent: _ClsLevelDispatch[_ET]
  411. name: str
  412. listeners: Deque[_ListenerFnType]
  413. propagate: Set[_ListenerFnType]
  414. def __init__(self, parent: _ClsLevelDispatch[_ET], target_cls: Type[_ET]):
  415. super().__init__()
  416. if target_cls not in parent._clslevel:
  417. parent.update_subclass(target_cls)
  418. self._exec_once = False
  419. self._exec_w_sync_once = False
  420. self.parent_listeners = parent._clslevel[target_cls]
  421. self.parent = parent
  422. self.name = parent.name
  423. self.listeners = collections.deque()
  424. self.propagate = set()
  425. def for_modify(
  426. self, obj: _DispatchCommon[_ET]
  427. ) -> _ListenerCollection[_ET]:
  428. """Return an event collection which can be modified.
  429. For _ListenerCollection at the instance level of
  430. a dispatcher, this returns self.
  431. """
  432. return self
  433. def _update(
  434. self, other: _ListenerCollection[_ET], only_propagate: bool = True
  435. ) -> None:
  436. """Populate from the listeners in another :class:`_Dispatch`
  437. object."""
  438. existing_listeners = self.listeners
  439. existing_listener_set = set(existing_listeners)
  440. self.propagate.update(other.propagate)
  441. other_listeners = [
  442. l
  443. for l in other.listeners
  444. if l not in existing_listener_set
  445. and not only_propagate
  446. or l in self.propagate
  447. ]
  448. existing_listeners.extend(other_listeners)
  449. if other._is_asyncio:
  450. self._set_asyncio()
  451. to_associate = other.propagate.union(other_listeners)
  452. registry._stored_in_collection_multi(self, other, to_associate)
  453. def insert(self, event_key: _EventKey[_ET], propagate: bool) -> None:
  454. if event_key.prepend_to_list(self, self.listeners):
  455. if propagate:
  456. self.propagate.add(event_key._listen_fn)
  457. def append(self, event_key: _EventKey[_ET], propagate: bool) -> None:
  458. if event_key.append_to_list(self, self.listeners):
  459. if propagate:
  460. self.propagate.add(event_key._listen_fn)
  461. def remove(self, event_key: _EventKey[_ET]) -> None:
  462. self.listeners.remove(event_key._listen_fn)
  463. self.propagate.discard(event_key._listen_fn)
  464. registry._removed_from_collection(event_key, self)
  465. def clear(self) -> None:
  466. registry._clear(self, self.listeners)
  467. self.propagate.clear()
  468. self.listeners.clear()
  469. class _JoinedListener(_CompoundListener[_ET]):
  470. __slots__ = "parent_dispatch", "name", "local", "parent_listeners"
  471. parent_dispatch: _DispatchCommon[_ET]
  472. name: str
  473. local: _InstanceLevelDispatch[_ET]
  474. parent_listeners: Collection[_ListenerFnType]
  475. def __init__(
  476. self,
  477. parent_dispatch: _DispatchCommon[_ET],
  478. name: str,
  479. local: _EmptyListener[_ET],
  480. ):
  481. self._exec_once = False
  482. self.parent_dispatch = parent_dispatch
  483. self.name = name
  484. self.local = local
  485. self.parent_listeners = self.local
  486. if not typing.TYPE_CHECKING:
  487. # first error, I don't really understand:
  488. # Signature of "listeners" incompatible with
  489. # supertype "_CompoundListener" [override]
  490. # the name / return type are exactly the same
  491. # second error is getattr_isn't typed, the cast() here
  492. # adds too much method overhead
  493. @property
  494. def listeners(self) -> Collection[_ListenerFnType]:
  495. return getattr(self.parent_dispatch, self.name)
  496. def _adjust_fn_spec(
  497. self, fn: _ListenerFnType, named: bool
  498. ) -> _ListenerFnType:
  499. return self.local._adjust_fn_spec(fn, named)
  500. def for_modify(self, obj: _DispatchCommon[_ET]) -> _JoinedListener[_ET]:
  501. self.local = self.parent_listeners = self.local.for_modify(obj)
  502. return self
  503. def insert(self, event_key: _EventKey[_ET], propagate: bool) -> None:
  504. self.local.insert(event_key, propagate)
  505. def append(self, event_key: _EventKey[_ET], propagate: bool) -> None:
  506. self.local.append(event_key, propagate)
  507. def remove(self, event_key: _EventKey[_ET]) -> None:
  508. self.local.remove(event_key)
  509. def clear(self) -> None:
  510. raise NotImplementedError()