compat.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. # util/compat.py
  2. # Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: https://www.opensource.org/licenses/mit-license.php
  7. # mypy: allow-untyped-defs, allow-untyped-calls
  8. """Handle Python version/platform incompatibilities."""
  9. from __future__ import annotations
  10. import base64
  11. import dataclasses
  12. import hashlib
  13. import inspect
  14. import operator
  15. import platform
  16. import sys
  17. import typing
  18. from typing import Any
  19. from typing import Callable
  20. from typing import Dict
  21. from typing import Iterable
  22. from typing import List
  23. from typing import Mapping
  24. from typing import Optional
  25. from typing import Sequence
  26. from typing import Set
  27. from typing import Tuple
  28. from typing import Type
  29. from typing import TypeVar
# Interpreter version flags: each is True when ``sys.version_info`` is at
# least the version spelled out in the name.
py314b1 = sys.version_info >= (3, 14, 0, "beta", 1)  # 3.14.0 beta 1 or later
py314 = sys.version_info >= (3, 14)
py313 = sys.version_info >= (3, 13)
py312 = sys.version_info >= (3, 12)
py311 = sys.version_info >= (3, 11)
py310 = sys.version_info >= (3, 10)
py39 = sys.version_info >= (3, 9)
py38 = sys.version_info >= (3, 8)

# Interpreter implementation, as reported by the ``platform`` module.
pypy = platform.python_implementation() == "PyPy"
cpython = platform.python_implementation() == "CPython"

# Operating system flags, derived from the ``sys.platform`` prefix.
win32 = sys.platform.startswith("win")
osx = sys.platform.startswith("darwin")

# True when the lower-cased machine string contains "aarch".
# NOTE(review): a machine string such as "arm64" does not contain "aarch"
# and so is not matched -- confirm that this is intended.
arm = "aarch" in platform.machine().lower()

# True when ``sys.maxsize`` is larger than 2**32.
is64bit = sys.maxsize > 2**32

# Mirrors the ``cpython`` flag; presumably callers use it to decide whether
# objects are reclaimed by reference counting -- verify against callers.
has_refcount_gc = bool(cpython)

# Alias for ``operator.attrgetter``.
dottedgetter = operator.attrgetter

# Covariant type variable for use in generic annotations.
_T_co = TypeVar("_T_co", covariant=True)
class FullArgSpec(typing.NamedTuple):
    """Signature description returned by :func:`inspect_getfullargspec`.

    Field order is part of the contract, since instances are also usable
    as plain tuples.
    """

    # names of the positional parameters
    args: List[str]
    # name of the ``*args`` parameter, or None when there is none
    varargs: Optional[str]
    # name of the ``**kwargs`` parameter, or None when there is none
    varkw: Optional[str]
    # the function's ``__defaults__``
    defaults: Optional[Tuple[Any, ...]]
    # names of the keyword-only parameters
    kwonlyargs: List[str]
    # the function's ``__kwdefaults__``
    kwonlydefaults: Optional[Dict[str, Any]]
    # the function's ``__annotations__``
    annotations: Dict[str, Any]
  55. def inspect_getfullargspec(func: Callable[..., Any]) -> FullArgSpec:
  56. """Fully vendored version of getfullargspec from Python 3.3."""
  57. if inspect.ismethod(func):
  58. func = func.__func__
  59. if not inspect.isfunction(func):
  60. raise TypeError(f"{func!r} is not a Python function")
  61. co = func.__code__
  62. if not inspect.iscode(co):
  63. raise TypeError(f"{co!r} is not a code object")
  64. nargs = co.co_argcount
  65. names = co.co_varnames
  66. nkwargs = co.co_kwonlyargcount
  67. args = list(names[:nargs])
  68. kwonlyargs = list(names[nargs : nargs + nkwargs])
  69. nargs += nkwargs
  70. varargs = None
  71. if co.co_flags & inspect.CO_VARARGS:
  72. varargs = co.co_varnames[nargs]
  73. nargs = nargs + 1
  74. varkw = None
  75. if co.co_flags & inspect.CO_VARKEYWORDS:
  76. varkw = co.co_varnames[nargs]
  77. return FullArgSpec(
  78. args,
  79. varargs,
  80. varkw,
  81. func.__defaults__,
  82. kwonlyargs,
  83. func.__kwdefaults__,
  84. func.__annotations__,
  85. )
  86. if py39:
  87. # python stubs don't have a public type for this. not worth
  88. # making a protocol
  89. def md5_not_for_security() -> Any:
  90. return hashlib.md5(usedforsecurity=False)
  91. else:
  92. def md5_not_for_security() -> Any:
  93. return hashlib.md5()
# Bind the name ``importlib_metadata``: when ``py38`` is set (and always for
# type checkers) it is the standard library's ``importlib.metadata``;
# otherwise the top-level ``importlib_metadata`` module is imported --
# presumably the separately installed backport; confirm against the
# project's declared dependencies.
if typing.TYPE_CHECKING or py38:
    from importlib import metadata as importlib_metadata
