_py_processors.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. # engine/_py_processors.py
  2. # Copyright (C) 2010-2025 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. # Copyright (C) 2010 Gaetan de Menten gdementen@gmail.com
  5. #
  6. # This module is part of SQLAlchemy and is released under
  7. # the MIT License: https://www.opensource.org/licenses/mit-license.php
  8. """defines generic type conversion functions, as used in bind and result
  9. processors.
  10. They all share one common characteristic: None is passed through unchanged.
  11. """
  12. from __future__ import annotations
  13. import datetime
  14. from datetime import date as date_cls
  15. from datetime import datetime as datetime_cls
  16. from datetime import time as time_cls
  17. from decimal import Decimal
  18. import typing
  19. from typing import Any
  20. from typing import Callable
  21. from typing import Optional
  22. from typing import Type
  23. from typing import TypeVar
  24. from typing import Union
  25. _DT = TypeVar(
  26. "_DT", bound=Union[datetime.datetime, datetime.time, datetime.date]
  27. )
  28. def str_to_datetime_processor_factory(
  29. regexp: typing.Pattern[str], type_: Callable[..., _DT]
  30. ) -> Callable[[Optional[str]], Optional[_DT]]:
  31. rmatch = regexp.match
  32. # Even on python2.6 datetime.strptime is both slower than this code
  33. # and it does not support microseconds.
  34. has_named_groups = bool(regexp.groupindex)
  35. def process(value: Optional[str]) -> Optional[_DT]:
  36. if value is None:
  37. return None
  38. else:
  39. try:
  40. m = rmatch(value)
  41. except TypeError as err:
  42. raise ValueError(
  43. "Couldn't parse %s string '%r' "
  44. "- value is not a string." % (type_.__name__, value)
  45. ) from err
  46. if m is None:
  47. raise ValueError(
  48. "Couldn't parse %s string: "
  49. "'%s'" % (type_.__name__, value)
  50. )
  51. if has_named_groups:
  52. groups = m.groupdict(0)
  53. return type_(
  54. **dict(
  55. list(
  56. zip(
  57. iter(groups.keys()),
  58. list(map(int, iter(groups.values()))),
  59. )
  60. )
  61. )
  62. )
  63. else:
  64. return type_(*list(map(int, m.groups(0))))
  65. return process
  66. def to_decimal_processor_factory(
  67. target_class: Type[Decimal], scale: int
  68. ) -> Callable[[Optional[float]], Optional[Decimal]]:
  69. fstring = "%%.%df" % scale
  70. def process(value: Optional[float]) -> Optional[Decimal]:
  71. if value is None:
  72. return None
  73. else:
  74. return target_class(fstring % value)
  75. return process
  76. def to_float(value: Optional[Union[int, float]]) -> Optional[float]:
  77. if value is None:
  78. return None
  79. else:
  80. return float(value)
  81. def to_str(value: Optional[Any]) -> Optional[str]:
  82. if value is None:
  83. return None
  84. else:
  85. return str(value)
  86. def int_to_boolean(value: Optional[int]) -> Optional[bool]:
  87. if value is None:
  88. return None
  89. else:
  90. return bool(value)
  91. def str_to_datetime(value: Optional[str]) -> Optional[datetime.datetime]:
  92. if value is not None:
  93. dt_value = datetime_cls.fromisoformat(value)
  94. else:
  95. dt_value = None
  96. return dt_value
  97. def str_to_time(value: Optional[str]) -> Optional[datetime.time]:
  98. if value is not None:
  99. dt_value = time_cls.fromisoformat(value)
  100. else:
  101. dt_value = None
  102. return dt_value
  103. def str_to_date(value: Optional[str]) -> Optional[datetime.date]:
  104. if value is not None:
  105. dt_value = date_cls.fromisoformat(value)
  106. else:
  107. dt_value = None
  108. return dt_value