  1. # orm/bulk_persistence.py
  2. # Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: https://www.opensource.org/licenses/mit-license.php
  7. # mypy: ignore-errors
  8. """additional ORM persistence classes related to "bulk" operations,
  9. specifically outside of the flush() process.
  10. """
  11. from __future__ import annotations
  12. from typing import Any
  13. from typing import cast
  14. from typing import Dict
  15. from typing import Iterable
  16. from typing import Optional
  17. from typing import overload
  18. from typing import TYPE_CHECKING
  19. from typing import TypeVar
  20. from typing import Union
  21. from . import attributes
  22. from . import context
  23. from . import evaluator
  24. from . import exc as orm_exc
  25. from . import loading
  26. from . import persistence
  27. from .base import NO_VALUE
  28. from .context import AbstractORMCompileState
  29. from .context import FromStatement
  30. from .context import ORMFromStatementCompileState
  31. from .context import QueryContext
  32. from .. import exc as sa_exc
  33. from .. import util
  34. from ..engine import Dialect
  35. from ..engine import result as _result
  36. from ..sql import coercions
  37. from ..sql import dml
  38. from ..sql import expression
  39. from ..sql import roles
  40. from ..sql import select
  41. from ..sql import sqltypes
  42. from ..sql.base import _entity_namespace_key
  43. from ..sql.base import CompileState
  44. from ..sql.base import Options
  45. from ..sql.dml import DeleteDMLState
  46. from ..sql.dml import InsertDMLState
  47. from ..sql.dml import UpdateDMLState
  48. from ..util import EMPTY_DICT
  49. from ..util.typing import Literal
  50. if TYPE_CHECKING:
  51. from ._typing import DMLStrategyArgument
  52. from ._typing import OrmExecuteOptionsParameter
  53. from ._typing import SynchronizeSessionArgument
  54. from .mapper import Mapper
  55. from .session import _BindArguments
  56. from .session import ORMExecuteState
  57. from .session import Session
  58. from .session import SessionTransaction
  59. from .state import InstanceState
  60. from ..engine import Connection
  61. from ..engine import cursor
  62. from ..engine.interfaces import _CoreAnyExecuteParams
# TypeVar for the mapped object type handled by the bulk helpers below;
# bound to ``object`` so any mapped class qualifies.
_O = TypeVar("_O", bound=object)
# overload: with no ORM-level insert() statement supplied
# (use_orm_insert_stmt=None), _bulk_insert returns None.
@overload
def _bulk_insert(
    mapper: Mapper[_O],
    mappings: Union[Iterable[InstanceState[_O]], Iterable[Dict[str, Any]]],
    session_transaction: SessionTransaction,
    *,
    isstates: bool,
    return_defaults: bool,
    render_nulls: bool,
    use_orm_insert_stmt: Literal[None] = ...,
    execution_options: Optional[OrmExecuteOptionsParameter] = ...,
) -> None: ...
# overload: when an ORM-enabled insert() statement is supplied, a
# CursorResult is returned to the caller.
@overload
def _bulk_insert(
    mapper: Mapper[_O],
    mappings: Union[Iterable[InstanceState[_O]], Iterable[Dict[str, Any]]],
    session_transaction: SessionTransaction,
    *,
    isstates: bool,
    return_defaults: bool,
    render_nulls: bool,
    use_orm_insert_stmt: Optional[dml.Insert] = ...,
    execution_options: Optional[OrmExecuteOptionsParameter] = ...,
) -> cursor.CursorResult[Any]: ...
def _bulk_insert(
    mapper: Mapper[_O],
    mappings: Union[Iterable[InstanceState[_O]], Iterable[Dict[str, Any]]],
    session_transaction: SessionTransaction,
    *,
    isstates: bool,
    return_defaults: bool,
    render_nulls: bool,
    use_orm_insert_stmt: Optional[dml.Insert] = None,
    execution_options: Optional[OrmExecuteOptionsParameter] = None,
) -> Optional[cursor.CursorResult[Any]]:
    """Emit INSERT statements for a collection of states or mapping dicts.

    :param mapper: the :class:`.Mapper` for the target class; inserts are
     emitted for each table in ``mapper.base_mapper._sorted_tables`` that
     has primary key columns for this mapper (joined inheritance emits one
     INSERT per table).
    :param mappings: either an iterable of ``InstanceState`` objects
     (``isstates=True``) or of plain parameter dictionaries.
    :param session_transaction: transaction from which the
     :class:`.Connection` is acquired.
    :param isstates: whether *mappings* contains states rather than dicts.
    :param return_defaults: when True, server-generated defaults are fetched
     back and (for states) identity keys are assigned to each state.
    :param render_nulls: passed through to
     ``persistence._collect_insert_commands``.
    :param use_orm_insert_stmt: optional ORM-enabled ``insert()`` construct;
     when supplied, a ``CursorResult`` is returned, otherwise ``None``.
    :raises NotImplementedError: if the session uses a
     ``connection_callable`` (per-instance sharding).
    """
    base_mapper = mapper.base_mapper

    # per-instance connection routing can't work here since one statement
    # covers many instances
    if session_transaction.session.connection_callable:
        raise NotImplementedError(
            "connection_callable / per-instance sharding "
            "not supported in bulk_insert()"
        )

    if isstates:
        if TYPE_CHECKING:
            mappings = cast(Iterable[InstanceState[_O]], mappings)

        if return_defaults:
            # list of states allows us to attach .key for return_defaults case
            states = [(state, state.dict) for state in mappings]
            mappings = [dict_ for (state, dict_) in states]
        else:
            mappings = [state.dict for state in mappings]
    else:
        if TYPE_CHECKING:
            mappings = cast(Iterable[Dict[str, Any]], mappings)

        if return_defaults:
            # use dictionaries given, so that newly populated defaults
            # can be delivered back to the caller (see #11661). This is **not**
            # compatible with other use cases such as a session-executed
            # insert() construct, as this will confuse the case of
            # insert-per-subclass for joined inheritance cases (see
            # test_bulk_statements.py::BulkDMLReturningJoinedInhTest).
            #
            # So in this conditional, we have **only** called
            # session.bulk_insert_mappings() which does not have this
            # requirement
            mappings = list(mappings)
        else:
            # for all other cases we need to establish a local dictionary
            # so that the incoming dictionaries aren't mutated
            mappings = [dict(m) for m in mappings]

    # expand composite attribute values into individual column values,
    # in place on each mapping dict
    _expand_composites(mapper, mappings)

    connection = session_transaction.connection(base_mapper)

    return_result: Optional[cursor.CursorResult[Any]] = None

    mappers_to_run = [
        (table, mp)
        for table, mp in base_mapper._sorted_tables.items()
        if table in mapper._pks_by_table
    ]

    if return_defaults:
        # not used by new-style bulk inserts, only used for legacy
        bookkeeping = True
    elif len(mappers_to_run) > 1:
        # if we have more than one table, mapper to run where we will be
        # either horizontally splicing, or copying values between tables,
        # we need the "bookkeeping" / deterministic returning order
        bookkeeping = True
    else:
        bookkeeping = False

    for table, super_mapper in mappers_to_run:
        # find bindparams in the statement. For bulk, we don't really know if
        # a key in the params applies to a different table since we are
        # potentially inserting for multiple tables here; looking at the
        # bindparam() is a lot more direct. in most cases this will
        # use _generate_cache_key() which is memoized, although in practice
        # the ultimate statement that's executed is probably not the same
        # object so that memoization might not matter much.
        # NOTE(review): mappings[0] is consulted here, so a non-empty
        # mappings collection appears to be assumed when
        # use_orm_insert_stmt is present.
        extra_bp_names = (
            [
                b.key
                for b in use_orm_insert_stmt._get_embedded_bindparams()
                if b.key in mappings[0]
            ]
            if use_orm_insert_stmt is not None
            else ()
        )

        # generator reshaping the collected insert records: drop the
        # per-record mapper/connection in favor of the outer ones
        records = (
            (
                None,
                state_dict,
                params,
                mapper,
                connection,
                value_params,
                has_all_pks,
                has_all_defaults,
            )
            for (
                state,
                state_dict,
                params,
                mp,
                conn,
                value_params,
                has_all_pks,
                has_all_defaults,
            ) in persistence._collect_insert_commands(
                table,
                ((None, mapping, mapper, connection) for mapping in mappings),
                bulk=True,
                return_defaults=bookkeeping,
                render_nulls=render_nulls,
                include_bulk_keys=extra_bp_names,
            )
        )

        result = persistence._emit_insert_statements(
            base_mapper,
            None,
            super_mapper,
            table,
            records,
            bookkeeping=bookkeeping,
            use_orm_insert_stmt=use_orm_insert_stmt,
            execution_options=execution_options,
        )
        if use_orm_insert_stmt is not None:
            if not use_orm_insert_stmt._returning or return_result is None:
                return_result = result
            elif result.returns_rows:
                # subsequent per-table results with RETURNING rows are
                # spliced horizontally onto the accumulated result
                assert bookkeeping
                return_result = return_result.splice_horizontally(result)

    if return_defaults and isstates:
        # assign identity keys back onto the states collected earlier,
        # using the primary key values now present in each dict
        identity_cls = mapper._identity_class
        identity_props = [p.key for p in mapper._identity_key_props]
        for state, dict_ in states:
            state.key = (
                identity_cls,
                tuple([dict_[key] for key in identity_props]),
                None,
            )

    if use_orm_insert_stmt is not None:
        assert return_result is not None
        return return_result
