| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686 |
- # orm/loading.py
- # Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
- # <see AUTHORS file>
- #
- # This module is part of SQLAlchemy and is released under
- # the MIT License: https://www.opensource.org/licenses/mit-license.php
- # mypy: ignore-errors
- """private module containing functions used to convert database
- rows into object instances and associated state.
- the functions here are called primarily by Query, Mapper,
- as well as some of the attribute loading strategies.
- """
- from __future__ import annotations
- from typing import Any
- from typing import Dict
- from typing import Iterable
- from typing import List
- from typing import Mapping
- from typing import Optional
- from typing import Sequence
- from typing import Tuple
- from typing import TYPE_CHECKING
- from typing import TypeVar
- from typing import Union
- from . import attributes
- from . import exc as orm_exc
- from . import path_registry
- from .base import _DEFER_FOR_STATE
- from .base import _RAISE_FOR_STATE
- from .base import _SET_DEFERRED_EXPIRED
- from .base import PassiveFlag
- from .context import FromStatement
- from .context import ORMCompileState
- from .context import QueryContext
- from .strategies import SelectInLoader
- from .util import _none_set
- from .util import state_str
- from .. import exc as sa_exc
- from .. import util
- from ..engine import result_tuple
- from ..engine.result import ChunkedIteratorResult
- from ..engine.result import FrozenResult
- from ..engine.result import SimpleResultMetaData
- from ..sql import select
- from ..sql import util as sql_util
- from ..sql.selectable import ForUpdateArg
- from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL
- from ..sql.selectable import SelectState
- from ..util import EMPTY_DICT
- if TYPE_CHECKING:
- from ._typing import _IdentityKeyType
- from .base import LoaderCallableStatus
- from .interfaces import ORMOption
- from .mapper import Mapper
- from .query import Query
- from .session import Session
- from .state import InstanceState
- from ..engine.cursor import CursorResult
- from ..engine.interfaces import _ExecuteOptions
- from ..engine.result import Result
- from ..sql import Select
- _T = TypeVar("_T", bound=Any)
- _O = TypeVar("_O", bound=object)
- _new_runid = util.counter()
- _PopulatorDict = Dict[str, List[Tuple[str, Any]]]
- def instances(cursor: CursorResult[Any], context: QueryContext) -> Result[Any]:
- """Return a :class:`.Result` given an ORM query context.
- :param cursor: a :class:`.CursorResult`, generated by a statement
- which came from :class:`.ORMCompileState`
- :param context: a :class:`.QueryContext` object
- :return: a :class:`.Result` object representing ORM results
- .. versionchanged:: 1.4 The instances() function now uses
- :class:`.Result` objects and has an all new interface.
- """
- context.runid = _new_runid()
- if context.top_level_context:
- is_top_level = False
- context.post_load_paths = context.top_level_context.post_load_paths
- else:
- is_top_level = True
- context.post_load_paths = {}
- compile_state = context.compile_state
- filtered = compile_state._has_mapper_entities
- single_entity = (
- not context.load_options._only_return_tuples
- and len(compile_state._entities) == 1
- and compile_state._entities[0].supports_single_entity
- )
- try:
- (process, labels, extra) = list(
- zip(
- *[
- query_entity.row_processor(context, cursor)
- for query_entity in context.compile_state._entities
- ]
- )
- )
- if context.yield_per and (
- context.loaders_require_buffering
- or context.loaders_require_uniquing
- ):
- raise sa_exc.InvalidRequestError(
- "Can't use yield_per with eager loaders that require uniquing "
- "or row buffering, e.g. joinedload() against collections "
- "or subqueryload(). Consider the selectinload() strategy "
- "for better flexibility in loading objects."
- )
- except Exception:
- with util.safe_reraise():
- cursor.close()
- def _no_unique(entry):
- raise sa_exc.InvalidRequestError(
- "Can't use the ORM yield_per feature in conjunction with unique()"
- )
- def _not_hashable(datatype, *, legacy=False, uncertain=False):
- if not legacy:
- def go(obj):
- if uncertain:
- try:
- return hash(obj)
- except:
- pass
- raise sa_exc.InvalidRequestError(
- "Can't apply uniqueness to row tuple containing value of "
- f"""type {datatype!r}; {
- 'the values returned appear to be'
- if uncertain
- else 'this datatype produces'
- } non-hashable values"""
- )
- return go
- elif not uncertain:
- return id
- else:
- _use_id = False
- def go(obj):
- nonlocal _use_id
- if not _use_id:
- try:
- return hash(obj)
- except:
- pass
- # in #10459, we considered using a warning here, however
- # as legacy query uses result.unique() in all cases, this
- # would lead to too many warning cases.
- _use_id = True
- return id(obj)
- return go
- unique_filters = [
- (
- _no_unique
- if context.yield_per
- else (
- _not_hashable(
- ent.column.type, # type: ignore
- legacy=context.load_options._legacy_uniquing,
- uncertain=ent._null_column_type,
- )
- if (
- not ent.use_id_for_hash
- and (ent._non_hashable_value or ent._null_column_type)
- )
- else id if ent.use_id_for_hash else None
- )
- )
- for ent in context.compile_state._entities
- ]
- row_metadata = SimpleResultMetaData(
- labels, extra, _unique_filters=unique_filters
- )
- def chunks(size): # type: ignore
- while True:
- yield_per = size
- context.partials = {}
- if yield_per:
- fetch = cursor.fetchmany(yield_per)
- if not fetch:
- break
- else:
- fetch = cursor._raw_all_rows()
- if single_entity:
- proc = process[0]
- rows = [proc(row) for row in fetch]
- else:
- rows = [
- tuple([proc(row) for proc in process]) for row in fetch
- ]
- # if we are the originating load from a query, meaning we
- # aren't being called as a result of a nested "post load",
- # iterate through all the collected post loaders and fire them
- # off. Previously this used to work recursively, however that
- # prevented deeply nested structures from being loadable
- if is_top_level:
- if yield_per:
- # if using yield per, memoize the state of the
- # collection so that it can be restored
- top_level_post_loads = list(
- context.post_load_paths.items()
- )
- while context.post_load_paths:
- post_loads = list(context.post_load_paths.items())
- context.post_load_paths.clear()
- for path, post_load in post_loads:
- post_load.invoke(context, path)
- if yield_per:
- context.post_load_paths.clear()
- context.post_load_paths.update(top_level_post_loads)
- yield rows
- if not yield_per:
- break
- if context.execution_options.get("prebuffer_rows", False):
- # this is a bit of a hack at the moment.
- # I would rather have some option in the result to pre-buffer
- # internally.
- _prebuffered = list(chunks(None))
- def chunks(size):
- return iter(_prebuffered)
- result = ChunkedIteratorResult(
- row_metadata,
- chunks,
- source_supports_scalars=single_entity,
- raw=cursor,
- dynamic_yield_per=cursor.context._is_server_side,
- )
- # filtered and single_entity are used to indicate to legacy Query that the
- # query has ORM entities, so legacy deduping and scalars should be called
- # on the result.
- result._attributes = result._attributes.union(
- dict(filtered=filtered, is_single_entity=single_entity)
- )
- # multi_row_eager_loaders OTOH is specific to joinedload.
- if context.compile_state.multi_row_eager_loaders:
- def require_unique(obj):
- raise sa_exc.InvalidRequestError(
- "The unique() method must be invoked on this Result, "
- "as it contains results that include joined eager loads "
- "against collections"
- )
- result._unique_filter_state = (None, require_unique)
- if context.yield_per:
- result.yield_per(context.yield_per)
- return result
- @util.preload_module("sqlalchemy.orm.context")
- def merge_frozen_result(session, statement, frozen_result, load=True):
- """Merge a :class:`_engine.FrozenResult` back into a :class:`_orm.Session`,
- returning a new :class:`_engine.Result` object with :term:`persistent`
- objects.
- See the section :ref:`do_orm_execute_re_executing` for an example.
- .. seealso::
- :ref:`do_orm_execute_re_executing`
- :meth:`_engine.Result.freeze`
- :class:`_engine.FrozenResult`
- """
- querycontext = util.preloaded.orm_context
- if load:
- # flush current contents if we expect to load data
- session._autoflush()
- ctx = querycontext.ORMSelectCompileState._create_entities_collection(
- statement, legacy=False
- )
- autoflush = session.autoflush
- try:
- session.autoflush = False
- mapped_entities = [
- i
- for i, e in enumerate(ctx._entities)
- if isinstance(e, querycontext._MapperEntity)
- ]
- keys = [ent._label_name for ent in ctx._entities]
- keyed_tuple = result_tuple(
- keys, [ent._extra_entities for ent in ctx._entities]
- )
- result = []
- for newrow in frozen_result.rewrite_rows():
- for i in mapped_entities:
- if newrow[i] is not None:
- newrow[i] = session._merge(
- attributes.instance_state(newrow[i]),
- attributes.instance_dict(newrow[i]),
- load=load,
- _recursive={},
- _resolve_conflict_map={},
- )
- result.append(keyed_tuple(newrow))
- return frozen_result.with_new_rows(result)
- finally:
- session.autoflush = autoflush
- @util.became_legacy_20(
- ":func:`_orm.merge_result`",
- alternative="The function as well as the method on :class:`_orm.Query` "
- "is superseded by the :func:`_orm.merge_frozen_result` function.",
- )
- @util.preload_module("sqlalchemy.orm.context")
- def merge_result(
- query: Query[Any],
- iterator: Union[FrozenResult, Iterable[Sequence[Any]], Iterable[object]],
- load: bool = True,
- ) -> Union[FrozenResult, Iterable[Any]]:
- """Merge a result into the given :class:`.Query` object's Session.
- See :meth:`_orm.Query.merge_result` for top-level documentation on this
- function.
- """
- querycontext = util.preloaded.orm_context
- session = query.session
- if load:
- # flush current contents if we expect to load data
- session._autoflush()
- # TODO: need test coverage and documentation for the FrozenResult
- # use case.
- if isinstance(iterator, FrozenResult):
- frozen_result = iterator
- iterator = iter(frozen_result.data)
- else:
- frozen_result = None
- ctx = querycontext.ORMSelectCompileState._create_entities_collection(
- query, legacy=True
- )
- autoflush = session.autoflush
- try:
- session.autoflush = False
- single_entity = not frozen_result and len(ctx._entities) == 1
- if single_entity:
- if isinstance(ctx._entities[0], querycontext._MapperEntity):
- result = [
- session._merge(
- attributes.instance_state(instance),
- attributes.instance_dict(instance),
- load=load,
- _recursive={},
- _resolve_conflict_map={},
- )
- for instance in iterator
- ]
- else:
- result = list(iterator)
- else:
- mapped_entities = [
- i
- for i, e in enumerate(ctx._entities)
- if isinstance(e, querycontext._MapperEntity)
- ]
- result = []
- keys = [ent._label_name for ent in ctx._entities]
- keyed_tuple = result_tuple(
- keys, [ent._extra_entities for ent in ctx._entities]
- )
- for row in iterator:
- newrow = list(row)
- for i in mapped_entities:
- if newrow[i] is not None:
- newrow[i] = session._merge(
- attributes.instance_state(newrow[i]),
- attributes.instance_dict(newrow[i]),
- load=load,
- _recursive={},
- _resolve_conflict_map={},
- )
- result.append(keyed_tuple(newrow))
- if frozen_result:
- return frozen_result.with_new_rows(result)
- else:
- return iter(result)
- finally:
- session.autoflush = autoflush
- def get_from_identity(
- session: Session,
- mapper: Mapper[_O],
- key: _IdentityKeyType[_O],
- passive: PassiveFlag,
- ) -> Union[LoaderCallableStatus, Optional[_O]]:
- """Look up the given key in the given session's identity map,
- check the object for expired state if found.
- """
- instance = session.identity_map.get(key)
- if instance is not None:
- state = attributes.instance_state(instance)
- if mapper.inherits and not state.mapper.isa(mapper):
- return attributes.PASSIVE_CLASS_MISMATCH
- # expired - ensure it still exists
- if state.expired:
- if not passive & attributes.SQL_OK:
- # TODO: no coverage here
- return attributes.PASSIVE_NO_RESULT
- elif not passive & attributes.RELATED_OBJECT_OK:
- # this mode is used within a flush and the instance's
- # expired state will be checked soon enough, if necessary.
- # also used by immediateloader for a mutually-dependent
- # o2m->m2m load, :ticket:`6301`
- return instance
- try:
- state._load_expired(state, passive)
- except orm_exc.ObjectDeletedError:
- session._remove_newly_deleted([state])
- return None
- return instance
- else:
- return None
- def load_on_ident(
- session: Session,
- statement: Union[Select, FromStatement],
- key: Optional[_IdentityKeyType],
- *,
- load_options: Optional[Sequence[ORMOption]] = None,
- refresh_state: Optional[InstanceState[Any]] = None,
- with_for_update: Optional[ForUpdateArg] = None,
- only_load_props: Optional[Iterable[str]] = None,
- no_autoflush: bool = False,
- bind_arguments: Mapping[str, Any] = util.EMPTY_DICT,
- execution_options: _ExecuteOptions = util.EMPTY_DICT,
- require_pk_cols: bool = False,
- is_user_refresh: bool = False,
- ):
- """Load the given identity key from the database."""
- if key is not None:
- ident = key[1]
- identity_token = key[2]
- else:
- ident = identity_token = None
- return load_on_pk_identity(
- session,
- statement,
- ident,
- load_options=load_options,
- refresh_state=refresh_state,
- with_for_update=with_for_update,
- only_load_props=only_load_props,
- identity_token=identity_token,
- no_autoflush=no_autoflush,
- bind_arguments=bind_arguments,
- execution_options=execution_options,
- require_pk_cols=require_pk_cols,
- is_user_refresh=is_user_refresh,
- )
- def load_on_pk_identity(
- session: Session,
- statement: Union[Select, FromStatement],
- primary_key_identity: Optional[Tuple[Any, ...]],
- *,
- load_options: Optional[Sequence[ORMOption]] = None,
- refresh_state: Optional[InstanceState[Any]] = None,
- with_for_update: Optional[ForUpdateArg] = None,
- only_load_props: Optional[Iterable[str]] = None,
- identity_token: Optional[Any] = None,
- no_autoflush: bool = False,
- bind_arguments: Mapping[str, Any] = util.EMPTY_DICT,
- execution_options: _ExecuteOptions = util.EMPTY_DICT,
- require_pk_cols: bool = False,
- is_user_refresh: bool = False,
- ):
- """Load the given primary key identity from the database."""
- query = statement
- q = query._clone()
- assert not q._is_lambda_element
- if load_options is None:
- load_options = QueryContext.default_load_options
- if (
- statement._compile_options
- is SelectState.default_select_compile_options
- ):
- compile_options = ORMCompileState.default_compile_options
- else:
- compile_options = statement._compile_options
- if primary_key_identity is not None:
- mapper = query._propagate_attrs["plugin_subject"]
- (_get_clause, _get_params) = mapper._get_clause
- # None present in ident - turn those comparisons
- # into "IS NULL"
- if None in primary_key_identity:
- nones = {
- _get_params[col].key
- for col, value in zip(mapper.primary_key, primary_key_identity)
- if value is None
- }
- _get_clause = sql_util.adapt_criterion_to_null(_get_clause, nones)
- if len(nones) == len(primary_key_identity):
- util.warn(
- "fully NULL primary key identity cannot load any "
- "object. This condition may raise an error in a future "
- "release."
- )
- q._where_criteria = (
- sql_util._deep_annotate(_get_clause, {"_orm_adapt": True}),
- )
- params = {
- _get_params[primary_key].key: id_val
- for id_val, primary_key in zip(
- primary_key_identity, mapper.primary_key
- )
- }
- else:
- params = None
- if with_for_update is not None:
- version_check = True
- q._for_update_arg = with_for_update
- elif query._for_update_arg is not None:
- version_check = True
- q._for_update_arg = query._for_update_arg
- else:
- version_check = False
- if require_pk_cols and only_load_props:
- if not refresh_state:
- raise sa_exc.ArgumentError(
- "refresh_state is required when require_pk_cols is present"
- )
- refresh_state_prokeys = refresh_state.mapper._primary_key_propkeys
- has_changes = {
- key
- for key in refresh_state_prokeys.difference(only_load_props)
- if refresh_state.attrs[key].history.has_changes()
- }
- if has_changes:
- # raise if pending pk changes are present.
- # technically, this could be limited to the case where we have
- # relationships in the only_load_props collection to be refreshed
- # also (and only ones that have a secondary eager loader, at that).
- # however, the error is in place across the board so that behavior
- # here is easier to predict. The use case it prevents is one
- # of mutating PK attrs, leaving them unflushed,
- # calling session.refresh(), and expecting those attrs to remain
- # still unflushed. It seems likely someone doing all those
- # things would be better off having the PK attributes flushed
- # to the database before tinkering like that (session.refresh() is
- # tinkering).
- raise sa_exc.InvalidRequestError(
- f"Please flush pending primary key changes on "
- "attributes "
- f"{has_changes} for mapper {refresh_state.mapper} before "
- "proceeding with a refresh"
- )
- # overall, the ORM has no internal flow right now for "dont load the
- # primary row of an object at all, but fire off
- # selectinload/subqueryload/immediateload for some relationships".
- # It would probably be a pretty big effort to add such a flow. So
- # here, the case for #8703 is introduced; user asks to refresh some
- # relationship attributes only which are
- # selectinload/subqueryload/immediateload/ etc. (not joinedload).
- # ORM complains there's no columns in the primary row to load.
- # So here, we just add the PK cols if that
- # case is detected, so that there is a SELECT emitted for the primary
- # row.
- #
- # Let's just state right up front, for this one little case,
- # the ORM here is adding a whole extra SELECT just to satisfy
- # limitations in the internal flow. This is really not a thing
- # SQLAlchemy finds itself doing like, ever, obviously, we are
- # constantly working to *remove* SELECTs we don't need. We
- # rationalize this for now based on 1. session.refresh() is not
- # commonly used 2. session.refresh() with only relationship attrs is
- # even less commonly used 3. the SELECT in question is very low
- # latency.
- #
- # to add the flow to not include the SELECT, the quickest way
- # might be to just manufacture a single-row result set to send off to
- # instances(), but we'd have to weave that into context.py and all
- # that. For 2.0.0, we have enough big changes to navigate for now.
- #
- mp = refresh_state.mapper._props
- for p in only_load_props:
- if mp[p]._is_relationship:
- only_load_props = refresh_state_prokeys.union(only_load_props)
- break
- if refresh_state and refresh_state.load_options:
- compile_options += {"_current_path": refresh_state.load_path.parent}
- q = q.options(*refresh_state.load_options)
- new_compile_options, load_options = _set_get_options(
- compile_options,
- load_options,
- version_check=version_check,
- only_load_props=only_load_props,
- refresh_state=refresh_state,
- identity_token=identity_token,
- is_user_refresh=is_user_refresh,
- )
- q._compile_options = new_compile_options
- q._order_by = None
- if no_autoflush:
- load_options += {"_autoflush": False}
- execution_options = util.EMPTY_DICT.merge_with(
- execution_options, {"_sa_orm_load_options": load_options}
- )
- result = (
- session.execute(
- q,
- params=params,
- execution_options=execution_options,
- bind_arguments=bind_arguments,
- )
- .unique()
- .scalars()
- )
- try:
- return result.one()
- except orm_exc.NoResultFound:
- return None
- def _set_get_options(
- compile_opt,
- load_opt,
- populate_existing=None,
- version_check=None,
- only_load_props=None,
- refresh_state=None,
- identity_token=None,
- is_user_refresh=None,
- ):
- compile_options = {}
- load_options = {}
- if version_check:
- load_options["_version_check"] = version_check
- if populate_existing:
- load_options["_populate_existing"] = populate_existing
- if refresh_state:
- load_options["_refresh_state"] = refresh_state
- compile_options["_for_refresh_state"] = True
- if only_load_props:
- compile_options["_only_load_props"] = frozenset(only_load_props)
- if identity_token:
- load_options["_identity_token"] = identity_token
- if is_user_refresh:
- load_options["_is_user_refresh"] = is_user_refresh
- if load_options:
- load_opt += load_options
- if compile_options:
- compile_opt += compile_options
- return compile_opt, load_opt
- def _setup_entity_query(
- compile_state,
- mapper,
- query_entity,
- path,
- adapter,
- column_collection,
- with_polymorphic=None,
- only_load_props=None,
- polymorphic_discriminator=None,
- **kw,
- ):
- if with_polymorphic:
- poly_properties = mapper._iterate_polymorphic_properties(
- with_polymorphic
- )
- else:
- poly_properties = mapper._polymorphic_properties
- quick_populators = {}
- path.set(compile_state.attributes, "memoized_setups", quick_populators)
- # for the lead entities in the path, e.g. not eager loads, and
- # assuming a user-passed aliased class, e.g. not a from_self() or any
- # implicit aliasing, don't add columns to the SELECT that aren't
- # in the thing that's aliased.
- check_for_adapt = adapter and len(path) == 1 and path[-1].is_aliased_class
- for value in poly_properties:
- if only_load_props and value.key not in only_load_props:
- continue
- value.setup(
- compile_state,
- query_entity,
- path,
- adapter,
- only_load_props=only_load_props,
- column_collection=column_collection,
- memoized_populators=quick_populators,
- check_for_adapt=check_for_adapt,
- **kw,
- )
- if (
- polymorphic_discriminator is not None
- and polymorphic_discriminator is not mapper.polymorphic_on
- ):
- if adapter:
- pd = adapter.columns[polymorphic_discriminator]
- else:
- pd = polymorphic_discriminator
- column_collection.append(pd)
- def _warn_for_runid_changed(state):
- util.warn(
- "Loading context for %s has changed within a load/refresh "
- "handler, suggesting a row refresh operation took place. If this "
- "event handler is expected to be "
- "emitting row refresh operations within an existing load or refresh "
- "operation, set restore_load_context=True when establishing the "
- "listener to ensure the context remains unchanged when the event "
- "handler completes." % (state_str(state),)
- )
- def _instance_processor(
- query_entity,
- mapper,
- context,
- result,
- path,
- adapter,
- only_load_props=None,
- refresh_state=None,
- polymorphic_discriminator=None,
- _polymorphic_from=None,
- ):
- """Produce a mapper level row processor callable
- which processes rows into mapped instances."""
- # note that this method, most of which exists in a closure
- # called _instance(), resists being broken out, as
- # attempts to do so tend to add significant function
- # call overhead. _instance() is the most
- # performance-critical section in the whole ORM.
- identity_class = mapper._identity_class
- compile_state = context.compile_state
- # look for "row getter" functions that have been assigned along
- # with the compile state that were cached from a previous load.
- # these are operator.itemgetter() objects that each will extract a
- # particular column from each row.
- getter_key = ("getters", mapper)
- getters = path.get(compile_state.attributes, getter_key, None)
- if getters is None:
- # no getters, so go through a list of attributes we are loading for,
- # and the ones that are column based will have already put information
- # for us in another collection "memoized_setups", which represents the
- # output of the LoaderStrategy.setup_query() method. We can just as
- # easily call LoaderStrategy.create_row_processor for each, but by
- # getting it all at once from setup_query we save another method call
- # per attribute.
- props = mapper._prop_set
- if only_load_props is not None:
- props = props.intersection(
- mapper._props[k] for k in only_load_props
- )
- quick_populators = path.get(
- context.attributes, "memoized_setups", EMPTY_DICT
- )
- todo = []
- cached_populators = {
- "new": [],
- "quick": [],
- "deferred": [],
- "expire": [],
- "existing": [],
- "eager": [],
- }
- if refresh_state is None:
- # we can also get the "primary key" tuple getter function
- pk_cols = mapper.primary_key
- if adapter:
- pk_cols = [adapter.columns[c] for c in pk_cols]
- primary_key_getter = result._tuple_getter(pk_cols)
- else:
- primary_key_getter = None
- getters = {
- "cached_populators": cached_populators,
- "todo": todo,
- "primary_key_getter": primary_key_getter,
- }
- for prop in props:
- if prop in quick_populators:
- # this is an inlined path just for column-based attributes.
- col = quick_populators[prop]
- if col is _DEFER_FOR_STATE:
- cached_populators["new"].append(
- (prop.key, prop._deferred_column_loader)
- )
- elif col is _SET_DEFERRED_EXPIRED:
- # note that in this path, we are no longer
- # searching in the result to see if the column might
- # be present in some unexpected way.
- cached_populators["expire"].append((prop.key, False))
- elif col is _RAISE_FOR_STATE:
- cached_populators["new"].append(
- (prop.key, prop._raise_column_loader)
- )
- else:
- getter = None
- if adapter:
- # this logic had been removed for all 1.4 releases
- # up until 1.4.18; the adapter here is particularly
- # the compound eager adapter which isn't accommodated
- # in the quick_populators right now. The "fallback"
- # logic below instead took over in many more cases
- # until issue #6596 was identified.
- # note there is still an issue where this codepath
- # produces no "getter" for cases where a joined-inh
- # mapping includes a labeled column property, meaning
- # KeyError is caught internally and we fall back to
- # _getter(col), which works anyway. The adapter
- # here for joined inh without any aliasing might not
- # be useful. Tests which see this include
- # test.orm.inheritance.test_basic ->
- # EagerTargetingTest.test_adapt_stringency
- # OptimizedLoadTest.test_column_expression_joined
- # PolymorphicOnNotLocalTest.test_polymorphic_on_column_prop # noqa: E501
- #
- adapted_col = adapter.columns[col]
- if adapted_col is not None:
- getter = result._getter(adapted_col, False)
- if not getter:
- getter = result._getter(col, False)
- if getter:
- cached_populators["quick"].append((prop.key, getter))
- else:
- # fall back to the ColumnProperty itself, which
- # will iterate through all of its columns
- # to see if one fits
- prop.create_row_processor(
- context,
- query_entity,
- path,
- mapper,
- result,
- adapter,
- cached_populators,
- )
- else:
- # loader strategies like subqueryload, selectinload,
- # joinedload, basically relationships, these need to interact
- # with the context each time to work correctly.
- todo.append(prop)
- path.set(compile_state.attributes, getter_key, getters)
- cached_populators = getters["cached_populators"]
- populators = {key: list(value) for key, value in cached_populators.items()}
- for prop in getters["todo"]:
- prop.create_row_processor(
- context, query_entity, path, mapper, result, adapter, populators
- )
- propagated_loader_options = context.propagated_loader_options
- load_path = (
- context.compile_state.current_path + path
- if context.compile_state.current_path.path
- else path
- )
- session_identity_map = context.session.identity_map
- populate_existing = context.populate_existing or mapper.always_refresh
- load_evt = bool(mapper.class_manager.dispatch.load)
- refresh_evt = bool(mapper.class_manager.dispatch.refresh)
- persistent_evt = bool(context.session.dispatch.loaded_as_persistent)
- if persistent_evt:
- loaded_as_persistent = context.session.dispatch.loaded_as_persistent
- instance_state = attributes.instance_state
- instance_dict = attributes.instance_dict
- session_id = context.session.hash_key
- runid = context.runid
- identity_token = context.identity_token
- version_check = context.version_check
- if version_check:
- version_id_col = mapper.version_id_col
- if version_id_col is not None:
- if adapter:
- version_id_col = adapter.columns[version_id_col]
- version_id_getter = result._getter(version_id_col)
- else:
- version_id_getter = None
- if not refresh_state and _polymorphic_from is not None:
- key = ("loader", path.path)
- if key in context.attributes and context.attributes[key].strategy == (
- ("selectinload_polymorphic", True),
- ):
- option_entities = context.attributes[key].local_opts["entities"]
- else:
- option_entities = None
- selectin_load_via = mapper._should_selectin_load(
- option_entities,
- _polymorphic_from,
- )
- if selectin_load_via and selectin_load_via is not _polymorphic_from:
- # only_load_props goes w/ refresh_state only, and in a refresh
- # we are a single row query for the exact entity; polymorphic
- # loading does not apply
- assert only_load_props is None
- if selectin_load_via.is_mapper:
- _load_supers = []
- _endmost_mapper = selectin_load_via
- while (
- _endmost_mapper
- and _endmost_mapper is not _polymorphic_from
- ):
- _load_supers.append(_endmost_mapper)
- _endmost_mapper = _endmost_mapper.inherits
- else:
- _load_supers = [selectin_load_via]
- for _selectinload_entity in _load_supers:
- if PostLoad.path_exists(
- context, load_path, _selectinload_entity
- ):
- continue
- callable_ = _load_subclass_via_in(
- context,
- path,
- _selectinload_entity,
- _polymorphic_from,
- option_entities,
- )
- PostLoad.callable_for_path(
- context,
- load_path,
- _selectinload_entity.mapper,
- _selectinload_entity,
- callable_,
- _selectinload_entity,
- )
- post_load = PostLoad.for_context(context, load_path, only_load_props)
- if refresh_state:
- refresh_identity_key = refresh_state.key
- if refresh_identity_key is None:
- # super-rare condition; a refresh is being called
- # on a non-instance-key instance; this is meant to only
- # occur within a flush()
- refresh_identity_key = mapper._identity_key_from_state(
- refresh_state
- )
- else:
- refresh_identity_key = None
- primary_key_getter = getters["primary_key_getter"]
- if mapper.allow_partial_pks:
- is_not_primary_key = _none_set.issuperset
- else:
- is_not_primary_key = _none_set.intersection
- def _instance(row):
- # determine the state that we'll be populating
- if refresh_identity_key:
- # fixed state that we're refreshing
- state = refresh_state
- instance = state.obj()
- dict_ = instance_dict(instance)
- isnew = state.runid != runid
- currentload = True
- loaded_instance = False
- else:
- # look at the row, see if that identity is in the
- # session, or we have to create a new one
- identitykey = (
- identity_class,
- primary_key_getter(row),
- identity_token,
- )
- instance = session_identity_map.get(identitykey)
- if instance is not None:
- # existing instance
- state = instance_state(instance)
- dict_ = instance_dict(instance)
- isnew = state.runid != runid
- currentload = not isnew
- loaded_instance = False
- if version_check and version_id_getter and not currentload:
- _validate_version_id(
- mapper, state, dict_, row, version_id_getter
- )
- else:
- # create a new instance
- # check for non-NULL values in the primary key columns,
- # else no entity is returned for the row
- if is_not_primary_key(identitykey[1]):
- return None
- isnew = True
- currentload = True
- loaded_instance = True
- instance = mapper.class_manager.new_instance()
- dict_ = instance_dict(instance)
- state = instance_state(instance)
- state.key = identitykey
- state.identity_token = identity_token
- # attach instance to session.
- state.session_id = session_id
- session_identity_map._add_unpresent(state, identitykey)
- effective_populate_existing = populate_existing
- if refresh_state is state:
- effective_populate_existing = True
- # populate. this looks at whether this state is new
- # for this load or was existing, and whether or not this
- # row is the first row with this identity.
- if currentload or effective_populate_existing:
- # full population routines. Objects here are either
- # just created, or we are doing a populate_existing
- # be conservative about setting load_path when populate_existing
- # is in effect; want to maintain options from the original
- # load. see test_expire->test_refresh_maintains_deferred_options
- if isnew and (
- propagated_loader_options or not effective_populate_existing
- ):
- state.load_options = propagated_loader_options
- state.load_path = load_path
- _populate_full(
- context,
- row,
- state,
- dict_,
- isnew,
- load_path,
- loaded_instance,
- effective_populate_existing,
- populators,
- )
- if isnew:
- # state.runid should be equal to context.runid / runid
- # here, however for event checks we are being more conservative
- # and checking against existing run id
- # assert state.runid == runid
- existing_runid = state.runid
- if loaded_instance:
- if load_evt:
- state.manager.dispatch.load(state, context)
- if state.runid != existing_runid:
- _warn_for_runid_changed(state)
- if persistent_evt:
- loaded_as_persistent(context.session, state)
- if state.runid != existing_runid:
- _warn_for_runid_changed(state)
- elif refresh_evt:
- state.manager.dispatch.refresh(
- state, context, only_load_props
- )
- if state.runid != runid:
- _warn_for_runid_changed(state)
- if effective_populate_existing or state.modified:
- if refresh_state and only_load_props:
- state._commit(dict_, only_load_props)
- else:
- state._commit_all(dict_, session_identity_map)
- if post_load:
- post_load.add_state(state, True)
- else:
- # partial population routines, for objects that were already
- # in the Session, but a row matches them; apply eager loaders
- # on existing objects, etc.
- unloaded = state.unloaded
- isnew = state not in context.partials
- if not isnew or unloaded or populators["eager"]:
- # state is having a partial set of its attributes
- # refreshed. Populate those attributes,
- # and add to the "context.partials" collection.
- to_load = _populate_partial(
- context,
- row,
- state,
- dict_,
- isnew,
- load_path,
- unloaded,
- populators,
- )
- if isnew:
- if refresh_evt:
- existing_runid = state.runid
- state.manager.dispatch.refresh(state, context, to_load)
- if state.runid != existing_runid:
- _warn_for_runid_changed(state)
- state._commit(dict_, to_load)
- if post_load and context.invoke_all_eagers:
- post_load.add_state(state, False)
- return instance
- if mapper.polymorphic_map and not _polymorphic_from and not refresh_state:
- # if we are doing polymorphic, dispatch to a different _instance()
- # method specific to the subclass mapper
- def ensure_no_pk(row):
- identitykey = (
- identity_class,
- primary_key_getter(row),
- identity_token,
- )
- if not is_not_primary_key(identitykey[1]):
- return identitykey
- else:
- return None
- _instance = _decorate_polymorphic_switch(
- _instance,
- context,
- query_entity,
- mapper,
- result,
- path,
- polymorphic_discriminator,
- adapter,
- ensure_no_pk,
- )
- return _instance
- def _load_subclass_via_in(
- context, path, entity, polymorphic_from, option_entities
- ):
- mapper = entity.mapper
- # TODO: polymorphic_from seems to be a Mapper in all cases.
- # this is likely not needed, but as we dont have typing in loading.py
- # yet, err on the safe side
- polymorphic_from_mapper = polymorphic_from.mapper
- not_against_basemost = polymorphic_from_mapper.inherits is not None
- zero_idx = len(mapper.base_mapper.primary_key) == 1
- if entity.is_aliased_class or not_against_basemost:
- q, enable_opt, disable_opt = mapper._subclass_load_via_in(
- entity, polymorphic_from
- )
- else:
- q, enable_opt, disable_opt = mapper._subclass_load_via_in_mapper
- def do_load(context, path, states, load_only, effective_entity):
- if not option_entities:
- # filter out states for those that would have selectinloaded
- # from another loader
- # TODO: we are currently ignoring the case where the
- # "selectin_polymorphic" option is used, as this is much more
- # complex / specific / very uncommon API use
- states = [
- (s, v)
- for s, v in states
- if s.mapper._would_selectin_load_only_from_given_mapper(mapper)
- ]
- if not states:
- return
- orig_query = context.query
- if path.parent:
- enable_opt_lcl = enable_opt._prepend_path(path)
- disable_opt_lcl = disable_opt._prepend_path(path)
- else:
- enable_opt_lcl = enable_opt
- disable_opt_lcl = disable_opt
- options = (
- (enable_opt_lcl,) + orig_query._with_options + (disable_opt_lcl,)
- )
- q2 = q.options(*options)
- q2._compile_options = context.compile_state.default_compile_options
- q2._compile_options += {"_current_path": path.parent}
- if context.populate_existing:
- q2 = q2.execution_options(populate_existing=True)
- while states:
- chunk = states[0 : SelectInLoader._chunksize]
- states = states[SelectInLoader._chunksize :]
- context.session.execute(
- q2,
- dict(
- primary_keys=[
- state.key[1][0] if zero_idx else state.key[1]
- for state, load_attrs in chunk
- ]
- ),
- ).unique().scalars().all()
- return do_load
- def _populate_full(
- context,
- row,
- state,
- dict_,
- isnew,
- load_path,
- loaded_instance,
- populate_existing,
- populators,
- ):
- if isnew:
- # first time we are seeing a row with this identity.
- state.runid = context.runid
- for key, getter in populators["quick"]:
- dict_[key] = getter(row)
- if populate_existing:
- for key, set_callable in populators["expire"]:
- dict_.pop(key, None)
- if set_callable:
- state.expired_attributes.add(key)
- else:
- for key, set_callable in populators["expire"]:
- if set_callable:
- state.expired_attributes.add(key)
- for key, populator in populators["new"]:
- populator(state, dict_, row)
- elif load_path != state.load_path:
- # new load path, e.g. object is present in more than one
- # column position in a series of rows
- state.load_path = load_path
- # if we have data, and the data isn't in the dict, OK, let's put
- # it in.
- for key, getter in populators["quick"]:
- if key not in dict_:
- dict_[key] = getter(row)
- # otherwise treat like an "already seen" row
- for key, populator in populators["existing"]:
- populator(state, dict_, row)
- # TODO: allow "existing" populator to know this is
- # a new path for the state:
- # populator(state, dict_, row, new_path=True)
- else:
- # have already seen rows with this identity in this same path.
- for key, populator in populators["existing"]:
- populator(state, dict_, row)
- # TODO: same path
- # populator(state, dict_, row, new_path=False)
- def _populate_partial(
- context, row, state, dict_, isnew, load_path, unloaded, populators
- ):
- if not isnew:
- if unloaded:
- # extra pass, see #8166
- for key, getter in populators["quick"]:
- if key in unloaded:
- dict_[key] = getter(row)
- to_load = context.partials[state]
- for key, populator in populators["existing"]:
- if key in to_load:
- populator(state, dict_, row)
- else:
- to_load = unloaded
- context.partials[state] = to_load
- for key, getter in populators["quick"]:
- if key in to_load:
- dict_[key] = getter(row)
- for key, set_callable in populators["expire"]:
- if key in to_load:
- dict_.pop(key, None)
- if set_callable:
- state.expired_attributes.add(key)
- for key, populator in populators["new"]:
- if key in to_load:
- populator(state, dict_, row)
- for key, populator in populators["eager"]:
- if key not in unloaded:
- populator(state, dict_, row)
- return to_load
- def _validate_version_id(mapper, state, dict_, row, getter):
- if mapper._get_state_attr_by_column(
- state, dict_, mapper.version_id_col
- ) != getter(row):
- raise orm_exc.StaleDataError(
- "Instance '%s' has version id '%s' which "
- "does not match database-loaded version id '%s'."
- % (
- state_str(state),
- mapper._get_state_attr_by_column(
- state, dict_, mapper.version_id_col
- ),
- getter(row),
- )
- )
- def _decorate_polymorphic_switch(
- instance_fn,
- context,
- query_entity,
- mapper,
- result,
- path,
- polymorphic_discriminator,
- adapter,
- ensure_no_pk,
- ):
- if polymorphic_discriminator is not None:
- polymorphic_on = polymorphic_discriminator
- else:
- polymorphic_on = mapper.polymorphic_on
- if polymorphic_on is None:
- return instance_fn
- if adapter:
- polymorphic_on = adapter.columns[polymorphic_on]
- def configure_subclass_mapper(discriminator):
- try:
- sub_mapper = mapper.polymorphic_map[discriminator]
- except KeyError:
- raise AssertionError(
- "No such polymorphic_identity %r is defined" % discriminator
- )
- else:
- if sub_mapper is mapper:
- return None
- elif not sub_mapper.isa(mapper):
- return False
- return _instance_processor(
- query_entity,
- sub_mapper,
- context,
- result,
- path,
- adapter,
- _polymorphic_from=mapper,
- )
- polymorphic_instances = util.PopulateDict(configure_subclass_mapper)
- getter = result._getter(polymorphic_on)
- def polymorphic_instance(row):
- discriminator = getter(row)
- if discriminator is not None:
- _instance = polymorphic_instances[discriminator]
- if _instance:
- return _instance(row)
- elif _instance is False:
- identitykey = ensure_no_pk(row)
- if identitykey:
- raise sa_exc.InvalidRequestError(
- "Row with identity key %s can't be loaded into an "
- "object; the polymorphic discriminator column '%s' "
- "refers to %s, which is not a sub-mapper of "
- "the requested %s"
- % (
- identitykey,
- polymorphic_on,
- mapper.polymorphic_map[discriminator],
- mapper,
- )
- )
- else:
- return None
- else:
- return instance_fn(row)
- else:
- identitykey = ensure_no_pk(row)
- if identitykey:
- raise sa_exc.InvalidRequestError(
- "Row with identity key %s can't be loaded into an "
- "object; the polymorphic discriminator column '%s' is "
- "NULL" % (identitykey, polymorphic_on)
- )
- else:
- return None
- return polymorphic_instance
- class PostLoad:
- """Track loaders and states for "post load" operations."""
- __slots__ = "loaders", "states", "load_keys"
- def __init__(self):
- self.loaders = {}
- self.states = util.OrderedDict()
- self.load_keys = None
- def add_state(self, state, overwrite):
- # the states for a polymorphic load here are all shared
- # within a single PostLoad object among multiple subtypes.
- # Filtering of callables on a per-subclass basis needs to be done at
- # the invocation level
- self.states[state] = overwrite
- def invoke(self, context, path):
- if not self.states:
- return
- path = path_registry.PathRegistry.coerce(path)
- for (
- effective_context,
- token,
- limit_to_mapper,
- loader,
- arg,
- kw,
- ) in self.loaders.values():
- states = [
- (state, overwrite)
- for state, overwrite in self.states.items()
- if state.manager.mapper.isa(limit_to_mapper)
- ]
- if states:
- loader(
- effective_context, path, states, self.load_keys, *arg, **kw
- )
- self.states.clear()
- @classmethod
- def for_context(cls, context, path, only_load_props):
- pl = context.post_load_paths.get(path.path)
- if pl is not None and only_load_props:
- pl.load_keys = only_load_props
- return pl
- @classmethod
- def path_exists(self, context, path, key):
- return (
- path.path in context.post_load_paths
- and key in context.post_load_paths[path.path].loaders
- )
- @classmethod
- def callable_for_path(
- cls, context, path, limit_to_mapper, token, loader_callable, *arg, **kw
- ):
- if path.path in context.post_load_paths:
- pl = context.post_load_paths[path.path]
- else:
- pl = context.post_load_paths[path.path] = PostLoad()
- pl.loaders[token] = (
- context,
- token,
- limit_to_mapper,
- loader_callable,
- arg,
- kw,
- )
- def load_scalar_attributes(mapper, state, attribute_names, passive):
- """initiate a column-based attribute refresh operation."""
- # assert mapper is _state_mapper(state)
- session = state.session
- if not session:
- raise orm_exc.DetachedInstanceError(
- "Instance %s is not bound to a Session; "
- "attribute refresh operation cannot proceed" % (state_str(state))
- )
- no_autoflush = bool(passive & attributes.NO_AUTOFLUSH)
- # in the case of inheritance, particularly concrete and abstract
- # concrete inheritance, the class manager might have some keys
- # of attributes on the superclass that we didn't actually map.
- # These could be mapped as "concrete, don't load" or could be completely
- # excluded from the mapping and we know nothing about them. Filter them
- # here to prevent them from coming through.
- if attribute_names:
- attribute_names = attribute_names.intersection(mapper.attrs.keys())
- if mapper.inherits and not mapper.concrete:
- # load based on committed attributes in the object, formed into
- # a truncated SELECT that only includes relevant tables. does not
- # currently use state.key
- statement = mapper._optimized_get_statement(state, attribute_names)
- if statement is not None:
- # undefer() isn't needed here because statement has the
- # columns needed already, this implicitly undefers that column
- stmt = FromStatement(mapper, statement)
- return load_on_ident(
- session,
- stmt,
- None,
- only_load_props=attribute_names,
- refresh_state=state,
- no_autoflush=no_autoflush,
- )
- # normal load, use state.key as the identity to SELECT
- has_key = bool(state.key)
- if has_key:
- identity_key = state.key
- else:
- # this codepath is rare - only valid when inside a flush, and the
- # object is becoming persistent but hasn't yet been assigned
- # an identity_key.
- # check here to ensure we have the attrs we need.
- pk_attrs = [
- mapper._columntoproperty[col].key for col in mapper.primary_key
- ]
- if state.expired_attributes.intersection(pk_attrs):
- raise sa_exc.InvalidRequestError(
- "Instance %s cannot be refreshed - it's not "
- " persistent and does not "
- "contain a full primary key." % state_str(state)
- )
- identity_key = mapper._identity_key_from_state(state)
- if (
- _none_set.issubset(identity_key) and not mapper.allow_partial_pks
- ) or _none_set.issuperset(identity_key):
- util.warn_limited(
- "Instance %s to be refreshed doesn't "
- "contain a full primary key - can't be refreshed "
- "(and shouldn't be expired, either).",
- state_str(state),
- )
- return
- result = load_on_ident(
- session,
- select(mapper).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL),
- identity_key,
- refresh_state=state,
- only_load_props=attribute_names,
- no_autoflush=no_autoflush,
- )
- # if instance is pending, a refresh operation
- # may not complete (even if PK attributes are assigned)
- if has_key and result is None:
- raise orm_exc.ObjectDeletedError(state)
|