_psycopg_common.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. # dialects/postgresql/_psycopg_common.py
  2. # Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: https://www.opensource.org/licenses/mit-license.php
  7. # mypy: ignore-errors
  8. from __future__ import annotations
  9. import decimal
  10. from .array import ARRAY as PGARRAY
  11. from .base import _DECIMAL_TYPES
  12. from .base import _FLOAT_TYPES
  13. from .base import _INT_TYPES
  14. from .base import PGDialect
  15. from .base import PGExecutionContext
  16. from .hstore import HSTORE
  17. from .pg_catalog import _SpaceVector
  18. from .pg_catalog import INT2VECTOR
  19. from .pg_catalog import OIDVECTOR
  20. from ... import exc
  21. from ... import types as sqltypes
  22. from ... import util
  23. from ...engine import processors
# Module-level counter; create_server_side_cursor() calls it once per cursor
# and uses the hex form of the result as the trailing part of the cursor name.
# NOTE(review): presumably each call yields a new, increasing integer --
# confirm against util.counter().
_server_side_id = util.counter()
  25. class _PsycopgNumeric(sqltypes.Numeric):
  26. def bind_processor(self, dialect):
  27. return None
  28. def result_processor(self, dialect, coltype):
  29. if self.asdecimal:
  30. if coltype in _FLOAT_TYPES:
  31. return processors.to_decimal_processor_factory(
  32. decimal.Decimal, self._effective_decimal_return_scale
  33. )
  34. elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES:
  35. # psycopg returns Decimal natively for 1700
  36. return None
  37. else:
  38. raise exc.InvalidRequestError(
  39. "Unknown PG numeric type: %d" % coltype
  40. )
  41. else:
  42. if coltype in _FLOAT_TYPES:
  43. # psycopg returns float natively for 701
  44. return None
  45. elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES:
  46. return processors.to_float
  47. else:
  48. raise exc.InvalidRequestError(
  49. "Unknown PG numeric type: %d" % coltype
  50. )
class _PsycopgFloat(_PsycopgNumeric):
    # Float counterpart of _PsycopgNumeric: it inherits bind_processor and
    # result_processor unchanged and overrides only the visit name.
    # NOTE(review): presumably "float" makes the compiler dispatch to its
    # float handler instead of the numeric one -- confirm against the
    # compiler's visit methods.
    __visit_name__ = "float"
  53. class _PsycopgHStore(HSTORE):
  54. def bind_processor(self, dialect):
  55. if dialect._has_native_hstore:
  56. return None
  57. else:
  58. return super().bind_processor(dialect)
  59. def result_processor(self, dialect, coltype):
  60. if dialect._has_native_hstore:
  61. return None
  62. else:
  63. return super().result_processor(dialect, coltype)
class _PsycopgARRAY(PGARRAY):
    # PostgreSQL ARRAY subclass that the common psycopg dialect maps
    # sqltypes.ARRAY to in its colspecs.
    # NOTE(review): render_bind_cast presumably asks the compiler to wrap
    # bound parameters of this type in an explicit cast -- confirm against
    # the type/compiler documentation.
    render_bind_cast = True
class _PsycopgINT2VECTOR(_SpaceVector, INT2VECTOR):
    # Combines _SpaceVector with INT2VECTOR and adds nothing of its own; the
    # common psycopg dialect maps INT2VECTOR to this class in its colspecs.
    # NOTE(review): _SpaceVector presumably supplies the result handling for
    # space-separated vector values -- confirm in pg_catalog.
    pass
class _PsycopgOIDVECTOR(_SpaceVector, OIDVECTOR):
    # Combines _SpaceVector with OIDVECTOR and adds nothing of its own; the
    # common psycopg dialect maps OIDVECTOR to this class in its colspecs.
    # NOTE(review): _SpaceVector presumably supplies the result handling for
    # space-separated vector values -- confirm in pg_catalog.
    pass
  70. class _PGExecutionContext_common_psycopg(PGExecutionContext):
  71. def create_server_side_cursor(self):
  72. # use server-side cursors:
  73. # psycopg
  74. # https://www.psycopg.org/psycopg3/docs/advanced/cursors.html#server-side-cursors
  75. # psycopg2
  76. # https://www.psycopg.org/docs/usage.html#server-side-cursors
  77. ident = "c_%s_%s" % (hex(id(self))[2:], hex(_server_side_id())[2:])
  78. return self._dbapi_connection.cursor(ident)
  79. class _PGDialect_common_psycopg(PGDialect):
  80. supports_statement_cache = True
  81. supports_server_side_cursors = True
  82. default_paramstyle = "pyformat"
  83. _has_native_hstore = True
  84. colspecs = util.update_copy(
  85. PGDialect.colspecs,
  86. {
  87. sqltypes.Numeric: _PsycopgNumeric,
  88. sqltypes.Float: _PsycopgFloat,
  89. HSTORE: _PsycopgHStore,
  90. sqltypes.ARRAY: _PsycopgARRAY,
  91. INT2VECTOR: _PsycopgINT2VECTOR,
  92. OIDVECTOR: _PsycopgOIDVECTOR,
  93. },
  94. )
  95. def __init__(
  96. self,
  97. client_encoding=None,
  98. use_native_hstore=True,
  99. **kwargs,
  100. ):
  101. PGDialect.__init__(self, **kwargs)
  102. if not use_native_hstore:
  103. self._has_native_hstore = False
  104. self.use_native_hstore = use_native_hstore
  105. self.client_encoding = client_encoding
  106. def create_connect_args(self, url):
  107. opts = url.translate_connect_args(username="user", database="dbname")
  108. multihosts, multiports = self._split_multihost_from_url(url)
  109. if opts or url.query:
  110. if not opts:
  111. opts = {}
  112. if "port" in opts:
  113. opts["port"] = int(opts["port"])
  114. opts.update(url.query)
  115. if multihosts:
  116. opts["host"] = ",".join(multihosts)
  117. comma_ports = ",".join(str(p) if p else "" for p in multiports)
  118. if comma_ports:
  119. opts["port"] = comma_ports
  120. return ([], opts)
  121. else:
  122. # no connection arguments whatsoever; psycopg2.connect()
  123. # requires that "dsn" be present as a blank string.
  124. return ([""], opts)
  125. def get_isolation_level_values(self, dbapi_connection):
  126. return (
  127. "AUTOCOMMIT",
  128. "READ COMMITTED",
  129. "READ UNCOMMITTED",
  130. "REPEATABLE READ",
  131. "SERIALIZABLE",
  132. )
  133. def set_deferrable(self, connection, value):
  134. connection.deferrable = value
  135. def get_deferrable(self, connection):
  136. return connection.deferrable
  137. def _do_autocommit(self, connection, value):
  138. connection.autocommit = value
  139. def detect_autocommit_setting(self, dbapi_connection):
  140. return bool(dbapi_connection.autocommit)
  141. def do_ping(self, dbapi_connection):
  142. before_autocommit = dbapi_connection.autocommit
  143. if not before_autocommit:
  144. dbapi_connection.autocommit = True
  145. cursor = dbapi_connection.cursor()
  146. try:
  147. cursor.execute(self._dialect_specific_select_one)
  148. finally:
  149. cursor.close()
  150. if not before_autocommit and not dbapi_connection.closed:
  151. dbapi_connection.autocommit = before_autocommit
  152. return True