# overload: with no ORM-level update() statement supplied, _bulk_update
# returns None.
@overload
def _bulk_update(
    mapper: Mapper[Any],
    mappings: Union[Iterable[InstanceState[_O]], Iterable[Dict[str, Any]]],
    session_transaction: SessionTransaction,
    *,
    isstates: bool,
    update_changed_only: bool,
    use_orm_update_stmt: Literal[None] = ...,
    enable_check_rowcount: bool = True,
) -> None: ...
# overload: when an ORM-enabled update() statement is supplied, a Result
# is returned to the caller.
@overload
def _bulk_update(
    mapper: Mapper[Any],
    mappings: Union[Iterable[InstanceState[_O]], Iterable[Dict[str, Any]]],
    session_transaction: SessionTransaction,
    *,
    isstates: bool,
    update_changed_only: bool,
    use_orm_update_stmt: Optional[dml.Update] = ...,
    enable_check_rowcount: bool = True,
) -> _result.Result[Any]: ...
def _bulk_update(
    mapper: Mapper[Any],
    mappings: Union[Iterable[InstanceState[_O]], Iterable[Dict[str, Any]]],
    session_transaction: SessionTransaction,
    *,
    isstates: bool,
    update_changed_only: bool,
    use_orm_update_stmt: Optional[dml.Update] = None,
    enable_check_rowcount: bool = True,
) -> Optional[_result.Result[Any]]:
    """Emit UPDATE statements for a collection of states or mapping dicts.

    :param mapper: the :class:`.Mapper` for the target class; UPDATEs are
     emitted per table for joined inheritance hierarchies.
    :param mappings: either an iterable of ``InstanceState`` objects
     (``isstates=True``) or of plain parameter dictionaries.
    :param session_transaction: transaction from which the
     :class:`.Connection` is acquired.
    :param isstates: whether *mappings* contains states rather than dicts.
    :param update_changed_only: when True with states, only attributes
     present in ``state.committed_state`` (plus primary key / version id
     keys) are included in the UPDATE.
    :param use_orm_update_stmt: optional ORM-enabled ``update()`` construct;
     when supplied, a null :class:`.Result` is returned, otherwise ``None``.
    :param enable_check_rowcount: passed through to
     ``persistence._emit_update_statements`` to control matched-row checks.
    :raises NotImplementedError: if the session uses a
     ``connection_callable`` (per-instance sharding).
    """
    base_mapper = mapper.base_mapper

    # keys that must always be carried in each parameter set: primary key
    # columns, plus the version id property when configured
    search_keys = mapper._primary_key_propkeys
    if mapper._version_id_prop:
        search_keys = {mapper._version_id_prop.key}.union(search_keys)

    def _changed_dict(mapper, state):
        # restrict a state's dict to changed attributes plus search keys
        return {
            k: v
            for k, v in state.dict.items()
            if k in state.committed_state or k in search_keys
        }

    if isstates:
        if update_changed_only:
            mappings = [_changed_dict(mapper, state) for state in mappings]
        else:
            mappings = [state.dict for state in mappings]
    else:
        # copy incoming dictionaries so they aren't mutated
        mappings = [dict(m) for m in mappings]

    # expand composite attribute values into individual column values,
    # in place on each mapping dict
    _expand_composites(mapper, mappings)

    if session_transaction.session.connection_callable:
        raise NotImplementedError(
            "connection_callable / per-instance sharding "
            "not supported in bulk_update()"
        )

    connection = session_transaction.connection(base_mapper)

    # find bindparams in the statement. see _bulk_insert for similar
    # notes for the insert case
    # NOTE(review): mappings[0] is consulted here, so a non-empty mappings
    # collection appears to be assumed when use_orm_update_stmt is present.
    extra_bp_names = (
        [
            b.key
            for b in use_orm_update_stmt._get_embedded_bindparams()
            if b.key in mappings[0]
        ]
        if use_orm_update_stmt is not None
        else ()
    )

    for table, super_mapper in base_mapper._sorted_tables.items():
        # only tables that belong to this mapper's inheritance path and
        # carry its primary key participate
        if not mapper.isa(super_mapper) or table not in mapper._pks_by_table:
            continue

        records = persistence._collect_update_commands(
            None,
            table,
            (
                (
                    None,
                    mapping,
                    mapper,
                    connection,
                    (
                        mapping[mapper._version_id_prop.key]
                        if mapper._version_id_prop
                        else None
                    ),
                )
                for mapping in mappings
            ),
            bulk=True,
            use_orm_update_stmt=use_orm_update_stmt,
            include_bulk_keys=extra_bp_names,
        )
        persistence._emit_update_statements(
            base_mapper,
            None,
            super_mapper,
            table,
            records,
            bookkeeping=False,
            use_orm_update_stmt=use_orm_update_stmt,
            enable_check_rowcount=enable_check_rowcount,
        )

    if use_orm_update_stmt is not None:
        return _result.null_result()
  328. def _expand_composites(mapper, mappings):
  329. composite_attrs = mapper.composites
  330. if not composite_attrs:
  331. return
  332. composite_keys = set(composite_attrs.keys())
  333. populators = {
  334. key: composite_attrs[key]._populate_composite_bulk_save_mappings_fn()
  335. for key in composite_keys
  336. }
  337. for mapping in mappings:
  338. for key in composite_keys.intersection(mapping):
  339. populators[key](mapping)
