base.py 120 KB


  1. # engine/base.py
  2. # Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: https://www.opensource.org/licenses/mit-license.php
  7. """Defines :class:`_engine.Connection` and :class:`_engine.Engine`."""
  8. from __future__ import annotations
  9. import contextlib
  10. import sys
  11. import typing
  12. from typing import Any
  13. from typing import Callable
  14. from typing import cast
  15. from typing import Iterable
  16. from typing import Iterator
  17. from typing import List
  18. from typing import Mapping
  19. from typing import NoReturn
  20. from typing import Optional
  21. from typing import overload
  22. from typing import Tuple
  23. from typing import Type
  24. from typing import TypeVar
  25. from typing import Union
  26. from .interfaces import BindTyping
  27. from .interfaces import ConnectionEventsTarget
  28. from .interfaces import DBAPICursor
  29. from .interfaces import ExceptionContext
  30. from .interfaces import ExecuteStyle
  31. from .interfaces import ExecutionContext
  32. from .interfaces import IsolationLevel
  33. from .util import _distill_params_20
  34. from .util import _distill_raw_params
  35. from .util import TransactionalContext
  36. from .. import exc
  37. from .. import inspection
  38. from .. import log
  39. from .. import util
  40. from ..sql import compiler
  41. from ..sql import util as sql_util
  42. if typing.TYPE_CHECKING:
  43. from . import CursorResult
  44. from . import ScalarResult
  45. from .interfaces import _AnyExecuteParams
  46. from .interfaces import _AnyMultiExecuteParams
  47. from .interfaces import _CoreAnyExecuteParams
  48. from .interfaces import _CoreMultiExecuteParams
  49. from .interfaces import _CoreSingleExecuteParams
  50. from .interfaces import _DBAPIAnyExecuteParams
  51. from .interfaces import _DBAPISingleExecuteParams
  52. from .interfaces import _ExecuteOptions
  53. from .interfaces import CompiledCacheType
  54. from .interfaces import CoreExecuteOptionsParameter
  55. from .interfaces import Dialect
  56. from .interfaces import SchemaTranslateMapType
  57. from .reflection import Inspector # noqa
  58. from .url import URL
  59. from ..event import dispatcher
  60. from ..log import _EchoFlagType
  61. from ..pool import _ConnectionFairy
  62. from ..pool import Pool
  63. from ..pool import PoolProxiedConnection
  64. from ..sql import Executable
  65. from ..sql._typing import _InfoType
  66. from ..sql.compiler import Compiled
  67. from ..sql.ddl import ExecutableDDLElement
  68. from ..sql.ddl import InvokeDDLBase
  69. from ..sql.functions import FunctionElement
  70. from ..sql.schema import DefaultGenerator
  71. from ..sql.schema import HasSchemaAttr
  72. from ..sql.schema import SchemaVisitable
  73. from ..sql.selectable import TypedReturnsRows
  # Generic type variable, bounded by ``Any``.
  _T = TypeVar("_T", bound=Any)
  # Module-level "no execution options" value; bound to ``util.EMPTY_DICT``.
  _EMPTY_EXECUTION_OPTS: _ExecuteOptions = util.EMPTY_DICT
  # The same ``util.EMPTY_DICT`` object, typed as a plain string-keyed mapping.
  NO_OPTIONS: Mapping[str, Any] = util.EMPTY_DICT
  77. class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
  78. """Provides high-level functionality for a wrapped DB-API connection.
  79. The :class:`_engine.Connection` object is procured by calling the
  80. :meth:`_engine.Engine.connect` method of the :class:`_engine.Engine`
  81. object, and provides services for execution of SQL statements as well
  82. as transaction control.
  83. The Connection object is **not** thread-safe. While a Connection can be
  84. shared among threads using properly synchronized access, it is still
  85. possible that the underlying DBAPI connection may not support shared
  86. access between threads. Check the DBAPI documentation for details.
  87. The Connection object represents a single DBAPI connection checked out
  88. from the connection pool. In this state, the connection pool has no
  effect upon the connection, including its expiration or timeout state.
  90. For the connection pool to properly manage connections, connections
  91. should be returned to the connection pool (i.e. ``connection.close()``)
  92. whenever the connection is not in use.
  93. .. index::
  94. single: thread safety; Connection
  95. """
  # Dialect in use; assigned from ``engine.dialect`` in ``__init__``.
  dialect: Dialect
  # Event dispatcher; ``__init__`` joins it to the engine's dispatcher when
  # ``_has_events`` is not passed explicitly.
  dispatch: dispatcher[ConnectionEventsTarget]

  # Logger namespace string for this class.
  _sqla_logger_namespace = "sqlalchemy.engine.Connection"

  # used by sqlalchemy.engine.util.TransactionalContext
  _trans_context_manager: Optional[TransactionalContext] = None

  # legacy as of 2.0, should be eventually deprecated and
  # removed.  was used in the "pre_ping" recipe that's been in the docs
  # a long time
  should_close_with_result = False

  # Pool-proxied DBAPI connection; ``None`` once invalidated or closed
  # (see the ``closed`` / ``invalidated`` properties).
  _dbapi_connection: Optional[PoolProxiedConnection]
  # Current execution options; starts out as the engine's options.
  _execution_options: _ExecuteOptions
  # Root transaction in progress, if any; ``begin()`` sets it.
  _transaction: Optional[RootTransaction]
  # Nested (savepoint) transaction in progress, if any.
  _nested_transaction: Optional[NestedTransaction]
  def __init__(
      self,
      engine: Engine,
      connection: Optional[PoolProxiedConnection] = None,
      _has_events: Optional[bool] = None,
      _allow_revalidate: bool = True,
      _allow_autobegin: bool = True,
  ):
      """Construct a new Connection.

      :param engine: the owning :class:`_engine.Engine`; supplies the
       dialect, the event dispatcher, the logging flag and the initial
       execution options.
      :param connection: an existing pool-proxied connection to wrap.
       When ``None``, one is procured via ``engine.raw_connection()``.
      :param _has_events: when ``None``, this connection's dispatcher is
       joined to the engine's and the engine's ``_has_events`` flag is
       inherited.  When passed explicitly, no join takes place.
      :param _allow_revalidate: stored as the private "can reconnect"
       flag consulted by ``closed`` / ``invalidated``.
      :param _allow_autobegin: stored and consulted by ``_autobegin()``.

      :raises: the DBAPI ``Error`` raised by ``engine.raw_connection()``,
       re-raised if ``_handle_dbapi_exception_noconnection()`` returns.

      """
      dialect = engine.dialect
      self.engine = engine
      self.dialect = dialect

      if connection is not None:
          self._dbapi_connection = connection
      else:
          try:
              self._dbapi_connection = engine.raw_connection()
          except dialect.loaded_dbapi.Error as err:
              Connection._handle_dbapi_exception_noconnection(
                  err, dialect, engine
              )
              raise

      # transactional state
      self._transaction = None
      self._nested_transaction = None
      self.__savepoint_seq = 0
      self.__in_begin = False

      # behavioral flags
      self.__can_reconnect = _allow_revalidate
      self._allow_autobegin = _allow_autobegin
      self._echo = self.engine._should_log_info()

      if _has_events is None:
          # an explicit _has_events=False means "do not join the engine's
          # dispatch"; none of the engine's events should be handled
          # by this connection in that case.
          self.dispatch = self.dispatch._join(engine.dispatch)

      inherit_from_engine = _has_events is None and engine._has_events
      self._has_events = _has_events or inherit_from_engine

      self._execution_options = engine._execution_options

      if self._has_events or self.engine._has_events:
          self.dispatch.engine_connect(self)
  # Optional callable applied to each message by ``_log_info()`` and
  # ``_log_debug()`` before it is passed to the logger; a falsy value means
  # the message is logged unchanged.
  # this can be assigned differently via
  # characteristics.LoggingTokenCharacteristic
  _message_formatter: Any = None
  def _log_info(self, message: str, *arg: Any, **kw: Any) -> None:
      """Log ``message`` at INFO level on the engine's logger.

      The message is first passed through ``_message_formatter`` when one
      is set.  When ``log.STACKLEVEL`` is enabled, the ``stacklevel``
      keyword is set to ``1 + log.STACKLEVEL_OFFSET``, replacing any
      value the caller passed.

      """
      formatter = self._message_formatter
      text = formatter(message) if formatter else message

      if log.STACKLEVEL:
          kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET

      self.engine.logger.info(text, *arg, **kw)
  def _log_debug(self, message: str, *arg: Any, **kw: Any) -> None:
      """Log ``message`` at DEBUG level on the engine's logger.

      The message is first passed through ``_message_formatter`` when one
      is set.  When ``log.STACKLEVEL`` is enabled, the ``stacklevel``
      keyword is set to ``1 + log.STACKLEVEL_OFFSET``, replacing any
      value the caller passed.

      """
      formatter = self._message_formatter
      text = formatter(message) if formatter else message

      if log.STACKLEVEL:
          kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET

      self.engine.logger.debug(text, *arg, **kw)
  164. @property
  165. def _schema_translate_map(self) -> Optional[SchemaTranslateMapType]:
  166. schema_translate_map: Optional[SchemaTranslateMapType] = (
  167. self._execution_options.get("schema_translate_map", None)
  168. )
  169. return schema_translate_map
  170. def schema_for_object(self, obj: HasSchemaAttr) -> Optional[str]:
  171. """Return the schema name for the given schema item taking into
  172. account current schema translate map.
  173. """
  174. name = obj.schema
  175. schema_translate_map: Optional[SchemaTranslateMapType] = (
  176. self._execution_options.get("schema_translate_map", None)
  177. )
  178. if (
  179. schema_translate_map
  180. and name in schema_translate_map
  181. and obj._use_schema_map
  182. ):
  183. return schema_translate_map[name]
  184. else:
  185. return name
  186. def __enter__(self) -> Connection:
  187. return self
  188. def __exit__(self, type_: Any, value: Any, traceback: Any) -> None:
  189. self.close()
  @overload
  def execution_options(
      self,
      *,
      compiled_cache: Optional[CompiledCacheType] = ...,
      logging_token: str = ...,
      isolation_level: IsolationLevel = ...,
      no_parameters: bool = False,
      stream_results: bool = False,
      max_row_buffer: int = ...,
      yield_per: int = ...,
      insertmanyvalues_page_size: int = ...,
      schema_translate_map: Optional[SchemaTranslateMapType] = ...,
      preserve_rowcount: bool = False,
      **opt: Any,
  ) -> Connection: ...

  @overload
  def execution_options(self, **opt: Any) -> Connection: ...

  def execution_options(self, **opt: Any) -> Connection:
      r"""Set non-SQL options for the connection which take effect
      during execution.

      This method modifies this :class:`_engine.Connection` **in-place**;
      the return value is the same :class:`_engine.Connection` object
      upon which the method is called.   Note that this is in contrast
      to the behavior of the ``execution_options`` methods on other
      objects such as :meth:`_engine.Engine.execution_options` and
      :meth:`_sql.Executable.execution_options`.  The rationale is that many
      such execution options necessarily modify the state of the base
      DBAPI connection in any case so there is no feasible means of
      keeping the effect of such an option localized to a "sub" connection.

      .. versionchanged:: 2.0  The :meth:`_engine.Connection.execution_options`
         method, in contrast to other objects with this method, modifies
         the connection in-place without creating a copy of it.

      As discussed elsewhere, the :meth:`_engine.Connection.execution_options`
      method accepts any arbitrary parameters including user defined names.
      All parameters given are consumable in a number of ways including
      by using the :meth:`_engine.Connection.get_execution_options` method.
      See the examples at :meth:`_sql.Executable.execution_options`
      and :meth:`_engine.Engine.execution_options`.

      The keywords that are currently recognized by SQLAlchemy itself
      include all those listed under :meth:`.Executable.execution_options`,
      as well as others that are specific to :class:`_engine.Connection`.

      :param compiled_cache: Available on: :class:`_engine.Connection`,
        :class:`_engine.Engine`.

        A dictionary where :class:`.Compiled` objects
        will be cached when the :class:`_engine.Connection`
        compiles a clause
        expression into a :class:`.Compiled` object.  This dictionary will
        supersede the statement cache that may be configured on the
        :class:`_engine.Engine` itself.   If set to None, caching
        is disabled, even if the engine has a configured cache size.

        Note that the ORM makes use of its own "compiled" caches for
        some operations, including flush operations.  The caching
        used by the ORM internally supersedes a cache dictionary
        specified here.

      :param logging_token: Available on: :class:`_engine.Connection`,
        :class:`_engine.Engine`, :class:`_sql.Executable`.

        Adds the specified string token surrounded by brackets in log
        messages logged by the connection, i.e. the logging that's enabled
        either via the :paramref:`_sa.create_engine.echo` flag or via the
        ``logging.getLogger("sqlalchemy.engine")`` logger. This allows a
        per-connection or per-sub-engine token to be available which is
        useful for debugging concurrent connection scenarios.

        .. versionadded:: 1.4.0b2

        .. seealso::

          :ref:`dbengine_logging_tokens` - usage example

          :paramref:`_sa.create_engine.logging_name` - adds a name to the
          name used by the Python logger object itself.

      :param isolation_level: Available on: :class:`_engine.Connection`,
        :class:`_engine.Engine`.

        Set the transaction isolation level for the lifespan of this
        :class:`_engine.Connection` object.
        Valid values include those string
        values accepted by the :paramref:`_sa.create_engine.isolation_level`
        parameter passed to :func:`_sa.create_engine`.  These levels are
        semi-database specific; see individual dialect documentation for
        valid levels.

        The isolation level option applies the isolation level by emitting
        statements on the DBAPI connection, and **necessarily affects the
        original Connection object overall**. The isolation level will remain
        at the given setting until explicitly changed, or when the DBAPI
        connection itself is :term:`released` to the connection pool, i.e. the
        :meth:`_engine.Connection.close` method is called, at which time an
        event handler will emit additional statements on the DBAPI connection
        in order to revert the isolation level change.

        .. note:: The ``isolation_level`` execution option may only be
           established before the :meth:`_engine.Connection.begin` method is
           called, as well as before any SQL statements are emitted which
           would otherwise trigger "autobegin", or directly after a call to
           :meth:`_engine.Connection.commit` or
           :meth:`_engine.Connection.rollback`. A database cannot change the
           isolation level on a transaction in progress.

        .. note:: The ``isolation_level`` execution option is implicitly
           reset if the :class:`_engine.Connection` is invalidated, e.g. via
           the :meth:`_engine.Connection.invalidate` method, or if a
           disconnection error occurs. The new connection produced after the
           invalidation will **not** have the selected isolation level
           re-applied to it automatically.

        .. seealso::

              :ref:`dbapi_autocommit`

              :meth:`_engine.Connection.get_isolation_level`
              - view current actual level

      :param no_parameters: Available on: :class:`_engine.Connection`,
        :class:`_sql.Executable`.

        When ``True``, if the final parameter
        list or dictionary is totally empty, will invoke the
        statement on the cursor as ``cursor.execute(statement)``,
        not passing the parameter collection at all.
        Some DBAPIs such as psycopg2 and mysql-python consider
        percent signs as significant only when parameters are
        present; this option allows code to generate SQL
        containing percent signs (and possibly other characters)
        that is neutral regarding whether it's executed by the DBAPI
        or piped into a script that's later invoked by
        command line tools.

      :param stream_results: Available on: :class:`_engine.Connection`,
        :class:`_sql.Executable`.

        Indicate to the dialect that results should be "streamed" and not
        pre-buffered, if possible.  For backends such as PostgreSQL, MySQL
        and MariaDB, this indicates the use of a "server side cursor" as
        opposed to a client side cursor.  Other backends such as that of
        Oracle Database may already use server side cursors by default.

        The usage of
        :paramref:`_engine.Connection.execution_options.stream_results` is
        usually combined with setting a fixed number of rows to be fetched
        in batches, to allow for efficient iteration of database rows while
        at the same time not loading all result rows into memory at once;
        this can be configured on a :class:`_engine.Result` object using the
        :meth:`_engine.Result.yield_per` method, after execution has
        returned a new :class:`_engine.Result`.   If
        :meth:`_engine.Result.yield_per` is not used,
        the :paramref:`_engine.Connection.execution_options.stream_results`
        mode of operation will instead use a dynamically sized buffer
        which buffers sets of rows at a time, growing on each batch
        based on a fixed growth size up until a limit which may
        be configured using the
        :paramref:`_engine.Connection.execution_options.max_row_buffer`
        parameter.

        When using the ORM to fetch ORM mapped objects from a result,
        :meth:`_engine.Result.yield_per` should always be used with
        :paramref:`_engine.Connection.execution_options.stream_results`,
        so that the ORM does not fetch all rows into new ORM objects at once.

        For typical use, the
        :paramref:`_engine.Connection.execution_options.yield_per` execution
        option should be preferred, which sets up both
        :paramref:`_engine.Connection.execution_options.stream_results` and
        :meth:`_engine.Result.yield_per` at once. This option is supported
        both at a core level by :class:`_engine.Connection` as well as by the
        ORM :class:`_engine.Session`; the latter is described at
        :ref:`orm_queryguide_yield_per`.

        .. seealso::

          :ref:`engine_stream_results` - background on
          :paramref:`_engine.Connection.execution_options.stream_results`

          :paramref:`_engine.Connection.execution_options.max_row_buffer`

          :paramref:`_engine.Connection.execution_options.yield_per`

          :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
          describing the ORM version of ``yield_per``

      :param max_row_buffer: Available on: :class:`_engine.Connection`,
        :class:`_sql.Executable`.  Sets a maximum
        buffer size to use when the
        :paramref:`_engine.Connection.execution_options.stream_results`
        execution option is used on a backend that supports server side
        cursors.  The default value if not specified is 1000.

        .. seealso::

          :paramref:`_engine.Connection.execution_options.stream_results`

          :ref:`engine_stream_results`

      :param yield_per: Available on: :class:`_engine.Connection`,
        :class:`_sql.Executable`.  Integer value applied which will
        set the :paramref:`_engine.Connection.execution_options.stream_results`
        execution option and invoke :meth:`_engine.Result.yield_per`
        automatically at once.  Allows equivalent functionality as
        is present when using this parameter with the ORM.

        .. versionadded:: 1.4.40

        .. seealso::

          :ref:`engine_stream_results` - background and examples
          on using server side cursors with Core.

          :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
          describing the ORM version of ``yield_per``

      :param insertmanyvalues_page_size: Available on: :class:`_engine.Connection`,
        :class:`_engine.Engine`. Number of rows to format into an
        INSERT statement when the statement uses "insertmanyvalues" mode,
        which is a paged form of bulk insert that is used for many backends
        when using :term:`executemany` execution typically in conjunction
        with RETURNING. Defaults to 1000. May also be modified on a
        per-engine basis using the
        :paramref:`_sa.create_engine.insertmanyvalues_page_size` parameter.

        .. versionadded:: 2.0

        .. seealso::

          :ref:`engine_insertmanyvalues`

      :param schema_translate_map: Available on: :class:`_engine.Connection`,
        :class:`_engine.Engine`, :class:`_sql.Executable`.

        A dictionary mapping schema names to schema names, that will be
        applied to the :paramref:`_schema.Table.schema` element of each
        :class:`_schema.Table`
        encountered when SQL or DDL expression elements
        are compiled into strings; the resulting schema name will be
        converted based on presence in the map of the original name.

        .. seealso::

          :ref:`schema_translating`

      :param preserve_rowcount: Boolean; when True, the ``cursor.rowcount``
        attribute will be unconditionally memoized within the result and
        made available via the :attr:`.CursorResult.rowcount` attribute.
        Normally, this attribute is only preserved for UPDATE and DELETE
        statements.  Using this option, the DBAPI's rowcount value can
        be accessed for other kinds of statements such as INSERT and SELECT,
        to the degree that the DBAPI supports these statements.  See
        :attr:`.CursorResult.rowcount` for notes regarding the behavior
        of this attribute.

        .. versionadded:: 2.0.28

      :return: this same :class:`_engine.Connection` object.

      .. seealso::

          :meth:`_engine.Engine.execution_options`

          :meth:`.Executable.execution_options`

          :meth:`_engine.Connection.get_execution_options`

          :ref:`orm_queryguide_execution_options` - documentation on all
          ORM-specific execution options

      """  # noqa
      # notify event listeners before the new options are merged in
      if self._has_events or self.engine._has_events:
          self.dispatch.set_connection_execution_options(self, opt)
      # merge the given options over the current ones; the result is stored
      # on this same object rather than on a copy
      self._execution_options = self._execution_options.union(opt)
      # the dialect hook runs after the merge, with the same ``opt`` mapping
      self.dialect.set_connection_execution_options(self, opt)
      return self
  411. def get_execution_options(self) -> _ExecuteOptions:
  412. """Get the non-SQL options which will take effect during execution.
  413. .. versionadded:: 1.3
  414. .. seealso::
  415. :meth:`_engine.Connection.execution_options`
  416. """
  417. return self._execution_options
  418. @property
  419. def _still_open_and_dbapi_connection_is_valid(self) -> bool:
  420. pool_proxied_connection = self._dbapi_connection
  421. return (
  422. pool_proxied_connection is not None
  423. and pool_proxied_connection.is_valid
  424. )
  425. @property
  426. def closed(self) -> bool:
  427. """Return True if this connection is closed."""
  428. return self._dbapi_connection is None and not self.__can_reconnect
  429. @property
  430. def invalidated(self) -> bool:
  431. """Return True if this connection was invalidated.
  432. This does not indicate whether or not the connection was
  433. invalidated at the pool level, however
  434. """
  435. # prior to 1.4, "invalid" was stored as a state independent of
  436. # "closed", meaning an invalidated connection could be "closed",
  437. # the _dbapi_connection would be None and closed=True, yet the
  438. # "invalid" flag would stay True. This meant that there were
  439. # three separate states (open/valid, closed/valid, closed/invalid)
  440. # when there is really no reason for that; a connection that's
  441. # "closed" does not need to be "invalid". So the state is now
  442. # represented by the two facts alone.
  443. pool_proxied_connection = self._dbapi_connection
  444. return pool_proxied_connection is None and self.__can_reconnect
  445. @property
  446. def connection(self) -> PoolProxiedConnection:
  447. """The underlying DB-API connection managed by this Connection.
  448. This is a SQLAlchemy connection-pool proxied connection
  449. which then has the attribute
  450. :attr:`_pool._ConnectionFairy.dbapi_connection` that refers to the
  451. actual driver connection.
  452. .. seealso::
  453. :ref:`dbapi_connections`
  454. """
  455. if self._dbapi_connection is None:
  456. try:
  457. return self._revalidate_connection()
  458. except (exc.PendingRollbackError, exc.ResourceClosedError):
  459. raise
  460. except BaseException as e:
  461. self._handle_dbapi_exception(e, None, None, None, None)
  462. else:
  463. return self._dbapi_connection
  464. def get_isolation_level(self) -> IsolationLevel:
  465. """Return the current **actual** isolation level that's present on
  466. the database within the scope of this connection.
  467. This attribute will perform a live SQL operation against the database
  468. in order to procure the current isolation level, so the value returned
  469. is the actual level on the underlying DBAPI connection regardless of
  470. how this state was set. This will be one of the four actual isolation
  471. modes ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``,
  472. ``SERIALIZABLE``. It will **not** include the ``AUTOCOMMIT`` isolation
  473. level setting. Third party dialects may also feature additional
  474. isolation level settings.
  475. .. note:: This method **will not report** on the ``AUTOCOMMIT``
  476. isolation level, which is a separate :term:`dbapi` setting that's
  477. independent of **actual** isolation level. When ``AUTOCOMMIT`` is
  478. in use, the database connection still has a "traditional" isolation
  479. mode in effect, that is typically one of the four values
  480. ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``,
  481. ``SERIALIZABLE``.
  482. Compare to the :attr:`_engine.Connection.default_isolation_level`
  483. accessor which returns the isolation level that is present on the
  484. database at initial connection time.
  485. .. seealso::
  486. :attr:`_engine.Connection.default_isolation_level`
  487. - view default level
  488. :paramref:`_sa.create_engine.isolation_level`
  489. - set per :class:`_engine.Engine` isolation level
  490. :paramref:`.Connection.execution_options.isolation_level`
  491. - set per :class:`_engine.Connection` isolation level
  492. """
  493. dbapi_connection = self.connection.dbapi_connection
  494. assert dbapi_connection is not None
  495. try:
  496. return self.dialect.get_isolation_level(dbapi_connection)
  497. except BaseException as e:
  498. self._handle_dbapi_exception(e, None, None, None, None)
  499. @property
  500. def default_isolation_level(self) -> Optional[IsolationLevel]:
  501. """The initial-connection time isolation level associated with the
  502. :class:`_engine.Dialect` in use.
  503. This value is independent of the
  504. :paramref:`.Connection.execution_options.isolation_level` and
  505. :paramref:`.Engine.execution_options.isolation_level` execution
  506. options, and is determined by the :class:`_engine.Dialect` when the
  507. first connection is created, by performing a SQL query against the
  508. database for the current isolation level before any additional commands
  509. have been emitted.
  510. Calling this accessor does not invoke any new SQL queries.
  511. .. seealso::
  512. :meth:`_engine.Connection.get_isolation_level`
  513. - view current actual isolation level
  514. :paramref:`_sa.create_engine.isolation_level`
  515. - set per :class:`_engine.Engine` isolation level
  516. :paramref:`.Connection.execution_options.isolation_level`
  517. - set per :class:`_engine.Connection` isolation level
  518. """
  519. return self.dialect.default_isolation_level
  def _invalid_transaction(self) -> NoReturn:
      """Raise :class:`.PendingRollbackError` for a transaction that must
      be rolled back before the connection may reconnect.

      The message says "savepoint transaction" when a nested transaction
      is present, and "transaction" otherwise.

      """
      if self._nested_transaction is not None:
          qualifier = "savepoint "
      else:
          qualifier = ""

      raise exc.PendingRollbackError(
          "Can't reconnect until invalid %stransaction is rolled "
          "back. Please rollback() fully before proceeding" % (qualifier,),
          code="8s2b",
      )
  527. def _revalidate_connection(self) -> PoolProxiedConnection:
  528. if self.__can_reconnect and self.invalidated:
  529. if self._transaction is not None:
  530. self._invalid_transaction()
  531. self._dbapi_connection = self.engine.raw_connection()
  532. return self._dbapi_connection
  533. raise exc.ResourceClosedError("This Connection is closed")
  534. @property
  535. def info(self) -> _InfoType:
  536. """Info dictionary associated with the underlying DBAPI connection
  537. referred to by this :class:`_engine.Connection`, allowing user-defined
  538. data to be associated with the connection.
  539. The data here will follow along with the DBAPI connection including
  540. after it is returned to the connection pool and used again
  541. in subsequent instances of :class:`_engine.Connection`.
  542. """
  543. return self.connection.info
  544. def invalidate(self, exception: Optional[BaseException] = None) -> None:
  545. """Invalidate the underlying DBAPI connection associated with
  546. this :class:`_engine.Connection`.
  547. An attempt will be made to close the underlying DBAPI connection
  548. immediately; however if this operation fails, the error is logged
  549. but not raised. The connection is then discarded whether or not
  550. close() succeeded.
  551. Upon the next use (where "use" typically means using the
  552. :meth:`_engine.Connection.execute` method or similar),
  553. this :class:`_engine.Connection` will attempt to
  554. procure a new DBAPI connection using the services of the
  555. :class:`_pool.Pool` as a source of connectivity (e.g.
  556. a "reconnection").
  557. If a transaction was in progress (e.g. the
  558. :meth:`_engine.Connection.begin` method has been called) when
  559. :meth:`_engine.Connection.invalidate` method is called, at the DBAPI
  560. level all state associated with this transaction is lost, as
  561. the DBAPI connection is closed. The :class:`_engine.Connection`
  562. will not allow a reconnection to proceed until the
  563. :class:`.Transaction` object is ended, by calling the
  564. :meth:`.Transaction.rollback` method; until that point, any attempt at
  565. continuing to use the :class:`_engine.Connection` will raise an
  566. :class:`~sqlalchemy.exc.InvalidRequestError`.
  567. This is to prevent applications from accidentally
  568. continuing an ongoing transactional operations despite the
  569. fact that the transaction has been lost due to an
  570. invalidation.
  571. The :meth:`_engine.Connection.invalidate` method,
  572. just like auto-invalidation,
  573. will at the connection pool level invoke the
  574. :meth:`_events.PoolEvents.invalidate` event.
  575. :param exception: an optional ``Exception`` instance that's the
  576. reason for the invalidation. is passed along to event handlers
  577. and logging functions.
  578. .. seealso::
  579. :ref:`pool_connection_invalidation`
  580. """
  581. if self.invalidated:
  582. return
  583. if self.closed:
  584. raise exc.ResourceClosedError("This Connection is closed")
  585. if self._still_open_and_dbapi_connection_is_valid:
  586. pool_proxied_connection = self._dbapi_connection
  587. assert pool_proxied_connection is not None
  588. pool_proxied_connection.invalidate(exception)
  589. self._dbapi_connection = None
  590. def detach(self) -> None:
  591. """Detach the underlying DB-API connection from its connection pool.
  592. E.g.::
  593. with engine.connect() as conn:
  594. conn.detach()
  595. conn.execute(text("SET search_path TO schema1, schema2"))
  596. # work with connection
  597. # connection is fully closed (since we used "with:", can
  598. # also call .close())
  599. This :class:`_engine.Connection` instance will remain usable.
  600. When closed
  601. (or exited from a context manager context as above),
  602. the DB-API connection will be literally closed and not
  603. returned to its originating pool.
  604. This method can be used to insulate the rest of an application
  605. from a modified state on a connection (such as a transaction
  606. isolation level or similar).
  607. """
  608. if self.closed:
  609. raise exc.ResourceClosedError("This Connection is closed")
  610. pool_proxied_connection = self._dbapi_connection
  611. if pool_proxied_connection is None:
  612. raise exc.InvalidRequestError(
  613. "Can't detach an invalidated Connection"
  614. )
  615. pool_proxied_connection.detach()
  616. def _autobegin(self) -> None:
  617. if self._allow_autobegin and not self.__in_begin:
  618. self.begin()
  def begin(self) -> RootTransaction:
      """Begin a transaction prior to autobegin occurring.

      E.g.::

          with engine.connect() as conn:
              with conn.begin() as trans:
                  conn.execute(table.insert(), {"username": "sandy"})

      The returned object is an instance of :class:`_engine.RootTransaction`.
      It represents the "scope" of the transaction, which completes when
      either the :meth:`_engine.Transaction.rollback` or
      :meth:`_engine.Transaction.commit` method is called; the object
      also works as a context manager as illustrated above.

      A transaction would normally be begun in any case when the
      connection is first used to execute a statement.  Reasons to call
      this method explicitly are to invoke the
      :meth:`_events.ConnectionEvents.begin` event at a specific time, or
      to organize code within the scope of a connection checkout in terms
      of context managed blocks, such as::

          with engine.connect() as conn:
              with conn.begin():
                  conn.execute(...)
                  conn.execute(...)

              with conn.begin():
                  conn.execute(...)
                  conn.execute(...)

      The above code is not fundamentally any different in its behavior than
      the following code which does not use
      :meth:`_engine.Connection.begin`; the below style is known
      as "commit as you go" style::

          with engine.connect() as conn:
              conn.execute(...)
              conn.execute(...)
              conn.commit()

              conn.execute(...)
              conn.execute(...)
              conn.commit()

      From a database point of view, the :meth:`_engine.Connection.begin`
      method does not emit any SQL or change the state of the underlying
      DBAPI connection in any way; the Python DBAPI does not have any
      concept of explicit transaction begin.

      :raises InvalidRequestError: if a transaction has already been
       initialized on this connection.

      .. seealso::

          :ref:`tutorial_working_with_transactions` - in the
          :ref:`unified_tutorial`

          :meth:`_engine.Connection.begin_nested` - use a SAVEPOINT

          :meth:`_engine.Connection.begin_twophase` -
          use a two phase /XID transaction

          :meth:`_engine.Engine.begin` - context manager available from
          :class:`_engine.Engine`

      """
      if self._transaction is not None:
          raise exc.InvalidRequestError(
              "This connection has already initialized a SQLAlchemy "
              "Transaction() object via begin() or autobegin; can't "
              "call begin() here unless rollback() or commit() "
              "is called first."
          )

      self._transaction = RootTransaction(self)
      return self._transaction
  677. def begin_nested(self) -> NestedTransaction:
  678. """Begin a nested transaction (i.e. SAVEPOINT) and return a transaction
  679. handle that controls the scope of the SAVEPOINT.
  680. E.g.::
  681. with engine.begin() as connection:
  682. with connection.begin_nested():
  683. connection.execute(table.insert(), {"username": "sandy"})
  684. The returned object is an instance of
  685. :class:`_engine.NestedTransaction`, which includes transactional
  686. methods :meth:`_engine.NestedTransaction.commit` and
  687. :meth:`_engine.NestedTransaction.rollback`; for a nested transaction,
  688. these methods correspond to the operations "RELEASE SAVEPOINT <name>"
  689. and "ROLLBACK TO SAVEPOINT <name>". The name of the savepoint is local
  690. to the :class:`_engine.NestedTransaction` object and is generated
  691. automatically. Like any other :class:`_engine.Transaction`, the
  692. :class:`_engine.NestedTransaction` may be used as a context manager as
  693. illustrated above which will "release" or "rollback" corresponding to
  694. if the operation within the block were successful or raised an
  695. exception.
  696. Nested transactions require SAVEPOINT support in the underlying
  697. database, else the behavior is undefined. SAVEPOINT is commonly used to
  698. run operations within a transaction that may fail, while continuing the
  699. outer transaction. E.g.::
  700. from sqlalchemy import exc
  701. with engine.begin() as connection:
  702. trans = connection.begin_nested()
  703. try:
  704. connection.execute(table.insert(), {"username": "sandy"})
  705. trans.commit()
  706. except exc.IntegrityError: # catch for duplicate username
  707. trans.rollback() # rollback to savepoint
  708. # outer transaction continues
  709. connection.execute(...)
  710. If :meth:`_engine.Connection.begin_nested` is called without first
  711. calling :meth:`_engine.Connection.begin` or
  712. :meth:`_engine.Engine.begin`, the :class:`_engine.Connection` object
  713. will "autobegin" the outer transaction first. This outer transaction
  714. may be committed using "commit-as-you-go" style, e.g.::
  715. with engine.connect() as connection: # begin() wasn't called
  716. with connection.begin_nested(): # will auto-"begin()" first
  717. connection.execute(...)
  718. # savepoint is released
  719. connection.execute(...)
  720. # explicitly commit outer transaction
  721. connection.commit()
  722. # can continue working with connection here
  723. .. versionchanged:: 2.0
  724. :meth:`_engine.Connection.begin_nested` will now participate
  725. in the connection "autobegin" behavior that is new as of
  726. 2.0 / "future" style connections in 1.4.
  727. .. seealso::
  728. :meth:`_engine.Connection.begin`
  729. :ref:`session_begin_nested` - ORM support for SAVEPOINT
  730. """
  731. if self._transaction is None:
  732. self._autobegin()
  733. return NestedTransaction(self)
  734. def begin_twophase(self, xid: Optional[Any] = None) -> TwoPhaseTransaction:
  735. """Begin a two-phase or XA transaction and return a transaction
  736. handle.
  737. The returned object is an instance of :class:`.TwoPhaseTransaction`,
  738. which in addition to the methods provided by
  739. :class:`.Transaction`, also provides a
  740. :meth:`~.TwoPhaseTransaction.prepare` method.
  741. :param xid: the two phase transaction id. If not supplied, a
  742. random id will be generated.
  743. .. seealso::
  744. :meth:`_engine.Connection.begin`
  745. :meth:`_engine.Connection.begin_twophase`
  746. """
  747. if self._transaction is not None:
  748. raise exc.InvalidRequestError(
  749. "Cannot start a two phase transaction when a transaction "
  750. "is already in progress."
  751. )
  752. if xid is None:
  753. xid = self.engine.dialect.create_xid()
  754. return TwoPhaseTransaction(self, xid)
  755. def commit(self) -> None:
  756. """Commit the transaction that is currently in progress.
  757. This method commits the current transaction if one has been started.
  758. If no transaction was started, the method has no effect, assuming
  759. the connection is in a non-invalidated state.
  760. A transaction is begun on a :class:`_engine.Connection` automatically
  761. whenever a statement is first executed, or when the
  762. :meth:`_engine.Connection.begin` method is called.
  763. .. note:: The :meth:`_engine.Connection.commit` method only acts upon
  764. the primary database transaction that is linked to the
  765. :class:`_engine.Connection` object. It does not operate upon a
  766. SAVEPOINT that would have been invoked from the
  767. :meth:`_engine.Connection.begin_nested` method; for control of a
  768. SAVEPOINT, call :meth:`_engine.NestedTransaction.commit` on the
  769. :class:`_engine.NestedTransaction` that is returned by the
  770. :meth:`_engine.Connection.begin_nested` method itself.
  771. """
  772. if self._transaction:
  773. self._transaction.commit()
  774. def rollback(self) -> None:
  775. """Roll back the transaction that is currently in progress.
  776. This method rolls back the current transaction if one has been started.
  777. If no transaction was started, the method has no effect. If a
  778. transaction was started and the connection is in an invalidated state,
  779. the transaction is cleared using this method.
  780. A transaction is begun on a :class:`_engine.Connection` automatically
  781. whenever a statement is first executed, or when the
  782. :meth:`_engine.Connection.begin` method is called.
  783. .. note:: The :meth:`_engine.Connection.rollback` method only acts
  784. upon the primary database transaction that is linked to the
  785. :class:`_engine.Connection` object. It does not operate upon a
  786. SAVEPOINT that would have been invoked from the
  787. :meth:`_engine.Connection.begin_nested` method; for control of a
  788. SAVEPOINT, call :meth:`_engine.NestedTransaction.rollback` on the
  789. :class:`_engine.NestedTransaction` that is returned by the
  790. :meth:`_engine.Connection.begin_nested` method itself.
  791. """
  792. if self._transaction:
  793. self._transaction.rollback()
  794. def recover_twophase(self) -> List[Any]:
  795. return self.engine.dialect.do_recover_twophase(self)
  796. def rollback_prepared(self, xid: Any, recover: bool = False) -> None:
  797. self.engine.dialect.do_rollback_twophase(self, xid, recover=recover)
  798. def commit_prepared(self, xid: Any, recover: bool = False) -> None:
  799. self.engine.dialect.do_commit_twophase(self, xid, recover=recover)
  800. def in_transaction(self) -> bool:
  801. """Return True if a transaction is in progress."""
  802. return self._transaction is not None and self._transaction.is_active
  803. def in_nested_transaction(self) -> bool:
  804. """Return True if a transaction is in progress."""
  805. return (
  806. self._nested_transaction is not None
  807. and self._nested_transaction.is_active
  808. )
  809. def _is_autocommit_isolation(self) -> bool:
  810. opt_iso = self._execution_options.get("isolation_level", None)
  811. return bool(
  812. opt_iso == "AUTOCOMMIT"
  813. or (
  814. opt_iso is None
  815. and self.engine.dialect._on_connect_isolation_level
  816. == "AUTOCOMMIT"
  817. )
  818. )
  819. def _get_required_transaction(self) -> RootTransaction:
  820. trans = self._transaction
  821. if trans is None:
  822. raise exc.InvalidRequestError("connection is not in a transaction")
  823. return trans
  824. def _get_required_nested_transaction(self) -> NestedTransaction:
  825. trans = self._nested_transaction
  826. if trans is None:
  827. raise exc.InvalidRequestError(
  828. "connection is not in a nested transaction"
  829. )
  830. return trans
  831. def get_transaction(self) -> Optional[RootTransaction]:
  832. """Return the current root transaction in progress, if any.
  833. .. versionadded:: 1.4
  834. """
  835. return self._transaction
  836. def get_nested_transaction(self) -> Optional[NestedTransaction]:
  837. """Return the current nested transaction in progress, if any.
  838. .. versionadded:: 1.4
  839. """
  840. return self._nested_transaction
  841. def _begin_impl(self, transaction: RootTransaction) -> None:
  842. if self._echo:
  843. if self._is_autocommit_isolation():
  844. self._log_info(
  845. "BEGIN (implicit; DBAPI should not BEGIN due to "
  846. "autocommit mode)"
  847. )
  848. else:
  849. self._log_info("BEGIN (implicit)")
  850. self.__in_begin = True
  851. if self._has_events or self.engine._has_events:
  852. self.dispatch.begin(self)
  853. try:
  854. self.engine.dialect.do_begin(self.connection)
  855. except BaseException as e:
  856. self._handle_dbapi_exception(e, None, None, None, None)
  857. finally:
  858. self.__in_begin = False
  859. def _rollback_impl(self) -> None:
  860. if self._has_events or self.engine._has_events:
  861. self.dispatch.rollback(self)
  862. if self._still_open_and_dbapi_connection_is_valid:
  863. if self._echo:
  864. if self._is_autocommit_isolation():
  865. if self.dialect.skip_autocommit_rollback:
  866. self._log_info(
  867. "ROLLBACK will be skipped by "
  868. "skip_autocommit_rollback"
  869. )
  870. else:
  871. self._log_info(
  872. "ROLLBACK using DBAPI connection.rollback(); "
  873. "set skip_autocommit_rollback to prevent fully"
  874. )
  875. else:
  876. self._log_info("ROLLBACK")
  877. try:
  878. self.engine.dialect.do_rollback(self.connection)
  879. except BaseException as e:
  880. self._handle_dbapi_exception(e, None, None, None, None)
  881. def _commit_impl(self) -> None:
  882. if self._has_events or self.engine._has_events:
  883. self.dispatch.commit(self)
  884. if self._echo:
  885. if self._is_autocommit_isolation():
  886. self._log_info(
  887. "COMMIT using DBAPI connection.commit(), "
  888. "has no effect due to autocommit mode"
  889. )
  890. else:
  891. self._log_info("COMMIT")
  892. try:
  893. self.engine.dialect.do_commit(self.connection)
  894. except BaseException as e:
  895. self._handle_dbapi_exception(e, None, None, None, None)
  896. def _savepoint_impl(self, name: Optional[str] = None) -> str:
  897. if self._has_events or self.engine._has_events:
  898. self.dispatch.savepoint(self, name)
  899. if name is None:
  900. self.__savepoint_seq += 1
  901. name = "sa_savepoint_%s" % self.__savepoint_seq
  902. self.engine.dialect.do_savepoint(self, name)
  903. return name
  904. def _rollback_to_savepoint_impl(self, name: str) -> None:
  905. if self._has_events or self.engine._has_events:
  906. self.dispatch.rollback_savepoint(self, name, None)
  907. if self._still_open_and_dbapi_connection_is_valid:
  908. self.engine.dialect.do_rollback_to_savepoint(self, name)
  909. def _release_savepoint_impl(self, name: str) -> None:
  910. if self._has_events or self.engine._has_events:
  911. self.dispatch.release_savepoint(self, name, None)
  912. self.engine.dialect.do_release_savepoint(self, name)
  913. def _begin_twophase_impl(self, transaction: TwoPhaseTransaction) -> None:
  914. if self._echo:
  915. self._log_info("BEGIN TWOPHASE (implicit)")
  916. if self._has_events or self.engine._has_events:
  917. self.dispatch.begin_twophase(self, transaction.xid)
  918. self.__in_begin = True
  919. try:
  920. self.engine.dialect.do_begin_twophase(self, transaction.xid)
  921. except BaseException as e:
  922. self._handle_dbapi_exception(e, None, None, None, None)
  923. finally:
  924. self.__in_begin = False
  925. def _prepare_twophase_impl(self, xid: Any) -> None:
  926. if self._has_events or self.engine._has_events:
  927. self.dispatch.prepare_twophase(self, xid)
  928. assert isinstance(self._transaction, TwoPhaseTransaction)
  929. try:
  930. self.engine.dialect.do_prepare_twophase(self, xid)
  931. except BaseException as e:
  932. self._handle_dbapi_exception(e, None, None, None, None)
  933. def _rollback_twophase_impl(self, xid: Any, is_prepared: bool) -> None:
  934. if self._has_events or self.engine._has_events:
  935. self.dispatch.rollback_twophase(self, xid, is_prepared)
  936. if self._still_open_and_dbapi_connection_is_valid:
  937. assert isinstance(self._transaction, TwoPhaseTransaction)
  938. try:
  939. self.engine.dialect.do_rollback_twophase(
  940. self, xid, is_prepared
  941. )
  942. except BaseException as e:
  943. self._handle_dbapi_exception(e, None, None, None, None)
  944. def _commit_twophase_impl(self, xid: Any, is_prepared: bool) -> None:
  945. if self._has_events or self.engine._has_events:
  946. self.dispatch.commit_twophase(self, xid, is_prepared)
  947. assert isinstance(self._transaction, TwoPhaseTransaction)
  948. try:
  949. self.engine.dialect.do_commit_twophase(self, xid, is_prepared)
  950. except BaseException as e:
  951. self._handle_dbapi_exception(e, None, None, None, None)
  952. def close(self) -> None:
  953. """Close this :class:`_engine.Connection`.
  954. This results in a release of the underlying database
  955. resources, that is, the DBAPI connection referenced
  956. internally. The DBAPI connection is typically restored
  957. back to the connection-holding :class:`_pool.Pool` referenced
  958. by the :class:`_engine.Engine` that produced this
  959. :class:`_engine.Connection`. Any transactional state present on
  960. the DBAPI connection is also unconditionally released via
  961. the DBAPI connection's ``rollback()`` method, regardless
  962. of any :class:`.Transaction` object that may be
  963. outstanding with regards to this :class:`_engine.Connection`.
  964. This has the effect of also calling :meth:`_engine.Connection.rollback`
  965. if any transaction is in place.
  966. After :meth:`_engine.Connection.close` is called, the
  967. :class:`_engine.Connection` is permanently in a closed state,
  968. and will allow no further operations.
  969. """
  970. if self._transaction:
  971. self._transaction.close()
  972. skip_reset = True
  973. else:
  974. skip_reset = False
  975. if self._dbapi_connection is not None:
  976. conn = self._dbapi_connection
  977. # as we just closed the transaction, close the connection
  978. # pool connection without doing an additional reset
  979. if skip_reset:
  980. cast("_ConnectionFairy", conn)._close_special(
  981. transaction_reset=True
  982. )
  983. else:
  984. conn.close()
  985. # There is a slight chance that conn.close() may have
  986. # triggered an invalidation here in which case
  987. # _dbapi_connection would already be None, however usually
  988. # it will be non-None here and in a "closed" state.
  989. self._dbapi_connection = None
  990. self.__can_reconnect = False
  991. @overload
  992. def scalar(
  993. self,
  994. statement: TypedReturnsRows[Tuple[_T]],
  995. parameters: Optional[_CoreSingleExecuteParams] = None,
  996. *,
  997. execution_options: Optional[CoreExecuteOptionsParameter] = None,
  998. ) -> Optional[_T]: ...
  999. @overload
  1000. def scalar(
  1001. self,
  1002. statement: Executable,
  1003. parameters: Optional[_CoreSingleExecuteParams] = None,
  1004. *,
  1005. execution_options: Optional[CoreExecuteOptionsParameter] = None,
  1006. ) -> Any: ...
  1007. def scalar(
  1008. self,
  1009. statement: Executable,
  1010. parameters: Optional[_CoreSingleExecuteParams] = None,
  1011. *,
  1012. execution_options: Optional[CoreExecuteOptionsParameter] = None,
  1013. ) -> Any:
  1014. r"""Executes a SQL statement construct and returns a scalar object.
  1015. This method is shorthand for invoking the
  1016. :meth:`_engine.Result.scalar` method after invoking the
  1017. :meth:`_engine.Connection.execute` method. Parameters are equivalent.
  1018. :return: a scalar Python value representing the first column of the
  1019. first row returned.
  1020. """
  1021. distilled_parameters = _distill_params_20(parameters)
  1022. try:
  1023. meth = statement._execute_on_scalar
  1024. except AttributeError as err:
  1025. raise exc.ObjectNotExecutableError(statement) from err
  1026. else:
  1027. return meth(
  1028. self,
  1029. distilled_parameters,
  1030. execution_options or NO_OPTIONS,
  1031. )
  1032. @overload
  1033. def scalars(
  1034. self,
  1035. statement: TypedReturnsRows[Tuple[_T]],
  1036. parameters: Optional[_CoreAnyExecuteParams] = None,
  1037. *,
  1038. execution_options: Optional[CoreExecuteOptionsParameter] = None,
  1039. ) -> ScalarResult[_T]: ...
  1040. @overload
  1041. def scalars(
  1042. self,
  1043. statement: Executable,
  1044. parameters: Optional[_CoreAnyExecuteParams] = None,
  1045. *,
  1046. execution_options: Optional[CoreExecuteOptionsParameter] = None,
  1047. ) -> ScalarResult[Any]: ...
  1048. def scalars(
  1049. self,
  1050. statement: Executable,
  1051. parameters: Optional[_CoreAnyExecuteParams] = None,
  1052. *,
  1053. execution_options: Optional[CoreExecuteOptionsParameter] = None,
  1054. ) -> ScalarResult[Any]:
  1055. """Executes and returns a scalar result set, which yields scalar values
  1056. from the first column of each row.
  1057. This method is equivalent to calling :meth:`_engine.Connection.execute`
  1058. to receive a :class:`_result.Result` object, then invoking the
  1059. :meth:`_result.Result.scalars` method to produce a
  1060. :class:`_result.ScalarResult` instance.
  1061. :return: a :class:`_result.ScalarResult`
  1062. .. versionadded:: 1.4.24
  1063. """
  1064. return self.execute(
  1065. statement, parameters, execution_options=execution_options
  1066. ).scalars()
  1067. @overload
  1068. def execute(
  1069. self,
  1070. statement: TypedReturnsRows[_T],
  1071. parameters: Optional[_CoreAnyExecuteParams] = None,
  1072. *,
  1073. execution_options: Optional[CoreExecuteOptionsParameter] = None,
  1074. ) -> CursorResult[_T]: ...
  1075. @overload
  1076. def execute(
  1077. self,
  1078. statement: Executable,
  1079. parameters: Optional[_CoreAnyExecuteParams] = None,
  1080. *,
  1081. execution_options: Optional[CoreExecuteOptionsParameter] = None,
  1082. ) -> CursorResult[Any]: ...
  1083. def execute(
  1084. self,
  1085. statement: Executable,
  1086. parameters: Optional[_CoreAnyExecuteParams] = None,
  1087. *,
  1088. execution_options: Optional[CoreExecuteOptionsParameter] = None,
  1089. ) -> CursorResult[Any]:
  1090. r"""Executes a SQL statement construct and returns a
  1091. :class:`_engine.CursorResult`.
  1092. :param statement: The statement to be executed. This is always
  1093. an object that is in both the :class:`_expression.ClauseElement` and
  1094. :class:`_expression.Executable` hierarchies, including:
  1095. * :class:`_expression.Select`
  1096. * :class:`_expression.Insert`, :class:`_expression.Update`,
  1097. :class:`_expression.Delete`
  1098. * :class:`_expression.TextClause` and
  1099. :class:`_expression.TextualSelect`
  1100. * :class:`_schema.DDL` and objects which inherit from
  1101. :class:`_schema.ExecutableDDLElement`
  1102. :param parameters: parameters which will be bound into the statement.
  1103. This may be either a dictionary of parameter names to values,
  1104. or a mutable sequence (e.g. a list) of dictionaries. When a
  1105. list of dictionaries is passed, the underlying statement execution
  1106. will make use of the DBAPI ``cursor.executemany()`` method.
  1107. When a single dictionary is passed, the DBAPI ``cursor.execute()``
  1108. method will be used.
  1109. :param execution_options: optional dictionary of execution options,
  1110. which will be associated with the statement execution. This
  1111. dictionary can provide a subset of the options that are accepted
  1112. by :meth:`_engine.Connection.execution_options`.
  1113. :return: a :class:`_engine.Result` object.
  1114. """
  1115. distilled_parameters = _distill_params_20(parameters)
  1116. try:
  1117. meth = statement._execute_on_connection
  1118. except AttributeError as err:
  1119. raise exc.ObjectNotExecutableError(statement) from err
  1120. else:
  1121. return meth(
  1122. self,
  1123. distilled_parameters,
  1124. execution_options or NO_OPTIONS,
  1125. )
  1126. def _execute_function(
  1127. self,
  1128. func: FunctionElement[Any],
  1129. distilled_parameters: _CoreMultiExecuteParams,
  1130. execution_options: CoreExecuteOptionsParameter,
  1131. ) -> CursorResult[Any]:
  1132. """Execute a sql.FunctionElement object."""
  1133. return self._execute_clauseelement(
  1134. func.select(), distilled_parameters, execution_options
  1135. )