else:
    import importlib_metadata  # noqa
  98. if typing.TYPE_CHECKING or py39:
  99. # pep 584 dict union
  100. dict_union = operator.or_ # noqa
  101. else:
  102. def dict_union(a: dict, b: dict) -> dict:
  103. a = a.copy()
  104. a.update(b)
  105. return a
  106. if py310:
  107. anext_ = anext
  108. else:
  109. _NOT_PROVIDED = object()
  110. from collections.abc import AsyncIterator
  111. async def anext_(async_iterator, default=_NOT_PROVIDED):
  112. """vendored from https://github.com/python/cpython/pull/8895"""
  113. if not isinstance(async_iterator, AsyncIterator):
  114. raise TypeError(
  115. f"anext expected an AsyncIterator, got {type(async_iterator)}"
  116. )
  117. anxt = type(async_iterator).__anext__
  118. try:
  119. return await anxt(async_iterator)
  120. except StopAsyncIteration:
  121. if default is _NOT_PROVIDED:
  122. raise
  123. return default
  124. def importlib_metadata_get(group):
  125. ep = importlib_metadata.entry_points()
  126. if typing.TYPE_CHECKING or hasattr(ep, "select"):
  127. return ep.select(group=group)
  128. else:
  129. return ep.get(group, ())
  130. def b(s):
  131. return s.encode("latin-1")
  132. def b64decode(x: str) -> bytes:
  133. return base64.b64decode(x.encode("ascii"))
  134. def b64encode(x: bytes) -> str:
  135. return base64.b64encode(x).decode("ascii")
  136. def decode_backslashreplace(text: bytes, encoding: str) -> str:
  137. return text.decode(encoding, errors="backslashreplace")
  138. def cmp(a, b):
  139. return (a > b) - (a < b)
  140. def _formatannotation(annotation, base_module=None):
  141. """vendored from python 3.7"""
  142. if isinstance(annotation, str):
  143. return annotation
  144. if getattr(annotation, "__module__", None) == "typing":
  145. return repr(annotation).replace("typing.", "").replace("~", "")
  146. if isinstance(annotation, type):
  147. if annotation.__module__ in ("builtins", base_module):
  148. return repr(annotation.__qualname__)
  149. return annotation.__module__ + "." + annotation.__qualname__
  150. elif isinstance(annotation, typing.TypeVar):
  151. return repr(annotation).replace("~", "")
  152. return repr(annotation).replace("~", "")
  153. def inspect_formatargspec(
  154. args: List[str],
  155. varargs: Optional[str] = None,
  156. varkw: Optional[str] = None,
  157. defaults: Optional[Sequence[Any]] = None,
  158. kwonlyargs: Optional[Sequence[str]] = (),
  159. kwonlydefaults: Optional[Mapping[str, Any]] = {},
  160. annotations: Mapping[str, Any] = {},
  161. formatarg: Callable[[str], str] = str,
  162. formatvarargs: Callable[[str], str] = lambda name: "*" + name,
  163. formatvarkw: Callable[[str], str] = lambda name: "**" + name,
  164. formatvalue: Callable[[Any], str] = lambda value: "=" + repr(value),
  165. formatreturns: Callable[[Any], str] = lambda text: " -> " + str(text),
  166. formatannotation: Callable[[Any], str] = _formatannotation,
  167. ) -> str:
  168. """Copy formatargspec from python 3.7 standard library.
  169. Python 3 has deprecated formatargspec and requested that Signature
  170. be used instead, however this requires a full reimplementation
  171. of formatargspec() in terms of creating Parameter objects and such.
  172. Instead of introducing all the object-creation overhead and having
  173. to reinvent from scratch, just copy their compatibility routine.
  174. Ultimately we would need to rewrite our "decorator" routine completely
  175. which is not really worth it right now, until all Python 2.x support
  176. is dropped.
  177. """
  178. kwonlydefaults = kwonlydefaults or {}
  179. annotations = annotations or {}
  180. def formatargandannotation(arg):
  181. result = formatarg(arg)
  182. if arg in annotations:
  183. result += ": " + formatannotation(annotations[arg])
  184. return result
  185. specs = []
  186. if defaults:
  187. firstdefault = len(args) - len(defaults)
  188. else:
  189. firstdefault = -1
  190. for i, arg in enumerate(args):
  191. spec = formatargandannotation(arg)
  192. if defaults and i >= firstdefault:
  193. spec = spec + formatvalue(defaults[i - firstdefault])
  194. specs.append(spec)
  195. if varargs is not None:
  196. specs.append(formatvarargs(formatargandannotation(varargs)))
  197. else:
  198. if kwonlyargs:
  199. specs.append("*")
  200. if kwonlyargs:
  201. for kwonlyarg in kwonlyargs:
  202. spec = formatargandannotation(kwonlyarg)
  203. if kwonlydefaults and kwonlyarg in kwonlydefaults:
  204. spec += formatvalue(kwonlydefaults[kwonlyarg])
  205. specs.append(spec)
  206. if varkw is not None:
  207. specs.append(formatvarkw(formatargandannotation(varkw)))
  208. result = "(" + ", ".join(specs) + ")"
  209. if "return" in annotations:
  210. result += formatreturns(formatannotation(annotations["return"]))
  211. return result
  212. def dataclass_fields(cls: Type[Any]) -> Iterable[dataclasses.Field[Any]]:
  213. """Return a sequence of all dataclasses.Field objects associated
  214. with a class as an already processed dataclass.
  215. The class must **already be a dataclass** for Field objects to be returned.
  216. """
  217. if dataclasses.is_dataclass(cls):
  218. return dataclasses.fields(cls)
  219. else:
  220. return []
  221. def local_dataclass_fields(cls: Type[Any]) -> Iterable[dataclasses.Field[Any]]:
  222. """Return a sequence of all dataclasses.Field objects associated with
  223. an already processed dataclass, excluding those that originate from a
  224. superclass.
  225. The class must **already be a dataclass** for Field objects to be returned.
  226. """
  227. if dataclasses.is_dataclass(cls):
  228. super_fields: Set[dataclasses.Field[Any]] = set()
  229. for sup in cls.__bases__:
  230. super_fields.update(dataclass_fields(sup))
  231. return [f for f in dataclasses.fields(cls) if f not in super_fields]
  232. else:
  233. return []