class ORMDMLState(AbstractORMCompileState):
    """Base compile state for ORM-enabled INSERT/UPDATE/DELETE statements,
    providing key/value coercion for SET clauses and ORM RETURNING setup.
    """

    is_dml_returning = True
    # populated by _setup_orm_returning() when the statement has an
    # explicit returning(); consumed later by _return_orm_returning()
    from_statement_ctx: Optional[ORMFromStatementCompileState] = None

    @classmethod
    def _get_orm_crud_kv_pairs(
        cls, mapper, statement, kv_iterator, needs_to_be_cacheable
    ):
        """Generate (column, value) pairs for a DML SET clause, resolving
        string keys and annotated ORM attributes against *mapper*'s
        entity namespace.

        When ``needs_to_be_cacheable`` is True, plain values are coerced
        to expression elements so the statement remains cache-friendly.
        """
        core_get_crud_kv_pairs = UpdateDMLState._get_crud_kv_pairs

        for k, v in kv_iterator:
            k = coercions.expect(roles.DMLColumnRole, k)

            if isinstance(k, str):
                desc = _entity_namespace_key(mapper, k, default=NO_VALUE)
                if desc is NO_VALUE:
                    # key not found in the entity namespace; pass the
                    # string through as a DML column reference
                    yield (
                        coercions.expect(roles.DMLColumnRole, k),
                        (
                            coercions.expect(
                                roles.ExpressionElementRole,
                                v,
                                type_=sqltypes.NullType(),
                                is_crud=True,
                            )
                            if needs_to_be_cacheable
                            else v
                        ),
                    )
                else:
                    # resolved ORM attribute; expand to one or more
                    # column/value tuples (e.g. composites)
                    yield from core_get_crud_kv_pairs(
                        statement,
                        desc._bulk_update_tuples(v),
                        needs_to_be_cacheable,
                    )
            elif "entity_namespace" in k._annotations:
                # an annotated ORM expression; resolve via its own
                # entity namespace / proxy key annotations
                k_anno = k._annotations
                attr = _entity_namespace_key(
                    k_anno["entity_namespace"], k_anno["proxy_key"]
                )
                yield from core_get_crud_kv_pairs(
                    statement,
                    attr._bulk_update_tuples(v),
                    needs_to_be_cacheable,
                )
            else:
                # plain column expression key
                yield (
                    k,
                    (
                        v
                        if not needs_to_be_cacheable
                        else coercions.expect(
                            roles.ExpressionElementRole,
                            v,
                            type_=sqltypes.NullType(),
                            is_crud=True,
                        )
                    ),
                )

    @classmethod
    def _get_dml_plugin_subject(cls, statement):
        """Return the statement's plugin subject (mapped entity info) only
        when it is a mapper-bearing subject that matches the statement's own
        propagated subject; otherwise return None.
        """
        plugin_subject = statement.table._propagate_attrs.get("plugin_subject")

        if (
            not plugin_subject
            or not plugin_subject.mapper
            or plugin_subject
            is not statement._propagate_attrs["plugin_subject"]
        ):
            return None
        return plugin_subject

    @classmethod
    def _get_multi_crud_kv_pairs(cls, statement, kv_iterator):
        """Resolve a sequence of SET-clause dictionaries (executemany form),
        falling back to Core behavior when there is no ORM plugin subject.
        """
        plugin_subject = cls._get_dml_plugin_subject(statement)

        if not plugin_subject:
            return UpdateDMLState._get_multi_crud_kv_pairs(
                statement, kv_iterator
            )
        return [
            dict(
                cls._get_orm_crud_kv_pairs(
                    plugin_subject.mapper, statement, value_dict.items(), False
                )
            )
            for value_dict in kv_iterator
        ]

    @classmethod
    def _get_crud_kv_pairs(cls, statement, kv_iterator, needs_to_be_cacheable):
        """Resolve a single SET-clause key/value iterator, falling back to
        Core behavior when there is no ORM plugin subject.
        """
        assert (
            needs_to_be_cacheable
        ), "no test coverage for needs_to_be_cacheable=False"

        plugin_subject = cls._get_dml_plugin_subject(statement)

        if not plugin_subject:
            return UpdateDMLState._get_crud_kv_pairs(
                statement, kv_iterator, needs_to_be_cacheable
            )
        return list(
            cls._get_orm_crud_kv_pairs(
                plugin_subject.mapper,
                statement,
                kv_iterator,
                needs_to_be_cacheable,
            )
        )

    @classmethod
    def get_entity_description(cls, statement):
        """Return a description dict (name/type/expr/entity/table) for the
        statement's target entity.
        """
        ext_info = statement.table._annotations["parententity"]
        mapper = ext_info.mapper
        if ext_info.is_aliased_class:
            _label_name = ext_info.name
        else:
            _label_name = mapper.class_.__name__

        return {
            "name": _label_name,
            "type": mapper.class_,
            "expr": ext_info.entity,
            "entity": ext_info.entity,
            "table": mapper.local_table,
        }

    @classmethod
    def get_returning_column_descriptions(cls, statement):
        """Return description dicts for each column in the statement's
        RETURNING clause, resolving ORM attributes where annotated.
        """

        def _ent_for_col(c):
            # parent entity annotation, if any, for a returned column
            return c._annotations.get("parententity", None)

        def _attr_for_col(c, ent):
            # prefer the ORM attribute named by proxy_key; fall back to
            # the raw column when no entity / proxy key is available
            if ent is None:
                return c
            proxy_key = c._annotations.get("proxy_key", None)
            if not proxy_key:
                return c
            else:
                return getattr(ent.entity, proxy_key, c)

        return [
            {
                "name": c.key,
                "type": c.type,
                "expr": _attr_for_col(c, ent),
                "aliased": ent.is_aliased_class,
                "entity": ent.entity,
            }
            for c, ent in [
                (c, _ent_for_col(c)) for c in statement._all_selected_columns
            ]
        ]

    def _setup_orm_returning(
        self,
        compiler,
        orm_level_statement,
        dml_level_statement,
        dml_mapper,
        *,
        use_supplemental_cols=True,
    ):
        """establish ORM column handlers for an INSERT, UPDATE, or DELETE
        which uses explicit returning().

        called within compilation level create_for_statement.

        The _return_orm_returning() method then receives the Result
        after the statement was executed, and applies ORM loading to the
        state that we first established here.

        """
        if orm_level_statement._returning:
            fs = FromStatement(
                orm_level_statement._returning,
                dml_level_statement,
                _adapt_on_names=False,
            )
            fs = fs.execution_options(**orm_level_statement._execution_options)
            fs = fs.options(*orm_level_statement._with_options)
            self.select_statement = fs
            self.from_statement_ctx = fsc = (
                ORMFromStatementCompileState.create_for_statement(fs, compiler)
            )
            fsc.setup_dml_returning_compile_state(dml_mapper)

            dml_level_statement = dml_level_statement._generate()
            dml_level_statement._returning = ()

            cols_to_return = [c for c in fsc.primary_columns if c is not None]

            # since we are splicing result sets together, make sure there
            # are columns of some kind returned in each result set
            if not cols_to_return:
                cols_to_return.extend(dml_mapper.primary_key)

            if use_supplemental_cols:
                dml_level_statement = dml_level_statement.return_defaults(
                    # this is a little weird looking, but by passing
                    # primary key as the main list of cols, this tells
                    # return_defaults to omit server-default cols (and
                    # actually all cols, due to some weird thing we should
                    # clean up in crud.py).
                    # Since we have cols_to_return, just return what we asked
                    # for (plus primary key, which ORM persistence needs since
                    # we likely set bookkeeping=True here, which is another
                    # whole thing...). We dont want to clutter the
                    # statement up with lots of other cols the user didn't
                    # ask for. see #9685
                    *dml_mapper.primary_key,
                    supplemental_cols=cols_to_return,
                )
            else:
                dml_level_statement = dml_level_statement.returning(
                    *cols_to_return
                )

        return dml_level_statement

    @classmethod
    def _return_orm_returning(
        cls,
        session,
        statement,
        params,
        execution_options,
        bind_arguments,
        result,
    ):
        """Apply ORM loading to *result* using the from_statement context
        established by _setup_orm_returning(); pass the result through
        unchanged when no such context exists (or for star selections).
        """
        execution_context = result.context
        compile_state = execution_context.compiled.compile_state

        if (
            compile_state.from_statement_ctx
            and not compile_state.from_statement_ctx.compile_options._is_star
        ):
            load_options = execution_options.get(
                "_sa_orm_load_options", QueryContext.default_load_options
            )

            querycontext = QueryContext(
                compile_state.from_statement_ctx,
                compile_state.select_statement,
                statement,
                params,
                session,
                load_options,
                execution_options,
                bind_arguments,
            )
            return loading.instances(result, querycontext)
        else:
            return result
  568. class BulkUDCompileState(ORMDMLState):
    class default_update_options(Options):
        """Default execution options for bulk UPDATE/DELETE compile state;
        merged with per-execution options in orm_pre_session_exec().
        """

        # "auto" / "orm" / "bulk" / "core_only" DML execution strategy
        _dml_strategy: DMLStrategyArgument = "auto"
        # "auto" / "evaluate" / "fetch" / False session synchronization
        _synchronize_session: SynchronizeSessionArgument = "auto"
        # whether RETURNING may be used for synchronization
        _can_use_returning: bool = False
        _is_delete_using: bool = False
        _is_update_from: bool = False
        # autoflush the session before executing the DML
        _autoflush: bool = True
        # mapper for the statement's target entity, set from plugin_subject
        _subject_mapper: Optional[Mapper[Any]] = None
        _resolved_values = EMPTY_DICT
        # evaluator condition built by the "evaluate" pre-sync step
        _eval_condition = None
        # rows matched by the "fetch" pre-sync step
        _matched_rows = None
        _identity_token = None
        _populate_existing: bool = False
    @classmethod
    def can_use_returning(
        cls,
        dialect: Dialect,
        mapper: Mapper[Any],
        *,
        is_multitable: bool = False,
        is_update_from: bool = False,
        is_delete_using: bool = False,
        is_executemany: bool = False,
    ) -> bool:
        """Return whether RETURNING can be used for this dialect/mapper
        combination; abstract here, implemented by subclasses.
        """
        raise NotImplementedError()
    @classmethod
    def orm_pre_session_exec(
        cls,
        session,
        statement,
        params,
        execution_options,
        bind_arguments,
        is_pre_event,
    ):
        """Pre-execution hook for ORM bulk UPDATE/DELETE.

        Resolves update options from execution options, determines the DML
        strategy ("core_only" / "orm" / "bulk"), validates the session
        synchronization strategy, runs the appropriate pre-synchronize step
        (unless ``is_pre_event``), and annotates the statement with the
        resolved indicators for the compiler.

        :return: tuple of (annotated statement, merged execution options).
        :raises sa_exc.InvalidRequestError: for invalid strategy
         combinations ("bulk" without parameter list, "bulk" + "fetch").
        :raises sa_exc.ArgumentError: for an unrecognized
         synchronize_session value.
        """
        (
            update_options,
            execution_options,
        ) = BulkUDCompileState.default_update_options.from_execution_options(
            "_sa_orm_update_options",
            {
                "synchronize_session",
                "autoflush",
                "populate_existing",
                "identity_token",
                "is_delete_using",
                "is_update_from",
                "dml_strategy",
            },
            execution_options,
            statement._execution_options,
        )
        bind_arguments["clause"] = statement
        try:
            plugin_subject = statement._propagate_attrs["plugin_subject"]
        except KeyError:
            assert False, "statement had 'orm' plugin but no plugin_subject"
        else:
            if plugin_subject:
                bind_arguments["mapper"] = plugin_subject.mapper
                update_options += {"_subject_mapper": plugin_subject.mapper}

        if "parententity" not in statement.table._annotations:
            # table is not mapped; fall back to Core-only execution
            update_options += {"_dml_strategy": "core_only"}
        elif not isinstance(params, list):
            # single parameter set (or none): "auto" means ORM strategy
            if update_options._dml_strategy == "auto":
                update_options += {"_dml_strategy": "orm"}
            elif update_options._dml_strategy == "bulk":
                raise sa_exc.InvalidRequestError(
                    'Can\'t use "bulk" ORM insert strategy without '
                    "passing separate parameters"
                )
        else:
            # list of parameter sets: "auto" means bulk strategy
            if update_options._dml_strategy == "auto":
                update_options += {"_dml_strategy": "bulk"}

        sync = update_options._synchronize_session
        if sync is not None:
            if sync not in ("auto", "evaluate", "fetch", False):
                raise sa_exc.ArgumentError(
                    "Valid strategies for session synchronization "
                    "are 'auto', 'evaluate', 'fetch', False"
                )
            if update_options._dml_strategy == "bulk" and sync == "fetch":
                raise sa_exc.InvalidRequestError(
                    "The 'fetch' synchronization strategy is not available "
                    "for 'bulk' ORM updates (i.e. multiple parameter sets)"
                )

        if not is_pre_event:
            if update_options._autoflush:
                session._autoflush()

            if update_options._dml_strategy == "orm":
                if update_options._synchronize_session == "auto":
                    update_options = cls._do_pre_synchronize_auto(
                        session,
                        statement,
                        params,
                        execution_options,
                        bind_arguments,
                        update_options,
                    )
                elif update_options._synchronize_session == "evaluate":
                    update_options = cls._do_pre_synchronize_evaluate(
                        session,
                        statement,
                        params,
                        execution_options,
                        bind_arguments,
                        update_options,
                    )
                elif update_options._synchronize_session == "fetch":
                    update_options = cls._do_pre_synchronize_fetch(
                        session,
                        statement,
                        params,
                        execution_options,
                        bind_arguments,
                        update_options,
                    )
            elif update_options._dml_strategy == "bulk":
                if update_options._synchronize_session == "auto":
                    update_options += {"_synchronize_session": "evaluate"}

        # indicators from the "pre exec" step that are then
        # added to the DML statement, which will also be part of the cache
        # key. The compile level create_for_statement() method will then
        # consume these at compiler time.
        statement = statement._annotate(
            {
                "synchronize_session": update_options._synchronize_session,
                "is_delete_using": update_options._is_delete_using,
                "is_update_from": update_options._is_update_from,
                "dml_strategy": update_options._dml_strategy,
                "can_use_returning": update_options._can_use_returning,
            }
        )

        return (
            statement,
            util.immutabledict(execution_options).union(
                {"_sa_orm_update_options": update_options}
            ),
        )
    @classmethod
    def orm_setup_cursor_result(
        cls,
        session,
        statement,
        params,
        execution_options,
        bind_arguments,
        result,
    ):
        """Post-execution hook: run the post-synchronize step appropriate
        to the DML strategy, then apply ORM RETURNING loading via
        _return_orm_returning() (except for the "bulk" strategy, which
        returns the raw result).
        """
        # this stage of the execution is called after the
        # do_orm_execute event hook. meaning for an extension like
        # horizontal sharding, this step happens *within* the horizontal
        # sharding event handler which calls session.execute() re-entrantly
        # and will occur for each backend individually.
        # the sharding extension then returns its own merged result from the
        # individual ones we return here.
        update_options = execution_options["_sa_orm_update_options"]
        if update_options._dml_strategy == "orm":
            if update_options._synchronize_session == "evaluate":
                cls._do_post_synchronize_evaluate(
                    session, statement, result, update_options
                )
            elif update_options._synchronize_session == "fetch":
                cls._do_post_synchronize_fetch(
                    session, statement, result, update_options
                )
        elif update_options._dml_strategy == "bulk":
            if update_options._synchronize_session == "evaluate":
                cls._do_post_synchronize_bulk_evaluate(
                    session, params, result, update_options
                )
            # bulk strategy skips ORM RETURNING loading entirely
            return result

        return cls._return_orm_returning(
            session,
            statement,
            params,
            execution_options,
            bind_arguments,
            result,
        )
  749. @classmethod
  750. def _adjust_for_extra_criteria(cls, global_attributes, ext_info):
  751. """Apply extra criteria filtering.
  752. For all distinct single-table-inheritance mappers represented in the
  753. table being updated or deleted, produce additional WHERE criteria such
  754. that only the appropriate subtypes are selected from the total results.
  755. Additionally, add WHERE criteria originating from LoaderCriteriaOptions
  756. collected from the statement.
  757. """
  758. return_crit = ()
  759. adapter = ext_info._adapter if ext_info.is_aliased_class else None
  760. if (
  761. "additional_entity_criteria",
  762. ext_info.mapper,
  763. ) in global_attributes:
  764. return_crit += tuple(
  765. ae._resolve_where_criteria(ext_info)
  766. for ae in global_attributes[
  767. ("additional_entity_criteria", ext_info.mapper)
  768. ]
  769. if ae.include_aliases or ae.entity is ext_info
  770. )
  771. if ext_info.mapper._single_table_criterion is not None:
  772. return_crit += (ext_info.mapper._single_table_criterion,)
  773. if adapter:
  774. return_crit = tuple(adapter.traverse(crit) for crit in return_crit)
  775. return return_crit
  776. @classmethod
  777. def _interpret_returning_rows(cls, result, mapper, rows):
  778. """return rows that indicate PK cols in mapper.primary_key position
  779. for RETURNING rows.
  780. Prior to 2.0.36, this method seemed to be written for some kind of
  781. inheritance scenario but the scenario was unused for actual joined
  782. inheritance, and the function instead seemed to perform some kind of
  783. partial translation that would remove non-PK cols if the PK cols
  784. happened to be first in the row, but not otherwise. The joined
  785. inheritance walk feature here seems to have never been used as it was
  786. always skipped by the "local_table" check.
  787. As of 2.0.36 the function strips away non-PK cols and provides the
  788. PK cols for the table in mapper PK order.
  789. """
  790. try:
  791. if mapper.local_table is not mapper.base_mapper.local_table:
  792. # TODO: dive more into how a local table PK is used for fetch
  793. # sync, not clear if this is correct as it depends on the
  794. # downstream routine to fetch rows using
  795. # local_table.primary_key order
  796. pk_keys = result._tuple_getter(mapper.local_table.primary_key)
  797. else:
  798. pk_keys = result._tuple_getter(mapper.primary_key)
  799. except KeyError:
  800. # can't use these rows, they don't have PK cols in them
  801. # this is an unusual case where the user would have used
  802. # .return_defaults()
  803. return []
  804. return [pk_keys(row) for row in rows]
