| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443 |
- # dialects/sqlite/aiosqlite.py
- # Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
- # <see AUTHORS file>
- #
- # This module is part of SQLAlchemy and is released under
- # the MIT License: https://www.opensource.org/licenses/mit-license.php
- r"""
- .. dialect:: sqlite+aiosqlite
- :name: aiosqlite
- :dbapi: aiosqlite
- :connectstring: sqlite+aiosqlite:///file_path
- :url: https://pypi.org/project/aiosqlite/
- The aiosqlite dialect provides support for the SQLAlchemy asyncio interface
- running on top of pysqlite.
- aiosqlite is a wrapper around pysqlite that uses a background thread for
- each connection. It does not actually use non-blocking IO, as SQLite
- databases are not socket-based. However it does provide a working asyncio
- interface that's useful for testing and prototyping purposes.
- Using a special asyncio mediation layer, the aiosqlite dialect is usable
- as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>`
- extension package.
- This dialect should normally be used only with the
- :func:`_asyncio.create_async_engine` engine creation function::
- from sqlalchemy.ext.asyncio import create_async_engine
- engine = create_async_engine("sqlite+aiosqlite:///filename")
- The URL passes through all arguments to the ``pysqlite`` driver, so all
- connection arguments are the same as they are for that of :ref:`pysqlite`.
- .. _aiosqlite_udfs:
- User-Defined Functions
- ----------------------
- aiosqlite extends pysqlite to support async, so we can create our own user-defined functions (UDFs)
- in Python and use them directly in SQLite queries as described here: :ref:`pysqlite_udfs`.
- .. _aiosqlite_serializable:
- Serializable isolation / Savepoints / Transactional DDL (asyncio version)
- -------------------------------------------------------------------------
- A newly revised version of this important section is now available
- at the top level of the SQLAlchemy SQLite documentation, in the section
- :ref:`sqlite_transactions`.
- .. _aiosqlite_pooling:
- Pooling Behavior
- ----------------
- The SQLAlchemy ``aiosqlite`` DBAPI establishes the connection pool differently
- based on the kind of SQLite database that's requested:
- * When a ``:memory:`` SQLite database is specified, the dialect by default
- will use :class:`.StaticPool`. This pool maintains a single
- connection, so that all access to the engine
- use the same ``:memory:`` database.
- * When a file-based database is specified, the dialect will use
- :class:`.AsyncAdaptedQueuePool` as the source of connections.
- .. versionchanged:: 2.0.38
- SQLite file database engines now use :class:`.AsyncAdaptedQueuePool` by default.
- Previously, :class:`.NullPool` were used. The :class:`.NullPool` class
- may be used by specifying it via the
- :paramref:`_sa.create_engine.poolclass` parameter.
- """ # noqa
- from __future__ import annotations
- import asyncio
- from collections import deque
- from functools import partial
- from types import ModuleType
- from typing import Any
- from typing import cast
- from typing import Deque
- from typing import Iterator
- from typing import NoReturn
- from typing import Optional
- from typing import Sequence
- from typing import TYPE_CHECKING
- from typing import Union
- from .base import SQLiteExecutionContext
- from .pysqlite import SQLiteDialect_pysqlite
- from ... import pool
- from ... import util
- from ...connectors.asyncio import AsyncAdapt_dbapi_module
- from ...engine import AdaptedConnection
- from ...util.concurrency import await_fallback
- from ...util.concurrency import await_only
- if TYPE_CHECKING:
- from ...connectors.asyncio import AsyncIODBAPIConnection
- from ...connectors.asyncio import AsyncIODBAPICursor
- from ...engine.interfaces import _DBAPICursorDescription
- from ...engine.interfaces import _DBAPIMultiExecuteParams
- from ...engine.interfaces import _DBAPISingleExecuteParams
- from ...engine.interfaces import DBAPIConnection
- from ...engine.interfaces import DBAPICursor
- from ...engine.interfaces import DBAPIModule
- from ...engine.url import URL
- from ...pool.base import PoolProxiedConnection
- class AsyncAdapt_aiosqlite_cursor:
- # TODO: base on connectors/asyncio.py
- # see #10415
- __slots__ = (
- "_adapt_connection",
- "_connection",
- "description",
- "await_",
- "_rows",
- "arraysize",
- "rowcount",
- "lastrowid",
- )
- server_side = False
- def __init__(self, adapt_connection: AsyncAdapt_aiosqlite_connection):
- self._adapt_connection = adapt_connection
- self._connection = adapt_connection._connection
- self.await_ = adapt_connection.await_
- self.arraysize = 1
- self.rowcount = -1
- self.description: Optional[_DBAPICursorDescription] = None
- self._rows: Deque[Any] = deque()
- def close(self) -> None:
- self._rows.clear()
- def execute(
- self,
- operation: Any,
- parameters: Optional[_DBAPISingleExecuteParams] = None,
- ) -> Any:
- try:
- _cursor: AsyncIODBAPICursor = self.await_(self._connection.cursor()) # type: ignore[arg-type] # noqa: E501
- if parameters is None:
- self.await_(_cursor.execute(operation))
- else:
- self.await_(_cursor.execute(operation, parameters))
- if _cursor.description:
- self.description = _cursor.description
- self.lastrowid = self.rowcount = -1
- if not self.server_side:
- self._rows = deque(self.await_(_cursor.fetchall()))
- else:
- self.description = None
- self.lastrowid = _cursor.lastrowid
- self.rowcount = _cursor.rowcount
- if not self.server_side:
- self.await_(_cursor.close())
- else:
- self._cursor = _cursor # type: ignore[misc]
- except Exception as error:
- self._adapt_connection._handle_exception(error)
- def executemany(
- self,
- operation: Any,
- seq_of_parameters: _DBAPIMultiExecuteParams,
- ) -> Any:
- try:
- _cursor: AsyncIODBAPICursor = self.await_(self._connection.cursor()) # type: ignore[arg-type] # noqa: E501
- self.await_(_cursor.executemany(operation, seq_of_parameters))
- self.description = None
- self.lastrowid = _cursor.lastrowid
- self.rowcount = _cursor.rowcount
- self.await_(_cursor.close())
- except Exception as error:
- self._adapt_connection._handle_exception(error)
- def setinputsizes(self, *inputsizes: Any) -> None:
- pass
- def __iter__(self) -> Iterator[Any]:
- while self._rows:
- yield self._rows.popleft()
- def fetchone(self) -> Optional[Any]:
- if self._rows:
- return self._rows.popleft()
- else:
- return None
- def fetchmany(self, size: Optional[int] = None) -> Sequence[Any]:
- if size is None:
- size = self.arraysize
- rr = self._rows
- return [rr.popleft() for _ in range(min(size, len(rr)))]
- def fetchall(self) -> Sequence[Any]:
- retval = list(self._rows)
- self._rows.clear()
- return retval
- class AsyncAdapt_aiosqlite_ss_cursor(AsyncAdapt_aiosqlite_cursor):
- # TODO: base on connectors/asyncio.py
- # see #10415
- __slots__ = "_cursor"
- server_side = True
- def __init__(self, *arg: Any, **kw: Any) -> None:
- super().__init__(*arg, **kw)
- self._cursor: Optional[AsyncIODBAPICursor] = None
- def close(self) -> None:
- if self._cursor is not None:
- self.await_(self._cursor.close())
- self._cursor = None
- def fetchone(self) -> Optional[Any]:
- assert self._cursor is not None
- return self.await_(self._cursor.fetchone())
- def fetchmany(self, size: Optional[int] = None) -> Sequence[Any]:
- assert self._cursor is not None
- if size is None:
- size = self.arraysize
- return self.await_(self._cursor.fetchmany(size=size))
- def fetchall(self) -> Sequence[Any]:
- assert self._cursor is not None
- return self.await_(self._cursor.fetchall())
- class AsyncAdapt_aiosqlite_connection(AdaptedConnection):
- await_ = staticmethod(await_only)
- __slots__ = ("dbapi",)
- def __init__(self, dbapi: Any, connection: AsyncIODBAPIConnection) -> None:
- self.dbapi = dbapi
- self._connection = connection
- @property
- def isolation_level(self) -> Optional[str]:
- return cast(str, self._connection.isolation_level)
- @isolation_level.setter
- def isolation_level(self, value: Optional[str]) -> None:
- # aiosqlite's isolation_level setter works outside the Thread
- # that it's supposed to, necessitating setting check_same_thread=False.
- # for improved stability, we instead invent our own awaitable version
- # using aiosqlite's async queue directly.
- def set_iso(
- connection: AsyncAdapt_aiosqlite_connection, value: Optional[str]
- ) -> None:
- connection.isolation_level = value
- function = partial(set_iso, self._connection._conn, value)
- future = asyncio.get_event_loop().create_future()
- self._connection._tx.put_nowait((future, function))
- try:
- self.await_(future)
- except Exception as error:
- self._handle_exception(error)
- def create_function(self, *args: Any, **kw: Any) -> None:
- try:
- self.await_(self._connection.create_function(*args, **kw))
- except Exception as error:
- self._handle_exception(error)
- def cursor(self, server_side: bool = False) -> AsyncAdapt_aiosqlite_cursor:
- if server_side:
- return AsyncAdapt_aiosqlite_ss_cursor(self)
- else:
- return AsyncAdapt_aiosqlite_cursor(self)
- def execute(self, *args: Any, **kw: Any) -> Any:
- return self.await_(self._connection.execute(*args, **kw))
- def rollback(self) -> None:
- try:
- self.await_(self._connection.rollback())
- except Exception as error:
- self._handle_exception(error)
- def commit(self) -> None:
- try:
- self.await_(self._connection.commit())
- except Exception as error:
- self._handle_exception(error)
- def close(self) -> None:
- try:
- self.await_(self._connection.close())
- except ValueError:
- # this is undocumented for aiosqlite, that ValueError
- # was raised if .close() was called more than once, which is
- # both not customary for DBAPI and is also not a DBAPI.Error
- # exception. This is now fixed in aiosqlite via my PR
- # https://github.com/omnilib/aiosqlite/pull/238, so we can be
- # assured this will not become some other kind of exception,
- # since it doesn't raise anymore.
- pass
- except Exception as error:
- self._handle_exception(error)
- def _handle_exception(self, error: Exception) -> NoReturn:
- if (
- isinstance(error, ValueError)
- and error.args[0] == "no active connection"
- ):
- raise self.dbapi.sqlite.OperationalError(
- "no active connection"
- ) from error
- else:
- raise error
- class AsyncAdaptFallback_aiosqlite_connection(AsyncAdapt_aiosqlite_connection):
- __slots__ = ()
- await_ = staticmethod(await_fallback)
- class AsyncAdapt_aiosqlite_dbapi(AsyncAdapt_dbapi_module):
- def __init__(self, aiosqlite: ModuleType, sqlite: ModuleType):
- self.aiosqlite = aiosqlite
- self.sqlite = sqlite
- self.paramstyle = "qmark"
- self._init_dbapi_attributes()
- def _init_dbapi_attributes(self) -> None:
- for name in (
- "DatabaseError",
- "Error",
- "IntegrityError",
- "NotSupportedError",
- "OperationalError",
- "ProgrammingError",
- "sqlite_version",
- "sqlite_version_info",
- ):
- setattr(self, name, getattr(self.aiosqlite, name))
- for name in ("PARSE_COLNAMES", "PARSE_DECLTYPES"):
- setattr(self, name, getattr(self.sqlite, name))
- for name in ("Binary",):
- setattr(self, name, getattr(self.sqlite, name))
- def connect(self, *arg: Any, **kw: Any) -> AsyncAdapt_aiosqlite_connection:
- async_fallback = kw.pop("async_fallback", False)
- creator_fn = kw.pop("async_creator_fn", None)
- if creator_fn:
- connection = creator_fn(*arg, **kw)
- else:
- connection = self.aiosqlite.connect(*arg, **kw)
- # it's a Thread. you'll thank us later
- connection.daemon = True
- if util.asbool(async_fallback):
- return AsyncAdaptFallback_aiosqlite_connection(
- self,
- await_fallback(connection),
- )
- else:
- return AsyncAdapt_aiosqlite_connection(
- self,
- await_only(connection),
- )
- class SQLiteExecutionContext_aiosqlite(SQLiteExecutionContext):
- def create_server_side_cursor(self) -> DBAPICursor:
- return self._dbapi_connection.cursor(server_side=True)
- class SQLiteDialect_aiosqlite(SQLiteDialect_pysqlite):
- driver = "aiosqlite"
- supports_statement_cache = True
- is_async = True
- supports_server_side_cursors = True
- execution_ctx_cls = SQLiteExecutionContext_aiosqlite
- @classmethod
- def import_dbapi(cls) -> AsyncAdapt_aiosqlite_dbapi:
- return AsyncAdapt_aiosqlite_dbapi(
- __import__("aiosqlite"), __import__("sqlite3")
- )
- @classmethod
- def get_pool_class(cls, url: URL) -> type[pool.Pool]:
- if cls._is_url_file_db(url):
- return pool.AsyncAdaptedQueuePool
- else:
- return pool.StaticPool
- def is_disconnect(
- self,
- e: DBAPIModule.Error,
- connection: Optional[Union[PoolProxiedConnection, DBAPIConnection]],
- cursor: Optional[DBAPICursor],
- ) -> bool:
- self.dbapi = cast("DBAPIModule", self.dbapi)
- if isinstance(
- e, self.dbapi.OperationalError
- ) and "no active connection" in str(e):
- return True
- return super().is_disconnect(e, connection, cursor)
- def get_driver_connection(
- self, connection: DBAPIConnection
- ) -> AsyncIODBAPIConnection:
- return connection._connection # type: ignore[no-any-return]
- dialect = SQLiteDialect_aiosqlite
|