def _execute_default(
    self,
    default: DefaultGenerator,
    distilled_parameters: _CoreMultiExecuteParams,
    execution_options: CoreExecuteOptionsParameter,
) -> Any:
    """Execute a schema.ColumnDefault object.

    The given execution options are merged onto those of this
    connection, ``before_execute`` listeners (if any) are invoked, a
    default execution context is built through the dialect and the
    default is evaluated via the context's ``_exec_default()`` method.
    When listeners are present, ``after_execute`` is dispatched with
    the result.

    :param default: the default generator to execute.
    :param distilled_parameters: parameters handed to the event
     listeners; they are not passed to ``_exec_default()``.
    :param execution_options: per-execution options merged with this
     connection's options.
    :return: whatever the execution context's ``_exec_default()``
     returns.
    """

    execution_options = self._execution_options.merge_with(
        execution_options
    )

    event_multiparams: Optional[_CoreMultiExecuteParams]
    event_params: Optional[_CoreAnyExecuteParams]

    # note for event handlers, the "distilled parameters" which is always
    # a list of dicts is broken out into separate "multiparams" and
    # "params" collections, which allows the handler to distinguish
    # between an executemany and execute style set of parameters.
    if self._has_events or self.engine._has_events:
        (
            default,
            distilled_parameters,
            event_multiparams,
            event_params,
        ) = self._invoke_before_exec_event(
            default, distilled_parameters, execution_options
        )
    else:
        # keep both names bound for the after_execute dispatch below
        event_multiparams = event_params = None

    try:
        conn = self._dbapi_connection
        if conn is None:
            # no DBAPI connection is currently held; obtain one
            conn = self._revalidate_connection()

        dialect = self.dialect
        ctx = dialect.execution_ctx_cls._init_default(
            dialect, self, conn, execution_options
        )
    except (exc.PendingRollbackError, exc.ResourceClosedError):
        # these two are propagated as-is, bypassing the DBAPI
        # exception handler
        raise
    except BaseException as e:
        self._handle_dbapi_exception(e, None, None, None, None)

    ret = ctx._exec_default(None, default, None)

    if self._has_events or self.engine._has_events:
        self.dispatch.after_execute(
            self,
            default,
            event_multiparams,
            event_params,
            execution_options,
            ret,
        )

    return ret
  1186. def _execute_ddl(
  1187. self,
  1188. ddl: ExecutableDDLElement,
  1189. distilled_parameters: _CoreMultiExecuteParams,
  1190. execution_options: CoreExecuteOptionsParameter,
  1191. ) -> CursorResult[Any]:
  1192. """Execute a schema.DDL object."""
  1193. exec_opts = ddl._execution_options.merge_with(
  1194. self._execution_options, execution_options
  1195. )
  1196. event_multiparams: Optional[_CoreMultiExecuteParams]
  1197. event_params: Optional[_CoreSingleExecuteParams]
  1198. if self._has_events or self.engine._has_events:
  1199. (
  1200. ddl,
  1201. distilled_parameters,
  1202. event_multiparams,
  1203. event_params,
  1204. ) = self._invoke_before_exec_event(
  1205. ddl, distilled_parameters, exec_opts
  1206. )
  1207. else:
  1208. event_multiparams = event_params = None
  1209. schema_translate_map = exec_opts.get("schema_translate_map", None)
  1210. dialect = self.dialect
  1211. compiled = ddl.compile(
  1212. dialect=dialect, schema_translate_map=schema_translate_map
  1213. )
  1214. ret = self._execute_context(
  1215. dialect,
  1216. dialect.execution_ctx_cls._init_ddl,
  1217. compiled,
  1218. None,
  1219. exec_opts,
  1220. compiled,
  1221. )
  1222. if self._has_events or self.engine._has_events:
  1223. self.dispatch.after_execute(
  1224. self,
  1225. ddl,
  1226. event_multiparams,
  1227. event_params,
  1228. exec_opts,
  1229. ret,
  1230. )
  1231. return ret
  1232. def _invoke_before_exec_event(
  1233. self,
  1234. elem: Any,
  1235. distilled_params: _CoreMultiExecuteParams,
  1236. execution_options: _ExecuteOptions,
  1237. ) -> Tuple[
  1238. Any,
  1239. _CoreMultiExecuteParams,
  1240. _CoreMultiExecuteParams,
  1241. _CoreSingleExecuteParams,
  1242. ]:
  1243. event_multiparams: _CoreMultiExecuteParams
  1244. event_params: _CoreSingleExecuteParams
  1245. if len(distilled_params) == 1:
  1246. event_multiparams, event_params = [], distilled_params[0]
  1247. else:
  1248. event_multiparams, event_params = distilled_params, {}
  1249. for fn in self.dispatch.before_execute:
  1250. elem, event_multiparams, event_params = fn(
  1251. self,
  1252. elem,
  1253. event_multiparams,
  1254. event_params,
  1255. execution_options,
  1256. )
  1257. if event_multiparams:
  1258. distilled_params = list(event_multiparams)
  1259. if event_params:
  1260. raise exc.InvalidRequestError(
  1261. "Event handler can't return non-empty multiparams "
  1262. "and params at the same time"
  1263. )
  1264. elif event_params:
  1265. distilled_params = [event_params]
  1266. else:
  1267. distilled_params = []
  1268. return elem, distilled_params, event_multiparams, event_params