@classmethod
def _get_matched_objects_on_criteria(cls, update_options, states):
    """Scan the given instance states and return those matching the
    statement's evaluated criteria.

    Returns a list of ``(obj, state, dict_, is_expired)`` tuples, where
    ``is_expired`` indicates the evaluator returned its "expired object"
    marker rather than ``True``.
    """
    mapper = update_options._subject_mapper
    eval_condition = update_options._eval_condition

    # narrow to live (non-expired) states mapped by this mapper or a
    # subclass of it
    raw_data = [
        (state.obj(), state, state.dict)
        for state in states
        if state.mapper.isa(mapper) and not state.expired
    ]

    identity_token = update_options._identity_token
    if identity_token is not None:
        # sharding-style token given; only consider states carrying
        # the same identity token
        raw_data = [
            (obj, state, dict_)
            for obj, state, dict_ in raw_data
            if state.identity_token == identity_token
        ]

    result = []
    for obj, state, dict_ in raw_data:
        evaled_condition = eval_condition(obj)

        # caution: don't use "in ()" or == here, _EXPIRED_OBJECT
        # evaluates as True for all comparisons
        if (
            evaled_condition is True
            or evaled_condition is evaluator._EXPIRED_OBJECT
        ):
            result.append(
                (
                    obj,
                    state,
                    dict_,
                    evaled_condition is evaluator._EXPIRED_OBJECT,
                )
            )

    return result
  839. @classmethod
  840. def _eval_condition_from_statement(cls, update_options, statement):
  841. mapper = update_options._subject_mapper
  842. target_cls = mapper.class_
  843. evaluator_compiler = evaluator._EvaluatorCompiler(target_cls)
  844. crit = ()
  845. if statement._where_criteria:
  846. crit += statement._where_criteria
  847. global_attributes = {}
  848. for opt in statement._with_options:
  849. if opt._is_criteria_option:
  850. opt.get_global_criteria(global_attributes)
  851. if global_attributes:
  852. crit += cls._adjust_for_extra_criteria(global_attributes, mapper)
  853. if crit:
  854. eval_condition = evaluator_compiler.process(*crit)
  855. else:
  856. # workaround for mypy https://github.com/python/mypy/issues/14027
  857. def _eval_condition(obj):
  858. return True
  859. eval_condition = _eval_condition
  860. return eval_condition
