loading.py 57 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686
  1. # orm/loading.py
  2. # Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: https://www.opensource.org/licenses/mit-license.php
  7. # mypy: ignore-errors
  8. """private module containing functions used to convert database
  9. rows into object instances and associated state.
  10. the functions here are called primarily by Query, Mapper,
  11. as well as some of the attribute loading strategies.
  12. """
  13. from __future__ import annotations
  14. from typing import Any
  15. from typing import Dict
  16. from typing import Iterable
  17. from typing import List
  18. from typing import Mapping
  19. from typing import Optional
  20. from typing import Sequence
  21. from typing import Tuple
  22. from typing import TYPE_CHECKING
  23. from typing import TypeVar
  24. from typing import Union
  25. from . import attributes
  26. from . import exc as orm_exc
  27. from . import path_registry
  28. from .base import _DEFER_FOR_STATE
  29. from .base import _RAISE_FOR_STATE
  30. from .base import _SET_DEFERRED_EXPIRED
  31. from .base import PassiveFlag
  32. from .context import FromStatement
  33. from .context import ORMCompileState
  34. from .context import QueryContext
  35. from .strategies import SelectInLoader
  36. from .util import _none_set
  37. from .util import state_str
  38. from .. import exc as sa_exc
  39. from .. import util
  40. from ..engine import result_tuple
  41. from ..engine.result import ChunkedIteratorResult
  42. from ..engine.result import FrozenResult
  43. from ..engine.result import SimpleResultMetaData
  44. from ..sql import select
  45. from ..sql import util as sql_util
  46. from ..sql.selectable import ForUpdateArg
  47. from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL
  48. from ..sql.selectable import SelectState
  49. from ..util import EMPTY_DICT
  50. if TYPE_CHECKING:
  51. from ._typing import _IdentityKeyType
  52. from .base import LoaderCallableStatus
  53. from .interfaces import ORMOption
  54. from .mapper import Mapper
  55. from .query import Query
  56. from .session import Session
  57. from .state import InstanceState
  58. from ..engine.cursor import CursorResult
  59. from ..engine.interfaces import _ExecuteOptions
  60. from ..engine.result import Result
  61. from ..sql import Select
  62. _T = TypeVar("_T", bound=Any)
  63. _O = TypeVar("_O", bound=object)
  64. _new_runid = util.counter()
  65. _PopulatorDict = Dict[str, List[Tuple[str, Any]]]
  66. def instances(cursor: CursorResult[Any], context: QueryContext) -> Result[Any]:
  67. """Return a :class:`.Result` given an ORM query context.
  68. :param cursor: a :class:`.CursorResult`, generated by a statement
  69. which came from :class:`.ORMCompileState`
  70. :param context: a :class:`.QueryContext` object
  71. :return: a :class:`.Result` object representing ORM results
  72. .. versionchanged:: 1.4 The instances() function now uses
  73. :class:`.Result` objects and has an all new interface.
  74. """
  75. context.runid = _new_runid()
  76. if context.top_level_context:
  77. is_top_level = False
  78. context.post_load_paths = context.top_level_context.post_load_paths
  79. else:
  80. is_top_level = True
  81. context.post_load_paths = {}
  82. compile_state = context.compile_state
  83. filtered = compile_state._has_mapper_entities
  84. single_entity = (
  85. not context.load_options._only_return_tuples
  86. and len(compile_state._entities) == 1
  87. and compile_state._entities[0].supports_single_entity
  88. )
  89. try:
  90. (process, labels, extra) = list(
  91. zip(
  92. *[
  93. query_entity.row_processor(context, cursor)
  94. for query_entity in context.compile_state._entities
  95. ]
  96. )
  97. )
  98. if context.yield_per and (
  99. context.loaders_require_buffering
  100. or context.loaders_require_uniquing
  101. ):
  102. raise sa_exc.InvalidRequestError(
  103. "Can't use yield_per with eager loaders that require uniquing "
  104. "or row buffering, e.g. joinedload() against collections "
  105. "or subqueryload(). Consider the selectinload() strategy "
  106. "for better flexibility in loading objects."
  107. )
  108. except Exception:
  109. with util.safe_reraise():
  110. cursor.close()
  111. def _no_unique(entry):
  112. raise sa_exc.InvalidRequestError(
  113. "Can't use the ORM yield_per feature in conjunction with unique()"
  114. )
  115. def _not_hashable(datatype, *, legacy=False, uncertain=False):
  116. if not legacy:
  117. def go(obj):
  118. if uncertain:
  119. try:
  120. return hash(obj)
  121. except:
  122. pass
  123. raise sa_exc.InvalidRequestError(
  124. "Can't apply uniqueness to row tuple containing value of "
  125. f"""type {datatype!r}; {
  126. 'the values returned appear to be'
  127. if uncertain
  128. else 'this datatype produces'
  129. } non-hashable values"""
  130. )
  131. return go
  132. elif not uncertain:
  133. return id
  134. else:
  135. _use_id = False
  136. def go(obj):
  137. nonlocal _use_id
  138. if not _use_id:
  139. try:
  140. return hash(obj)
  141. except:
  142. pass
  143. # in #10459, we considered using a warning here, however
  144. # as legacy query uses result.unique() in all cases, this
  145. # would lead to too many warning cases.
  146. _use_id = True
  147. return id(obj)
  148. return go
  149. unique_filters = [
  150. (
  151. _no_unique
  152. if context.yield_per
  153. else (
  154. _not_hashable(
  155. ent.column.type, # type: ignore
  156. legacy=context.load_options._legacy_uniquing,
  157. uncertain=ent._null_column_type,
  158. )
  159. if (
  160. not ent.use_id_for_hash
  161. and (ent._non_hashable_value or ent._null_column_type)
  162. )
  163. else id if ent.use_id_for_hash else None
  164. )
  165. )
  166. for ent in context.compile_state._entities
  167. ]
  168. row_metadata = SimpleResultMetaData(
  169. labels, extra, _unique_filters=unique_filters
  170. )
  171. def chunks(size): # type: ignore
  172. while True:
  173. yield_per = size
  174. context.partials = {}
  175. if yield_per:
  176. fetch = cursor.fetchmany(yield_per)
  177. if not fetch:
  178. break
  179. else:
  180. fetch = cursor._raw_all_rows()
  181. if single_entity:
  182. proc = process[0]
  183. rows = [proc(row) for row in fetch]
  184. else:
  185. rows = [
  186. tuple([proc(row) for proc in process]) for row in fetch
  187. ]
  188. # if we are the originating load from a query, meaning we
  189. # aren't being called as a result of a nested "post load",
  190. # iterate through all the collected post loaders and fire them
  191. # off. Previously this used to work recursively, however that
  192. # prevented deeply nested structures from being loadable
  193. if is_top_level:
  194. if yield_per:
  195. # if using yield per, memoize the state of the
  196. # collection so that it can be restored
  197. top_level_post_loads = list(
  198. context.post_load_paths.items()
  199. )
  200. while context.post_load_paths:
  201. post_loads = list(context.post_load_paths.items())
  202. context.post_load_paths.clear()
  203. for path, post_load in post_loads:
  204. post_load.invoke(context, path)
  205. if yield_per:
  206. context.post_load_paths.clear()
  207. context.post_load_paths.update(top_level_post_loads)
  208. yield rows
  209. if not yield_per:
  210. break
  211. if context.execution_options.get("prebuffer_rows", False):
  212. # this is a bit of a hack at the moment.
  213. # I would rather have some option in the result to pre-buffer
  214. # internally.
  215. _prebuffered = list(chunks(None))
  216. def chunks(size):
  217. return iter(_prebuffered)
  218. result = ChunkedIteratorResult(
  219. row_metadata,
  220. chunks,
  221. source_supports_scalars=single_entity,
  222. raw=cursor,
  223. dynamic_yield_per=cursor.context._is_server_side,
  224. )
  225. # filtered and single_entity are used to indicate to legacy Query that the
  226. # query has ORM entities, so legacy deduping and scalars should be called
  227. # on the result.
  228. result._attributes = result._attributes.union(
  229. dict(filtered=filtered, is_single_entity=single_entity)
  230. )
  231. # multi_row_eager_loaders OTOH is specific to joinedload.
  232. if context.compile_state.multi_row_eager_loaders:
  233. def require_unique(obj):
  234. raise sa_exc.InvalidRequestError(
  235. "The unique() method must be invoked on this Result, "
  236. "as it contains results that include joined eager loads "
  237. "against collections"
  238. )
  239. result._unique_filter_state = (None, require_unique)
  240. if context.yield_per:
  241. result.yield_per(context.yield_per)
  242. return result
  243. @util.preload_module("sqlalchemy.orm.context")
  244. def merge_frozen_result(session, statement, frozen_result, load=True):
  245. """Merge a :class:`_engine.FrozenResult` back into a :class:`_orm.Session`,
  246. returning a new :class:`_engine.Result` object with :term:`persistent`
  247. objects.
  248. See the section :ref:`do_orm_execute_re_executing` for an example.
  249. .. seealso::
  250. :ref:`do_orm_execute_re_executing`
  251. :meth:`_engine.Result.freeze`
  252. :class:`_engine.FrozenResult`
  253. """
  254. querycontext = util.preloaded.orm_context
  255. if load:
  256. # flush current contents if we expect to load data
  257. session._autoflush()
  258. ctx = querycontext.ORMSelectCompileState._create_entities_collection(
  259. statement, legacy=False
  260. )
  261. autoflush = session.autoflush
  262. try:
  263. session.autoflush = False
  264. mapped_entities = [
  265. i
  266. for i, e in enumerate(ctx._entities)
  267. if isinstance(e, querycontext._MapperEntity)
  268. ]
  269. keys = [ent._label_name for ent in ctx._entities]
  270. keyed_tuple = result_tuple(
  271. keys, [ent._extra_entities for ent in ctx._entities]
  272. )
  273. result = []
  274. for newrow in frozen_result.rewrite_rows():
  275. for i in mapped_entities:
  276. if newrow[i] is not None:
  277. newrow[i] = session._merge(
  278. attributes.instance_state(newrow[i]),
  279. attributes.instance_dict(newrow[i]),
  280. load=load,
  281. _recursive={},
  282. _resolve_conflict_map={},
  283. )
  284. result.append(keyed_tuple(newrow))
  285. return frozen_result.with_new_rows(result)
  286. finally:
  287. session.autoflush = autoflush
  288. @util.became_legacy_20(
  289. ":func:`_orm.merge_result`",
  290. alternative="The function as well as the method on :class:`_orm.Query` "
  291. "is superseded by the :func:`_orm.merge_frozen_result` function.",
  292. )
  293. @util.preload_module("sqlalchemy.orm.context")
  294. def merge_result(
  295. query: Query[Any],
  296. iterator: Union[FrozenResult, Iterable[Sequence[Any]], Iterable[object]],
  297. load: bool = True,
  298. ) -> Union[FrozenResult, Iterable[Any]]:
  299. """Merge a result into the given :class:`.Query` object's Session.
  300. See :meth:`_orm.Query.merge_result` for top-level documentation on this
  301. function.
  302. """
  303. querycontext = util.preloaded.orm_context
  304. session = query.session
  305. if load:
  306. # flush current contents if we expect to load data
  307. session._autoflush()
  308. # TODO: need test coverage and documentation for the FrozenResult
  309. # use case.
  310. if isinstance(iterator, FrozenResult):
  311. frozen_result = iterator
  312. iterator = iter(frozen_result.data)
  313. else:
  314. frozen_result = None
  315. ctx = querycontext.ORMSelectCompileState._create_entities_collection(
  316. query, legacy=True
  317. )
  318. autoflush = session.autoflush
  319. try:
  320. session.autoflush = False
  321. single_entity = not frozen_result and len(ctx._entities) == 1
  322. if single_entity:
  323. if isinstance(ctx._entities[0], querycontext._MapperEntity):
  324. result = [
  325. session._merge(
  326. attributes.instance_state(instance),
  327. attributes.instance_dict(instance),
  328. load=load,
  329. _recursive={},
  330. _resolve_conflict_map={},
  331. )
  332. for instance in iterator
  333. ]
  334. else:
  335. result = list(iterator)
  336. else:
  337. mapped_entities = [
  338. i
  339. for i, e in enumerate(ctx._entities)
  340. if isinstance(e, querycontext._MapperEntity)
  341. ]
  342. result = []
  343. keys = [ent._label_name for ent in ctx._entities]
  344. keyed_tuple = result_tuple(
  345. keys, [ent._extra_entities for ent in ctx._entities]
  346. )
  347. for row in iterator:
  348. newrow = list(row)
  349. for i in mapped_entities:
  350. if newrow[i] is not None:
  351. newrow[i] = session._merge(
  352. attributes.instance_state(newrow[i]),
  353. attributes.instance_dict(newrow[i]),
  354. load=load,
  355. _recursive={},
  356. _resolve_conflict_map={},
  357. )
  358. result.append(keyed_tuple(newrow))
  359. if frozen_result:
  360. return frozen_result.with_new_rows(result)
  361. else:
  362. return iter(result)
  363. finally:
  364. session.autoflush = autoflush
  365. def get_from_identity(
  366. session: Session,
  367. mapper: Mapper[_O],
  368. key: _IdentityKeyType[_O],
  369. passive: PassiveFlag,
  370. ) -> Union[LoaderCallableStatus, Optional[_O]]:
  371. """Look up the given key in the given session's identity map,
  372. check the object for expired state if found.
  373. """
  374. instance = session.identity_map.get(key)
  375. if instance is not None:
  376. state = attributes.instance_state(instance)
  377. if mapper.inherits and not state.mapper.isa(mapper):
  378. return attributes.PASSIVE_CLASS_MISMATCH
  379. # expired - ensure it still exists
  380. if state.expired:
  381. if not passive & attributes.SQL_OK:
  382. # TODO: no coverage here
  383. return attributes.PASSIVE_NO_RESULT
  384. elif not passive & attributes.RELATED_OBJECT_OK:
  385. # this mode is used within a flush and the instance's
  386. # expired state will be checked soon enough, if necessary.
  387. # also used by immediateloader for a mutually-dependent
  388. # o2m->m2m load, :ticket:`6301`
  389. return instance
  390. try:
  391. state._load_expired(state, passive)
  392. except orm_exc.ObjectDeletedError:
  393. session._remove_newly_deleted([state])
  394. return None
  395. return instance
  396. else:
  397. return None
  398. def load_on_ident(
  399. session: Session,
  400. statement: Union[Select, FromStatement],
  401. key: Optional[_IdentityKeyType],
  402. *,
  403. load_options: Optional[Sequence[ORMOption]] = None,
  404. refresh_state: Optional[InstanceState[Any]] = None,
  405. with_for_update: Optional[ForUpdateArg] = None,
  406. only_load_props: Optional[Iterable[str]] = None,
  407. no_autoflush: bool = False,
  408. bind_arguments: Mapping[str, Any] = util.EMPTY_DICT,
  409. execution_options: _ExecuteOptions = util.EMPTY_DICT,
  410. require_pk_cols: bool = False,
  411. is_user_refresh: bool = False,
  412. ):
  413. """Load the given identity key from the database."""
  414. if key is not None:
  415. ident = key[1]
  416. identity_token = key[2]
  417. else:
  418. ident = identity_token = None
  419. return load_on_pk_identity(
  420. session,
  421. statement,
  422. ident,
  423. load_options=load_options,
  424. refresh_state=refresh_state,
  425. with_for_update=with_for_update,
  426. only_load_props=only_load_props,
  427. identity_token=identity_token,
  428. no_autoflush=no_autoflush,
  429. bind_arguments=bind_arguments,
  430. execution_options=execution_options,
  431. require_pk_cols=require_pk_cols,
  432. is_user_refresh=is_user_refresh,
  433. )
  434. def load_on_pk_identity(
  435. session: Session,
  436. statement: Union[Select, FromStatement],
  437. primary_key_identity: Optional[Tuple[Any, ...]],
  438. *,
  439. load_options: Optional[Sequence[ORMOption]] = None,
  440. refresh_state: Optional[InstanceState[Any]] = None,
  441. with_for_update: Optional[ForUpdateArg] = None,
  442. only_load_props: Optional[Iterable[str]] = None,
  443. identity_token: Optional[Any] = None,
  444. no_autoflush: bool = False,
  445. bind_arguments: Mapping[str, Any] = util.EMPTY_DICT,
  446. execution_options: _ExecuteOptions = util.EMPTY_DICT,
  447. require_pk_cols: bool = False,
  448. is_user_refresh: bool = False,
  449. ):
  450. """Load the given primary key identity from the database."""
  451. query = statement
  452. q = query._clone()
  453. assert not q._is_lambda_element
  454. if load_options is None:
  455. load_options = QueryContext.default_load_options
  456. if (
  457. statement._compile_options
  458. is SelectState.default_select_compile_options
  459. ):
  460. compile_options = ORMCompileState.default_compile_options
  461. else:
  462. compile_options = statement._compile_options
  463. if primary_key_identity is not None:
  464. mapper = query._propagate_attrs["plugin_subject"]
  465. (_get_clause, _get_params) = mapper._get_clause
  466. # None present in ident - turn those comparisons
  467. # into "IS NULL"
  468. if None in primary_key_identity:
  469. nones = {
  470. _get_params[col].key
  471. for col, value in zip(mapper.primary_key, primary_key_identity)
  472. if value is None
  473. }
  474. _get_clause = sql_util.adapt_criterion_to_null(_get_clause, nones)
  475. if len(nones) == len(primary_key_identity):
  476. util.warn(
  477. "fully NULL primary key identity cannot load any "
  478. "object. This condition may raise an error in a future "
  479. "release."
  480. )
  481. q._where_criteria = (
  482. sql_util._deep_annotate(_get_clause, {"_orm_adapt": True}),
  483. )
  484. params = {
  485. _get_params[primary_key].key: id_val
  486. for id_val, primary_key in zip(
  487. primary_key_identity, mapper.primary_key
  488. )
  489. }
  490. else:
  491. params = None
  492. if with_for_update is not None:
  493. version_check = True
  494. q._for_update_arg = with_for_update
  495. elif query._for_update_arg is not None:
  496. version_check = True
  497. q._for_update_arg = query._for_update_arg
  498. else:
  499. version_check = False
  500. if require_pk_cols and only_load_props:
  501. if not refresh_state:
  502. raise sa_exc.ArgumentError(
  503. "refresh_state is required when require_pk_cols is present"
  504. )
  505. refresh_state_prokeys = refresh_state.mapper._primary_key_propkeys
  506. has_changes = {
  507. key
  508. for key in refresh_state_prokeys.difference(only_load_props)
  509. if refresh_state.attrs[key].history.has_changes()
  510. }
  511. if has_changes:
  512. # raise if pending pk changes are present.
  513. # technically, this could be limited to the case where we have
  514. # relationships in the only_load_props collection to be refreshed
  515. # also (and only ones that have a secondary eager loader, at that).
  516. # however, the error is in place across the board so that behavior
  517. # here is easier to predict. The use case it prevents is one
  518. # of mutating PK attrs, leaving them unflushed,
  519. # calling session.refresh(), and expecting those attrs to remain
  520. # still unflushed. It seems likely someone doing all those
  521. # things would be better off having the PK attributes flushed
  522. # to the database before tinkering like that (session.refresh() is
  523. # tinkering).
  524. raise sa_exc.InvalidRequestError(
  525. f"Please flush pending primary key changes on "
  526. "attributes "
  527. f"{has_changes} for mapper {refresh_state.mapper} before "
  528. "proceeding with a refresh"
  529. )
  530. # overall, the ORM has no internal flow right now for "dont load the
  531. # primary row of an object at all, but fire off
  532. # selectinload/subqueryload/immediateload for some relationships".
  533. # It would probably be a pretty big effort to add such a flow. So
  534. # here, the case for #8703 is introduced; user asks to refresh some
  535. # relationship attributes only which are
  536. # selectinload/subqueryload/immediateload/ etc. (not joinedload).
  537. # ORM complains there's no columns in the primary row to load.
  538. # So here, we just add the PK cols if that
  539. # case is detected, so that there is a SELECT emitted for the primary
  540. # row.
  541. #
  542. # Let's just state right up front, for this one little case,
  543. # the ORM here is adding a whole extra SELECT just to satisfy
  544. # limitations in the internal flow. This is really not a thing
  545. # SQLAlchemy finds itself doing like, ever, obviously, we are
  546. # constantly working to *remove* SELECTs we don't need. We
  547. # rationalize this for now based on 1. session.refresh() is not
  548. # commonly used 2. session.refresh() with only relationship attrs is
  549. # even less commonly used 3. the SELECT in question is very low
  550. # latency.
  551. #
  552. # to add the flow to not include the SELECT, the quickest way
  553. # might be to just manufacture a single-row result set to send off to
  554. # instances(), but we'd have to weave that into context.py and all
  555. # that. For 2.0.0, we have enough big changes to navigate for now.
  556. #
  557. mp = refresh_state.mapper._props
  558. for p in only_load_props:
  559. if mp[p]._is_relationship:
  560. only_load_props = refresh_state_prokeys.union(only_load_props)
  561. break
  562. if refresh_state and refresh_state.load_options:
  563. compile_options += {"_current_path": refresh_state.load_path.parent}
  564. q = q.options(*refresh_state.load_options)
  565. new_compile_options, load_options = _set_get_options(
  566. compile_options,
  567. load_options,
  568. version_check=version_check,
  569. only_load_props=only_load_props,
  570. refresh_state=refresh_state,
  571. identity_token=identity_token,
  572. is_user_refresh=is_user_refresh,
  573. )
  574. q._compile_options = new_compile_options
  575. q._order_by = None
  576. if no_autoflush:
  577. load_options += {"_autoflush": False}
  578. execution_options = util.EMPTY_DICT.merge_with(
  579. execution_options, {"_sa_orm_load_options": load_options}
  580. )
  581. result = (
  582. session.execute(
  583. q,
  584. params=params,
  585. execution_options=execution_options,
  586. bind_arguments=bind_arguments,
  587. )
  588. .unique()
  589. .scalars()
  590. )
  591. try:
  592. return result.one()
  593. except orm_exc.NoResultFound:
  594. return None
  595. def _set_get_options(
  596. compile_opt,
  597. load_opt,
  598. populate_existing=None,
  599. version_check=None,
  600. only_load_props=None,
  601. refresh_state=None,
  602. identity_token=None,
  603. is_user_refresh=None,
  604. ):
  605. compile_options = {}
  606. load_options = {}
  607. if version_check:
  608. load_options["_version_check"] = version_check
  609. if populate_existing:
  610. load_options["_populate_existing"] = populate_existing
  611. if refresh_state:
  612. load_options["_refresh_state"] = refresh_state
  613. compile_options["_for_refresh_state"] = True
  614. if only_load_props:
  615. compile_options["_only_load_props"] = frozenset(only_load_props)
  616. if identity_token:
  617. load_options["_identity_token"] = identity_token
  618. if is_user_refresh:
  619. load_options["_is_user_refresh"] = is_user_refresh
  620. if load_options:
  621. load_opt += load_options
  622. if compile_options:
  623. compile_opt += compile_options
  624. return compile_opt, load_opt
  625. def _setup_entity_query(
  626. compile_state,
  627. mapper,
  628. query_entity,
  629. path,
  630. adapter,
  631. column_collection,
  632. with_polymorphic=None,
  633. only_load_props=None,
  634. polymorphic_discriminator=None,
  635. **kw,
  636. ):
  637. if with_polymorphic:
  638. poly_properties = mapper._iterate_polymorphic_properties(
  639. with_polymorphic
  640. )
  641. else:
  642. poly_properties = mapper._polymorphic_properties
  643. quick_populators = {}
  644. path.set(compile_state.attributes, "memoized_setups", quick_populators)
  645. # for the lead entities in the path, e.g. not eager loads, and
  646. # assuming a user-passed aliased class, e.g. not a from_self() or any
  647. # implicit aliasing, don't add columns to the SELECT that aren't
  648. # in the thing that's aliased.
  649. check_for_adapt = adapter and len(path) == 1 and path[-1].is_aliased_class
  650. for value in poly_properties:
  651. if only_load_props and value.key not in only_load_props:
  652. continue
  653. value.setup(
  654. compile_state,
  655. query_entity,
  656. path,
  657. adapter,
  658. only_load_props=only_load_props,
  659. column_collection=column_collection,
  660. memoized_populators=quick_populators,
  661. check_for_adapt=check_for_adapt,
  662. **kw,
  663. )
  664. if (
  665. polymorphic_discriminator is not None
  666. and polymorphic_discriminator is not mapper.polymorphic_on
  667. ):
  668. if adapter:
  669. pd = adapter.columns[polymorphic_discriminator]
  670. else:
  671. pd = polymorphic_discriminator
  672. column_collection.append(pd)
  673. def _warn_for_runid_changed(state):
  674. util.warn(
  675. "Loading context for %s has changed within a load/refresh "
  676. "handler, suggesting a row refresh operation took place. If this "
  677. "event handler is expected to be "
  678. "emitting row refresh operations within an existing load or refresh "
  679. "operation, set restore_load_context=True when establishing the "
  680. "listener to ensure the context remains unchanged when the event "
  681. "handler completes." % (state_str(state),)
  682. )
  683. def _instance_processor(
  684. query_entity,
  685. mapper,
  686. context,
  687. result,
  688. path,
  689. adapter,
  690. only_load_props=None,
  691. refresh_state=None,
  692. polymorphic_discriminator=None,
  693. _polymorphic_from=None,
  694. ):
  695. """Produce a mapper level row processor callable
  696. which processes rows into mapped instances."""
  697. # note that this method, most of which exists in a closure
  698. # called _instance(), resists being broken out, as
  699. # attempts to do so tend to add significant function
  700. # call overhead. _instance() is the most
  701. # performance-critical section in the whole ORM.
  702. identity_class = mapper._identity_class
  703. compile_state = context.compile_state
  704. # look for "row getter" functions that have been assigned along
  705. # with the compile state that were cached from a previous load.
  706. # these are operator.itemgetter() objects that each will extract a
  707. # particular column from each row.
  708. getter_key = ("getters", mapper)
  709. getters = path.get(compile_state.attributes, getter_key, None)
  710. if getters is None:
  711. # no getters, so go through a list of attributes we are loading for,
  712. # and the ones that are column based will have already put information
  713. # for us in another collection "memoized_setups", which represents the
  714. # output of the LoaderStrategy.setup_query() method. We can just as
  715. # easily call LoaderStrategy.create_row_processor for each, but by
  716. # getting it all at once from setup_query we save another method call
  717. # per attribute.
  718. props = mapper._prop_set
  719. if only_load_props is not None:
  720. props = props.intersection(
  721. mapper._props[k] for k in only_load_props
  722. )
  723. quick_populators = path.get(
  724. context.attributes, "memoized_setups", EMPTY_DICT
  725. )
  726. todo = []
  727. cached_populators = {
  728. "new": [],
  729. "quick": [],
  730. "deferred": [],
  731. "expire": [],
  732. "existing": [],
  733. "eager": [],
  734. }
  735. if refresh_state is None:
  736. # we can also get the "primary key" tuple getter function
  737. pk_cols = mapper.primary_key
  738. if adapter:
  739. pk_cols = [adapter.columns[c] for c in pk_cols]
  740. primary_key_getter = result._tuple_getter(pk_cols)
  741. else:
  742. primary_key_getter = None
  743. getters = {
  744. "cached_populators": cached_populators,
  745. "todo": todo,
  746. "primary_key_getter": primary_key_getter,
  747. }
  748. for prop in props:
  749. if prop in quick_populators:
  750. # this is an inlined path just for column-based attributes.
  751. col = quick_populators[prop]
  752. if col is _DEFER_FOR_STATE:
  753. cached_populators["new"].append(
  754. (prop.key, prop._deferred_column_loader)
  755. )
  756. elif col is _SET_DEFERRED_EXPIRED:
  757. # note that in this path, we are no longer
  758. # searching in the result to see if the column might
  759. # be present in some unexpected way.
  760. cached_populators["expire"].append((prop.key, False))
  761. elif col is _RAISE_FOR_STATE:
  762. cached_populators["new"].append(
  763. (prop.key, prop._raise_column_loader)
  764. )
  765. else:
  766. getter = None
  767. if adapter:
  768. # this logic had been removed for all 1.4 releases
  769. # up until 1.4.18; the adapter here is particularly
  770. # the compound eager adapter which isn't accommodated
  771. # in the quick_populators right now. The "fallback"
  772. # logic below instead took over in many more cases
  773. # until issue #6596 was identified.
  774. # note there is still an issue where this codepath
  775. # produces no "getter" for cases where a joined-inh
  776. # mapping includes a labeled column property, meaning
  777. # KeyError is caught internally and we fall back to
  778. # _getter(col), which works anyway. The adapter
  779. # here for joined inh without any aliasing might not
  780. # be useful. Tests which see this include
  781. # test.orm.inheritance.test_basic ->
  782. # EagerTargetingTest.test_adapt_stringency
  783. # OptimizedLoadTest.test_column_expression_joined
  784. # PolymorphicOnNotLocalTest.test_polymorphic_on_column_prop # noqa: E501
  785. #
  786. adapted_col = adapter.columns[col]
  787. if adapted_col is not None:
  788. getter = result._getter(adapted_col, False)
  789. if not getter:
  790. getter = result._getter(col, False)
  791. if getter:
  792. cached_populators["quick"].append((prop.key, getter))
  793. else:
  794. # fall back to the ColumnProperty itself, which
  795. # will iterate through all of its columns
  796. # to see if one fits
  797. prop.create_row_processor(
  798. context,
  799. query_entity,
  800. path,
  801. mapper,
  802. result,
  803. adapter,
  804. cached_populators,
  805. )
  806. else:
  807. # loader strategies like subqueryload, selectinload,
  808. # joinedload, basically relationships, these need to interact
  809. # with the context each time to work correctly.
  810. todo.append(prop)
  811. path.set(compile_state.attributes, getter_key, getters)
  812. cached_populators = getters["cached_populators"]
  813. populators = {key: list(value) for key, value in cached_populators.items()}
  814. for prop in getters["todo"]:
  815. prop.create_row_processor(
  816. context, query_entity, path, mapper, result, adapter, populators
  817. )
  818. propagated_loader_options = context.propagated_loader_options
  819. load_path = (
  820. context.compile_state.current_path + path
  821. if context.compile_state.current_path.path
  822. else path
  823. )
  824. session_identity_map = context.session.identity_map
  825. populate_existing = context.populate_existing or mapper.always_refresh
  826. load_evt = bool(mapper.class_manager.dispatch.load)
  827. refresh_evt = bool(mapper.class_manager.dispatch.refresh)
  828. persistent_evt = bool(context.session.dispatch.loaded_as_persistent)
  829. if persistent_evt:
  830. loaded_as_persistent = context.session.dispatch.loaded_as_persistent
  831. instance_state = attributes.instance_state
  832. instance_dict = attributes.instance_dict
  833. session_id = context.session.hash_key
  834. runid = context.runid
  835. identity_token = context.identity_token
  836. version_check = context.version_check
  837. if version_check:
  838. version_id_col = mapper.version_id_col
  839. if version_id_col is not None:
  840. if adapter:
  841. version_id_col = adapter.columns[version_id_col]
  842. version_id_getter = result._getter(version_id_col)
  843. else:
  844. version_id_getter = None
  845. if not refresh_state and _polymorphic_from is not None:
  846. key = ("loader", path.path)
  847. if key in context.attributes and context.attributes[key].strategy == (
  848. ("selectinload_polymorphic", True),
  849. ):
  850. option_entities = context.attributes[key].local_opts["entities"]
  851. else:
  852. option_entities = None
  853. selectin_load_via = mapper._should_selectin_load(
  854. option_entities,
  855. _polymorphic_from,
  856. )
  857. if selectin_load_via and selectin_load_via is not _polymorphic_from:
  858. # only_load_props goes w/ refresh_state only, and in a refresh
  859. # we are a single row query for the exact entity; polymorphic
  860. # loading does not apply
  861. assert only_load_props is None
  862. if selectin_load_via.is_mapper:
  863. _load_supers = []
  864. _endmost_mapper = selectin_load_via
  865. while (
  866. _endmost_mapper
  867. and _endmost_mapper is not _polymorphic_from
  868. ):
  869. _load_supers.append(_endmost_mapper)
  870. _endmost_mapper = _endmost_mapper.inherits
  871. else:
  872. _load_supers = [selectin_load_via]
  873. for _selectinload_entity in _load_supers:
  874. if PostLoad.path_exists(
  875. context, load_path, _selectinload_entity
  876. ):
  877. continue
  878. callable_ = _load_subclass_via_in(
  879. context,
  880. path,
  881. _selectinload_entity,
  882. _polymorphic_from,
  883. option_entities,
  884. )
  885. PostLoad.callable_for_path(
  886. context,
  887. load_path,
  888. _selectinload_entity.mapper,
  889. _selectinload_entity,
  890. callable_,
  891. _selectinload_entity,
  892. )
  893. post_load = PostLoad.for_context(context, load_path, only_load_props)
  894. if refresh_state:
  895. refresh_identity_key = refresh_state.key
  896. if refresh_identity_key is None:
  897. # super-rare condition; a refresh is being called
  898. # on a non-instance-key instance; this is meant to only
  899. # occur within a flush()
  900. refresh_identity_key = mapper._identity_key_from_state(
  901. refresh_state
  902. )
  903. else:
  904. refresh_identity_key = None
  905. primary_key_getter = getters["primary_key_getter"]
  906. if mapper.allow_partial_pks:
  907. is_not_primary_key = _none_set.issuperset
  908. else:
  909. is_not_primary_key = _none_set.intersection
  910. def _instance(row):
  911. # determine the state that we'll be populating
  912. if refresh_identity_key:
  913. # fixed state that we're refreshing
  914. state = refresh_state
  915. instance = state.obj()
  916. dict_ = instance_dict(instance)
  917. isnew = state.runid != runid
  918. currentload = True
  919. loaded_instance = False
  920. else:
  921. # look at the row, see if that identity is in the
  922. # session, or we have to create a new one
  923. identitykey = (
  924. identity_class,
  925. primary_key_getter(row),
  926. identity_token,
  927. )
  928. instance = session_identity_map.get(identitykey)
  929. if instance is not None:
  930. # existing instance
  931. state = instance_state(instance)
  932. dict_ = instance_dict(instance)
  933. isnew = state.runid != runid
  934. currentload = not isnew
  935. loaded_instance = False
  936. if version_check and version_id_getter and not currentload:
  937. _validate_version_id(
  938. mapper, state, dict_, row, version_id_getter
  939. )
  940. else:
  941. # create a new instance
  942. # check for non-NULL values in the primary key columns,
  943. # else no entity is returned for the row
  944. if is_not_primary_key(identitykey[1]):
  945. return None
  946. isnew = True
  947. currentload = True
  948. loaded_instance = True
  949. instance = mapper.class_manager.new_instance()
  950. dict_ = instance_dict(instance)
  951. state = instance_state(instance)
  952. state.key = identitykey
  953. state.identity_token = identity_token
  954. # attach instance to session.
  955. state.session_id = session_id
  956. session_identity_map._add_unpresent(state, identitykey)
  957. effective_populate_existing = populate_existing
  958. if refresh_state is state:
  959. effective_populate_existing = True
  960. # populate. this looks at whether this state is new
  961. # for this load or was existing, and whether or not this
  962. # row is the first row with this identity.
  963. if currentload or effective_populate_existing:
  964. # full population routines. Objects here are either
  965. # just created, or we are doing a populate_existing
  966. # be conservative about setting load_path when populate_existing
  967. # is in effect; want to maintain options from the original
  968. # load. see test_expire->test_refresh_maintains_deferred_options
  969. if isnew and (
  970. propagated_loader_options or not effective_populate_existing
  971. ):
  972. state.load_options = propagated_loader_options
  973. state.load_path = load_path
  974. _populate_full(
  975. context,
  976. row,
  977. state,
  978. dict_,
  979. isnew,
  980. load_path,
  981. loaded_instance,
  982. effective_populate_existing,
  983. populators,
  984. )
  985. if isnew:
  986. # state.runid should be equal to context.runid / runid
  987. # here, however for event checks we are being more conservative
  988. # and checking against existing run id
  989. # assert state.runid == runid
  990. existing_runid = state.runid
  991. if loaded_instance:
  992. if load_evt:
  993. state.manager.dispatch.load(state, context)
  994. if state.runid != existing_runid:
  995. _warn_for_runid_changed(state)
  996. if persistent_evt:
  997. loaded_as_persistent(context.session, state)
  998. if state.runid != existing_runid:
  999. _warn_for_runid_changed(state)
  1000. elif refresh_evt:
  1001. state.manager.dispatch.refresh(
  1002. state, context, only_load_props
  1003. )
  1004. if state.runid != runid:
  1005. _warn_for_runid_changed(state)
  1006. if effective_populate_existing or state.modified:
  1007. if refresh_state and only_load_props:
  1008. state._commit(dict_, only_load_props)
  1009. else:
  1010. state._commit_all(dict_, session_identity_map)
  1011. if post_load:
  1012. post_load.add_state(state, True)
  1013. else:
  1014. # partial population routines, for objects that were already
  1015. # in the Session, but a row matches them; apply eager loaders
  1016. # on existing objects, etc.
  1017. unloaded = state.unloaded
  1018. isnew = state not in context.partials
  1019. if not isnew or unloaded or populators["eager"]:
  1020. # state is having a partial set of its attributes
  1021. # refreshed. Populate those attributes,
  1022. # and add to the "context.partials" collection.
  1023. to_load = _populate_partial(
  1024. context,
  1025. row,
  1026. state,
  1027. dict_,
  1028. isnew,
  1029. load_path,
  1030. unloaded,
  1031. populators,
  1032. )
  1033. if isnew:
  1034. if refresh_evt:
  1035. existing_runid = state.runid
  1036. state.manager.dispatch.refresh(state, context, to_load)
  1037. if state.runid != existing_runid:
  1038. _warn_for_runid_changed(state)
  1039. state._commit(dict_, to_load)
  1040. if post_load and context.invoke_all_eagers:
  1041. post_load.add_state(state, False)
  1042. return instance
  1043. if mapper.polymorphic_map and not _polymorphic_from and not refresh_state:
  1044. # if we are doing polymorphic, dispatch to a different _instance()
  1045. # method specific to the subclass mapper
  1046. def ensure_no_pk(row):
  1047. identitykey = (
  1048. identity_class,
  1049. primary_key_getter(row),
  1050. identity_token,
  1051. )
  1052. if not is_not_primary_key(identitykey[1]):
  1053. return identitykey
  1054. else:
  1055. return None
  1056. _instance = _decorate_polymorphic_switch(
  1057. _instance,
  1058. context,
  1059. query_entity,
  1060. mapper,
  1061. result,
  1062. path,
  1063. polymorphic_discriminator,
  1064. adapter,
  1065. ensure_no_pk,
  1066. )
  1067. return _instance
  1068. def _load_subclass_via_in(
  1069. context, path, entity, polymorphic_from, option_entities
  1070. ):
  1071. mapper = entity.mapper
  1072. # TODO: polymorphic_from seems to be a Mapper in all cases.
  1073. # this is likely not needed, but as we dont have typing in loading.py
  1074. # yet, err on the safe side
  1075. polymorphic_from_mapper = polymorphic_from.mapper
  1076. not_against_basemost = polymorphic_from_mapper.inherits is not None
  1077. zero_idx = len(mapper.base_mapper.primary_key) == 1
  1078. if entity.is_aliased_class or not_against_basemost:
  1079. q, enable_opt, disable_opt = mapper._subclass_load_via_in(
  1080. entity, polymorphic_from
  1081. )
  1082. else:
  1083. q, enable_opt, disable_opt = mapper._subclass_load_via_in_mapper
  1084. def do_load(context, path, states, load_only, effective_entity):
  1085. if not option_entities:
  1086. # filter out states for those that would have selectinloaded
  1087. # from another loader
  1088. # TODO: we are currently ignoring the case where the
  1089. # "selectin_polymorphic" option is used, as this is much more
  1090. # complex / specific / very uncommon API use
  1091. states = [
  1092. (s, v)
  1093. for s, v in states
  1094. if s.mapper._would_selectin_load_only_from_given_mapper(mapper)
  1095. ]
  1096. if not states:
  1097. return
  1098. orig_query = context.query
  1099. if path.parent:
  1100. enable_opt_lcl = enable_opt._prepend_path(path)
  1101. disable_opt_lcl = disable_opt._prepend_path(path)
  1102. else:
  1103. enable_opt_lcl = enable_opt
  1104. disable_opt_lcl = disable_opt
  1105. options = (
  1106. (enable_opt_lcl,) + orig_query._with_options + (disable_opt_lcl,)
  1107. )
  1108. q2 = q.options(*options)
  1109. q2._compile_options = context.compile_state.default_compile_options
  1110. q2._compile_options += {"_current_path": path.parent}
  1111. if context.populate_existing:
  1112. q2 = q2.execution_options(populate_existing=True)
  1113. while states:
  1114. chunk = states[0 : SelectInLoader._chunksize]
  1115. states = states[SelectInLoader._chunksize :]
  1116. context.session.execute(
  1117. q2,
  1118. dict(
  1119. primary_keys=[
  1120. state.key[1][0] if zero_idx else state.key[1]
  1121. for state, load_attrs in chunk
  1122. ]
  1123. ),
  1124. ).unique().scalars().all()
  1125. return do_load
  1126. def _populate_full(
  1127. context,
  1128. row,
  1129. state,
  1130. dict_,
  1131. isnew,
  1132. load_path,
  1133. loaded_instance,
  1134. populate_existing,
  1135. populators,
  1136. ):
  1137. if isnew:
  1138. # first time we are seeing a row with this identity.
  1139. state.runid = context.runid
  1140. for key, getter in populators["quick"]:
  1141. dict_[key] = getter(row)
  1142. if populate_existing:
  1143. for key, set_callable in populators["expire"]:
  1144. dict_.pop(key, None)
  1145. if set_callable:
  1146. state.expired_attributes.add(key)
  1147. else:
  1148. for key, set_callable in populators["expire"]:
  1149. if set_callable:
  1150. state.expired_attributes.add(key)
  1151. for key, populator in populators["new"]:
  1152. populator(state, dict_, row)
  1153. elif load_path != state.load_path:
  1154. # new load path, e.g. object is present in more than one
  1155. # column position in a series of rows
  1156. state.load_path = load_path
  1157. # if we have data, and the data isn't in the dict, OK, let's put
  1158. # it in.
  1159. for key, getter in populators["quick"]:
  1160. if key not in dict_:
  1161. dict_[key] = getter(row)
  1162. # otherwise treat like an "already seen" row
  1163. for key, populator in populators["existing"]:
  1164. populator(state, dict_, row)
  1165. # TODO: allow "existing" populator to know this is
  1166. # a new path for the state:
  1167. # populator(state, dict_, row, new_path=True)
  1168. else:
  1169. # have already seen rows with this identity in this same path.
  1170. for key, populator in populators["existing"]:
  1171. populator(state, dict_, row)
  1172. # TODO: same path
  1173. # populator(state, dict_, row, new_path=False)
  1174. def _populate_partial(
  1175. context, row, state, dict_, isnew, load_path, unloaded, populators
  1176. ):
  1177. if not isnew:
  1178. if unloaded:
  1179. # extra pass, see #8166
  1180. for key, getter in populators["quick"]:
  1181. if key in unloaded:
  1182. dict_[key] = getter(row)
  1183. to_load = context.partials[state]
  1184. for key, populator in populators["existing"]:
  1185. if key in to_load:
  1186. populator(state, dict_, row)
  1187. else:
  1188. to_load = unloaded
  1189. context.partials[state] = to_load
  1190. for key, getter in populators["quick"]:
  1191. if key in to_load:
  1192. dict_[key] = getter(row)
  1193. for key, set_callable in populators["expire"]:
  1194. if key in to_load:
  1195. dict_.pop(key, None)
  1196. if set_callable:
  1197. state.expired_attributes.add(key)
  1198. for key, populator in populators["new"]:
  1199. if key in to_load:
  1200. populator(state, dict_, row)
  1201. for key, populator in populators["eager"]:
  1202. if key not in unloaded:
  1203. populator(state, dict_, row)
  1204. return to_load
  1205. def _validate_version_id(mapper, state, dict_, row, getter):
  1206. if mapper._get_state_attr_by_column(
  1207. state, dict_, mapper.version_id_col
  1208. ) != getter(row):
  1209. raise orm_exc.StaleDataError(
  1210. "Instance '%s' has version id '%s' which "
  1211. "does not match database-loaded version id '%s'."
  1212. % (
  1213. state_str(state),
  1214. mapper._get_state_attr_by_column(
  1215. state, dict_, mapper.version_id_col
  1216. ),
  1217. getter(row),
  1218. )
  1219. )
  1220. def _decorate_polymorphic_switch(
  1221. instance_fn,
  1222. context,
  1223. query_entity,
  1224. mapper,
  1225. result,
  1226. path,
  1227. polymorphic_discriminator,
  1228. adapter,
  1229. ensure_no_pk,
  1230. ):
  1231. if polymorphic_discriminator is not None:
  1232. polymorphic_on = polymorphic_discriminator
  1233. else:
  1234. polymorphic_on = mapper.polymorphic_on
  1235. if polymorphic_on is None:
  1236. return instance_fn
  1237. if adapter:
  1238. polymorphic_on = adapter.columns[polymorphic_on]
  1239. def configure_subclass_mapper(discriminator):
  1240. try:
  1241. sub_mapper = mapper.polymorphic_map[discriminator]
  1242. except KeyError:
  1243. raise AssertionError(
  1244. "No such polymorphic_identity %r is defined" % discriminator
  1245. )
  1246. else:
  1247. if sub_mapper is mapper:
  1248. return None
  1249. elif not sub_mapper.isa(mapper):
  1250. return False
  1251. return _instance_processor(
  1252. query_entity,
  1253. sub_mapper,
  1254. context,
  1255. result,
  1256. path,
  1257. adapter,
  1258. _polymorphic_from=mapper,
  1259. )
  1260. polymorphic_instances = util.PopulateDict(configure_subclass_mapper)
  1261. getter = result._getter(polymorphic_on)
  1262. def polymorphic_instance(row):
  1263. discriminator = getter(row)
  1264. if discriminator is not None:
  1265. _instance = polymorphic_instances[discriminator]
  1266. if _instance:
  1267. return _instance(row)
  1268. elif _instance is False:
  1269. identitykey = ensure_no_pk(row)
  1270. if identitykey:
  1271. raise sa_exc.InvalidRequestError(
  1272. "Row with identity key %s can't be loaded into an "
  1273. "object; the polymorphic discriminator column '%s' "
  1274. "refers to %s, which is not a sub-mapper of "
  1275. "the requested %s"
  1276. % (
  1277. identitykey,
  1278. polymorphic_on,
  1279. mapper.polymorphic_map[discriminator],
  1280. mapper,
  1281. )
  1282. )
  1283. else:
  1284. return None
  1285. else:
  1286. return instance_fn(row)
  1287. else:
  1288. identitykey = ensure_no_pk(row)
  1289. if identitykey:
  1290. raise sa_exc.InvalidRequestError(
  1291. "Row with identity key %s can't be loaded into an "
  1292. "object; the polymorphic discriminator column '%s' is "
  1293. "NULL" % (identitykey, polymorphic_on)
  1294. )
  1295. else:
  1296. return None
  1297. return polymorphic_instance
  1298. class PostLoad:
  1299. """Track loaders and states for "post load" operations."""
  1300. __slots__ = "loaders", "states", "load_keys"
  1301. def __init__(self):
  1302. self.loaders = {}
  1303. self.states = util.OrderedDict()
  1304. self.load_keys = None
  1305. def add_state(self, state, overwrite):
  1306. # the states for a polymorphic load here are all shared
  1307. # within a single PostLoad object among multiple subtypes.
  1308. # Filtering of callables on a per-subclass basis needs to be done at
  1309. # the invocation level
  1310. self.states[state] = overwrite
  1311. def invoke(self, context, path):
  1312. if not self.states:
  1313. return
  1314. path = path_registry.PathRegistry.coerce(path)
  1315. for (
  1316. effective_context,
  1317. token,
  1318. limit_to_mapper,
  1319. loader,
  1320. arg,
  1321. kw,
  1322. ) in self.loaders.values():
  1323. states = [
  1324. (state, overwrite)
  1325. for state, overwrite in self.states.items()
  1326. if state.manager.mapper.isa(limit_to_mapper)
  1327. ]
  1328. if states:
  1329. loader(
  1330. effective_context, path, states, self.load_keys, *arg, **kw
  1331. )
  1332. self.states.clear()
  1333. @classmethod
  1334. def for_context(cls, context, path, only_load_props):
  1335. pl = context.post_load_paths.get(path.path)
  1336. if pl is not None and only_load_props:
  1337. pl.load_keys = only_load_props
  1338. return pl
  1339. @classmethod
  1340. def path_exists(self, context, path, key):
  1341. return (
  1342. path.path in context.post_load_paths
  1343. and key in context.post_load_paths[path.path].loaders
  1344. )
  1345. @classmethod
  1346. def callable_for_path(
  1347. cls, context, path, limit_to_mapper, token, loader_callable, *arg, **kw
  1348. ):
  1349. if path.path in context.post_load_paths:
  1350. pl = context.post_load_paths[path.path]
  1351. else:
  1352. pl = context.post_load_paths[path.path] = PostLoad()
  1353. pl.loaders[token] = (
  1354. context,
  1355. token,
  1356. limit_to_mapper,
  1357. loader_callable,
  1358. arg,
  1359. kw,
  1360. )
  1361. def load_scalar_attributes(mapper, state, attribute_names, passive):
  1362. """initiate a column-based attribute refresh operation."""
  1363. # assert mapper is _state_mapper(state)
  1364. session = state.session
  1365. if not session:
  1366. raise orm_exc.DetachedInstanceError(
  1367. "Instance %s is not bound to a Session; "
  1368. "attribute refresh operation cannot proceed" % (state_str(state))
  1369. )
  1370. no_autoflush = bool(passive & attributes.NO_AUTOFLUSH)
  1371. # in the case of inheritance, particularly concrete and abstract
  1372. # concrete inheritance, the class manager might have some keys
  1373. # of attributes on the superclass that we didn't actually map.
  1374. # These could be mapped as "concrete, don't load" or could be completely
  1375. # excluded from the mapping and we know nothing about them. Filter them
  1376. # here to prevent them from coming through.
  1377. if attribute_names:
  1378. attribute_names = attribute_names.intersection(mapper.attrs.keys())
  1379. if mapper.inherits and not mapper.concrete:
  1380. # load based on committed attributes in the object, formed into
  1381. # a truncated SELECT that only includes relevant tables. does not
  1382. # currently use state.key
  1383. statement = mapper._optimized_get_statement(state, attribute_names)
  1384. if statement is not None:
  1385. # undefer() isn't needed here because statement has the
  1386. # columns needed already, this implicitly undefers that column
  1387. stmt = FromStatement(mapper, statement)
  1388. return load_on_ident(
  1389. session,
  1390. stmt,
  1391. None,
  1392. only_load_props=attribute_names,
  1393. refresh_state=state,
  1394. no_autoflush=no_autoflush,
  1395. )
  1396. # normal load, use state.key as the identity to SELECT
  1397. has_key = bool(state.key)
  1398. if has_key:
  1399. identity_key = state.key
  1400. else:
  1401. # this codepath is rare - only valid when inside a flush, and the
  1402. # object is becoming persistent but hasn't yet been assigned
  1403. # an identity_key.
  1404. # check here to ensure we have the attrs we need.
  1405. pk_attrs = [
  1406. mapper._columntoproperty[col].key for col in mapper.primary_key
  1407. ]
  1408. if state.expired_attributes.intersection(pk_attrs):
  1409. raise sa_exc.InvalidRequestError(
  1410. "Instance %s cannot be refreshed - it's not "
  1411. " persistent and does not "
  1412. "contain a full primary key." % state_str(state)
  1413. )
  1414. identity_key = mapper._identity_key_from_state(state)
  1415. if (
  1416. _none_set.issubset(identity_key) and not mapper.allow_partial_pks
  1417. ) or _none_set.issuperset(identity_key):
  1418. util.warn_limited(
  1419. "Instance %s to be refreshed doesn't "
  1420. "contain a full primary key - can't be refreshed "
  1421. "(and shouldn't be expired, either).",
  1422. state_str(state),
  1423. )
  1424. return
  1425. result = load_on_ident(
  1426. session,
  1427. select(mapper).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL),
  1428. identity_key,
  1429. refresh_state=state,
  1430. only_load_props=attribute_names,
  1431. no_autoflush=no_autoflush,
  1432. )
  1433. # if instance is pending, a refresh operation
  1434. # may not complete (even if PK attributes are assigned)
  1435. if has_key and result is None:
  1436. raise orm_exc.ObjectDeletedError(state)