def _execute_clauseelement(
    self,
    elem: Executable,
    distilled_parameters: _CoreMultiExecuteParams,
    execution_options: CoreExecuteOptionsParameter,
) -> CursorResult[Any]:
    """Execute a sql.ClauseElement object.

    Execution options are merged from the element, this connection and
    the per-call options.  ``before_execute`` listeners (if any) are
    invoked, the element is compiled through ``_compile_w_cache()``
    using the ``compiled_cache`` execution option (falling back to the
    engine's ``_compiled_cache``), and the compiled form is executed.
    When listeners are present, ``after_execute`` is dispatched with
    the result.

    :param elem: the element to execute.
    :param distilled_parameters: the distilled parameter list; the keys
     of its first entry are used as the compile-time column keys.
    :param execution_options: per-execution options.
    :return: the result of executing the compiled statement.
    """

    execution_options = elem._execution_options.merge_with(
        self._execution_options, execution_options
    )

    # evaluated once, so the before/after dispatch below always agree
    # and event_multiparams / event_params are bound whenever they are
    # used
    has_events = self._has_events or self.engine._has_events
    if has_events:
        (
            elem,
            distilled_parameters,
            event_multiparams,
            event_params,
        ) = self._invoke_before_exec_event(
            elem, distilled_parameters, execution_options
        )

    if distilled_parameters:
        # ensure we don't retain a link to the view object for keys()
        # which links to the values, which we don't want to cache
        keys = sorted(distilled_parameters[0])
        # more than one parameter set means "executemany" style
        for_executemany = len(distilled_parameters) > 1
    else:
        keys = []
        for_executemany = False

    dialect = self.dialect

    schema_translate_map = execution_options.get(
        "schema_translate_map", None
    )

    # a per-execution "compiled_cache" option overrides the engine's
    compiled_cache: Optional[CompiledCacheType] = execution_options.get(
        "compiled_cache", self.engine._compiled_cache
    )

    compiled_sql, extracted_params, cache_hit = elem._compile_w_cache(
        dialect=dialect,
        compiled_cache=compiled_cache,
        column_keys=keys,
        for_executemany=for_executemany,
        schema_translate_map=schema_translate_map,
        linting=self.dialect.compiler_linting | compiler.WARN_LINTING,
    )
    ret = self._execute_context(
        dialect,
        dialect.execution_ctx_cls._init_compiled,
        compiled_sql,
        distilled_parameters,
        execution_options,
        compiled_sql,
        distilled_parameters,
        elem,
        extracted_params,
        cache_hit=cache_hit,
    )
    if has_events:
        self.dispatch.after_execute(
            self,
            elem,
            event_multiparams,
            event_params,
            execution_options,
            ret,
        )
    return ret
  1334. def _execute_compiled(
  1335. self,
  1336. compiled: Compiled,
  1337. distilled_parameters: _CoreMultiExecuteParams,
  1338. execution_options: CoreExecuteOptionsParameter = _EMPTY_EXECUTION_OPTS,
  1339. ) -> CursorResult[Any]:
  1340. """Execute a sql.Compiled object.
  1341. TODO: why do we have this? likely deprecate or remove
  1342. """
  1343. execution_options = compiled.execution_options.merge_with(
  1344. self._execution_options, execution_options
  1345. )
  1346. if self._has_events or self.engine._has_events:
  1347. (
  1348. compiled,
  1349. distilled_parameters,
  1350. event_multiparams,
  1351. event_params,
  1352. ) = self._invoke_before_exec_event(
  1353. compiled, distilled_parameters, execution_options
  1354. )
  1355. dialect = self.dialect
  1356. ret = self._execute_context(
  1357. dialect,
  1358. dialect.execution_ctx_cls._init_compiled,
  1359. compiled,
  1360. distilled_parameters,
  1361. execution_options,
  1362. compiled,
  1363. distilled_parameters,
  1364. None,
  1365. None,
  1366. )
  1367. if self._has_events or self.engine._has_events:
  1368. self.dispatch.after_execute(
  1369. self,
  1370. compiled,
  1371. event_multiparams,
  1372. event_params,
  1373. execution_options,
  1374. ret,
  1375. )
  1376. return ret
  1377. def exec_driver_sql(
  1378. self,
  1379. statement: str,
  1380. parameters: Optional[_DBAPIAnyExecuteParams] = None,
  1381. execution_options: Optional[CoreExecuteOptionsParameter] = None,
  1382. ) -> CursorResult[Any]:
  1383. r"""Executes a string SQL statement on the DBAPI cursor directly,
  1384. without any SQL compilation steps.
  1385. This can be used to pass any string directly to the
  1386. ``cursor.execute()`` method of the DBAPI in use.
  1387. :param statement: The statement str to be executed. Bound parameters
  1388. must use the underlying DBAPI's paramstyle, such as "qmark",
  1389. "pyformat", "format", etc.
  1390. :param parameters: represent bound parameter values to be used in the
  1391. execution. The format is one of: a dictionary of named parameters,
  1392. a tuple of positional parameters, or a list containing either
  1393. dictionaries or tuples for multiple-execute support.
  1394. :return: a :class:`_engine.CursorResult`.
  1395. E.g. multiple dictionaries::
  1396. conn.exec_driver_sql(
  1397. "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
  1398. [{"id": 1, "value": "v1"}, {"id": 2, "value": "v2"}],
  1399. )
  1400. Single dictionary::
  1401. conn.exec_driver_sql(
  1402. "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
  1403. dict(id=1, value="v1"),
  1404. )
  1405. Single tuple::
  1406. conn.exec_driver_sql(
  1407. "INSERT INTO table (id, value) VALUES (?, ?)", (1, "v1")
  1408. )
  1409. .. note:: The :meth:`_engine.Connection.exec_driver_sql` method does
  1410. not participate in the
  1411. :meth:`_events.ConnectionEvents.before_execute` and
  1412. :meth:`_events.ConnectionEvents.after_execute` events. To
  1413. intercept calls to :meth:`_engine.Connection.exec_driver_sql`, use
  1414. :meth:`_events.ConnectionEvents.before_cursor_execute` and
  1415. :meth:`_events.ConnectionEvents.after_cursor_execute`.
  1416. .. seealso::
  1417. :pep:`249`
  1418. """
  1419. distilled_parameters = _distill_raw_params(parameters)
  1420. execution_options = self._execution_options.merge_with(
  1421. execution_options
  1422. )
  1423. dialect = self.dialect
  1424. ret = self._execute_context(
  1425. dialect,
  1426. dialect.execution_ctx_cls._init_statement,
  1427. statement,
  1428. None,
  1429. execution_options,
  1430. statement,
  1431. distilled_parameters,
  1432. )
  1433. return ret
  1434. def _execute_context(
  1435. self,
  1436. dialect: Dialect,
  1437. constructor: Callable[..., ExecutionContext],
  1438. statement: Union[str, Compiled],
  1439. parameters: Optional[_AnyMultiExecuteParams],
  1440. execution_options: _ExecuteOptions,
  1441. *args: Any,
  1442. **kw: Any,
  1443. ) -> CursorResult[Any]:
  1444. """Create an :class:`.ExecutionContext` and execute, returning
  1445. a :class:`_engine.CursorResult`."""
  1446. if execution_options:
  1447. yp = execution_options.get("yield_per", None)
  1448. if yp:
  1449. execution_options = execution_options.union(
  1450. {"stream_results": True, "max_row_buffer": yp}
  1451. )
  1452. try:
  1453. conn = self._dbapi_connection
  1454. if conn is None:
  1455. conn = self._revalidate_connection()
  1456. context = constructor(
  1457. dialect, self, conn, execution_options, *args, **kw
  1458. )
  1459. except (exc.PendingRollbackError, exc.ResourceClosedError):
  1460. raise
  1461. except BaseException as e:
  1462. self._handle_dbapi_exception(
  1463. e, str(statement), parameters, None, None
  1464. )
  1465. if (
  1466. self._transaction
  1467. and not self._transaction.is_active
  1468. or (
  1469. self._nested_transaction
  1470. and not self._nested_transaction.is_active
  1471. )
  1472. ):
  1473. self._invalid_transaction()
  1474. elif self._trans_context_manager:
  1475. TransactionalContext._trans_ctx_check(self)
  1476. if self._transaction is None:
  1477. self._autobegin()
  1478. context.pre_exec()
  1479. if context.execute_style is ExecuteStyle.INSERTMANYVALUES:
  1480. return self._exec_insertmany_context(dialect, context)
  1481. else:
  1482. return self._exec_single_context(
  1483. dialect, context, statement, parameters
  1484. )
  1485. def _exec_single_context(
  1486. self,
  1487. dialect: Dialect,
  1488. context: ExecutionContext,
  1489. statement: Union[str, Compiled],
  1490. parameters: Optional[_AnyMultiExecuteParams],
  1491. ) -> CursorResult[Any]:
  1492. """continue the _execute_context() method for a single DBAPI
  1493. cursor.execute() or cursor.executemany() call.
  1494. """
  1495. if dialect.bind_typing is BindTyping.SETINPUTSIZES:
  1496. generic_setinputsizes = context._prepare_set_input_sizes()
  1497. if generic_setinputsizes:
  1498. try:
  1499. dialect.do_set_input_sizes(
  1500. context.cursor, generic_setinputsizes, context
  1501. )
  1502. except BaseException as e:
  1503. self._handle_dbapi_exception(
  1504. e, str(statement), parameters, None, context
  1505. )
  1506. cursor, str_statement, parameters = (
  1507. context.cursor,
  1508. context.statement,
  1509. context.parameters,
  1510. )
  1511. effective_parameters: Optional[_AnyExecuteParams]
  1512. if not context.executemany:
  1513. effective_parameters = parameters[0]
  1514. else:
  1515. effective_parameters = parameters
  1516. if self._has_events or self.engine._has_events:
  1517. for fn in self.dispatch.before_cursor_execute:
  1518. str_statement, effective_parameters = fn(
  1519. self,
  1520. cursor,
  1521. str_statement,
  1522. effective_parameters,
  1523. context,
  1524. context.executemany,
  1525. )
  1526. if self._echo:
  1527. self._log_info(str_statement)
  1528. stats = context._get_cache_stats()
  1529. if not self.engine.hide_parameters:
  1530. self._log_info(
  1531. "[%s] %r",
  1532. stats,
  1533. sql_util._repr_params(
  1534. effective_parameters,
  1535. batches=10,
  1536. ismulti=context.executemany,
  1537. ),
  1538. )
  1539. else:
  1540. self._log_info(
  1541. "[%s] [SQL parameters hidden due to hide_parameters=True]",
  1542. stats,
  1543. )
  1544. evt_handled: bool = False
  1545. try:
  1546. if context.execute_style is ExecuteStyle.EXECUTEMANY:
  1547. effective_parameters = cast(
  1548. "_CoreMultiExecuteParams", effective_parameters
  1549. )
  1550. if self.dialect._has_events:
  1551. for fn in self.dialect.dispatch.do_executemany:
  1552. if fn(
  1553. cursor,
  1554. str_statement,
  1555. effective_parameters,
  1556. context,
  1557. ):
  1558. evt_handled = True
  1559. break
  1560. if not evt_handled:
  1561. self.dialect.do_executemany(
  1562. cursor,
  1563. str_statement,
  1564. effective_parameters,
  1565. context,
  1566. )
  1567. elif not effective_parameters and context.no_parameters:
  1568. if self.dialect._has_events:
  1569. for fn in self.dialect.dispatch.do_execute_no_params:
  1570. if fn(cursor, str_statement, context):
  1571. evt_handled = True
  1572. break
  1573. if not evt_handled:
  1574. self.dialect.do_execute_no_params(
  1575. cursor, str_statement, context
  1576. )
  1577. else:
  1578. effective_parameters = cast(
  1579. "_CoreSingleExecuteParams", effective_parameters
  1580. )
  1581. if self.dialect._has_events:
  1582. for fn in self.dialect.dispatch.do_execute:
  1583. if fn(
  1584. cursor,
  1585. str_statement,
  1586. effective_parameters,
  1587. context,
  1588. ):
  1589. evt_handled = True
  1590. break
  1591. if not evt_handled:
  1592. self.dialect.do_execute(
  1593. cursor, str_statement, effective_parameters, context
  1594. )
  1595. if self._has_events or self.engine._has_events:
  1596. self.dispatch.after_cursor_execute(
  1597. self,
  1598. cursor,
  1599. str_statement,
  1600. effective_parameters,
  1601. context,
  1602. context.executemany,
  1603. )
  1604. context.post_exec()
  1605. result = context._setup_result_proxy()
  1606. except BaseException as e:
  1607. self._handle_dbapi_exception(
  1608. e, str_statement, effective_parameters, cursor, context
  1609. )
  1610. return result
