| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627 |
- # orm/collections.py
- # Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
- # <see AUTHORS file>
- #
- # This module is part of SQLAlchemy and is released under
- # the MIT License: https://www.opensource.org/licenses/mit-license.php
- # mypy: allow-untyped-defs, allow-untyped-calls
- """Support for collections of mapped entities.
- The collections package supplies the machinery used to inform the ORM of
- collection membership changes. An instrumentation via decoration approach is
- used, allowing arbitrary types (including built-ins) to be used as entity
- collections without requiring inheritance from a base class.
- Instrumentation decoration relays membership change events to the
- :class:`.CollectionAttributeImpl` that is currently managing the collection.
- The decorators observe function call arguments and return values, tracking
- entities entering or leaving the collection. Two decorator approaches are
- provided. One is a bundle of generic decorators that map function arguments
- and return values to events::
- from sqlalchemy.orm.collections import collection
- class MyClass:
- # ...
- @collection.adds(1)
- def store(self, item):
- self.data.append(item)
- @collection.removes_return()
- def pop(self):
- return self.data.pop()
- The second approach is a bundle of targeted decorators that wrap appropriate
- append and remove notifiers around the mutation methods present in the
- standard Python ``list``, ``set`` and ``dict`` interfaces. These could be
- specified in terms of generic decorator recipes, but are instead hand-tooled
- for increased efficiency. The targeted decorators occasionally implement
- adapter-like behavior, such as mapping bulk-set methods (``extend``,
- ``update``, ``__setslice__``, etc.) into the series of atomic mutation events
- that the ORM requires.
- The targeted decorators are used internally for automatic instrumentation of
- entity collection classes. Every collection class goes through a
- transformation process roughly like so:
- 1. If the class is a built-in, substitute a trivial sub-class
- 2. Is this class already instrumented?
- 3. Add in generic decorators
- 4. Sniff out the collection interface through duck-typing
- 5. Add targeted decoration to any undecorated interface method
- This process modifies the class at runtime, decorating methods and adding some
- bookkeeping properties. This isn't possible (or desirable) for built-in
- classes like ``list``, so trivial sub-classes are substituted to hold
- decoration::
- class InstrumentedList(list):
- pass
- Collection classes can be specified in ``relationship(collection_class=)`` as
- types or a function that returns an instance. Collection classes are
- inspected and instrumented during the mapper compilation phase. The
- collection_class callable will be executed once to produce a specimen
- instance, and the type of that specimen will be instrumented. Functions that
- return built-in types like ``lists`` will be adapted to produce instrumented
- instances.
- When extending a known type like ``list``, additional decorations are not
- generally not needed. Odds are, the extension method will delegate to a
- method that's already instrumented. For example::
- class QueueIsh(list):
- def push(self, item):
- self.append(item)
- def shift(self):
- return self.pop(0)
- There's no need to decorate these methods. ``append`` and ``pop`` are already
- instrumented as part of the ``list`` interface. Decorating them would fire
- duplicate events, which should be avoided.
- The targeted decoration tries not to rely on other methods in the underlying
- collection class, but some are unavoidable. Many depend on 'read' methods
- being present to properly instrument a 'write', for example, ``__setitem__``
- needs ``__getitem__``. "Bulk" methods like ``update`` and ``extend`` may also
- reimplemented in terms of atomic appends and removes, so the ``extend``
- decoration will actually perform many ``append`` operations and not call the
- underlying method at all.
- Tight control over bulk operation and the firing of events is also possible by
- implementing the instrumentation internally in your methods. The basic
- instrumentation package works under the general assumption that collection
- mutation will not raise unusual exceptions. If you want to closely
- orchestrate append and remove events with exception management, internal
- instrumentation may be the answer. Within your method,
- ``collection_adapter(self)`` will retrieve an object that you can use for
- explicit control over triggering append and remove events.
- The owning object and :class:`.CollectionAttributeImpl` are also reachable
- through the adapter, allowing for some very sophisticated behavior.
- """
- from __future__ import annotations
- import operator
- import threading
- import typing
- from typing import Any
- from typing import Callable
- from typing import cast
- from typing import Collection
- from typing import Dict
- from typing import Iterable
- from typing import List
- from typing import NoReturn
- from typing import Optional
- from typing import Set
- from typing import Tuple
- from typing import Type
- from typing import TYPE_CHECKING
- from typing import TypeVar
- from typing import Union
- import weakref
- from .base import NO_KEY
- from .. import exc as sa_exc
- from .. import util
- from ..sql.base import NO_ARG
- from ..util.compat import inspect_getfullargspec
- from ..util.typing import Protocol
- if typing.TYPE_CHECKING:
- from .attributes import AttributeEventToken
- from .attributes import CollectionAttributeImpl
- from .mapped_collection import attribute_keyed_dict
- from .mapped_collection import column_keyed_dict
- from .mapped_collection import keyfunc_mapping
- from .mapped_collection import KeyFuncDict # noqa: F401
- from .state import InstanceState
- __all__ = [
- "collection",
- "collection_adapter",
- "keyfunc_mapping",
- "column_keyed_dict",
- "attribute_keyed_dict",
- "KeyFuncDict",
- # old names in < 2.0
- "mapped_collection",
- "column_mapped_collection",
- "attribute_mapped_collection",
- "MappedCollection",
- ]
- __instrumentation_mutex = threading.Lock()
- _CollectionFactoryType = Callable[[], "_AdaptedCollectionProtocol"]
- _T = TypeVar("_T", bound=Any)
- _KT = TypeVar("_KT", bound=Any)
- _VT = TypeVar("_VT", bound=Any)
- _COL = TypeVar("_COL", bound="Collection[Any]")
- _FN = TypeVar("_FN", bound="Callable[..., Any]")
- class _CollectionConverterProtocol(Protocol):
- def __call__(self, collection: _COL) -> _COL: ...
- class _AdaptedCollectionProtocol(Protocol):
- _sa_adapter: CollectionAdapter
- _sa_appender: Callable[..., Any]
- _sa_remover: Callable[..., Any]
- _sa_iterator: Callable[..., Iterable[Any]]
- _sa_converter: _CollectionConverterProtocol
- class collection:
- """Decorators for entity collection classes.
- The decorators fall into two groups: annotations and interception recipes.
- The annotating decorators (appender, remover, iterator, converter,
- internally_instrumented) indicate the method's purpose and take no
- arguments. They are not written with parens::
- @collection.appender
- def append(self, append): ...
- The recipe decorators all require parens, even those that take no
- arguments::
- @collection.adds("entity")
- def insert(self, position, entity): ...
- @collection.removes_return()
- def popitem(self): ...
- """
- # Bundled as a class solely for ease of use: packaging, doc strings,
- # importability.
- @staticmethod
- def appender(fn):
- """Tag the method as the collection appender.
- The appender method is called with one positional argument: the value
- to append. The method will be automatically decorated with 'adds(1)'
- if not already decorated::
- @collection.appender
- def add(self, append): ...
- # or, equivalently
- @collection.appender
- @collection.adds(1)
- def add(self, append): ...
- # for mapping type, an 'append' may kick out a previous value
- # that occupies that slot. consider d['a'] = 'foo'- any previous
- # value in d['a'] is discarded.
- @collection.appender
- @collection.replaces(1)
- def add(self, entity):
- key = some_key_func(entity)
- previous = None
- if key in self:
- previous = self[key]
- self[key] = entity
- return previous
- If the value to append is not allowed in the collection, you may
- raise an exception. Something to remember is that the appender
- will be called for each object mapped by a database query. If the
- database contains rows that violate your collection semantics, you
- will need to get creative to fix the problem, as access via the
- collection will not work.
- If the appender method is internally instrumented, you must also
- receive the keyword argument '_sa_initiator' and ensure its
- promulgation to collection events.
- """
- fn._sa_instrument_role = "appender"
- return fn
- @staticmethod
- def remover(fn):
- """Tag the method as the collection remover.
- The remover method is called with one positional argument: the value
- to remove. The method will be automatically decorated with
- :meth:`removes_return` if not already decorated::
- @collection.remover
- def zap(self, entity): ...
- # or, equivalently
- @collection.remover
- @collection.removes_return()
- def zap(self): ...
- If the value to remove is not present in the collection, you may
- raise an exception or return None to ignore the error.
- If the remove method is internally instrumented, you must also
- receive the keyword argument '_sa_initiator' and ensure its
- promulgation to collection events.
- """
- fn._sa_instrument_role = "remover"
- return fn
- @staticmethod
- def iterator(fn):
- """Tag the method as the collection remover.
- The iterator method is called with no arguments. It is expected to
- return an iterator over all collection members::
- @collection.iterator
- def __iter__(self): ...
- """
- fn._sa_instrument_role = "iterator"
- return fn
- @staticmethod
- def internally_instrumented(fn):
- """Tag the method as instrumented.
- This tag will prevent any decoration from being applied to the
- method. Use this if you are orchestrating your own calls to
- :func:`.collection_adapter` in one of the basic SQLAlchemy
- interface methods, or to prevent an automatic ABC method
- decoration from wrapping your implementation::
- # normally an 'extend' method on a list-like class would be
- # automatically intercepted and re-implemented in terms of
- # SQLAlchemy events and append(). your implementation will
- # never be called, unless:
- @collection.internally_instrumented
- def extend(self, items): ...
- """
- fn._sa_instrumented = True
- return fn
- @staticmethod
- @util.deprecated(
- "1.3",
- "The :meth:`.collection.converter` handler is deprecated and will "
- "be removed in a future release. Please refer to the "
- ":class:`.AttributeEvents.bulk_replace` listener interface in "
- "conjunction with the :func:`.event.listen` function.",
- )
- def converter(fn):
- """Tag the method as the collection converter.
- This optional method will be called when a collection is being
- replaced entirely, as in::
- myobj.acollection = [newvalue1, newvalue2]
- The converter method will receive the object being assigned and should
- return an iterable of values suitable for use by the ``appender``
- method. A converter must not assign values or mutate the collection,
- its sole job is to adapt the value the user provides into an iterable
- of values for the ORM's use.
- The default converter implementation will use duck-typing to do the
- conversion. A dict-like collection will be convert into an iterable
- of dictionary values, and other types will simply be iterated::
- @collection.converter
- def convert(self, other): ...
- If the duck-typing of the object does not match the type of this
- collection, a TypeError is raised.
- Supply an implementation of this method if you want to expand the
- range of possible types that can be assigned in bulk or perform
- validation on the values about to be assigned.
- """
- fn._sa_instrument_role = "converter"
- return fn
- @staticmethod
- def adds(arg: int) -> Callable[[_FN], _FN]:
- """Mark the method as adding an entity to the collection.
- Adds "add to collection" handling to the method. The decorator
- argument indicates which method argument holds the SQLAlchemy-relevant
- value. Arguments can be specified positionally (i.e. integer) or by
- name::
- @collection.adds(1)
- def push(self, item): ...
- @collection.adds("entity")
- def do_stuff(self, thing, entity=None): ...
- """
- def decorator(fn):
- fn._sa_instrument_before = ("fire_append_event", arg)
- return fn
- return decorator
- @staticmethod
- def replaces(arg):
- """Mark the method as replacing an entity in the collection.
- Adds "add to collection" and "remove from collection" handling to
- the method. The decorator argument indicates which method argument
- holds the SQLAlchemy-relevant value to be added, and return value, if
- any will be considered the value to remove.
- Arguments can be specified positionally (i.e. integer) or by name::
- @collection.replaces(2)
- def __setitem__(self, index, item): ...
- """
- def decorator(fn):
- fn._sa_instrument_before = ("fire_append_event", arg)
- fn._sa_instrument_after = "fire_remove_event"
- return fn
- return decorator
- @staticmethod
- def removes(arg):
- """Mark the method as removing an entity in the collection.
- Adds "remove from collection" handling to the method. The decorator
- argument indicates which method argument holds the SQLAlchemy-relevant
- value to be removed. Arguments can be specified positionally (i.e.
- integer) or by name::
- @collection.removes(1)
- def zap(self, item): ...
- For methods where the value to remove is not known at call-time, use
- collection.removes_return.
- """
- def decorator(fn):
- fn._sa_instrument_before = ("fire_remove_event", arg)
- return fn
- return decorator
- @staticmethod
- def removes_return():
- """Mark the method as removing an entity in the collection.
- Adds "remove from collection" handling to the method. The return
- value of the method, if any, is considered the value to remove. The
- method arguments are not inspected::
- @collection.removes_return()
- def pop(self): ...
- For methods where the value to remove is known at call-time, use
- collection.remove.
- """
- def decorator(fn):
- fn._sa_instrument_after = "fire_remove_event"
- return fn
- return decorator
- if TYPE_CHECKING:
- def collection_adapter(collection: Collection[Any]) -> CollectionAdapter:
- """Fetch the :class:`.CollectionAdapter` for a collection."""
- else:
- collection_adapter = operator.attrgetter("_sa_adapter")
- class CollectionAdapter:
- """Bridges between the ORM and arbitrary Python collections.
- Proxies base-level collection operations (append, remove, iterate)
- to the underlying Python collection, and emits add/remove events for
- entities entering or leaving the collection.
- The ORM uses :class:`.CollectionAdapter` exclusively for interaction with
- entity collections.
- """
- __slots__ = (
- "attr",
- "_key",
- "_data",
- "owner_state",
- "_converter",
- "invalidated",
- "empty",
- )
- attr: CollectionAttributeImpl
- _key: str
- # this is actually a weakref; see note in constructor
- _data: Callable[..., _AdaptedCollectionProtocol]
- owner_state: InstanceState[Any]
- _converter: _CollectionConverterProtocol
- invalidated: bool
- empty: bool
- def __init__(
- self,
- attr: CollectionAttributeImpl,
- owner_state: InstanceState[Any],
- data: _AdaptedCollectionProtocol,
- ):
- self.attr = attr
- self._key = attr.key
- # this weakref stays referenced throughout the lifespan of
- # CollectionAdapter. so while the weakref can return None, this
- # is realistically only during garbage collection of this object, so
- # we type this as a callable that returns _AdaptedCollectionProtocol
- # in all cases.
- self._data = weakref.ref(data) # type: ignore
- self.owner_state = owner_state
- data._sa_adapter = self
- self._converter = data._sa_converter
- self.invalidated = False
- self.empty = False
- def _warn_invalidated(self) -> None:
- util.warn("This collection has been invalidated.")
- @property
- def data(self) -> _AdaptedCollectionProtocol:
- "The entity collection being adapted."
- return self._data()
- @property
- def _referenced_by_owner(self) -> bool:
- """return True if the owner state still refers to this collection.
- This will return False within a bulk replace operation,
- where this collection is the one being replaced.
- """
- return self.owner_state.dict[self._key] is self._data()
- def bulk_appender(self):
- return self._data()._sa_appender
- def append_with_event(
- self, item: Any, initiator: Optional[AttributeEventToken] = None
- ) -> None:
- """Add an entity to the collection, firing mutation events."""
- self._data()._sa_appender(item, _sa_initiator=initiator)
- def _set_empty(self, user_data):
- assert (
- not self.empty
- ), "This collection adapter is already in the 'empty' state"
- self.empty = True
- self.owner_state._empty_collections[self._key] = user_data
- def _reset_empty(self) -> None:
- assert (
- self.empty
- ), "This collection adapter is not in the 'empty' state"
- self.empty = False
- self.owner_state.dict[self._key] = (
- self.owner_state._empty_collections.pop(self._key)
- )
- def _refuse_empty(self) -> NoReturn:
- raise sa_exc.InvalidRequestError(
- "This is a special 'empty' collection which cannot accommodate "
- "internal mutation operations"
- )
- def append_without_event(self, item: Any) -> None:
- """Add or restore an entity to the collection, firing no events."""
- if self.empty:
- self._refuse_empty()
- self._data()._sa_appender(item, _sa_initiator=False)
- def append_multiple_without_event(self, items: Iterable[Any]) -> None:
- """Add or restore an entity to the collection, firing no events."""
- if self.empty:
- self._refuse_empty()
- appender = self._data()._sa_appender
- for item in items:
- appender(item, _sa_initiator=False)
- def bulk_remover(self):
- return self._data()._sa_remover
- def remove_with_event(
- self, item: Any, initiator: Optional[AttributeEventToken] = None
- ) -> None:
- """Remove an entity from the collection, firing mutation events."""
- self._data()._sa_remover(item, _sa_initiator=initiator)
- def remove_without_event(self, item: Any) -> None:
- """Remove an entity from the collection, firing no events."""
- if self.empty:
- self._refuse_empty()
- self._data()._sa_remover(item, _sa_initiator=False)
- def clear_with_event(
- self, initiator: Optional[AttributeEventToken] = None
- ) -> None:
- """Empty the collection, firing a mutation event for each entity."""
- if self.empty:
- self._refuse_empty()
- remover = self._data()._sa_remover
- for item in list(self):
- remover(item, _sa_initiator=initiator)
- def clear_without_event(self) -> None:
- """Empty the collection, firing no events."""
- if self.empty:
- self._refuse_empty()
- remover = self._data()._sa_remover
- for item in list(self):
- remover(item, _sa_initiator=False)
- def __iter__(self):
- """Iterate over entities in the collection."""
- return iter(self._data()._sa_iterator())
- def __len__(self):
- """Count entities in the collection."""
- return len(list(self._data()._sa_iterator()))
- def __bool__(self):
- return True
- def _fire_append_wo_mutation_event_bulk(
- self, items, initiator=None, key=NO_KEY
- ):
- if not items:
- return
- if initiator is not False:
- if self.invalidated:
- self._warn_invalidated()
- if self.empty:
- self._reset_empty()
- for item in items:
- self.attr.fire_append_wo_mutation_event(
- self.owner_state,
- self.owner_state.dict,
- item,
- initiator,
- key,
- )
- def fire_append_wo_mutation_event(self, item, initiator=None, key=NO_KEY):
- """Notify that a entity is entering the collection but is already
- present.
- Initiator is a token owned by the InstrumentedAttribute that
- initiated the membership mutation, and should be left as None
- unless you are passing along an initiator value from a chained
- operation.
- .. versionadded:: 1.4.15
- """
- if initiator is not False:
- if self.invalidated:
- self._warn_invalidated()
- if self.empty:
- self._reset_empty()
- return self.attr.fire_append_wo_mutation_event(
- self.owner_state, self.owner_state.dict, item, initiator, key
- )
- else:
- return item
- def fire_append_event(self, item, initiator=None, key=NO_KEY):
- """Notify that a entity has entered the collection.
- Initiator is a token owned by the InstrumentedAttribute that
- initiated the membership mutation, and should be left as None
- unless you are passing along an initiator value from a chained
- operation.
- """
- if initiator is not False:
- if self.invalidated:
- self._warn_invalidated()
- if self.empty:
- self._reset_empty()
- return self.attr.fire_append_event(
- self.owner_state, self.owner_state.dict, item, initiator, key
- )
- else:
- return item
- def _fire_remove_event_bulk(self, items, initiator=None, key=NO_KEY):
- if not items:
- return
- if initiator is not False:
- if self.invalidated:
- self._warn_invalidated()
- if self.empty:
- self._reset_empty()
- for item in items:
- self.attr.fire_remove_event(
- self.owner_state,
- self.owner_state.dict,
- item,
- initiator,
- key,
- )
- def fire_remove_event(self, item, initiator=None, key=NO_KEY):
- """Notify that a entity has been removed from the collection.
- Initiator is the InstrumentedAttribute that initiated the membership
- mutation, and should be left as None unless you are passing along
- an initiator value from a chained operation.
- """
- if initiator is not False:
- if self.invalidated:
- self._warn_invalidated()
- if self.empty:
- self._reset_empty()
- self.attr.fire_remove_event(
- self.owner_state, self.owner_state.dict, item, initiator, key
- )
- def fire_pre_remove_event(self, initiator=None, key=NO_KEY):
- """Notify that an entity is about to be removed from the collection.
- Only called if the entity cannot be removed after calling
- fire_remove_event().
- """
- if self.invalidated:
- self._warn_invalidated()
- self.attr.fire_pre_remove_event(
- self.owner_state,
- self.owner_state.dict,
- initiator=initiator,
- key=key,
- )
- def __getstate__(self):
- return {
- "key": self._key,
- "owner_state": self.owner_state,
- "owner_cls": self.owner_state.class_,
- "data": self.data,
- "invalidated": self.invalidated,
- "empty": self.empty,
- }
- def __setstate__(self, d):
- self._key = d["key"]
- self.owner_state = d["owner_state"]
- # see note in constructor regarding this type: ignore
- self._data = weakref.ref(d["data"]) # type: ignore
- self._converter = d["data"]._sa_converter
- d["data"]._sa_adapter = self
- self.invalidated = d["invalidated"]
- self.attr = getattr(d["owner_cls"], self._key).impl
- self.empty = d.get("empty", False)
- def bulk_replace(values, existing_adapter, new_adapter, initiator=None):
- """Load a new collection, firing events based on prior like membership.
- Appends instances in ``values`` onto the ``new_adapter``. Events will be
- fired for any instance not present in the ``existing_adapter``. Any
- instances in ``existing_adapter`` not present in ``values`` will have
- remove events fired upon them.
- :param values: An iterable of collection member instances
- :param existing_adapter: A :class:`.CollectionAdapter` of
- instances to be replaced
- :param new_adapter: An empty :class:`.CollectionAdapter`
- to load with ``values``
- """
- assert isinstance(values, list)
- idset = util.IdentitySet
- existing_idset = idset(existing_adapter or ())
- constants = existing_idset.intersection(values or ())
- additions = idset(values or ()).difference(constants)
- removals = existing_idset.difference(constants)
- appender = new_adapter.bulk_appender()
- for member in values or ():
- if member in additions:
- appender(member, _sa_initiator=initiator)
- elif member in constants:
- appender(member, _sa_initiator=False)
- if existing_adapter:
- existing_adapter._fire_append_wo_mutation_event_bulk(
- constants, initiator=initiator
- )
- existing_adapter._fire_remove_event_bulk(removals, initiator=initiator)
- def prepare_instrumentation(
- factory: Union[Type[Collection[Any]], _CollectionFactoryType],
- ) -> _CollectionFactoryType:
- """Prepare a callable for future use as a collection class factory.
- Given a collection class factory (either a type or no-arg callable),
- return another factory that will produce compatible instances when
- called.
- This function is responsible for converting collection_class=list
- into the run-time behavior of collection_class=InstrumentedList.
- """
- impl_factory: _CollectionFactoryType
- # Convert a builtin to 'Instrumented*'
- if factory in __canned_instrumentation:
- impl_factory = __canned_instrumentation[factory]
- else:
- impl_factory = cast(_CollectionFactoryType, factory)
- cls: Union[_CollectionFactoryType, Type[Collection[Any]]]
- # Create a specimen
- cls = type(impl_factory())
- # Did factory callable return a builtin?
- if cls in __canned_instrumentation:
- # if so, just convert.
- # in previous major releases, this codepath wasn't working and was
- # not covered by tests. prior to that it supplied a "wrapper"
- # function that would return the class, though the rationale for this
- # case is not known
- impl_factory = __canned_instrumentation[cls]
- cls = type(impl_factory())
- # Instrument the class if needed.
- if __instrumentation_mutex.acquire():
- try:
- if getattr(cls, "_sa_instrumented", None) != id(cls):
- _instrument_class(cls)
- finally:
- __instrumentation_mutex.release()
- return impl_factory
- def _instrument_class(cls):
- """Modify methods in a class and install instrumentation."""
- # In the normal call flow, a request for any of the 3 basic collection
- # types is transformed into one of our trivial subclasses
- # (e.g. InstrumentedList). Catch anything else that sneaks in here...
- if cls.__module__ == "__builtin__":
- raise sa_exc.ArgumentError(
- "Can not instrument a built-in type. Use a "
- "subclass, even a trivial one."
- )
- roles, methods = _locate_roles_and_methods(cls)
- _setup_canned_roles(cls, roles, methods)
- _assert_required_roles(cls, roles, methods)
- _set_collection_attributes(cls, roles, methods)
- def _locate_roles_and_methods(cls):
- """search for _sa_instrument_role-decorated methods in
- method resolution order, assign to roles.
- """
- roles: Dict[str, str] = {}
- methods: Dict[str, Tuple[Optional[str], Optional[int], Optional[str]]] = {}
- for supercls in cls.__mro__:
- for name, method in vars(supercls).items():
- if not callable(method):
- continue
- # note role declarations
- if hasattr(method, "_sa_instrument_role"):
- role = method._sa_instrument_role
- assert role in (
- "appender",
- "remover",
- "iterator",
- "converter",
- )
- roles.setdefault(role, name)
- # transfer instrumentation requests from decorated function
- # to the combined queue
- before: Optional[Tuple[str, int]] = None
- after: Optional[str] = None
- if hasattr(method, "_sa_instrument_before"):
- op, argument = method._sa_instrument_before
- assert op in ("fire_append_event", "fire_remove_event")
- before = op, argument
- if hasattr(method, "_sa_instrument_after"):
- op = method._sa_instrument_after
- assert op in ("fire_append_event", "fire_remove_event")
- after = op
- if before:
- methods[name] = before + (after,)
- elif after:
- methods[name] = None, None, after
- return roles, methods
- def _setup_canned_roles(cls, roles, methods):
- """see if this class has "canned" roles based on a known
- collection type (dict, set, list). Apply those roles
- as needed to the "roles" dictionary, and also
- prepare "decorator" methods
- """
- collection_type = util.duck_type_collection(cls)
- if collection_type in __interfaces:
- assert collection_type is not None
- canned_roles, decorators = __interfaces[collection_type]
- for role, name in canned_roles.items():
- roles.setdefault(role, name)
- # apply ABC auto-decoration to methods that need it
- for method, decorator in decorators.items():
- fn = getattr(cls, method, None)
- if (
- fn
- and method not in methods
- and not hasattr(fn, "_sa_instrumented")
- ):
- setattr(cls, method, decorator(fn))
- def _assert_required_roles(cls, roles, methods):
- """ensure all roles are present, and apply implicit instrumentation if
- needed
- """
- if "appender" not in roles or not hasattr(cls, roles["appender"]):
- raise sa_exc.ArgumentError(
- "Type %s must elect an appender method to be "
- "a collection class" % cls.__name__
- )
- elif roles["appender"] not in methods and not hasattr(
- getattr(cls, roles["appender"]), "_sa_instrumented"
- ):
- methods[roles["appender"]] = ("fire_append_event", 1, None)
- if "remover" not in roles or not hasattr(cls, roles["remover"]):
- raise sa_exc.ArgumentError(
- "Type %s must elect a remover method to be "
- "a collection class" % cls.__name__
- )
- elif roles["remover"] not in methods and not hasattr(
- getattr(cls, roles["remover"]), "_sa_instrumented"
- ):
- methods[roles["remover"]] = ("fire_remove_event", 1, None)
- if "iterator" not in roles or not hasattr(cls, roles["iterator"]):
- raise sa_exc.ArgumentError(
- "Type %s must elect an iterator method to be "
- "a collection class" % cls.__name__
- )
- def _set_collection_attributes(cls, roles, methods):
- """apply ad-hoc instrumentation from decorators, class-level defaults
- and implicit role declarations
- """
- for method_name, (before, argument, after) in methods.items():
- setattr(
- cls,
- method_name,
- _instrument_membership_mutator(
- getattr(cls, method_name), before, argument, after
- ),
- )
- # intern the role map
- for role, method_name in roles.items():
- setattr(cls, "_sa_%s" % role, getattr(cls, method_name))
- cls._sa_adapter = None
- if not hasattr(cls, "_sa_converter"):
- cls._sa_converter = None
- cls._sa_instrumented = id(cls)
- def _instrument_membership_mutator(method, before, argument, after):
- """Route method args and/or return value through the collection
- adapter."""
- # This isn't smart enough to handle @adds(1) for 'def fn(self, (a, b))'
- if before:
- fn_args = list(
- util.flatten_iterator(inspect_getfullargspec(method)[0])
- )
- if isinstance(argument, int):
- pos_arg = argument
- named_arg = len(fn_args) > argument and fn_args[argument] or None
- else:
- if argument in fn_args:
- pos_arg = fn_args.index(argument)
- else:
- pos_arg = None
- named_arg = argument
- del fn_args
- def wrapper(*args, **kw):
- if before:
- if pos_arg is None:
- if named_arg not in kw:
- raise sa_exc.ArgumentError(
- "Missing argument %s" % argument
- )
- value = kw[named_arg]
- else:
- if len(args) > pos_arg:
- value = args[pos_arg]
- elif named_arg in kw:
- value = kw[named_arg]
- else:
- raise sa_exc.ArgumentError(
- "Missing argument %s" % argument
- )
- initiator = kw.pop("_sa_initiator", None)
- if initiator is False:
- executor = None
- else:
- executor = args[0]._sa_adapter
- if before and executor:
- getattr(executor, before)(value, initiator)
- if not after or not executor:
- return method(*args, **kw)
- else:
- res = method(*args, **kw)
- if res is not None:
- getattr(executor, after)(res, initiator)
- return res
- wrapper._sa_instrumented = True # type: ignore[attr-defined]
- if hasattr(method, "_sa_instrument_role"):
- wrapper._sa_instrument_role = method._sa_instrument_role # type: ignore[attr-defined] # noqa: E501
- wrapper.__name__ = method.__name__
- wrapper.__doc__ = method.__doc__
- return wrapper
- def __set_wo_mutation(collection, item, _sa_initiator=None):
- """Run set wo mutation events.
- The collection is not mutated.
- """
- if _sa_initiator is not False:
- executor = collection._sa_adapter
- if executor:
- executor.fire_append_wo_mutation_event(
- item, _sa_initiator, key=None
- )
- def __set(collection, item, _sa_initiator, key):
- """Run set events.
- This event always occurs before the collection is actually mutated.
- """
- if _sa_initiator is not False:
- executor = collection._sa_adapter
- if executor:
- item = executor.fire_append_event(item, _sa_initiator, key=key)
- return item
- def __del(collection, item, _sa_initiator, key):
- """Run del events.
- This event occurs before the collection is actually mutated, *except*
- in the case of a pop operation, in which case it occurs afterwards.
- For pop operations, the __before_pop hook is called before the
- operation occurs.
- """
- if _sa_initiator is not False:
- executor = collection._sa_adapter
- if executor:
- executor.fire_remove_event(item, _sa_initiator, key=key)
- def __before_pop(collection, _sa_initiator=None):
- """An event which occurs on a before a pop() operation occurs."""
- executor = collection._sa_adapter
- if executor:
- executor.fire_pre_remove_event(_sa_initiator)
- def _list_decorators() -> Dict[str, Callable[[_FN], _FN]]:
- """Tailored instrumentation wrappers for any list-like class."""
- def _tidy(fn):
- fn._sa_instrumented = True
- fn.__doc__ = getattr(list, fn.__name__).__doc__
- def append(fn):
- def append(self, item, _sa_initiator=None):
- item = __set(self, item, _sa_initiator, NO_KEY)
- fn(self, item)
- _tidy(append)
- return append
- def remove(fn):
- def remove(self, value, _sa_initiator=None):
- __del(self, value, _sa_initiator, NO_KEY)
- # testlib.pragma exempt:__eq__
- fn(self, value)
- _tidy(remove)
- return remove
- def insert(fn):
- def insert(self, index, value):
- value = __set(self, value, None, index)
- fn(self, index, value)
- _tidy(insert)
- return insert
- def __setitem__(fn):
- def __setitem__(self, index, value):
- if not isinstance(index, slice):
- existing = self[index]
- if existing is not None:
- __del(self, existing, None, index)
- value = __set(self, value, None, index)
- fn(self, index, value)
- else:
- # slice assignment requires __delitem__, insert, __len__
- step = index.step or 1
- start = index.start or 0
- if start < 0:
- start += len(self)
- if index.stop is not None:
- stop = index.stop
- else:
- stop = len(self)
- if stop < 0:
- stop += len(self)
- if step == 1:
- if value is self:
- return
- for i in range(start, stop, step):
- if len(self) > start:
- del self[start]
- for i, item in enumerate(value):
- self.insert(i + start, item)
- else:
- rng = list(range(start, stop, step))
- if len(value) != len(rng):
- raise ValueError(
- "attempt to assign sequence of size %s to "
- "extended slice of size %s"
- % (len(value), len(rng))
- )
- for i, item in zip(rng, value):
- self.__setitem__(i, item)
- _tidy(__setitem__)
- return __setitem__
- def __delitem__(fn):
- def __delitem__(self, index):
- if not isinstance(index, slice):
- item = self[index]
- __del(self, item, None, index)
- fn(self, index)
- else:
- # slice deletion requires __getslice__ and a slice-groking
- # __getitem__ for stepped deletion
- # note: not breaking this into atomic dels
- for item in self[index]:
- __del(self, item, None, index)
- fn(self, index)
- _tidy(__delitem__)
- return __delitem__
- def extend(fn):
- def extend(self, iterable):
- for value in list(iterable):
- self.append(value)
- _tidy(extend)
- return extend
- def __iadd__(fn):
- def __iadd__(self, iterable):
- # list.__iadd__ takes any iterable and seems to let TypeError
- # raise as-is instead of returning NotImplemented
- for value in list(iterable):
- self.append(value)
- return self
- _tidy(__iadd__)
- return __iadd__
- def pop(fn):
- def pop(self, index=-1):
- __before_pop(self)
- item = fn(self, index)
- __del(self, item, None, index)
- return item
- _tidy(pop)
- return pop
- def clear(fn):
- def clear(self, index=-1):
- for item in self:
- __del(self, item, None, index)
- fn(self)
- _tidy(clear)
- return clear
- # __imul__ : not wrapping this. all members of the collection are already
- # present, so no need to fire appends... wrapping it with an explicit
- # decorator is still possible, so events on *= can be had if they're
- # desired. hard to imagine a use case for __imul__, though.
- l = locals().copy()
- l.pop("_tidy")
- return l
- def _dict_decorators() -> Dict[str, Callable[[_FN], _FN]]:
- """Tailored instrumentation wrappers for any dict-like mapping class."""
- def _tidy(fn):
- fn._sa_instrumented = True
- fn.__doc__ = getattr(dict, fn.__name__).__doc__
- def __setitem__(fn):
- def __setitem__(self, key, value, _sa_initiator=None):
- if key in self:
- __del(self, self[key], _sa_initiator, key)
- value = __set(self, value, _sa_initiator, key)
- fn(self, key, value)
- _tidy(__setitem__)
- return __setitem__
- def __delitem__(fn):
- def __delitem__(self, key, _sa_initiator=None):
- if key in self:
- __del(self, self[key], _sa_initiator, key)
- fn(self, key)
- _tidy(__delitem__)
- return __delitem__
- def clear(fn):
- def clear(self):
- for key in self:
- __del(self, self[key], None, key)
- fn(self)
- _tidy(clear)
- return clear
- def pop(fn):
- def pop(self, key, default=NO_ARG):
- __before_pop(self)
- _to_del = key in self
- if default is NO_ARG:
- item = fn(self, key)
- else:
- item = fn(self, key, default)
- if _to_del:
- __del(self, item, None, key)
- return item
- _tidy(pop)
- return pop
- def popitem(fn):
- def popitem(self):
- __before_pop(self)
- item = fn(self)
- __del(self, item[1], None, 1)
- return item
- _tidy(popitem)
- return popitem
- def setdefault(fn):
- def setdefault(self, key, default=None):
- if key not in self:
- self.__setitem__(key, default)
- return default
- else:
- value = self.__getitem__(key)
- if value is default:
- __set_wo_mutation(self, value, None)
- return value
- _tidy(setdefault)
- return setdefault
- def update(fn):
- def update(self, __other=NO_ARG, **kw):
- if __other is not NO_ARG:
- if hasattr(__other, "keys"):
- for key in list(__other):
- if key not in self or self[key] is not __other[key]:
- self[key] = __other[key]
- else:
- __set_wo_mutation(self, __other[key], None)
- else:
- for key, value in __other:
- if key not in self or self[key] is not value:
- self[key] = value
- else:
- __set_wo_mutation(self, value, None)
- for key in kw:
- if key not in self or self[key] is not kw[key]:
- self[key] = kw[key]
- else:
- __set_wo_mutation(self, kw[key], None)
- _tidy(update)
- return update
- l = locals().copy()
- l.pop("_tidy")
- return l
- _set_binop_bases = (set, frozenset)
- def _set_binops_check_strict(self: Any, obj: Any) -> bool:
- """Allow only set, frozenset and self.__class__-derived
- objects in binops."""
- return isinstance(obj, _set_binop_bases + (self.__class__,))
- def _set_binops_check_loose(self: Any, obj: Any) -> bool:
- """Allow anything set-like to participate in set binops."""
- return (
- isinstance(obj, _set_binop_bases + (self.__class__,))
- or util.duck_type_collection(obj) == set
- )
- def _set_decorators() -> Dict[str, Callable[[_FN], _FN]]:
- """Tailored instrumentation wrappers for any set-like class."""
- def _tidy(fn):
- fn._sa_instrumented = True
- fn.__doc__ = getattr(set, fn.__name__).__doc__
- def add(fn):
- def add(self, value, _sa_initiator=None):
- if value not in self:
- value = __set(self, value, _sa_initiator, NO_KEY)
- else:
- __set_wo_mutation(self, value, _sa_initiator)
- # testlib.pragma exempt:__hash__
- fn(self, value)
- _tidy(add)
- return add
- def discard(fn):
- def discard(self, value, _sa_initiator=None):
- # testlib.pragma exempt:__hash__
- if value in self:
- __del(self, value, _sa_initiator, NO_KEY)
- # testlib.pragma exempt:__hash__
- fn(self, value)
- _tidy(discard)
- return discard
- def remove(fn):
- def remove(self, value, _sa_initiator=None):
- # testlib.pragma exempt:__hash__
- if value in self:
- __del(self, value, _sa_initiator, NO_KEY)
- # testlib.pragma exempt:__hash__
- fn(self, value)
- _tidy(remove)
- return remove
- def pop(fn):
- def pop(self):
- __before_pop(self)
- item = fn(self)
- # for set in particular, we have no way to access the item
- # that will be popped before pop is called.
- __del(self, item, None, NO_KEY)
- return item
- _tidy(pop)
- return pop
- def clear(fn):
- def clear(self):
- for item in list(self):
- self.remove(item)
- _tidy(clear)
- return clear
- def update(fn):
- def update(self, value):
- for item in value:
- self.add(item)
- _tidy(update)
- return update
- def __ior__(fn):
- def __ior__(self, value):
- if not _set_binops_check_strict(self, value):
- return NotImplemented
- for item in value:
- self.add(item)
- return self
- _tidy(__ior__)
- return __ior__
- def difference_update(fn):
- def difference_update(self, value):
- for item in value:
- self.discard(item)
- _tidy(difference_update)
- return difference_update
- def __isub__(fn):
- def __isub__(self, value):
- if not _set_binops_check_strict(self, value):
- return NotImplemented
- for item in value:
- self.discard(item)
- return self
- _tidy(__isub__)
- return __isub__
- def intersection_update(fn):
- def intersection_update(self, other):
- want, have = self.intersection(other), set(self)
- remove, add = have - want, want - have
- for item in remove:
- self.remove(item)
- for item in add:
- self.add(item)
- _tidy(intersection_update)
- return intersection_update
- def __iand__(fn):
- def __iand__(self, other):
- if not _set_binops_check_strict(self, other):
- return NotImplemented
- want, have = self.intersection(other), set(self)
- remove, add = have - want, want - have
- for item in remove:
- self.remove(item)
- for item in add:
- self.add(item)
- return self
- _tidy(__iand__)
- return __iand__
- def symmetric_difference_update(fn):
- def symmetric_difference_update(self, other):
- want, have = self.symmetric_difference(other), set(self)
- remove, add = have - want, want - have
- for item in remove:
- self.remove(item)
- for item in add:
- self.add(item)
- _tidy(symmetric_difference_update)
- return symmetric_difference_update
- def __ixor__(fn):
- def __ixor__(self, other):
- if not _set_binops_check_strict(self, other):
- return NotImplemented
- want, have = self.symmetric_difference(other), set(self)
- remove, add = have - want, want - have
- for item in remove:
- self.remove(item)
- for item in add:
- self.add(item)
- return self
- _tidy(__ixor__)
- return __ixor__
- l = locals().copy()
- l.pop("_tidy")
- return l
- class InstrumentedList(List[_T]):
- """An instrumented version of the built-in list."""
- class InstrumentedSet(Set[_T]):
- """An instrumented version of the built-in set."""
- class InstrumentedDict(Dict[_KT, _VT]):
- """An instrumented version of the built-in dict."""
- __canned_instrumentation: util.immutabledict[Any, _CollectionFactoryType] = (
- util.immutabledict(
- {
- list: InstrumentedList,
- set: InstrumentedSet,
- dict: InstrumentedDict,
- }
- )
- )
- __interfaces: util.immutabledict[
- Any,
- Tuple[
- Dict[str, str],
- Dict[str, Callable[..., Any]],
- ],
- ] = util.immutabledict(
- {
- list: (
- {
- "appender": "append",
- "remover": "remove",
- "iterator": "__iter__",
- },
- _list_decorators(),
- ),
- set: (
- {"appender": "add", "remover": "remove", "iterator": "__iter__"},
- _set_decorators(),
- ),
- # decorators are required for dicts and object collections.
- dict: ({"iterator": "values"}, _dict_decorators()),
- }
- )
- def __go(lcls):
- global keyfunc_mapping, mapped_collection
- global column_keyed_dict, column_mapped_collection
- global MappedCollection, KeyFuncDict
- global attribute_keyed_dict, attribute_mapped_collection
- from .mapped_collection import keyfunc_mapping
- from .mapped_collection import column_keyed_dict
- from .mapped_collection import attribute_keyed_dict
- from .mapped_collection import KeyFuncDict
- from .mapped_collection import mapped_collection
- from .mapped_collection import column_mapped_collection
- from .mapped_collection import attribute_mapped_collection
- from .mapped_collection import MappedCollection
- # ensure instrumentation is associated with
- # these built-in classes; if a user-defined class
- # subclasses these and uses @internally_instrumented,
- # the superclass is otherwise not instrumented.
- # see [ticket:2406].
- _instrument_class(InstrumentedList)
- _instrument_class(InstrumentedSet)
- _instrument_class(KeyFuncDict)
- __go(locals())
|