@classmethod
def _do_pre_synchronize_auto(
    cls,
    session,
    statement,
    params,
    execution_options,
    bind_arguments,
    update_options,
):
    """setup auto sync strategy

    "auto" checks if we can use "evaluate" first, then falls back
    to "fetch"

    evaluate is vastly more efficient for the common case
    where session is empty, only has a few objects, and the UPDATE
    statement can potentially match thousands/millions of rows.

    OTOH more complex criteria that fails to work with "evaluate"
    we would hope usually correlates with fewer net rows.

    """
    try:
        eval_condition = cls._eval_condition_from_statement(
            update_options, statement
        )

    except evaluator.UnevaluatableError:
        # criteria can't be evaluated in Python; fall through to the
        # "fetch" strategy below
        pass
    else:
        # criteria evaluated cleanly: record the condition and commit
        # to the "evaluate" strategy
        return update_options + {
            "_eval_condition": eval_condition,
            "_synchronize_session": "evaluate",
        }

    update_options += {"_synchronize_session": "fetch"}
    return cls._do_pre_synchronize_fetch(
        session,
        statement,
        params,
        execution_options,
        bind_arguments,
        update_options,
    )
  900. @classmethod
  901. def _do_pre_synchronize_evaluate(
  902. cls,
  903. session,
  904. statement,
  905. params,
  906. execution_options,
  907. bind_arguments,
  908. update_options,
  909. ):
  910. try:
  911. eval_condition = cls._eval_condition_from_statement(
  912. update_options, statement
  913. )
  914. except evaluator.UnevaluatableError as err:
  915. raise sa_exc.InvalidRequestError(
  916. 'Could not evaluate current criteria in Python: "%s". '
  917. "Specify 'fetch' or False for the "
  918. "synchronize_session execution option." % err
  919. ) from err
  920. return update_options + {
  921. "_eval_condition": eval_condition,
  922. }
  923. @classmethod
  924. def _get_resolved_values(cls, mapper, statement):
  925. if statement._multi_values:
  926. return []
  927. elif statement._ordered_values:
  928. return list(statement._ordered_values)
  929. elif statement._values:
  930. return list(statement._values.items())
  931. else:
  932. return []
  933. @classmethod
  934. def _resolved_keys_as_propnames(cls, mapper, resolved_values):
  935. values = []
  936. for k, v in resolved_values:
  937. if mapper and isinstance(k, expression.ColumnElement):
  938. try:
  939. attr = mapper._columntoproperty[k]
  940. except orm_exc.UnmappedColumnError:
  941. pass
  942. else:
  943. values.append((attr.key, v))
  944. else:
  945. raise sa_exc.InvalidRequestError(
  946. "Attribute name not found, can't be "
  947. "synchronized back to objects: %r" % k
  948. )
  949. return values
@classmethod
def _do_pre_synchronize_fetch(
    cls,
    session,
    statement,
    params,
    execution_options,
    bind_arguments,
    update_options,
):
    """Set up the "fetch" synchronization strategy, pre-selecting the
    primary keys of rows matched by the statement when RETURNING is
    not available on the bind.
    """
    mapper = update_options._subject_mapper

    # SELECT the PK cols (plus the identity token column) for rows
    # matching the DML statement's criteria
    select_stmt = (
        select(*(mapper.primary_key + (mapper.select_identity_token,)))
        .select_from(mapper)
        .options(*statement._with_options)
    )
    select_stmt._where_criteria = statement._where_criteria

    # conditionally run the SELECT statement for pre-fetch, testing the
    # "bind" for if we can use RETURNING or not using the do_orm_execute
    # event.  If RETURNING is available, the do_orm_execute event
    # will cancel the SELECT from being actually run.
    #
    # The way this is organized seems strange, why don't we just
    # call can_use_returning() before invoking the statement and get
    # answer?, why does this go through the whole execute phase using an
    # event?  Answer: because we are integrating with extensions such
    # as the horizontal sharding extension that "multiplexes" an
    # individual statement run through multiple engines, and it uses
    # do_orm_execute() to do that.

    can_use_returning = None

    def skip_for_returning(orm_context: ORMExecuteState) -> Any:
        bind = orm_context.session.get_bind(**orm_context.bind_arguments)
        nonlocal can_use_returning

        per_bind_result = cls.can_use_returning(
            bind.dialect,
            mapper,
            is_update_from=update_options._is_update_from,
            is_delete_using=update_options._is_delete_using,
            is_executemany=orm_context.is_executemany,
        )

        if can_use_returning is not None:
            # a previous bind already answered; all binds must agree
            if can_use_returning != per_bind_result:
                raise sa_exc.InvalidRequestError(
                    "For synchronize_session='fetch', can't mix multiple "
                    "backends where some support RETURNING and others "
                    "don't"
                )
        elif orm_context.is_executemany and not per_bind_result:
            raise sa_exc.InvalidRequestError(
                "For synchronize_session='fetch', can't use multiple "
                "parameter sets in ORM mode, which this backend does not "
                "support with RETURNING"
            )
        else:
            can_use_returning = per_bind_result

        if per_bind_result:
            # RETURNING available; cancel the SELECT by supplying an
            # empty result up front
            return _result.null_result()
        else:
            return None

    result = session.execute(
        select_stmt,
        params,
        execution_options=execution_options,
        bind_arguments=bind_arguments,
        _add_event=skip_for_returning,
    )
    matched_rows = result.fetchall()

    return update_options + {
        "_matched_rows": matched_rows,
        "_can_use_returning": can_use_returning,
    }
  1021. @CompileState.plugin_for("orm", "insert")
  1022. class BulkORMInsert(ORMDMLState, InsertDMLState):
  1023. class default_insert_options(Options):
  1024. _dml_strategy: DMLStrategyArgument = "auto"
  1025. _render_nulls: bool = False
  1026. _return_defaults: bool = False
  1027. _subject_mapper: Optional[Mapper[Any]] = None
  1028. _autoflush: bool = True
  1029. _populate_existing: bool = False
  1030. select_statement: Optional[FromStatement] = None
  1031. @classmethod
  1032. def orm_pre_session_exec(
  1033. cls,
  1034. session,
  1035. statement,
  1036. params,
  1037. execution_options,
  1038. bind_arguments,
  1039. is_pre_event,
  1040. ):
  1041. (
  1042. insert_options,
  1043. execution_options,
  1044. ) = BulkORMInsert.default_insert_options.from_execution_options(
  1045. "_sa_orm_insert_options",
  1046. {"dml_strategy", "autoflush", "populate_existing", "render_nulls"},
  1047. execution_options,
  1048. statement._execution_options,
  1049. )
  1050. bind_arguments["clause"] = statement
  1051. try:
  1052. plugin_subject = statement._propagate_attrs["plugin_subject"]
  1053. except KeyError:
  1054. assert False, "statement had 'orm' plugin but no plugin_subject"
  1055. else:
  1056. if plugin_subject:
  1057. bind_arguments["mapper"] = plugin_subject.mapper
  1058. insert_options += {"_subject_mapper": plugin_subject.mapper}
  1059. if not params:
  1060. if insert_options._dml_strategy == "auto":
  1061. insert_options += {"_dml_strategy": "orm"}
  1062. elif insert_options._dml_strategy == "bulk":
  1063. raise sa_exc.InvalidRequestError(
  1064. 'Can\'t use "bulk" ORM insert strategy without '
  1065. "passing separate parameters"
  1066. )
  1067. else:
  1068. if insert_options._dml_strategy == "auto":
  1069. insert_options += {"_dml_strategy": "bulk"}
  1070. if insert_options._dml_strategy != "raw":
  1071. # for ORM object loading, like ORMContext, we have to disable
  1072. # result set adapt_to_context, because we will be generating a
  1073. # new statement with specific columns that's cached inside of
  1074. # an ORMFromStatementCompileState, which we will re-use for
  1075. # each result.
  1076. if not execution_options:
  1077. execution_options = context._orm_load_exec_options
  1078. else:
  1079. execution_options = execution_options.union(
  1080. context._orm_load_exec_options
  1081. )
  1082. if not is_pre_event and insert_options._autoflush:
  1083. session._autoflush()
  1084. statement = statement._annotate(
  1085. {"dml_strategy": insert_options._dml_strategy}
  1086. )
  1087. return (
  1088. statement,
  1089. util.immutabledict(execution_options).union(
  1090. {"_sa_orm_insert_options": insert_options}
  1091. ),
  1092. )
  1093. @classmethod
  1094. def orm_execute_statement(
  1095. cls,
  1096. session: Session,
  1097. statement: dml.Insert,
  1098. params: _CoreAnyExecuteParams,
  1099. execution_options: OrmExecuteOptionsParameter,
  1100. bind_arguments: _BindArguments,
  1101. conn: Connection,
  1102. ) -> _result.Result:
  1103. insert_options = execution_options.get(
  1104. "_sa_orm_insert_options", cls.default_insert_options
  1105. )
  1106. if insert_options._dml_strategy not in (
  1107. "raw",
  1108. "bulk",
  1109. "orm",
  1110. "auto",
  1111. ):
  1112. raise sa_exc.ArgumentError(
  1113. "Valid strategies for ORM insert strategy "
  1114. "are 'raw', 'orm', 'bulk', 'auto"
  1115. )
  1116. result: _result.Result[Any]
  1117. if insert_options._dml_strategy == "raw":
  1118. result = conn.execute(
  1119. statement, params or {}, execution_options=execution_options
  1120. )
  1121. return result
  1122. if insert_options._dml_strategy == "bulk":
  1123. mapper = insert_options._subject_mapper
  1124. if (
  1125. statement._post_values_clause is not None
  1126. and mapper._multiple_persistence_tables
  1127. ):
  1128. raise sa_exc.InvalidRequestError(
  1129. "bulk INSERT with a 'post values' clause "
  1130. "(typically upsert) not supported for multi-table "
  1131. f"mapper {mapper}"
  1132. )
  1133. assert mapper is not None
  1134. assert session._transaction is not None
  1135. result = _bulk_insert(
  1136. mapper,
  1137. cast(
  1138. "Iterable[Dict[str, Any]]",
  1139. [params] if isinstance(params, dict) else params,
  1140. ),
  1141. session._transaction,
  1142. isstates=False,
  1143. return_defaults=insert_options._return_defaults,
  1144. render_nulls=insert_options._render_nulls,
  1145. use_orm_insert_stmt=statement,
  1146. execution_options=execution_options,
  1147. )
  1148. elif insert_options._dml_strategy == "orm":
  1149. result = conn.execute(
  1150. statement, params or {}, execution_options=execution_options
  1151. )
  1152. else:
  1153. raise AssertionError()
  1154. if not bool(statement._returning):
  1155. return result
  1156. if insert_options._populate_existing:
  1157. load_options = execution_options.get(
  1158. "_sa_orm_load_options", QueryContext.default_load_options
  1159. )
  1160. load_options += {"_populate_existing": True}
  1161. execution_options = execution_options.union(
  1162. {"_sa_orm_load_options": load_options}
  1163. )
  1164. return cls._return_orm_returning(
  1165. session,
  1166. statement,
  1167. params,
  1168. execution_options,
  1169. bind_arguments,
  1170. result,
  1171. )
  1172. @classmethod
  1173. def create_for_statement(cls, statement, compiler, **kw) -> BulkORMInsert:
  1174. self = cast(
  1175. BulkORMInsert,
  1176. super().create_for_statement(statement, compiler, **kw),
  1177. )
  1178. if compiler is not None:
  1179. toplevel = not compiler.stack
  1180. else:
  1181. toplevel = True
  1182. if not toplevel:
  1183. return self
  1184. mapper = statement._propagate_attrs["plugin_subject"]
  1185. dml_strategy = statement._annotations.get("dml_strategy", "raw")
  1186. if dml_strategy == "bulk":
  1187. self._setup_for_bulk_insert(compiler)
  1188. elif dml_strategy == "orm":
  1189. self._setup_for_orm_insert(compiler, mapper)
  1190. return self
  1191. @classmethod
  1192. def _resolved_keys_as_col_keys(cls, mapper, resolved_value_dict):
  1193. return {
  1194. col.key if col is not None else k: v
  1195. for col, k, v in (
  1196. (mapper.c.get(k), k, v) for k, v in resolved_value_dict.items()
  1197. )
  1198. }
  1199. def _setup_for_orm_insert(self, compiler, mapper):
  1200. statement = orm_level_statement = cast(dml.Insert, self.statement)
  1201. statement = self._setup_orm_returning(
  1202. compiler,
  1203. orm_level_statement,
  1204. statement,
  1205. dml_mapper=mapper,
  1206. use_supplemental_cols=False,
  1207. )
  1208. self.statement = statement
  1209. def _setup_for_bulk_insert(self, compiler):
  1210. """establish an INSERT statement within the context of
  1211. bulk insert.
  1212. This method will be within the "conn.execute()" call that is invoked
  1213. by persistence._emit_insert_statement().
  1214. """
  1215. statement = orm_level_statement = cast(dml.Insert, self.statement)
  1216. an = statement._annotations
  1217. emit_insert_table, emit_insert_mapper = (
  1218. an["_emit_insert_table"],
  1219. an["_emit_insert_mapper"],
  1220. )
  1221. statement = statement._clone()
  1222. statement.table = emit_insert_table
  1223. if self._dict_parameters:
  1224. self._dict_parameters = {
  1225. col: val
  1226. for col, val in self._dict_parameters.items()
  1227. if col.table is emit_insert_table
  1228. }
  1229. statement = self._setup_orm_returning(
  1230. compiler,
  1231. orm_level_statement,
  1232. statement,
  1233. dml_mapper=emit_insert_mapper,
  1234. use_supplemental_cols=True,
  1235. )
  1236. if (
  1237. self.from_statement_ctx is not None
  1238. and self.from_statement_ctx.compile_options._is_star
  1239. ):
  1240. raise sa_exc.CompileError(
  1241. "Can't use RETURNING * with bulk ORM INSERT. "
  1242. "Please use a different INSERT form, such as INSERT..VALUES "
  1243. "or INSERT with a Core Connection"
  1244. )
  1245. self.statement = statement
  1246. @CompileState.plugin_for("orm", "update")
  1247. class BulkORMUpdate(BulkUDCompileState, UpdateDMLState):