def _exec_insertmany_context(
    self,
    dialect: Dialect,
    context: ExecutionContext,
) -> CursorResult[Any]:
    """continue the _execute_context() method for an "insertmanyvalues"
    operation, which will invoke DBAPI
    cursor.execute() one or more times with individual log and
    event hook calls.

    :param dialect: dialect that delivers the batches and performs each
     per-batch ``do_execute()``.
    :param context: the execution context; supplies the cursor, the
     statement and the full parameter list.
    :return: the object produced by ``context._setup_result_proxy()``.

    """

    # setinputsizes information is prepared once here and handed to the
    # batch generator, which supplies the per-batch value that is
    # actually applied inside the loop
    if dialect.bind_typing is BindTyping.SETINPUTSIZES:
        generic_setinputsizes = context._prepare_set_input_sizes()
    else:
        generic_setinputsizes = None

    cursor, str_statement, parameters = (
        context.cursor,
        context.statement,
        context.parameters,
    )

    effective_parameters = parameters

    # event flags / dispatchers are resolved once, outside the batch loop
    engine_events = self._has_events or self.engine._has_events
    if self.dialect._has_events:
        do_execute_dispatch: Iterable[Any] = (
            self.dialect.dispatch.do_execute
        )
    else:
        do_execute_dispatch = ()

    # ``stats`` is only bound when echo is on; it is only read below
    # under the same ``self._echo`` check
    if self._echo:
        stats = context._get_cache_stats() + " (insertmanyvalues)"

    preserve_rowcount = context.execution_options.get(
        "preserve_rowcount", False
    )
    rowcount = 0

    for imv_batch in dialect._deliver_insertmanyvalues_batches(
        self,
        cursor,
        str_statement,
        effective_parameters,
        generic_setinputsizes,
        context,
    ):
        if imv_batch.processed_setinputsizes:
            try:
                dialect.do_set_input_sizes(
                    context.cursor,
                    imv_batch.processed_setinputsizes,
                    context,
                )
            except BaseException as e:
                self._handle_dbapi_exception(
                    e,
                    sql_util._long_statement(imv_batch.replaced_statement),
                    imv_batch.replaced_parameters,
                    None,
                    context,
                    is_sub_exec=True,
                )

        # statement / parameters for this batch; the
        # before_cursor_execute hooks below may replace both
        sub_stmt = imv_batch.replaced_statement
        sub_params = imv_batch.replaced_parameters

        if engine_events:
            for fn in self.dispatch.before_cursor_execute:
                sub_stmt, sub_params = fn(
                    self,
                    cursor,
                    sub_stmt,
                    sub_params,
                    context,
                    True,
                )

        if self._echo:
            self._log_info(sql_util._long_statement(sub_stmt))

            imv_stats = f""" {imv_batch.batchnum}/{
                imv_batch.total_batches
            } ({
                'ordered'
                if imv_batch.rows_sorted else 'unordered'
            }{
                '; batch not supported'
                if imv_batch.is_downgraded
                else ''
            })"""

            # the first batch appends to the cache stats computed above;
            # later batches log a shorter prefix
            if imv_batch.batchnum == 1:
                stats += imv_stats
            else:
                stats = f"insertmanyvalues{imv_stats}"

            if not self.engine.hide_parameters:
                self._log_info(
                    "[%s] %r",
                    stats,
                    sql_util._repr_params(
                        sub_params,
                        batches=10,
                        ismulti=False,
                    ),
                )
            else:
                self._log_info(
                    "[%s] [SQL parameters hidden due to "
                    "hide_parameters=True]",
                    stats,
                )

        try:
            # the first do_execute hook that returns a true value has
            # handled the batch; otherwise fall through to the dialect
            for fn in do_execute_dispatch:
                if fn(
                    cursor,
                    sub_stmt,
                    sub_params,
                    context,
                ):
                    break
            else:
                dialect.do_execute(
                    cursor,
                    sub_stmt,
                    sub_params,
                    context,
                )

        except BaseException as e:
            self._handle_dbapi_exception(
                e,
                sql_util._long_statement(sub_stmt),
                sub_params,
                cursor,
                context,
                is_sub_exec=True,
            )

        # note that this hook receives the overall statement and the full
        # parameter list, not the per-batch sub_stmt / sub_params
        if engine_events:
            self.dispatch.after_cursor_execute(
                self,
                cursor,
                str_statement,
                effective_parameters,
                context,
                context.executemany,
            )

        if preserve_rowcount:
            rowcount += imv_batch.current_batch_size

    try:
        context.post_exec()

        # the summed batch sizes are stored on the context only when the
        # "preserve_rowcount" execution option is set
        if preserve_rowcount:
            context._rowcount = rowcount  # type: ignore[attr-defined]

        result = context._setup_result_proxy()

    except BaseException as e:
        self._handle_dbapi_exception(
            e, str_statement, effective_parameters, cursor, context
        )

    return result
  1758. def _cursor_execute(
  1759. self,
  1760. cursor: DBAPICursor,
  1761. statement: str,
  1762. parameters: _DBAPISingleExecuteParams,
  1763. context: Optional[ExecutionContext] = None,
  1764. ) -> None:
  1765. """Execute a statement + params on the given cursor.
  1766. Adds appropriate logging and exception handling.
  1767. This method is used by DefaultDialect for special-case
  1768. executions, such as for sequences and column defaults.
  1769. The path of statement execution in the majority of cases
  1770. terminates at _execute_context().
  1771. """
  1772. if self._has_events or self.engine._has_events:
  1773. for fn in self.dispatch.before_cursor_execute:
  1774. statement, parameters = fn(
  1775. self, cursor, statement, parameters, context, False
  1776. )
  1777. if self._echo:
  1778. self._log_info(statement)
  1779. self._log_info("[raw sql] %r", parameters)
  1780. try:
  1781. for fn in (
  1782. ()
  1783. if not self.dialect._has_events
  1784. else self.dialect.dispatch.do_execute
  1785. ):
  1786. if fn(cursor, statement, parameters, context):
  1787. break
  1788. else:
  1789. self.dialect.do_execute(cursor, statement, parameters, context)
  1790. except BaseException as e:
  1791. self._handle_dbapi_exception(
  1792. e, statement, parameters, cursor, context
  1793. )
  1794. if self._has_events or self.engine._has_events:
  1795. self.dispatch.after_cursor_execute(
  1796. self, cursor, statement, parameters, context, False
  1797. )
  1798. def _safe_close_cursor(self, cursor: DBAPICursor) -> None:
  1799. """Close the given cursor, catching exceptions
  1800. and turning into log warnings.
  1801. """
  1802. try:
  1803. cursor.close()
  1804. except Exception:
  1805. # log the error through the connection pool's logger.
  1806. self.engine.pool.logger.error(
  1807. "Error closing cursor", exc_info=True
  1808. )
