aiosqlite.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443
  1. # dialects/sqlite/aiosqlite.py
  2. # Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: https://www.opensource.org/licenses/mit-license.php
  7. r"""
  8. .. dialect:: sqlite+aiosqlite
  9. :name: aiosqlite
  10. :dbapi: aiosqlite
  11. :connectstring: sqlite+aiosqlite:///file_path
  12. :url: https://pypi.org/project/aiosqlite/
  13. The aiosqlite dialect provides support for the SQLAlchemy asyncio interface
  14. running on top of pysqlite.
  15. aiosqlite is a wrapper around pysqlite that uses a background thread for
  16. each connection. It does not actually use non-blocking IO, as SQLite
  17. databases are not socket-based. However it does provide a working asyncio
  18. interface that's useful for testing and prototyping purposes.
  19. Using a special asyncio mediation layer, the aiosqlite dialect is usable
  20. as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>`
  21. extension package.
  22. This dialect should normally be used only with the
  23. :func:`_asyncio.create_async_engine` engine creation function::
  24. from sqlalchemy.ext.asyncio import create_async_engine
  25. engine = create_async_engine("sqlite+aiosqlite:///filename")
  26. The URL passes through all arguments to the ``pysqlite`` driver, so all
  27. connection arguments are the same as they are for that of :ref:`pysqlite`.
  28. .. _aiosqlite_udfs:
  29. User-Defined Functions
  30. ----------------------
  31. aiosqlite extends pysqlite to support async, so we can create our own user-defined functions (UDFs)
  32. in Python and use them directly in SQLite queries as described here: :ref:`pysqlite_udfs`.
  33. .. _aiosqlite_serializable:
  34. Serializable isolation / Savepoints / Transactional DDL (asyncio version)
  35. -------------------------------------------------------------------------
  36. A newly revised version of this important section is now available
  37. at the top level of the SQLAlchemy SQLite documentation, in the section
  38. :ref:`sqlite_transactions`.
  39. .. _aiosqlite_pooling:
  40. Pooling Behavior
  41. ----------------
  42. The SQLAlchemy ``aiosqlite`` DBAPI establishes the connection pool differently
  43. based on the kind of SQLite database that's requested:
  44. * When a ``:memory:`` SQLite database is specified, the dialect by default
  45. will use :class:`.StaticPool`. This pool maintains a single
  46. connection, so that all access to the engine
  47. uses the same ``:memory:`` database.
  48. * When a file-based database is specified, the dialect will use
  49. :class:`.AsyncAdaptedQueuePool` as the source of connections.
  50. .. versionchanged:: 2.0.38
  51. SQLite file database engines now use :class:`.AsyncAdaptedQueuePool` by default.
  52. Previously, :class:`.NullPool` was used. The :class:`.NullPool` class
  53. may be used by specifying it via the
  54. :paramref:`_sa.create_engine.poolclass` parameter.
  55. """ # noqa
  56. from __future__ import annotations
  57. import asyncio
  58. from collections import deque
  59. from functools import partial
  60. from types import ModuleType
  61. from typing import Any
  62. from typing import cast
  63. from typing import Deque
  64. from typing import Iterator
  65. from typing import NoReturn
  66. from typing import Optional
  67. from typing import Sequence
  68. from typing import TYPE_CHECKING
  69. from typing import Union
  70. from .base import SQLiteExecutionContext
  71. from .pysqlite import SQLiteDialect_pysqlite
  72. from ... import pool
  73. from ... import util
  74. from ...connectors.asyncio import AsyncAdapt_dbapi_module
  75. from ...engine import AdaptedConnection
  76. from ...util.concurrency import await_fallback
  77. from ...util.concurrency import await_only
  78. if TYPE_CHECKING:
  79. from ...connectors.asyncio import AsyncIODBAPIConnection
  80. from ...connectors.asyncio import AsyncIODBAPICursor
  81. from ...engine.interfaces import _DBAPICursorDescription
  82. from ...engine.interfaces import _DBAPIMultiExecuteParams
  83. from ...engine.interfaces import _DBAPISingleExecuteParams
  84. from ...engine.interfaces import DBAPIConnection
  85. from ...engine.interfaces import DBAPICursor
  86. from ...engine.interfaces import DBAPIModule
  87. from ...engine.url import URL
  88. from ...pool.base import PoolProxiedConnection
  89. class AsyncAdapt_aiosqlite_cursor:
  90. # TODO: base on connectors/asyncio.py
  91. # see #10415
  92. __slots__ = (
  93. "_adapt_connection",
  94. "_connection",
  95. "description",
  96. "await_",
  97. "_rows",
  98. "arraysize",
  99. "rowcount",
  100. "lastrowid",
  101. )
  102. server_side = False
  103. def __init__(self, adapt_connection: AsyncAdapt_aiosqlite_connection):
  104. self._adapt_connection = adapt_connection
  105. self._connection = adapt_connection._connection
  106. self.await_ = adapt_connection.await_
  107. self.arraysize = 1
  108. self.rowcount = -1
  109. self.description: Optional[_DBAPICursorDescription] = None
  110. self._rows: Deque[Any] = deque()
  111. def close(self) -> None:
  112. self._rows.clear()
  113. def execute(
  114. self,
  115. operation: Any,
  116. parameters: Optional[_DBAPISingleExecuteParams] = None,
  117. ) -> Any:
  118. try:
  119. _cursor: AsyncIODBAPICursor = self.await_(self._connection.cursor()) # type: ignore[arg-type] # noqa: E501
  120. if parameters is None:
  121. self.await_(_cursor.execute(operation))
  122. else:
  123. self.await_(_cursor.execute(operation, parameters))
  124. if _cursor.description:
  125. self.description = _cursor.description
  126. self.lastrowid = self.rowcount = -1
  127. if not self.server_side:
  128. self._rows = deque(self.await_(_cursor.fetchall()))
  129. else:
  130. self.description = None
  131. self.lastrowid = _cursor.lastrowid
  132. self.rowcount = _cursor.rowcount
  133. if not self.server_side:
  134. self.await_(_cursor.close())
  135. else:
  136. self._cursor = _cursor # type: ignore[misc]
  137. except Exception as error:
  138. self._adapt_connection._handle_exception(error)
  139. def executemany(
  140. self,
  141. operation: Any,
  142. seq_of_parameters: _DBAPIMultiExecuteParams,
  143. ) -> Any:
  144. try:
  145. _cursor: AsyncIODBAPICursor = self.await_(self._connection.cursor()) # type: ignore[arg-type] # noqa: E501
  146. self.await_(_cursor.executemany(operation, seq_of_parameters))
  147. self.description = None
  148. self.lastrowid = _cursor.lastrowid
  149. self.rowcount = _cursor.rowcount
  150. self.await_(_cursor.close())
  151. except Exception as error:
  152. self._adapt_connection._handle_exception(error)
  153. def setinputsizes(self, *inputsizes: Any) -> None:
  154. pass
  155. def __iter__(self) -> Iterator[Any]:
  156. while self._rows:
  157. yield self._rows.popleft()
  158. def fetchone(self) -> Optional[Any]:
  159. if self._rows:
  160. return self._rows.popleft()
  161. else:
  162. return None
  163. def fetchmany(self, size: Optional[int] = None) -> Sequence[Any]:
  164. if size is None:
  165. size = self.arraysize
  166. rr = self._rows
  167. return [rr.popleft() for _ in range(min(size, len(rr)))]
  168. def fetchall(self) -> Sequence[Any]:
  169. retval = list(self._rows)
  170. self._rows.clear()
  171. return retval
  172. class AsyncAdapt_aiosqlite_ss_cursor(AsyncAdapt_aiosqlite_cursor):
  173. # TODO: base on connectors/asyncio.py
  174. # see #10415
  175. __slots__ = "_cursor"
  176. server_side = True
  177. def __init__(self, *arg: Any, **kw: Any) -> None:
  178. super().__init__(*arg, **kw)
  179. self._cursor: Optional[AsyncIODBAPICursor] = None
  180. def close(self) -> None:
  181. if self._cursor is not None:
  182. self.await_(self._cursor.close())
  183. self._cursor = None
  184. def fetchone(self) -> Optional[Any]:
  185. assert self._cursor is not None
  186. return self.await_(self._cursor.fetchone())
  187. def fetchmany(self, size: Optional[int] = None) -> Sequence[Any]:
  188. assert self._cursor is not None
  189. if size is None:
  190. size = self.arraysize
  191. return self.await_(self._cursor.fetchmany(size=size))
  192. def fetchall(self) -> Sequence[Any]:
  193. assert self._cursor is not None
  194. return self.await_(self._cursor.fetchall())
  195. class AsyncAdapt_aiosqlite_connection(AdaptedConnection):
  196. await_ = staticmethod(await_only)
  197. __slots__ = ("dbapi",)
  198. def __init__(self, dbapi: Any, connection: AsyncIODBAPIConnection) -> None:
  199. self.dbapi = dbapi
  200. self._connection = connection
  201. @property
  202. def isolation_level(self) -> Optional[str]:
  203. return cast(str, self._connection.isolation_level)
  204. @isolation_level.setter
  205. def isolation_level(self, value: Optional[str]) -> None:
  206. # aiosqlite's isolation_level setter works outside the Thread
  207. # that it's supposed to, necessitating setting check_same_thread=False.
  208. # for improved stability, we instead invent our own awaitable version
  209. # using aiosqlite's async queue directly.
  210. def set_iso(
  211. connection: AsyncAdapt_aiosqlite_connection, value: Optional[str]
  212. ) -> None:
  213. connection.isolation_level = value
  214. function = partial(set_iso, self._connection._conn, value)
  215. future = asyncio.get_event_loop().create_future()
  216. self._connection._tx.put_nowait((future, function))
  217. try:
  218. self.await_(future)
  219. except Exception as error:
  220. self._handle_exception(error)
  221. def create_function(self, *args: Any, **kw: Any) -> None:
  222. try:
  223. self.await_(self._connection.create_function(*args, **kw))
  224. except Exception as error:
  225. self._handle_exception(error)
  226. def cursor(self, server_side: bool = False) -> AsyncAdapt_aiosqlite_cursor:
  227. if server_side:
  228. return AsyncAdapt_aiosqlite_ss_cursor(self)
  229. else:
  230. return AsyncAdapt_aiosqlite_cursor(self)
  231. def execute(self, *args: Any, **kw: Any) -> Any:
  232. return self.await_(self._connection.execute(*args, **kw))
  233. def rollback(self) -> None:
  234. try:
  235. self.await_(self._connection.rollback())
  236. except Exception as error:
  237. self._handle_exception(error)
  238. def commit(self) -> None:
  239. try:
  240. self.await_(self._connection.commit())
  241. except Exception as error:
  242. self._handle_exception(error)
  243. def close(self) -> None:
  244. try:
  245. self.await_(self._connection.close())
  246. except ValueError:
  247. # this is undocumented for aiosqlite, that ValueError
  248. # was raised if .close() was called more than once, which is
  249. # both not customary for DBAPI and is also not a DBAPI.Error
  250. # exception. This is now fixed in aiosqlite via my PR
  251. # https://github.com/omnilib/aiosqlite/pull/238, so we can be
  252. # assured this will not become some other kind of exception,
  253. # since it doesn't raise anymore.
  254. pass
  255. except Exception as error:
  256. self._handle_exception(error)
  257. def _handle_exception(self, error: Exception) -> NoReturn:
  258. if (
  259. isinstance(error, ValueError)
  260. and error.args[0] == "no active connection"
  261. ):
  262. raise self.dbapi.sqlite.OperationalError(
  263. "no active connection"
  264. ) from error
  265. else:
  266. raise error
  267. class AsyncAdaptFallback_aiosqlite_connection(AsyncAdapt_aiosqlite_connection):
  268. __slots__ = ()
  269. await_ = staticmethod(await_fallback)
  270. class AsyncAdapt_aiosqlite_dbapi(AsyncAdapt_dbapi_module):
  271. def __init__(self, aiosqlite: ModuleType, sqlite: ModuleType):
  272. self.aiosqlite = aiosqlite
  273. self.sqlite = sqlite
  274. self.paramstyle = "qmark"
  275. self._init_dbapi_attributes()
  276. def _init_dbapi_attributes(self) -> None:
  277. for name in (
  278. "DatabaseError",
  279. "Error",
  280. "IntegrityError",
  281. "NotSupportedError",
  282. "OperationalError",
  283. "ProgrammingError",
  284. "sqlite_version",
  285. "sqlite_version_info",
  286. ):
  287. setattr(self, name, getattr(self.aiosqlite, name))
  288. for name in ("PARSE_COLNAMES", "PARSE_DECLTYPES"):
  289. setattr(self, name, getattr(self.sqlite, name))
  290. for name in ("Binary",):
  291. setattr(self, name, getattr(self.sqlite, name))
  292. def connect(self, *arg: Any, **kw: Any) -> AsyncAdapt_aiosqlite_connection:
  293. async_fallback = kw.pop("async_fallback", False)
  294. creator_fn = kw.pop("async_creator_fn", None)
  295. if creator_fn:
  296. connection = creator_fn(*arg, **kw)
  297. else:
  298. connection = self.aiosqlite.connect(*arg, **kw)
  299. # it's a Thread. you'll thank us later
  300. connection.daemon = True
  301. if util.asbool(async_fallback):
  302. return AsyncAdaptFallback_aiosqlite_connection(
  303. self,
  304. await_fallback(connection),
  305. )
  306. else:
  307. return AsyncAdapt_aiosqlite_connection(
  308. self,
  309. await_only(connection),
  310. )
  311. class SQLiteExecutionContext_aiosqlite(SQLiteExecutionContext):
  312. def create_server_side_cursor(self) -> DBAPICursor:
  313. return self._dbapi_connection.cursor(server_side=True)
  314. class SQLiteDialect_aiosqlite(SQLiteDialect_pysqlite):
  315. driver = "aiosqlite"
  316. supports_statement_cache = True
  317. is_async = True
  318. supports_server_side_cursors = True
  319. execution_ctx_cls = SQLiteExecutionContext_aiosqlite
  320. @classmethod
  321. def import_dbapi(cls) -> AsyncAdapt_aiosqlite_dbapi:
  322. return AsyncAdapt_aiosqlite_dbapi(
  323. __import__("aiosqlite"), __import__("sqlite3")
  324. )
  325. @classmethod
  326. def get_pool_class(cls, url: URL) -> type[pool.Pool]:
  327. if cls._is_url_file_db(url):
  328. return pool.AsyncAdaptedQueuePool
  329. else:
  330. return pool.StaticPool
  331. def is_disconnect(
  332. self,
  333. e: DBAPIModule.Error,
  334. connection: Optional[Union[PoolProxiedConnection, DBAPIConnection]],
  335. cursor: Optional[DBAPICursor],
  336. ) -> bool:
  337. self.dbapi = cast("DBAPIModule", self.dbapi)
  338. if isinstance(
  339. e, self.dbapi.OperationalError
  340. ) and "no active connection" in str(e):
  341. return True
  342. return super().is_disconnect(e, connection, cursor)
  343. def get_driver_connection(
  344. self, connection: DBAPIConnection
  345. ) -> AsyncIODBAPIConnection:
  346. return connection._connection # type: ignore[no-any-return]
# Module-level alias for the dialect class defined above.
# NOTE(review): presumably the name SQLAlchemy's dialect loader looks up
# in this module - confirm against the loader.
dialect = SQLiteDialect_aiosqlite