aiomysql.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
# dialects/mysql/aiomysql.py
# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors <see AUTHORS
# file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php

r"""
.. dialect:: mysql+aiomysql
    :name: aiomysql
    :dbapi: aiomysql
    :connectstring: mysql+aiomysql://user:password@host:port/dbname[?key=value&key=value...]
    :url: https://github.com/aio-libs/aiomysql

The aiomysql dialect is SQLAlchemy's second Python asyncio dialect.

Using a special asyncio mediation layer, the aiomysql dialect is usable
as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>`
extension package.

This dialect should normally be used only with the
:func:`_asyncio.create_async_engine` engine creation function::

    from sqlalchemy.ext.asyncio import create_async_engine

    engine = create_async_engine(
        "mysql+aiomysql://user:pass@hostname/dbname?charset=utf8mb4"
    )

"""  # noqa
  24. from __future__ import annotations
  25. from types import ModuleType
  26. from typing import Any
  27. from typing import Dict
  28. from typing import Optional
  29. from typing import Tuple
  30. from typing import TYPE_CHECKING
  31. from typing import Union
  32. from .pymysql import MySQLDialect_pymysql
  33. from ... import pool
  34. from ... import util
  35. from ...connectors.asyncio import AsyncAdapt_dbapi_connection
  36. from ...connectors.asyncio import AsyncAdapt_dbapi_cursor
  37. from ...connectors.asyncio import AsyncAdapt_dbapi_module
  38. from ...connectors.asyncio import AsyncAdapt_dbapi_ss_cursor
  39. from ...util.concurrency import await_fallback
  40. from ...util.concurrency import await_only
  41. if TYPE_CHECKING:
  42. from ...connectors.asyncio import AsyncIODBAPIConnection
  43. from ...connectors.asyncio import AsyncIODBAPICursor
  44. from ...engine.interfaces import ConnectArgsType
  45. from ...engine.interfaces import DBAPIConnection
  46. from ...engine.interfaces import DBAPICursor
  47. from ...engine.interfaces import DBAPIModule
  48. from ...engine.interfaces import PoolProxiedConnection
  49. from ...engine.url import URL
  50. class AsyncAdapt_aiomysql_cursor(AsyncAdapt_dbapi_cursor):
  51. __slots__ = ()
  52. def _make_new_cursor(
  53. self, connection: AsyncIODBAPIConnection
  54. ) -> AsyncIODBAPICursor:
  55. return connection.cursor(self._adapt_connection.dbapi.Cursor)
  56. class AsyncAdapt_aiomysql_ss_cursor(
  57. AsyncAdapt_dbapi_ss_cursor, AsyncAdapt_aiomysql_cursor
  58. ):
  59. __slots__ = ()
  60. def _make_new_cursor(
  61. self, connection: AsyncIODBAPIConnection
  62. ) -> AsyncIODBAPICursor:
  63. return connection.cursor(
  64. self._adapt_connection.dbapi.aiomysql.cursors.SSCursor
  65. )
  66. class AsyncAdapt_aiomysql_connection(AsyncAdapt_dbapi_connection):
  67. __slots__ = ()
  68. _cursor_cls = AsyncAdapt_aiomysql_cursor
  69. _ss_cursor_cls = AsyncAdapt_aiomysql_ss_cursor
  70. def ping(self, reconnect: bool) -> None:
  71. assert not reconnect
  72. self.await_(self._connection.ping(reconnect))
  73. def character_set_name(self) -> Optional[str]:
  74. return self._connection.character_set_name() # type: ignore[no-any-return] # noqa: E501
  75. def autocommit(self, value: Any) -> None:
  76. self.await_(self._connection.autocommit(value))
  77. def get_autocommit(self) -> bool:
  78. return self._connection.get_autocommit() # type: ignore
  79. def terminate(self) -> None:
  80. # it's not awaitable.
  81. self._connection.close()
  82. def close(self) -> None:
  83. self.await_(self._connection.ensure_closed())
  84. class AsyncAdaptFallback_aiomysql_connection(AsyncAdapt_aiomysql_connection):
  85. __slots__ = ()
  86. await_ = staticmethod(await_fallback)
  87. class AsyncAdapt_aiomysql_dbapi(AsyncAdapt_dbapi_module):
  88. def __init__(self, aiomysql: ModuleType, pymysql: ModuleType):
  89. self.aiomysql = aiomysql
  90. self.pymysql = pymysql
  91. self.paramstyle = "format"
  92. self._init_dbapi_attributes()
  93. self.Cursor, self.SSCursor = self._init_cursors_subclasses()
  94. def _init_dbapi_attributes(self) -> None:
  95. for name in (
  96. "Warning",
  97. "Error",
  98. "InterfaceError",
  99. "DataError",
  100. "DatabaseError",
  101. "OperationalError",
  102. "InterfaceError",
  103. "IntegrityError",
  104. "ProgrammingError",
  105. "InternalError",
  106. "NotSupportedError",
  107. ):
  108. setattr(self, name, getattr(self.aiomysql, name))
  109. for name in (
  110. "NUMBER",
  111. "STRING",
  112. "DATETIME",
  113. "BINARY",
  114. "TIMESTAMP",
  115. "Binary",
  116. ):
  117. setattr(self, name, getattr(self.pymysql, name))
  118. def connect(self, *arg: Any, **kw: Any) -> AsyncAdapt_aiomysql_connection:
  119. async_fallback = kw.pop("async_fallback", False)
  120. creator_fn = kw.pop("async_creator_fn", self.aiomysql.connect)
  121. if util.asbool(async_fallback):
  122. return AsyncAdaptFallback_aiomysql_connection(
  123. self,
  124. await_fallback(creator_fn(*arg, **kw)),
  125. )
  126. else:
  127. return AsyncAdapt_aiomysql_connection(
  128. self,
  129. await_only(creator_fn(*arg, **kw)),
  130. )
  131. def _init_cursors_subclasses(
  132. self,
  133. ) -> Tuple[AsyncIODBAPICursor, AsyncIODBAPICursor]:
  134. # suppress unconditional warning emitted by aiomysql
  135. class Cursor(self.aiomysql.Cursor): # type: ignore[misc, name-defined]
  136. async def _show_warnings(
  137. self, conn: AsyncIODBAPIConnection
  138. ) -> None:
  139. pass
  140. class SSCursor(self.aiomysql.SSCursor): # type: ignore[misc, name-defined] # noqa: E501
  141. async def _show_warnings(
  142. self, conn: AsyncIODBAPIConnection
  143. ) -> None:
  144. pass
  145. return Cursor, SSCursor # type: ignore[return-value]
  146. class MySQLDialect_aiomysql(MySQLDialect_pymysql):
  147. driver = "aiomysql"
  148. supports_statement_cache = True
  149. supports_server_side_cursors = True
  150. _sscursor = AsyncAdapt_aiomysql_ss_cursor
  151. is_async = True
  152. has_terminate = True
  153. @classmethod
  154. def import_dbapi(cls) -> AsyncAdapt_aiomysql_dbapi:
  155. return AsyncAdapt_aiomysql_dbapi(
  156. __import__("aiomysql"), __import__("pymysql")
  157. )
  158. @classmethod
  159. def get_pool_class(cls, url: URL) -> type:
  160. async_fallback = url.query.get("async_fallback", False)
  161. if util.asbool(async_fallback):
  162. return pool.FallbackAsyncAdaptedQueuePool
  163. else:
  164. return pool.AsyncAdaptedQueuePool
  165. def do_terminate(self, dbapi_connection: DBAPIConnection) -> None:
  166. dbapi_connection.terminate()
  167. def create_connect_args(
  168. self, url: URL, _translate_args: Optional[Dict[str, Any]] = None
  169. ) -> ConnectArgsType:
  170. return super().create_connect_args(
  171. url, _translate_args=dict(username="user", database="db")
  172. )
  173. def is_disconnect(
  174. self,
  175. e: DBAPIModule.Error,
  176. connection: Optional[Union[PoolProxiedConnection, DBAPIConnection]],
  177. cursor: Optional[DBAPICursor],
  178. ) -> bool:
  179. if super().is_disconnect(e, connection, cursor):
  180. return True
  181. else:
  182. str_e = str(e).lower()
  183. return "not connected" in str_e
  184. def _found_rows_client_flag(self) -> int:
  185. from pymysql.constants import CLIENT # type: ignore
  186. return CLIENT.FOUND_ROWS # type: ignore[no-any-return]
  187. def get_driver_connection(
  188. self, connection: DBAPIConnection
  189. ) -> AsyncIODBAPIConnection:
  190. return connection._connection # type: ignore[no-any-return]
  191. dialect = MySQLDialect_aiomysql