@classmethod
def create_for_statement(cls, statement, compiler, **kw):
    """Build the UPDATE compile state, dispatching to bulk, core-only
    or ORM setup based on the statement's "dml_strategy" annotation.
    """
    # bypass __init__; each branch below performs its own setup
    self = cls.__new__(cls)

    dml_strategy = statement._annotations.get(
        "dml_strategy", "unspecified"
    )

    toplevel = not compiler.stack

    if toplevel and dml_strategy == "bulk":
        self._setup_for_bulk_update(statement, compiler)
    elif (
        dml_strategy == "core_only"
        or dml_strategy == "unspecified"
        and "parententity" not in statement.table._annotations
    ):
        # NOTE: precedence is intentional here — "core_only", OR
        # ("unspecified" AND no ORM entity on the table): plain Core
        # UPDATE handling
        UpdateDMLState.__init__(self, statement, compiler, **kw)
    elif not toplevel or dml_strategy in ("orm", "unspecified"):
        self._setup_for_orm_update(statement, compiler)

    return self
def _setup_for_orm_update(self, statement, compiler, **kw):
    """Set up an ORM-level UPDATE: resolve SET values, apply extra
    criteria, and configure RETURNING for session synchronization.
    """
    orm_level_statement = statement

    toplevel = not compiler.stack

    ext_info = statement.table._annotations["parententity"]

    self.mapper = mapper = ext_info.mapper

    self._resolved_values = self._get_resolved_values(mapper, statement)

    self._init_global_attributes(
        statement,
        compiler,
        toplevel=toplevel,
        process_criteria_for_toplevel=toplevel,
    )

    if statement._values:
        self._resolved_values = dict(self._resolved_values)

    new_stmt = statement._clone()

    # retarget the statement at the mapper's local table when the
    # table is annotated with this same mapper
    if new_stmt.table._annotations["parententity"] is mapper:
        new_stmt.table = mapper.local_table

    # note if the statement has _multi_values, these
    # are passed through to the new statement, which will then raise
    # InvalidRequestError because UPDATE doesn't support multi_values
    # right now.
    if statement._ordered_values:
        new_stmt._ordered_values = self._resolved_values
    elif statement._values:
        new_stmt._values = self._resolved_values

    new_crit = self._adjust_for_extra_criteria(
        self.global_attributes, mapper
    )
    if new_crit:
        new_stmt = new_stmt.where(*new_crit)

    # if we are against a lambda statement we might not be the
    # topmost object that received per-execute annotations

    # do this first as we need to determine if there is
    # UPDATE..FROM
    UpdateDMLState.__init__(self, new_stmt, compiler, **kw)

    use_supplemental_cols = False

    if not toplevel:
        synchronize_session = None
    else:
        synchronize_session = compiler._annotations.get(
            "synchronize_session", None
        )
        can_use_returning = compiler._annotations.get(
            "can_use_returning", None
        )
        if can_use_returning is not False:
            # even though pre_exec has determined basic
            # can_use_returning for the dialect, if we are to use
            # RETURNING we need to run can_use_returning() at this level
            # unconditionally because is_delete_using was not known
            # at the pre_exec level
            can_use_returning = (
                synchronize_session == "fetch"
                and self.can_use_returning(
                    compiler.dialect, mapper, is_multitable=self.is_multitable
                )
            )

        if synchronize_session == "fetch" and can_use_returning:
            use_supplemental_cols = True

            # NOTE: we might want to RETURNING the actual columns to be
            # synchronized also. however this is complicated and difficult
            # to align against the behavior of "evaluate". Additionally,
            # in a large number (if not the majority) of cases, we have the
            # "evaluate" answer, usually a fixed value, in memory already and
            # there's no need to re-fetch the same value
            # over and over again. so perhaps if it could be RETURNING just
            # the elements that were based on a SQL expression and not
            # a constant. For now it doesn't quite seem worth it
            new_stmt = new_stmt.return_defaults(*new_stmt.table.primary_key)

    if toplevel:
        new_stmt = self._setup_orm_returning(
            compiler,
            orm_level_statement,
            new_stmt,
            dml_mapper=mapper,
            use_supplemental_cols=use_supplemental_cols,
        )

    self.statement = new_stmt
def _setup_for_bulk_update(self, statement, compiler, **kw):
    """establish an UPDATE statement within the context of
    bulk update.

    This method will be within the "conn.execute()" call that is invoked
    by persistence._emit_update_statement().

    """
    statement = cast(dml.Update, statement)
    an = statement._annotations

    emit_update_table, _ = (
        an["_emit_update_table"],
        an["_emit_update_mapper"],
    )

    statement = statement._clone()
    statement.table = emit_update_table

    UpdateDMLState.__init__(self, statement, compiler, **kw)

    if self._ordered_values:
        raise sa_exc.InvalidRequestError(
            "bulk ORM UPDATE does not support ordered_values() for "
            "custom UPDATE statements with bulk parameter sets. Use a "
            "non-bulk UPDATE statement or use values()."
        )

    if self._dict_parameters:
        # restrict parameters to the table being emitted against
        self._dict_parameters = {
            col: val
            for col, val in self._dict_parameters.items()
            if col.table is emit_update_table
        }
    self.statement = statement
