impl.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579
  1. # pool/impl.py
  2. # Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: https://www.opensource.org/licenses/mit-license.php
  7. """Pool implementation classes."""
  8. from __future__ import annotations
  9. import threading
  10. import traceback
  11. import typing
  12. from typing import Any
  13. from typing import cast
  14. from typing import List
  15. from typing import Optional
  16. from typing import Set
  17. from typing import Type
  18. from typing import TYPE_CHECKING
  19. from typing import Union
  20. import weakref
  21. from .base import _AsyncConnDialect
  22. from .base import _ConnectionFairy
  23. from .base import _ConnectionRecord
  24. from .base import _CreatorFnType
  25. from .base import _CreatorWRecFnType
  26. from .base import ConnectionPoolEntry
  27. from .base import Pool
  28. from .base import PoolProxiedConnection
  29. from .. import exc
  30. from .. import util
  31. from ..util import chop_traceback
  32. from ..util import queue as sqla_queue
  33. from ..util.typing import Literal
  34. if typing.TYPE_CHECKING:
  35. from ..engine.interfaces import DBAPIConnection
  class QueuePool(Pool):
      """A :class:`_pool.Pool` that caps the number of open connections.

      :class:`.QueuePool` is the default pool implementation for every
      :class:`_engine.Engine` except SQLite engines that use a ``:memory:``
      database.

      :class:`.QueuePool` **is not compatible** with asyncio or
      :func:`_asyncio.create_async_engine`; when
      :func:`_asyncio.create_async_engine` is used and no other pool is
      specified, :class:`.AsyncAdaptedQueuePool` is selected automatically.

      .. seealso::

          :class:`.AsyncAdaptedQueuePool`

      """

      _is_asyncio = False

      # Queue implementation instantiated by ``__init__`` to hold the
      # checked-in entries.
      _queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = (
          sqla_queue.Queue
      )

      _pool: sqla_queue.QueueCommon[ConnectionPoolEntry]

      def __init__(
          self,
          creator: Union[_CreatorFnType, _CreatorWRecFnType],
          pool_size: int = 5,
          max_overflow: int = 10,
          timeout: float = 30.0,
          use_lifo: bool = False,
          **kw: Any,
      ):
          r"""
          Construct a QueuePool.

          :param creator: callable returning a DB-API connection object;
            identical in meaning to :paramref:`_pool.Pool.creator`.

          :param pool_size: Largest number of connections kept persistently
            in the pool; defaults to 5.  The pool starts out empty and only
            retains this many connections once that many have been
            requested.  A ``pool_size`` of 0 means "no size limit"; to turn
            pooling off entirely use :class:`~sqlalchemy.pool.NullPool`.

          :param max_overflow: Maximum number of connections that may be
            opened beyond ``pool_size``; defaults to 10.  Once the number
            of checked-out connections reaches ``pool_size``, further
            connections are handed out up to this limit, and such extra
            connections are disconnected and discarded when returned.  The
            pool therefore allows at most ``pool_size + max_overflow``
            simultaneous connections and at most ``pool_size`` "sleeping"
            ones.  A value of -1 removes the overflow limit, so the total
            number of concurrent connections is unbounded.

          :param timeout: Seconds to wait for a connection before giving
            up; defaults to 30.0.  May be a float, subject to the
            limitations of Python time functions, which may not be reliable
            in the tens of milliseconds.

          :param use_lifo: retrieve connections LIFO (last-in-first-out)
            rather than FIFO (first-in-first-out).  With LIFO, a
            server-side timeout scheme can reduce the number of connections
            used during non-peak periods.  When relying on server-side
            timeouts, make sure a recycle or pre-ping strategy is in place
            so that stale connections are handled gracefully.

            .. versionadded:: 1.3

            .. seealso::

              :ref:`pool_use_lifo`

              :ref:`pool_disconnects`

          :param \**kw: Remaining keyword arguments, such as
            :paramref:`_pool.Pool.recycle`, :paramref:`_pool.Pool.echo`,
            :paramref:`_pool.Pool.reset_on_return` and others, are forwarded
            to the :class:`_pool.Pool` constructor.

          """
          Pool.__init__(self, creator, **kw)
          self._overflow_lock = threading.Lock()
          self._timeout = timeout
          # a pool_size of 0 disables the overflow limit as well
          self._max_overflow = max_overflow if pool_size != 0 else -1
          # the counter starts at -pool_size and is bumped by _do_get each
          # time it creates a connection
          self._overflow = -pool_size
          self._pool = self._queue_class(pool_size, use_lifo=use_lifo)

      def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
          """Check ``record`` back in, closing it if the queue is full."""
          try:
              self._pool.put(record, False)
          except sqla_queue.Full:
              # no room in the queue: close the entry and release its slot
              # in the overflow counter even if close() raises
              try:
                  record.close()
              finally:
                  self._dec_overflow()

      def _do_get(self) -> ConnectionPoolEntry:
          """Return an entry from the queue or create a new connection.

          Raises :class:`.exc.TimeoutError` when the overflow limit was
          already reached before waiting and no entry became available
          within the configured timeout.
          """
          limited = self._max_overflow > -1
          # only block on the queue when no further connection may be made
          block = limited and self._overflow >= self._max_overflow

          try:
              return self._pool.get(block, self._timeout)
          except sqla_queue.Empty:
              # deliberately do nothing inside "except Empty": raising the
              # timeout / connect error from here would make Python 3
              # report queue.Empty as the underlying error, which it isn't.
              pass

          at_limit = limited and self._overflow >= self._max_overflow
          if at_limit and block:
              raise exc.TimeoutError(
                  "QueuePool limit of size %d overflow %d reached, "
                  "connection timed out, timeout %0.2f"
                  % (self.size(), self.overflow(), self._timeout),
                  code="3o7r",
              )
          if at_limit:
              # the limit was reached only after the non-blocking attempt;
              # start over
              return self._do_get()

          if not self._inc_overflow():
              # the overflow slot could not be reserved; start over
              return self._do_get()

          try:
              return self._create_connection()
          except BaseException:
              # give the reserved slot back before the error propagates
              with util.safe_reraise():
                  self._dec_overflow()
              raise

      def _inc_overflow(self) -> bool:
          """Reserve one overflow slot; return False if the limit is hit."""
          if self._max_overflow == -1:
              # unlimited overflow: counted without taking the lock
              self._overflow += 1
              return True

          with self._overflow_lock:
              has_room = self._overflow < self._max_overflow
              if has_room:
                  self._overflow += 1
              return has_room

      def _dec_overflow(self) -> Literal[True]:
          """Release one overflow slot; always returns True."""
          if self._max_overflow != -1:
              with self._overflow_lock:
                  self._overflow -= 1
          else:
              # unlimited overflow: counted without taking the lock
              self._overflow -= 1
          return True

      def recreate(self) -> QueuePool:
          """Return a new pool of the same class with the same settings."""
          self.logger.info("Pool recreating")
          pool_cls = self.__class__
          return pool_cls(
              self._creator,
              pool_size=self._pool.maxsize,
              max_overflow=self._max_overflow,
              timeout=self._timeout,
              use_lifo=self._pool.use_lifo,
              pre_ping=self._pre_ping,
              recycle=self._recycle,
              reset_on_return=self._reset_on_return,
              echo=self.echo,
              logging_name=self._orig_logging_name,
              _dispatch=self.dispatch,
              dialect=self._dialect,
          )

      def dispose(self) -> None:
          """Close every checked-in entry and reset the overflow counter."""
          try:
              # drain without blocking until the queue reports Empty
              while True:
                  self._pool.get(False).close()
          except sqla_queue.Empty:
              pass

          self._overflow = -self.size()
          self.logger.info("Pool disposed. %s", self.status())

      def status(self) -> str:
          """Return a one-line summary of the pool's counters."""
          template = (
              "Pool size: %d Connections in pool: %d "
              "Current Overflow: %d Current Checked out "
              "connections: %d"
          )
          return template % (
              self.size(),
              self.checkedin(),
              self.overflow(),
              self.checkedout(),
          )

      def size(self) -> int:
          """Return the maximum size of the underlying queue."""
          return self._pool.maxsize

      def timeout(self) -> float:
          """Return the configured checkout timeout."""
          return self._timeout

      def checkedin(self) -> int:
          """Return the number of entries currently held in the queue."""
          return self._pool.qsize()

      def overflow(self) -> int:
          """Return the overflow counter, or 0 when the queue size is 0."""
          if not self._pool.maxsize:
              return 0
          return self._overflow

      def checkedout(self) -> int:
          """Return queue size minus queued entries plus the overflow."""
          queue = self._pool
          return queue.maxsize - queue.qsize() + self._overflow
  class AsyncAdaptedQueuePool(QueuePool):
      """A variant of :class:`.QueuePool` that is compatible with asyncio.

      :class:`.AsyncEngine` engines produced by
      :func:`_asyncio.create_async_engine` use this pool by default.  Its
      queue implementation is asyncio-compatible and does not rely on
      ``threading.Lock``.

      Apart from that, :class:`.AsyncAdaptedQueuePool` takes the same
      arguments and operates the same way as :class:`.QueuePool`.

      """

      _is_asyncio = True

      # asyncio-compatible replacement for the queue used by QueuePool
      _queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = (
          sqla_queue.AsyncAdaptedQueue
      )

      # one dialect instance, created at class-definition time and shared
      # as a class attribute
      _dialect = _AsyncConnDialect()
  class FallbackAsyncAdaptedQueuePool(AsyncAdaptedQueuePool):
      """:class:`.AsyncAdaptedQueuePool` using the fallback queue class."""

      # only the queue implementation differs from the parent class
      _queue_class = sqla_queue.FallbackAsyncAdaptedQueue  # type: ignore[assignment]  # noqa: E501
  class NullPool(Pool):
      """A Pool that performs no pooling at all.

      Each connection open/close literally opens and closes the underlying
      DB-API connection.

      Because no connections are held persistently, reconnect-related
      features such as ``recycle`` and connection invalidation are not
      supported by this implementation.

      :class:`.NullPool` **is compatible** with asyncio and
      :func:`_asyncio.create_async_engine`.

      """

      def status(self) -> str:
          """Return the fixed status string for this pool."""
          return "NullPool"

      def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
          """Close ``record`` instead of keeping it for reuse."""
          record.close()

      def _do_get(self) -> ConnectionPoolEntry:
          """Create a new connection entry for every checkout."""
          return self._create_connection()

      def recreate(self) -> NullPool:
          """Return a new pool of the same class with the same settings."""
          self.logger.info("Pool recreating")
          pool_cls = self.__class__
          return pool_cls(
              self._creator,
              pre_ping=self._pre_ping,
              recycle=self._recycle,
              reset_on_return=self._reset_on_return,
              echo=self.echo,
              logging_name=self._orig_logging_name,
              _dispatch=self.dispatch,
              dialect=self._dialect,
          )

      def dispose(self) -> None:
          """Do nothing; this pool holds no connections."""
          return None
  class SingletonThreadPool(Pool):
      """A Pool that keeps one connection per thread.

      One connection is maintained for each thread, and a connection is
      never handed to a thread other than the one that created it.

      .. warning::  the :class:`.SingletonThreadPool` will call ``.close()``
         on arbitrary connections that exist beyond the size setting of
         ``pool_size``, e.g. if more unique **thread identities**
         than what ``pool_size`` states are used.   This cleanup is
         non-deterministic and not sensitive to whether or not the
         connections linked to those thread identities are currently in
         use.

         :class:`.SingletonThreadPool` may be improved in a future release,
         however in its current status it is generally used only for test
         scenarios using a SQLite ``:memory:`` database and is not
         recommended for production use.

      :class:`.SingletonThreadPool` **is not compatible** with asyncio
      and :func:`_asyncio.create_async_engine`.

      Accepts the same options as :class:`_pool.Pool`, plus:

      :param pool_size: How many threads may have a connection maintained
        for them at the same time.  Defaults to five.

      The SQLite dialect selects :class:`.SingletonThreadPool`
      automatically when a memory-based database is used.
      See :ref:`sqlite_toplevel`.

      """

      _is_asyncio = False

      def __init__(
          self,
          creator: Union[_CreatorFnType, _CreatorWRecFnType],
          pool_size: int = 5,
          **kw: Any,
      ):
          Pool.__init__(self, creator, **kw)
          self.size = pool_size
          # every entry created by _do_get, across all threads
          self._all_conns: Set[ConnectionPoolEntry] = set()
          # per-thread slots; ``current`` holds a weak reference to this
          # thread's entry (set in _do_get) / a callable yielding the
          # current fairy (read in connect)
          self._conn = threading.local()
          self._fairy = threading.local()

      def recreate(self) -> SingletonThreadPool:
          """Return a new pool of the same class with the same settings."""
          self.logger.info("Pool recreating")
          pool_cls = self.__class__
          return pool_cls(
              self._creator,
              pool_size=self.size,
              pre_ping=self._pre_ping,
              recycle=self._recycle,
              reset_on_return=self._reset_on_return,
              echo=self.echo,
              logging_name=self._orig_logging_name,
              _dispatch=self.dispatch,
              dialect=self._dialect,
          )

      def dispose(self) -> None:
          """Dispose of this pool."""
          for entry in self._all_conns:
              try:
                  entry.close()
              except Exception:
                  # best effort: pysqlite refuses to close a connection
                  # from a thread other than the one that created it
                  pass

          self._all_conns.clear()

      def _cleanup(self) -> None:
          """Close arbitrary entries until fewer than ``size`` remain."""
          while len(self._all_conns) >= self.size:
              # set.pop() removes an arbitrary member
              self._all_conns.pop().close()

      def status(self) -> str:
          """Return the pool's id and the number of tracked entries."""
          details = (id(self), len(self._all_conns))
          return "SingletonThreadPool id:%d size: %d" % details

      def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
          """Forget this thread's current fairy, if one is set."""
          try:
              delattr(self._fairy, "current")
          except AttributeError:
              # nothing was recorded for this thread
              pass

      def _do_get(self) -> ConnectionPoolEntry:
          """Return this thread's entry, creating and tracking one if needed."""
          existing: Optional[ConnectionPoolEntry]
          try:
              # ``current`` is a weak reference; calling it yields the
              # entry or None once the entry has been collected
              existing = self._conn.current()
              if existing:
                  return existing
          except AttributeError:
              # this thread has no entry yet
              pass

          created = self._create_connection()
          self._conn.current = weakref.ref(created)
          # make room before tracking the new entry
          if len(self._all_conns) >= self.size:
              self._cleanup()
          self._all_conns.add(created)
          return created

      def connect(self) -> PoolProxiedConnection:
          """Return this thread's checked-out connection or check one out."""
          # vendored from Pool to include the now removed use_threadlocal
          # behavior
          active: Optional[_ConnectionFairy]
          try:
              active = self._fairy.current()
          except AttributeError:
              active = None

          if active is not None:
              return active._checkout_existing()

          # done outside the "except" block so that a failure here is not
          # reported as occurring while handling AttributeError
          return _ConnectionFairy._checkout(self, self._fairy)
  class StaticPool(Pool):
      """A Pool holding exactly one connection that serves every request.

      Reconnect-related features such as ``recycle`` and connection
      invalidation (which also underlies auto-reconnect) are only partially
      supported at present and may not yield good results.

      :class:`.StaticPool` **is compatible** with asyncio and
      :func:`_asyncio.create_async_engine`.

      """

      @util.memoized_property
      def connection(self) -> _ConnectionRecord:
          """The single connection record, created on first access."""
          return _ConnectionRecord(self)

      def status(self) -> str:
          """Return the fixed status string for this pool."""
          return "StaticPool"

      def dispose(self) -> None:
          """Close and drop the memoized record if it holds a connection."""
          # check __dict__ first so that merely disposing never creates
          # the memoized record
          if "connection" not in self.__dict__:
              return

          record = self.connection
          if record.dbapi_connection is None:
              return

          record.close()
          del self.__dict__["connection"]

      def recreate(self) -> StaticPool:
          """Return a new pool of the same class with the same settings."""
          self.logger.info("Pool recreating")
          pool_cls = self.__class__
          return pool_cls(
              creator=self._creator,
              pre_ping=self._pre_ping,
              recycle=self._recycle,
              reset_on_return=self._reset_on_return,
              echo=self.echo,
              logging_name=self._orig_logging_name,
              _dispatch=self.dispatch,
              dialect=self._dialect,
          )

      def _transfer_from(self, other_static_pool: StaticPool) -> None:
          """Make this pool reuse ``other_static_pool``'s DB-API connection."""
          # used by the test suite to make a new engine / pool without
          # losing the state of an existing SQLite :memory: connection

          def creator(rec: ConnectionPoolEntry) -> DBAPIConnection:
              dbapi_conn = other_static_pool.connection.dbapi_connection
              assert dbapi_conn is not None
              return dbapi_conn

          self._invoke_creator = creator

      def _create_connection(self) -> ConnectionPoolEntry:
          """Not supported; always raises ``NotImplementedError``."""
          raise NotImplementedError()

      def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
          """Do nothing when a record is returned."""
          return None

      def _do_get(self) -> ConnectionPoolEntry:
          """Return the record, replacing it first if it was invalidated."""
          record = self.connection
          if not record._is_hard_or_soft_invalidated():
              return record

          # drop the memoized value so the property builds a fresh record
          del self.__dict__["connection"]
          return self.connection
  class AssertionPool(Pool):
      """A :class:`_pool.Pool` permitting at most one checked-out connection
      at any given time.

      An exception is raised whenever a second connection is checked out
      while one is still out, which helps when debugging code that uses more
      connections than intended.

      :class:`.AssertionPool` **is compatible** with asyncio and
      :func:`_asyncio.create_async_engine`.

      """

      _conn: Optional[ConnectionPoolEntry]
      _checkout_traceback: Optional[List[str]]

      def __init__(self, *args: Any, **kw: Any):
          # "store_traceback" is consumed here and not forwarded to Pool
          self._store_traceback = kw.pop("store_traceback", True)
          self._checkout_traceback = None
          self._checked_out = False
          self._conn = None
          Pool.__init__(self, *args, **kw)

      def status(self) -> str:
          """Return the fixed status string for this pool."""
          return "AssertionPool"

      def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
          """Mark the connection as returned.

          Raises ``AssertionError`` if nothing is currently checked out.
          """
          if not self._checked_out:
              raise AssertionError("connection is not checked out")

          self._checked_out = False
          assert record is self._conn

      def dispose(self) -> None:
          """Clear the checked-out flag and close the entry, if any."""
          self._checked_out = False
          entry = self._conn
          if entry:
              entry.close()

      def recreate(self) -> AssertionPool:
          """Return a new pool of the same class with the same settings."""
          self.logger.info("Pool recreating")
          pool_cls = self.__class__
          return pool_cls(
              self._creator,
              pre_ping=self._pre_ping,
              recycle=self._recycle,
              reset_on_return=self._reset_on_return,
              echo=self.echo,
              logging_name=self._orig_logging_name,
              _dispatch=self.dispatch,
              dialect=self._dialect,
          )

      def _do_get(self) -> ConnectionPoolEntry:
          """Check out the single entry, creating it on first use.

          Raises ``AssertionError`` if the entry is already checked out;
          the message includes the stored checkout stack when one exists.
          """
          if self._checked_out:
              suffix = (
                  " at:\n%s"
                  % "".join(chop_traceback(self._checkout_traceback))
                  if self._checkout_traceback
                  else ""
              )
              raise AssertionError(
                  "connection is already checked out" + suffix
              )

          if not self._conn:
              self._conn = self._create_connection()

          self._checked_out = True
          if self._store_traceback:
              # captured inline so the recorded frames end in this method
              self._checkout_traceback = traceback.format_stack()

          return self._conn
  463. return self._conn