| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469 |
- # ext/asyncio/engine.py
- # Copyright (C) 2020-2025 the SQLAlchemy authors and contributors
- # <see AUTHORS file>
- #
- # This module is part of SQLAlchemy and is released under
- # the MIT License: https://www.opensource.org/licenses/mit-license.php
- from __future__ import annotations
- import asyncio
- import contextlib
- from typing import Any
- from typing import AsyncIterator
- from typing import Callable
- from typing import Dict
- from typing import Generator
- from typing import NoReturn
- from typing import Optional
- from typing import overload
- from typing import Tuple
- from typing import Type
- from typing import TYPE_CHECKING
- from typing import TypeVar
- from typing import Union
- from . import exc as async_exc
- from .base import asyncstartablecontext
- from .base import GeneratorStartableContext
- from .base import ProxyComparable
- from .base import StartableContext
- from .result import _ensure_sync_result
- from .result import AsyncResult
- from .result import AsyncScalarResult
- from ... import exc
- from ... import inspection
- from ... import util
- from ...engine import Connection
- from ...engine import create_engine as _create_engine
- from ...engine import create_pool_from_url as _create_pool_from_url
- from ...engine import Engine
- from ...engine.base import NestedTransaction
- from ...engine.base import Transaction
- from ...exc import ArgumentError
- from ...util.concurrency import greenlet_spawn
- from ...util.typing import Concatenate
- from ...util.typing import ParamSpec
- if TYPE_CHECKING:
- from ...engine.cursor import CursorResult
- from ...engine.interfaces import _CoreAnyExecuteParams
- from ...engine.interfaces import _CoreSingleExecuteParams
- from ...engine.interfaces import _DBAPIAnyExecuteParams
- from ...engine.interfaces import _ExecuteOptions
- from ...engine.interfaces import CompiledCacheType
- from ...engine.interfaces import CoreExecuteOptionsParameter
- from ...engine.interfaces import Dialect
- from ...engine.interfaces import IsolationLevel
- from ...engine.interfaces import SchemaTranslateMapType
- from ...engine.result import ScalarResult
- from ...engine.url import URL
- from ...pool import Pool
- from ...pool import PoolProxiedConnection
- from ...sql._typing import _InfoType
- from ...sql.base import Executable
- from ...sql.selectable import TypedReturnsRows
- _P = ParamSpec("_P")
- _T = TypeVar("_T", bound=Any)
- def create_async_engine(url: Union[str, URL], **kw: Any) -> AsyncEngine:
- """Create a new async engine instance.
- Arguments passed to :func:`_asyncio.create_async_engine` are mostly
- identical to those passed to the :func:`_sa.create_engine` function.
- The specified dialect must be an asyncio-compatible dialect
- such as :ref:`dialect-postgresql-asyncpg`.
- .. versionadded:: 1.4
- :param async_creator: an async callable which returns a driver-level
- asyncio connection. If given, the function should take no arguments,
- and return a new asyncio connection from the underlying asyncio
- database driver; the connection will be wrapped in the appropriate
- structures to be used with the :class:`.AsyncEngine`. Note that the
- parameters specified in the URL are not applied here, and the creator
- function should use its own connection parameters.
- This parameter is the asyncio equivalent of the
- :paramref:`_sa.create_engine.creator` parameter of the
- :func:`_sa.create_engine` function.
- .. versionadded:: 2.0.16
- """
- if kw.get("server_side_cursors", False):
- raise async_exc.AsyncMethodRequired(
- "Can't set server_side_cursors for async engine globally; "
- "use the connection.stream() method for an async "
- "streaming result set"
- )
- kw["_is_async"] = True
- async_creator = kw.pop("async_creator", None)
- if async_creator:
- if kw.get("creator", None):
- raise ArgumentError(
- "Can only specify one of 'async_creator' or 'creator', "
- "not both."
- )
- def creator() -> Any:
- # note that to send adapted arguments like
- # prepared_statement_cache_size, user would use
- # "creator" and emulate this form here
- return sync_engine.dialect.dbapi.connect( # type: ignore
- async_creator_fn=async_creator
- )
- kw["creator"] = creator
- sync_engine = _create_engine(url, **kw)
- return AsyncEngine(sync_engine)
- def async_engine_from_config(
- configuration: Dict[str, Any], prefix: str = "sqlalchemy.", **kwargs: Any
- ) -> AsyncEngine:
- """Create a new AsyncEngine instance using a configuration dictionary.
- This function is analogous to the :func:`_sa.engine_from_config` function
- in SQLAlchemy Core, except that the requested dialect must be an
- asyncio-compatible dialect such as :ref:`dialect-postgresql-asyncpg`.
- The argument signature of the function is identical to that
- of :func:`_sa.engine_from_config`.
- .. versionadded:: 1.4.29
- """
- options = {
- key[len(prefix) :]: value
- for key, value in configuration.items()
- if key.startswith(prefix)
- }
- options["_coerce_config"] = True
- options.update(kwargs)
- url = options.pop("url")
- return create_async_engine(url, **options)
- def create_async_pool_from_url(url: Union[str, URL], **kwargs: Any) -> Pool:
- """Create a new async engine instance.
- Arguments passed to :func:`_asyncio.create_async_pool_from_url` are mostly
- identical to those passed to the :func:`_sa.create_pool_from_url` function.
- The specified dialect must be an asyncio-compatible dialect
- such as :ref:`dialect-postgresql-asyncpg`.
- .. versionadded:: 2.0.10
- """
- kwargs["_is_async"] = True
- return _create_pool_from_url(url, **kwargs)
- class AsyncConnectable:
- __slots__ = "_slots_dispatch", "__weakref__"
- @classmethod
- def _no_async_engine_events(cls) -> NoReturn:
- raise NotImplementedError(
- "asynchronous events are not implemented at this time. Apply "
- "synchronous listeners to the AsyncEngine.sync_engine or "
- "AsyncConnection.sync_connection attributes."
- )
- @util.create_proxy_methods(
- Connection,
- ":class:`_engine.Connection`",
- ":class:`_asyncio.AsyncConnection`",
- classmethods=[],
- methods=[],
- attributes=[
- "closed",
- "invalidated",
- "dialect",
- "default_isolation_level",
- ],
- )
- class AsyncConnection(
- ProxyComparable[Connection],
- StartableContext["AsyncConnection"],
- AsyncConnectable,
- ):
- """An asyncio proxy for a :class:`_engine.Connection`.
- :class:`_asyncio.AsyncConnection` is acquired using the
- :meth:`_asyncio.AsyncEngine.connect`
- method of :class:`_asyncio.AsyncEngine`::
- from sqlalchemy.ext.asyncio import create_async_engine
- engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname")
- async with engine.connect() as conn:
- result = await conn.execute(select(table))
- .. versionadded:: 1.4
- """ # noqa
- # AsyncConnection is a thin proxy; no state should be added here
- # that is not retrievable from the "sync" engine / connection, e.g.
- # current transaction, info, etc. It should be possible to
- # create a new AsyncConnection that matches this one given only the
- # "sync" elements.
- __slots__ = (
- "engine",
- "sync_engine",
- "sync_connection",
- )
- def __init__(
- self,
- async_engine: AsyncEngine,
- sync_connection: Optional[Connection] = None,
- ):
- self.engine = async_engine
- self.sync_engine = async_engine.sync_engine
- self.sync_connection = self._assign_proxied(sync_connection)
- sync_connection: Optional[Connection]
- """Reference to the sync-style :class:`_engine.Connection` this
- :class:`_asyncio.AsyncConnection` proxies requests towards.
- This instance can be used as an event target.
- .. seealso::
- :ref:`asyncio_events`
- """
- sync_engine: Engine
- """Reference to the sync-style :class:`_engine.Engine` this
- :class:`_asyncio.AsyncConnection` is associated with via its underlying
- :class:`_engine.Connection`.
- This instance can be used as an event target.
- .. seealso::
- :ref:`asyncio_events`
- """
- @classmethod
- def _regenerate_proxy_for_target(
- cls, target: Connection, **additional_kw: Any # noqa: U100
- ) -> AsyncConnection:
- return AsyncConnection(
- AsyncEngine._retrieve_proxy_for_target(target.engine), target
- )
- async def start(
- self, is_ctxmanager: bool = False # noqa: U100
- ) -> AsyncConnection:
- """Start this :class:`_asyncio.AsyncConnection` object's context
- outside of using a Python ``with:`` block.
- """
- if self.sync_connection:
- raise exc.InvalidRequestError("connection is already started")
- self.sync_connection = self._assign_proxied(
- await greenlet_spawn(self.sync_engine.connect)
- )
- return self
- @property
- def connection(self) -> NoReturn:
- """Not implemented for async; call
- :meth:`_asyncio.AsyncConnection.get_raw_connection`.
- """
- raise exc.InvalidRequestError(
- "AsyncConnection.connection accessor is not implemented as the "
- "attribute may need to reconnect on an invalidated connection. "
- "Use the get_raw_connection() method."
- )
- async def get_raw_connection(self) -> PoolProxiedConnection:
- """Return the pooled DBAPI-level connection in use by this
- :class:`_asyncio.AsyncConnection`.
- This is a SQLAlchemy connection-pool proxied connection
- which then has the attribute
- :attr:`_pool._ConnectionFairy.driver_connection` that refers to the
- actual driver connection. Its
- :attr:`_pool._ConnectionFairy.dbapi_connection` refers instead
- to an :class:`_engine.AdaptedConnection` instance that
- adapts the driver connection to the DBAPI protocol.
- """
- return await greenlet_spawn(getattr, self._proxied, "connection")
- @util.ro_non_memoized_property
- def info(self) -> _InfoType:
- """Return the :attr:`_engine.Connection.info` dictionary of the
- underlying :class:`_engine.Connection`.
- This dictionary is freely writable for user-defined state to be
- associated with the database connection.
- This attribute is only available if the :class:`.AsyncConnection` is
- currently connected. If the :attr:`.AsyncConnection.closed` attribute
- is ``True``, then accessing this attribute will raise
- :class:`.ResourceClosedError`.
- .. versionadded:: 1.4.0b2
- """
- return self._proxied.info
- @util.ro_non_memoized_property
- def _proxied(self) -> Connection:
- if not self.sync_connection:
- self._raise_for_not_started()
- return self.sync_connection
- def begin(self) -> AsyncTransaction:
- """Begin a transaction prior to autobegin occurring."""
- assert self._proxied
- return AsyncTransaction(self)
- def begin_nested(self) -> AsyncTransaction:
- """Begin a nested transaction and return a transaction handle."""
- assert self._proxied
- return AsyncTransaction(self, nested=True)
- async def invalidate(
- self, exception: Optional[BaseException] = None
- ) -> None:
- """Invalidate the underlying DBAPI connection associated with
- this :class:`_engine.Connection`.
- See the method :meth:`_engine.Connection.invalidate` for full
- detail on this method.
- """
- return await greenlet_spawn(
- self._proxied.invalidate, exception=exception
- )
- async def get_isolation_level(self) -> IsolationLevel:
- return await greenlet_spawn(self._proxied.get_isolation_level)
- def in_transaction(self) -> bool:
- """Return True if a transaction is in progress."""
- return self._proxied.in_transaction()
- def in_nested_transaction(self) -> bool:
- """Return True if a transaction is in progress.
- .. versionadded:: 1.4.0b2
- """
- return self._proxied.in_nested_transaction()
- def get_transaction(self) -> Optional[AsyncTransaction]:
- """Return an :class:`.AsyncTransaction` representing the current
- transaction, if any.
- This makes use of the underlying synchronous connection's
- :meth:`_engine.Connection.get_transaction` method to get the current
- :class:`_engine.Transaction`, which is then proxied in a new
- :class:`.AsyncTransaction` object.
- .. versionadded:: 1.4.0b2
- """
- trans = self._proxied.get_transaction()
- if trans is not None:
- return AsyncTransaction._retrieve_proxy_for_target(trans)
- else:
- return None
- def get_nested_transaction(self) -> Optional[AsyncTransaction]:
- """Return an :class:`.AsyncTransaction` representing the current
- nested (savepoint) transaction, if any.
- This makes use of the underlying synchronous connection's
- :meth:`_engine.Connection.get_nested_transaction` method to get the
- current :class:`_engine.Transaction`, which is then proxied in a new
- :class:`.AsyncTransaction` object.
- .. versionadded:: 1.4.0b2
- """
- trans = self._proxied.get_nested_transaction()
- if trans is not None:
- return AsyncTransaction._retrieve_proxy_for_target(trans)
- else:
- return None
- @overload
- async def execution_options(
- self,
- *,
- compiled_cache: Optional[CompiledCacheType] = ...,
- logging_token: str = ...,
- isolation_level: IsolationLevel = ...,
- no_parameters: bool = False,
- stream_results: bool = False,
- max_row_buffer: int = ...,
- yield_per: int = ...,
- insertmanyvalues_page_size: int = ...,
- schema_translate_map: Optional[SchemaTranslateMapType] = ...,
- preserve_rowcount: bool = False,
- **opt: Any,
- ) -> AsyncConnection: ...
- @overload
- async def execution_options(self, **opt: Any) -> AsyncConnection: ...
- async def execution_options(self, **opt: Any) -> AsyncConnection:
- r"""Set non-SQL options for the connection which take effect
- during execution.
- This returns this :class:`_asyncio.AsyncConnection` object with
- the new options added.
- See :meth:`_engine.Connection.execution_options` for full details
- on this method.
- """
- conn = self._proxied
- c2 = await greenlet_spawn(conn.execution_options, **opt)
- assert c2 is conn
- return self
- async def commit(self) -> None:
- """Commit the transaction that is currently in progress.
- This method commits the current transaction if one has been started.
- If no transaction was started, the method has no effect, assuming
- the connection is in a non-invalidated state.
- A transaction is begun on a :class:`_engine.Connection` automatically
- whenever a statement is first executed, or when the
- :meth:`_engine.Connection.begin` method is called.
- """
- await greenlet_spawn(self._proxied.commit)
- async def rollback(self) -> None:
- """Roll back the transaction that is currently in progress.
- This method rolls back the current transaction if one has been started.
- If no transaction was started, the method has no effect. If a
- transaction was started and the connection is in an invalidated state,
- the transaction is cleared using this method.
- A transaction is begun on a :class:`_engine.Connection` automatically
- whenever a statement is first executed, or when the
- :meth:`_engine.Connection.begin` method is called.
- """
- await greenlet_spawn(self._proxied.rollback)
- async def close(self) -> None:
- """Close this :class:`_asyncio.AsyncConnection`.
- This has the effect of also rolling back the transaction if one
- is in place.
- """
- await greenlet_spawn(self._proxied.close)
- async def aclose(self) -> None:
- """A synonym for :meth:`_asyncio.AsyncConnection.close`.
- The :meth:`_asyncio.AsyncConnection.aclose` name is specifically
- to support the Python standard library ``@contextlib.aclosing``
- context manager function.
- .. versionadded:: 2.0.20
- """
- await self.close()
- async def exec_driver_sql(
- self,
- statement: str,
- parameters: Optional[_DBAPIAnyExecuteParams] = None,
- execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> CursorResult[Any]:
- r"""Executes a driver-level SQL string and return buffered
- :class:`_engine.Result`.
- """
- result = await greenlet_spawn(
- self._proxied.exec_driver_sql,
- statement,
- parameters,
- execution_options,
- _require_await=True,
- )
- return await _ensure_sync_result(result, self.exec_driver_sql)
- @overload
- def stream(
- self,
- statement: TypedReturnsRows[_T],
- parameters: Optional[_CoreAnyExecuteParams] = None,
- *,
- execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> GeneratorStartableContext[AsyncResult[_T]]: ...
- @overload
- def stream(
- self,
- statement: Executable,
- parameters: Optional[_CoreAnyExecuteParams] = None,
- *,
- execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> GeneratorStartableContext[AsyncResult[Any]]: ...
- @asyncstartablecontext
- async def stream(
- self,
- statement: Executable,
- parameters: Optional[_CoreAnyExecuteParams] = None,
- *,
- execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> AsyncIterator[AsyncResult[Any]]:
- """Execute a statement and return an awaitable yielding a
- :class:`_asyncio.AsyncResult` object.
- E.g.::
- result = await conn.stream(stmt)
- async for row in result:
- print(f"{row}")
- The :meth:`.AsyncConnection.stream`
- method supports optional context manager use against the
- :class:`.AsyncResult` object, as in::
- async with conn.stream(stmt) as result:
- async for row in result:
- print(f"{row}")
- In the above pattern, the :meth:`.AsyncResult.close` method is
- invoked unconditionally, even if the iterator is interrupted by an
- exception throw. Context manager use remains optional, however,
- and the function may be called in either an ``async with fn():`` or
- ``await fn()`` style.
- .. versionadded:: 2.0.0b3 added context manager support
- :return: an awaitable object that will yield an
- :class:`_asyncio.AsyncResult` object.
- .. seealso::
- :meth:`.AsyncConnection.stream_scalars`
- """
- if not self.dialect.supports_server_side_cursors:
- raise exc.InvalidRequestError(
- "Cant use `stream` or `stream_scalars` with the current "
- "dialect since it does not support server side cursors."
- )
- result = await greenlet_spawn(
- self._proxied.execute,
- statement,
- parameters,
- execution_options=util.EMPTY_DICT.merge_with(
- execution_options, {"stream_results": True}
- ),
- _require_await=True,
- )
- assert result.context._is_server_side
- ar = AsyncResult(result)
- try:
- yield ar
- except GeneratorExit:
- pass
- else:
- task = asyncio.create_task(ar.close())
- await asyncio.shield(task)
- @overload
- async def execute(
- self,
- statement: TypedReturnsRows[_T],
- parameters: Optional[_CoreAnyExecuteParams] = None,
- *,
- execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> CursorResult[_T]: ...
- @overload
- async def execute(
- self,
- statement: Executable,
- parameters: Optional[_CoreAnyExecuteParams] = None,
- *,
- execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> CursorResult[Any]: ...
- async def execute(
- self,
- statement: Executable,
- parameters: Optional[_CoreAnyExecuteParams] = None,
- *,
- execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> CursorResult[Any]:
- r"""Executes a SQL statement construct and return a buffered
- :class:`_engine.Result`.
- :param object: The statement to be executed. This is always
- an object that is in both the :class:`_expression.ClauseElement` and
- :class:`_expression.Executable` hierarchies, including:
- * :class:`_expression.Select`
- * :class:`_expression.Insert`, :class:`_expression.Update`,
- :class:`_expression.Delete`
- * :class:`_expression.TextClause` and
- :class:`_expression.TextualSelect`
- * :class:`_schema.DDL` and objects which inherit from
- :class:`_schema.ExecutableDDLElement`
- :param parameters: parameters which will be bound into the statement.
- This may be either a dictionary of parameter names to values,
- or a mutable sequence (e.g. a list) of dictionaries. When a
- list of dictionaries is passed, the underlying statement execution
- will make use of the DBAPI ``cursor.executemany()`` method.
- When a single dictionary is passed, the DBAPI ``cursor.execute()``
- method will be used.
- :param execution_options: optional dictionary of execution options,
- which will be associated with the statement execution. This
- dictionary can provide a subset of the options that are accepted
- by :meth:`_engine.Connection.execution_options`.
- :return: a :class:`_engine.Result` object.
- """
- result = await greenlet_spawn(
- self._proxied.execute,
- statement,
- parameters,
- execution_options=execution_options,
- _require_await=True,
- )
- return await _ensure_sync_result(result, self.execute)
- @overload
- async def scalar(
- self,
- statement: TypedReturnsRows[Tuple[_T]],
- parameters: Optional[_CoreSingleExecuteParams] = None,
- *,
- execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> Optional[_T]: ...
- @overload
- async def scalar(
- self,
- statement: Executable,
- parameters: Optional[_CoreSingleExecuteParams] = None,
- *,
- execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> Any: ...
- async def scalar(
- self,
- statement: Executable,
- parameters: Optional[_CoreSingleExecuteParams] = None,
- *,
- execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> Any:
- r"""Executes a SQL statement construct and returns a scalar object.
- This method is shorthand for invoking the
- :meth:`_engine.Result.scalar` method after invoking the
- :meth:`_engine.Connection.execute` method. Parameters are equivalent.
- :return: a scalar Python value representing the first column of the
- first row returned.
- """
- result = await self.execute(
- statement, parameters, execution_options=execution_options
- )
- return result.scalar()
- @overload
- async def scalars(
- self,
- statement: TypedReturnsRows[Tuple[_T]],
- parameters: Optional[_CoreAnyExecuteParams] = None,
- *,
- execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> ScalarResult[_T]: ...
- @overload
- async def scalars(
- self,
- statement: Executable,
- parameters: Optional[_CoreAnyExecuteParams] = None,
- *,
- execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> ScalarResult[Any]: ...
- async def scalars(
- self,
- statement: Executable,
- parameters: Optional[_CoreAnyExecuteParams] = None,
- *,
- execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> ScalarResult[Any]:
- r"""Executes a SQL statement construct and returns a scalar objects.
- This method is shorthand for invoking the
- :meth:`_engine.Result.scalars` method after invoking the
- :meth:`_engine.Connection.execute` method. Parameters are equivalent.
- :return: a :class:`_engine.ScalarResult` object.
- .. versionadded:: 1.4.24
- """
- result = await self.execute(
- statement, parameters, execution_options=execution_options
- )
- return result.scalars()
- @overload
- def stream_scalars(
- self,
- statement: TypedReturnsRows[Tuple[_T]],
- parameters: Optional[_CoreSingleExecuteParams] = None,
- *,
- execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> GeneratorStartableContext[AsyncScalarResult[_T]]: ...
- @overload
- def stream_scalars(
- self,
- statement: Executable,
- parameters: Optional[_CoreSingleExecuteParams] = None,
- *,
- execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> GeneratorStartableContext[AsyncScalarResult[Any]]: ...
- @asyncstartablecontext
- async def stream_scalars(
- self,
- statement: Executable,
- parameters: Optional[_CoreSingleExecuteParams] = None,
- *,
- execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> AsyncIterator[AsyncScalarResult[Any]]:
- r"""Execute a statement and return an awaitable yielding a
- :class:`_asyncio.AsyncScalarResult` object.
- E.g.::
- result = await conn.stream_scalars(stmt)
- async for scalar in result:
- print(f"{scalar}")
- This method is shorthand for invoking the
- :meth:`_engine.AsyncResult.scalars` method after invoking the
- :meth:`_engine.Connection.stream` method. Parameters are equivalent.
- The :meth:`.AsyncConnection.stream_scalars`
- method supports optional context manager use against the
- :class:`.AsyncScalarResult` object, as in::
- async with conn.stream_scalars(stmt) as result:
- async for scalar in result:
- print(f"{scalar}")
- In the above pattern, the :meth:`.AsyncScalarResult.close` method is
- invoked unconditionally, even if the iterator is interrupted by an
- exception throw. Context manager use remains optional, however,
- and the function may be called in either an ``async with fn():`` or
- ``await fn()`` style.
- .. versionadded:: 2.0.0b3 added context manager support
- :return: an awaitable object that will yield an
- :class:`_asyncio.AsyncScalarResult` object.
- .. versionadded:: 1.4.24
- .. seealso::
- :meth:`.AsyncConnection.stream`
- """
- async with self.stream(
- statement, parameters, execution_options=execution_options
- ) as result:
- yield result.scalars()
- async def run_sync(
- self,
- fn: Callable[Concatenate[Connection, _P], _T],
- *arg: _P.args,
- **kw: _P.kwargs,
- ) -> _T:
- '''Invoke the given synchronous (i.e. not async) callable,
- passing a synchronous-style :class:`_engine.Connection` as the first
- argument.
- This method allows traditional synchronous SQLAlchemy functions to
- run within the context of an asyncio application.
- E.g.::
- def do_something_with_core(conn: Connection, arg1: int, arg2: str) -> str:
- """A synchronous function that does not require awaiting
- :param conn: a Core SQLAlchemy Connection, used synchronously
- :return: an optional return value is supported
- """
- conn.execute(some_table.insert().values(int_col=arg1, str_col=arg2))
- return "success"
- async def do_something_async(async_engine: AsyncEngine) -> None:
- """an async function that uses awaiting"""
- async with async_engine.begin() as async_conn:
- # run do_something_with_core() with a sync-style
- # Connection, proxied into an awaitable
- return_code = await async_conn.run_sync(
- do_something_with_core, 5, "strval"
- )
- print(return_code)
- This method maintains the asyncio event loop all the way through
- to the database connection by running the given callable in a
- specially instrumented greenlet.
- The most rudimentary use of :meth:`.AsyncConnection.run_sync` is to
- invoke methods such as :meth:`_schema.MetaData.create_all`, given
- an :class:`.AsyncConnection` that needs to be provided to
- :meth:`_schema.MetaData.create_all` as a :class:`_engine.Connection`
- object::
- # run metadata.create_all(conn) with a sync-style Connection,
- # proxied into an awaitable
- with async_engine.begin() as conn:
- await conn.run_sync(metadata.create_all)
- .. note::
- The provided callable is invoked inline within the asyncio event
- loop, and will block on traditional IO calls. IO within this
- callable should only call into SQLAlchemy's asyncio database
- APIs which will be properly adapted to the greenlet context.
- .. seealso::
- :meth:`.AsyncSession.run_sync`
- :ref:`session_run_sync`
- ''' # noqa: E501
- return await greenlet_spawn(
- fn, self._proxied, *arg, _require_await=False, **kw
- )
- def __await__(self) -> Generator[Any, None, AsyncConnection]:
- return self.start().__await__()
- async def __aexit__(self, type_: Any, value: Any, traceback: Any) -> None:
- task = asyncio.create_task(self.close())
- await asyncio.shield(task)
- # START PROXY METHODS AsyncConnection
- # code within this block is **programmatically,
- # statically generated** by tools/generate_proxy_methods.py
- @property
- def closed(self) -> Any:
- r"""Return True if this connection is closed.
- .. container:: class_bases
- Proxied for the :class:`_engine.Connection` class
- on behalf of the :class:`_asyncio.AsyncConnection` class.
- """ # noqa: E501
- return self._proxied.closed
- @property
- def invalidated(self) -> Any:
- r"""Return True if this connection was invalidated.
- .. container:: class_bases
- Proxied for the :class:`_engine.Connection` class
- on behalf of the :class:`_asyncio.AsyncConnection` class.
- This does not indicate whether or not the connection was
- invalidated at the pool level, however
- """ # noqa: E501
- return self._proxied.invalidated
- @property
- def dialect(self) -> Dialect:
- r"""Proxy for the :attr:`_engine.Connection.dialect` attribute
- on behalf of the :class:`_asyncio.AsyncConnection` class.
- """ # noqa: E501
- return self._proxied.dialect
- @dialect.setter
- def dialect(self, attr: Dialect) -> None:
- self._proxied.dialect = attr
- @property
- def default_isolation_level(self) -> Any:
- r"""The initial-connection time isolation level associated with the
- :class:`_engine.Dialect` in use.
- .. container:: class_bases
- Proxied for the :class:`_engine.Connection` class
- on behalf of the :class:`_asyncio.AsyncConnection` class.
- This value is independent of the
- :paramref:`.Connection.execution_options.isolation_level` and
- :paramref:`.Engine.execution_options.isolation_level` execution
- options, and is determined by the :class:`_engine.Dialect` when the
- first connection is created, by performing a SQL query against the
- database for the current isolation level before any additional commands
- have been emitted.
- Calling this accessor does not invoke any new SQL queries.
- .. seealso::
- :meth:`_engine.Connection.get_isolation_level`
- - view current actual isolation level
- :paramref:`_sa.create_engine.isolation_level`
- - set per :class:`_engine.Engine` isolation level
- :paramref:`.Connection.execution_options.isolation_level`
- - set per :class:`_engine.Connection` isolation level
- """ # noqa: E501
- return self._proxied.default_isolation_level
- # END PROXY METHODS AsyncConnection
- @util.create_proxy_methods(
- Engine,
- ":class:`_engine.Engine`",
- ":class:`_asyncio.AsyncEngine`",
- classmethods=[],
- methods=[
- "clear_compiled_cache",
- "update_execution_options",
- "get_execution_options",
- ],
- attributes=["url", "pool", "dialect", "engine", "name", "driver", "echo"],
- )
- class AsyncEngine(ProxyComparable[Engine], AsyncConnectable):
- """An asyncio proxy for a :class:`_engine.Engine`.
- :class:`_asyncio.AsyncEngine` is acquired using the
- :func:`_asyncio.create_async_engine` function::
- from sqlalchemy.ext.asyncio import create_async_engine
- engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname")
- .. versionadded:: 1.4
- """ # noqa
- # AsyncEngine is a thin proxy; no state should be added here
- # that is not retrievable from the "sync" engine / connection, e.g.
- # current transaction, info, etc. It should be possible to
- # create a new AsyncEngine that matches this one given only the
- # "sync" elements.
- __slots__ = "sync_engine"
- _connection_cls: Type[AsyncConnection] = AsyncConnection
- sync_engine: Engine
- """Reference to the sync-style :class:`_engine.Engine` this
- :class:`_asyncio.AsyncEngine` proxies requests towards.
- This instance can be used as an event target.
- .. seealso::
- :ref:`asyncio_events`
- """
- def __init__(self, sync_engine: Engine):
- if not sync_engine.dialect.is_async:
- raise exc.InvalidRequestError(
- "The asyncio extension requires an async driver to be used. "
- f"The loaded {sync_engine.dialect.driver!r} is not async."
- )
- self.sync_engine = self._assign_proxied(sync_engine)
- @util.ro_non_memoized_property
- def _proxied(self) -> Engine:
- return self.sync_engine
- @classmethod
- def _regenerate_proxy_for_target(
- cls, target: Engine, **additional_kw: Any # noqa: U100
- ) -> AsyncEngine:
- return AsyncEngine(target)
- @contextlib.asynccontextmanager
- async def begin(self) -> AsyncIterator[AsyncConnection]:
- """Return a context manager which when entered will deliver an
- :class:`_asyncio.AsyncConnection` with an
- :class:`_asyncio.AsyncTransaction` established.
- E.g.::
- async with async_engine.begin() as conn:
- await conn.execute(
- text("insert into table (x, y, z) values (1, 2, 3)")
- )
- await conn.execute(text("my_special_procedure(5)"))
- """
- conn = self.connect()
- async with conn:
- async with conn.begin():
- yield conn
- def connect(self) -> AsyncConnection:
- """Return an :class:`_asyncio.AsyncConnection` object.
- The :class:`_asyncio.AsyncConnection` will procure a database
- connection from the underlying connection pool when it is entered
- as an async context manager::
- async with async_engine.connect() as conn:
- result = await conn.execute(select(user_table))
- The :class:`_asyncio.AsyncConnection` may also be started outside of a
- context manager by invoking its :meth:`_asyncio.AsyncConnection.start`
- method.
- """
- return self._connection_cls(self)
- async def raw_connection(self) -> PoolProxiedConnection:
- """Return a "raw" DBAPI connection from the connection pool.
- .. seealso::
- :ref:`dbapi_connections`
- """
- return await greenlet_spawn(self.sync_engine.raw_connection)
- @overload
- def execution_options(
- self,
- *,
- compiled_cache: Optional[CompiledCacheType] = ...,
- logging_token: str = ...,
- isolation_level: IsolationLevel = ...,
- insertmanyvalues_page_size: int = ...,
- schema_translate_map: Optional[SchemaTranslateMapType] = ...,
- **opt: Any,
- ) -> AsyncEngine: ...
- @overload
- def execution_options(self, **opt: Any) -> AsyncEngine: ...
- def execution_options(self, **opt: Any) -> AsyncEngine:
- """Return a new :class:`_asyncio.AsyncEngine` that will provide
- :class:`_asyncio.AsyncConnection` objects with the given execution
- options.
- Proxied from :meth:`_engine.Engine.execution_options`. See that
- method for details.
- """
- return AsyncEngine(self.sync_engine.execution_options(**opt))
- async def dispose(self, close: bool = True) -> None:
- """Dispose of the connection pool used by this
- :class:`_asyncio.AsyncEngine`.
- :param close: if left at its default of ``True``, has the
- effect of fully closing all **currently checked in**
- database connections. Connections that are still checked out
- will **not** be closed, however they will no longer be associated
- with this :class:`_engine.Engine`,
- so when they are closed individually, eventually the
- :class:`_pool.Pool` which they are associated with will
- be garbage collected and they will be closed out fully, if
- not already closed on checkin.
- If set to ``False``, the previous connection pool is de-referenced,
- and otherwise not touched in any way.
- .. seealso::
- :meth:`_engine.Engine.dispose`
- """
- await greenlet_spawn(self.sync_engine.dispose, close=close)
- # START PROXY METHODS AsyncEngine
- # code within this block is **programmatically,
- # statically generated** by tools/generate_proxy_methods.py
- def clear_compiled_cache(self) -> None:
- r"""Clear the compiled cache associated with the dialect.
- .. container:: class_bases
- Proxied for the :class:`_engine.Engine` class on
- behalf of the :class:`_asyncio.AsyncEngine` class.
- This applies **only** to the built-in cache that is established
- via the :paramref:`_engine.create_engine.query_cache_size` parameter.
- It will not impact any dictionary caches that were passed via the
- :paramref:`.Connection.execution_options.compiled_cache` parameter.
- .. versionadded:: 1.4
- """ # noqa: E501
- return self._proxied.clear_compiled_cache()
- def update_execution_options(self, **opt: Any) -> None:
- r"""Update the default execution_options dictionary
- of this :class:`_engine.Engine`.
- .. container:: class_bases
- Proxied for the :class:`_engine.Engine` class on
- behalf of the :class:`_asyncio.AsyncEngine` class.
- The given keys/values in \**opt are added to the
- default execution options that will be used for
- all connections. The initial contents of this dictionary
- can be sent via the ``execution_options`` parameter
- to :func:`_sa.create_engine`.
- .. seealso::
- :meth:`_engine.Connection.execution_options`
- :meth:`_engine.Engine.execution_options`
- """ # noqa: E501
- return self._proxied.update_execution_options(**opt)
- def get_execution_options(self) -> _ExecuteOptions:
- r"""Get the non-SQL options which will take effect during execution.
- .. container:: class_bases
- Proxied for the :class:`_engine.Engine` class on
- behalf of the :class:`_asyncio.AsyncEngine` class.
- .. versionadded: 1.3
- .. seealso::
- :meth:`_engine.Engine.execution_options`
- """ # noqa: E501
- return self._proxied.get_execution_options()
- @property
- def url(self) -> URL:
- r"""Proxy for the :attr:`_engine.Engine.url` attribute
- on behalf of the :class:`_asyncio.AsyncEngine` class.
- """ # noqa: E501
- return self._proxied.url
- @url.setter
- def url(self, attr: URL) -> None:
- self._proxied.url = attr
- @property
- def pool(self) -> Pool:
- r"""Proxy for the :attr:`_engine.Engine.pool` attribute
- on behalf of the :class:`_asyncio.AsyncEngine` class.
- """ # noqa: E501
- return self._proxied.pool
- @pool.setter
- def pool(self, attr: Pool) -> None:
- self._proxied.pool = attr
- @property
- def dialect(self) -> Dialect:
- r"""Proxy for the :attr:`_engine.Engine.dialect` attribute
- on behalf of the :class:`_asyncio.AsyncEngine` class.
- """ # noqa: E501
- return self._proxied.dialect
- @dialect.setter
- def dialect(self, attr: Dialect) -> None:
- self._proxied.dialect = attr
- @property
- def engine(self) -> Any:
- r"""Returns this :class:`.Engine`.
- .. container:: class_bases
- Proxied for the :class:`_engine.Engine` class
- on behalf of the :class:`_asyncio.AsyncEngine` class.
- Used for legacy schemes that accept :class:`.Connection` /
- :class:`.Engine` objects within the same variable.
- """ # noqa: E501
- return self._proxied.engine
- @property
- def name(self) -> Any:
- r"""String name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
- in use by this :class:`Engine`.
- .. container:: class_bases
- Proxied for the :class:`_engine.Engine` class
- on behalf of the :class:`_asyncio.AsyncEngine` class.
- """ # noqa: E501
- return self._proxied.name
- @property
- def driver(self) -> Any:
- r"""Driver name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
- in use by this :class:`Engine`.
- .. container:: class_bases
- Proxied for the :class:`_engine.Engine` class
- on behalf of the :class:`_asyncio.AsyncEngine` class.
- """ # noqa: E501
- return self._proxied.driver
- @property
- def echo(self) -> Any:
- r"""When ``True``, enable log output for this element.
- .. container:: class_bases
- Proxied for the :class:`_engine.Engine` class
- on behalf of the :class:`_asyncio.AsyncEngine` class.
- This has the effect of setting the Python logging level for the namespace
- of this element's class and object reference. A value of boolean ``True``
- indicates that the loglevel ``logging.INFO`` will be set for the logger,
- whereas the string value ``debug`` will set the loglevel to
- ``logging.DEBUG``.
- """ # noqa: E501
- return self._proxied.echo
- @echo.setter
- def echo(self, attr: Any) -> None:
- self._proxied.echo = attr
- # END PROXY METHODS AsyncEngine
- class AsyncTransaction(
- ProxyComparable[Transaction], StartableContext["AsyncTransaction"]
- ):
- """An asyncio proxy for a :class:`_engine.Transaction`."""
- __slots__ = ("connection", "sync_transaction", "nested")
- sync_transaction: Optional[Transaction]
- connection: AsyncConnection
- nested: bool
- def __init__(self, connection: AsyncConnection, nested: bool = False):
- self.connection = connection
- self.sync_transaction = None
- self.nested = nested
- @classmethod
- def _regenerate_proxy_for_target(
- cls, target: Transaction, **additional_kw: Any # noqa: U100
- ) -> AsyncTransaction:
- sync_connection = target.connection
- sync_transaction = target
- nested = isinstance(target, NestedTransaction)
- async_connection = AsyncConnection._retrieve_proxy_for_target(
- sync_connection
- )
- assert async_connection is not None
- obj = cls.__new__(cls)
- obj.connection = async_connection
- obj.sync_transaction = obj._assign_proxied(sync_transaction)
- obj.nested = nested
- return obj
- @util.ro_non_memoized_property
- def _proxied(self) -> Transaction:
- if not self.sync_transaction:
- self._raise_for_not_started()
- return self.sync_transaction
- @property
- def is_valid(self) -> bool:
- return self._proxied.is_valid
- @property
- def is_active(self) -> bool:
- return self._proxied.is_active
- async def close(self) -> None:
- """Close this :class:`.AsyncTransaction`.
- If this transaction is the base transaction in a begin/commit
- nesting, the transaction will rollback(). Otherwise, the
- method returns.
- This is used to cancel a Transaction without affecting the scope of
- an enclosing transaction.
- """
- await greenlet_spawn(self._proxied.close)
- async def rollback(self) -> None:
- """Roll back this :class:`.AsyncTransaction`."""
- await greenlet_spawn(self._proxied.rollback)
- async def commit(self) -> None:
- """Commit this :class:`.AsyncTransaction`."""
- await greenlet_spawn(self._proxied.commit)
- async def start(self, is_ctxmanager: bool = False) -> AsyncTransaction:
- """Start this :class:`_asyncio.AsyncTransaction` object's context
- outside of using a Python ``with:`` block.
- """
- self.sync_transaction = self._assign_proxied(
- await greenlet_spawn(
- self.connection._proxied.begin_nested
- if self.nested
- else self.connection._proxied.begin
- )
- )
- if is_ctxmanager:
- self.sync_transaction.__enter__()
- return self
- async def __aexit__(self, type_: Any, value: Any, traceback: Any) -> None:
- await greenlet_spawn(self._proxied.__exit__, type_, value, traceback)
- @overload
- def _get_sync_engine_or_connection(async_engine: AsyncEngine) -> Engine: ...
- @overload
- def _get_sync_engine_or_connection(
- async_engine: AsyncConnection,
- ) -> Connection: ...
- def _get_sync_engine_or_connection(
- async_engine: Union[AsyncEngine, AsyncConnection],
- ) -> Union[Engine, Connection]:
- if isinstance(async_engine, AsyncConnection):
- return async_engine._proxied
- try:
- return async_engine.sync_engine
- except AttributeError as e:
- raise exc.ArgumentError(
- "AsyncEngine expected, got %r" % async_engine
- ) from e
- @inspection._inspects(AsyncConnection)
- def _no_insp_for_async_conn_yet(
- subject: AsyncConnection, # noqa: U100
- ) -> NoReturn:
- raise exc.NoInspectionAvailable(
- "Inspection on an AsyncConnection is currently not supported. "
- "Please use ``run_sync`` to pass a callable where it's possible "
- "to call ``inspect`` on the passed connection.",
- code="xd3s",
- )
- @inspection._inspects(AsyncEngine)
- def _no_insp_for_async_engine_xyet(
- subject: AsyncEngine, # noqa: U100
- ) -> NoReturn:
- raise exc.NoInspectionAvailable(
- "Inspection on an AsyncEngine is currently not supported. "
- "Please obtain a connection then use ``conn.run_sync`` to pass a "
- "callable where it's possible to call ``inspect`` on the "
- "passed connection.",
- code="xd3s",
- )
|