asyncmy.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. # dialects/mysql/asyncmy.py
  2. # Copyright (C) 2005-2025 the SQLAlchemy authors and contributors <see AUTHORS
  3. # file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: https://www.opensource.org/licenses/mit-license.php
r"""
.. dialect:: mysql+asyncmy
    :name: asyncmy
    :dbapi: asyncmy
    :connectstring: mysql+asyncmy://user:password@host:port/dbname[?key=value&key=value...]
    :url: https://github.com/long2ice/asyncmy

Using a special asyncio mediation layer, the asyncmy dialect is usable
as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>`
extension package.

This dialect should normally be used only with the
:func:`_asyncio.create_async_engine` engine creation function::

    from sqlalchemy.ext.asyncio import create_async_engine

    engine = create_async_engine(
        "mysql+asyncmy://user:pass@hostname/dbname?charset=utf8mb4"
    )

"""  # noqa
  23. from __future__ import annotations
  24. from types import ModuleType
  25. from typing import Any
  26. from typing import NoReturn
  27. from typing import Optional
  28. from typing import TYPE_CHECKING
  29. from typing import Union
  30. from .pymysql import MySQLDialect_pymysql
  31. from ... import pool
  32. from ... import util
  33. from ...connectors.asyncio import AsyncAdapt_dbapi_connection
  34. from ...connectors.asyncio import AsyncAdapt_dbapi_cursor
  35. from ...connectors.asyncio import AsyncAdapt_dbapi_module
  36. from ...connectors.asyncio import AsyncAdapt_dbapi_ss_cursor
  37. from ...util.concurrency import await_fallback
  38. from ...util.concurrency import await_only
  39. if TYPE_CHECKING:
  40. from ...connectors.asyncio import AsyncIODBAPIConnection
  41. from ...connectors.asyncio import AsyncIODBAPICursor
  42. from ...engine.interfaces import ConnectArgsType
  43. from ...engine.interfaces import DBAPIConnection
  44. from ...engine.interfaces import DBAPICursor
  45. from ...engine.interfaces import DBAPIModule
  46. from ...engine.interfaces import PoolProxiedConnection
  47. from ...engine.url import URL
class AsyncAdapt_asyncmy_cursor(AsyncAdapt_dbapi_cursor):
    """Cursor adapter used for asyncmy connections.

    Defines no behavior of its own; everything is inherited from
    ``AsyncAdapt_dbapi_cursor``.
    """

    # No per-instance attributes are added by this subclass.
    __slots__ = ()
  50. class AsyncAdapt_asyncmy_ss_cursor(
  51. AsyncAdapt_dbapi_ss_cursor, AsyncAdapt_asyncmy_cursor
  52. ):
  53. __slots__ = ()
  54. def _make_new_cursor(
  55. self, connection: AsyncIODBAPIConnection
  56. ) -> AsyncIODBAPICursor:
  57. return connection.cursor(
  58. self._adapt_connection.dbapi.asyncmy.cursors.SSCursor
  59. )
  60. class AsyncAdapt_asyncmy_connection(AsyncAdapt_dbapi_connection):
  61. __slots__ = ()
  62. _cursor_cls = AsyncAdapt_asyncmy_cursor
  63. _ss_cursor_cls = AsyncAdapt_asyncmy_ss_cursor
  64. def _handle_exception(self, error: Exception) -> NoReturn:
  65. if isinstance(error, AttributeError):
  66. raise self.dbapi.InternalError(
  67. "network operation failed due to asyncmy attribute error"
  68. )
  69. raise error
  70. def ping(self, reconnect: bool) -> None:
  71. assert not reconnect
  72. return self.await_(self._do_ping())
  73. async def _do_ping(self) -> None:
  74. try:
  75. async with self._execute_mutex:
  76. await self._connection.ping(False)
  77. except Exception as error:
  78. self._handle_exception(error)
  79. def character_set_name(self) -> Optional[str]:
  80. return self._connection.character_set_name() # type: ignore[no-any-return] # noqa: E501
  81. def autocommit(self, value: Any) -> None:
  82. self.await_(self._connection.autocommit(value))
  83. def get_autocommit(self) -> bool:
  84. return self._connection.get_autocommit() # type: ignore
  85. def terminate(self) -> None:
  86. # it's not awaitable.
  87. self._connection.close()
  88. def close(self) -> None:
  89. self.await_(self._connection.ensure_closed())