# Class-level defaults for state managed by _handle_dbapi_exception().
# That method assigns instance attributes of the same names while it runs
# and removes them again with ``del``, which makes these defaults visible
# once more.
_reentrant_error = False
_is_disconnect = False
def _handle_dbapi_exception(
    self,
    e: BaseException,
    statement: Optional[str],
    parameters: Optional[_AnyExecuteParams],
    cursor: Optional[DBAPICursor],
    context: Optional[ExecutionContext],
    is_sub_exec: bool = False,
) -> NoReturn:
    """Process an exception raised while executing on this connection and
    re-raise it; this method never returns normally.

    In order, the method: decides whether the error is a disconnect;
    wraps the error in a :class:`.DBAPIError` where appropriate; runs the
    dialect-level ``handle_error`` event handlers; closes the cursor and
    possibly rolls back when the error is not a disconnect; raises; and
    finally, for a disconnect, invalidates this connection and optionally
    the pool.

    :param e: the exception being handled.
    :param statement: SQL string used in the wrapped exception, or None.
    :param parameters: parameters used in the wrapped exception, or None.
    :param cursor: the DBAPI cursor in use, if any; closed here when the
     error is not a disconnect.
    :param context: the execution context in progress, if any.
    :param is_sub_exec: when True, ``ismulti`` passed to the wrapped
     exception is forced to False regardless of ``context.executemany``.
    :raises: the exception supplied by a ``handle_error`` handler if there
     is one, else the wrapping :class:`.DBAPIError`, else the exception
     currently being handled.

    """
    exc_info = sys.exc_info()

    is_exit_exception = util.is_exit_exception(e)

    # a disconnect is either a DBAPI error that the dialect reports as
    # one, or an "exit" exception, in both cases only while this
    # connection is not closed.  An already-set flag is left alone.
    if not self._is_disconnect:
        self._is_disconnect = (
            isinstance(e, self.dialect.loaded_dbapi.Error)
            and not self.closed
            and self.dialect.is_disconnect(
                e,
                self._dbapi_connection if not self.invalidated else None,
                cursor,
            )
        ) or (is_exit_exception and not self.closed)

    # exit exceptions do not invalidate the whole pool by default
    invalidate_pool_on_disconnect = not is_exit_exception

    ismulti: bool = (
        not is_sub_exec and context.executemany
        if context is not None
        else False
    )

    # this method is already running for this connection (an error
    # occurred while handling an error): raise a wrapped error right away
    # without running the handling steps a second time
    if self._reentrant_error:
        raise exc.DBAPIError.instance(
            statement,
            parameters,
            e,
            self.dialect.loaded_dbapi.Error,
            hide_parameters=self.engine.hide_parameters,
            dialect=self.dialect,
            ismulti=ismulti,
        ).with_traceback(exc_info[2]) from e
    self._reentrant_error = True
    try:
        # non-DBAPI error - if we already got a context,
        # or there's no string statement, don't wrap it
        should_wrap = isinstance(e, self.dialect.loaded_dbapi.Error) or (
            statement is not None
            and context is None
            and not is_exit_exception
        )

        if should_wrap:
            sqlalchemy_exception = exc.DBAPIError.instance(
                statement,
                parameters,
                cast(Exception, e),
                self.dialect.loaded_dbapi.Error,
                hide_parameters=self.engine.hide_parameters,
                connection_invalidated=self._is_disconnect,
                dialect=self.dialect,
                ismulti=ismulti,
            )
        else:
            sqlalchemy_exception = None

        # exception supplied by a handle_error handler, if any
        newraise = None

        if (self.dialect._has_events) and not self._execution_options.get(
            "skip_user_error_events", False
        ):
            ctx = ExceptionContextImpl(
                e,
                sqlalchemy_exception,
                self.engine,
                self.dialect,
                self,
                cursor,
                statement,
                parameters,
                context,
                self._is_disconnect,
                invalidate_pool_on_disconnect,
                False,
            )

            for fn in self.dialect.dispatch.handle_error:
                try:
                    # handler returns an exception;
                    # call next handler in a chain
                    per_fn = fn(ctx)
                    if per_fn is not None:
                        ctx.chained_exception = newraise = per_fn
                except Exception as _raised:
                    # handler raises an exception - stop processing
                    newraise = _raised
                    break

            # handlers may have changed ctx.is_disconnect; copy the new
            # value back here and onto the wrapped exception
            if self._is_disconnect != ctx.is_disconnect:
                self._is_disconnect = ctx.is_disconnect
                if sqlalchemy_exception:
                    sqlalchemy_exception.connection_invalidated = (
                        ctx.is_disconnect
                    )

            # set up potentially user-defined value for
            # invalidate pool.
            invalidate_pool_on_disconnect = (
                ctx.invalidate_pool_on_disconnect
            )

        if should_wrap and context:
            context.handle_dbapi_exception(e)

        if not self._is_disconnect:
            if cursor:
                self._safe_close_cursor(cursor)
            # "autorollback" was mostly relevant in 1.x series.
            # It's very unlikely to reach here, as the connection
            # does autobegin so when we are here, we are usually
            # in an explicit / semi-explicit transaction.
            # however we have a test which manufactures this
            # scenario in any case using an event handler.
            # test/engine/test_execute.py-> test_actual_autorollback
            if not self.in_transaction():
                self._rollback_impl()

        # precedence: handler-supplied exception, then the wrapped
        # exception, then the exception currently being handled
        if newraise:
            raise newraise.with_traceback(exc_info[2]) from e
        elif should_wrap:
            assert sqlalchemy_exception is not None
            raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
        else:
            assert exc_info[1] is not None
            raise exc_info[1].with_traceback(exc_info[2])
    finally:
        # remove the instance attribute so that the class-level default
        # is visible again
        del self._reentrant_error
        if self._is_disconnect:
            del self._is_disconnect
            if not self.invalidated:
                dbapi_conn_wrapper = self._dbapi_connection
                assert dbapi_conn_wrapper is not None
                if invalidate_pool_on_disconnect:
                    self.engine.pool._invalidate(dbapi_conn_wrapper, e)
                self.invalidate(e)
  1942. @classmethod
  1943. def _handle_dbapi_exception_noconnection(
  1944. cls,
  1945. e: BaseException,
  1946. dialect: Dialect,
  1947. engine: Optional[Engine] = None,
  1948. is_disconnect: Optional[bool] = None,
  1949. invalidate_pool_on_disconnect: bool = True,
  1950. is_pre_ping: bool = False,
  1951. ) -> NoReturn:
  1952. exc_info = sys.exc_info()
  1953. if is_disconnect is None:
  1954. is_disconnect = isinstance(
  1955. e, dialect.loaded_dbapi.Error
  1956. ) and dialect.is_disconnect(e, None, None)
  1957. should_wrap = isinstance(e, dialect.loaded_dbapi.Error)
  1958. if should_wrap:
  1959. sqlalchemy_exception = exc.DBAPIError.instance(
  1960. None,
  1961. None,
  1962. cast(Exception, e),
  1963. dialect.loaded_dbapi.Error,
  1964. hide_parameters=(
  1965. engine.hide_parameters if engine is not None else False
  1966. ),
  1967. connection_invalidated=is_disconnect,
  1968. dialect=dialect,
  1969. )
  1970. else:
  1971. sqlalchemy_exception = None
  1972. newraise = None
  1973. if dialect._has_events:
  1974. ctx = ExceptionContextImpl(
  1975. e,
  1976. sqlalchemy_exception,
  1977. engine,
  1978. dialect,
  1979. None,
  1980. None,
  1981. None,
  1982. None,
  1983. None,
  1984. is_disconnect,
  1985. invalidate_pool_on_disconnect,
  1986. is_pre_ping,
  1987. )
  1988. for fn in dialect.dispatch.handle_error:
  1989. try:
  1990. # handler returns an exception;
  1991. # call next handler in a chain
  1992. per_fn = fn(ctx)
  1993. if per_fn is not None:
  1994. ctx.chained_exception = newraise = per_fn
  1995. except Exception as _raised:
  1996. # handler raises an exception - stop processing
  1997. newraise = _raised
  1998. break
  1999. if sqlalchemy_exception and is_disconnect != ctx.is_disconnect:
  2000. sqlalchemy_exception.connection_invalidated = ctx.is_disconnect
  2001. if newraise:
  2002. raise newraise.with_traceback(exc_info[2]) from e
  2003. elif should_wrap:
  2004. assert sqlalchemy_exception is not None
  2005. raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
  2006. else:
  2007. assert exc_info[1] is not None
  2008. raise exc_info[1].with_traceback(exc_info[2])
  2009. def _run_ddl_visitor(
  2010. self,
  2011. visitorcallable: Type[InvokeDDLBase],
  2012. element: SchemaVisitable,
  2013. **kwargs: Any,
  2014. ) -> None:
  2015. """run a DDL visitor.
  2016. This method is only here so that the MockConnection can change the
  2017. options given to the visitor so that "checkfirst" is skipped.
  2018. """
  2019. visitorcallable(
  2020. dialect=self.dialect, connection=self, **kwargs
  2021. ).traverse_single(element)
  2022. class ExceptionContextImpl(ExceptionContext):
  2023. """Implement the :class:`.ExceptionContext` interface."""
  2024. __slots__ = (
  2025. "connection",
  2026. "engine",
  2027. "dialect",
  2028. "cursor",
  2029. "statement",
  2030. "parameters",
  2031. "original_exception",
  2032. "sqlalchemy_exception",
  2033. "chained_exception",
  2034. "execution_context",
  2035. "is_disconnect",
  2036. "invalidate_pool_on_disconnect",
  2037. "is_pre_ping",
  2038. )
  2039. def __init__(
  2040. self,
  2041. exception: BaseException,
  2042. sqlalchemy_exception: Optional[exc.StatementError],
  2043. engine: Optional[Engine],
  2044. dialect: Dialect,
  2045. connection: Optional[Connection],
  2046. cursor: Optional[DBAPICursor],
  2047. statement: Optional[str],
  2048. parameters: Optional[_DBAPIAnyExecuteParams],
  2049. context: Optional[ExecutionContext],
  2050. is_disconnect: bool,
  2051. invalidate_pool_on_disconnect: bool,
  2052. is_pre_ping: bool,
  2053. ):
  2054. self.engine = engine
  2055. self.dialect = dialect
  2056. self.connection = connection
  2057. self.sqlalchemy_exception = sqlalchemy_exception
  2058. self.original_exception = exception
  2059. self.execution_context = context
  2060. self.statement = statement
  2061. self.parameters = parameters
  2062. self.is_disconnect = is_disconnect
  2063. self.invalidate_pool_on_disconnect = invalidate_pool_on_disconnect
  2064. self.is_pre_ping = is_pre_ping
  2065. class Transaction(TransactionalContext):
  2066. """Represent a database transaction in progress.
  2067. The :class:`.Transaction` object is procured by
  2068. calling the :meth:`_engine.Connection.begin` method of
  2069. :class:`_engine.Connection`::
  2070. from sqlalchemy import create_engine
  2071. engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
  2072. connection = engine.connect()
  2073. trans = connection.begin()
  2074. connection.execute(text("insert into x (a, b) values (1, 2)"))
  2075. trans.commit()
  2076. The object provides :meth:`.rollback` and :meth:`.commit`
  2077. methods in order to control transaction boundaries. It
  2078. also implements a context manager interface so that
  2079. the Python ``with`` statement can be used with the
  2080. :meth:`_engine.Connection.begin` method::
  2081. with connection.begin():
  2082. connection.execute(text("insert into x (a, b) values (1, 2)"))
  2083. The Transaction object is **not** threadsafe.
  2084. .. seealso::
  2085. :meth:`_engine.Connection.begin`
  2086. :meth:`_engine.Connection.begin_twophase`
  2087. :meth:`_engine.Connection.begin_nested`
  2088. .. index::
  2089. single: thread safety; Transaction
  2090. """ # noqa
  2091. __slots__ = ()
  2092. _is_root: bool = False
  2093. is_active: bool
  2094. connection: Connection
  2095. def __init__(self, connection: Connection):
  2096. raise NotImplementedError()
  2097. @property
  2098. def _deactivated_from_connection(self) -> bool:
  2099. """True if this transaction is totally deactivated from the connection
  2100. and therefore can no longer affect its state.
  2101. """
  2102. raise NotImplementedError()
  2103. def _do_close(self) -> None:
  2104. raise NotImplementedError()
  2105. def _do_rollback(self) -> None:
  2106. raise NotImplementedError()
  2107. def _do_commit(self) -> None:
  2108. raise NotImplementedError()
  2109. @property
  2110. def is_valid(self) -> bool:
  2111. return self.is_active and not self.connection.invalidated
  2112. def close(self) -> None:
  2113. """Close this :class:`.Transaction`.
  2114. If this transaction is the base transaction in a begin/commit
  2115. nesting, the transaction will rollback(). Otherwise, the
  2116. method returns.
  2117. This is used to cancel a Transaction without affecting the scope of
  2118. an enclosing transaction.
  2119. """
  2120. try:
  2121. self._do_close()
  2122. finally:
  2123. assert not self.is_active
  2124. def rollback(self) -> None:
  2125. """Roll back this :class:`.Transaction`.
  2126. The implementation of this may vary based on the type of transaction in
  2127. use:
  2128. * For a simple database transaction (e.g. :class:`.RootTransaction`),
  2129. it corresponds to a ROLLBACK.
  2130. * For a :class:`.NestedTransaction`, it corresponds to a
  2131. "ROLLBACK TO SAVEPOINT" operation.
  2132. * For a :class:`.TwoPhaseTransaction`, DBAPI-specific methods for two
  2133. phase transactions may be used.
  2134. """
  2135. try:
  2136. self._do_rollback()
  2137. finally:
  2138. assert not self.is_active
  2139. def commit(self) -> None:
  2140. """Commit this :class:`.Transaction`.
  2141. The implementation of this may vary based on the type of transaction in
  2142. use:
  2143. * For a simple database transaction (e.g. :class:`.RootTransaction`),
  2144. it corresponds to a COMMIT.
  2145. * For a :class:`.NestedTransaction`, it corresponds to a
  2146. "RELEASE SAVEPOINT" operation.
  2147. * For a :class:`.TwoPhaseTransaction`, DBAPI-specific methods for two
  2148. phase transactions may be used.
  2149. """
  2150. try:
  2151. self._do_commit()
  2152. finally:
  2153. assert not self.is_active
  2154. def _get_subject(self) -> Connection:
  2155. return self.connection
  2156. def _transaction_is_active(self) -> bool:
  2157. return self.is_active
  2158. def _transaction_is_closed(self) -> bool:
  2159. return not self._deactivated_from_connection
  2160. def _rollback_can_be_called(self) -> bool:
  2161. # for RootTransaction / NestedTransaction, it's safe to call
  2162. # rollback() even if the transaction is deactive and no warnings
  2163. # will be emitted. tested in
  2164. # test_transaction.py -> test_no_rollback_in_deactive(?:_savepoint)?
  2165. return True
  2166. class RootTransaction(Transaction):
  2167. """Represent the "root" transaction on a :class:`_engine.Connection`.
  2168. This corresponds to the current "BEGIN/COMMIT/ROLLBACK" that's occurring
  2169. for the :class:`_engine.Connection`. The :class:`_engine.RootTransaction`
  2170. is created by calling upon the :meth:`_engine.Connection.begin` method, and
  2171. remains associated with the :class:`_engine.Connection` throughout its
  2172. active span. The current :class:`_engine.RootTransaction` in use is
  2173. accessible via the :attr:`_engine.Connection.get_transaction` method of
  2174. :class:`_engine.Connection`.
  2175. In :term:`2.0 style` use, the :class:`_engine.Connection` also employs
  2176. "autobegin" behavior that will create a new
  2177. :class:`_engine.RootTransaction` whenever a connection in a
  2178. non-transactional state is used to emit commands on the DBAPI connection.
  2179. The scope of the :class:`_engine.RootTransaction` in 2.0 style
  2180. use can be controlled using the :meth:`_engine.Connection.commit` and
  2181. :meth:`_engine.Connection.rollback` methods.
  2182. """
  2183. _is_root = True
  2184. __slots__ = ("connection", "is_active")
  2185. def __init__(self, connection: Connection):
  2186. assert connection._transaction is None
  2187. if connection._trans_context_manager:
  2188. TransactionalContext._trans_ctx_check(connection)
  2189. self.connection = connection
  2190. self._connection_begin_impl()
  2191. connection._transaction = self
  2192. self.is_active = True
  2193. def _deactivate_from_connection(self) -> None:
  2194. if self.is_active:
  2195. assert self.connection._transaction is self
  2196. self.is_active = False
  2197. elif self.connection._transaction is not self:
  2198. util.warn("transaction already deassociated from connection")
  2199. @property
  2200. def _deactivated_from_connection(self) -> bool:
  2201. return self.connection._transaction is not self
  2202. def _connection_begin_impl(self) -> None:
  2203. self.connection._begin_impl(self)
  2204. def _connection_rollback_impl(self) -> None:
  2205. self.connection._rollback_impl()
  2206. def _connection_commit_impl(self) -> None:
  2207. self.connection._commit_impl()
  2208. def _close_impl(self, try_deactivate: bool = False) -> None:
  2209. try:
  2210. if self.is_active:
  2211. self._connection_rollback_impl()
  2212. if self.connection._nested_transaction:
  2213. self.connection._nested_transaction._cancel()
  2214. finally:
  2215. if self.is_active or try_deactivate:
  2216. self._deactivate_from_connection()
  2217. if self.connection._transaction is self:
  2218. self.connection._transaction = None
  2219. assert not self.is_active
  2220. assert self.connection._transaction is not self
  2221. def _do_close(self) -> None:
  2222. self._close_impl()
  2223. def _do_rollback(self) -> None:
  2224. self._close_impl(try_deactivate=True)
  2225. def _do_commit(self) -> None:
  2226. if self.is_active:
  2227. assert self.connection._transaction is self
  2228. try:
  2229. self._connection_commit_impl()
  2230. finally:
  2231. # whether or not commit succeeds, cancel any
  2232. # nested transactions, make this transaction "inactive"
  2233. # and remove it as a reset agent
  2234. if self.connection._nested_transaction:
  2235. self.connection._nested_transaction._cancel()
  2236. self._deactivate_from_connection()
  2237. # ...however only remove as the connection's current transaction
  2238. # if commit succeeded. otherwise it stays on so that a rollback
  2239. # needs to occur.
  2240. self.connection._transaction = None
  2241. else:
  2242. if self.connection._transaction is self:
  2243. self.connection._invalid_transaction()
  2244. else:
  2245. raise exc.InvalidRequestError("This transaction is inactive")
  2246. assert not self.is_active
  2247. assert self.connection._transaction is not self
  2248. class NestedTransaction(Transaction):
  2249. """Represent a 'nested', or SAVEPOINT transaction.
  2250. The :class:`.NestedTransaction` object is created by calling the
  2251. :meth:`_engine.Connection.begin_nested` method of
  2252. :class:`_engine.Connection`.
  2253. When using :class:`.NestedTransaction`, the semantics of "begin" /
  2254. "commit" / "rollback" are as follows:
  2255. * the "begin" operation corresponds to the "BEGIN SAVEPOINT" command, where
  2256. the savepoint is given an explicit name that is part of the state
  2257. of this object.
  2258. * The :meth:`.NestedTransaction.commit` method corresponds to a
  2259. "RELEASE SAVEPOINT" operation, using the savepoint identifier associated
  2260. with this :class:`.NestedTransaction`.
  2261. * The :meth:`.NestedTransaction.rollback` method corresponds to a
  2262. "ROLLBACK TO SAVEPOINT" operation, using the savepoint identifier
  2263. associated with this :class:`.NestedTransaction`.
  2264. The rationale for mimicking the semantics of an outer transaction in
  2265. terms of savepoints so that code may deal with a "savepoint" transaction
  2266. and an "outer" transaction in an agnostic way.
  2267. .. seealso::
  2268. :ref:`session_begin_nested` - ORM version of the SAVEPOINT API.
  2269. """
  2270. __slots__ = ("connection", "is_active", "_savepoint", "_previous_nested")
  2271. _savepoint: str
  2272. def __init__(self, connection: Connection):
  2273. assert connection._transaction is not None
  2274. if connection._trans_context_manager:
  2275. TransactionalContext._trans_ctx_check(connection)
  2276. self.connection = connection
  2277. self._savepoint = self.connection._savepoint_impl()
  2278. self.is_active = True
  2279. self._previous_nested = connection._nested_transaction
  2280. connection._nested_transaction = self
  2281. def _deactivate_from_connection(self, warn: bool = True) -> None:
  2282. if self.connection._nested_transaction is self:
  2283. self.connection._nested_transaction = self._previous_nested
  2284. elif warn:
  2285. util.warn(
  2286. "nested transaction already deassociated from connection"
  2287. )
  2288. @property
  2289. def _deactivated_from_connection(self) -> bool:
  2290. return self.connection._nested_transaction is not self
  2291. def _cancel(self) -> None:
  2292. # called by RootTransaction when the outer transaction is
  2293. # committed, rolled back, or closed to cancel all savepoints
  2294. # without any action being taken
  2295. self.is_active = False
  2296. self._deactivate_from_connection()
  2297. if self._previous_nested:
  2298. self._previous_nested._cancel()
  2299. def _close_impl(
  2300. self, deactivate_from_connection: bool, warn_already_deactive: bool
  2301. ) -> None:
  2302. try:
  2303. if (
  2304. self.is_active
  2305. and self.connection._transaction
  2306. and self.connection._transaction.is_active
  2307. ):
  2308. self.connection._rollback_to_savepoint_impl(self._savepoint)
  2309. finally:
  2310. self.is_active = False
  2311. if deactivate_from_connection:
  2312. self._deactivate_from_connection(warn=warn_already_deactive)
  2313. assert not self.is_active
  2314. if deactivate_from_connection:
  2315. assert self.connection._nested_transaction is not self
  2316. def _do_close(self) -> None:
  2317. self._close_impl(True, False)
  2318. def _do_rollback(self) -> None:
  2319. self._close_impl(True, True)
  2320. def _do_commit(self) -> None:
  2321. if self.is_active:
  2322. try:
  2323. self.connection._release_savepoint_impl(self._savepoint)
  2324. finally:
  2325. # nested trans becomes inactive on failed release
  2326. # unconditionally. this prevents it from trying to
  2327. # emit SQL when it rolls back.
  2328. self.is_active = False
  2329. # but only de-associate from connection if it succeeded
  2330. self._deactivate_from_connection()
  2331. else:
  2332. if self.connection._nested_transaction is self:
  2333. self.connection._invalid_transaction()
  2334. else:
  2335. raise exc.InvalidRequestError(
  2336. "This nested transaction is inactive"
  2337. )
  2338. class TwoPhaseTransaction(RootTransaction):
  2339. """Represent a two-phase transaction.
  2340. A new :class:`.TwoPhaseTransaction` object may be procured
  2341. using the :meth:`_engine.Connection.begin_twophase` method.
  2342. The interface is the same as that of :class:`.Transaction`
  2343. with the addition of the :meth:`prepare` method.
  2344. """
  2345. __slots__ = ("xid", "_is_prepared")
  2346. xid: Any
  2347. def __init__(self, connection: Connection, xid: Any):
  2348. self._is_prepared = False
  2349. self.xid = xid
  2350. super().__init__(connection)
  2351. def prepare(self) -> None:
  2352. """Prepare this :class:`.TwoPhaseTransaction`.
  2353. After a PREPARE, the transaction can be committed.
  2354. """
  2355. if not self.is_active:
  2356. raise exc.InvalidRequestError("This transaction is inactive")
  2357. self.connection._prepare_twophase_impl(self.xid)
  2358. self._is_prepared = True
  2359. def _connection_begin_impl(self) -> None:
  2360. self.connection._begin_twophase_impl(self)
  2361. def _connection_rollback_impl(self) -> None:
  2362. self.connection._rollback_twophase_impl(self.xid, self._is_prepared)
  2363. def _connection_commit_impl(self) -> None:
  2364. self.connection._commit_twophase_impl(self.xid, self._is_prepared)