@classmethod
def orm_execute_statement(
    cls,
    session: Session,
    statement: dml.Update,
    params: _CoreAnyExecuteParams,
    execution_options: OrmExecuteOptionsParameter,
    bind_arguments: _BindArguments,
    conn: Connection,
) -> _result.Result:
    """Execute the UPDATE according to its resolved DML strategy,
    dispatching "bulk" to the bulk-update path and everything else to
    the default implementation.

    :raises sa_exc.ArgumentError: for an unknown strategy name
    :raises sa_exc.InvalidRequestError: for unsupported bulk forms
    """
    update_options = execution_options.get(
        "_sa_orm_update_options", cls.default_update_options
    )

    if update_options._populate_existing:
        load_options = execution_options.get(
            "_sa_orm_load_options", QueryContext.default_load_options
        )
        load_options += {"_populate_existing": True}
        execution_options = execution_options.union(
            {"_sa_orm_load_options": load_options}
        )

    if update_options._dml_strategy not in (
        "orm",
        "auto",
        "bulk",
        "core_only",
    ):
        raise sa_exc.ArgumentError(
            "Valid strategies for ORM UPDATE strategy "
            "are 'orm', 'auto', 'bulk', 'core_only'"
        )

    result: _result.Result[Any]

    if update_options._dml_strategy == "bulk":
        # rowcount checking only applies when no extra WHERE criteria
        # are present
        enable_check_rowcount = not statement._where_criteria

        assert update_options._synchronize_session != "fetch"

        if (
            statement._where_criteria
            and update_options._synchronize_session == "evaluate"
        ):
            raise sa_exc.InvalidRequestError(
                "bulk synchronize of persistent objects not supported "
                "when using bulk update with additional WHERE "
                "criteria right now.  add synchronize_session=None "
                "execution option to bypass synchronize of persistent "
                "objects."
            )
        mapper = update_options._subject_mapper
        assert mapper is not None
        assert session._transaction is not None
        result = _bulk_update(
            mapper,
            cast(
                "Iterable[Dict[str, Any]]",
                [params] if isinstance(params, dict) else params,
            ),
            session._transaction,
            isstates=False,
            update_changed_only=False,
            use_orm_update_stmt=statement,
            enable_check_rowcount=enable_check_rowcount,
        )
        return cls.orm_setup_cursor_result(
            session,
            statement,
            params,
            execution_options,
            bind_arguments,
            result,
        )
    else:
        return super().orm_execute_statement(
            session,
            statement,
            params,
            execution_options,
            bind_arguments,
            conn,
        )
  1450. @classmethod
  1451. def can_use_returning(
  1452. cls,
  1453. dialect: Dialect,
  1454. mapper: Mapper[Any],
  1455. *,
  1456. is_multitable: bool = False,
  1457. is_update_from: bool = False,
  1458. is_delete_using: bool = False,
  1459. is_executemany: bool = False,
  1460. ) -> bool:
  1461. # normal answer for "should we use RETURNING" at all.
  1462. normal_answer = (
  1463. dialect.update_returning and mapper.local_table.implicit_returning
  1464. )
  1465. if not normal_answer:
  1466. return False
  1467. if is_executemany:
  1468. return dialect.update_executemany_returning
  1469. # these workarounds are currently hypothetical for UPDATE,
  1470. # unlike DELETE where they impact MariaDB
  1471. if is_update_from:
  1472. return dialect.update_returning_multifrom
  1473. elif is_multitable and not dialect.update_returning_multifrom:
  1474. raise sa_exc.CompileError(
  1475. f'Dialect "{dialect.name}" does not support RETURNING '
  1476. "with UPDATE..FROM; for synchronize_session='fetch', "
  1477. "please add the additional execution option "
  1478. "'is_update_from=True' to the statement to indicate that "
  1479. "a separate SELECT should be used for this backend."
  1480. )
  1481. return True
@classmethod
def _do_post_synchronize_bulk_evaluate(
    cls, session, params, result, update_options
):
    """Synchronize in-memory objects after a bulk UPDATE by applying
    each parameter set directly to the matching identity-map entry.
    """
    if not params:
        return

    mapper = update_options._subject_mapper
    pk_keys = [prop.key for prop in mapper._identity_key_props]

    identity_map = session.identity_map

    for param in params:
        identity_key = mapper.identity_key_from_primary_key(
            (param[key] for key in pk_keys),
            update_options._identity_token,
        )
        state = identity_map.fast_get_state(identity_key)
        if not state:
            # object not present in this session; nothing to sync
            continue

        evaluated_keys = set(param).difference(pk_keys)

        dict_ = state.dict
        # only evaluate unmodified attributes
        to_evaluate = state.unmodified.intersection(evaluated_keys)
        for key in to_evaluate:
            if key in dict_:
                dict_[key] = param[key]

        state.manager.dispatch.refresh(state, None, to_evaluate)

        state._commit(dict_, list(to_evaluate))

        # attributes that were formerly modified instead get expired.
        # this only gets hit if the session had pending changes
        # and autoflush were set to False.
        to_expire = evaluated_keys.intersection(dict_).difference(
            to_evaluate
        )
        if to_expire:
            state._expire_attributes(dict_, to_expire)
  1516. @classmethod
  1517. def _do_post_synchronize_evaluate(
  1518. cls, session, statement, result, update_options
  1519. ):
  1520. matched_objects = cls._get_matched_objects_on_criteria(
  1521. update_options,
  1522. session.identity_map.all_states(),
  1523. )
  1524. cls._apply_update_set_values_to_objects(
  1525. session,
  1526. update_options,
  1527. statement,
  1528. result.context.compiled_parameters[0],
  1529. [(obj, state, dict_) for obj, state, dict_, _ in matched_objects],
  1530. result.prefetch_cols(),
  1531. result.postfetch_cols(),
  1532. )
@classmethod
def _do_post_synchronize_fetch(
    cls, session, statement, result, update_options
):
    """Synchronize session objects using primary keys collected either
    from RETURNING rows or from the pre-executed SELECT.
    """
    target_mapper = update_options._subject_mapper

    returned_defaults_rows = result.returned_defaults_rows
    if returned_defaults_rows:
        # RETURNING path: extract PK tuples and append the identity
        # token to match the shape of pre-fetched rows
        pk_rows = cls._interpret_returning_rows(
            result, target_mapper, returned_defaults_rows
        )
        matched_rows = [
            tuple(row) + (update_options._identity_token,)
            for row in pk_rows
        ]
    else:
        # SELECT path: rows gathered by _do_pre_synchronize_fetch
        matched_rows = update_options._matched_rows

    # each row is (pk..., identity_token); build identity keys,
    # filtering by identity token if one was given, then resolve those
    # keys that are actually present in the session's identity map
    objs = [
        session.identity_map[identity_key]
        for identity_key in [
            target_mapper.identity_key_from_primary_key(
                list(primary_key),
                identity_token=identity_token,
            )
            for primary_key, identity_token in [
                (row[0:-1], row[-1]) for row in matched_rows
            ]
            if update_options._identity_token is None
            or identity_token == update_options._identity_token
        ]
        if identity_key in session.identity_map
    ]

    if not objs:
        return

    cls._apply_update_set_values_to_objects(
        session,
        update_options,
        statement,
        result.context.compiled_parameters[0],
        [
            (
                obj,
                attributes.instance_state(obj),
                attributes.instance_dict(obj),
            )
            for obj in objs
        ],
        result.prefetch_cols(),
        result.postfetch_cols(),
    )
    @classmethod
    def _apply_update_set_values_to_objects(
        cls,
        session,
        update_options,
        statement,
        effective_params,
        matched_objects,
        prefetch_cols,
        postfetch_cols,
    ):
        """apply values to objects derived from an update statement, e.g.
        UPDATE..SET <values>

        For each (obj, state, dict_) triple in ``matched_objects``:
        transfer prefetched column values, expire postfetched columns,
        evaluate the SET clause expressions in Python where possible, and
        expire whatever can't be evaluated.  Finally registers the
        affected states as altered with the session.
        """
        mapper = update_options._subject_mapper
        target_cls = mapper.class_
        evaluator_compiler = evaluator._EvaluatorCompiler(target_cls)
        resolved_values = cls._get_resolved_values(mapper, statement)
        resolved_keys_as_propnames = cls._resolved_keys_as_propnames(
            mapper, resolved_values
        )
        # build Python-level evaluators for each SET expression that the
        # evaluator subsystem can handle; those it can't are skipped and
        # will be expired on the objects instead
        value_evaluators = {}
        for key, value in resolved_keys_as_propnames:
            try:
                _evaluator = evaluator_compiler.process(
                    coercions.expect(roles.ExpressionElementRole, value)
                )
            except evaluator.UnevaluatableError:
                pass
            else:
                value_evaluators[key] = _evaluator
        evaluated_keys = list(value_evaluators.keys())
        attrib = {k for k, v in resolved_keys_as_propnames}
        states = set()

        # columns whose server-bound parameter values were prefetched and
        # can be copied straight into the instance dict (unless already
        # covered by a Python-level evaluator)
        to_prefetch = {
            c
            for c in prefetch_cols
            if c.key in effective_params
            and c in mapper._columntoproperty
            and c.key not in evaluated_keys
        }
        # attribute names for postfetched (server-generated) columns;
        # these must be expired so they reload on next access
        to_expire = {
            mapper._columntoproperty[c].key
            for c in postfetch_cols
            if c in mapper._columntoproperty
        }.difference(evaluated_keys)

        # (attribute name, parameter key) pairs for the prefetch transfer
        prefetch_transfer = [
            (mapper._columntoproperty[c].key, c.key) for c in to_prefetch
        ]

        for obj, state, dict_ in matched_objects:

            dict_.update(
                {
                    col_to_prop: effective_params[c_key]
                    for col_to_prop, c_key in prefetch_transfer
                }
            )

            # NOTE(review): passes state.dict here rather than the dict_
            # from matched_objects — presumably the same mapping for a
            # live instance; confirm against callers
            state._expire_attributes(state.dict, to_expire)

            to_evaluate = state.unmodified.intersection(evaluated_keys)

            for key in to_evaluate:
                if key in dict_:
                    # only run eval for attributes that are present.
                    dict_[key] = value_evaluators[key](obj)

            state.manager.dispatch.refresh(state, None, to_evaluate)

            state._commit(dict_, list(to_evaluate))

            # attributes that were formerly modified instead get expired.
            # this only gets hit if the session had pending changes
            # and autoflush were set to False.
            to_expire = attrib.intersection(dict_).difference(to_evaluate)
            if to_expire:
                state._expire_attributes(dict_, to_expire)

            states.add(state)
        session._register_altered(states)