class AsyncAdaptFallback_asyncmy_connection(AsyncAdapt_asyncmy_connection):
    """Variant of ``AsyncAdapt_asyncmy_connection`` whose ``await_`` is
    ``await_fallback``.

    Returned by ``AsyncAdapt_asyncmy_dbapi.connect()`` when the
    ``async_fallback`` argument is true.
    """

    __slots__ = ()

    # Replaces the inherited await_ with the fallback implementation.
    await_ = staticmethod(await_fallback)
  93. class AsyncAdapt_asyncmy_dbapi(AsyncAdapt_dbapi_module):
  94. def __init__(self, asyncmy: ModuleType):
  95. self.asyncmy = asyncmy
  96. self.paramstyle = "format"
  97. self._init_dbapi_attributes()
  98. def _init_dbapi_attributes(self) -> None:
  99. for name in (
  100. "Warning",
  101. "Error",
  102. "InterfaceError",
  103. "DataError",
  104. "DatabaseError",
  105. "OperationalError",
  106. "InterfaceError",
  107. "IntegrityError",
  108. "ProgrammingError",
  109. "InternalError",
  110. "NotSupportedError",
  111. ):
  112. setattr(self, name, getattr(self.asyncmy.errors, name))
  113. STRING = util.symbol("STRING")
  114. NUMBER = util.symbol("NUMBER")
  115. BINARY = util.symbol("BINARY")
  116. DATETIME = util.symbol("DATETIME")
  117. TIMESTAMP = util.symbol("TIMESTAMP")
  118. Binary = staticmethod(bytes)
  119. def connect(self, *arg: Any, **kw: Any) -> AsyncAdapt_asyncmy_connection:
  120. async_fallback = kw.pop("async_fallback", False)
  121. creator_fn = kw.pop("async_creator_fn", self.asyncmy.connect)
  122. if util.asbool(async_fallback):
  123. return AsyncAdaptFallback_asyncmy_connection(
  124. self,
  125. await_fallback(creator_fn(*arg, **kw)),
  126. )
  127. else:
  128. return AsyncAdapt_asyncmy_connection(
  129. self,
  130. await_only(creator_fn(*arg, **kw)),
  131. )
  132. class MySQLDialect_asyncmy(MySQLDialect_pymysql):
  133. driver = "asyncmy"
  134. supports_statement_cache = True
  135. supports_server_side_cursors = True
  136. _sscursor = AsyncAdapt_asyncmy_ss_cursor
  137. is_async = True
  138. has_terminate = True
  139. @classmethod
  140. def import_dbapi(cls) -> DBAPIModule:
  141. return AsyncAdapt_asyncmy_dbapi(__import__("asyncmy"))
  142. @classmethod
  143. def get_pool_class(cls, url: URL) -> type:
  144. async_fallback = url.query.get("async_fallback", False)
  145. if util.asbool(async_fallback):
  146. return pool.FallbackAsyncAdaptedQueuePool
  147. else:
  148. return pool.AsyncAdaptedQueuePool
  149. def do_terminate(self, dbapi_connection: DBAPIConnection) -> None:
  150. dbapi_connection.terminate()
  151. def create_connect_args(self, url: URL) -> ConnectArgsType: # type: ignore[override] # noqa: E501
  152. return super().create_connect_args(
  153. url, _translate_args=dict(username="user", database="db")
  154. )
  155. def is_disconnect(
  156. self,
  157. e: DBAPIModule.Error,
  158. connection: Optional[Union[PoolProxiedConnection, DBAPIConnection]],
  159. cursor: Optional[DBAPICursor],
  160. ) -> bool:
  161. if super().is_disconnect(e, connection, cursor):
  162. return True
  163. else:
  164. str_e = str(e).lower()
  165. return (
  166. "not connected" in str_e or "network operation failed" in str_e
  167. )
  168. def _found_rows_client_flag(self) -> int:
  169. from asyncmy.constants import CLIENT # type: ignore
  170. return CLIENT.FOUND_ROWS # type: ignore[no-any-return]
  171. def get_driver_connection(
  172. self, connection: DBAPIConnection
  173. ) -> AsyncIODBAPIConnection:
  174. return connection._connection # type: ignore[no-any-return]
# Module-level alias for the dialect class defined in this module.
# NOTE(review): presumably the name the dialect loader looks up when
# resolving "mysql+asyncmy" -- confirm against the registry code.
dialect = MySQLDialect_asyncmy