concurrency.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. # util/concurrency.py
  2. # Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: https://www.opensource.org/licenses/mit-license.php
  7. # mypy: allow-untyped-defs, allow-untyped-calls
  8. from __future__ import annotations
  9. import asyncio # noqa
  10. import typing
  11. from typing import Any
  12. from typing import Callable
  13. from typing import Coroutine
  14. from typing import TypeVar
  15. have_greenlet = False
  16. greenlet_error = None
  17. try:
  18. import greenlet # type: ignore[import-untyped,unused-ignore] # noqa: F401,E501
  19. except ImportError as e:
  20. greenlet_error = str(e)
  21. pass
  22. else:
  23. have_greenlet = True
  24. from ._concurrency_py3k import await_only as await_only
  25. from ._concurrency_py3k import await_fallback as await_fallback
  26. from ._concurrency_py3k import in_greenlet as in_greenlet
  27. from ._concurrency_py3k import greenlet_spawn as greenlet_spawn
  28. from ._concurrency_py3k import is_exit_exception as is_exit_exception
  29. from ._concurrency_py3k import AsyncAdaptedLock as AsyncAdaptedLock
  30. from ._concurrency_py3k import _Runner
  31. _T = TypeVar("_T")
  32. class _AsyncUtil:
  33. """Asyncio util for test suite/ util only"""
  34. def __init__(self) -> None:
  35. if have_greenlet:
  36. self.runner = _Runner()
  37. def run(
  38. self,
  39. fn: Callable[..., Coroutine[Any, Any, _T]],
  40. *args: Any,
  41. **kwargs: Any,
  42. ) -> _T:
  43. """Run coroutine on the loop"""
  44. return self.runner.run(fn(*args, **kwargs))
  45. def run_in_greenlet(
  46. self, fn: Callable[..., _T], *args: Any, **kwargs: Any
  47. ) -> _T:
  48. """Run sync function in greenlet. Support nested calls"""
  49. if have_greenlet:
  50. if self.runner.get_loop().is_running():
  51. return fn(*args, **kwargs)
  52. else:
  53. return self.runner.run(greenlet_spawn(fn, *args, **kwargs))
  54. else:
  55. return fn(*args, **kwargs)
  56. def close(self) -> None:
  57. if have_greenlet:
  58. self.runner.close()
  59. if not typing.TYPE_CHECKING and not have_greenlet:
  60. def _not_implemented():
  61. # this conditional is to prevent pylance from considering
  62. # greenlet_spawn() etc as "no return" and dimming out code below it
  63. if have_greenlet:
  64. return None
  65. raise ValueError(
  66. "the greenlet library is required to use this function."
  67. " %s" % greenlet_error
  68. if greenlet_error
  69. else ""
  70. )
  71. def is_exit_exception(e): # noqa: F811
  72. return not isinstance(e, Exception)
  73. def await_only(thing): # type: ignore # noqa: F811
  74. _not_implemented()
  75. def await_fallback(thing): # type: ignore # noqa: F811
  76. return thing
  77. def in_greenlet(): # type: ignore # noqa: F811
  78. _not_implemented()
  79. def greenlet_spawn(fn, *args, **kw): # type: ignore # noqa: F811
  80. _not_implemented()
  81. def AsyncAdaptedLock(*args, **kw): # type: ignore # noqa: F811
  82. _not_implemented()
  83. def _util_async_run(fn, *arg, **kw): # type: ignore # noqa: F811
  84. return fn(*arg, **kw)
  85. def _util_async_run_coroutine_function(fn, *arg, **kw): # type: ignore # noqa: F811,E501
  86. _not_implemented()