- # dialects/postgresql/psycopg.py
- # Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
- # <see AUTHORS file>
- #
- # This module is part of SQLAlchemy and is released under
- # the MIT License: https://www.opensource.org/licenses/mit-license.php
- # mypy: ignore-errors
- r"""
- .. dialect:: postgresql+psycopg
- :name: psycopg (a.k.a. psycopg 3)
- :dbapi: psycopg
- :connectstring: postgresql+psycopg://user:password@host:port/dbname[?key=value&key=value...]
- :url: https://pypi.org/project/psycopg/
- ``psycopg`` is the package and module name for version 3 of the ``psycopg``
- database driver, formerly known as ``psycopg2``. This driver is different
- enough from its ``psycopg2`` predecessor that SQLAlchemy supports it
- via a totally separate dialect; support for ``psycopg2`` is expected to remain
- for as long as that package continues to function for modern Python versions;
- ``psycopg2`` also remains the default dialect for the ``postgresql://``
- dialect series.
- The SQLAlchemy ``psycopg`` dialect provides both a sync and an async
- implementation under the same dialect name. The proper version is
- selected depending on how the engine is created:
- * calling :func:`_sa.create_engine` with ``postgresql+psycopg://...`` will
- automatically select the sync version, e.g.::
- from sqlalchemy import create_engine
- sync_engine = create_engine(
- "postgresql+psycopg://scott:tiger@localhost/test"
- )
- * calling :func:`_asyncio.create_async_engine` with
- ``postgresql+psycopg://...`` will automatically select the async version,
- e.g.::
- from sqlalchemy.ext.asyncio import create_async_engine
- asyncio_engine = create_async_engine(
- "postgresql+psycopg://scott:tiger@localhost/test"
- )
- The asyncio version of the dialect may also be specified explicitly using the
- ``psycopg_async`` suffix, as::
- from sqlalchemy.ext.asyncio import create_async_engine
- asyncio_engine = create_async_engine(
- "postgresql+psycopg_async://scott:tiger@localhost/test"
- )
- .. seealso::
- :ref:`postgresql_psycopg2` - The SQLAlchemy ``psycopg``
- dialect shares most of its behavior with the ``psycopg2`` dialect.
- Further documentation is available there.
- Using a different Cursor class
- ------------------------------
- One of the differences between ``psycopg`` and the older ``psycopg2``
- is how bound parameters are handled: ``psycopg2`` would bind them
- client side, while ``psycopg`` by default will bind them server side.
- It's possible to configure ``psycopg`` to do client side binding by
- specifying the ``cursor_factory`` to be ``ClientCursor`` when creating
- the engine::
- from psycopg import ClientCursor
- client_side_engine = create_engine(
- "postgresql+psycopg://...",
- connect_args={"cursor_factory": ClientCursor},
- )
- Similarly when using an async engine the ``AsyncClientCursor`` can be
- specified::
- from psycopg import AsyncClientCursor
- client_side_engine = create_async_engine(
- "postgresql+psycopg://...",
- connect_args={"cursor_factory": AsyncClientCursor},
- )
- .. seealso::
- `Client-side-binding cursors <https://www.psycopg.org/psycopg3/docs/advanced/cursors.html#client-side-binding-cursors>`_
- """ # noqa
- from __future__ import annotations
- from collections import deque
- import logging
- import re
- from typing import cast
- from typing import TYPE_CHECKING
- from . import ranges
- from ._psycopg_common import _PGDialect_common_psycopg
- from ._psycopg_common import _PGExecutionContext_common_psycopg
- from .base import INTERVAL
- from .base import PGCompiler
- from .base import PGIdentifierPreparer
- from .base import REGCONFIG
- from .json import JSON
- from .json import JSONB
- from .json import JSONPathType
- from .types import CITEXT
- from ... import pool
- from ... import util
- from ...engine import AdaptedConnection
- from ...sql import sqltypes
- from ...util.concurrency import await_fallback
- from ...util.concurrency import await_only
- if TYPE_CHECKING:
- from typing import Iterable
- from psycopg import AsyncConnection
- logger = logging.getLogger("sqlalchemy.dialects.postgresql")
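- # psycopg sends bound parameters to the server separately from the
- # statement text, so the type subclasses below set render_bind_cast = True;
- # the compiler then renders an explicit cast (e.g. ``%(x)s::INTEGER``)
- # so the server can resolve each parameter's type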
- class _PGString(sqltypes.String):
- render_bind_cast = True
- class _PGREGCONFIG(REGCONFIG):
- render_bind_cast = True
- class _PGJSON(JSON):
- def bind_processor(self, dialect):
- return self._make_bind_processor(None, dialect._psycopg_Json)
- def result_processor(self, dialect, coltype):
- return None
- class _PGJSONB(JSONB):
- def bind_processor(self, dialect):
- return self._make_bind_processor(None, dialect._psycopg_Jsonb)
- def result_processor(self, dialect, coltype):
- return None
- class _PGJSONIntIndexType(sqltypes.JSON.JSONIntIndexType):
- __visit_name__ = "json_int_index"
- render_bind_cast = True
- class _PGJSONStrIndexType(sqltypes.JSON.JSONStrIndexType):
- __visit_name__ = "json_str_index"
- render_bind_cast = True
- class _PGJSONPathType(JSONPathType):
- pass
- class _PGInterval(INTERVAL):
- render_bind_cast = True
- class _PGTimeStamp(sqltypes.DateTime):
- render_bind_cast = True
- class _PGDate(sqltypes.Date):
- render_bind_cast = True
- class _PGTime(sqltypes.Time):
- render_bind_cast = True
- class _PGInteger(sqltypes.Integer):
- render_bind_cast = True
- class _PGSmallInteger(sqltypes.SmallInteger):
- render_bind_cast = True
- class _PGNullType(sqltypes.NullType):
- render_bind_cast = True
- class _PGBigInteger(sqltypes.BigInteger):
- render_bind_cast = True
- class _PGBoolean(sqltypes.Boolean):
- render_bind_cast = True
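- # the two range implementations below translate between SQLAlchemy's
- # ranges.Range / ranges.MultiRange values and psycopg's native Range /
- # Multirange objects, converting on bind (outgoing) and on result (incoming)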
- class _PsycopgRange(ranges.AbstractSingleRangeImpl):
- def bind_processor(self, dialect):
- psycopg_Range = cast(PGDialect_psycopg, dialect)._psycopg_Range
- def to_range(value):
- if isinstance(value, ranges.Range):
- value = psycopg_Range(
- value.lower, value.upper, value.bounds, value.empty
- )
- return value
- return to_range
- def result_processor(self, dialect, coltype):
- def to_range(value):
- if value is not None:
- value = ranges.Range(
- value._lower,
- value._upper,
- bounds=value._bounds if value._bounds else "[)",
- empty=not value._bounds,
- )
- return value
- return to_range
- class _PsycopgMultiRange(ranges.AbstractMultiRangeImpl):
- def bind_processor(self, dialect):
- psycopg_Range = cast(PGDialect_psycopg, dialect)._psycopg_Range
- psycopg_Multirange = cast(
- PGDialect_psycopg, dialect
- )._psycopg_Multirange
- NoneType = type(None)
- def to_range(value):
- if isinstance(value, (str, NoneType, psycopg_Multirange)):
- return value
- return psycopg_Multirange(
- [
- psycopg_Range(
- element.lower,
- element.upper,
- element.bounds,
- element.empty,
- )
- for element in cast("Iterable[ranges.Range]", value)
- ]
- )
- return to_range
- def result_processor(self, dialect, coltype):
- def to_range(value):
- if value is None:
- return None
- else:
- return ranges.MultiRange(
- ranges.Range(
- elem._lower,
- elem._upper,
- bounds=elem._bounds if elem._bounds else "[)",
- empty=not elem._bounds,
- )
- for elem in value
- )
- return to_range
- class PGExecutionContext_psycopg(_PGExecutionContext_common_psycopg):
- pass
- class PGCompiler_psycopg(PGCompiler):
- pass
- class PGIdentifierPreparer_psycopg(PGIdentifierPreparer):
- pass
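- # installed on each connection by on_connect(); routes server notices
- # (NOTICE, WARNING, etc.) to the sqlalchemy.dialects.postgresql logger
- # at INFO level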
- def _log_notices(diagnostic):
- logger.info("%s: %s", diagnostic.severity, diagnostic.message_primary)
- class PGDialect_psycopg(_PGDialect_common_psycopg):
- driver = "psycopg"
- supports_statement_cache = True
- supports_server_side_cursors = True
- default_paramstyle = "pyformat"
- supports_sane_multi_rowcount = True
- execution_ctx_cls = PGExecutionContext_psycopg
- statement_compiler = PGCompiler_psycopg
- preparer = PGIdentifierPreparer_psycopg
- psycopg_version = (0, 0)
- _has_native_hstore = True
- _psycopg_adapters_map = None
- colspecs = util.update_copy(
- _PGDialect_common_psycopg.colspecs,
- {
- sqltypes.String: _PGString,
- REGCONFIG: _PGREGCONFIG,
- JSON: _PGJSON,
- CITEXT: CITEXT,
- sqltypes.JSON: _PGJSON,
- JSONB: _PGJSONB,
- sqltypes.JSON.JSONPathType: _PGJSONPathType,
- sqltypes.JSON.JSONIntIndexType: _PGJSONIntIndexType,
- sqltypes.JSON.JSONStrIndexType: _PGJSONStrIndexType,
- sqltypes.Interval: _PGInterval,
- INTERVAL: _PGInterval,
- sqltypes.Date: _PGDate,
- sqltypes.DateTime: _PGTimeStamp,
- sqltypes.Time: _PGTime,
- sqltypes.Integer: _PGInteger,
- sqltypes.SmallInteger: _PGSmallInteger,
- sqltypes.BigInteger: _PGBigInteger,
- ranges.AbstractSingleRange: _PsycopgRange,
- ranges.AbstractMultiRange: _PsycopgMultiRange,
- },
- )
- def __init__(self, **kwargs):
- super().__init__(**kwargs)
- if self.dbapi:
- m = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", self.dbapi.__version__)
- if m:
- self.psycopg_version = tuple(
- int(x) for x in m.group(1, 2, 3) if x is not None
- )
- if self.psycopg_version < (3, 0, 2):
- raise ImportError(
- "psycopg version 3.0.2 or higher is required."
- )
- from psycopg.adapt import AdaptersMap
- self._psycopg_adapters_map = adapters_map = AdaptersMap(
- self.dbapi.adapters
- )
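- # build a dialect-local copy of psycopg's global adapters map so that
- # the loaders registered below (and the hstore adapter registered in
- # initialize()) don't mutate global psycopg state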
- if self._native_inet_types is False:
- import psycopg.types.string
- adapters_map.register_loader(
- "inet", psycopg.types.string.TextLoader
- )
- adapters_map.register_loader(
- "cidr", psycopg.types.string.TextLoader
- )
- if self._json_deserializer:
- from psycopg.types.json import set_json_loads
- set_json_loads(self._json_deserializer, adapters_map)
- if self._json_serializer:
- from psycopg.types.json import set_json_dumps
- set_json_dumps(self._json_serializer, adapters_map)
- def create_connect_args(self, url):
- # see https://github.com/psycopg/psycopg/issues/83
- cargs, cparams = super().create_connect_args(url)
- if self._psycopg_adapters_map:
- cparams["context"] = self._psycopg_adapters_map
- if self.client_encoding is not None:
- cparams["client_encoding"] = self.client_encoding
- return cargs, cparams
- def _type_info_fetch(self, connection, name):
- from psycopg.types import TypeInfo
- return TypeInfo.fetch(connection.connection.driver_connection, name)
- def initialize(self, connection):
- super().initialize(connection)
- # PGDialect.initialize() checks server version for <= 8.2 and sets
- # this flag to False if so
- if not self.insert_returning:
- self.insert_executemany_returning = False
- # HSTORE can't be registered until we have a connection so that
- # we can look up its OID, so we set up this adapter in
- # initialize()
- if self.use_native_hstore:
- info = self._type_info_fetch(connection, "hstore")
- self._has_native_hstore = info is not None
- if self._has_native_hstore:
- from psycopg.types.hstore import register_hstore
- # register the adapter for connections made subsequent to
- # this one
- assert self._psycopg_adapters_map
- register_hstore(info, self._psycopg_adapters_map)
- # register the adapter for this connection
- assert connection.connection
- register_hstore(info, connection.connection.driver_connection)
- @classmethod
- def import_dbapi(cls):
- import psycopg
- return psycopg
- @classmethod
- def get_async_dialect_cls(cls, url):
- return PGDialectAsync_psycopg
- @util.memoized_property
- def _isolation_lookup(self):
- return {
- "READ COMMITTED": self.dbapi.IsolationLevel.READ_COMMITTED,
- "READ UNCOMMITTED": self.dbapi.IsolationLevel.READ_UNCOMMITTED,
- "REPEATABLE READ": self.dbapi.IsolationLevel.REPEATABLE_READ,
- "SERIALIZABLE": self.dbapi.IsolationLevel.SERIALIZABLE,
- }
- @util.memoized_property
- def _psycopg_Json(self):
- from psycopg.types import json
- return json.Json
- @util.memoized_property
- def _psycopg_Jsonb(self):
- from psycopg.types import json
- return json.Jsonb
- @util.memoized_property
- def _psycopg_TransactionStatus(self):
- from psycopg.pq import TransactionStatus
- return TransactionStatus
- @util.memoized_property
- def _psycopg_Range(self):
- from psycopg.types.range import Range
- return Range
- @util.memoized_property
- def _psycopg_Multirange(self):
- from psycopg.types.multirange import Multirange
- return Multirange
- def _do_isolation_level(self, connection, autocommit, isolation_level):
- connection.autocommit = autocommit
- connection.isolation_level = isolation_level
- def get_isolation_level(self, dbapi_connection):
- status_before = dbapi_connection.info.transaction_status
- value = super().get_isolation_level(dbapi_connection)
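- # the query emitted by the base implementation opens an implicit
- # transaction; if the connection was idle beforehand, roll back so it
- # is returned to that state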
- # don't rely on psycopg providing enum symbols, compare with
- # eq/ne
- if status_before == self._psycopg_TransactionStatus.IDLE:
- dbapi_connection.rollback()
- return value
- def set_isolation_level(self, dbapi_connection, level):
- if level == "AUTOCOMMIT":
- self._do_isolation_level(
- dbapi_connection, autocommit=True, isolation_level=None
- )
- else:
- self._do_isolation_level(
- dbapi_connection,
- autocommit=False,
- isolation_level=self._isolation_lookup[level],
- )
- def set_readonly(self, connection, value):
- connection.read_only = value
- def get_readonly(self, connection):
- return connection.read_only
- def on_connect(self):
- def notices(conn):
- conn.add_notice_handler(_log_notices)
- fns = [notices]
- if self.isolation_level is not None:
- def on_connect(conn):
- self.set_isolation_level(conn, self.isolation_level)
- fns.append(on_connect)
- # fns always has the notices function
- def on_connect(conn):
- for fn in fns:
- fn(conn)
- return on_connect
- def is_disconnect(self, e, connection, cursor):
- if isinstance(e, self.dbapi.Error) and connection is not None:
- if connection.closed or connection.broken:
- return True
- return False
- def _do_prepared_twophase(self, connection, command, recover=False):
- dbapi_conn = connection.connection.dbapi_connection
- if (
- recover
- # don't rely on psycopg providing enum symbols, compare with
- # eq/ne
- or dbapi_conn.info.transaction_status
- != self._psycopg_TransactionStatus.IDLE
- ):
- dbapi_conn.rollback()
- before_autocommit = dbapi_conn.autocommit
- try:
- if not before_autocommit:
- self._do_autocommit(dbapi_conn, True)
- dbapi_conn.execute(command)
- finally:
- if not before_autocommit:
- self._do_autocommit(dbapi_conn, before_autocommit)
- def do_rollback_twophase(
- self, connection, xid, is_prepared=True, recover=False
- ):
- if is_prepared:
- self._do_prepared_twophase(
- connection, f"ROLLBACK PREPARED '{xid}'", recover=recover
- )
- else:
- self.do_rollback(connection.connection)
- def do_commit_twophase(
- self, connection, xid, is_prepared=True, recover=False
- ):
- if is_prepared:
- self._do_prepared_twophase(
- connection, f"COMMIT PREPARED '{xid}'", recover=recover
- )
- else:
- self.do_commit(connection.connection)
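- # used by the base dialect when pinging a connection; psycopg accepts a
- # lone ";" as a lightweight no-op statement in place of "SELECT 1"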
- @util.memoized_property
- def _dialect_specific_select_one(self):
- return ";"
- class AsyncAdapt_psycopg_cursor:
- __slots__ = ("_cursor", "await_", "_rows")
- _psycopg_ExecStatus = None
- def __init__(self, cursor, await_) -> None:
- self._cursor = cursor
- self.await_ = await_
- self._rows = deque()
- def __getattr__(self, name):
- return getattr(self._cursor, name)
- @property
- def arraysize(self):
- return self._cursor.arraysize
- @arraysize.setter
- def arraysize(self, value):
- self._cursor.arraysize = value
- def close(self):
- self._rows.clear()
- # a normal (client-side) cursor can be closed by calling _close()
- # directly, since that method is not a coroutine
- self._cursor._close()
- def execute(self, query, params=None, **kw):
- result = self.await_(self._cursor.execute(query, params, **kw))
- # the SQLAlchemy result object is not async, so all rows need to be
- # pulled here
- res = self._cursor.pgresult
- # don't rely on psycopg providing enum symbols, compare with
- # eq/ne
- if res and res.status == self._psycopg_ExecStatus.TUPLES_OK:
- rows = self.await_(self._cursor.fetchall())
- self._rows = deque(rows)
- return result
- def executemany(self, query, params_seq):
- return self.await_(self._cursor.executemany(query, params_seq))
- def __iter__(self):
- while self._rows:
- yield self._rows.popleft()
- def fetchone(self):
- if self._rows:
- return self._rows.popleft()
- else:
- return None
- def fetchmany(self, size=None):
- if size is None:
- size = self._cursor.arraysize
- rr = self._rows
- return [rr.popleft() for _ in range(min(size, len(rr)))]
- def fetchall(self):
- retval = list(self._rows)
- self._rows.clear()
- return retval
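- # server side cursor version of the adapter: rows are not pre-fetched
- # into memory; each fetch method awaits the driver cursor directly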
- class AsyncAdapt_psycopg_ss_cursor(AsyncAdapt_psycopg_cursor):
- def execute(self, query, params=None, **kw):
- self.await_(self._cursor.execute(query, params, **kw))
- return self
- def close(self):
- self.await_(self._cursor.close())
- def fetchone(self):
- return self.await_(self._cursor.fetchone())
- def fetchmany(self, size=0):
- return self.await_(self._cursor.fetchmany(size))
- def fetchall(self):
- return self.await_(self._cursor.fetchall())
- def __iter__(self):
- iterator = self._cursor.__aiter__()
- while True:
- try:
- yield self.await_(iterator.__anext__())
- except StopAsyncIteration:
- break
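- # adapts psycopg's AsyncConnection to the synchronous connection
- # interface expected by the pool and the rest of the dialect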
- class AsyncAdapt_psycopg_connection(AdaptedConnection):
- _connection: AsyncConnection
- __slots__ = ()
- await_ = staticmethod(await_only)
- def __init__(self, connection) -> None:
- self._connection = connection
- def __getattr__(self, name):
- return getattr(self._connection, name)
- def execute(self, query, params=None, **kw):
- cursor = self.await_(self._connection.execute(query, params, **kw))
- return AsyncAdapt_psycopg_cursor(cursor, self.await_)
- def cursor(self, *args, **kw):
- cursor = self._connection.cursor(*args, **kw)
- if hasattr(cursor, "name"):
- return AsyncAdapt_psycopg_ss_cursor(cursor, self.await_)
- else:
- return AsyncAdapt_psycopg_cursor(cursor, self.await_)
- def commit(self):
- self.await_(self._connection.commit())
- def rollback(self):
- self.await_(self._connection.rollback())
- def close(self):
- self.await_(self._connection.close())
- @property
- def autocommit(self):
- return self._connection.autocommit
- @autocommit.setter
- def autocommit(self, value):
- self.set_autocommit(value)
- def set_autocommit(self, value):
- self.await_(self._connection.set_autocommit(value))
- def set_isolation_level(self, value):
- self.await_(self._connection.set_isolation_level(value))
- def set_read_only(self, value):
- self.await_(self._connection.set_read_only(value))
- def set_deferrable(self, value):
- self.await_(self._connection.set_deferrable(value))
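- # fallback variant selected when "async_fallback" is set on the URL;
- # await_fallback allows the coroutines to be driven from plain
- # synchronous code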
- class AsyncAdaptFallback_psycopg_connection(AsyncAdapt_psycopg_connection):
- __slots__ = ()
- await_ = staticmethod(await_fallback)
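- # presents the psycopg package to SQLAlchemy as if it were a DBAPI module:
- # the module namespace (exceptions, constants, etc.) is copied over, while
- # connect() is replaced with one that returns a wrapped AsyncConnection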
- class PsycopgAdaptDBAPI:
- def __init__(self, psycopg) -> None:
- self.psycopg = psycopg
- for k, v in self.psycopg.__dict__.items():
- if k != "connect":
- self.__dict__[k] = v
- def connect(self, *arg, **kw):
- async_fallback = kw.pop("async_fallback", False)
- creator_fn = kw.pop(
- "async_creator_fn", self.psycopg.AsyncConnection.connect
- )
- if util.asbool(async_fallback):
- return AsyncAdaptFallback_psycopg_connection(
- await_fallback(creator_fn(*arg, **kw))
- )
- else:
- return AsyncAdapt_psycopg_connection(
- await_only(creator_fn(*arg, **kw))
- )
- class PGDialectAsync_psycopg(PGDialect_psycopg):
- is_async = True
- supports_statement_cache = True
- @classmethod
- def import_dbapi(cls):
- import psycopg
- from psycopg.pq import ExecStatus
- AsyncAdapt_psycopg_cursor._psycopg_ExecStatus = ExecStatus
- return PsycopgAdaptDBAPI(psycopg)
- @classmethod
- def get_pool_class(cls, url):
- async_fallback = url.query.get("async_fallback", False)
- if util.asbool(async_fallback):
- return pool.FallbackAsyncAdaptedQueuePool
- else:
- return pool.AsyncAdaptedQueuePool
- def _type_info_fetch(self, connection, name):
- from psycopg.types import TypeInfo
- adapted = connection.connection
- return adapted.await_(TypeInfo.fetch(adapted.driver_connection, name))
- def _do_isolation_level(self, connection, autocommit, isolation_level):
- connection.set_autocommit(autocommit)
- connection.set_isolation_level(isolation_level)
- def _do_autocommit(self, connection, value):
- connection.set_autocommit(value)
- def set_readonly(self, connection, value):
- connection.set_read_only(value)
- def set_deferrable(self, connection, value):
- connection.set_deferrable(value)
- def get_driver_connection(self, connection):
- return connection._connection
- dialect = PGDialect_psycopg
- dialect_async = PGDialectAsync_psycopg
|