util.pyx 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. # cyextension/util.pyx
  2. # Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: https://www.opensource.org/licenses/mit-license.php
  7. from collections.abc import Mapping
  8. from sqlalchemy import exc
  # Shared empty tuple, returned by the _distill_* functions when params is None.
  9. cdef tuple _Empty_Tuple = ()
  cdef inline bint _is_mapping_or_tuple(object value):
      # True when ``value`` is a dict, a tuple, or any ``Mapping`` instance.
      # The types are tested in the listed order, so the concrete ``dict``
      # and ``tuple`` checks run before the abstract ``Mapping`` check.
      return isinstance(value, (dict, tuple, Mapping))
  cdef inline bint _is_mapping(object value):
      # True when ``value`` is a dict or any other ``Mapping`` instance.
      # ``dict`` is listed first so it is tested before the abstract type.
      return isinstance(value, (dict, Mapping))
  def _distill_params_20(object params):
      """Normalize ``params`` into a sequence of mappings.

      * ``None`` yields the shared empty tuple.
      * A list or tuple is returned unchanged; when it is non-empty, its
        first element must be a mapping.
      * A single mapping is wrapped in a one-element list.

      Raises ``exc.ArgumentError`` for a list/tuple whose first element is
      not a mapping, and for any other kind of argument.
      """
      if params is None:
          return _Empty_Tuple

      if isinstance(params, (list, tuple)):
          # Only the first element is inspected; the remaining elements are
          # passed through without a type check.
          if params and not _is_mapping(params[0]):
              raise exc.ArgumentError(
                  "List argument must consist only of dictionaries"
              )
          return params

      if _is_mapping(params):
          return [params]

      raise exc.ArgumentError("mapping or list expected for parameters")
  def _distill_raw_params(object params):
      """Normalize ``params`` into a sequence of tuples or mappings.

      * ``None`` yields the shared empty tuple.
      * A list is returned unchanged; when it is non-empty, its first
        element must be a tuple or a mapping.
      * A single tuple or mapping is wrapped in a one-element list.

      Raises ``exc.ArgumentError`` for a list whose first element is neither
      a tuple nor a mapping, and for any other kind of argument.
      """
      if params is None:
          return _Empty_Tuple

      if isinstance(params, list):
          # Only the first element is inspected; the remaining elements are
          # passed through without a type check.
          if params and not _is_mapping_or_tuple(params[0]):
              raise exc.ArgumentError(
                  "List argument must consist only of tuples or dictionaries"
              )
          return params

      if not _is_mapping_or_tuple(params):
          raise exc.ArgumentError("mapping or sequence expected for parameters")

      # A lone tuple or mapping is treated as a single parameter set.
      return [params]
  cdef class prefix_anon_map(dict):
      """Dictionary that fills in numbered names for missing keys.

      A missing key of the form ``"<prefix> <derived>"`` resolves to the
      string ``"<derived>_<n>"``.  ``n`` starts at 1 and grows by one each
      time another missing key with the same ``<derived>`` part is resolved.
      The next counter value for each ``<derived>`` part is kept in this
      same dictionary, under the ``<derived>`` string itself.
      """

      def __missing__(self, str key):
          # Item access goes through a ``dict``-typed reference to self
          # (presumably so Cython can emit direct dict operations).
          cdef dict store = self
          # Everything after the first space; a key with no space raises
          # IndexError here.
          cdef str suffix = key.split(" ", 1)[1]
          cdef int counter = store.get(suffix, 1)
          cdef str label

          store[suffix] = counter + 1
          label = f"{suffix}_{counter}"
          store[key] = label
          return label
  cdef class cache_anon_map(dict):
      """Dictionary that hands out sequential string tokens.

      Every missing key is assigned ``str(n)``, where ``n`` is an internal
      counter that starts at 0 and increases by one per new key.
      ``get_anon`` looks objects up by their ``id()``.
      """

      cdef int _index

      def __init__(self):
          self._index = 0

      def get_anon(self, obj):
          """Return ``(token, seen)`` for ``obj``.

          ``seen`` is True when ``id(obj)`` already had a token and False
          when a new token was created by this call.
          """
          cdef dict store = self
          cdef long long obj_id = id(obj)
          cdef str token

          if obj_id not in store:
              # Invoked explicitly: the typed-dict lookup above does not go
              # through ``__missing__`` on its own.
              token = self.__missing__(obj_id)
              return token, False

          return store[obj_id], True

      def __missing__(self, key):
          cdef dict store = self
          cdef str token = str(self._index)

          store[key] = token
          self._index += 1
          return token