@CompileState.plugin_for("orm", "delete")
class BulkORMDelete(BulkUDCompileState, DeleteDMLState):
    """Compile-state plugin handling ORM-enabled DELETE statements."""

    @classmethod
    def create_for_statement(cls, statement, compiler, **kw):
        """Build the compile state for a DELETE, applying ORM-level
        criteria adjustment and RETURNING setup as needed.

        Falls back to plain Core DELETE compilation when the statement is
        marked core-only or has no ORM entity attached to its table.
        """
        self = cls.__new__(cls)

        dml_strategy = statement._annotations.get(
            "dml_strategy", "unspecified"
        )

        if (
            dml_strategy == "core_only"
            or dml_strategy == "unspecified"
            and "parententity" not in statement.table._annotations
        ):
            # no ORM involvement; compile as a plain Core DELETE
            DeleteDMLState.__init__(self, statement, compiler, **kw)
            return self

        toplevel = not compiler.stack

        # keep the original statement around for ORM-level RETURNING setup
        orm_level_statement = statement

        ext_info = statement.table._annotations["parententity"]
        self.mapper = mapper = ext_info.mapper

        self._init_global_attributes(
            statement,
            compiler,
            toplevel=toplevel,
            process_criteria_for_toplevel=toplevel,
        )

        new_stmt = statement._clone()

        # swap in the mapper's local table when the statement targets the
        # mapped entity directly
        if new_stmt.table._annotations["parententity"] is mapper:
            new_stmt.table = mapper.local_table

        new_crit = cls._adjust_for_extra_criteria(
            self.global_attributes, mapper
        )
        if new_crit:
            new_stmt = new_stmt.where(*new_crit)

        # do this first as we need to determine if there is
        # DELETE..FROM
        DeleteDMLState.__init__(self, new_stmt, compiler, **kw)

        use_supplemental_cols = False

        if not toplevel:
            synchronize_session = None
        else:
            synchronize_session = compiler._annotations.get(
                "synchronize_session", None
            )
        can_use_returning = compiler._annotations.get(
            "can_use_returning", None
        )
        if can_use_returning is not False:
            # even though pre_exec has determined basic
            # can_use_returning for the dialect, if we are to use
            # RETURNING we need to run can_use_returning() at this level
            # unconditionally because is_delete_using was not known
            # at the pre_exec level
            can_use_returning = (
                synchronize_session == "fetch"
                and self.can_use_returning(
                    compiler.dialect,
                    mapper,
                    is_multitable=self.is_multitable,
                    is_delete_using=compiler._annotations.get(
                        "is_delete_using", False
                    ),
                )
            )

        if can_use_returning:
            use_supplemental_cols = True

            # return the primary key so synchronize_session='fetch' can
            # locate the deleted objects in the identity map afterwards
            new_stmt = new_stmt.return_defaults(*new_stmt.table.primary_key)

        if toplevel:
            new_stmt = self._setup_orm_returning(
                compiler,
                orm_level_statement,
                new_stmt,
                dml_mapper=mapper,
                use_supplemental_cols=use_supplemental_cols,
            )

        self.statement = new_stmt

        return self
  1730. @classmethod
  1731. def orm_execute_statement(
  1732. cls,
  1733. session: Session,
  1734. statement: dml.Delete,
  1735. params: _CoreAnyExecuteParams,
  1736. execution_options: OrmExecuteOptionsParameter,
  1737. bind_arguments: _BindArguments,
  1738. conn: Connection,
  1739. ) -> _result.Result:
  1740. update_options = execution_options.get(
  1741. "_sa_orm_update_options", cls.default_update_options
  1742. )
  1743. if update_options._dml_strategy == "bulk":
  1744. raise sa_exc.InvalidRequestError(
  1745. "Bulk ORM DELETE not supported right now. "
  1746. "Statement may be invoked at the "
  1747. "Core level using "
  1748. "session.connection().execute(stmt, parameters)"
  1749. )
  1750. if update_options._dml_strategy not in ("orm", "auto", "core_only"):
  1751. raise sa_exc.ArgumentError(
  1752. "Valid strategies for ORM DELETE strategy are 'orm', 'auto', "
  1753. "'core_only'"
  1754. )
  1755. return super().orm_execute_statement(
  1756. session, statement, params, execution_options, bind_arguments, conn
  1757. )
  1758. @classmethod
  1759. def can_use_returning(
  1760. cls,
  1761. dialect: Dialect,
  1762. mapper: Mapper[Any],
  1763. *,
  1764. is_multitable: bool = False,
  1765. is_update_from: bool = False,
  1766. is_delete_using: bool = False,
  1767. is_executemany: bool = False,
  1768. ) -> bool:
  1769. # normal answer for "should we use RETURNING" at all.
  1770. normal_answer = (
  1771. dialect.delete_returning and mapper.local_table.implicit_returning
  1772. )
  1773. if not normal_answer:
  1774. return False
  1775. # now get into special workarounds because MariaDB supports
  1776. # DELETE...RETURNING but not DELETE...USING...RETURNING.
  1777. if is_delete_using:
  1778. # is_delete_using hint was passed. use
  1779. # additional dialect feature (True for PG, False for MariaDB)
  1780. return dialect.delete_returning_multifrom
  1781. elif is_multitable and not dialect.delete_returning_multifrom:
  1782. # is_delete_using hint was not passed, but we determined
  1783. # at compile time that this is in fact a DELETE..USING.
  1784. # it's too late to continue since we did not pre-SELECT.
  1785. # raise that we need that hint up front.
  1786. raise sa_exc.CompileError(
  1787. f'Dialect "{dialect.name}" does not support RETURNING '
  1788. "with DELETE..USING; for synchronize_session='fetch', "
  1789. "please add the additional execution option "
  1790. "'is_delete_using=True' to the statement to indicate that "
  1791. "a separate SELECT should be used for this backend."
  1792. )
  1793. return True
  1794. @classmethod
  1795. def _do_post_synchronize_evaluate(
  1796. cls, session, statement, result, update_options
  1797. ):
  1798. matched_objects = cls._get_matched_objects_on_criteria(
  1799. update_options,
  1800. session.identity_map.all_states(),
  1801. )
  1802. to_delete = []
  1803. for _, state, dict_, is_partially_expired in matched_objects:
  1804. if is_partially_expired:
  1805. state._expire(dict_, session.identity_map._modified)
  1806. else:
  1807. to_delete.append(state)
  1808. if to_delete:
  1809. session._remove_newly_deleted(to_delete)
  1810. @classmethod
  1811. def _do_post_synchronize_fetch(
  1812. cls, session, statement, result, update_options
  1813. ):
  1814. target_mapper = update_options._subject_mapper
  1815. returned_defaults_rows = result.returned_defaults_rows
  1816. if returned_defaults_rows:
  1817. pk_rows = cls._interpret_returning_rows(
  1818. result, target_mapper, returned_defaults_rows
  1819. )
  1820. matched_rows = [
  1821. tuple(row) + (update_options._identity_token,)
  1822. for row in pk_rows
  1823. ]
  1824. else:
  1825. matched_rows = update_options._matched_rows
  1826. for row in matched_rows:
  1827. primary_key = row[0:-1]
  1828. identity_token = row[-1]
  1829. # TODO: inline this and call remove_newly_deleted
  1830. # once
  1831. identity_key = target_mapper.identity_key_from_primary_key(
  1832. list(primary_key),
  1833. identity_token=identity_token,
  1834. )
  1835. if identity_key in session.identity_map:
  1836. session._remove_newly_deleted(
  1837. [
  1838. attributes.instance_state(
  1839. session.identity_map[identity_key]
  1840. )
  1841. ]
  1842. )