class Engine(
    ConnectionEventsTarget, log.Identified, inspection.Inspectable["Inspector"]
):
    """
    Connects a :class:`~sqlalchemy.pool.Pool` and
    :class:`~sqlalchemy.engine.interfaces.Dialect` together to provide a
    source of database connectivity and behavior.

    An :class:`_engine.Engine` object is instantiated publicly using the
    :func:`~sqlalchemy.create_engine` function.

    .. seealso::

        :doc:`/core/engines`

        :ref:`connections_toplevel`

    """

    dispatch: dispatcher[ConnectionEventsTarget]

    # LRU cache created in __init__, or None when query_cache_size == 0
    _compiled_cache: Optional[CompiledCacheType]

    # replaced (not mutated in place) by update_execution_options()
    _execution_options: _ExecuteOptions = _EMPTY_EXECUTION_OPTS
    _has_events: bool = False
    # class instantiated by connect()
    _connection_cls: Type[Connection] = Connection
    _sqla_logger_namespace: str = "sqlalchemy.engine.Engine"
    _is_future: bool = False

    _schema_translate_map: Optional[SchemaTranslateMapType] = None
    # class instantiated by execution_options(); assigned at module level
    # once OptionEngine has been defined
    _option_cls: Type[OptionEngine]

    dialect: Dialect
    pool: Pool
    url: URL
    hide_parameters: bool

    def __init__(
        self,
        pool: Pool,
        dialect: Dialect,
        url: URL,
        logging_name: Optional[str] = None,
        echo: Optional[_EchoFlagType] = None,
        query_cache_size: int = 500,
        execution_options: Optional[Mapping[str, Any]] = None,
        hide_parameters: bool = False,
    ):
        """Construct a new :class:`_engine.Engine`.

        :param pool: connection pool, stored as ``self.pool``.
        :param dialect: dialect, stored as ``self.dialect``.
        :param url: database URL, stored as ``self.url``.
        :param logging_name: when truthy, assigned to
         ``self.logging_name``; otherwise the attribute is not set on the
         instance.
        :param echo: assigned to ``self.echo`` and passed as ``echoflag``
         to ``log.instance_logger()``.
        :param query_cache_size: size passed to ``util.LRUCache`` for the
         built-in compiled cache; ``0`` disables the cache
         (``_compiled_cache`` is ``None``).
        :param execution_options: when non-empty, applied through
         :meth:`update_execution_options`.
        :param hide_parameters: stored as ``self.hide_parameters``.
        """
        self.pool = pool
        self.url = url
        self.dialect = dialect
        # logging_name is assigned before echo / instance_logger below
        if logging_name:
            self.logging_name = logging_name
        self.echo = echo
        self.hide_parameters = hide_parameters
        if query_cache_size != 0:
            self._compiled_cache = util.LRUCache(
                query_cache_size, size_alert=self._lru_size_alert
            )
        else:
            self._compiled_cache = None
        log.instance_logger(self, echoflag=echo)
        if execution_options:
            self.update_execution_options(**execution_options)

    def _lru_size_alert(self, cache: util.LRUCache[Any, Any]) -> None:
        """Log, at INFO level, the current length and capacity of ``cache``.

        Passed to ``util.LRUCache`` as ``size_alert`` in ``__init__``.
        NOTE(review): presumably invoked by the cache when it prunes
        entries - confirm against ``util.LRUCache``.
        """
        if self._should_log_info():
            self.logger.info(
                "Compiled cache size pruning from %d items to %d.  "
                "Increase cache size to reduce the frequency of pruning.",
                len(cache),
                cache.capacity,
            )

    @property
    def engine(self) -> Engine:
        """Returns this :class:`.Engine`.

        Used for legacy schemes that accept :class:`.Connection` /
        :class:`.Engine` objects within the same variable.

        """
        return self

    def clear_compiled_cache(self) -> None:
        """Clear the compiled cache associated with the dialect.

        This applies **only** to the built-in cache that is established
        via the :paramref:`_engine.create_engine.query_cache_size` parameter.
        It will not impact any dictionary caches that were passed via the
        :paramref:`.Connection.execution_options.compiled_cache` parameter.

        .. versionadded:: 1.4

        """
        # covers both "no cache configured" (None) and an empty cache
        if self._compiled_cache:
            self._compiled_cache.clear()

    def update_execution_options(self, **opt: Any) -> None:
        r"""Update the default execution_options dictionary
        of this :class:`_engine.Engine`.

        The given keys/values in \**opt are added to the
        default execution options that will be used for
        all connections.  The initial contents of this dictionary
        can be sent via the ``execution_options`` parameter
        to :func:`_sa.create_engine`.

        .. seealso::

            :meth:`_engine.Connection.execution_options`

            :meth:`_engine.Engine.execution_options`

        """
        # event listeners are notified first, then the options are merged,
        # then the dialect hook is invoked
        self.dispatch.set_engine_execution_options(self, opt)
        self._execution_options = self._execution_options.union(opt)
        self.dialect.set_engine_execution_options(self, opt)

    @overload
    def execution_options(
        self,
        *,
        compiled_cache: Optional[CompiledCacheType] = ...,
        logging_token: str = ...,
        isolation_level: IsolationLevel = ...,
        insertmanyvalues_page_size: int = ...,
        schema_translate_map: Optional[SchemaTranslateMapType] = ...,
        **opt: Any,
    ) -> OptionEngine: ...

    @overload
    def execution_options(self, **opt: Any) -> OptionEngine: ...

    def execution_options(self, **opt: Any) -> OptionEngine:
        """Return a new :class:`_engine.Engine` that will provide
        :class:`_engine.Connection` objects with the given execution options.

        The returned :class:`_engine.Engine` remains related to the original
        :class:`_engine.Engine` in that it shares the same connection pool and
        other state:

        * The :class:`_pool.Pool` used by the new :class:`_engine.Engine`
          is the same instance.  The :meth:`_engine.Engine.dispose`
          method will replace the connection pool instance for the parent
          engine as well as this one.
        * Event listeners are "cascaded" - meaning, the new
          :class:`_engine.Engine`
          inherits the events of the parent, and new events can be associated
          with the new :class:`_engine.Engine` individually.
        * The logging configuration and logging_name are copied from the
          parent :class:`_engine.Engine`.

        The intent of the :meth:`_engine.Engine.execution_options` method is
        to implement schemes where multiple :class:`_engine.Engine`
        objects refer to the same connection pool, but are differentiated
        by options that affect some execution-level behavior for each
        engine.  One such example is breaking into separate "reader" and
        "writer" :class:`_engine.Engine` instances, where one
        :class:`_engine.Engine`
        has a lower :term:`isolation level` setting configured or is even
        transaction-disabled using "autocommit".  An example of this
        configuration is at :ref:`dbapi_autocommit_multiple`.

        Another example is one that
        uses a custom option ``shard_id`` which is consumed by an event
        to change the current schema on a database connection::

            from sqlalchemy import event
            from sqlalchemy.engine import Engine

            primary_engine = create_engine("mysql+mysqldb://")
            shard1 = primary_engine.execution_options(shard_id="shard1")
            shard2 = primary_engine.execution_options(shard_id="shard2")

            shards = {"default": "base", "shard1": "db1", "shard2": "db2"}


            @event.listens_for(Engine, "before_cursor_execute")
            def _switch_shard(conn, cursor, stmt, params, context, executemany):
                shard_id = conn.get_execution_options().get("shard_id", "default")
                current_shard = conn.info.get("current_shard", None)

                if current_shard != shard_id:
                    cursor.execute("use %s" % shards[shard_id])
                    conn.info["current_shard"] = shard_id

        The above recipe illustrates two :class:`_engine.Engine` objects that
        will each serve as factories for :class:`_engine.Connection` objects
        that have pre-established "shard_id" execution options present. A
        :meth:`_events.ConnectionEvents.before_cursor_execute` event handler
        then interprets this execution option to emit a MySQL ``use`` statement
        to switch databases before a statement execution, while at the same
        time keeping track of which database we've established using the
        :attr:`_engine.Connection.info` dictionary.

        .. seealso::

            :meth:`_engine.Connection.execution_options`
            - update execution options
            on a :class:`_engine.Connection` object.

            :meth:`_engine.Engine.update_execution_options`
            - update the execution
            options for a given :class:`_engine.Engine` in place.

            :meth:`_engine.Engine.get_execution_options`


        """  # noqa: E501
        return self._option_cls(self, opt)

    def get_execution_options(self) -> _ExecuteOptions:
        """Get the non-SQL options which will take effect during execution.

        .. versionadded:: 1.3

        .. seealso::

            :meth:`_engine.Engine.execution_options`
        """
        return self._execution_options

    @property
    def name(self) -> str:
        """String name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
        in use by this :class:`Engine`.

        """

        return self.dialect.name

    @property
    def driver(self) -> str:
        """Driver name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
        in use by this :class:`Engine`.

        """

        return self.dialect.driver

    # descriptor supplied by the ``log`` module.
    # NOTE(review): presumably reconfigures the instance logger when
    # assigned - confirm against ``log.echo_property``.
    echo = log.echo_property()

    def __repr__(self) -> str:
        """Return ``Engine(<repr of url>)``."""
        return "Engine(%r)" % (self.url,)

    def dispose(self, close: bool = True) -> None:
        """Dispose of the connection pool used by this
        :class:`_engine.Engine`.

        A new connection pool is created immediately after the old one has been
        disposed. The previous connection pool is disposed either actively, by
        closing out all currently checked-in connections in that pool, or
        passively, by losing references to it but otherwise not closing any
        connections. The latter strategy is more appropriate for an initializer
        in a forked Python process.

        :param close: if left at its default of ``True``, has the
         effect of fully closing all **currently checked in**
         database connections.  Connections that are still checked out
         will **not** be closed, however they will no longer be associated
         with this :class:`_engine.Engine`,
         so when they are closed individually, eventually the
         :class:`_pool.Pool` which they are associated with will
         be garbage collected and they will be closed out fully, if
         not already closed on checkin.

         If set to ``False``, the previous connection pool is de-referenced,
         and otherwise not touched in any way.

        .. versionadded:: 1.4.33  Added the :paramref:`.Engine.dispose.close`
            parameter to allow the replacement of a connection pool in a child
            process without interfering with the connections used by the parent
            process.


        .. seealso::

            :ref:`engine_disposal`

            :ref:`pooling_multiprocessing`

        """
        if close:
            self.pool.dispose()
        # the replacement pool is created whether or not the old one was
        # actively disposed
        self.pool = self.pool.recreate()
        self.dispatch.engine_disposed(self)

    @contextlib.contextmanager
    def _optional_conn_ctx_manager(
        self, connection: Optional[Connection] = None
    ) -> Iterator[Connection]:
        """Yield ``connection`` if one was passed; otherwise yield a new
        connection from :meth:`connect`, used as a context manager for the
        duration of the block.

        A connection passed in by the caller is yielded as-is and is not
        entered as a context manager here.
        """
        if connection is None:
            with self.connect() as conn:
                yield conn
        else:
            yield connection

    @contextlib.contextmanager
    def begin(self) -> Iterator[Connection]:
        """Return a context manager delivering a :class:`_engine.Connection`
        with a :class:`.Transaction` established.

        E.g.::

            with engine.begin() as conn:
                conn.execute(text("insert into table (x, y, z) values (1, 2, 3)"))
                conn.execute(text("my_special_procedure(5)"))

        Upon successful operation, the :class:`.Transaction`
        is committed.  If an error is raised, the :class:`.Transaction`
        is rolled back.

        .. seealso::

            :meth:`_engine.Engine.connect` - procure a
            :class:`_engine.Connection` from
            an :class:`_engine.Engine`.

            :meth:`_engine.Connection.begin` - start a :class:`.Transaction`
            for a particular :class:`_engine.Connection`.

        """  # noqa: E501
        with self.connect() as conn:
            with conn.begin():
                yield conn

    def _run_ddl_visitor(
        self,
        visitorcallable: Type[InvokeDDLBase],
        element: SchemaVisitable,
        **kwargs: Any,
    ) -> None:
        """Invoke ``Connection._run_ddl_visitor()`` with the given arguments
        on a connection obtained from :meth:`begin`."""
        with self.begin() as conn:
            conn._run_ddl_visitor(visitorcallable, element, **kwargs)

    def connect(self) -> Connection:
        """Return a new :class:`_engine.Connection` object.

        The :class:`_engine.Connection` acts as a Python context manager, so
        the typical use of this method looks like::

            with engine.connect() as connection:
                connection.execute(text("insert into table values ('foo')"))
                connection.commit()

        Where above, after the block is completed, the connection is "closed"
        and its underlying DBAPI resources are returned to the connection pool.
        This also has the effect of rolling back any transaction that
        was explicitly begun or was begun via autobegin, and will
        emit the :meth:`_events.ConnectionEvents.rollback` event if one was
        started and is still in progress.

        .. seealso::

            :meth:`_engine.Engine.begin`

        """

        return self._connection_cls(self)

    def raw_connection(self) -> PoolProxiedConnection:
        """Return a "raw" DBAPI connection from the connection pool.

        The returned object is a proxied version of the DBAPI
        connection object used by the underlying driver in use.
        The object will have all the same behavior as the real DBAPI
        connection, except that its ``close()`` method will result in the
        connection being returned to the pool, rather than being closed
        for real.

        This method provides direct DBAPI connection access for
        special situations when the API provided by
        :class:`_engine.Connection`
        is not needed.   When a :class:`_engine.Connection` object is already
        present, the DBAPI connection is available using
        the :attr:`_engine.Connection.connection` accessor.

        .. seealso::

            :ref:`dbapi_connections`

        """
        return self.pool.connect()
