| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181 |
- # engine/cursor.py
- # Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
- # <see AUTHORS file>
- #
- # This module is part of SQLAlchemy and is released under
- # the MIT License: https://www.opensource.org/licenses/mit-license.php
- # mypy: allow-untyped-defs, allow-untyped-calls
- """Define cursor-specific result set constructs including
- :class:`.CursorResult`."""
- from __future__ import annotations
- import collections
- import functools
- import operator
- import typing
- from typing import Any
- from typing import cast
- from typing import ClassVar
- from typing import Dict
- from typing import Iterable
- from typing import Iterator
- from typing import List
- from typing import Mapping
- from typing import NoReturn
- from typing import Optional
- from typing import Sequence
- from typing import Tuple
- from typing import TYPE_CHECKING
- from typing import TypeVar
- from typing import Union
- from .result import IteratorResult
- from .result import MergedResult
- from .result import Result
- from .result import ResultMetaData
- from .result import SimpleResultMetaData
- from .result import tuplegetter
- from .row import Row
- from .. import exc
- from .. import util
- from ..sql import elements
- from ..sql import sqltypes
- from ..sql import util as sql_util
- from ..sql.base import _generative
- from ..sql.compiler import ResultColumnsEntry
- from ..sql.compiler import RM_NAME
- from ..sql.compiler import RM_OBJECTS
- from ..sql.compiler import RM_RENDERED_NAME
- from ..sql.compiler import RM_TYPE
- from ..sql.type_api import TypeEngine
- from ..util import compat
- from ..util.typing import Literal
- from ..util.typing import Self
- if typing.TYPE_CHECKING:
- from .base import Connection
- from .default import DefaultExecutionContext
- from .interfaces import _DBAPICursorDescription
- from .interfaces import DBAPICursor
- from .interfaces import Dialect
- from .interfaces import ExecutionContext
- from .result import _KeyIndexType
- from .result import _KeyMapRecType
- from .result import _KeyMapType
- from .result import _KeyType
- from .result import _ProcessorsType
- from .result import _TupleGetterType
- from ..sql.type_api import _ResultProcessorType
- _T = TypeVar("_T", bound=Any)
- # metadata entry tuple indexes.
- # using raw tuple is faster than namedtuple.
- # these match up to the positions in
- # _CursorKeyMapRecType
- MD_INDEX: Literal[0] = 0
- """integer index in cursor.description
- """
- MD_RESULT_MAP_INDEX: Literal[1] = 1
- """integer index in compiled._result_columns"""
- MD_OBJECTS: Literal[2] = 2
- """other string keys and ColumnElement obj that can match.
- This comes from compiler.RM_OBJECTS / compiler.ResultColumnsEntry.objects
- """
- MD_LOOKUP_KEY: Literal[3] = 3
- """string key we usually expect for key-based lookup
- this comes from compiler.RM_NAME / compiler.ResultColumnsEntry.name
- """
- MD_RENDERED_NAME: Literal[4] = 4
- """name that is usually in cursor.description
- this comes from compiler.RENDERED_NAME / compiler.ResultColumnsEntry.keyname
- """
- MD_PROCESSOR: Literal[5] = 5
- """callable to process a result value into a row"""
- MD_UNTRANSLATED: Literal[6] = 6
- """raw name from cursor.description"""
- _CursorKeyMapRecType = Tuple[
- Optional[int], # MD_INDEX, None means the record is ambiguously named
- int, # MD_RESULT_MAP_INDEX
- List[Any], # MD_OBJECTS
- str, # MD_LOOKUP_KEY
- str, # MD_RENDERED_NAME
- Optional["_ResultProcessorType[Any]"], # MD_PROCESSOR
- Optional[str], # MD_UNTRANSLATED
- ]
- _CursorKeyMapType = Mapping["_KeyType", _CursorKeyMapRecType]
- # same as _CursorKeyMapRecType except the MD_INDEX value is definitely
- # not None
- _NonAmbigCursorKeyMapRecType = Tuple[
- int,
- int,
- List[Any],
- str,
- str,
- Optional["_ResultProcessorType[Any]"],
- str,
- ]
- class CursorResultMetaData(ResultMetaData):
- """Result metadata for DBAPI cursors."""
- __slots__ = (
- "_keymap",
- "_processors",
- "_keys",
- "_keymap_by_result_column_idx",
- "_tuplefilter",
- "_translated_indexes",
- "_safe_for_cache",
- "_unpickled",
- "_key_to_index",
- # don't need _unique_filters support here for now. Can be added
- # if a need arises.
- )
- _keymap: _CursorKeyMapType
- _processors: _ProcessorsType
- _keymap_by_result_column_idx: Optional[Dict[int, _KeyMapRecType]]
- _unpickled: bool
- _safe_for_cache: bool
- _translated_indexes: Optional[List[int]]
- returns_rows: ClassVar[bool] = True
- def _has_key(self, key: Any) -> bool:
- return key in self._keymap
- def _for_freeze(self) -> ResultMetaData:
- return SimpleResultMetaData(
- self._keys,
- extra=[self._keymap[key][MD_OBJECTS] for key in self._keys],
- )
- def _make_new_metadata(
- self,
- *,
- unpickled: bool,
- processors: _ProcessorsType,
- keys: Sequence[str],
- keymap: _KeyMapType,
- tuplefilter: Optional[_TupleGetterType],
- translated_indexes: Optional[List[int]],
- safe_for_cache: bool,
- keymap_by_result_column_idx: Any,
- ) -> CursorResultMetaData:
- new_obj = self.__class__.__new__(self.__class__)
- new_obj._unpickled = unpickled
- new_obj._processors = processors
- new_obj._keys = keys
- new_obj._keymap = keymap
- new_obj._tuplefilter = tuplefilter
- new_obj._translated_indexes = translated_indexes
- new_obj._safe_for_cache = safe_for_cache
- new_obj._keymap_by_result_column_idx = keymap_by_result_column_idx
- new_obj._key_to_index = self._make_key_to_index(keymap, MD_INDEX)
- return new_obj
- def _remove_processors(self) -> CursorResultMetaData:
- assert not self._tuplefilter
- return self._make_new_metadata(
- unpickled=self._unpickled,
- processors=[None] * len(self._processors),
- tuplefilter=None,
- translated_indexes=None,
- keymap={
- key: value[0:5] + (None,) + value[6:]
- for key, value in self._keymap.items()
- },
- keys=self._keys,
- safe_for_cache=self._safe_for_cache,
- keymap_by_result_column_idx=self._keymap_by_result_column_idx,
- )
- def _splice_horizontally(
- self, other: CursorResultMetaData
- ) -> CursorResultMetaData:
- assert not self._tuplefilter
- keymap = dict(self._keymap)
- offset = len(self._keys)
- keymap.update(
- {
- key: (
- # int index should be None for ambiguous key
- (
- value[0] + offset
- if value[0] is not None and key not in keymap
- else None
- ),
- value[1] + offset,
- *value[2:],
- )
- for key, value in other._keymap.items()
- }
- )
- return self._make_new_metadata(
- unpickled=self._unpickled,
- processors=self._processors + other._processors, # type: ignore
- tuplefilter=None,
- translated_indexes=None,
- keys=self._keys + other._keys, # type: ignore
- keymap=keymap,
- safe_for_cache=self._safe_for_cache,
- keymap_by_result_column_idx={
- metadata_entry[MD_RESULT_MAP_INDEX]: metadata_entry
- for metadata_entry in keymap.values()
- },
- )
- def _reduce(self, keys: Sequence[_KeyIndexType]) -> ResultMetaData:
- recs = list(self._metadata_for_keys(keys))
- indexes = [rec[MD_INDEX] for rec in recs]
- new_keys: List[str] = [rec[MD_LOOKUP_KEY] for rec in recs]
- if self._translated_indexes:
- indexes = [self._translated_indexes[idx] for idx in indexes]
- tup = tuplegetter(*indexes)
- new_recs = [(index,) + rec[1:] for index, rec in enumerate(recs)]
- keymap = {rec[MD_LOOKUP_KEY]: rec for rec in new_recs}
- # TODO: need unit test for:
- # result = connection.execute("raw sql, no columns").scalars()
- # without the "or ()" it's failing because MD_OBJECTS is None
- keymap.update(
- (e, new_rec)
- for new_rec in new_recs
- for e in new_rec[MD_OBJECTS] or ()
- )
- return self._make_new_metadata(
- unpickled=self._unpickled,
- processors=self._processors,
- keys=new_keys,
- tuplefilter=tup,
- translated_indexes=indexes,
- keymap=keymap, # type: ignore[arg-type]
- safe_for_cache=self._safe_for_cache,
- keymap_by_result_column_idx=self._keymap_by_result_column_idx,
- )
- def _adapt_to_context(self, context: ExecutionContext) -> ResultMetaData:
- """When using a cached Compiled construct that has a _result_map,
- for a new statement that used the cached Compiled, we need to ensure
- the keymap has the Column objects from our new statement as keys.
- So here we rewrite keymap with new entries for the new columns
- as matched to those of the cached statement.
- """
- if not context.compiled or not context.compiled._result_columns:
- return self
- compiled_statement = context.compiled.statement
- invoked_statement = context.invoked_statement
- if TYPE_CHECKING:
- assert isinstance(invoked_statement, elements.ClauseElement)
- if compiled_statement is invoked_statement:
- return self
- assert invoked_statement is not None
- # this is the most common path for Core statements when
- # caching is used. In ORM use, this codepath is not really used
- # as the _result_disable_adapt_to_context execution option is
- # set by the ORM.
- # make a copy and add the columns from the invoked statement
- # to the result map.
- keymap_by_position = self._keymap_by_result_column_idx
- if keymap_by_position is None:
- # first retrival from cache, this map will not be set up yet,
- # initialize lazily
- keymap_by_position = self._keymap_by_result_column_idx = {
- metadata_entry[MD_RESULT_MAP_INDEX]: metadata_entry
- for metadata_entry in self._keymap.values()
- }
- assert not self._tuplefilter
- return self._make_new_metadata(
- keymap=compat.dict_union(
- self._keymap,
- {
- new: keymap_by_position[idx]
- for idx, new in enumerate(
- invoked_statement._all_selected_columns
- )
- if idx in keymap_by_position
- },
- ),
- unpickled=self._unpickled,
- processors=self._processors,
- tuplefilter=None,
- translated_indexes=None,
- keys=self._keys,
- safe_for_cache=self._safe_for_cache,
- keymap_by_result_column_idx=self._keymap_by_result_column_idx,
- )
- def __init__(
- self,
- parent: CursorResult[Any],
- cursor_description: _DBAPICursorDescription,
- ):
- context = parent.context
- self._tuplefilter = None
- self._translated_indexes = None
- self._safe_for_cache = self._unpickled = False
- if context.result_column_struct:
- (
- result_columns,
- cols_are_ordered,
- textual_ordered,
- ad_hoc_textual,
- loose_column_name_matching,
- ) = context.result_column_struct
- num_ctx_cols = len(result_columns)
- else:
- result_columns = cols_are_ordered = ( # type: ignore
- num_ctx_cols
- ) = ad_hoc_textual = loose_column_name_matching = (
- textual_ordered
- ) = False
- # merge cursor.description with the column info
- # present in the compiled structure, if any
- raw = self._merge_cursor_description(
- context,
- cursor_description,
- result_columns,
- num_ctx_cols,
- cols_are_ordered,
- textual_ordered,
- ad_hoc_textual,
- loose_column_name_matching,
- )
- # processors in key order which are used when building up
- # a row
- self._processors = [
- metadata_entry[MD_PROCESSOR] for metadata_entry in raw
- ]
- # this is used when using this ResultMetaData in a Core-only cache
- # retrieval context. it's initialized on first cache retrieval
- # when the _result_disable_adapt_to_context execution option
- # (which the ORM generally sets) is not set.
- self._keymap_by_result_column_idx = None
- # for compiled SQL constructs, copy additional lookup keys into
- # the key lookup map, such as Column objects, labels,
- # column keys and other names
- if num_ctx_cols:
- # keymap by primary string...
- by_key = {
- metadata_entry[MD_LOOKUP_KEY]: metadata_entry
- for metadata_entry in raw
- }
- if len(by_key) != num_ctx_cols:
- # if by-primary-string dictionary smaller than
- # number of columns, assume we have dupes; (this check
- # is also in place if string dictionary is bigger, as
- # can occur when '*' was used as one of the compiled columns,
- # which may or may not be suggestive of dupes), rewrite
- # dupe records with "None" for index which results in
- # ambiguous column exception when accessed.
- #
- # this is considered to be the less common case as it is not
- # common to have dupe column keys in a SELECT statement.
- #
- # new in 1.4: get the complete set of all possible keys,
- # strings, objects, whatever, that are dupes across two
- # different records, first.
- index_by_key: Dict[Any, Any] = {}
- dupes = set()
- for metadata_entry in raw:
- for key in (metadata_entry[MD_RENDERED_NAME],) + (
- metadata_entry[MD_OBJECTS] or ()
- ):
- idx = metadata_entry[MD_INDEX]
- # if this key has been associated with more than one
- # positional index, it's a dupe
- if index_by_key.setdefault(key, idx) != idx:
- dupes.add(key)
- # then put everything we have into the keymap excluding only
- # those keys that are dupes.
- self._keymap = {
- obj_elem: metadata_entry
- for metadata_entry in raw
- if metadata_entry[MD_OBJECTS]
- for obj_elem in metadata_entry[MD_OBJECTS]
- if obj_elem not in dupes
- }
- # then for the dupe keys, put the "ambiguous column"
- # record into by_key.
- by_key.update(
- {
- key: (None, None, [], key, key, None, None)
- for key in dupes
- }
- )
- else:
- # no dupes - copy secondary elements from compiled
- # columns into self._keymap. this is the most common
- # codepath for Core / ORM statement executions before the
- # result metadata is cached
- self._keymap = {
- obj_elem: metadata_entry
- for metadata_entry in raw
- if metadata_entry[MD_OBJECTS]
- for obj_elem in metadata_entry[MD_OBJECTS]
- }
- # update keymap with primary string names taking
- # precedence
- self._keymap.update(by_key)
- else:
- # no compiled objects to map, just create keymap by primary string
- self._keymap = {
- metadata_entry[MD_LOOKUP_KEY]: metadata_entry
- for metadata_entry in raw
- }
- # update keymap with "translated" names. In SQLAlchemy this is a
- # sqlite only thing, and in fact impacting only extremely old SQLite
- # versions unlikely to be present in modern Python versions.
- # however, the pyhive third party dialect is
- # also using this hook, which means others still might use it as well.
- # I dislike having this awkward hook here but as long as we need
- # to use names in cursor.description in some cases we need to have
- # some hook to accomplish this.
- if not num_ctx_cols and context._translate_colname:
- self._keymap.update(
- {
- metadata_entry[MD_UNTRANSLATED]: self._keymap[
- metadata_entry[MD_LOOKUP_KEY]
- ]
- for metadata_entry in raw
- if metadata_entry[MD_UNTRANSLATED]
- }
- )
- self._key_to_index = self._make_key_to_index(self._keymap, MD_INDEX)
- def _merge_cursor_description(
- self,
- context,
- cursor_description,
- result_columns,
- num_ctx_cols,
- cols_are_ordered,
- textual_ordered,
- ad_hoc_textual,
- loose_column_name_matching,
- ):
- """Merge a cursor.description with compiled result column information.
- There are at least four separate strategies used here, selected
- depending on the type of SQL construct used to start with.
- The most common case is that of the compiled SQL expression construct,
- which generated the column names present in the raw SQL string and
- which has the identical number of columns as were reported by
- cursor.description. In this case, we assume a 1-1 positional mapping
- between the entries in cursor.description and the compiled object.
- This is also the most performant case as we disregard extracting /
- decoding the column names present in cursor.description since we
- already have the desired name we generated in the compiled SQL
- construct.
- The next common case is that of the completely raw string SQL,
- such as passed to connection.execute(). In this case we have no
- compiled construct to work with, so we extract and decode the
- names from cursor.description and index those as the primary
- result row target keys.
- The remaining fairly common case is that of the textual SQL
- that includes at least partial column information; this is when
- we use a :class:`_expression.TextualSelect` construct.
- This construct may have
- unordered or ordered column information. In the ordered case, we
- merge the cursor.description and the compiled construct's information
- positionally, and warn if there are additional description names
- present, however we still decode the names in cursor.description
- as we don't have a guarantee that the names in the columns match
- on these. In the unordered case, we match names in cursor.description
- to that of the compiled construct based on name matching.
- In both of these cases, the cursor.description names and the column
- expression objects and names are indexed as result row target keys.
- The final case is much less common, where we have a compiled
- non-textual SQL expression construct, but the number of columns
- in cursor.description doesn't match what's in the compiled
- construct. We make the guess here that there might be textual
- column expressions in the compiled construct that themselves include
- a comma in them causing them to split. We do the same name-matching
- as with textual non-ordered columns.
- The name-matched system of merging is the same as that used by
- SQLAlchemy for all cases up through the 0.9 series. Positional
- matching for compiled SQL expressions was introduced in 1.0 as a
- major performance feature, and positional matching for textual
- :class:`_expression.TextualSelect` objects in 1.1.
- As name matching is no longer
- a common case, it was acceptable to factor it into smaller generator-
- oriented methods that are easier to understand, but incur slightly
- more performance overhead.
- """
- if (
- num_ctx_cols
- and cols_are_ordered
- and not textual_ordered
- and num_ctx_cols == len(cursor_description)
- ):
- self._keys = [elem[0] for elem in result_columns]
- # pure positional 1-1 case; doesn't need to read
- # the names from cursor.description
- # most common case for Core and ORM
- # this metadata is safe to cache because we are guaranteed
- # to have the columns in the same order for new executions
- self._safe_for_cache = True
- return [
- (
- idx,
- idx,
- rmap_entry[RM_OBJECTS],
- rmap_entry[RM_NAME],
- rmap_entry[RM_RENDERED_NAME],
- context.get_result_processor(
- rmap_entry[RM_TYPE],
- rmap_entry[RM_RENDERED_NAME],
- cursor_description[idx][1],
- ),
- None,
- )
- for idx, rmap_entry in enumerate(result_columns)
- ]
- else:
- # name-based or text-positional cases, where we need
- # to read cursor.description names
- if textual_ordered or (
- ad_hoc_textual and len(cursor_description) == num_ctx_cols
- ):
- self._safe_for_cache = True
- # textual positional case
- raw_iterator = self._merge_textual_cols_by_position(
- context, cursor_description, result_columns
- )
- elif num_ctx_cols:
- # compiled SQL with a mismatch of description cols
- # vs. compiled cols, or textual w/ unordered columns
- # the order of columns can change if the query is
- # against a "select *", so not safe to cache
- self._safe_for_cache = False
- raw_iterator = self._merge_cols_by_name(
- context,
- cursor_description,
- result_columns,
- loose_column_name_matching,
- )
- else:
- # no compiled SQL, just a raw string, order of columns
- # can change for "select *"
- self._safe_for_cache = False
- raw_iterator = self._merge_cols_by_none(
- context, cursor_description
- )
- return [
- (
- idx,
- ridx,
- obj,
- cursor_colname,
- cursor_colname,
- context.get_result_processor(
- mapped_type, cursor_colname, coltype
- ),
- untranslated,
- )
- for (
- idx,
- ridx,
- cursor_colname,
- mapped_type,
- coltype,
- obj,
- untranslated,
- ) in raw_iterator
- ]
- def _colnames_from_description(self, context, cursor_description):
- """Extract column names and data types from a cursor.description.
- Applies unicode decoding, column translation, "normalization",
- and case sensitivity rules to the names based on the dialect.
- """
- dialect = context.dialect
- translate_colname = context._translate_colname
- normalize_name = (
- dialect.normalize_name if dialect.requires_name_normalize else None
- )
- untranslated = None
- self._keys = []
- for idx, rec in enumerate(cursor_description):
- colname = rec[0]
- coltype = rec[1]
- if translate_colname:
- colname, untranslated = translate_colname(colname)
- if normalize_name:
- colname = normalize_name(colname)
- self._keys.append(colname)
- yield idx, colname, untranslated, coltype
- def _merge_textual_cols_by_position(
- self, context, cursor_description, result_columns
- ):
- num_ctx_cols = len(result_columns)
- if num_ctx_cols > len(cursor_description):
- util.warn(
- "Number of columns in textual SQL (%d) is "
- "smaller than number of columns requested (%d)"
- % (num_ctx_cols, len(cursor_description))
- )
- seen = set()
- for (
- idx,
- colname,
- untranslated,
- coltype,
- ) in self._colnames_from_description(context, cursor_description):
- if idx < num_ctx_cols:
- ctx_rec = result_columns[idx]
- obj = ctx_rec[RM_OBJECTS]
- ridx = idx
- mapped_type = ctx_rec[RM_TYPE]
- if obj[0] in seen:
- raise exc.InvalidRequestError(
- "Duplicate column expression requested "
- "in textual SQL: %r" % obj[0]
- )
- seen.add(obj[0])
- else:
- mapped_type = sqltypes.NULLTYPE
- obj = None
- ridx = None
- yield idx, ridx, colname, mapped_type, coltype, obj, untranslated
- def _merge_cols_by_name(
- self,
- context,
- cursor_description,
- result_columns,
- loose_column_name_matching,
- ):
- match_map = self._create_description_match_map(
- result_columns, loose_column_name_matching
- )
- mapped_type: TypeEngine[Any]
- for (
- idx,
- colname,
- untranslated,
- coltype,
- ) in self._colnames_from_description(context, cursor_description):
- try:
- ctx_rec = match_map[colname]
- except KeyError:
- mapped_type = sqltypes.NULLTYPE
- obj = None
- result_columns_idx = None
- else:
- obj = ctx_rec[1]
- mapped_type = ctx_rec[2]
- result_columns_idx = ctx_rec[3]
- yield (
- idx,
- result_columns_idx,
- colname,
- mapped_type,
- coltype,
- obj,
- untranslated,
- )
- @classmethod
- def _create_description_match_map(
- cls,
- result_columns: List[ResultColumnsEntry],
- loose_column_name_matching: bool = False,
- ) -> Dict[
- Union[str, object], Tuple[str, Tuple[Any, ...], TypeEngine[Any], int]
- ]:
- """when matching cursor.description to a set of names that are present
- in a Compiled object, as is the case with TextualSelect, get all the
- names we expect might match those in cursor.description.
- """
- d: Dict[
- Union[str, object],
- Tuple[str, Tuple[Any, ...], TypeEngine[Any], int],
- ] = {}
- for ridx, elem in enumerate(result_columns):
- key = elem[RM_RENDERED_NAME]
- if key in d:
- # conflicting keyname - just add the column-linked objects
- # to the existing record. if there is a duplicate column
- # name in the cursor description, this will allow all of those
- # objects to raise an ambiguous column error
- e_name, e_obj, e_type, e_ridx = d[key]
- d[key] = e_name, e_obj + elem[RM_OBJECTS], e_type, ridx
- else:
- d[key] = (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE], ridx)
- if loose_column_name_matching:
- # when using a textual statement with an unordered set
- # of columns that line up, we are expecting the user
- # to be using label names in the SQL that match to the column
- # expressions. Enable more liberal matching for this case;
- # duplicate keys that are ambiguous will be fixed later.
- for r_key in elem[RM_OBJECTS]:
- d.setdefault(
- r_key,
- (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE], ridx),
- )
- return d
- def _merge_cols_by_none(self, context, cursor_description):
- for (
- idx,
- colname,
- untranslated,
- coltype,
- ) in self._colnames_from_description(context, cursor_description):
- yield (
- idx,
- None,
- colname,
- sqltypes.NULLTYPE,
- coltype,
- None,
- untranslated,
- )
- if not TYPE_CHECKING:
- def _key_fallback(
- self, key: Any, err: Optional[Exception], raiseerr: bool = True
- ) -> Optional[NoReturn]:
- if raiseerr:
- if self._unpickled and isinstance(key, elements.ColumnElement):
- raise exc.NoSuchColumnError(
- "Row was unpickled; lookup by ColumnElement "
- "is unsupported"
- ) from err
- else:
- raise exc.NoSuchColumnError(
- "Could not locate column in row for column '%s'"
- % util.string_or_unprintable(key)
- ) from err
- else:
- return None
- def _raise_for_ambiguous_column_name(self, rec):
- raise exc.InvalidRequestError(
- "Ambiguous column name '%s' in "
- "result set column descriptions" % rec[MD_LOOKUP_KEY]
- )
- def _index_for_key(self, key: Any, raiseerr: bool = True) -> Optional[int]:
- # TODO: can consider pre-loading ints and negative ints
- # into _keymap - also no coverage here
- if isinstance(key, int):
- key = self._keys[key]
- try:
- rec = self._keymap[key]
- except KeyError as ke:
- x = self._key_fallback(key, ke, raiseerr)
- assert x is None
- return None
- index = rec[0]
- if index is None:
- self._raise_for_ambiguous_column_name(rec)
- return index
- def _indexes_for_keys(self, keys):
- try:
- return [self._keymap[key][0] for key in keys]
- except KeyError as ke:
- # ensure it raises
- CursorResultMetaData._key_fallback(self, ke.args[0], ke)
- def _metadata_for_keys(
- self, keys: Sequence[Any]
- ) -> Iterator[_NonAmbigCursorKeyMapRecType]:
- for key in keys:
- if int in key.__class__.__mro__:
- key = self._keys[key]
- try:
- rec = self._keymap[key]
- except KeyError as ke:
- # ensure it raises
- CursorResultMetaData._key_fallback(self, ke.args[0], ke)
- index = rec[MD_INDEX]
- if index is None:
- self._raise_for_ambiguous_column_name(rec)
- yield cast(_NonAmbigCursorKeyMapRecType, rec)
- def __getstate__(self):
- # TODO: consider serializing this as SimpleResultMetaData
- return {
- "_keymap": {
- key: (
- rec[MD_INDEX],
- rec[MD_RESULT_MAP_INDEX],
- [],
- key,
- rec[MD_RENDERED_NAME],
- None,
- None,
- )
- for key, rec in self._keymap.items()
- if isinstance(key, (str, int))
- },
- "_keys": self._keys,
- "_translated_indexes": self._translated_indexes,
- }
- def __setstate__(self, state):
- self._processors = [None for _ in range(len(state["_keys"]))]
- self._keymap = state["_keymap"]
- self._keymap_by_result_column_idx = None
- self._key_to_index = self._make_key_to_index(self._keymap, MD_INDEX)
- self._keys = state["_keys"]
- self._unpickled = True
- if state["_translated_indexes"]:
- self._translated_indexes = cast(
- "List[int]", state["_translated_indexes"]
- )
- self._tuplefilter = tuplegetter(*self._translated_indexes)
- else:
- self._translated_indexes = self._tuplefilter = None
- class ResultFetchStrategy:
- """Define a fetching strategy for a result object.
- .. versionadded:: 1.4
- """
- __slots__ = ()
- alternate_cursor_description: Optional[_DBAPICursorDescription] = None
- def soft_close(
- self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
- ) -> None:
- raise NotImplementedError()
- def hard_close(
- self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
- ) -> None:
- raise NotImplementedError()
- def yield_per(
- self,
- result: CursorResult[Any],
- dbapi_cursor: Optional[DBAPICursor],
- num: int,
- ) -> None:
- return
- def fetchone(
- self,
- result: CursorResult[Any],
- dbapi_cursor: DBAPICursor,
- hard_close: bool = False,
- ) -> Any:
- raise NotImplementedError()
- def fetchmany(
- self,
- result: CursorResult[Any],
- dbapi_cursor: DBAPICursor,
- size: Optional[int] = None,
- ) -> Any:
- raise NotImplementedError()
- def fetchall(
- self,
- result: CursorResult[Any],
- dbapi_cursor: DBAPICursor,
- ) -> Any:
- raise NotImplementedError()
- def handle_exception(
- self,
- result: CursorResult[Any],
- dbapi_cursor: Optional[DBAPICursor],
- err: BaseException,
- ) -> NoReturn:
- raise err
- class NoCursorFetchStrategy(ResultFetchStrategy):
- """Cursor strategy for a result that has no open cursor.
- There are two varieties of this strategy, one for DQL and one for
- DML (and also DDL), each of which represent a result that had a cursor
- but no longer has one.
- """
- __slots__ = ()
- def soft_close(self, result, dbapi_cursor):
- pass
- def hard_close(self, result, dbapi_cursor):
- pass
- def fetchone(self, result, dbapi_cursor, hard_close=False):
- return self._non_result(result, None)
- def fetchmany(self, result, dbapi_cursor, size=None):
- return self._non_result(result, [])
- def fetchall(self, result, dbapi_cursor):
- return self._non_result(result, [])
- def _non_result(self, result, default, err=None):
- raise NotImplementedError()
- class NoCursorDQLFetchStrategy(NoCursorFetchStrategy):
- """Cursor strategy for a DQL result that has no open cursor.
- This is a result set that can return rows, i.e. for a SELECT, or for an
- INSERT, UPDATE, DELETE that includes RETURNING. However it is in the state
- where the cursor is closed and no rows remain available. The owning result
- object may or may not be "hard closed", which determines if the fetch
- methods send empty results or raise for closed result.
- """
- __slots__ = ()
- def _non_result(self, result, default, err=None):
- if result.closed:
- raise exc.ResourceClosedError(
- "This result object is closed."
- ) from err
- else:
- return default
- _NO_CURSOR_DQL = NoCursorDQLFetchStrategy()
- class NoCursorDMLFetchStrategy(NoCursorFetchStrategy):
- """Cursor strategy for a DML result that has no open cursor.
- This is a result set that does not return rows, i.e. for an INSERT,
- UPDATE, DELETE that does not include RETURNING.
- """
- __slots__ = ()
- def _non_result(self, result, default, err=None):
- # we only expect to have a _NoResultMetaData() here right now.
- assert not result._metadata.returns_rows
- result._metadata._we_dont_return_rows(err)
- _NO_CURSOR_DML = NoCursorDMLFetchStrategy()
- class CursorFetchStrategy(ResultFetchStrategy):
- """Call fetch methods from a DBAPI cursor.
- Alternate versions of this class may instead buffer the rows from
- cursors or not use cursors at all.
- """
- __slots__ = ()
- def soft_close(
- self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
- ) -> None:
- result.cursor_strategy = _NO_CURSOR_DQL
- def hard_close(
- self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
- ) -> None:
- result.cursor_strategy = _NO_CURSOR_DQL
- def handle_exception(
- self,
- result: CursorResult[Any],
- dbapi_cursor: Optional[DBAPICursor],
- err: BaseException,
- ) -> NoReturn:
- result.connection._handle_dbapi_exception(
- err, None, None, dbapi_cursor, result.context
- )
- def yield_per(
- self,
- result: CursorResult[Any],
- dbapi_cursor: Optional[DBAPICursor],
- num: int,
- ) -> None:
- result.cursor_strategy = BufferedRowCursorFetchStrategy(
- dbapi_cursor,
- {"max_row_buffer": num},
- initial_buffer=collections.deque(),
- growth_factor=0,
- )
- def fetchone(
- self,
- result: CursorResult[Any],
- dbapi_cursor: DBAPICursor,
- hard_close: bool = False,
- ) -> Any:
- try:
- row = dbapi_cursor.fetchone()
- if row is None:
- result._soft_close(hard=hard_close)
- return row
- except BaseException as e:
- self.handle_exception(result, dbapi_cursor, e)
- def fetchmany(
- self,
- result: CursorResult[Any],
- dbapi_cursor: DBAPICursor,
- size: Optional[int] = None,
- ) -> Any:
- try:
- if size is None:
- l = dbapi_cursor.fetchmany()
- else:
- l = dbapi_cursor.fetchmany(size)
- if not l:
- result._soft_close()
- return l
- except BaseException as e:
- self.handle_exception(result, dbapi_cursor, e)
- def fetchall(
- self,
- result: CursorResult[Any],
- dbapi_cursor: DBAPICursor,
- ) -> Any:
- try:
- rows = dbapi_cursor.fetchall()
- result._soft_close()
- return rows
- except BaseException as e:
- self.handle_exception(result, dbapi_cursor, e)
- _DEFAULT_FETCH = CursorFetchStrategy()
- class BufferedRowCursorFetchStrategy(CursorFetchStrategy):
- """A cursor fetch strategy with row buffering behavior.
- This strategy buffers the contents of a selection of rows
- before ``fetchone()`` is called. This is to allow the results of
- ``cursor.description`` to be available immediately, when
- interfacing with a DB-API that requires rows to be consumed before
- this information is available (currently psycopg2, when used with
- server-side cursors).
- The pre-fetching behavior fetches only one row initially, and then
- grows its buffer size by a fixed amount with each successive need
- for additional rows up the ``max_row_buffer`` size, which defaults
- to 1000::
- with psycopg2_engine.connect() as conn:
- result = conn.execution_options(
- stream_results=True, max_row_buffer=50
- ).execute(text("select * from table"))
- .. versionadded:: 1.4 ``max_row_buffer`` may now exceed 1000 rows.
- .. seealso::
- :ref:`psycopg2_execution_options`
- """
- __slots__ = ("_max_row_buffer", "_rowbuffer", "_bufsize", "_growth_factor")
- def __init__(
- self,
- dbapi_cursor,
- execution_options,
- growth_factor=5,
- initial_buffer=None,
- ):
- self._max_row_buffer = execution_options.get("max_row_buffer", 1000)
- if initial_buffer is not None:
- self._rowbuffer = initial_buffer
- else:
- self._rowbuffer = collections.deque(dbapi_cursor.fetchmany(1))
- self._growth_factor = growth_factor
- if growth_factor:
- self._bufsize = min(self._max_row_buffer, self._growth_factor)
- else:
- self._bufsize = self._max_row_buffer
- @classmethod
- def create(cls, result):
- return BufferedRowCursorFetchStrategy(
- result.cursor,
- result.context.execution_options,
- )
- def _buffer_rows(self, result, dbapi_cursor):
- """this is currently used only by fetchone()."""
- size = self._bufsize
- try:
- if size < 1:
- new_rows = dbapi_cursor.fetchall()
- else:
- new_rows = dbapi_cursor.fetchmany(size)
- except BaseException as e:
- self.handle_exception(result, dbapi_cursor, e)
- if not new_rows:
- return
- self._rowbuffer = collections.deque(new_rows)
- if self._growth_factor and size < self._max_row_buffer:
- self._bufsize = min(
- self._max_row_buffer, size * self._growth_factor
- )
- def yield_per(self, result, dbapi_cursor, num):
- self._growth_factor = 0
- self._max_row_buffer = self._bufsize = num
- def soft_close(self, result, dbapi_cursor):
- self._rowbuffer.clear()
- super().soft_close(result, dbapi_cursor)
- def hard_close(self, result, dbapi_cursor):
- self._rowbuffer.clear()
- super().hard_close(result, dbapi_cursor)
- def fetchone(self, result, dbapi_cursor, hard_close=False):
- if not self._rowbuffer:
- self._buffer_rows(result, dbapi_cursor)
- if not self._rowbuffer:
- try:
- result._soft_close(hard=hard_close)
- except BaseException as e:
- self.handle_exception(result, dbapi_cursor, e)
- return None
- return self._rowbuffer.popleft()
- def fetchmany(self, result, dbapi_cursor, size=None):
- if size is None:
- return self.fetchall(result, dbapi_cursor)
- rb = self._rowbuffer
- lb = len(rb)
- close = False
- if size > lb:
- try:
- new = dbapi_cursor.fetchmany(size - lb)
- except BaseException as e:
- self.handle_exception(result, dbapi_cursor, e)
- else:
- if not new:
- # defer closing since it may clear the row buffer
- close = True
- else:
- rb.extend(new)
- res = [rb.popleft() for _ in range(min(size, len(rb)))]
- if close:
- result._soft_close()
- return res
- def fetchall(self, result, dbapi_cursor):
- try:
- ret = list(self._rowbuffer) + list(dbapi_cursor.fetchall())
- self._rowbuffer.clear()
- result._soft_close()
- return ret
- except BaseException as e:
- self.handle_exception(result, dbapi_cursor, e)
- class FullyBufferedCursorFetchStrategy(CursorFetchStrategy):
- """A cursor strategy that buffers rows fully upon creation.
- Used for operations where a result is to be delivered
- after the database conversation can not be continued,
- such as MSSQL INSERT...OUTPUT after an autocommit.
- """
- __slots__ = ("_rowbuffer", "alternate_cursor_description")
- def __init__(
- self,
- dbapi_cursor: Optional[DBAPICursor],
- alternate_description: Optional[_DBAPICursorDescription] = None,
- initial_buffer: Optional[Iterable[Any]] = None,
- ):
- self.alternate_cursor_description = alternate_description
- if initial_buffer is not None:
- self._rowbuffer = collections.deque(initial_buffer)
- else:
- assert dbapi_cursor is not None
- self._rowbuffer = collections.deque(dbapi_cursor.fetchall())
- def yield_per(self, result, dbapi_cursor, num):
- pass
- def soft_close(self, result, dbapi_cursor):
- self._rowbuffer.clear()
- super().soft_close(result, dbapi_cursor)
- def hard_close(self, result, dbapi_cursor):
- self._rowbuffer.clear()
- super().hard_close(result, dbapi_cursor)
- def fetchone(self, result, dbapi_cursor, hard_close=False):
- if self._rowbuffer:
- return self._rowbuffer.popleft()
- else:
- result._soft_close(hard=hard_close)
- return None
- def fetchmany(self, result, dbapi_cursor, size=None):
- if size is None:
- return self.fetchall(result, dbapi_cursor)
- rb = self._rowbuffer
- rows = [rb.popleft() for _ in range(min(size, len(rb)))]
- if not rows:
- result._soft_close()
- return rows
- def fetchall(self, result, dbapi_cursor):
- ret = self._rowbuffer
- self._rowbuffer = collections.deque()
- result._soft_close()
- return ret
- class _NoResultMetaData(ResultMetaData):
- __slots__ = ()
- returns_rows = False
- def _we_dont_return_rows(self, err=None):
- raise exc.ResourceClosedError(
- "This result object does not return rows. "
- "It has been closed automatically."
- ) from err
- def _index_for_key(self, keys, raiseerr):
- self._we_dont_return_rows()
- def _metadata_for_keys(self, key):
- self._we_dont_return_rows()
- def _reduce(self, keys):
- self._we_dont_return_rows()
- @property
- def _keymap(self): # type: ignore[override]
- self._we_dont_return_rows()
- @property
- def _key_to_index(self): # type: ignore[override]
- self._we_dont_return_rows()
- @property
- def _processors(self): # type: ignore[override]
- self._we_dont_return_rows()
- @property
- def keys(self):
- self._we_dont_return_rows()
- _NO_RESULT_METADATA = _NoResultMetaData()
- def null_dml_result() -> IteratorResult[Any]:
- it: IteratorResult[Any] = IteratorResult(_NoResultMetaData(), iter([]))
- it._soft_close()
- return it
- class CursorResult(Result[_T]):
- """A Result that is representing state from a DBAPI cursor.
- .. versionchanged:: 1.4 The :class:`.CursorResult``
- class replaces the previous :class:`.ResultProxy` interface.
- This classes are based on the :class:`.Result` calling API
- which provides an updated usage model and calling facade for
- SQLAlchemy Core and SQLAlchemy ORM.
- Returns database rows via the :class:`.Row` class, which provides
- additional API features and behaviors on top of the raw data returned by
- the DBAPI. Through the use of filters such as the :meth:`.Result.scalars`
- method, other kinds of objects may also be returned.
- .. seealso::
- :ref:`tutorial_selecting_data` - introductory material for accessing
- :class:`_engine.CursorResult` and :class:`.Row` objects.
- """
- __slots__ = (
- "context",
- "dialect",
- "cursor",
- "cursor_strategy",
- "_echo",
- "connection",
- )
- _metadata: Union[CursorResultMetaData, _NoResultMetaData]
- _no_result_metadata = _NO_RESULT_METADATA
- _soft_closed: bool = False
- closed: bool = False
- _is_cursor = True
- context: DefaultExecutionContext
- dialect: Dialect
- cursor_strategy: ResultFetchStrategy
- connection: Connection
- def __init__(
- self,
- context: DefaultExecutionContext,
- cursor_strategy: ResultFetchStrategy,
- cursor_description: Optional[_DBAPICursorDescription],
- ):
- self.context = context
- self.dialect = context.dialect
- self.cursor = context.cursor
- self.cursor_strategy = cursor_strategy
- self.connection = context.root_connection
- self._echo = echo = (
- self.connection._echo and context.engine._should_log_debug()
- )
- if cursor_description is not None:
- # inline of Result._row_getter(), set up an initial row
- # getter assuming no transformations will be called as this
- # is the most common case
- metadata = self._init_metadata(context, cursor_description)
- _make_row: Any
- _make_row = functools.partial(
- Row,
- metadata,
- metadata._effective_processors,
- metadata._key_to_index,
- )
- if context._num_sentinel_cols:
- sentinel_filter = operator.itemgetter(
- slice(-context._num_sentinel_cols)
- )
- def _sliced_row(raw_data):
- return _make_row(sentinel_filter(raw_data))
- sliced_row = _sliced_row
- else:
- sliced_row = _make_row
- if echo:
- log = self.context.connection._log_debug
- def _log_row(row):
- log("Row %r", sql_util._repr_row(row))
- return row
- self._row_logging_fn = _log_row
- def _make_row_2(row):
- return _log_row(sliced_row(row))
- make_row = _make_row_2
- else:
- make_row = sliced_row
- self._set_memoized_attribute("_row_getter", make_row)
- else:
- assert context._num_sentinel_cols == 0
- self._metadata = self._no_result_metadata
- def _init_metadata(self, context, cursor_description):
- if context.compiled:
- compiled = context.compiled
- if compiled._cached_metadata:
- metadata = compiled._cached_metadata
- else:
- metadata = CursorResultMetaData(self, cursor_description)
- if metadata._safe_for_cache:
- compiled._cached_metadata = metadata
- # result rewrite/ adapt step. this is to suit the case
- # when we are invoked against a cached Compiled object, we want
- # to rewrite the ResultMetaData to reflect the Column objects
- # that are in our current SQL statement object, not the one
- # that is associated with the cached Compiled object.
- # the Compiled object may also tell us to not
- # actually do this step; this is to support the ORM where
- # it is to produce a new Result object in any case, and will
- # be using the cached Column objects against this database result
- # so we don't want to rewrite them.
- #
- # Basically this step suits the use case where the end user
- # is using Core SQL expressions and is accessing columns in the
- # result row using row._mapping[table.c.column].
- if (
- not context.execution_options.get(
- "_result_disable_adapt_to_context", False
- )
- and compiled._result_columns
- and context.cache_hit is context.dialect.CACHE_HIT
- and compiled.statement is not context.invoked_statement
- ):
- metadata = metadata._adapt_to_context(context)
- self._metadata = metadata
- else:
- self._metadata = metadata = CursorResultMetaData(
- self, cursor_description
- )
- if self._echo:
- context.connection._log_debug(
- "Col %r", tuple(x[0] for x in cursor_description)
- )
- return metadata
- def _soft_close(self, hard=False):
- """Soft close this :class:`_engine.CursorResult`.
- This releases all DBAPI cursor resources, but leaves the
- CursorResult "open" from a semantic perspective, meaning the
- fetchXXX() methods will continue to return empty results.
- This method is called automatically when:
- * all result rows are exhausted using the fetchXXX() methods.
- * cursor.description is None.
- This method is **not public**, but is documented in order to clarify
- the "autoclose" process used.
- .. seealso::
- :meth:`_engine.CursorResult.close`
- """
- if (not hard and self._soft_closed) or (hard and self.closed):
- return
- if hard:
- self.closed = True
- self.cursor_strategy.hard_close(self, self.cursor)
- else:
- self.cursor_strategy.soft_close(self, self.cursor)
- if not self._soft_closed:
- cursor = self.cursor
- self.cursor = None # type: ignore
- self.connection._safe_close_cursor(cursor)
- self._soft_closed = True
- @property
- def inserted_primary_key_rows(self):
- """Return the value of
- :attr:`_engine.CursorResult.inserted_primary_key`
- as a row contained within a list; some dialects may support a
- multiple row form as well.
- .. note:: As indicated below, in current SQLAlchemy versions this
- accessor is only useful beyond what's already supplied by
- :attr:`_engine.CursorResult.inserted_primary_key` when using the
- :ref:`postgresql_psycopg2` dialect. Future versions hope to
- generalize this feature to more dialects.
- This accessor is added to support dialects that offer the feature
- that is currently implemented by the :ref:`psycopg2_executemany_mode`
- feature, currently **only the psycopg2 dialect**, which provides
- for many rows to be INSERTed at once while still retaining the
- behavior of being able to return server-generated primary key values.
- * **When using the psycopg2 dialect, or other dialects that may support
- "fast executemany" style inserts in upcoming releases** : When
- invoking an INSERT statement while passing a list of rows as the
- second argument to :meth:`_engine.Connection.execute`, this accessor
- will then provide a list of rows, where each row contains the primary
- key value for each row that was INSERTed.
- * **When using all other dialects / backends that don't yet support
- this feature**: This accessor is only useful for **single row INSERT
- statements**, and returns the same information as that of the
- :attr:`_engine.CursorResult.inserted_primary_key` within a
- single-element list. When an INSERT statement is executed in
- conjunction with a list of rows to be INSERTed, the list will contain
- one row per row inserted in the statement, however it will contain
- ``None`` for any server-generated values.
- Future releases of SQLAlchemy will further generalize the
- "fast execution helper" feature of psycopg2 to suit other dialects,
- thus allowing this accessor to be of more general use.
- .. versionadded:: 1.4
- .. seealso::
- :attr:`_engine.CursorResult.inserted_primary_key`
- """
- if not self.context.compiled:
- raise exc.InvalidRequestError(
- "Statement is not a compiled expression construct."
- )
- elif not self.context.isinsert:
- raise exc.InvalidRequestError(
- "Statement is not an insert() expression construct."
- )
- elif self.context._is_explicit_returning:
- raise exc.InvalidRequestError(
- "Can't call inserted_primary_key "
- "when returning() "
- "is used."
- )
- return self.context.inserted_primary_key_rows
- @property
- def inserted_primary_key(self):
- """Return the primary key for the row just inserted.
- The return value is a :class:`_result.Row` object representing
- a named tuple of primary key values in the order in which the
- primary key columns are configured in the source
- :class:`_schema.Table`.
- .. versionchanged:: 1.4.8 - the
- :attr:`_engine.CursorResult.inserted_primary_key`
- value is now a named tuple via the :class:`_result.Row` class,
- rather than a plain tuple.
- This accessor only applies to single row :func:`_expression.insert`
- constructs which did not explicitly specify
- :meth:`_expression.Insert.returning`. Support for multirow inserts,
- while not yet available for most backends, would be accessed using
- the :attr:`_engine.CursorResult.inserted_primary_key_rows` accessor.
- Note that primary key columns which specify a server_default clause, or
- otherwise do not qualify as "autoincrement" columns (see the notes at
- :class:`_schema.Column`), and were generated using the database-side
- default, will appear in this list as ``None`` unless the backend
- supports "returning" and the insert statement executed with the
- "implicit returning" enabled.
- Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
- statement is not a compiled expression construct
- or is not an insert() construct.
- """
- if self.context.executemany:
- raise exc.InvalidRequestError(
- "This statement was an executemany call; if primary key "
- "returning is supported, please "
- "use .inserted_primary_key_rows."
- )
- ikp = self.inserted_primary_key_rows
- if ikp:
- return ikp[0]
- else:
- return None
- def last_updated_params(self):
- """Return the collection of updated parameters from this
- execution.
- Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
- statement is not a compiled expression construct
- or is not an update() construct.
- """
- if not self.context.compiled:
- raise exc.InvalidRequestError(
- "Statement is not a compiled expression construct."
- )
- elif not self.context.isupdate:
- raise exc.InvalidRequestError(
- "Statement is not an update() expression construct."
- )
- elif self.context.executemany:
- return self.context.compiled_parameters
- else:
- return self.context.compiled_parameters[0]
- def last_inserted_params(self):
- """Return the collection of inserted parameters from this
- execution.
- Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
- statement is not a compiled expression construct
- or is not an insert() construct.
- """
- if not self.context.compiled:
- raise exc.InvalidRequestError(
- "Statement is not a compiled expression construct."
- )
- elif not self.context.isinsert:
- raise exc.InvalidRequestError(
- "Statement is not an insert() expression construct."
- )
- elif self.context.executemany:
- return self.context.compiled_parameters
- else:
- return self.context.compiled_parameters[0]
- @property
- def returned_defaults_rows(self):
- """Return a list of rows each containing the values of default
- columns that were fetched using
- the :meth:`.ValuesBase.return_defaults` feature.
- The return value is a list of :class:`.Row` objects.
- .. versionadded:: 1.4
- """
- return self.context.returned_default_rows
- def splice_horizontally(self, other):
- """Return a new :class:`.CursorResult` that "horizontally splices"
- together the rows of this :class:`.CursorResult` with that of another
- :class:`.CursorResult`.
- .. tip:: This method is for the benefit of the SQLAlchemy ORM and is
- not intended for general use.
- "horizontally splices" means that for each row in the first and second
- result sets, a new row that concatenates the two rows together is
- produced, which then becomes the new row. The incoming
- :class:`.CursorResult` must have the identical number of rows. It is
- typically expected that the two result sets come from the same sort
- order as well, as the result rows are spliced together based on their
- position in the result.
- The expected use case here is so that multiple INSERT..RETURNING
- statements (which definitely need to be sorted) against different
- tables can produce a single result that looks like a JOIN of those two
- tables.
- E.g.::
- r1 = connection.execute(
- users.insert().returning(
- users.c.user_name, users.c.user_id, sort_by_parameter_order=True
- ),
- user_values,
- )
- r2 = connection.execute(
- addresses.insert().returning(
- addresses.c.address_id,
- addresses.c.address,
- addresses.c.user_id,
- sort_by_parameter_order=True,
- ),
- address_values,
- )
- rows = r1.splice_horizontally(r2).all()
- assert rows == [
- ("john", 1, 1, "foo@bar.com", 1),
- ("jack", 2, 2, "bar@bat.com", 2),
- ]
- .. versionadded:: 2.0
- .. seealso::
- :meth:`.CursorResult.splice_vertically`
- """ # noqa: E501
- clone = self._generate()
- total_rows = [
- tuple(r1) + tuple(r2)
- for r1, r2 in zip(
- list(self._raw_row_iterator()),
- list(other._raw_row_iterator()),
- )
- ]
- clone._metadata = clone._metadata._splice_horizontally(other._metadata)
- clone.cursor_strategy = FullyBufferedCursorFetchStrategy(
- None,
- initial_buffer=total_rows,
- )
- clone._reset_memoizations()
- return clone
- def splice_vertically(self, other):
- """Return a new :class:`.CursorResult` that "vertically splices",
- i.e. "extends", the rows of this :class:`.CursorResult` with that of
- another :class:`.CursorResult`.
- .. tip:: This method is for the benefit of the SQLAlchemy ORM and is
- not intended for general use.
- "vertically splices" means the rows of the given result are appended to
- the rows of this cursor result. The incoming :class:`.CursorResult`
- must have rows that represent the identical list of columns in the
- identical order as they are in this :class:`.CursorResult`.
- .. versionadded:: 2.0
- .. seealso::
- :meth:`.CursorResult.splice_horizontally`
- """
- clone = self._generate()
- total_rows = list(self._raw_row_iterator()) + list(
- other._raw_row_iterator()
- )
- clone.cursor_strategy = FullyBufferedCursorFetchStrategy(
- None,
- initial_buffer=total_rows,
- )
- clone._reset_memoizations()
- return clone
- def _rewind(self, rows):
- """rewind this result back to the given rowset.
- this is used internally for the case where an :class:`.Insert`
- construct combines the use of
- :meth:`.Insert.return_defaults` along with the
- "supplemental columns" feature.
- """
- if self._echo:
- self.context.connection._log_debug(
- "CursorResult rewound %d row(s)", len(rows)
- )
- # the rows given are expected to be Row objects, so we
- # have to clear out processors which have already run on these
- # rows
- self._metadata = cast(
- CursorResultMetaData, self._metadata
- )._remove_processors()
- self.cursor_strategy = FullyBufferedCursorFetchStrategy(
- None,
- # TODO: if these are Row objects, can we save on not having to
- # re-make new Row objects out of them a second time? is that
- # what's actually happening right now? maybe look into this
- initial_buffer=rows,
- )
- self._reset_memoizations()
- return self
- @property
- def returned_defaults(self):
- """Return the values of default columns that were fetched using
- the :meth:`.ValuesBase.return_defaults` feature.
- The value is an instance of :class:`.Row`, or ``None``
- if :meth:`.ValuesBase.return_defaults` was not used or if the
- backend does not support RETURNING.
- .. seealso::
- :meth:`.ValuesBase.return_defaults`
- """
- if self.context.executemany:
- raise exc.InvalidRequestError(
- "This statement was an executemany call; if return defaults "
- "is supported, please use .returned_defaults_rows."
- )
- rows = self.context.returned_default_rows
- if rows:
- return rows[0]
- else:
- return None
- def lastrow_has_defaults(self):
- """Return ``lastrow_has_defaults()`` from the underlying
- :class:`.ExecutionContext`.
- See :class:`.ExecutionContext` for details.
- """
- return self.context.lastrow_has_defaults()
- def postfetch_cols(self):
- """Return ``postfetch_cols()`` from the underlying
- :class:`.ExecutionContext`.
- See :class:`.ExecutionContext` for details.
- Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
- statement is not a compiled expression construct
- or is not an insert() or update() construct.
- """
- if not self.context.compiled:
- raise exc.InvalidRequestError(
- "Statement is not a compiled expression construct."
- )
- elif not self.context.isinsert and not self.context.isupdate:
- raise exc.InvalidRequestError(
- "Statement is not an insert() or update() "
- "expression construct."
- )
- return self.context.postfetch_cols
- def prefetch_cols(self):
- """Return ``prefetch_cols()`` from the underlying
- :class:`.ExecutionContext`.
- See :class:`.ExecutionContext` for details.
- Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
- statement is not a compiled expression construct
- or is not an insert() or update() construct.
- """
- if not self.context.compiled:
- raise exc.InvalidRequestError(
- "Statement is not a compiled expression construct."
- )
- elif not self.context.isinsert and not self.context.isupdate:
- raise exc.InvalidRequestError(
- "Statement is not an insert() or update() "
- "expression construct."
- )
- return self.context.prefetch_cols
- def supports_sane_rowcount(self):
- """Return ``supports_sane_rowcount`` from the dialect.
- See :attr:`_engine.CursorResult.rowcount` for background.
- """
- return self.dialect.supports_sane_rowcount
- def supports_sane_multi_rowcount(self):
- """Return ``supports_sane_multi_rowcount`` from the dialect.
- See :attr:`_engine.CursorResult.rowcount` for background.
- """
- return self.dialect.supports_sane_multi_rowcount
- @util.memoized_property
- def rowcount(self) -> int:
- """Return the 'rowcount' for this result.
- The primary purpose of 'rowcount' is to report the number of rows
- matched by the WHERE criterion of an UPDATE or DELETE statement
- executed once (i.e. for a single parameter set), which may then be
- compared to the number of rows expected to be updated or deleted as a
- means of asserting data integrity.
- This attribute is transferred from the ``cursor.rowcount`` attribute
- of the DBAPI before the cursor is closed, to support DBAPIs that
- don't make this value available after cursor close. Some DBAPIs may
- offer meaningful values for other kinds of statements, such as INSERT
- and SELECT statements as well. In order to retrieve ``cursor.rowcount``
- for these statements, set the
- :paramref:`.Connection.execution_options.preserve_rowcount`
- execution option to True, which will cause the ``cursor.rowcount``
- value to be unconditionally memoized before any results are returned
- or the cursor is closed, regardless of statement type.
- For cases where the DBAPI does not support rowcount for a particular
- kind of statement and/or execution, the returned value will be ``-1``,
- which is delivered directly from the DBAPI and is part of :pep:`249`.
- All DBAPIs should support rowcount for single-parameter-set
- UPDATE and DELETE statements, however.
- .. note::
- Notes regarding :attr:`_engine.CursorResult.rowcount`:
- * This attribute returns the number of rows *matched*,
- which is not necessarily the same as the number of rows
- that were actually *modified*. For example, an UPDATE statement
- may have no net change on a given row if the SET values
- given are the same as those present in the row already.
- Such a row would be matched but not modified.
- On backends that feature both styles, such as MySQL,
- rowcount is configured to return the match
- count in all cases.
- * :attr:`_engine.CursorResult.rowcount` in the default case is
- *only* useful in conjunction with an UPDATE or DELETE statement,
- and only with a single set of parameters. For other kinds of
- statements, SQLAlchemy will not attempt to pre-memoize the value
- unless the
- :paramref:`.Connection.execution_options.preserve_rowcount`
- execution option is used. Note that contrary to :pep:`249`, many
- DBAPIs do not support rowcount values for statements that are not
- UPDATE or DELETE, particularly when rows are being returned which
- are not fully pre-buffered. DBAPIs that dont support rowcount
- for a particular kind of statement should return the value ``-1``
- for such statements.
- * :attr:`_engine.CursorResult.rowcount` may not be meaningful
- when executing a single statement with multiple parameter sets
- (i.e. an :term:`executemany`). Most DBAPIs do not sum "rowcount"
- values across multiple parameter sets and will return ``-1``
- when accessed.
- * SQLAlchemy's :ref:`engine_insertmanyvalues` feature does support
- a correct population of :attr:`_engine.CursorResult.rowcount`
- when the :paramref:`.Connection.execution_options.preserve_rowcount`
- execution option is set to True.
- * Statements that use RETURNING may not support rowcount, returning
- a ``-1`` value instead.
- .. seealso::
- :ref:`tutorial_update_delete_rowcount` - in the :ref:`unified_tutorial`
- :paramref:`.Connection.execution_options.preserve_rowcount`
- """ # noqa: E501
- try:
- return self.context.rowcount
- except BaseException as e:
- self.cursor_strategy.handle_exception(self, self.cursor, e)
- raise # not called
- @property
- def lastrowid(self):
- """Return the 'lastrowid' accessor on the DBAPI cursor.
- This is a DBAPI specific method and is only functional
- for those backends which support it, for statements
- where it is appropriate. It's behavior is not
- consistent across backends.
- Usage of this method is normally unnecessary when
- using insert() expression constructs; the
- :attr:`~CursorResult.inserted_primary_key` attribute provides a
- tuple of primary key values for a newly inserted row,
- regardless of database backend.
- """
- try:
- return self.context.get_lastrowid()
- except BaseException as e:
- self.cursor_strategy.handle_exception(self, self.cursor, e)
- @property
- def returns_rows(self):
- """True if this :class:`_engine.CursorResult` returns zero or more
- rows.
- I.e. if it is legal to call the methods
- :meth:`_engine.CursorResult.fetchone`,
- :meth:`_engine.CursorResult.fetchmany`
- :meth:`_engine.CursorResult.fetchall`.
- Overall, the value of :attr:`_engine.CursorResult.returns_rows` should
- always be synonymous with whether or not the DBAPI cursor had a
- ``.description`` attribute, indicating the presence of result columns,
- noting that a cursor that returns zero rows still has a
- ``.description`` if a row-returning statement was emitted.
- This attribute should be True for all results that are against
- SELECT statements, as well as for DML statements INSERT/UPDATE/DELETE
- that use RETURNING. For INSERT/UPDATE/DELETE statements that were
- not using RETURNING, the value will usually be False, however
- there are some dialect-specific exceptions to this, such as when
- using the MSSQL / pyodbc dialect a SELECT is emitted inline in
- order to retrieve an inserted primary key value.
- """
- return self._metadata.returns_rows
- @property
- def is_insert(self):
- """True if this :class:`_engine.CursorResult` is the result
- of a executing an expression language compiled
- :func:`_expression.insert` construct.
- When True, this implies that the
- :attr:`inserted_primary_key` attribute is accessible,
- assuming the statement did not include
- a user defined "returning" construct.
- """
- return self.context.isinsert
- def _fetchiter_impl(self):
- fetchone = self.cursor_strategy.fetchone
- while True:
- row = fetchone(self, self.cursor)
- if row is None:
- break
- yield row
- def _fetchone_impl(self, hard_close=False):
- return self.cursor_strategy.fetchone(self, self.cursor, hard_close)
- def _fetchall_impl(self):
- return self.cursor_strategy.fetchall(self, self.cursor)
- def _fetchmany_impl(self, size=None):
- return self.cursor_strategy.fetchmany(self, self.cursor, size)
- def _raw_row_iterator(self):
- return self._fetchiter_impl()
- def merge(self, *others: Result[Any]) -> MergedResult[Any]:
- merged_result = super().merge(*others)
- if self.context._has_rowcount:
- merged_result.rowcount = sum(
- cast("CursorResult[Any]", result).rowcount
- for result in (self,) + others
- )
- return merged_result
- def close(self) -> Any:
- """Close this :class:`_engine.CursorResult`.
- This closes out the underlying DBAPI cursor corresponding to the
- statement execution, if one is still present. Note that the DBAPI
- cursor is automatically released when the :class:`_engine.CursorResult`
- exhausts all available rows. :meth:`_engine.CursorResult.close` is
- generally an optional method except in the case when discarding a
- :class:`_engine.CursorResult` that still has additional rows pending
- for fetch.
- After this method is called, it is no longer valid to call upon
- the fetch methods, which will raise a :class:`.ResourceClosedError`
- on subsequent use.
- .. seealso::
- :ref:`connections_toplevel`
- """
- self._soft_close(hard=True)
- @_generative
- def yield_per(self, num: int) -> Self:
- self._yield_per = num
- self.cursor_strategy.yield_per(self, self.cursor, num)
- return self
- ResultProxy = CursorResult
|