class OptionEngineMixin(log.Identified):
    """Mixin holding the state and behavior of an engine that proxies a
    parent :class:`_engine.Engine` while carrying its own execution options.

    The URL, dialect, logging name, echo flag, compiled cache and
    ``hide_parameters`` flag are copied from the parent at construction
    time.  The connection pool is not copied; it is read from and written to
    the parent on every access.
    """

    # class-level events are not applied to this class directly; see the
    # note in __init__
    _sa_propagate_class_events = False

    dispatch: dispatcher[ConnectionEventsTarget]
    _compiled_cache: Optional[CompiledCacheType]
    dialect: Dialect
    pool: Pool
    url: URL
    hide_parameters: bool
    echo: log.echo_property

    def __init__(
        self, proxied: Engine, execution_options: CoreExecuteOptionsParameter
    ):
        """Set up this engine as a proxy of ``proxied``.

        :param proxied: the parent :class:`_engine.Engine`.
        :param execution_options: options applied on top of a starting
         point of the parent's current execution options, via
         ``update_execution_options()``.
        """
        self._proxied = proxied
        self.url = proxied.url
        self.dialect = proxied.dialect
        # logging_name is copied before echo / instance_logger below.
        # NOTE(review): presumably the logger setup reads logging_name -
        # confirm before reordering these assignments.
        self.logging_name = proxied.logging_name
        self.echo = proxied.echo
        self._compiled_cache = proxied._compiled_cache
        self.hide_parameters = proxied.hide_parameters
        log.instance_logger(self, echoflag=self.echo)

        # note: this will propagate events that are assigned to the parent
        # engine after this OptionEngine is created.   Since we share
        # the events of the parent we also disallow class-level events
        # to apply to the OptionEngine class directly.
        #
        # the other way this can work would be to transfer existing
        # events only, using:
        # self.dispatch._update(proxied.dispatch)
        #
        # that might be more appropriate however it would be a behavioral
        # change for logic that assigns events to the parent engine and
        # would like it to take effect for the already-created sub-engine.
        self.dispatch = self.dispatch._join(proxied.dispatch)

        # start from the parent's options, then layer the given ones on top
        self._execution_options = proxied._execution_options
        self.update_execution_options(**execution_options)

    def update_execution_options(self, **opt: Any) -> None:
        """Placeholder that always raises; the concrete class must supply
        the implementation.

        :raises NotImplementedError: always.
        """
        raise NotImplementedError()

    # the descriptors below are defined at runtime only and are hidden
    # from type checkers
    if not typing.TYPE_CHECKING:
        # https://github.com/python/typing/discussions/1095

        @property
        def pool(self) -> Pool:
            """The parent engine's current connection pool."""
            return self._proxied.pool

        @pool.setter
        def pool(self, pool: Pool) -> None:
            # assignment is forwarded to the parent engine
            self._proxied.pool = pool

        @property
        def _has_events(self) -> bool:
            """True if the parent engine has events, or if the flag has
            been set on this instance."""
            return self._proxied._has_events or self.__dict__.get(
                "_has_events", False
            )

        @_has_events.setter
        def _has_events(self, value: bool) -> None:
            # stored in the instance dict; the parent's flag is unaffected
            self.__dict__["_has_events"] = value
class OptionEngine(OptionEngineMixin, Engine):
    """Concrete :class:`_engine.Engine` that proxies a parent engine; this
    is the class instantiated by :meth:`_engine.Engine.execution_options`.
    """

    def update_execution_options(self, **opt: Any) -> None:
        """Apply ``opt`` using the :class:`_engine.Engine` implementation."""
        # called on Engine explicitly: OptionEngineMixin precedes Engine in
        # the bases and its version of this method raises NotImplementedError
        Engine.update_execution_options(self, **opt)


# assigned here because OptionEngine is defined after Engine
Engine._option_cls = OptionEngine