- # dialects/postgresql/base.py
- # Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
- # <see AUTHORS file>
- #
- # This module is part of SQLAlchemy and is released under
- # the MIT License: https://www.opensource.org/licenses/mit-license.php
- # mypy: ignore-errors
- r"""
- .. dialect:: postgresql
- :name: PostgreSQL
- :normal_support: 9.6+
- :best_effort: 9+
- .. _postgresql_sequences:
- Sequences/SERIAL/IDENTITY
- -------------------------
- PostgreSQL supports sequences, and SQLAlchemy uses these as the default means
- of creating new primary key values for integer-based primary key columns. When
- creating tables, SQLAlchemy will issue the ``SERIAL`` datatype for
- integer-based primary key columns, which generates a sequence and server side
- default corresponding to the column.
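- As a minimal sketch, assuming a ``metadata`` object is present, an
- integer-based primary key column with no explicit default::
- Table("sometable", metadata, Column("id", Integer, primary_key=True))
- renders ``id SERIAL NOT NULL`` in the resulting CREATE TABLE statement.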
- To specify a specific named sequence to be used for primary key generation,
- use the :func:`~sqlalchemy.schema.Sequence` construct::
- Table(
-     "sometable",
-     metadata,
-     Column("id", Integer, Sequence("some_id_seq", start=1), primary_key=True),
- )
- When SQLAlchemy issues a single INSERT statement, to fulfill the contract of
- having the "last insert identifier" available, a RETURNING clause is added to
- the INSERT statement which specifies the primary key columns should be
- returned after the statement completes. The RETURNING functionality only takes
- place if PostgreSQL 8.2 or later is in use. As a fallback approach, the
- sequence, whether specified explicitly or implicitly via ``SERIAL``, is
- executed independently beforehand, the returned value to be used in the
- subsequent insert. Note that when an
- :func:`~sqlalchemy.sql.expression.insert()` construct is executed using
- "executemany" semantics, the "last inserted identifier" functionality does not
- apply; no RETURNING clause is emitted nor is the sequence pre-executed in this
- case.
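- As a hedged sketch of the "executemany" case, assuming a table named
- ``sometable``, passing a list of parameter dictionaries invokes
- "executemany" semantics, and no newly generated primary key values are
- fetched back::
- with engine.connect() as conn:
-     conn.execute(
-         sometable.insert(),
-         [{"data": "d1"}, {"data": "d2"}, {"data": "d3"}],
-     )
-     conn.commit()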
- PostgreSQL 10 and above IDENTITY columns
- ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- PostgreSQL 10 and above have a new IDENTITY feature that supersedes the use
- of SERIAL. The :class:`_schema.Identity` construct in a
- :class:`_schema.Column` can be used to control its behavior::
- from sqlalchemy import Table, Column, MetaData, Integer, String, Identity
- metadata = MetaData()
- data = Table(
-     "data",
-     metadata,
-     Column("id", Integer, Identity(start=42, cycle=True), primary_key=True),
-     Column("data", String),
- )
- The CREATE TABLE for the above :class:`_schema.Table` object would be:
- .. sourcecode:: sql
- CREATE TABLE data (
-     id INTEGER GENERATED BY DEFAULT AS IDENTITY (START WITH 42 CYCLE),
-     data VARCHAR,
-     PRIMARY KEY (id)
- )
- .. versionchanged:: 1.4 Added :class:`_schema.Identity` construct
- in a :class:`_schema.Column` to specify the option of an autoincrementing
- column.
- .. note::
- Previous versions of SQLAlchemy did not have built-in support for rendering
- of IDENTITY, and could use the following compilation hook to replace
- occurrences of SERIAL with IDENTITY::
- from sqlalchemy.schema import CreateColumn
- from sqlalchemy.ext.compiler import compiles
- @compiles(CreateColumn, "postgresql")
- def use_identity(element, compiler, **kw):
-     text = compiler.visit_create_column(element, **kw)
-     text = text.replace("SERIAL", "INT GENERATED BY DEFAULT AS IDENTITY")
-     return text
- Using the above, a table such as::
- t = Table(
-     "t", m, Column("id", Integer, primary_key=True), Column("data", String)
- )
- will generate on the backing database as:
- .. sourcecode:: sql
- CREATE TABLE t (
-     id INT GENERATED BY DEFAULT AS IDENTITY,
-     data VARCHAR,
-     PRIMARY KEY (id)
- )
- .. _postgresql_ss_cursors:
- Server Side Cursors
- -------------------
- Server-side cursor support is available for the psycopg2 and asyncpg
- dialects and may also be available in others.
- Server side cursors are enabled on a per-statement basis by using the
- :paramref:`.Connection.execution_options.stream_results` connection execution
- option::
- with engine.connect() as conn:
-     result = conn.execution_options(stream_results=True).execute(
-         text("select * from table")
-     )
- Note that some kinds of SQL statements may not be supported with
- server side cursors; generally, only SQL statements that return rows should be
- used with this option.
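- As a sketch of consuming such a streamed result incrementally, the
- :meth:`_engine.Result.partitions` method may be used (the batch size below
- is arbitrary)::
- with engine.connect() as conn:
-     result = conn.execution_options(stream_results=True).execute(
-         text("select * from table")
-     )
-     for partition in result.partitions(100):
-         ...  # process up to 100 rows at a time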
- .. deprecated:: 1.4 The dialect-level ``server_side_cursors`` flag is deprecated
- and will be removed in a future release. Please use the
- :paramref:`.Connection.execution_options.stream_results` execution option for
- unbuffered cursor support.
- .. seealso::
- :ref:`engine_stream_results`
- .. _postgresql_isolation_level:
- Transaction Isolation Level
- ---------------------------
- Most SQLAlchemy dialects support setting of transaction isolation level
- using the :paramref:`_sa.create_engine.isolation_level` parameter
- at the :func:`_sa.create_engine` level, and at the :class:`_engine.Connection`
- level via the :paramref:`.Connection.execution_options.isolation_level`
- parameter.
- For PostgreSQL dialects, this feature works either by making use of the
- DBAPI-specific features, such as psycopg2's isolation level flags which will
- embed the isolation level setting inline with the ``"BEGIN"`` statement, or for
- DBAPIs with no direct support by emitting ``SET SESSION CHARACTERISTICS AS
- TRANSACTION ISOLATION LEVEL <level>`` ahead of the ``"BEGIN"`` statement
- emitted by the DBAPI. For the special AUTOCOMMIT isolation level,
- DBAPI-specific techniques are used, typically an ``.autocommit``
- flag on the DBAPI connection object.
- To set isolation level using :func:`_sa.create_engine`::
- engine = create_engine(
-     "postgresql+pg8000://scott:tiger@localhost/test",
-     isolation_level="REPEATABLE READ",
- )
- To set using per-connection execution options::
- with engine.connect() as conn:
-     conn = conn.execution_options(isolation_level="REPEATABLE READ")
-     with conn.begin():
-         ...  # work with transaction
- There are also more options for isolation level configurations, such as
- "sub-engine" objects linked to a main :class:`_engine.Engine` which each apply
- different isolation level settings. See the discussion at
- :ref:`dbapi_autocommit` for background.
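- As a short sketch of that "sub-engine" pattern, the
- :meth:`_engine.Engine.execution_options` method returns a copy of the
- :class:`_engine.Engine` which shares the same connection pool but applies
- the given isolation level to connections it checks out::
- eng = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
- autocommit_engine = eng.execution_options(isolation_level="AUTOCOMMIT")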
- Valid values for ``isolation_level`` on most PostgreSQL dialects include:
- * ``READ COMMITTED``
- * ``READ UNCOMMITTED``
- * ``REPEATABLE READ``
- * ``SERIALIZABLE``
- * ``AUTOCOMMIT``
- .. seealso::
- :ref:`dbapi_autocommit`
- :ref:`postgresql_readonly_deferrable`
- :ref:`psycopg2_isolation_level`
- :ref:`pg8000_isolation_level`
- .. _postgresql_readonly_deferrable:
- Setting READ ONLY / DEFERRABLE
- ------------------------------
- Most PostgreSQL dialects support setting the "READ ONLY" and "DEFERRABLE"
- characteristics of the transaction, which is in addition to the isolation level
- setting. These two attributes can be established either in conjunction with or
- independently of the isolation level by passing the ``postgresql_readonly`` and
- ``postgresql_deferrable`` flags with
- :meth:`_engine.Connection.execution_options`. The example below illustrates
- passing the ``"SERIALIZABLE"`` isolation level at the same time as setting
- "READ ONLY" and "DEFERRABLE"::
- with engine.connect() as conn:
-     conn = conn.execution_options(
-         isolation_level="SERIALIZABLE",
-         postgresql_readonly=True,
-         postgresql_deferrable=True,
-     )
-     with conn.begin():
-         ...  # work with transaction
- Note that some DBAPIs such as asyncpg only support "readonly" with
- SERIALIZABLE isolation.
- .. versionadded:: 1.4 added support for the ``postgresql_readonly``
- and ``postgresql_deferrable`` execution options.
- .. _postgresql_reset_on_return:
- Temporary Table / Resource Reset for Connection Pooling
- -------------------------------------------------------
- The :class:`.QueuePool` connection pool implementation used
- by the SQLAlchemy :class:`.Engine` object includes
- :ref:`reset on return <pool_reset_on_return>` behavior that will invoke
- the DBAPI ``.rollback()`` method when connections are returned to the pool.
- While this rollback will clear out the immediate state used by the previous
- transaction, it does not cover a wider range of session-level state, including
- temporary tables as well as other server state such as prepared statement
- handles and statement caches. The PostgreSQL database includes a variety
- of commands which may be used to reset this state, including
- ``DISCARD``, ``RESET``, ``DEALLOCATE``, and ``UNLISTEN``.
- To install
- one or more of these commands as the means of performing reset-on-return,
- the :meth:`.PoolEvents.reset` event hook may be used, as demonstrated
- in the example below. The implementation
- will end transactions in progress as well as discard temporary tables
- using the ``CLOSE``, ``RESET`` and ``DISCARD`` commands; see the PostgreSQL
- documentation for background on what each of these statements do.
- The :paramref:`_sa.create_engine.pool_reset_on_return` parameter
- is set to ``None`` so that the custom scheme can replace the default behavior
- completely. The custom hook implementation calls ``.rollback()`` in any case,
- as it's usually important that the DBAPI's own tracking of commit/rollback
- will remain consistent with the state of the transaction::
- from sqlalchemy import create_engine
- from sqlalchemy import event
- postgresql_engine = create_engine(
-     "postgresql+psycopg2://scott:tiger@hostname/dbname",
-     # disable default reset-on-return scheme
-     pool_reset_on_return=None,
- )
- @event.listens_for(postgresql_engine, "reset")
- def _reset_postgresql(dbapi_connection, connection_record, reset_state):
-     if not reset_state.terminate_only:
-         dbapi_connection.execute("CLOSE ALL")
-         dbapi_connection.execute("RESET ALL")
-         dbapi_connection.execute("DISCARD TEMP")
-     # so that the DBAPI itself knows that the connection has been
-     # reset
-     dbapi_connection.rollback()
- .. versionchanged:: 2.0.0b3 Added additional state arguments to
- the :meth:`.PoolEvents.reset` event and additionally ensured the event
- is invoked for all "reset" occurrences, so that it's appropriate
- as a place for custom "reset" handlers. Previous schemes which
- use the :meth:`.PoolEvents.checkin` handler remain usable as well.
- .. seealso::
- :ref:`pool_reset_on_return` - in the :ref:`pooling_toplevel` documentation
- .. _postgresql_alternate_search_path:
- Setting Alternate Search Paths on Connect
- ------------------------------------------
- The PostgreSQL ``search_path`` variable refers to the list of schema names
- that will be implicitly referenced when a particular table or other
- object is referenced in a SQL statement. As detailed in the next section
- :ref:`postgresql_schema_reflection`, SQLAlchemy is generally organized around
- the concept of keeping this variable at its default value of ``public``,
- however, to have it set automatically to an arbitrary name or names as
- connections are used, the ``SET SESSION search_path`` command may be invoked
- for all connections in a pool using the following event handler, as discussed
- at :ref:`schema_set_default_connections`::
- from sqlalchemy import event
- from sqlalchemy import create_engine
- engine = create_engine("postgresql+psycopg2://scott:tiger@host/dbname")
- @event.listens_for(engine, "connect", insert=True)
- def set_search_path(dbapi_connection, connection_record):
-     existing_autocommit = dbapi_connection.autocommit
-     dbapi_connection.autocommit = True
-     cursor = dbapi_connection.cursor()
-     cursor.execute("SET SESSION search_path='%s'" % schema_name)
-     cursor.close()
-     dbapi_connection.autocommit = existing_autocommit
- The reason the recipe is complicated by use of the ``.autocommit`` DBAPI
- attribute is so that when the ``SET SESSION search_path`` directive is invoked,
- it is invoked outside of the scope of any transaction and therefore will not
- be reverted when the DBAPI connection has a rollback.
- .. seealso::
- :ref:`schema_set_default_connections` - in the :ref:`metadata_toplevel` documentation
- .. _postgresql_schema_reflection:
- Remote-Schema Table Introspection and PostgreSQL search_path
- ------------------------------------------------------------
- .. admonition:: Section Best Practices Summarized
- keep the ``search_path`` variable set to its default of ``public``, without
- any other schema names. Ensure the username used to connect **does not**
- match remote schemas, or ensure the ``"$user"`` token is **removed** from
- ``search_path``. For other schema names, name these explicitly
- within :class:`_schema.Table` definitions. Alternatively, the
- ``postgresql_ignore_search_path`` option will cause all reflected
- :class:`_schema.Table` objects to have a :attr:`_schema.Table.schema`
- attribute set up.
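- As a hedged illustration of removing the ``"$user"`` token, the search path
- may be pinned server-side for a particular role (the role and database names
- below are placeholders):
- .. sourcecode:: sql
- ALTER ROLE scott IN DATABASE test SET search_path TO public;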
- The PostgreSQL dialect can reflect tables from any schema, as outlined in
- :ref:`metadata_reflection_schemas`.
- In all cases, the first thing SQLAlchemy does when reflecting tables is
- to **determine the default schema for the current database connection**.
- It does this using the PostgreSQL ``current_schema()``
- function, illustrated below using a PostgreSQL client session (i.e. using
- the ``psql`` tool):
- .. sourcecode:: sql
- test=> select current_schema();
- current_schema
- ----------------
- public
- (1 row)
- Above we see that on a plain install of PostgreSQL, the default schema name
- is the name ``public``.
- However, if your database username **matches the name of a schema**, PostgreSQL's
- default is to then **use that name as the default schema**. Below, we log in
- using the username ``scott``. When we create a schema named ``scott``, **it
- implicitly changes the default schema**:
- .. sourcecode:: sql
- test=> select current_schema();
- current_schema
- ----------------
- public
- (1 row)
- test=> create schema scott;
- CREATE SCHEMA
- test=> select current_schema();
- current_schema
- ----------------
- scott
- (1 row)
- The behavior of ``current_schema()`` is derived from the
- `PostgreSQL search path
- <https://www.postgresql.org/docs/current/static/ddl-schemas.html#DDL-SCHEMAS-PATH>`_
- variable ``search_path``, which in modern PostgreSQL versions defaults to this:
- .. sourcecode:: sql
- test=> show search_path;
- search_path
- -----------------
- "$user", public
- (1 row)
- Where above, the ``"$user"`` variable will inject the current username as the
- default schema, if one exists. Otherwise, ``public`` is used.
- When a :class:`_schema.Table` object is reflected, if it is present in the
- schema indicated by the ``current_schema()`` function, **the schema name assigned
- to the ".schema" attribute of the Table is the Python "None" value**. Otherwise, the
- ".schema" attribute will be assigned the string name of that schema.
- With regards to tables which these :class:`_schema.Table`
- objects refer to via foreign key constraint, a decision must be made as to how
- the ``.schema`` is represented in those remote tables, in the case where that
- remote schema name is also a member of the current ``search_path``.
- By default, the PostgreSQL dialect mimics the behavior encouraged by
- PostgreSQL's own ``pg_get_constraintdef()`` builtin procedure. This function
- returns a sample definition for a particular foreign key constraint,
- omitting the referenced schema name from that definition when the name is
- also in the PostgreSQL schema search path. The interaction below
- illustrates this behavior:
- .. sourcecode:: sql
- test=> CREATE TABLE test_schema.referred(id INTEGER PRIMARY KEY);
- CREATE TABLE
- test=> CREATE TABLE referring(
- test(> id INTEGER PRIMARY KEY,
- test(> referred_id INTEGER REFERENCES test_schema.referred(id));
- CREATE TABLE
- test=> SET search_path TO public, test_schema;
- test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM
- test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n
- test-> ON n.oid = c.relnamespace
- test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid
- test-> WHERE c.relname='referring' AND r.contype = 'f'
- test-> ;
- pg_get_constraintdef
- ---------------------------------------------------
- FOREIGN KEY (referred_id) REFERENCES referred(id)
- (1 row)
- Above, we created a table ``referred`` as a member of the remote schema
- ``test_schema``, however when we added ``test_schema`` to the
- PG ``search_path`` and then asked ``pg_get_constraintdef()`` for the
- ``FOREIGN KEY`` syntax, ``test_schema`` was not included in the output of
- the function.
- On the other hand, if we set the search path back to the typical default
- of ``public``:
- .. sourcecode:: sql
- test=> SET search_path TO public;
- SET
- The same query against ``pg_get_constraintdef()`` now returns the fully
- schema-qualified name for us:
- .. sourcecode:: sql
- test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM
- test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n
- test-> ON n.oid = c.relnamespace
- test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid
- test-> WHERE c.relname='referring' AND r.contype = 'f';
- pg_get_constraintdef
- ---------------------------------------------------------------
- FOREIGN KEY (referred_id) REFERENCES test_schema.referred(id)
- (1 row)
- SQLAlchemy will by default use the return value of ``pg_get_constraintdef()``
- in order to determine the remote schema name. That is, if our ``search_path``
- were set to include ``test_schema``, and we invoked a table
- reflection process as follows::
- >>> from sqlalchemy import Table, MetaData, create_engine, text
- >>> engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
- >>> with engine.connect() as conn:
- ...     conn.execute(text("SET search_path TO test_schema, public"))
- ...     metadata_obj = MetaData()
- ...     referring = Table("referring", metadata_obj, autoload_with=conn)
- <sqlalchemy.engine.result.CursorResult object at 0x101612ed0>
- The above process would deliver to the :attr:`_schema.MetaData.tables`
- collection a ``referred`` table named **without** the schema::
- >>> metadata_obj.tables["referred"].schema is None
- True
- To alter the behavior of reflection such that the referred schema is
- maintained regardless of the ``search_path`` setting, use the
- ``postgresql_ignore_search_path`` option, which can be specified as a
- dialect-specific argument to both :class:`_schema.Table` as well as
- :meth:`_schema.MetaData.reflect`::
- >>> with engine.connect() as conn:
- ...     conn.execute(text("SET search_path TO test_schema, public"))
- ...     metadata_obj = MetaData()
- ...     referring = Table(
- ...         "referring",
- ...         metadata_obj,
- ...         autoload_with=conn,
- ...         postgresql_ignore_search_path=True,
- ...     )
- <sqlalchemy.engine.result.CursorResult object at 0x1016126d0>
- We will now have ``test_schema.referred`` stored as schema-qualified::
- >>> metadata_obj.tables["test_schema.referred"].schema
- 'test_schema'
- .. sidebar:: Best Practices for PostgreSQL Schema reflection
- The description of PostgreSQL schema reflection behavior is complex, and
- is the product of many years of dealing with widely varied use cases and
- user preferences. But in fact, there's no need to understand any of it if
- you just stick to the simplest use pattern: leave the ``search_path`` set
- to its default of ``public`` only, never refer to the name ``public`` as
- an explicit schema name otherwise, and refer to all other schema names
- explicitly when building up a :class:`_schema.Table` object. The options
- described here are only for those users who can't, or prefer not to, stay
- within these guidelines.
- .. seealso::
- :ref:`reflection_schema_qualified_interaction` - discussion of the issue
- from a backend-agnostic perspective
- `The Schema Search Path
- <https://www.postgresql.org/docs/current/static/ddl-schemas.html#DDL-SCHEMAS-PATH>`_
- - on the PostgreSQL website.
- INSERT/UPDATE...RETURNING
- -------------------------
- The dialect supports PG 8.2's ``INSERT..RETURNING``, ``UPDATE..RETURNING`` and
- ``DELETE..RETURNING`` syntaxes. ``INSERT..RETURNING`` is used by default
- for single-row INSERT statements in order to fetch newly generated
- primary key identifiers. To specify an explicit ``RETURNING`` clause,
- use the :meth:`._UpdateBase.returning` method on a per-statement basis::
- # the examples below assume an active Connection object named "connection"
- # INSERT..RETURNING
- result = connection.execute(
-     table.insert().returning(table.c.col1, table.c.col2).values(name="foo")
- )
- print(result.fetchall())
- # UPDATE..RETURNING
- result = connection.execute(
-     table.update()
-     .returning(table.c.col1, table.c.col2)
-     .where(table.c.name == "foo")
-     .values(name="bar")
- )
- print(result.fetchall())
- # DELETE..RETURNING
- result = connection.execute(
-     table.delete()
-     .returning(table.c.col1, table.c.col2)
-     .where(table.c.name == "foo")
- )
- print(result.fetchall())
- .. _postgresql_insert_on_conflict:
- INSERT...ON CONFLICT (Upsert)
- ------------------------------
- Starting with version 9.5, PostgreSQL allows "upserts" (update or insert) of
- rows into a table via the ``ON CONFLICT`` clause of the ``INSERT`` statement. A
- candidate row will only be inserted if that row does not violate any unique
- constraints. In the case of a unique constraint violation, a secondary action
- can occur which can be either "DO UPDATE", indicating that the data in the
- target row should be updated, or "DO NOTHING", which indicates to silently skip
- this row.
- Conflicts are determined using existing unique constraints and indexes. These
- constraints may be identified either using their name as stated in DDL,
- or they may be inferred by stating the columns and conditions that comprise
- the indexes.
- SQLAlchemy provides ``ON CONFLICT`` support via the PostgreSQL-specific
- :func:`_postgresql.insert()` function, which provides
- the generative methods :meth:`_postgresql.Insert.on_conflict_do_update`
- and :meth:`~.postgresql.Insert.on_conflict_do_nothing`:
- .. sourcecode:: pycon+sql
- >>> from sqlalchemy.dialects.postgresql import insert
- >>> insert_stmt = insert(my_table).values(
- ...     id="some_existing_id", data="inserted value"
- ... )
- >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing(index_elements=["id"])
- >>> print(do_nothing_stmt)
- {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
- ON CONFLICT (id) DO NOTHING
- {stop}
- >>> do_update_stmt = insert_stmt.on_conflict_do_update(
- ...     constraint="pk_my_table", set_=dict(data="updated value")
- ... )
- >>> print(do_update_stmt)
- {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
- ON CONFLICT ON CONSTRAINT pk_my_table DO UPDATE SET data = %(param_1)s
- .. seealso::
- `INSERT .. ON CONFLICT
- <https://www.postgresql.org/docs/current/static/sql-insert.html#SQL-ON-CONFLICT>`_
- - in the PostgreSQL documentation.
- Specifying the Target
- ^^^^^^^^^^^^^^^^^^^^^
- Both methods supply the "target" of the conflict using either a
- named constraint or column inference:
- * The :paramref:`_postgresql.Insert.on_conflict_do_update.index_elements` argument
- specifies a sequence containing string column names, :class:`_schema.Column`
- objects, and/or SQL expression elements, which would identify a unique
- index:
- .. sourcecode:: pycon+sql
- >>> do_update_stmt = insert_stmt.on_conflict_do_update(
- ...     index_elements=["id"], set_=dict(data="updated value")
- ... )
- >>> print(do_update_stmt)
- {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
- ON CONFLICT (id) DO UPDATE SET data = %(param_1)s
- {stop}
- >>> do_update_stmt = insert_stmt.on_conflict_do_update(
- ...     index_elements=[my_table.c.id], set_=dict(data="updated value")
- ... )
- >>> print(do_update_stmt)
- {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
- ON CONFLICT (id) DO UPDATE SET data = %(param_1)s
- * When using :paramref:`_postgresql.Insert.on_conflict_do_update.index_elements` to
- infer an index, a partial index can be inferred by also specifying the
- :paramref:`_postgresql.Insert.on_conflict_do_update.index_where` parameter:
- .. sourcecode:: pycon+sql
- >>> stmt = insert(my_table).values(user_email="a@b.com", data="inserted data")
- >>> stmt = stmt.on_conflict_do_update(
- ...     index_elements=[my_table.c.user_email],
- ...     index_where=my_table.c.user_email.like("%@gmail.com"),
- ...     set_=dict(data=stmt.excluded.data),
- ... )
- >>> print(stmt)
- {printsql}INSERT INTO my_table (data, user_email)
- VALUES (%(data)s, %(user_email)s) ON CONFLICT (user_email)
- WHERE user_email LIKE %(user_email_1)s DO UPDATE SET data = excluded.data
- * The :paramref:`_postgresql.Insert.on_conflict_do_update.constraint` argument is
- used to specify an index directly rather than inferring it. This can be
- the name of a UNIQUE constraint, a PRIMARY KEY constraint, or an INDEX:
- .. sourcecode:: pycon+sql
- >>> do_update_stmt = insert_stmt.on_conflict_do_update(
- ...     constraint="my_table_idx_1", set_=dict(data="updated value")
- ... )
- >>> print(do_update_stmt)
- {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
- ON CONFLICT ON CONSTRAINT my_table_idx_1 DO UPDATE SET data = %(param_1)s
- {stop}
- >>> do_update_stmt = insert_stmt.on_conflict_do_update(
- ...     constraint="my_table_pk", set_=dict(data="updated value")
- ... )
- >>> print(do_update_stmt)
- {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
- ON CONFLICT ON CONSTRAINT my_table_pk DO UPDATE SET data = %(param_1)s
- {stop}
- * The :paramref:`_postgresql.Insert.on_conflict_do_update.constraint` argument may
- also refer to a SQLAlchemy construct representing a constraint,
- e.g. :class:`.UniqueConstraint`, :class:`.PrimaryKeyConstraint`,
- :class:`.Index`, or :class:`.ExcludeConstraint`. In this use,
- if the constraint has a name, it is used directly. Otherwise, if the
- constraint is unnamed, then inference will be used, where the expressions
- and optional WHERE clause of the constraint will be spelled out in the
- construct. This use is especially convenient
- to refer to the named or unnamed primary key of a :class:`_schema.Table`
- using the
- :attr:`_schema.Table.primary_key` attribute:
- .. sourcecode:: pycon+sql
- >>> do_update_stmt = insert_stmt.on_conflict_do_update(
- ...     constraint=my_table.primary_key, set_=dict(data="updated value")
- ... )
- >>> print(do_update_stmt)
- {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
- ON CONFLICT (id) DO UPDATE SET data = %(param_1)s
- The SET Clause
- ^^^^^^^^^^^^^^^
- ``ON CONFLICT...DO UPDATE`` is used to perform an update of the already
- existing row, using any combination of new values as well as values
- from the proposed insertion. These values are specified using the
- :paramref:`_postgresql.Insert.on_conflict_do_update.set_` parameter. This
- parameter accepts a dictionary which consists of direct values
- for UPDATE:
- .. sourcecode:: pycon+sql
- >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
- >>> do_update_stmt = stmt.on_conflict_do_update(
- ...     index_elements=["id"], set_=dict(data="updated value")
- ... )
- >>> print(do_update_stmt)
- {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
- ON CONFLICT (id) DO UPDATE SET data = %(param_1)s
- .. warning::
- The :meth:`_expression.Insert.on_conflict_do_update`
- method does **not** take into
- account Python-side default UPDATE values or generation functions, e.g.
- those specified using :paramref:`_schema.Column.onupdate`.
- These values will not be exercised for an ON CONFLICT style of UPDATE,
- unless they are manually specified in the
- :paramref:`_postgresql.Insert.on_conflict_do_update.set_` dictionary.
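- As a hedged sketch, assuming a hypothetical ``updated_at`` column that
- normally relies on :paramref:`_schema.Column.onupdate`, the value must be
- restated explicitly in the ``set_`` dictionary::
- from sqlalchemy import func
- do_update_stmt = stmt.on_conflict_do_update(
-     index_elements=["id"],
-     set_=dict(data="updated value", updated_at=func.now()),
- )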
- Updating using the Excluded INSERT Values
- ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- In order to refer to the proposed insertion row, the special alias
- :attr:`~.postgresql.Insert.excluded` is available as an attribute on
- the :class:`_postgresql.Insert` object; this object is a
- :class:`_expression.ColumnCollection`
- alias that contains all columns of the target
- table:
- .. sourcecode:: pycon+sql
- >>> stmt = insert(my_table).values(
- ...     id="some_id", data="inserted value", author="jlh"
- ... )
- >>> do_update_stmt = stmt.on_conflict_do_update(
- ...     index_elements=["id"],
- ...     set_=dict(data="updated value", author=stmt.excluded.author),
- ... )
- >>> print(do_update_stmt)
- {printsql}INSERT INTO my_table (id, data, author)
- VALUES (%(id)s, %(data)s, %(author)s)
- ON CONFLICT (id) DO UPDATE SET data = %(param_1)s, author = excluded.author
- Additional WHERE Criteria
- ^^^^^^^^^^^^^^^^^^^^^^^^^
- The :meth:`_expression.Insert.on_conflict_do_update` method also accepts
- a WHERE clause using the :paramref:`_postgresql.Insert.on_conflict_do_update.where`
- parameter, which will limit those rows which receive an UPDATE:
- .. sourcecode:: pycon+sql
- >>> stmt = insert(my_table).values(
- ...     id="some_id", data="inserted value", author="jlh"
- ... )
- >>> on_update_stmt = stmt.on_conflict_do_update(
- ...     index_elements=["id"],
- ...     set_=dict(data="updated value", author=stmt.excluded.author),
- ...     where=(my_table.c.status == 2),
- ... )
- >>> print(on_update_stmt)
- {printsql}INSERT INTO my_table (id, data, author)
- VALUES (%(id)s, %(data)s, %(author)s)
- ON CONFLICT (id) DO UPDATE SET data = %(param_1)s, author = excluded.author
- WHERE my_table.status = %(status_1)s
- Skipping Rows with DO NOTHING
- ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- ``ON CONFLICT`` may be used to skip inserting a row entirely
- if any conflict with a unique or exclusion constraint occurs; below
- this is illustrated using the
- :meth:`~.postgresql.Insert.on_conflict_do_nothing` method:
- .. sourcecode:: pycon+sql
- >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
- >>> stmt = stmt.on_conflict_do_nothing(index_elements=["id"])
- >>> print(stmt)
- {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
- ON CONFLICT (id) DO NOTHING
- If ``DO NOTHING`` is used without specifying any columns or constraint,
- it has the effect of skipping the INSERT for any unique or exclusion
- constraint violation which occurs:
- .. sourcecode:: pycon+sql
- >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
- >>> stmt = stmt.on_conflict_do_nothing()
- >>> print(stmt)
- {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
- ON CONFLICT DO NOTHING
- .. _postgresql_match:
- Full Text Search
- ----------------
- PostgreSQL's full text search system is available through the use of the
- :data:`.func` namespace, combined with the use of custom operators
- via the :meth:`.Operators.bool_op` method. For simple cases with some
- degree of cross-backend compatibility, the :meth:`.Operators.match` operator
- may also be used.
- .. _postgresql_simple_match:
- Simple plain text matching with ``match()``
- ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- The :meth:`.Operators.match` operator provides for cross-compatible simple
- text matching. For the PostgreSQL backend, it's hardcoded to generate
- an expression using the ``@@`` operator in conjunction with the
- ``plainto_tsquery()`` PostgreSQL function.
- On the PostgreSQL dialect, an expression like the following::
- select(sometable.c.text.match("search string"))
- would emit to the database:
- .. sourcecode:: sql
- SELECT text @@ plainto_tsquery('search string') FROM table
- Above, passing a plain string to :meth:`.Operators.match` will automatically
- make use of ``plainto_tsquery()`` to specify the type of tsquery. This
- establishes basic database cross-compatibility for :meth:`.Operators.match`
- with other backends.
- .. versionchanged:: 2.0 The default tsquery generation function used by the
- PostgreSQL dialect with :meth:`.Operators.match` is ``plainto_tsquery()``.
- To render exactly what was rendered in 1.4, use the following form::
- from sqlalchemy import func
- select(sometable.c.text.bool_op("@@")(func.to_tsquery("search string")))
- Which would emit:
- .. sourcecode:: sql
- SELECT text @@ to_tsquery('search string') FROM table
- Using PostgreSQL full text functions and operators directly
- ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- Text search operations beyond the simple use of :meth:`.Operators.match`
- may make use of the :data:`.func` namespace to generate PostgreSQL full-text
- functions, in combination with :meth:`.Operators.bool_op` to generate
- any boolean operator.
- For example, the query::
- select(func.to_tsquery("cat").bool_op("@>")(func.to_tsquery("cat & rat")))
- would generate:
- .. sourcecode:: sql
- SELECT to_tsquery('cat') @> to_tsquery('cat & rat')
- The :class:`_postgresql.TSVECTOR` type can provide for explicit CAST::
- from sqlalchemy.dialects.postgresql import TSVECTOR
- from sqlalchemy import select, cast
- select(cast("some text", TSVECTOR))
- produces a statement equivalent to:
- .. sourcecode:: sql
- SELECT CAST('some text' AS TSVECTOR) AS anon_1
- The ``func`` namespace is augmented by the PostgreSQL dialect to set up
- correct argument and return types for most full text search functions.
- These functions are used automatically by the :attr:`_sql.func` namespace
- assuming the ``sqlalchemy.dialects.postgresql`` package has been imported,
- or :func:`_sa.create_engine` has been invoked using a ``postgresql``
- dialect. These functions are documented at:
- * :class:`_postgresql.to_tsvector`
- * :class:`_postgresql.to_tsquery`
- * :class:`_postgresql.plainto_tsquery`
- * :class:`_postgresql.phraseto_tsquery`
- * :class:`_postgresql.websearch_to_tsquery`
- * :class:`_postgresql.ts_headline`
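- As a small sketch of the typed behavior, assuming the dialect package has
- been imported, the return type of ``func.to_tsvector`` is set up as
- :class:`_postgresql.TSVECTOR` automatically::
- from sqlalchemy.dialects import postgresql  # registers the typed functions
- from sqlalchemy import func
- expr = func.to_tsvector("english", "a fat cat sat on a mat")
- assert isinstance(expr.type, postgresql.TSVECTOR)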
- Specifying the "regconfig" with ``match()`` or custom operators
- ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- PostgreSQL's ``plainto_tsquery()`` function accepts an optional
- "regconfig" argument that is used to instruct PostgreSQL to use a
- particular pre-computed GIN or GiST index in order to perform the search.
- When using :meth:`.Operators.match`, this additional parameter may be
- specified using the ``postgresql_regconfig`` parameter, such as::
- select(mytable.c.id).where(
-     mytable.c.title.match("somestring", postgresql_regconfig="english")
- )
- Which would emit:
- .. sourcecode:: sql
- SELECT mytable.id FROM mytable
- WHERE mytable.title @@ plainto_tsquery('english', 'somestring')
- When using other PostgreSQL search functions with :data:`.func`, the
- "regconfig" parameter may be passed directly as the initial argument::
- select(mytable.c.id).where(
-     func.to_tsvector("english", mytable.c.title).bool_op("@@")(
-         func.to_tsquery("english", "somestring")
-     )
- )
- produces a statement equivalent to:
- .. sourcecode:: sql
- SELECT mytable.id FROM mytable
- WHERE to_tsvector('english', mytable.title) @@
- to_tsquery('english', 'somestring')
- It is recommended that you use the ``EXPLAIN ANALYZE...`` tool from
- PostgreSQL to ensure that you are generating queries with SQLAlchemy that
- take full advantage of any indexes you may have created for full text search.
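- As a hedged sketch of an index such a query can take advantage of, an
- expression-based GIN index may be created using the functional index support
- of :class:`.Index` together with the ``postgresql_using`` option described
- in :ref:`postgresql_indexes` (the index and column names are illustrative)::
- Index(
-     "ix_mytable_title_tsv",
-     func.to_tsvector("english", mytable.c.title),
-     postgresql_using="gin",
- )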
- .. seealso::
- `Full Text Search <https://www.postgresql.org/docs/current/textsearch-controls.html>`_ - in the PostgreSQL documentation
- FROM ONLY ...
- -------------
- The dialect supports PostgreSQL's ONLY keyword for targeting only a particular
- table in an inheritance hierarchy. This can be used to produce the
- ``SELECT ... FROM ONLY``, ``UPDATE ONLY ...``, and ``DELETE FROM ONLY ...``
- syntaxes. It uses SQLAlchemy's hints mechanism::
- # SELECT ... FROM ONLY ...
- stmt = table.select().with_hint(table, "ONLY", "postgresql")
- result = connection.execute(stmt)
- print(result.fetchall())
- # UPDATE ONLY ...
- table.update().values(foo="bar").with_hint("ONLY", dialect_name="postgresql")
- # DELETE FROM ONLY ...
- table.delete().with_hint("ONLY", dialect_name="postgresql")
- .. _postgresql_indexes:
- PostgreSQL-Specific Index Options
- ---------------------------------
- Several extensions to the :class:`.Index` construct are available, specific
- to the PostgreSQL dialect.
- .. _postgresql_covering_indexes:
- Covering Indexes
- ^^^^^^^^^^^^^^^^
- The ``postgresql_include`` option renders INCLUDE(colname) for the given
- string names::
- Index("my_index", table.c.x, postgresql_include=["y"])
- would render the index as ``CREATE INDEX my_index ON table (x) INCLUDE (y)``.
- Note that this feature requires PostgreSQL 11 or later.
- .. seealso::
- :ref:`postgresql_constraint_options`
- .. versionadded:: 1.4
- .. _postgresql_partial_indexes:
- Partial Indexes
- ^^^^^^^^^^^^^^^
- Partial indexes add criterion to the index definition so that the index is
- applied to a subset of rows. These can be specified on :class:`.Index`
- using the ``postgresql_where`` keyword argument::
- Index("my_index", my_table.c.id, postgresql_where=my_table.c.value > 10)
- .. _postgresql_operator_classes:
- Operator Classes
- ^^^^^^^^^^^^^^^^
- PostgreSQL allows the specification of an *operator class* for each column of
- an index (see
- https://www.postgresql.org/docs/current/interactive/indexes-opclass.html).
- The :class:`.Index` construct allows these to be specified via the
- ``postgresql_ops`` keyword argument::
- Index(
-     "my_index",
-     my_table.c.id,
-     my_table.c.data,
-     postgresql_ops={"data": "text_pattern_ops", "id": "int4_ops"},
- )
- Note that the keys in the ``postgresql_ops`` dictionaries are the
- "key" name of the :class:`_schema.Column`, i.e. the name used to access it from
- the ``.c`` collection of :class:`_schema.Table`, which can be configured to be
- different than the actual name of the column as expressed in the database.
- If ``postgresql_ops`` is to be used against a complex SQL expression such
- as a function call, then to apply to the column it must be given a label
- that is identified in the dictionary by name, e.g.::
- Index(
- "my_index",
- my_table.c.id,
- func.lower(my_table.c.data).label("data_lower"),
- postgresql_ops={"data_lower": "text_pattern_ops", "id": "int4_ops"},
- )
- Operator classes are also supported by the
- :class:`_postgresql.ExcludeConstraint` construct using the
- :paramref:`_postgresql.ExcludeConstraint.ops` parameter. See that parameter for
- details.
- .. versionadded:: 1.3.21 added support for operator classes with
- :class:`_postgresql.ExcludeConstraint`.
- Index Types
- ^^^^^^^^^^^
- PostgreSQL provides several index types: B-Tree, Hash, GiST, and GIN, as well
- as the ability for users to create their own (see
- https://www.postgresql.org/docs/current/static/indexes-types.html). These can be
- specified on :class:`.Index` using the ``postgresql_using`` keyword argument::
- Index("my_index", my_table.c.data, postgresql_using="gin")
- The value passed to the keyword argument will be simply passed through to the
- underlying CREATE INDEX command, so it *must* be a valid index type for your
- version of PostgreSQL.
- .. _postgresql_index_storage:
- Index Storage Parameters
- ^^^^^^^^^^^^^^^^^^^^^^^^
- PostgreSQL allows storage parameters to be set on indexes. The storage
- parameters available depend on the index method used by the index. Storage
- parameters can be specified on :class:`.Index` using the ``postgresql_with``
- keyword argument::
- Index("my_index", my_table.c.data, postgresql_with={"fillfactor": 50})
- PostgreSQL allows the tablespace in which the index is created to be defined.
- The tablespace can be specified on :class:`.Index` using the
- ``postgresql_tablespace`` keyword argument::
- Index("my_index", my_table.c.data, postgresql_tablespace="my_tablespace")
- Note that the same option is available on :class:`_schema.Table` as well.
- .. _postgresql_index_concurrently:
- Indexes with CONCURRENTLY
- ^^^^^^^^^^^^^^^^^^^^^^^^^
- The PostgreSQL index option CONCURRENTLY is supported by passing the
- flag ``postgresql_concurrently`` to the :class:`.Index` construct::
- tbl = Table("testtbl", m, Column("data", Integer))
- idx1 = Index("test_idx1", tbl.c.data, postgresql_concurrently=True)
- The above index construct will render DDL for CREATE INDEX, assuming
- PostgreSQL 8.2 or higher is detected or for a connection-less dialect, as:
- .. sourcecode:: sql
- CREATE INDEX CONCURRENTLY test_idx1 ON testtbl (data)
- For DROP INDEX, assuming PostgreSQL 9.2 or higher is detected or for
- a connection-less dialect, it will emit:
- .. sourcecode:: sql
- DROP INDEX CONCURRENTLY test_idx1
- When using CONCURRENTLY, the PostgreSQL database requires that the statement
- be invoked outside of a transaction block. The Python DBAPI enforces that
- even for a single statement, a transaction is present, so to use this
- construct, the DBAPI's "autocommit" mode must be used::
- metadata = MetaData()
- table = Table("foo", metadata, Column("id", String))
- index = Index("foo_idx", table.c.id, postgresql_concurrently=True)
- with engine.connect() as conn:
- with conn.execution_options(isolation_level="AUTOCOMMIT"):
- table.create(conn)
- .. seealso::
- :ref:`postgresql_isolation_level`
- .. _postgresql_index_reflection:
- PostgreSQL Index Reflection
- ---------------------------
- The PostgreSQL database creates a UNIQUE INDEX implicitly whenever the
- UNIQUE CONSTRAINT construct is used. When inspecting a table using
- :class:`_reflection.Inspector`, the :meth:`_reflection.Inspector.get_indexes`
- and the :meth:`_reflection.Inspector.get_unique_constraints`
- will report on these
- two constructs distinctly; in the case of the index, the key
- ``duplicates_constraint`` will be present in the index entry if it is
- detected as mirroring a constraint. When performing reflection using
- ``Table(..., autoload_with=engine)``, the UNIQUE INDEX is **not** returned
- in :attr:`_schema.Table.indexes` when it is detected as mirroring a
- :class:`.UniqueConstraint` in the :attr:`_schema.Table.constraints` collection.
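- As a brief sketch of inspecting this, assuming a hypothetical table named
- ``mytable``::
- from sqlalchemy import create_engine, inspect
- engine = create_engine("postgresql+psycopg2://localhost/test")
- insp = inspect(engine)
- for idx in insp.get_indexes("mytable"):
- # key present only when the index mirrors a UNIQUE CONSTRAINT
- if "duplicates_constraint" in idx:
- print(idx["name"], "mirrors", idx["duplicates_constraint"])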
- Special Reflection Options
- --------------------------
- The :class:`_reflection.Inspector`
- used for the PostgreSQL backend is an instance
- of :class:`.PGInspector`, which offers additional methods::
- from sqlalchemy import create_engine, inspect
- engine = create_engine("postgresql+psycopg2://localhost/test")
- insp = inspect(engine) # will be a PGInspector
- print(insp.get_enums())
- .. autoclass:: PGInspector
- :members:
- .. _postgresql_table_options:
- PostgreSQL Table Options
- ------------------------
- Several options for CREATE TABLE are supported directly by the PostgreSQL
- dialect in conjunction with the :class:`_schema.Table` construct:
- * ``INHERITS``::
- Table("some_table", metadata, ..., postgresql_inherits="some_supertable")
- Table("some_table", metadata, ..., postgresql_inherits=("t1", "t2", ...))
- * ``ON COMMIT``::
- Table("some_table", metadata, ..., postgresql_on_commit="PRESERVE ROWS")
- * ``PARTITION BY``::
- Table(
- "some_table",
- metadata,
- ...,
- postgresql_partition_by="LIST (part_column)",
- )
- .. versionadded:: 1.2.6
- * ``TABLESPACE``::
- Table("some_table", metadata, ..., postgresql_tablespace="some_tablespace")
- The above option is also available on the :class:`.Index` construct.
- * ``USING``::
- Table("some_table", metadata, ..., postgresql_using="heap")
- .. versionadded:: 2.0.26
- * ``WITH OIDS``::
- Table("some_table", metadata, ..., postgresql_with_oids=True)
- * ``WITHOUT OIDS``::
- Table("some_table", metadata, ..., postgresql_with_oids=False)
- .. seealso::
- `PostgreSQL CREATE TABLE options
- <https://www.postgresql.org/docs/current/static/sql-createtable.html>`_ -
- in the PostgreSQL documentation.
- .. _postgresql_constraint_options:
- PostgreSQL Constraint Options
- -----------------------------
- The following options are supported by the PostgreSQL dialect in conjunction
- with selected constraint constructs:
- * ``NOT VALID``: This option applies towards CHECK and FOREIGN KEY constraints
- when the constraint is being added to an existing table via ALTER TABLE,
- and has the effect that existing rows are not scanned during the ALTER
- operation against the constraint being added.
- When using a SQL migration tool such as `Alembic <https://alembic.sqlalchemy.org>`_
- that renders ALTER TABLE constructs, the ``postgresql_not_valid`` argument
- may be specified as an additional keyword argument within the operation
- that creates the constraint, as in the following Alembic example::
- def upgrade():
- op.create_foreign_key(
- "fk_user_address",
- "address",
- "user",
- ["user_id"],
- ["id"],
- postgresql_not_valid=True,
- )
- The keyword is ultimately accepted directly by the
- :class:`_schema.CheckConstraint`, :class:`_schema.ForeignKeyConstraint`
- and :class:`_schema.ForeignKey` constructs; when using a tool like
- Alembic, dialect-specific keyword arguments are passed through to
- these constructs from the migration operation directives::
- CheckConstraint("some_field IS NOT NULL", postgresql_not_valid=True)
- ForeignKeyConstraint(
- ["some_id"], ["some_table.some_id"], postgresql_not_valid=True
- )
- .. versionadded:: 1.4.32
- .. seealso::
- `PostgreSQL ALTER TABLE options
- <https://www.postgresql.org/docs/current/static/sql-altertable.html>`_ -
- in the PostgreSQL documentation.
- * ``INCLUDE``: This option adds one or more columns as a "payload" to the
- unique index created automatically by PostgreSQL for the constraint.
- For example, the following table definition::
- Table(
- "mytable",
- metadata,
- Column("id", Integer, nullable=False),
- Column("value", Integer, nullable=False),
- UniqueConstraint("id", postgresql_include=["value"]),
- )
- would produce the DDL statement
- .. sourcecode:: sql
- CREATE TABLE mytable (
- id INTEGER NOT NULL,
- value INTEGER NOT NULL,
- UNIQUE (id) INCLUDE (value)
- )
- Note that this feature requires PostgreSQL 11 or later.
- .. versionadded:: 2.0.41
- .. seealso::
- :ref:`postgresql_covering_indexes`
- .. seealso::
- `PostgreSQL CREATE TABLE options
- <https://www.postgresql.org/docs/current/static/sql-createtable.html>`_ -
- in the PostgreSQL documentation.
- * Column list with foreign key ``ON DELETE SET`` actions: This applies to
- :class:`.ForeignKey` and :class:`.ForeignKeyConstraint`; on the PostgreSQL
- backend only, the :paramref:`.ForeignKey.ondelete` parameter will accept a
- string list of column names inside parentheses following the ``SET NULL`` or
- ``SET DEFAULT`` phrases, which will limit the set of columns that are
- subject to the action::
- fktable = Table(
- "fktable",
- metadata,
- Column("tid", Integer),
- Column("id", Integer),
- Column("fk_id_del_set_null", Integer),
- ForeignKeyConstraint(
- columns=["tid", "fk_id_del_set_null"],
- refcolumns=[pktable.c.tid, pktable.c.id],
- ondelete="SET NULL (fk_id_del_set_null)",
- ),
- )
- .. versionadded:: 2.0.40
- .. _postgresql_table_valued_overview:
- Table values, Table and Column valued functions, Row and Tuple objects
- -----------------------------------------------------------------------
- PostgreSQL makes great use of modern SQL forms such as table-valued functions,
- tables and rows as values. These constructs are commonly used as part
- of PostgreSQL's support for complex datatypes such as JSON, ARRAY, and other
- datatypes. SQLAlchemy's SQL expression language has native support for
- most table-valued and row-valued forms.
- .. _postgresql_table_valued:
- Table-Valued Functions
- ^^^^^^^^^^^^^^^^^^^^^^^
- Many PostgreSQL built-in functions are intended to be used in the FROM clause
- of a SELECT statement, and are capable of returning table rows or sets of table
- rows. Many of PostgreSQL's JSON functions, for example
- ``json_array_elements()``, ``json_object_keys()``, ``json_each_text()``,
- ``json_each()``, ``json_to_record()``, and ``json_populate_recordset()``, use
- such forms. These classes of SQL function calling forms are available in SQLAlchemy
- using the :meth:`_functions.FunctionElement.table_valued` method in conjunction
- with :class:`_functions.Function` objects generated from the :data:`_sql.func`
- namespace.
- Examples from PostgreSQL's reference documentation follow below:
- * ``json_each()``:
- .. sourcecode:: pycon+sql
- >>> from sqlalchemy import select, func
- >>> stmt = select(
- ... func.json_each('{"a":"foo", "b":"bar"}').table_valued("key", "value")
- ... )
- >>> print(stmt)
- {printsql}SELECT anon_1.key, anon_1.value
- FROM json_each(:json_each_1) AS anon_1
- * ``json_populate_record()``:
- .. sourcecode:: pycon+sql
- >>> from sqlalchemy import select, func, literal_column
- >>> stmt = select(
- ... func.json_populate_record(
- ... literal_column("null::myrowtype"), '{"a":1,"b":2}'
- ... ).table_valued("a", "b", name="x")
- ... )
- >>> print(stmt)
- {printsql}SELECT x.a, x.b
- FROM json_populate_record(null::myrowtype, :json_populate_record_1) AS x
- * ``json_to_record()`` - this form uses a PostgreSQL specific form of derived
- columns in the alias, where we may make use of :func:`_sql.column` elements with
- types to produce them. The :meth:`_functions.FunctionElement.table_valued`
- method produces a :class:`_sql.TableValuedAlias` construct, and the
- :meth:`_sql.TableValuedAlias.render_derived` method sets up the derived
- columns specification:
- .. sourcecode:: pycon+sql
- >>> from sqlalchemy import select, func, column, Integer, Text
- >>> stmt = select(
- ... func.json_to_record('{"a":1,"b":[1,2,3],"c":"bar"}')
- ... .table_valued(
- ... column("a", Integer),
- ... column("b", Text),
- ... column("d", Text),
- ... )
- ... .render_derived(name="x", with_types=True)
- ... )
- >>> print(stmt)
- {printsql}SELECT x.a, x.b, x.d
- FROM json_to_record(:json_to_record_1) AS x(a INTEGER, b TEXT, d TEXT)
- * ``WITH ORDINALITY`` - part of the SQL standard, ``WITH ORDINALITY`` adds an
- ordinal counter to the output of a function and is accepted by a limited set
- of PostgreSQL functions including ``unnest()`` and ``generate_series()``. The
- :meth:`_functions.FunctionElement.table_valued` method accepts a keyword
- parameter ``with_ordinality`` for this purpose, which accepts the string name
- that will be applied to the "ordinality" column:
- .. sourcecode:: pycon+sql
- >>> from sqlalchemy import select, func
- >>> stmt = select(
- ... func.generate_series(4, 1, -1)
- ... .table_valued("value", with_ordinality="ordinality")
- ... .render_derived()
- ... )
- >>> print(stmt)
- {printsql}SELECT anon_1.value, anon_1.ordinality
- FROM generate_series(:generate_series_1, :generate_series_2, :generate_series_3)
- WITH ORDINALITY AS anon_1(value, ordinality)
- .. versionadded:: 1.4.0b2
- .. seealso::
- :ref:`tutorial_functions_table_valued` - in the :ref:`unified_tutorial`
- .. _postgresql_column_valued:
- Column Valued Functions
- ^^^^^^^^^^^^^^^^^^^^^^^
- Similar to the table valued function, a column valued function is present
- in the FROM clause, but delivers itself to the columns clause as a single
- scalar value. PostgreSQL functions such as ``json_array_elements()``,
- ``unnest()`` and ``generate_series()`` may use this form. Column valued
- functions are available using the
- :meth:`_functions.FunctionElement.column_valued` method of
- :class:`_functions.FunctionElement`:
- * ``json_array_elements()``:
- .. sourcecode:: pycon+sql
- >>> from sqlalchemy import select, func
- >>> stmt = select(
- ... func.json_array_elements('["one", "two"]').column_valued("x")
- ... )
- >>> print(stmt)
- {printsql}SELECT x
- FROM json_array_elements(:json_array_elements_1) AS x
- * ``unnest()`` - in order to generate a PostgreSQL ARRAY literal, the
- :func:`_postgresql.array` construct may be used:
- .. sourcecode:: pycon+sql
- >>> from sqlalchemy.dialects.postgresql import array
- >>> from sqlalchemy import select, func
- >>> stmt = select(func.unnest(array([1, 2])).column_valued())
- >>> print(stmt)
- {printsql}SELECT anon_1
- FROM unnest(ARRAY[%(param_1)s, %(param_2)s]) AS anon_1
- The function can of course be used against an existing table-bound column
- that's of type :class:`_types.ARRAY`:
- .. sourcecode:: pycon+sql
- >>> from sqlalchemy import table, column, ARRAY, Integer
- >>> from sqlalchemy import select, func
- >>> t = table("t", column("value", ARRAY(Integer)))
- >>> stmt = select(func.unnest(t.c.value).column_valued("unnested_value"))
- >>> print(stmt)
- {printsql}SELECT unnested_value
- FROM unnest(t.value) AS unnested_value
- .. seealso::
- :ref:`tutorial_functions_column_valued` - in the :ref:`unified_tutorial`
- Row Types
- ^^^^^^^^^
- Built-in support for rendering a ``ROW`` may be approximated using
- ``func.ROW`` with the :attr:`_sa.func` namespace, or by using the
- :func:`_sql.tuple_` construct:
- .. sourcecode:: pycon+sql
- >>> from sqlalchemy import table, column, func, tuple_
- >>> t = table("t", column("id"), column("fk"))
- >>> stmt = (
- ... t.select()
- ... .where(tuple_(t.c.id, t.c.fk) > (1, 2))
- ... .where(func.ROW(t.c.id, t.c.fk) < func.ROW(3, 7))
- ... )
- >>> print(stmt)
- {printsql}SELECT t.id, t.fk
- FROM t
- WHERE (t.id, t.fk) > (:param_1, :param_2) AND ROW(t.id, t.fk) < ROW(:ROW_1, :ROW_2)
- .. seealso::
- `PostgreSQL Row Constructors
- <https://www.postgresql.org/docs/current/sql-expressions.html#SQL-SYNTAX-ROW-CONSTRUCTORS>`_
- `PostgreSQL Row Constructor Comparison
- <https://www.postgresql.org/docs/current/functions-comparisons.html#ROW-WISE-COMPARISON>`_
- Table Types passed to Functions
- ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- PostgreSQL supports passing a table as an argument to a function, which is
- known as a "record" type. SQLAlchemy :class:`_sql.FromClause` objects
- such as :class:`_schema.Table` support this special form using the
- :meth:`_sql.FromClause.table_valued` method, which is comparable to the
- :meth:`_functions.FunctionElement.table_valued` method except that the collection
- of columns is already established by that of the :class:`_sql.FromClause`
- itself:
- .. sourcecode:: pycon+sql
- >>> from sqlalchemy import table, column, func, select
- >>> a = table("a", column("id"), column("x"), column("y"))
- >>> stmt = select(func.row_to_json(a.table_valued()))
- >>> print(stmt)
- {printsql}SELECT row_to_json(a) AS row_to_json_1
- FROM a
- .. versionadded:: 1.4.0b2
- """ # noqa: E501
- from __future__ import annotations
- from collections import defaultdict
- from functools import lru_cache
- import re
- from typing import Any
- from typing import cast
- from typing import Dict
- from typing import List
- from typing import Optional
- from typing import Tuple
- from typing import TYPE_CHECKING
- from typing import Union
- from . import arraylib as _array
- from . import json as _json
- from . import pg_catalog
- from . import ranges as _ranges
- from .ext import _regconfig_fn
- from .ext import aggregate_order_by
- from .hstore import HSTORE
- from .named_types import CreateDomainType as CreateDomainType # noqa: F401
- from .named_types import CreateEnumType as CreateEnumType # noqa: F401
- from .named_types import DOMAIN as DOMAIN # noqa: F401
- from .named_types import DropDomainType as DropDomainType # noqa: F401
- from .named_types import DropEnumType as DropEnumType # noqa: F401
- from .named_types import ENUM as ENUM # noqa: F401
- from .named_types import NamedType as NamedType # noqa: F401
- from .types import _DECIMAL_TYPES # noqa: F401
- from .types import _FLOAT_TYPES # noqa: F401
- from .types import _INT_TYPES # noqa: F401
- from .types import BIT as BIT
- from .types import BYTEA as BYTEA
- from .types import CIDR as CIDR
- from .types import CITEXT as CITEXT
- from .types import INET as INET
- from .types import INTERVAL as INTERVAL
- from .types import MACADDR as MACADDR
- from .types import MACADDR8 as MACADDR8
- from .types import MONEY as MONEY
- from .types import OID as OID
- from .types import PGBit as PGBit # noqa: F401
- from .types import PGCidr as PGCidr # noqa: F401
- from .types import PGInet as PGInet # noqa: F401
- from .types import PGInterval as PGInterval # noqa: F401
- from .types import PGMacAddr as PGMacAddr # noqa: F401
- from .types import PGMacAddr8 as PGMacAddr8 # noqa: F401
- from .types import PGUuid as PGUuid
- from .types import REGCLASS as REGCLASS
- from .types import REGCONFIG as REGCONFIG # noqa: F401
- from .types import TIME as TIME
- from .types import TIMESTAMP as TIMESTAMP
- from .types import TSVECTOR as TSVECTOR
- from ... import exc
- from ... import schema
- from ... import select
- from ... import sql
- from ... import util
- from ...engine import characteristics
- from ...engine import default
- from ...engine import interfaces
- from ...engine import ObjectKind
- from ...engine import ObjectScope
- from ...engine import reflection
- from ...engine import URL
- from ...engine.reflection import ReflectionDefaults
- from ...sql import bindparam
- from ...sql import coercions
- from ...sql import compiler
- from ...sql import elements
- from ...sql import expression
- from ...sql import roles
- from ...sql import sqltypes
- from ...sql import util as sql_util
- from ...sql.compiler import InsertmanyvaluesSentinelOpts
- from ...sql.visitors import InternalTraversal
- from ...types import BIGINT
- from ...types import BOOLEAN
- from ...types import CHAR
- from ...types import DATE
- from ...types import DOUBLE_PRECISION
- from ...types import FLOAT
- from ...types import INTEGER
- from ...types import NUMERIC
- from ...types import REAL
- from ...types import SMALLINT
- from ...types import TEXT
- from ...types import UUID as UUID
- from ...types import VARCHAR
- from ...util.typing import TypedDict
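- # validates the USING phrase for CREATE INDEX; the alternation is
- # deliberately permissive, since any single-word access method name
- # (including user-defined ones) is accepted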
- IDX_USING = re.compile(r"^(?:btree|hash|gist|gin|[\w_]+)$", re.I)
- RESERVED_WORDS = {
- "all",
- "analyse",
- "analyze",
- "and",
- "any",
- "array",
- "as",
- "asc",
- "asymmetric",
- "both",
- "case",
- "cast",
- "check",
- "collate",
- "column",
- "constraint",
- "create",
- "current_catalog",
- "current_date",
- "current_role",
- "current_time",
- "current_timestamp",
- "current_user",
- "default",
- "deferrable",
- "desc",
- "distinct",
- "do",
- "else",
- "end",
- "except",
- "false",
- "fetch",
- "for",
- "foreign",
- "from",
- "grant",
- "group",
- "having",
- "in",
- "initially",
- "intersect",
- "into",
- "leading",
- "limit",
- "localtime",
- "localtimestamp",
- "new",
- "not",
- "null",
- "of",
- "off",
- "offset",
- "old",
- "on",
- "only",
- "or",
- "order",
- "placing",
- "primary",
- "references",
- "returning",
- "select",
- "session_user",
- "some",
- "symmetric",
- "table",
- "then",
- "to",
- "trailing",
- "true",
- "union",
- "unique",
- "user",
- "using",
- "variadic",
- "when",
- "where",
- "window",
- "with",
- "authorization",
- "between",
- "binary",
- "cross",
- "current_schema",
- "freeze",
- "full",
- "ilike",
- "inner",
- "is",
- "isnull",
- "join",
- "left",
- "like",
- "natural",
- "notnull",
- "outer",
- "over",
- "overlaps",
- "right",
- "similar",
- "verbose",
- }
- colspecs = {
- sqltypes.ARRAY: _array.ARRAY,
- sqltypes.Interval: INTERVAL,
- sqltypes.Enum: ENUM,
- sqltypes.JSON.JSONPathType: _json.JSONPATH,
- sqltypes.JSON: _json.JSON,
- sqltypes.Uuid: PGUuid,
- }
- ischema_names = {
- "_array": _array.ARRAY,
- "hstore": HSTORE,
- "json": _json.JSON,
- "jsonb": _json.JSONB,
- "int4range": _ranges.INT4RANGE,
- "int8range": _ranges.INT8RANGE,
- "numrange": _ranges.NUMRANGE,
- "daterange": _ranges.DATERANGE,
- "tsrange": _ranges.TSRANGE,
- "tstzrange": _ranges.TSTZRANGE,
- "int4multirange": _ranges.INT4MULTIRANGE,
- "int8multirange": _ranges.INT8MULTIRANGE,
- "nummultirange": _ranges.NUMMULTIRANGE,
- "datemultirange": _ranges.DATEMULTIRANGE,
- "tsmultirange": _ranges.TSMULTIRANGE,
- "tstzmultirange": _ranges.TSTZMULTIRANGE,
- "integer": INTEGER,
- "bigint": BIGINT,
- "smallint": SMALLINT,
- "character varying": VARCHAR,
- "character": CHAR,
- '"char"': sqltypes.String,
- "name": sqltypes.String,
- "text": TEXT,
- "numeric": NUMERIC,
- "float": FLOAT,
- "real": REAL,
- "inet": INET,
- "cidr": CIDR,
- "citext": CITEXT,
- "uuid": UUID,
- "bit": BIT,
- "bit varying": BIT,
- "macaddr": MACADDR,
- "macaddr8": MACADDR8,
- "money": MONEY,
- "oid": OID,
- "regclass": REGCLASS,
- "double precision": DOUBLE_PRECISION,
- "timestamp": TIMESTAMP,
- "timestamp with time zone": TIMESTAMP,
- "timestamp without time zone": TIMESTAMP,
- "time with time zone": TIME,
- "time without time zone": TIME,
- "date": DATE,
- "time": TIME,
- "bytea": BYTEA,
- "boolean": BOOLEAN,
- "interval": INTERVAL,
- "tsvector": TSVECTOR,
- }
- class PGCompiler(compiler.SQLCompiler):
- def visit_to_tsvector_func(self, element, **kw):
- return self._assert_pg_ts_ext(element, **kw)
- def visit_to_tsquery_func(self, element, **kw):
- return self._assert_pg_ts_ext(element, **kw)
- def visit_plainto_tsquery_func(self, element, **kw):
- return self._assert_pg_ts_ext(element, **kw)
- def visit_phraseto_tsquery_func(self, element, **kw):
- return self._assert_pg_ts_ext(element, **kw)
- def visit_websearch_to_tsquery_func(self, element, **kw):
- return self._assert_pg_ts_ext(element, **kw)
- def visit_ts_headline_func(self, element, **kw):
- return self._assert_pg_ts_ext(element, **kw)
- def _assert_pg_ts_ext(self, element, **kw):
- if not isinstance(element, _regconfig_fn):
- # other options here include trying to rewrite the function
- # with the correct types. however, that means we have to
- # "un-SQL-ize" the first argument, which can't work in a
- # generalized way. Also, parent compiler class has already added
- # the incorrect return type to the result map. So let's just
- # make sure the function we want is used up front.
- raise exc.CompileError(
- f'Can\'t compile "{element.name}()" full text search '
- f"function construct that does not originate from the "
- f'"sqlalchemy.dialects.postgresql" package. '
- f'Please ensure "import sqlalchemy.dialects.postgresql" is '
- f"called before constructing "
- f'"sqlalchemy.func.{element.name}()" to ensure registration '
- f"of the correct argument and return types."
- )
- return f"{element.name}{self.function_argspec(element, **kw)}"
- def render_bind_cast(self, type_, dbapi_type, sqltext):
- if dbapi_type._type_affinity is sqltypes.String and dbapi_type.length:
- # use VARCHAR with no length for VARCHAR cast.
- # see #9511
- dbapi_type = sqltypes.STRINGTYPE
- return f"""{sqltext}::{
- self.dialect.type_compiler_instance.process(
- dbapi_type, identifier_preparer=self.preparer
- )
- }"""
- def visit_array(self, element, **kw):
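- # an empty ARRAY literal gives PostgreSQL no elements from which to
- # infer a type, so render an explicit cast, e.g. ARRAY[]::INTEGER[]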
- if not element.clauses and not element.type.item_type._isnull:
- return "ARRAY[]::%s" % element.type.compile(self.dialect)
- return "ARRAY[%s]" % self.visit_clauselist(element, **kw)
- def visit_slice(self, element, **kw):
- return "%s:%s" % (
- self.process(element.start, **kw),
- self.process(element.stop, **kw),
- )
- def visit_bitwise_xor_op_binary(self, binary, operator, **kw):
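- # PostgreSQL spells bitwise XOR as the "#" operator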
- return self._generate_generic_binary(binary, " # ", **kw)
- def visit_json_getitem_op_binary(
- self, binary, operator, _cast_applied=False, **kw
- ):
- if (
- not _cast_applied
- and binary.type._type_affinity is not sqltypes.JSON
- ):
- kw["_cast_applied"] = True
- return self.process(sql.cast(binary, binary.type), **kw)
- kw["eager_grouping"] = True
- if (
- not _cast_applied
- and isinstance(binary.left.type, _json.JSONB)
- and self.dialect._supports_jsonb_subscripting
- ):
- # for pg14+JSONB use subscript notation: col['key'] instead
- # of col -> 'key'
- return "%s[%s]" % (
- self.process(binary.left, **kw),
- self.process(binary.right, **kw),
- )
- else:
- # Fall back to arrow notation for older versions or when cast
- # is applied
- return self._generate_generic_binary(
- binary, " -> " if not _cast_applied else " ->> ", **kw
- )
- def visit_json_path_getitem_op_binary(
- self, binary, operator, _cast_applied=False, **kw
- ):
- if (
- not _cast_applied
- and binary.type._type_affinity is not sqltypes.JSON
- ):
- kw["_cast_applied"] = True
- return self.process(sql.cast(binary, binary.type), **kw)
- kw["eager_grouping"] = True
- return self._generate_generic_binary(
- binary, " #> " if not _cast_applied else " #>> ", **kw
- )
- def visit_getitem_binary(self, binary, operator, **kw):
- return "%s[%s]" % (
- self.process(binary.left, **kw),
- self.process(binary.right, **kw),
- )
- def visit_aggregate_order_by(self, element, **kw):
- return "%s ORDER BY %s" % (
- self.process(element.target, **kw),
- self.process(element.order_by, **kw),
- )
- def visit_match_op_binary(self, binary, operator, **kw):
- if "postgresql_regconfig" in binary.modifiers:
- regconfig = self.render_literal_value(
- binary.modifiers["postgresql_regconfig"], sqltypes.STRINGTYPE
- )
- if regconfig:
- return "%s @@ plainto_tsquery(%s, %s)" % (
- self.process(binary.left, **kw),
- regconfig,
- self.process(binary.right, **kw),
- )
- return "%s @@ plainto_tsquery(%s)" % (
- self.process(binary.left, **kw),
- self.process(binary.right, **kw),
- )
- def visit_ilike_case_insensitive_operand(self, element, **kw):
- return element.element._compiler_dispatch(self, **kw)
- def visit_ilike_op_binary(self, binary, operator, **kw):
- escape = binary.modifiers.get("escape", None)
- return "%s ILIKE %s" % (
- self.process(binary.left, **kw),
- self.process(binary.right, **kw),
- ) + (
- " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE)
- if escape is not None
- else ""
- )
- def visit_not_ilike_op_binary(self, binary, operator, **kw):
- escape = binary.modifiers.get("escape", None)
- return "%s NOT ILIKE %s" % (
- self.process(binary.left, **kw),
- self.process(binary.right, **kw),
- ) + (
- " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE)
- if escape is not None
- else ""
- )
- def _regexp_match(self, base_op, binary, operator, kw):
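- # no flags renders the bare operator; the sole "i" flag maps to the
- # case-insensitive ~* / !~* operator variants; any other flags are
- # prepended to the pattern as an inline (?flags) group via CONCAT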
- flags = binary.modifiers["flags"]
- if flags is None:
- return self._generate_generic_binary(
- binary, " %s " % base_op, **kw
- )
- if flags == "i":
- return self._generate_generic_binary(
- binary, " %s* " % base_op, **kw
- )
- return "%s %s CONCAT('(?', %s, ')', %s)" % (
- self.process(binary.left, **kw),
- base_op,
- self.render_literal_value(flags, sqltypes.STRINGTYPE),
- self.process(binary.right, **kw),
- )
- def visit_regexp_match_op_binary(self, binary, operator, **kw):
- return self._regexp_match("~", binary, operator, kw)
- def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
- return self._regexp_match("!~", binary, operator, kw)
- def visit_regexp_replace_op_binary(self, binary, operator, **kw):
- string = self.process(binary.left, **kw)
- pattern_replace = self.process(binary.right, **kw)
- flags = binary.modifiers["flags"]
- if flags is None:
- return "REGEXP_REPLACE(%s, %s)" % (
- string,
- pattern_replace,
- )
- else:
- return "REGEXP_REPLACE(%s, %s, %s)" % (
- string,
- pattern_replace,
- self.render_literal_value(flags, sqltypes.STRINGTYPE),
- )
- def visit_empty_set_expr(self, element_types, **kw):
- # cast the empty set to the type we are comparing against. if
- # we are comparing against the null type, pick an arbitrary
- # datatype for the empty set
- return "SELECT %s WHERE 1!=1" % (
- ", ".join(
- "CAST(NULL AS %s)"
- % self.dialect.type_compiler_instance.process(
- INTEGER() if type_._isnull else type_
- )
- for type_ in element_types or [INTEGER()]
- ),
- )
- def render_literal_value(self, value, type_):
- value = super().render_literal_value(value, type_)
- if self.dialect._backslash_escapes:
- value = value.replace("\\", "\\\\")
- return value
- def visit_aggregate_strings_func(self, fn, **kw):
- return "string_agg%s" % self.function_argspec(fn)
- def visit_sequence(self, seq, **kw):
- return "nextval('%s')" % self.preparer.format_sequence(seq)
- def limit_clause(self, select, **kw):
- text = ""
- if select._limit_clause is not None:
- text += " \n LIMIT " + self.process(select._limit_clause, **kw)
- if select._offset_clause is not None:
- if select._limit_clause is None:
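- # no LIMIT was given; render PostgreSQL's no-op "LIMIT ALL" so
- # that the OFFSET clause is still preceded by a LIMIT clause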
- text += "\n LIMIT ALL"
- text += " OFFSET " + self.process(select._offset_clause, **kw)
- return text
- def format_from_hint_text(self, sqltext, table, hint, iscrud):
- if hint.upper() != "ONLY":
- raise exc.CompileError("Unrecognized hint: %r" % hint)
- return "ONLY " + sqltext
- def get_select_precolumns(self, select, **kw):
- # Do not call super().get_select_precolumns because
- # it will warn/raise when distinct on is present
- if select._distinct or select._distinct_on:
- if select._distinct_on:
- return (
- "DISTINCT ON ("
- + ", ".join(
- [
- self.process(col, **kw)
- for col in select._distinct_on
- ]
- )
- + ") "
- )
- else:
- return "DISTINCT "
- else:
- return ""
- def for_update_clause(self, select, **kw):
- if select._for_update_arg.read:
- if select._for_update_arg.key_share:
- tmp = " FOR KEY SHARE"
- else:
- tmp = " FOR SHARE"
- elif select._for_update_arg.key_share:
- tmp = " FOR NO KEY UPDATE"
- else:
- tmp = " FOR UPDATE"
- if select._for_update_arg.of:
- tables = util.OrderedSet()
- for c in select._for_update_arg.of:
- tables.update(sql_util.surface_selectables_only(c))
- of_kw = dict(kw)
- of_kw.update(ashint=True, use_schema=False)
- tmp += " OF " + ", ".join(
- self.process(table, **of_kw) for table in tables
- )
- if select._for_update_arg.nowait:
- tmp += " NOWAIT"
- if select._for_update_arg.skip_locked:
- tmp += " SKIP LOCKED"
- return tmp
- def visit_substring_func(self, func, **kw):
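- # render the SQL-standard SUBSTRING(string FROM start [FOR length])
- # form rather than the comma-separated argument style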
- s = self.process(func.clauses.clauses[0], **kw)
- start = self.process(func.clauses.clauses[1], **kw)
- if len(func.clauses.clauses) > 2:
- length = self.process(func.clauses.clauses[2], **kw)
- return "SUBSTRING(%s FROM %s FOR %s)" % (s, start, length)
- else:
- return "SUBSTRING(%s FROM %s)" % (s, start)
- def _on_conflict_target(self, clause, **kw):
- if clause.constraint_target is not None:
- # target may be a name of an Index, UniqueConstraint or
- # ExcludeConstraint. While there is a separate
- # "max_identifier_length" for indexes, PostgreSQL uses the same
- # length for all objects so we can use
- # truncate_and_render_constraint_name
- target_text = (
- "ON CONSTRAINT %s"
- % self.preparer.truncate_and_render_constraint_name(
- clause.constraint_target
- )
- )
- elif clause.inferred_target_elements is not None:
- target_text = "(%s)" % ", ".join(
- (
- self.preparer.quote(c)
- if isinstance(c, str)
- else self.process(c, include_table=False, use_schema=False)
- )
- for c in clause.inferred_target_elements
- )
- if clause.inferred_target_whereclause is not None:
- target_text += " WHERE %s" % self.process(
- clause.inferred_target_whereclause,
- include_table=False,
- use_schema=False,
- )
- else:
- target_text = ""
- return target_text
- def visit_on_conflict_do_nothing(self, on_conflict, **kw):
- target_text = self._on_conflict_target(on_conflict, **kw)
- if target_text:
- return "ON CONFLICT %s DO NOTHING" % target_text
- else:
- return "ON CONFLICT DO NOTHING"
- def visit_on_conflict_do_update(self, on_conflict, **kw):
- clause = on_conflict
- target_text = self._on_conflict_target(on_conflict, **kw)
- action_set_ops = []
- set_parameters = dict(clause.update_values_to_set)
- # create a list of column assignment clauses as tuples
- insert_statement = self.stack[-1]["selectable"]
- cols = insert_statement.table.c
- for c in cols:
- col_key = c.key
- if col_key in set_parameters:
- value = set_parameters.pop(col_key)
- elif c in set_parameters:
- value = set_parameters.pop(c)
- else:
- continue
- # TODO: this coercion should be up front. we can't cache
- # SQL constructs with non-bound literals buried in them
- if coercions._is_literal(value):
- value = elements.BindParameter(None, value, type_=c.type)
- else:
- if (
- isinstance(value, elements.BindParameter)
- and value.type._isnull
- ):
- value = value._clone()
- value.type = c.type
- value_text = self.process(value.self_group(), use_schema=False)
- key_text = self.preparer.quote(c.name)
- action_set_ops.append("%s = %s" % (key_text, value_text))
- # check for names that don't match columns
- if set_parameters:
- util.warn(
- "Additional column names not matching "
- "any column keys in table '%s': %s"
- % (
- self.current_executable.table.name,
- (", ".join("'%s'" % c for c in set_parameters)),
- )
- )
- for k, v in set_parameters.items():
- key_text = (
- self.preparer.quote(k)
- if isinstance(k, str)
- else self.process(k, use_schema=False)
- )
- value_text = self.process(
- coercions.expect(roles.ExpressionElementRole, v),
- use_schema=False,
- )
- action_set_ops.append("%s = %s" % (key_text, value_text))
- action_text = ", ".join(action_set_ops)
- if clause.update_whereclause is not None:
- action_text += " WHERE %s" % self.process(
- clause.update_whereclause, include_table=True, use_schema=False
- )
- return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text)
- def update_from_clause(
- self, update_stmt, from_table, extra_froms, from_hints, **kw
- ):
- kw["asfrom"] = True
- return "FROM " + ", ".join(
- t._compiler_dispatch(self, fromhints=from_hints, **kw)
- for t in extra_froms
- )
- def delete_extra_from_clause(
- self, delete_stmt, from_table, extra_froms, from_hints, **kw
- ):
- """Render the DELETE .. USING clause specific to PostgreSQL."""
- kw["asfrom"] = True
- return "USING " + ", ".join(
- t._compiler_dispatch(self, fromhints=from_hints, **kw)
- for t in extra_froms
- )
- def fetch_clause(self, select, **kw):
- # pg requires parens for non-literal clauses. Parens are also required
- # for bind parameters if a ::type cast is used by the driver (asyncpg),
- # so it's easiest to just always add it
- text = ""
- if select._offset_clause is not None:
- text += "\n OFFSET (%s) ROWS" % self.process(
- select._offset_clause, **kw
- )
- if select._fetch_clause is not None:
- text += "\n FETCH FIRST (%s)%s ROWS %s" % (
- self.process(select._fetch_clause, **kw),
- " PERCENT" if select._fetch_clause_options["percent"] else "",
- (
- "WITH TIES"
- if select._fetch_clause_options["with_ties"]
- else "ONLY"
- ),
- )
- return text
- class PGDDLCompiler(compiler.DDLCompiler):
- def get_column_specification(self, column, **kwargs):
- colspec = self.preparer.format_column(column)
- impl_type = column.type.dialect_impl(self.dialect)
- if isinstance(impl_type, sqltypes.TypeDecorator):
- impl_type = impl_type.impl
- has_identity = (
- column.identity is not None
- and self.dialect.supports_identity_columns
- )
- if (
- column.primary_key
- and column is column.table._autoincrement_column
- and (
- self.dialect.supports_smallserial
- or not isinstance(impl_type, sqltypes.SmallInteger)
- )
- and not has_identity
- and (
- column.default is None
- or (
- isinstance(column.default, schema.Sequence)
- and column.default.optional
- )
- )
- ):
- if isinstance(impl_type, sqltypes.BigInteger):
- colspec += " BIGSERIAL"
- elif isinstance(impl_type, sqltypes.SmallInteger):
- colspec += " SMALLSERIAL"
- else:
- colspec += " SERIAL"
- else:
- colspec += " " + self.dialect.type_compiler_instance.process(
- column.type,
- type_expression=column,
- identifier_preparer=self.preparer,
- )
- default = self.get_column_default_string(column)
- if default is not None:
- colspec += " DEFAULT " + default
- if column.computed is not None:
- colspec += " " + self.process(column.computed)
- if has_identity:
- colspec += " " + self.process(column.identity)
- if not column.nullable and not has_identity:
- colspec += " NOT NULL"
- elif column.nullable and has_identity:
- colspec += " NULL"
- return colspec
- def _define_constraint_validity(self, constraint):
- not_valid = constraint.dialect_options["postgresql"]["not_valid"]
- return " NOT VALID" if not_valid else ""
- def _define_include(self, obj):
- includeclause = obj.dialect_options["postgresql"]["include"]
- if not includeclause:
- return ""
- inclusions = [
- obj.table.c[col] if isinstance(col, str) else col
- for col in includeclause
- ]
- return " INCLUDE (%s)" % ", ".join(
- [self.preparer.quote(c.name) for c in inclusions]
- )
- def visit_check_constraint(self, constraint, **kw):
- if constraint._type_bound:
- typ = list(constraint.columns)[0].type
- if (
- isinstance(typ, sqltypes.ARRAY)
- and isinstance(typ.item_type, sqltypes.Enum)
- and not typ.item_type.native_enum
- ):
- raise exc.CompileError(
- "PostgreSQL dialect cannot produce the CHECK constraint "
- "for ARRAY of non-native ENUM; please specify "
- "create_constraint=False on this Enum datatype."
- )
- text = super().visit_check_constraint(constraint)
- text += self._define_constraint_validity(constraint)
- return text
- def visit_foreign_key_constraint(self, constraint, **kw):
- text = super().visit_foreign_key_constraint(constraint)
- text += self._define_constraint_validity(constraint)
- return text
- def visit_primary_key_constraint(self, constraint, **kw):
- text = super().visit_primary_key_constraint(constraint)
- text += self._define_include(constraint)
- return text
- def visit_unique_constraint(self, constraint, **kw):
- text = super().visit_unique_constraint(constraint)
- text += self._define_include(constraint)
- return text
- @util.memoized_property
- def _fk_ondelete_pattern(self):
- return re.compile(
- r"^(?:RESTRICT|CASCADE|SET (?:NULL|DEFAULT)(?:\s*\(.+\))?"
- r"|NO ACTION)$",
- re.I,
- )
- def define_constraint_ondelete_cascade(self, constraint):
- return " ON DELETE %s" % self.preparer.validate_sql_phrase(
- constraint.ondelete, self._fk_ondelete_pattern
- )
- def visit_create_enum_type(self, create, **kw):
- type_ = create.element
- return "CREATE TYPE %s AS ENUM (%s)" % (
- self.preparer.format_type(type_),
- ", ".join(
- self.sql_compiler.process(sql.literal(e), literal_binds=True)
- for e in type_.enums
- ),
- )
- def visit_drop_enum_type(self, drop, **kw):
- type_ = drop.element
- return "DROP TYPE %s" % (self.preparer.format_type(type_))
- def visit_create_domain_type(self, create, **kw):
- domain: DOMAIN = create.element
- options = []
- if domain.collation is not None:
- options.append(f"COLLATE {self.preparer.quote(domain.collation)}")
- if domain.default is not None:
- default = self.render_default_string(domain.default)
- options.append(f"DEFAULT {default}")
- if domain.constraint_name is not None:
- name = self.preparer.truncate_and_render_constraint_name(
- domain.constraint_name
- )
- options.append(f"CONSTRAINT {name}")
- if domain.not_null:
- options.append("NOT NULL")
- if domain.check is not None:
- check = self.sql_compiler.process(
- domain.check, include_table=False, literal_binds=True
- )
- options.append(f"CHECK ({check})")
- return (
- f"CREATE DOMAIN {self.preparer.format_type(domain)} AS "
- f"{self.type_compiler.process(domain.data_type)} "
- f"{' '.join(options)}"
- )
- def visit_drop_domain_type(self, drop, **kw):
- domain = drop.element
- return f"DROP DOMAIN {self.preparer.format_type(domain)}"
- def visit_create_index(self, create, **kw):
- preparer = self.preparer
- index = create.element
- self._verify_index_table(index)
- text = "CREATE "
- if index.unique:
- text += "UNIQUE "
- text += "INDEX "
- if self.dialect._supports_create_index_concurrently:
- concurrently = index.dialect_options["postgresql"]["concurrently"]
- if concurrently:
- text += "CONCURRENTLY "
- if create.if_not_exists:
- text += "IF NOT EXISTS "
- text += "%s ON %s " % (
- self._prepared_index_name(index, include_schema=False),
- preparer.format_table(index.table),
- )
- using = index.dialect_options["postgresql"]["using"]
- if using:
- text += (
- "USING %s "
- % self.preparer.validate_sql_phrase(using, IDX_USING).lower()
- )
- ops = index.dialect_options["postgresql"]["ops"]
- text += "(%s)" % (
- ", ".join(
- [
- self.sql_compiler.process(
- (
- expr.self_group()
- if not isinstance(expr, expression.ColumnClause)
- else expr
- ),
- include_table=False,
- literal_binds=True,
- )
- + (
- (" " + ops[expr.key])
- if hasattr(expr, "key") and expr.key in ops
- else ""
- )
- for expr in index.expressions
- ]
- )
- )
- text += self._define_include(index)
- nulls_not_distinct = index.dialect_options["postgresql"][
- "nulls_not_distinct"
- ]
- if nulls_not_distinct is True:
- text += " NULLS NOT DISTINCT"
- elif nulls_not_distinct is False:
- text += " NULLS DISTINCT"
- withclause = index.dialect_options["postgresql"]["with"]
- if withclause:
- text += " WITH (%s)" % (
- ", ".join(
- [
- "%s = %s" % storage_parameter
- for storage_parameter in withclause.items()
- ]
- )
- )
- tablespace_name = index.dialect_options["postgresql"]["tablespace"]
- if tablespace_name:
- text += " TABLESPACE %s" % preparer.quote(tablespace_name)
- whereclause = index.dialect_options["postgresql"]["where"]
- if whereclause is not None:
- whereclause = coercions.expect(
- roles.DDLExpressionRole, whereclause
- )
- where_compiled = self.sql_compiler.process(
- whereclause, include_table=False, literal_binds=True
- )
- text += " WHERE " + where_compiled
- return text
- def define_unique_constraint_distinct(self, constraint, **kw):
- nulls_not_distinct = constraint.dialect_options["postgresql"][
- "nulls_not_distinct"
- ]
- if nulls_not_distinct is True:
- nulls_not_distinct_param = "NULLS NOT DISTINCT "
- elif nulls_not_distinct is False:
- nulls_not_distinct_param = "NULLS DISTINCT "
- else:
- nulls_not_distinct_param = ""
- return nulls_not_distinct_param
- def visit_drop_index(self, drop, **kw):
- index = drop.element
- text = "\nDROP INDEX "
- if self.dialect._supports_drop_index_concurrently:
- concurrently = index.dialect_options["postgresql"]["concurrently"]
- if concurrently:
- text += "CONCURRENTLY "
- if drop.if_exists:
- text += "IF EXISTS "
- text += self._prepared_index_name(index, include_schema=True)
- return text
- def visit_exclude_constraint(self, constraint, **kw):
- text = ""
- if constraint.name is not None:
- text += "CONSTRAINT %s " % self.preparer.format_constraint(
- constraint
- )
- elements = []
- kw["include_table"] = False
- kw["literal_binds"] = True
- for expr, name, op in constraint._render_exprs:
- exclude_element = self.sql_compiler.process(expr, **kw) + (
- (" " + constraint.ops[expr.key])
- if hasattr(expr, "key") and expr.key in constraint.ops
- else ""
- )
- elements.append("%s WITH %s" % (exclude_element, op))
- text += "EXCLUDE USING %s (%s)" % (
- self.preparer.validate_sql_phrase(
- constraint.using, IDX_USING
- ).lower(),
- ", ".join(elements),
- )
- if constraint.where is not None:
- text += " WHERE (%s)" % self.sql_compiler.process(
- constraint.where, literal_binds=True
- )
- text += self.define_constraint_deferrability(constraint)
- return text
- def post_create_table(self, table):
- table_opts = []
- pg_opts = table.dialect_options["postgresql"]
- inherits = pg_opts.get("inherits")
- if inherits is not None:
- if not isinstance(inherits, (list, tuple)):
- inherits = (inherits,)
- table_opts.append(
- "\n INHERITS ( "
- + ", ".join(self.preparer.quote(name) for name in inherits)
- + " )"
- )
- if pg_opts["partition_by"]:
- table_opts.append("\n PARTITION BY %s" % pg_opts["partition_by"])
- if pg_opts["using"]:
- table_opts.append("\n USING %s" % pg_opts["using"])
- if pg_opts["with_oids"] is True:
- table_opts.append("\n WITH OIDS")
- elif pg_opts["with_oids"] is False:
- table_opts.append("\n WITHOUT OIDS")
- if pg_opts["on_commit"]:
- on_commit_options = pg_opts["on_commit"].replace("_", " ").upper()
- table_opts.append("\n ON COMMIT %s" % on_commit_options)
- if pg_opts["tablespace"]:
- tablespace_name = pg_opts["tablespace"]
- table_opts.append(
- "\n TABLESPACE %s" % self.preparer.quote(tablespace_name)
- )
- return "".join(table_opts)
- def visit_computed_column(self, generated, **kw):
- if generated.persisted is False:
- raise exc.CompileError(
- "PostrgreSQL computed columns do not support 'virtual' "
- "persistence; set the 'persisted' flag to None or True for "
- "PostgreSQL support."
- )
- return "GENERATED ALWAYS AS (%s) STORED" % self.sql_compiler.process(
- generated.sqltext, include_table=False, literal_binds=True
- )
- def visit_create_sequence(self, create, **kw):
- prefix = None
- if create.element.data_type is not None:
- prefix = " AS %s" % self.type_compiler.process(
- create.element.data_type
- )
- return super().visit_create_sequence(create, prefix=prefix, **kw)
- def _can_comment_on_constraint(self, ddl_instance):
- constraint = ddl_instance.element
- if constraint.name is None:
- raise exc.CompileError(
- f"Can't emit COMMENT ON for constraint {constraint!r}: "
- "it has no name"
- )
- if constraint.table is None:
- raise exc.CompileError(
- f"Can't emit COMMENT ON for constraint {constraint!r}: "
- "it has no associated table"
- )
- def visit_set_constraint_comment(self, create, **kw):
- self._can_comment_on_constraint(create)
- return "COMMENT ON CONSTRAINT %s ON %s IS %s" % (
- self.preparer.format_constraint(create.element),
- self.preparer.format_table(create.element.table),
- self.sql_compiler.render_literal_value(
- create.element.comment, sqltypes.String()
- ),
- )
- def visit_drop_constraint_comment(self, drop, **kw):
- self._can_comment_on_constraint(drop)
- return "COMMENT ON CONSTRAINT %s ON %s IS NULL" % (
- self.preparer.format_constraint(drop.element),
- self.preparer.format_table(drop.element.table),
- )
- class PGTypeCompiler(compiler.GenericTypeCompiler):
- def visit_TSVECTOR(self, type_, **kw):
- return "TSVECTOR"
- def visit_TSQUERY(self, type_, **kw):
- return "TSQUERY"
- def visit_INET(self, type_, **kw):
- return "INET"
- def visit_CIDR(self, type_, **kw):
- return "CIDR"
- def visit_CITEXT(self, type_, **kw):
- return "CITEXT"
- def visit_MACADDR(self, type_, **kw):
- return "MACADDR"
- def visit_MACADDR8(self, type_, **kw):
- return "MACADDR8"
- def visit_MONEY(self, type_, **kw):
- return "MONEY"
- def visit_OID(self, type_, **kw):
- return "OID"
- def visit_REGCONFIG(self, type_, **kw):
- return "REGCONFIG"
- def visit_REGCLASS(self, type_, **kw):
- return "REGCLASS"
- def visit_FLOAT(self, type_, **kw):
- if not type_.precision:
- return "FLOAT"
- else:
- return "FLOAT(%(precision)s)" % {"precision": type_.precision}
- def visit_double(self, type_, **kw):
- return self.visit_DOUBLE_PRECISION(type_, **kw)
- def visit_BIGINT(self, type_, **kw):
- return "BIGINT"
- def visit_HSTORE(self, type_, **kw):
- return "HSTORE"
- def visit_JSON(self, type_, **kw):
- return "JSON"
- def visit_JSONB(self, type_, **kw):
- return "JSONB"
- def visit_INT4MULTIRANGE(self, type_, **kw):
- return "INT4MULTIRANGE"
- def visit_INT8MULTIRANGE(self, type_, **kw):
- return "INT8MULTIRANGE"
- def visit_NUMMULTIRANGE(self, type_, **kw):
- return "NUMMULTIRANGE"
- def visit_DATEMULTIRANGE(self, type_, **kw):
- return "DATEMULTIRANGE"
- def visit_TSMULTIRANGE(self, type_, **kw):
- return "TSMULTIRANGE"
- def visit_TSTZMULTIRANGE(self, type_, **kw):
- return "TSTZMULTIRANGE"
- def visit_INT4RANGE(self, type_, **kw):
- return "INT4RANGE"
- def visit_INT8RANGE(self, type_, **kw):
- return "INT8RANGE"
- def visit_NUMRANGE(self, type_, **kw):
- return "NUMRANGE"
- def visit_DATERANGE(self, type_, **kw):
- return "DATERANGE"
- def visit_TSRANGE(self, type_, **kw):
- return "TSRANGE"
- def visit_TSTZRANGE(self, type_, **kw):
- return "TSTZRANGE"
- def visit_json_int_index(self, type_, **kw):
- return "INT"
- def visit_json_str_index(self, type_, **kw):
- return "TEXT"
- def visit_datetime(self, type_, **kw):
- return self.visit_TIMESTAMP(type_, **kw)
- def visit_enum(self, type_, **kw):
- if not type_.native_enum or not self.dialect.supports_native_enum:
- return super().visit_enum(type_, **kw)
- else:
- return self.visit_ENUM(type_, **kw)
- def visit_ENUM(self, type_, identifier_preparer=None, **kw):
- if identifier_preparer is None:
- identifier_preparer = self.dialect.identifier_preparer
- return identifier_preparer.format_type(type_)
- def visit_DOMAIN(self, type_, identifier_preparer=None, **kw):
- if identifier_preparer is None:
- identifier_preparer = self.dialect.identifier_preparer
- return identifier_preparer.format_type(type_)
- def visit_TIMESTAMP(self, type_, **kw):
- return "TIMESTAMP%s %s" % (
- (
- "(%d)" % type_.precision
- if getattr(type_, "precision", None) is not None
- else ""
- ),
- (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE",
- )
- def visit_TIME(self, type_, **kw):
- return "TIME%s %s" % (
- (
- "(%d)" % type_.precision
- if getattr(type_, "precision", None) is not None
- else ""
- ),
- (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE",
- )
- def visit_INTERVAL(self, type_, **kw):
- text = "INTERVAL"
- if type_.fields is not None:
- text += " " + type_.fields
- if type_.precision is not None:
- text += " (%d)" % type_.precision
- return text
- def visit_BIT(self, type_, **kw):
- if type_.varying:
- compiled = "BIT VARYING"
- if type_.length is not None:
- compiled += "(%d)" % type_.length
- else:
- compiled = "BIT(%d)" % type_.length
- return compiled
- def visit_uuid(self, type_, **kw):
- if type_.native_uuid:
- return self.visit_UUID(type_, **kw)
- else:
- return super().visit_uuid(type_, **kw)
- def visit_UUID(self, type_, **kw):
- return "UUID"
- def visit_large_binary(self, type_, **kw):
- return self.visit_BYTEA(type_, **kw)
- def visit_BYTEA(self, type_, **kw):
- return "BYTEA"
- def visit_ARRAY(self, type_, **kw):
- inner = self.process(type_.item_type, **kw)
- return re.sub(
- r"((?: COLLATE.*)?)$",
- (
- r"%s\1"
- % (
- "[]"
- * (type_.dimensions if type_.dimensions is not None else 1)
- )
- ),
- inner,
- count=1,
- )
- def visit_json_path(self, type_, **kw):
- return self.visit_JSONPATH(type_, **kw)
- def visit_JSONPATH(self, type_, **kw):
- return "JSONPATH"
- class PGIdentifierPreparer(compiler.IdentifierPreparer):
- reserved_words = RESERVED_WORDS
- def _unquote_identifier(self, value):
- if value[0] == self.initial_quote:
- value = value[1:-1].replace(
- self.escape_to_quote, self.escape_quote
- )
- return value
- def format_type(self, type_, use_schema=True):
- if not type_.name:
- raise exc.CompileError(
- f"PostgreSQL {type_.__class__.__name__} type requires a name."
- )
- name = self.quote(type_.name)
- effective_schema = self.schema_for_object(type_)
- if (
- not self.omit_schema
- and use_schema
- and effective_schema is not None
- ):
- name = f"{self.quote_schema(effective_schema)}.{name}"
- return name
- class ReflectedNamedType(TypedDict):
- """Represents a reflected named type."""
- name: str
- """Name of the type."""
- schema: str
- """The schema of the type."""
- visible: bool
- """Indicates if this type is in the current search path."""
- class ReflectedDomainConstraint(TypedDict):
- """Represents a reflect check constraint of a domain."""
- name: str
- """Name of the constraint."""
- check: str
- """The check constraint text."""
- class ReflectedDomain(ReflectedNamedType):
- """Represents a reflected enum."""
- type: str
- """The string name of the underlying data type of the domain."""
- nullable: bool
- """Indicates if the domain allows null or not."""
- default: Optional[str]
- """The string representation of the default value of this domain
- or ``None`` if none present.
- """
- constraints: List[ReflectedDomainConstraint]
- """The constraints defined in the domain, if any.
- The constraints are in the order in which they are evaluated by PostgreSQL.
- """
- collation: Optional[str]
- """The collation for the domain."""
- class ReflectedEnum(ReflectedNamedType):
- """Represents a reflected enum."""
- labels: List[str]
- """The labels that compose the enum."""
- class PGInspector(reflection.Inspector):
- dialect: PGDialect
- def get_table_oid(
- self, table_name: str, schema: Optional[str] = None
- ) -> int:
- """Return the OID for the given table name.
- :param table_name: string name of the table. For special quoting,
- use :class:`.quoted_name`.
- :param schema: string schema name; if omitted, uses the default schema
- of the database connection. For special quoting,
- use :class:`.quoted_name`.
- """
- with self._operation_context() as conn:
- return self.dialect.get_table_oid(
- conn, table_name, schema, info_cache=self.info_cache
- )
- def get_domains(
- self, schema: Optional[str] = None
- ) -> List[ReflectedDomain]:
- """Return a list of DOMAIN objects.
- Each member is a dictionary containing these fields:
- * name - name of the domain
- * schema - the schema name for the domain.
- * visible - boolean, whether or not this domain is visible
- in the default search path.
- * type - the type defined by this domain.
- * nullable - Indicates if this domain can be ``NULL``.
- * default - The default value of the domain or ``None`` if the
- domain has no default.
- * constraints - A list of dicts with the constraints defined by this
- domain. Each element contains two keys: ``name`` of the
- constraint and ``check`` with the constraint text.
- :param schema: schema name. If None, the default schema
- (typically 'public') is used. May also be set to ``'*'`` to
- load domains for all schemas.
- .. versionadded:: 2.0
- """
- with self._operation_context() as conn:
- return self.dialect._load_domains(
- conn, schema, info_cache=self.info_cache
- )
- def get_enums(self, schema: Optional[str] = None) -> List[ReflectedEnum]:
- """Return a list of ENUM objects.
- Each member is a dictionary containing these fields:
- * name - name of the enum
- * schema - the schema name for the enum.
- * visible - boolean, whether or not this enum is visible
- in the default search path.
- * labels - a list of string labels that apply to the enum.
- :param schema: schema name. If None, the default schema
- (typically 'public') is used. May also be set to ``'*'`` to
- load enums for all schemas.
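- E.g., a brief sketch, assuming an ``engine`` connected to a
- PostgreSQL database::
- insp = inspect(engine)
- for enum in insp.get_enums():
- print(enum["name"], enum["labels"])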
- """
- with self._operation_context() as conn:
- return self.dialect._load_enums(
- conn, schema, info_cache=self.info_cache
- )
- def get_foreign_table_names(
- self, schema: Optional[str] = None
- ) -> List[str]:
- """Return a list of FOREIGN TABLE names.
- Behavior is similar to that of
- :meth:`_reflection.Inspector.get_table_names`,
- except that the list is limited to those tables that report a
- ``relkind`` value of ``f``.
- """
- with self._operation_context() as conn:
- return self.dialect._get_foreign_table_names(
- conn, schema, info_cache=self.info_cache
- )
- def has_type(
- self, type_name: str, schema: Optional[str] = None, **kw: Any
- ) -> bool:
- """Return if the database has the specified type in the provided
- schema.
- :param type_name: the type to check.
- :param schema: schema name. If None, the default schema
- (typically 'public') is used. May also be set to ``'*'`` to
- check in all schemas.
- .. versionadded:: 2.0
- """
- with self._operation_context() as conn:
- return self.dialect.has_type(
- conn, type_name, schema, info_cache=self.info_cache
- )
- class PGExecutionContext(default.DefaultExecutionContext):
- def fire_sequence(self, seq, type_):
- return self._execute_scalar(
- (
- "select nextval('%s')"
- % self.identifier_preparer.format_sequence(seq)
- ),
- type_,
- )
- def get_insert_default(self, column):
- if column.primary_key and column is column.table._autoincrement_column:
- if column.server_default and column.server_default.has_argument:
- # pre-execute passive defaults on primary key columns
- return self._execute_scalar(
- "select %s" % column.server_default.arg, column.type
- )
- elif column.default is None or (
- column.default.is_sequence and column.default.optional
- ):
- # execute the sequence associated with a SERIAL primary
- # key column. For a non-primary-key SERIAL column, the ID
- # is generated server-side.
- try:
- seq_name = column._postgresql_seq_name
- except AttributeError:
- tab = column.table.name
- col = column.name
- tab = tab[0 : 29 + max(0, (29 - len(col)))]
- col = col[0 : 29 + max(0, (29 - len(tab)))]
- name = "%s_%s_seq" % (tab, col)
- column._postgresql_seq_name = seq_name = name
- if column.table is not None:
- effective_schema = self.connection.schema_for_object(
- column.table
- )
- else:
- effective_schema = None
- if effective_schema is not None:
- exc = 'select nextval(\'"%s"."%s"\')' % (
- effective_schema,
- seq_name,
- )
- else:
- exc = "select nextval('\"%s\"')" % (seq_name,)
- return self._execute_scalar(exc, column.type)
- return super().get_insert_default(column)
- class PGReadOnlyConnectionCharacteristic(
- characteristics.ConnectionCharacteristic
- ):
- transactional = True
- def reset_characteristic(self, dialect, dbapi_conn):
- dialect.set_readonly(dbapi_conn, False)
- def set_characteristic(self, dialect, dbapi_conn, value):
- dialect.set_readonly(dbapi_conn, value)
- def get_characteristic(self, dialect, dbapi_conn):
- return dialect.get_readonly(dbapi_conn)
- class PGDeferrableConnectionCharacteristic(
- characteristics.ConnectionCharacteristic
- ):
- transactional = True
- def reset_characteristic(self, dialect, dbapi_conn):
- dialect.set_deferrable(dbapi_conn, False)
- def set_characteristic(self, dialect, dbapi_conn, value):
- dialect.set_deferrable(dbapi_conn, value)
- def get_characteristic(self, dialect, dbapi_conn):
- return dialect.get_deferrable(dbapi_conn)
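- # Both characteristics above are exposed as execution options; a
- # sketch of typical usage (any PostgreSQL DBAPI connection behaves
- # the same way):
- #
- #     with engine.connect().execution_options(
- #         postgresql_readonly=True, postgresql_deferrable=True
- #     ) as conn:
- #         ...  # statements here run on a READ ONLY connection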
- class PGDialect(default.DefaultDialect):
- name = "postgresql"
- supports_statement_cache = True
- supports_alter = True
- max_identifier_length = 63
- supports_sane_rowcount = True
- bind_typing = interfaces.BindTyping.RENDER_CASTS
- supports_native_enum = True
- supports_native_boolean = True
- supports_native_uuid = True
- supports_smallserial = True
- supports_sequences = True
- sequences_optional = True
- preexecute_autoincrement_sequences = True
- postfetch_lastrowid = False
- use_insertmanyvalues = True
- returns_native_bytes = True
- insertmanyvalues_implicit_sentinel = (
- InsertmanyvaluesSentinelOpts.ANY_AUTOINCREMENT
- | InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT
- | InsertmanyvaluesSentinelOpts.RENDER_SELECT_COL_CASTS
- )
- supports_comments = True
- supports_constraint_comments = True
- supports_default_values = True
- supports_default_metavalue = True
- supports_empty_insert = False
- supports_multivalues_insert = True
- supports_identity_columns = True
- default_paramstyle = "pyformat"
- ischema_names = ischema_names
- colspecs = colspecs
- statement_compiler = PGCompiler
- ddl_compiler = PGDDLCompiler
- type_compiler_cls = PGTypeCompiler
- preparer = PGIdentifierPreparer
- execution_ctx_cls = PGExecutionContext
- inspector = PGInspector
- update_returning = True
- delete_returning = True
- insert_returning = True
- update_returning_multifrom = True
- delete_returning_multifrom = True
- connection_characteristics = (
- default.DefaultDialect.connection_characteristics
- )
- connection_characteristics = connection_characteristics.union(
- {
- "postgresql_readonly": PGReadOnlyConnectionCharacteristic(),
- "postgresql_deferrable": PGDeferrableConnectionCharacteristic(),
- }
- )
- construct_arguments = [
- (
- schema.Index,
- {
- "using": False,
- "include": None,
- "where": None,
- "ops": {},
- "concurrently": False,
- "with": {},
- "tablespace": None,
- "nulls_not_distinct": None,
- },
- ),
- (
- schema.Table,
- {
- "ignore_search_path": False,
- "tablespace": None,
- "partition_by": None,
- "with_oids": None,
- "on_commit": None,
- "inherits": None,
- "using": None,
- },
- ),
- (
- schema.CheckConstraint,
- {
- "not_valid": False,
- },
- ),
- (
- schema.ForeignKeyConstraint,
- {
- "not_valid": False,
- },
- ),
- (
- schema.PrimaryKeyConstraint,
- {"include": None},
- ),
- (
- schema.UniqueConstraint,
- {
- "include": None,
- "nulls_not_distinct": None,
- },
- ),
- ]
- reflection_options = ("postgresql_ignore_search_path",)
- _backslash_escapes = True
- _supports_create_index_concurrently = True
- _supports_drop_index_concurrently = True
- _supports_jsonb_subscripting = True
- def __init__(
- self,
- native_inet_types=None,
- json_serializer=None,
- json_deserializer=None,
- **kwargs,
- ):
- default.DefaultDialect.__init__(self, **kwargs)
- self._native_inet_types = native_inet_types
- self._json_deserializer = json_deserializer
- self._json_serializer = json_serializer
- def initialize(self, connection):
- super().initialize(connection)
- # https://www.postgresql.org/docs/9.3/static/release-9-2.html#AEN116689
- self.supports_smallserial = self.server_version_info >= (9, 2)
- self._set_backslash_escapes(connection)
- self._supports_drop_index_concurrently = self.server_version_info >= (
- 9,
- 2,
- )
- self.supports_identity_columns = self.server_version_info >= (10,)
- self._supports_jsonb_subscripting = self.server_version_info >= (14,)
- def get_isolation_level_values(self, dbapi_conn):
- # note the generic dialect doesn't have AUTOCOMMIT; however,
- # all postgresql dialects should include AUTOCOMMIT.
- return (
- "SERIALIZABLE",
- "READ UNCOMMITTED",
- "READ COMMITTED",
- "REPEATABLE READ",
- )
- def set_isolation_level(self, dbapi_connection, level):
- cursor = dbapi_connection.cursor()
- cursor.execute(
- "SET SESSION CHARACTERISTICS AS TRANSACTION "
- f"ISOLATION LEVEL {level}"
- )
- cursor.execute("COMMIT")
- cursor.close()
- def get_isolation_level(self, dbapi_connection):
- cursor = dbapi_connection.cursor()
- cursor.execute("show transaction isolation level")
- val = cursor.fetchone()[0]
- cursor.close()
- return val.upper()
- def set_readonly(self, connection, value):
- raise NotImplementedError()
- def get_readonly(self, connection):
- raise NotImplementedError()
- def set_deferrable(self, connection, value):
- raise NotImplementedError()
- def get_deferrable(self, connection):
- raise NotImplementedError()
- def _split_multihost_from_url(self, url: URL) -> Union[
- Tuple[None, None],
- Tuple[Tuple[Optional[str], ...], Tuple[Optional[int], ...]],
- ]:
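- # accepts the two multihost query-string forms also named in the
- # error message below, e.g. (illustrative URLs):
- #   postgresql://user:pw@/dbname?host=h1:5432&host=h2:5433
- #   postgresql://user:pw@/dbname?host=h1,h2&port=5432,5433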
- hosts: Optional[Tuple[Optional[str], ...]] = None
- ports_str: Union[str, Tuple[Optional[str], ...], None] = None
- integrated_multihost = False
- if "host" in url.query:
- if isinstance(url.query["host"], (list, tuple)):
- integrated_multihost = True
- hosts, ports_str = zip(
- *[
- token.split(":") if ":" in token else (token, None)
- for token in url.query["host"]
- ]
- )
- elif isinstance(url.query["host"], str):
- hosts = tuple(url.query["host"].split(","))
- if (
- "port" not in url.query
- and len(hosts) == 1
- and ":" in hosts[0]
- ):
- # internet host is alphanumeric plus dots or hyphens.
- # this is essentially rfc1123, which refers to rfc952.
- # https://stackoverflow.com/questions/3523028/
- # valid-characters-of-a-hostname
- host_port_match = re.match(
- r"^([a-zA-Z0-9\-\.]*)(?:\:(\d*))?$", hosts[0]
- )
- if host_port_match:
- integrated_multihost = True
- h, p = host_port_match.group(1, 2)
- if TYPE_CHECKING:
- assert isinstance(h, str)
- assert isinstance(p, str)
- hosts = (h,)
- ports_str = cast(
- "Tuple[Optional[str], ...]", (p,) if p else (None,)
- )
- if "port" in url.query:
- if integrated_multihost:
- raise exc.ArgumentError(
- "Can't mix 'multihost' formats together; use "
- '"host=h1,h2,h3&port=p1,p2,p3" or '
- '"host=h1:p1&host=h2:p2&host=h3:p3" separately'
- )
- if isinstance(url.query["port"], (list, tuple)):
- ports_str = url.query["port"]
- elif isinstance(url.query["port"], str):
- ports_str = tuple(url.query["port"].split(","))
- ports: Optional[Tuple[Optional[int], ...]] = None
- if ports_str:
- try:
- ports = tuple(int(x) if x else None for x in ports_str)
- except ValueError:
- raise exc.ArgumentError(
- f"Received non-integer port arguments: {ports_str}"
- ) from None
- if ports and (
- (not hosts and len(ports) > 1)
- or (
- hosts
- and ports
- and len(hosts) != len(ports)
- and (len(hosts) > 1 or len(ports) > 1)
- )
- ):
- raise exc.ArgumentError("number of hosts and ports don't match")
- if hosts is not None:
- if ports is None:
- ports = tuple(None for _ in hosts)
- return hosts, ports # type: ignore
- def do_begin_twophase(self, connection, xid):
- self.do_begin(connection.connection)
- def do_prepare_twophase(self, connection, xid):
- connection.exec_driver_sql("PREPARE TRANSACTION '%s'" % xid)
- def do_rollback_twophase(
- self, connection, xid, is_prepared=True, recover=False
- ):
- if is_prepared:
- if recover:
- # FIXME: ugly hack to get out of transaction
- # context when committing recoverable transactions.
- # We must find a way to keep the dbapi from
- # opening a transaction here.
- connection.exec_driver_sql("ROLLBACK")
- connection.exec_driver_sql("ROLLBACK PREPARED '%s'" % xid)
- connection.exec_driver_sql("BEGIN")
- self.do_rollback(connection.connection)
- else:
- self.do_rollback(connection.connection)
- def do_commit_twophase(
- self, connection, xid, is_prepared=True, recover=False
- ):
- if is_prepared:
- if recover:
- connection.exec_driver_sql("ROLLBACK")
- connection.exec_driver_sql("COMMIT PREPARED '%s'" % xid)
- connection.exec_driver_sql("BEGIN")
- self.do_rollback(connection.connection)
- else:
- self.do_commit(connection.connection)
- def do_recover_twophase(self, connection):
- return connection.scalars(
- sql.text("SELECT gid FROM pg_prepared_xacts")
- ).all()
- def _get_default_schema_name(self, connection):
- return connection.exec_driver_sql("select current_schema()").scalar()
- @reflection.cache
- def has_schema(self, connection, schema, **kw):
- query = select(pg_catalog.pg_namespace.c.nspname).where(
- pg_catalog.pg_namespace.c.nspname == schema
- )
- return bool(connection.scalar(query))
- def _pg_class_filter_scope_schema(
- self, query, schema, scope, pg_class_table=None
- ):
- if pg_class_table is None:
- pg_class_table = pg_catalog.pg_class
- query = query.join(
- pg_catalog.pg_namespace,
- pg_catalog.pg_namespace.c.oid == pg_class_table.c.relnamespace,
- )
- if scope is ObjectScope.DEFAULT:
- query = query.where(pg_class_table.c.relpersistence != "t")
- elif scope is ObjectScope.TEMPORARY:
- query = query.where(pg_class_table.c.relpersistence == "t")
- if schema is None:
- query = query.where(
- pg_catalog.pg_table_is_visible(pg_class_table.c.oid),
- # ignore pg_catalog schema
- pg_catalog.pg_namespace.c.nspname != "pg_catalog",
- )
- else:
- query = query.where(pg_catalog.pg_namespace.c.nspname == schema)
- return query
- def _pg_class_relkind_condition(self, relkinds, pg_class_table=None):
- if pg_class_table is None:
- pg_class_table = pg_catalog.pg_class
- # uses the ANY form instead of IN, since otherwise postgresql
- # complains that 'IN could not convert type character to "char"'
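- # e.g. for relkinds ("r", "p") this effectively renders
- #   relkind = ANY (ARRAY['r', 'p'])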
- return pg_class_table.c.relkind == sql.any_(_array.array(relkinds))
- @lru_cache()
- def _has_table_query(self, schema):
- query = select(pg_catalog.pg_class.c.relname).where(
- pg_catalog.pg_class.c.relname == bindparam("table_name"),
- self._pg_class_relkind_condition(
- pg_catalog.RELKINDS_ALL_TABLE_LIKE
- ),
- )
- return self._pg_class_filter_scope_schema(
- query, schema, scope=ObjectScope.ANY
- )
- @reflection.cache
- def has_table(self, connection, table_name, schema=None, **kw):
- self._ensure_has_table_connection(connection)
- query = self._has_table_query(schema)
- return bool(connection.scalar(query, {"table_name": table_name}))
- @reflection.cache
- def has_sequence(self, connection, sequence_name, schema=None, **kw):
- query = select(pg_catalog.pg_class.c.relname).where(
- pg_catalog.pg_class.c.relkind == "S",
- pg_catalog.pg_class.c.relname == sequence_name,
- )
- query = self._pg_class_filter_scope_schema(
- query, schema, scope=ObjectScope.ANY
- )
- return bool(connection.scalar(query))
- @reflection.cache
- def has_type(self, connection, type_name, schema=None, **kw):
- query = (
- select(pg_catalog.pg_type.c.typname)
- .join(
- pg_catalog.pg_namespace,
- pg_catalog.pg_namespace.c.oid
- == pg_catalog.pg_type.c.typnamespace,
- )
- .where(pg_catalog.pg_type.c.typname == type_name)
- )
- if schema is None:
- query = query.where(
- pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid),
- # ignore pg_catalog schema
- pg_catalog.pg_namespace.c.nspname != "pg_catalog",
- )
- elif schema != "*":
- query = query.where(pg_catalog.pg_namespace.c.nspname == schema)
- return bool(connection.scalar(query))
- def _get_server_version_info(self, connection):
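- # parses version strings such as (illustrative):
- #   "PostgreSQL 14.5 on x86_64-pc-linux-gnu, compiled by gcc ..."
- # into a version tuple, here (14, 5)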
- v = connection.exec_driver_sql("select pg_catalog.version()").scalar()
- m = re.match(
- r".*(?:PostgreSQL|EnterpriseDB) "
- r"(\d+)\.?(\d+)?(?:\.(\d+))?(?:\.\d+)?(?:devel|beta)?",
- v,
- )
- if not m:
- raise AssertionError(
- "Could not determine version from string '%s'" % v
- )
- return tuple([int(x) for x in m.group(1, 2, 3) if x is not None])
- @reflection.cache
- def get_table_oid(self, connection, table_name, schema=None, **kw):
- """Fetch the oid for schema.table_name."""
- query = select(pg_catalog.pg_class.c.oid).where(
- pg_catalog.pg_class.c.relname == table_name,
- self._pg_class_relkind_condition(
- pg_catalog.RELKINDS_ALL_TABLE_LIKE
- ),
- )
- query = self._pg_class_filter_scope_schema(
- query, schema, scope=ObjectScope.ANY
- )
- table_oid = connection.scalar(query)
- if table_oid is None:
- raise exc.NoSuchTableError(
- f"{schema}.{table_name}" if schema else table_name
- )
- return table_oid
- @reflection.cache
- def get_schema_names(self, connection, **kw):
- query = (
- select(pg_catalog.pg_namespace.c.nspname)
- .where(pg_catalog.pg_namespace.c.nspname.not_like("pg_%"))
- .order_by(pg_catalog.pg_namespace.c.nspname)
- )
- return connection.scalars(query).all()
- def _get_relnames_for_relkinds(self, connection, schema, relkinds, scope):
- query = select(pg_catalog.pg_class.c.relname).where(
- self._pg_class_relkind_condition(relkinds)
- )
- query = self._pg_class_filter_scope_schema(query, schema, scope=scope)
- return connection.scalars(query).all()
- @reflection.cache
- def get_table_names(self, connection, schema=None, **kw):
- return self._get_relnames_for_relkinds(
- connection,
- schema,
- pg_catalog.RELKINDS_TABLE_NO_FOREIGN,
- scope=ObjectScope.DEFAULT,
- )
- @reflection.cache
- def get_temp_table_names(self, connection, **kw):
- return self._get_relnames_for_relkinds(
- connection,
- schema=None,
- relkinds=pg_catalog.RELKINDS_TABLE_NO_FOREIGN,
- scope=ObjectScope.TEMPORARY,
- )
- @reflection.cache
- def _get_foreign_table_names(self, connection, schema=None, **kw):
- return self._get_relnames_for_relkinds(
- connection, schema, relkinds=("f",), scope=ObjectScope.ANY
- )
- @reflection.cache
- def get_view_names(self, connection, schema=None, **kw):
- return self._get_relnames_for_relkinds(
- connection,
- schema,
- pg_catalog.RELKINDS_VIEW,
- scope=ObjectScope.DEFAULT,
- )
- @reflection.cache
- def get_materialized_view_names(self, connection, schema=None, **kw):
- return self._get_relnames_for_relkinds(
- connection,
- schema,
- pg_catalog.RELKINDS_MAT_VIEW,
- scope=ObjectScope.DEFAULT,
- )
- @reflection.cache
- def get_temp_view_names(self, connection, schema=None, **kw):
- return self._get_relnames_for_relkinds(
- connection,
- schema,
- # NOTE: do not include temp materialized views (they do not
- # seem to be a thing, at least up to version 14)
- pg_catalog.RELKINDS_VIEW,
- scope=ObjectScope.TEMPORARY,
- )
- @reflection.cache
- def get_sequence_names(self, connection, schema=None, **kw):
- return self._get_relnames_for_relkinds(
- connection, schema, relkinds=("S",), scope=ObjectScope.ANY
- )
- @reflection.cache
- def get_view_definition(self, connection, view_name, schema=None, **kw):
- query = (
- select(pg_catalog.pg_get_viewdef(pg_catalog.pg_class.c.oid))
- .select_from(pg_catalog.pg_class)
- .where(
- pg_catalog.pg_class.c.relname == view_name,
- self._pg_class_relkind_condition(
- pg_catalog.RELKINDS_VIEW + pg_catalog.RELKINDS_MAT_VIEW
- ),
- )
- )
- query = self._pg_class_filter_scope_schema(
- query, schema, scope=ObjectScope.ANY
- )
- res = connection.scalar(query)
- if res is None:
- raise exc.NoSuchTableError(
- f"{schema}.{view_name}" if schema else view_name
- )
- else:
- return res
- def _value_or_raise(self, data, table, schema):
- try:
- return dict(data)[(schema, table)]
- except KeyError:
- raise exc.NoSuchTableError(
- f"{schema}.{table}" if schema else table
- ) from None
- def _prepare_filter_names(self, filter_names):
- if filter_names:
- return True, {"filter_names": filter_names}
- else:
- return False, {}
- def _kind_to_relkinds(self, kind: ObjectKind) -> Tuple[str, ...]:
- if kind is ObjectKind.ANY:
- return pg_catalog.RELKINDS_ALL_TABLE_LIKE
- relkinds = ()
- if ObjectKind.TABLE in kind:
- relkinds += pg_catalog.RELKINDS_TABLE
- if ObjectKind.VIEW in kind:
- relkinds += pg_catalog.RELKINDS_VIEW
- if ObjectKind.MATERIALIZED_VIEW in kind:
- relkinds += pg_catalog.RELKINDS_MAT_VIEW
- return relkinds
- @reflection.cache
- def get_columns(self, connection, table_name, schema=None, **kw):
- data = self.get_multi_columns(
- connection,
- schema=schema,
- filter_names=[table_name],
- scope=ObjectScope.ANY,
- kind=ObjectKind.ANY,
- **kw,
- )
- return self._value_or_raise(data, table_name, schema)
- @lru_cache()
- def _columns_query(self, schema, has_filter_names, scope, kind):
- # NOTE: using scalar subqueries for the default and identity
- # options is faster than trying to use outer joins for them
- generated = (
- pg_catalog.pg_attribute.c.attgenerated.label("generated")
- if self.server_version_info >= (12,)
- else sql.null().label("generated")
- )
- if self.server_version_info >= (10,):
- # join lateral performs worse (~2x slower) than a scalar_subquery
- identity = (
- select(
- sql.func.json_build_object(
- "always",
- pg_catalog.pg_attribute.c.attidentity == "a",
- "start",
- pg_catalog.pg_sequence.c.seqstart,
- "increment",
- pg_catalog.pg_sequence.c.seqincrement,
- "minvalue",
- pg_catalog.pg_sequence.c.seqmin,
- "maxvalue",
- pg_catalog.pg_sequence.c.seqmax,
- "cache",
- pg_catalog.pg_sequence.c.seqcache,
- "cycle",
- pg_catalog.pg_sequence.c.seqcycle,
- type_=sqltypes.JSON(),
- )
- )
- .select_from(pg_catalog.pg_sequence)
- .where(
- # attidentity != '' is required, otherwise serial columns
- # would also be reflected as identity columns.
- pg_catalog.pg_attribute.c.attidentity != "",
- pg_catalog.pg_sequence.c.seqrelid
- == sql.cast(
- sql.cast(
- pg_catalog.pg_get_serial_sequence(
- sql.cast(
- sql.cast(
- pg_catalog.pg_attribute.c.attrelid,
- REGCLASS,
- ),
- TEXT,
- ),
- pg_catalog.pg_attribute.c.attname,
- ),
- REGCLASS,
- ),
- OID,
- ),
- )
- .correlate(pg_catalog.pg_attribute)
- .scalar_subquery()
- .label("identity_options")
- )
- else:
- identity = sql.null().label("identity_options")
- # join lateral performs the same as scalar_subquery here
- default = (
- select(
- pg_catalog.pg_get_expr(
- pg_catalog.pg_attrdef.c.adbin,
- pg_catalog.pg_attrdef.c.adrelid,
- )
- )
- .select_from(pg_catalog.pg_attrdef)
- .where(
- pg_catalog.pg_attrdef.c.adrelid
- == pg_catalog.pg_attribute.c.attrelid,
- pg_catalog.pg_attrdef.c.adnum
- == pg_catalog.pg_attribute.c.attnum,
- pg_catalog.pg_attribute.c.atthasdef,
- )
- .correlate(pg_catalog.pg_attribute)
- .scalar_subquery()
- .label("default")
- )
- relkinds = self._kind_to_relkinds(kind)
- query = (
- select(
- pg_catalog.pg_attribute.c.attname.label("name"),
- pg_catalog.format_type(
- pg_catalog.pg_attribute.c.atttypid,
- pg_catalog.pg_attribute.c.atttypmod,
- ).label("format_type"),
- default,
- pg_catalog.pg_attribute.c.attnotnull.label("not_null"),
- pg_catalog.pg_class.c.relname.label("table_name"),
- pg_catalog.pg_description.c.description.label("comment"),
- generated,
- identity,
- )
- .select_from(pg_catalog.pg_class)
- # NOTE: postgresql supports tables with no user columns, meaning
- # there is no row with pg_attribute.attnum > 0. use a left outer
- # join to avoid filtering out these tables.
- .outerjoin(
- pg_catalog.pg_attribute,
- sql.and_(
- pg_catalog.pg_class.c.oid
- == pg_catalog.pg_attribute.c.attrelid,
- pg_catalog.pg_attribute.c.attnum > 0,
- ~pg_catalog.pg_attribute.c.attisdropped,
- ),
- )
- .outerjoin(
- pg_catalog.pg_description,
- sql.and_(
- pg_catalog.pg_description.c.objoid
- == pg_catalog.pg_attribute.c.attrelid,
- pg_catalog.pg_description.c.objsubid
- == pg_catalog.pg_attribute.c.attnum,
- ),
- )
- .where(self._pg_class_relkind_condition(relkinds))
- .order_by(
- pg_catalog.pg_class.c.relname, pg_catalog.pg_attribute.c.attnum
- )
- )
- query = self._pg_class_filter_scope_schema(query, schema, scope=scope)
- if has_filter_names:
- query = query.where(
- pg_catalog.pg_class.c.relname.in_(bindparam("filter_names"))
- )
- return query
- def get_multi_columns(
- self, connection, schema, filter_names, scope, kind, **kw
- ):
- has_filter_names, params = self._prepare_filter_names(filter_names)
- query = self._columns_query(schema, has_filter_names, scope, kind)
- rows = connection.execute(query, params).mappings()
- # dictionary keyed by (name, ) for types visible in the default
- # search path, or by (schema, name) otherwise
- domains = {
- ((d["schema"], d["name"]) if not d["visible"] else (d["name"],)): d
- for d in self._load_domains(
- connection, schema="*", info_cache=kw.get("info_cache")
- )
- }
- # dictionary keyed by (name, ) for types visible in the default
- # search path, or by (schema, name) otherwise
- enums = dict(
- (
- ((rec["name"],), rec)
- if rec["visible"]
- else ((rec["schema"], rec["name"]), rec)
- )
- for rec in self._load_enums(
- connection, schema="*", info_cache=kw.get("info_cache")
- )
- )
- columns = self._get_columns_info(rows, domains, enums, schema)
- return columns.items()
- _format_type_args_pattern = re.compile(r"\((.*)\)")
- _format_type_args_delim = re.compile(r"\s*,\s*")
- _format_array_spec_pattern = re.compile(r"((?:\[\])*)$")
- def _reflect_type(
- self,
- format_type: Optional[str],
- domains: Dict[str, ReflectedDomain],
- enums: Dict[str, ReflectedEnum],
- type_description: str,
- ) -> sqltypes.TypeEngine[Any]:
- """
- Attempts to reconstruct a column type defined in ischema_names based
- on the information available in the format_type.
- If the `format_type` cannot be associated with a known entry in
- `ischema_names`, it is treated as a reference to a known PostgreSQL
- named `ENUM` or `DOMAIN` type.
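- For example, a `format_type` of ``character varying(30)[]`` would
- resolve (illustratively) to a VARCHAR(30) wrapped in an ARRAY.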
- """
- type_description = type_description or "unknown type"
- if format_type is None:
- util.warn(
- "PostgreSQL format_type() returned NULL for %s"
- % type_description
- )
- return sqltypes.NULLTYPE
- attype_args_match = self._format_type_args_pattern.search(format_type)
- if attype_args_match and attype_args_match.group(1):
- attype_args = self._format_type_args_delim.split(
- attype_args_match.group(1)
- )
- else:
- attype_args = ()
- match_array_dim = self._format_array_spec_pattern.search(format_type)
- # Each "[]" in array specs corresponds to an array dimension
- array_dim = len(match_array_dim.group(1) or "") // 2
- # Remove all parameters and array specs from format_type to obtain an
- # ischema_name candidate
- attype = self._format_type_args_pattern.sub("", format_type)
- attype = self._format_array_spec_pattern.sub("", attype)
- schema_type = self.ischema_names.get(attype.lower(), None)
- args, kwargs = (), {}
- if attype == "numeric":
- if len(attype_args) == 2:
- precision, scale = map(int, attype_args)
- args = (precision, scale)
- elif attype == "double precision":
- args = (53,)
- elif attype == "integer":
- args = ()
- elif attype in ("timestamp with time zone", "time with time zone"):
- kwargs["timezone"] = True
- if len(attype_args) == 1:
- kwargs["precision"] = int(attype_args[0])
- elif attype in (
- "timestamp without time zone",
- "time without time zone",
- "time",
- ):
- kwargs["timezone"] = False
- if len(attype_args) == 1:
- kwargs["precision"] = int(attype_args[0])
- elif attype == "bit varying":
- kwargs["varying"] = True
- if len(attype_args) == 1:
- charlen = int(attype_args[0])
- args = (charlen,)
- # a domain or enum can start with interval, so be mindful of that.
- elif attype == "interval" or attype.startswith("interval "):
- schema_type = INTERVAL
- field_match = re.match(r"interval (.+)", attype)
- if field_match:
- kwargs["fields"] = field_match.group(1)
- if len(attype_args) == 1:
- kwargs["precision"] = int(attype_args[0])
- else:
- enum_or_domain_key = tuple(util.quoted_token_parser(attype))
- if enum_or_domain_key in enums:
- schema_type = ENUM
- enum = enums[enum_or_domain_key]
- kwargs["name"] = enum["name"]
- if not enum["visible"]:
- kwargs["schema"] = enum["schema"]
- args = tuple(enum["labels"])
- elif enum_or_domain_key in domains:
- schema_type = DOMAIN
- domain = domains[enum_or_domain_key]
- data_type = self._reflect_type(
- domain["type"],
- domains,
- enums,
- type_description="DOMAIN '%s'" % domain["name"],
- )
- args = (domain["name"], data_type)
- kwargs["collation"] = domain["collation"]
- kwargs["default"] = domain["default"]
- kwargs["not_null"] = not domain["nullable"]
- kwargs["create_type"] = False
- if domain["constraints"]:
- # We only support a single constraint
- check_constraint = domain["constraints"][0]
- kwargs["constraint_name"] = check_constraint["name"]
- kwargs["check"] = check_constraint["check"]
- if not domain["visible"]:
- kwargs["schema"] = domain["schema"]
- else:
- try:
- charlen = int(attype_args[0])
- args = (charlen, *attype_args[1:])
- except (ValueError, IndexError):
- args = attype_args
- if not schema_type:
- util.warn(
- "Did not recognize type '%s' of %s"
- % (attype, type_description)
- )
- return sqltypes.NULLTYPE
- data_type = schema_type(*args, **kwargs)
- if array_dim >= 1:
- # postgres does not preserve dimensionality or size of array types.
- data_type = _array.ARRAY(data_type)
- return data_type
- def _get_columns_info(self, rows, domains, enums, schema):
- columns = defaultdict(list)
- for row_dict in rows:
- # ensure that each table has an entry, even if it has no columns
- if row_dict["name"] is None:
- columns[(schema, row_dict["table_name"])] = (
- ReflectionDefaults.columns()
- )
- continue
- table_cols = columns[(schema, row_dict["table_name"])]
- coltype = self._reflect_type(
- row_dict["format_type"],
- domains,
- enums,
- type_description="column '%s'" % row_dict["name"],
- )
- default = row_dict["default"]
- name = row_dict["name"]
- generated = row_dict["generated"]
- nullable = not row_dict["not_null"]
- if isinstance(coltype, DOMAIN):
- if not default:
- # a domain can override the default value but
- # can't set it to None
- if coltype.default is not None:
- default = coltype.default
- nullable = nullable and not coltype.not_null
- identity = row_dict["identity_options"]
- # A zero byte or blank string (depending on the driver; the value
- # is also absent for older PG versions) means this is not a
- # generated column. Otherwise, s = stored. (Other values might be
- # added in the future.)
- if generated not in (None, "", b"\x00"):
- computed = dict(
- sqltext=default, persisted=generated in ("s", b"s")
- )
- default = None
- else:
- computed = None
- # adjust the default value
- autoincrement = False
- if default is not None:
- match = re.search(r"""(nextval\(')([^']+)('.*$)""", default)
- if match is not None:
- if issubclass(coltype._type_affinity, sqltypes.Integer):
- autoincrement = True
- # the default is related to a Sequence
- if "." not in match.group(2) and schema is not None:
- # unconditionally quote the schema name. this could
- # later be enhanced to obey quoting rules /
- # "quote schema"
- default = (
- match.group(1)
- + ('"%s"' % schema)
- + "."
- + match.group(2)
- + match.group(3)
- )
- column_info = {
- "name": name,
- "type": coltype,
- "nullable": nullable,
- "default": default,
- "autoincrement": autoincrement or identity is not None,
- "comment": row_dict["comment"],
- }
- if computed is not None:
- column_info["computed"] = computed
- if identity is not None:
- column_info["identity"] = identity
- table_cols.append(column_info)
- return columns
- @lru_cache()
- def _table_oids_query(self, schema, has_filter_names, scope, kind):
- relkinds = self._kind_to_relkinds(kind)
- oid_q = select(
- pg_catalog.pg_class.c.oid, pg_catalog.pg_class.c.relname
- ).where(self._pg_class_relkind_condition(relkinds))
- oid_q = self._pg_class_filter_scope_schema(oid_q, schema, scope=scope)
- if has_filter_names:
- oid_q = oid_q.where(
- pg_catalog.pg_class.c.relname.in_(bindparam("filter_names"))
- )
- return oid_q
- @reflection.flexi_cache(
- ("schema", InternalTraversal.dp_string),
- ("filter_names", InternalTraversal.dp_string_list),
- ("kind", InternalTraversal.dp_plain_obj),
- ("scope", InternalTraversal.dp_plain_obj),
- )
- def _get_table_oids(
- self, connection, schema, filter_names, scope, kind, **kw
- ):
- has_filter_names, params = self._prepare_filter_names(filter_names)
- oid_q = self._table_oids_query(schema, has_filter_names, scope, kind)
- result = connection.execute(oid_q, params)
- return result.all()
- @util.memoized_property
- def _constraint_query(self):
- if self.server_version_info >= (11, 0):
- indnkeyatts = pg_catalog.pg_index.c.indnkeyatts
- else:
- indnkeyatts = pg_catalog.pg_index.c.indnatts.label("indnkeyatts")
- if self.server_version_info >= (15,):
- indnullsnotdistinct = pg_catalog.pg_index.c.indnullsnotdistinct
- else:
- indnullsnotdistinct = sql.false().label("indnullsnotdistinct")
- con_sq = (
- select(
- pg_catalog.pg_constraint.c.conrelid,
- pg_catalog.pg_constraint.c.conname,
- sql.func.unnest(pg_catalog.pg_index.c.indkey).label("attnum"),
- sql.func.generate_subscripts(
- pg_catalog.pg_index.c.indkey, 1
- ).label("ord"),
- indnkeyatts,
- indnullsnotdistinct,
- pg_catalog.pg_description.c.description,
- )
- .join(
- pg_catalog.pg_index,
- pg_catalog.pg_constraint.c.conindid
- == pg_catalog.pg_index.c.indexrelid,
- )
- .outerjoin(
- pg_catalog.pg_description,
- pg_catalog.pg_description.c.objoid
- == pg_catalog.pg_constraint.c.oid,
- )
- .where(
- pg_catalog.pg_constraint.c.contype == bindparam("contype"),
- pg_catalog.pg_constraint.c.conrelid.in_(bindparam("oids")),
- # NOTE: filtering also on pg_index.indrelid for oids does
- # not seem to have a performance effect, but it may be an
- # option if perf problems are reported
- )
- .subquery("con")
- )
- attr_sq = (
- select(
- con_sq.c.conrelid,
- con_sq.c.conname,
- con_sq.c.description,
- con_sq.c.ord,
- con_sq.c.indnkeyatts,
- con_sq.c.indnullsnotdistinct,
- pg_catalog.pg_attribute.c.attname,
- )
- .select_from(pg_catalog.pg_attribute)
- .join(
- con_sq,
- sql.and_(
- pg_catalog.pg_attribute.c.attnum == con_sq.c.attnum,
- pg_catalog.pg_attribute.c.attrelid == con_sq.c.conrelid,
- ),
- )
- .where(
- # NOTE: restate the condition here, since pg15 otherwise
- # seems to get confused on psycopg2 sometimes, doing
- # a sequential scan of pg_attribute.
- # The condition in the con_sq subquery is not actually needed
- # in pg15, but it may be needed in older versions. Keeping it
- # does not seem to have any impact in any case.
- con_sq.c.conrelid.in_(bindparam("oids"))
- )
- .subquery("attr")
- )
- return (
- select(
- attr_sq.c.conrelid,
- sql.func.array_agg(
- # NOTE: cast since some postgresql derivatives may
- # not support array_agg on the name type
- aggregate_order_by(
- attr_sq.c.attname.cast(TEXT), attr_sq.c.ord
- )
- ).label("cols"),
- attr_sq.c.conname,
- sql.func.min(attr_sq.c.description).label("description"),
- sql.func.min(attr_sq.c.indnkeyatts).label("indnkeyatts"),
- sql.func.bool_and(attr_sq.c.indnullsnotdistinct).label(
- "indnullsnotdistinct"
- ),
- )
- .group_by(attr_sq.c.conrelid, attr_sq.c.conname)
- .order_by(attr_sq.c.conrelid, attr_sq.c.conname)
- )
- def _reflect_constraint(
- self, connection, contype, schema, filter_names, scope, kind, **kw
- ):
- # used to reflect primary and unique constraints
- table_oids = self._get_table_oids(
- connection, schema, filter_names, scope, kind, **kw
- )
- batches = list(table_oids)
- is_unique = contype == "u"
- while batches:
- batch = batches[0:3000]
- batches[0:3000] = []
- result = connection.execute(
- self._constraint_query,
- {"oids": [r[0] for r in batch], "contype": contype},
- ).mappings()
- result_by_oid = defaultdict(list)
- for row_dict in result:
- result_by_oid[row_dict["conrelid"]].append(row_dict)
- for oid, tablename in batch:
- for_oid = result_by_oid.get(oid, ())
- if for_oid:
- for row in for_oid:
- # See note in get_multi_indexes
- all_cols = row["cols"]
- indnkeyatts = row["indnkeyatts"]
- if len(all_cols) > indnkeyatts:
- inc_cols = all_cols[indnkeyatts:]
- cst_cols = all_cols[:indnkeyatts]
- else:
- inc_cols = []
- cst_cols = all_cols
- opts = {}
- if self.server_version_info >= (11,):
- opts["postgresql_include"] = inc_cols
- if is_unique:
- opts["postgresql_nulls_not_distinct"] = row[
- "indnullsnotdistinct"
- ]
- yield (
- tablename,
- cst_cols,
- row["conname"],
- row["description"],
- opts,
- )
- else:
- yield tablename, None, None, None, None
- @reflection.cache
- def get_pk_constraint(self, connection, table_name, schema=None, **kw):
- data = self.get_multi_pk_constraint(
- connection,
- schema=schema,
- filter_names=[table_name],
- scope=ObjectScope.ANY,
- kind=ObjectKind.ANY,
- **kw,
- )
- return self._value_or_raise(data, table_name, schema)
- def get_multi_pk_constraint(
- self, connection, schema, filter_names, scope, kind, **kw
- ):
- result = self._reflect_constraint(
- connection, "p", schema, filter_names, scope, kind, **kw
- )
- # only a single pk can be present for each table. Return an entry
- # even if a table has no primary key
- default = ReflectionDefaults.pk_constraint
- def pk_constraint(pk_name, cols, comment, opts):
- info = {
- "constrained_columns": cols,
- "name": pk_name,
- "comment": comment,
- }
- if opts:
- info["dialect_options"] = opts
- return info
- return (
- (
- (schema, table_name),
- (
- pk_constraint(pk_name, cols, comment, opts)
- if pk_name is not None
- else default()
- ),
- )
- for table_name, cols, pk_name, comment, opts in result
- )
- @reflection.cache
- def get_foreign_keys(
- self,
- connection,
- table_name,
- schema=None,
- postgresql_ignore_search_path=False,
- **kw,
- ):
- data = self.get_multi_foreign_keys(
- connection,
- schema=schema,
- filter_names=[table_name],
- postgresql_ignore_search_path=postgresql_ignore_search_path,
- scope=ObjectScope.ANY,
- kind=ObjectKind.ANY,
- **kw,
- )
- return self._value_or_raise(data, table_name, schema)
- @lru_cache()
- def _foreign_key_query(self, schema, has_filter_names, scope, kind):
- pg_class_ref = pg_catalog.pg_class.alias("cls_ref")
- pg_namespace_ref = pg_catalog.pg_namespace.alias("nsp_ref")
- relkinds = self._kind_to_relkinds(kind)
- query = (
- select(
- pg_catalog.pg_class.c.relname,
- pg_catalog.pg_constraint.c.conname,
- # NOTE: avoid calling pg_get_constraintdef when not needed
- # to speed up the query
- sql.case(
- (
- pg_catalog.pg_constraint.c.oid.is_not(None),
- pg_catalog.pg_get_constraintdef(
- pg_catalog.pg_constraint.c.oid, True
- ),
- ),
- else_=None,
- ),
- pg_namespace_ref.c.nspname,
- pg_catalog.pg_description.c.description,
- )
- .select_from(pg_catalog.pg_class)
- .outerjoin(
- pg_catalog.pg_constraint,
- sql.and_(
- pg_catalog.pg_class.c.oid
- == pg_catalog.pg_constraint.c.conrelid,
- pg_catalog.pg_constraint.c.contype == "f",
- ),
- )
- .outerjoin(
- pg_class_ref,
- pg_class_ref.c.oid == pg_catalog.pg_constraint.c.confrelid,
- )
- .outerjoin(
- pg_namespace_ref,
- pg_class_ref.c.relnamespace == pg_namespace_ref.c.oid,
- )
- .outerjoin(
- pg_catalog.pg_description,
- pg_catalog.pg_description.c.objoid
- == pg_catalog.pg_constraint.c.oid,
- )
- .order_by(
- pg_catalog.pg_class.c.relname,
- pg_catalog.pg_constraint.c.conname,
- )
- .where(self._pg_class_relkind_condition(relkinds))
- )
- query = self._pg_class_filter_scope_schema(query, schema, scope)
- if has_filter_names:
- query = query.where(
- pg_catalog.pg_class.c.relname.in_(bindparam("filter_names"))
- )
- return query
- @util.memoized_property
- def _fk_regex_pattern(self):
- # optionally quoted token
- qtoken = '(?:"[^"]+"|[A-Za-z0-9_]+?)'
- # https://www.postgresql.org/docs/current/static/sql-createtable.html
- return re.compile(
- r"FOREIGN KEY \((.*?)\) "
- rf"REFERENCES (?:({qtoken})\.)?({qtoken})\(((?:{qtoken}(?: *, *)?)+)\)" # noqa: E501
- r"[\s]?(MATCH (FULL|PARTIAL|SIMPLE)+)?"
- r"[\s]?(ON UPDATE "
- r"(CASCADE|RESTRICT|NO ACTION|SET NULL|SET DEFAULT)+)?"
- r"[\s]?(ON DELETE "
- r"(CASCADE|RESTRICT|NO ACTION|"
- r"SET (?:NULL|DEFAULT)(?:\s\(.+\))?)+)?"
- r"[\s]?(DEFERRABLE|NOT DEFERRABLE)?"
- r"[\s]?(INITIALLY (DEFERRED|IMMEDIATE)+)?"
- )
- def get_multi_foreign_keys(
- self,
- connection,
- schema,
- filter_names,
- scope,
- kind,
- postgresql_ignore_search_path=False,
- **kw,
- ):
- preparer = self.identifier_preparer
- has_filter_names, params = self._prepare_filter_names(filter_names)
- query = self._foreign_key_query(schema, has_filter_names, scope, kind)
- result = connection.execute(query, params)
- FK_REGEX = self._fk_regex_pattern
- fkeys = defaultdict(list)
- default = ReflectionDefaults.foreign_keys
- for table_name, conname, condef, conschema, comment in result:
- # ensure that each table has an entry, even if it has
- # no foreign keys
- if conname is None:
- fkeys[(schema, table_name)] = default()
- continue
- table_fks = fkeys[(schema, table_name)]
- m = re.search(FK_REGEX, condef).groups()
- (
- constrained_columns,
- referred_schema,
- referred_table,
- referred_columns,
- _,
- match,
- _,
- onupdate,
- _,
- ondelete,
- deferrable,
- _,
- initially,
- ) = m
- if deferrable is not None:
- deferrable = True if deferrable == "DEFERRABLE" else False
- constrained_columns = [
- preparer._unquote_identifier(x)
- for x in re.split(r"\s*,\s*", constrained_columns)
- ]
- if postgresql_ignore_search_path:
- # when ignoring search path, we use the actual schema
- # provided it isn't the "default" schema
- if conschema != self.default_schema_name:
- referred_schema = conschema
- else:
- referred_schema = schema
- elif referred_schema:
- # referred_schema is the schema that we regexp'ed from
- # pg_get_constraintdef(). If the schema is in the search
- # path, pg_get_constraintdef() will give us None.
- referred_schema = preparer._unquote_identifier(referred_schema)
- elif schema is not None and schema == conschema:
- # If the actual schema matches the schema of the table
- # we're reflecting, then we will use that.
- referred_schema = schema
- referred_table = preparer._unquote_identifier(referred_table)
- referred_columns = [
- preparer._unquote_identifier(x)
- for x in re.split(r"\s*,\s*", referred_columns)
- ]
- options = {
- k: v
- for k, v in [
- ("onupdate", onupdate),
- ("ondelete", ondelete),
- ("initially", initially),
- ("deferrable", deferrable),
- ("match", match),
- ]
- if v is not None and v != "NO ACTION"
- }
- fkey_d = {
- "name": conname,
- "constrained_columns": constrained_columns,
- "referred_schema": referred_schema,
- "referred_table": referred_table,
- "referred_columns": referred_columns,
- "options": options,
- "comment": comment,
- }
- table_fks.append(fkey_d)
- return fkeys.items()
- @reflection.cache
- def get_indexes(self, connection, table_name, schema=None, **kw):
- data = self.get_multi_indexes(
- connection,
- schema=schema,
- filter_names=[table_name],
- scope=ObjectScope.ANY,
- kind=ObjectKind.ANY,
- **kw,
- )
- return self._value_or_raise(data, table_name, schema)
- @util.memoized_property
- def _index_query(self):
- # NOTE: pg_index is selected from twice to improve performance,
- # since extracting all the index information from `idx_sq` to avoid
- # the second use of pg_index leads to a worse performing query, in
- # particular when querying for a single table (as of pg 17)
- # NOTE: repeating the oids clause improves query performance
- # subquery to get the columns
- idx_sq = (
- select(
- pg_catalog.pg_index.c.indexrelid,
- pg_catalog.pg_index.c.indrelid,
- sql.func.unnest(pg_catalog.pg_index.c.indkey).label("attnum"),
- sql.func.unnest(pg_catalog.pg_index.c.indclass).label(
- "att_opclass"
- ),
- sql.func.generate_subscripts(
- pg_catalog.pg_index.c.indkey, 1
- ).label("ord"),
- )
- .where(
- ~pg_catalog.pg_index.c.indisprimary,
- pg_catalog.pg_index.c.indrelid.in_(bindparam("oids")),
- )
- .subquery("idx")
- )
- attr_sq = (
- select(
- idx_sq.c.indexrelid,
- idx_sq.c.indrelid,
- idx_sq.c.ord,
- # NOTE: always using pg_get_indexdef is too slow so just
- # invoke when the element is an expression
- sql.case(
- (
- idx_sq.c.attnum == 0,
- pg_catalog.pg_get_indexdef(
- idx_sq.c.indexrelid, idx_sq.c.ord + 1, True
- ),
- ),
- # NOTE: need to cast this since attname is of type "name",
- # which is limited to 63 bytes, while pg_get_indexdef
- # returns "text", so without the cast the output may get cut
- else_=pg_catalog.pg_attribute.c.attname.cast(TEXT),
- ).label("element"),
- (idx_sq.c.attnum == 0).label("is_expr"),
- pg_catalog.pg_opclass.c.opcname,
- pg_catalog.pg_opclass.c.opcdefault,
- )
- .select_from(idx_sq)
- .outerjoin(
- # do not remove rows where idx_sq.c.attnum is 0
- pg_catalog.pg_attribute,
- sql.and_(
- pg_catalog.pg_attribute.c.attnum == idx_sq.c.attnum,
- pg_catalog.pg_attribute.c.attrelid == idx_sq.c.indrelid,
- ),
- )
- .outerjoin(
- pg_catalog.pg_opclass,
- pg_catalog.pg_opclass.c.oid == idx_sq.c.att_opclass,
- )
- .where(idx_sq.c.indrelid.in_(bindparam("oids")))
- .subquery("idx_attr")
- )
- cols_sq = (
- select(
- attr_sq.c.indexrelid,
- sql.func.min(attr_sq.c.indrelid),
- sql.func.array_agg(
- aggregate_order_by(attr_sq.c.element, attr_sq.c.ord)
- ).label("elements"),
- sql.func.array_agg(
- aggregate_order_by(attr_sq.c.is_expr, attr_sq.c.ord)
- ).label("elements_is_expr"),
- sql.func.array_agg(
- aggregate_order_by(attr_sq.c.opcname, attr_sq.c.ord)
- ).label("elements_opclass"),
- sql.func.array_agg(
- aggregate_order_by(attr_sq.c.opcdefault, attr_sq.c.ord)
- ).label("elements_opdefault"),
- )
- .group_by(attr_sq.c.indexrelid)
- .subquery("idx_cols")
- )
- if self.server_version_info >= (11, 0):
- indnkeyatts = pg_catalog.pg_index.c.indnkeyatts
- else:
- indnkeyatts = pg_catalog.pg_index.c.indnatts.label("indnkeyatts")
- if self.server_version_info >= (15,):
- nulls_not_distinct = pg_catalog.pg_index.c.indnullsnotdistinct
- else:
- nulls_not_distinct = sql.false().label("indnullsnotdistinct")
- return (
- select(
- pg_catalog.pg_index.c.indrelid,
- pg_catalog.pg_class.c.relname,
- pg_catalog.pg_index.c.indisunique,
- pg_catalog.pg_constraint.c.conrelid.is_not(None).label(
- "has_constraint"
- ),
- pg_catalog.pg_index.c.indoption,
- pg_catalog.pg_class.c.reloptions,
- pg_catalog.pg_am.c.amname,
- # NOTE: pg_get_expr is very fast so this case has almost no
- # performance impact
- sql.case(
- (
- pg_catalog.pg_index.c.indpred.is_not(None),
- pg_catalog.pg_get_expr(
- pg_catalog.pg_index.c.indpred,
- pg_catalog.pg_index.c.indrelid,
- ),
- ),
- else_=None,
- ).label("filter_definition"),
- indnkeyatts,
- nulls_not_distinct,
- cols_sq.c.elements,
- cols_sq.c.elements_is_expr,
- cols_sq.c.elements_opclass,
- cols_sq.c.elements_opdefault,
- )
- .select_from(pg_catalog.pg_index)
- .where(
- pg_catalog.pg_index.c.indrelid.in_(bindparam("oids")),
- ~pg_catalog.pg_index.c.indisprimary,
- )
- .join(
- pg_catalog.pg_class,
- pg_catalog.pg_index.c.indexrelid == pg_catalog.pg_class.c.oid,
- )
- .join(
- pg_catalog.pg_am,
- pg_catalog.pg_class.c.relam == pg_catalog.pg_am.c.oid,
- )
- .outerjoin(
- cols_sq,
- pg_catalog.pg_index.c.indexrelid == cols_sq.c.indexrelid,
- )
- .outerjoin(
- pg_catalog.pg_constraint,
- sql.and_(
- pg_catalog.pg_index.c.indrelid
- == pg_catalog.pg_constraint.c.conrelid,
- pg_catalog.pg_index.c.indexrelid
- == pg_catalog.pg_constraint.c.conindid,
- pg_catalog.pg_constraint.c.contype
- == sql.any_(_array.array(("p", "u", "x"))),
- ),
- )
- .order_by(
- pg_catalog.pg_index.c.indrelid, pg_catalog.pg_class.c.relname
- )
- )
- def get_multi_indexes(
- self, connection, schema, filter_names, scope, kind, **kw
- ):
- table_oids = self._get_table_oids(
- connection, schema, filter_names, scope, kind, **kw
- )
- indexes = defaultdict(list)
- default = ReflectionDefaults.indexes
- batches = list(table_oids)
- while batches:
- batch = batches[0:3000]
- batches[0:3000] = []
- result = connection.execute(
- self._index_query, {"oids": [r[0] for r in batch]}
- ).mappings()
- result_by_oid = defaultdict(list)
- for row_dict in result:
- result_by_oid[row_dict["indrelid"]].append(row_dict)
- for oid, table_name in batch:
- if oid not in result_by_oid:
- # ensure that each table has an entry, even if reflection
- # is skipped because not supported
- indexes[(schema, table_name)] = default()
- continue
- for row in result_by_oid[oid]:
- index_name = row["relname"]
- table_indexes = indexes[(schema, table_name)]
- all_elements = row["elements"]
- all_elements_is_expr = row["elements_is_expr"]
- all_elements_opclass = row["elements_opclass"]
- all_elements_opdefault = row["elements_opdefault"]
- indnkeyatts = row["indnkeyatts"]
- # "The number of key columns in the index, not counting any
- # included columns, which are merely stored and do not
- # participate in the index semantics"
- if len(all_elements) > indnkeyatts:
- # this is a "covering index" which has INCLUDE columns
- # as well as regular index columns
- inc_cols = all_elements[indnkeyatts:]
- idx_elements = all_elements[:indnkeyatts]
- idx_elements_is_expr = all_elements_is_expr[
- :indnkeyatts
- ]
- # postgresql does not support expressions on included
- # columns as of v14: "ERROR: expressions are not
- # supported in included columns".
- assert all(
- not is_expr
- for is_expr in all_elements_is_expr[indnkeyatts:]
- )
- idx_elements_opclass = all_elements_opclass[
- :indnkeyatts
- ]
- idx_elements_opdefault = all_elements_opdefault[
- :indnkeyatts
- ]
- else:
- idx_elements = all_elements
- idx_elements_is_expr = all_elements_is_expr
- inc_cols = []
- idx_elements_opclass = all_elements_opclass
- idx_elements_opdefault = all_elements_opdefault
- index = {"name": index_name, "unique": row["indisunique"]}
- if any(idx_elements_is_expr):
- index["column_names"] = [
- None if is_expr else expr
- for expr, is_expr in zip(
- idx_elements, idx_elements_is_expr
- )
- ]
- index["expressions"] = idx_elements
- else:
- index["column_names"] = idx_elements
- dialect_options = {}
- if not all(idx_elements_opdefault):
- dialect_options["postgresql_ops"] = {
- name: opclass
- for name, opclass, is_default in zip(
- idx_elements,
- idx_elements_opclass,
- idx_elements_opdefault,
- )
- if not is_default
- }
- sorting = {}
- for col_index, col_flags in enumerate(row["indoption"]):
- col_sorting = ()
- # try to set flags only if they differ from PG
- # defaults...
- if col_flags & 0x01:
- col_sorting += ("desc",)
- if not (col_flags & 0x02):
- col_sorting += ("nulls_last",)
- else:
- if col_flags & 0x02:
- col_sorting += ("nulls_first",)
- if col_sorting:
- sorting[idx_elements[col_index]] = col_sorting
- if sorting:
- index["column_sorting"] = sorting
- if row["has_constraint"]:
- index["duplicates_constraint"] = index_name
- if row["reloptions"]:
- dialect_options["postgresql_with"] = dict(
- [
- option.split("=", 1)
- for option in row["reloptions"]
- ]
- )
- # it *might* be nice to include that this is 'btree' in the
- # reflection info. But we don't want an Index object
- # to have a ``postgresql_using`` in it that is just the
- # default, so for the moment leaving this out.
- amname = row["amname"]
- if amname != "btree":
- dialect_options["postgresql_using"] = row["amname"]
- if row["filter_definition"]:
- dialect_options["postgresql_where"] = row[
- "filter_definition"
- ]
- if self.server_version_info >= (11,):
- # NOTE: this is legacy, this is part of
- # dialect_options now as of #7382
- index["include_columns"] = inc_cols
- dialect_options["postgresql_include"] = inc_cols
- if row["indnullsnotdistinct"]:
- # the default is False, so ignore it.
- dialect_options["postgresql_nulls_not_distinct"] = row[
- "indnullsnotdistinct"
- ]
- if dialect_options:
- index["dialect_options"] = dialect_options
- table_indexes.append(index)
- return indexes.items()
- @reflection.cache
- def get_unique_constraints(
- self, connection, table_name, schema=None, **kw
- ):
- data = self.get_multi_unique_constraints(
- connection,
- schema=schema,
- filter_names=[table_name],
- scope=ObjectScope.ANY,
- kind=ObjectKind.ANY,
- **kw,
- )
- return self._value_or_raise(data, table_name, schema)
- def get_multi_unique_constraints(
- self,
- connection,
- schema,
- filter_names,
- scope,
- kind,
- **kw,
- ):
- result = self._reflect_constraint(
- connection, "u", schema, filter_names, scope, kind, **kw
- )
- # each table can have multiple unique constraints
- uniques = defaultdict(list)
- default = ReflectionDefaults.unique_constraints
- for table_name, cols, con_name, comment, options in result:
- # ensure a list is created for each table. leave it empty if
- # the table has no unique constraints
- if con_name is None:
- uniques[(schema, table_name)] = default()
- continue
- uc_dict = {
- "column_names": cols,
- "name": con_name,
- "comment": comment,
- }
- if options:
- uc_dict["dialect_options"] = options
- uniques[(schema, table_name)].append(uc_dict)
- return uniques.items()
- @reflection.cache
- def get_table_comment(self, connection, table_name, schema=None, **kw):
- data = self.get_multi_table_comment(
- connection,
- schema,
- [table_name],
- scope=ObjectScope.ANY,
- kind=ObjectKind.ANY,
- **kw,
- )
- return self._value_or_raise(data, table_name, schema)
- @lru_cache()
- def _comment_query(self, schema, has_filter_names, scope, kind):
- relkinds = self._kind_to_relkinds(kind)
- query = (
- select(
- pg_catalog.pg_class.c.relname,
- pg_catalog.pg_description.c.description,
- )
- .select_from(pg_catalog.pg_class)
- .outerjoin(
- pg_catalog.pg_description,
- sql.and_(
- pg_catalog.pg_class.c.oid
- == pg_catalog.pg_description.c.objoid,
- pg_catalog.pg_description.c.objsubid == 0,
- pg_catalog.pg_description.c.classoid
- == sql.func.cast("pg_catalog.pg_class", REGCLASS),
- ),
- )
- .where(self._pg_class_relkind_condition(relkinds))
- )
- query = self._pg_class_filter_scope_schema(query, schema, scope)
- if has_filter_names:
- query = query.where(
- pg_catalog.pg_class.c.relname.in_(bindparam("filter_names"))
- )
- return query
- def get_multi_table_comment(
- self, connection, schema, filter_names, scope, kind, **kw
- ):
- has_filter_names, params = self._prepare_filter_names(filter_names)
- query = self._comment_query(schema, has_filter_names, scope, kind)
- result = connection.execute(query, params)
- default = ReflectionDefaults.table_comment
- return (
- (
- (schema, table),
- {"text": comment} if comment is not None else default(),
- )
- for table, comment in result
- )
- @reflection.cache
- def get_check_constraints(self, connection, table_name, schema=None, **kw):
- data = self.get_multi_check_constraints(
- connection,
- schema,
- [table_name],
- scope=ObjectScope.ANY,
- kind=ObjectKind.ANY,
- **kw,
- )
- return self._value_or_raise(data, table_name, schema)
- @lru_cache()
- def _check_constraint_query(self, schema, has_filter_names, scope, kind):
- relkinds = self._kind_to_relkinds(kind)
- query = (
- select(
- pg_catalog.pg_class.c.relname,
- pg_catalog.pg_constraint.c.conname,
- # NOTE: avoid calling pg_get_constraintdef when not needed
- # to speed up the query
- sql.case(
- (
- pg_catalog.pg_constraint.c.oid.is_not(None),
- pg_catalog.pg_get_constraintdef(
- pg_catalog.pg_constraint.c.oid, True
- ),
- ),
- else_=None,
- ),
- pg_catalog.pg_description.c.description,
- )
- .select_from(pg_catalog.pg_class)
- .outerjoin(
- pg_catalog.pg_constraint,
- sql.and_(
- pg_catalog.pg_class.c.oid
- == pg_catalog.pg_constraint.c.conrelid,
- pg_catalog.pg_constraint.c.contype == "c",
- ),
- )
- .outerjoin(
- pg_catalog.pg_description,
- pg_catalog.pg_description.c.objoid
- == pg_catalog.pg_constraint.c.oid,
- )
- .order_by(
- pg_catalog.pg_class.c.relname,
- pg_catalog.pg_constraint.c.conname,
- )
- .where(self._pg_class_relkind_condition(relkinds))
- )
- query = self._pg_class_filter_scope_schema(query, schema, scope)
- if has_filter_names:
- query = query.where(
- pg_catalog.pg_class.c.relname.in_(bindparam("filter_names"))
- )
- return query
- def get_multi_check_constraints(
- self, connection, schema, filter_names, scope, kind, **kw
- ):
- has_filter_names, params = self._prepare_filter_names(filter_names)
- query = self._check_constraint_query(
- schema, has_filter_names, scope, kind
- )
- result = connection.execute(query, params)
- check_constraints = defaultdict(list)
- default = ReflectionDefaults.check_constraints
- for table_name, check_name, src, comment in result:
- # only two cases for check_name and src: both null or both defined
- if check_name is None and src is None:
- check_constraints[(schema, table_name)] = default()
- continue
- # samples:
- # "CHECK (((a > 1) AND (a < 5)))"
- # "CHECK (((a = 1) OR ((a > 2) AND (a < 5))))"
- # "CHECK (((a > 1) AND (a < 5))) NOT VALID"
- # "CHECK (some_boolean_function(a))"
- # "CHECK (((a\n < 1)\n OR\n (a\n >= 5))\n)"
- # "CHECK (a NOT NULL) NO INHERIT"
- # "CHECK (a NOT NULL) NO INHERIT NOT VALID"
- m = re.match(
- r"^CHECK *\((.+)\)( NO INHERIT)?( NOT VALID)?$",
- src,
- flags=re.DOTALL,
- )
- if not m:
- util.warn("Could not parse CHECK constraint text: %r" % src)
- sqltext = ""
- else:
- sqltext = re.compile(
- r"^[\s\n]*\((.+)\)[\s\n]*$", flags=re.DOTALL
- ).sub(r"\1", m.group(1))
- entry = {
- "name": check_name,
- "sqltext": sqltext,
- "comment": comment,
- }
- if m:
- do = {}
- if " NOT VALID" in m.groups():
- do["not_valid"] = True
- if " NO INHERIT" in m.groups():
- do["no_inherit"] = True
- if do:
- entry["dialect_options"] = do
- check_constraints[(schema, table_name)].append(entry)
- return check_constraints.items()
- def _pg_type_filter_schema(self, query, schema):
- if schema is None:
- query = query.where(
- pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid),
- # ignore pg_catalog schema
- pg_catalog.pg_namespace.c.nspname != "pg_catalog",
- )
- elif schema != "*":
- query = query.where(pg_catalog.pg_namespace.c.nspname == schema)
- return query
- @lru_cache()
- def _enum_query(self, schema):
- lbl_agg_sq = (
- select(
- pg_catalog.pg_enum.c.enumtypid,
- sql.func.array_agg(
- aggregate_order_by(
- # NOTE: cast since some postgresql derivatives may
- # not support array_agg on the name type
- pg_catalog.pg_enum.c.enumlabel.cast(TEXT),
- pg_catalog.pg_enum.c.enumsortorder,
- )
- ).label("labels"),
- )
- .group_by(pg_catalog.pg_enum.c.enumtypid)
- .subquery("lbl_agg")
- )
- query = (
- select(
- pg_catalog.pg_type.c.typname.label("name"),
- pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid).label(
- "visible"
- ),
- pg_catalog.pg_namespace.c.nspname.label("schema"),
- lbl_agg_sq.c.labels.label("labels"),
- )
- .join(
- pg_catalog.pg_namespace,
- pg_catalog.pg_namespace.c.oid
- == pg_catalog.pg_type.c.typnamespace,
- )
- .outerjoin(
- lbl_agg_sq, pg_catalog.pg_type.c.oid == lbl_agg_sq.c.enumtypid
- )
- .where(pg_catalog.pg_type.c.typtype == "e")
- .order_by(
- pg_catalog.pg_namespace.c.nspname, pg_catalog.pg_type.c.typname
- )
- )
- return self._pg_type_filter_schema(query, schema)
- @reflection.cache
- def _load_enums(self, connection, schema=None, **kw):
- if not self.supports_native_enum:
- return []
- result = connection.execute(self._enum_query(schema))
- enums = []
- for name, visible, schema, labels in result:
- enums.append(
- {
- "name": name,
- "schema": schema,
- "visible": visible,
- "labels": [] if labels is None else labels,
- }
- )
- return enums
- @lru_cache()
- def _domain_query(self, schema):
- con_sq = (
- select(
- pg_catalog.pg_constraint.c.contypid,
- sql.func.array_agg(
- pg_catalog.pg_get_constraintdef(
- pg_catalog.pg_constraint.c.oid, True
- )
- ).label("condefs"),
- sql.func.array_agg(
- # NOTE: cast since some postgresql derivatives may
- # not support array_agg on the name type
- pg_catalog.pg_constraint.c.conname.cast(TEXT)
- ).label("connames"),
- )
- # The domain this constraint is on; zero if not a domain constraint
- .where(pg_catalog.pg_constraint.c.contypid != 0)
- .group_by(pg_catalog.pg_constraint.c.contypid)
- .subquery("domain_constraints")
- )
- query = (
- select(
- pg_catalog.pg_type.c.typname.label("name"),
- pg_catalog.format_type(
- pg_catalog.pg_type.c.typbasetype,
- pg_catalog.pg_type.c.typtypmod,
- ).label("attype"),
- (~pg_catalog.pg_type.c.typnotnull).label("nullable"),
- pg_catalog.pg_type.c.typdefault.label("default"),
- pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid).label(
- "visible"
- ),
- pg_catalog.pg_namespace.c.nspname.label("schema"),
- con_sq.c.condefs,
- con_sq.c.connames,
- pg_catalog.pg_collation.c.collname,
- )
- .join(
- pg_catalog.pg_namespace,
- pg_catalog.pg_namespace.c.oid
- == pg_catalog.pg_type.c.typnamespace,
- )
- .outerjoin(
- pg_catalog.pg_collation,
- pg_catalog.pg_type.c.typcollation
- == pg_catalog.pg_collation.c.oid,
- )
- .outerjoin(
- con_sq,
- pg_catalog.pg_type.c.oid == con_sq.c.contypid,
- )
- .where(pg_catalog.pg_type.c.typtype == "d")
- .order_by(
- pg_catalog.pg_namespace.c.nspname, pg_catalog.pg_type.c.typname
- )
- )
- return self._pg_type_filter_schema(query, schema)
- @reflection.cache
- def _load_domains(self, connection, schema=None, **kw):
- result = connection.execute(self._domain_query(schema))
- domains: List[ReflectedDomain] = []
- for domain in result.mappings():
- # strip (30) from character varying(30)
- attype = re.search(r"([^\(]+)", domain["attype"]).group(1)
- constraints: List[ReflectedDomainConstraint] = []
- if domain["connames"]:
- # When a domain has multiple CHECK constraints, they will
- # be tested in alphabetical order by name.
- sorted_constraints = sorted(
- zip(domain["connames"], domain["condefs"]),
- key=lambda t: t[0],
- )
- for name, def_ in sorted_constraints:
- # constraint is in the form "CHECK (expression)"
- # or "NOT NULL". Ignore the "NOT NULL" and
- # remove "CHECK (" and the trailing ")".
- if def_.casefold().startswith("check"):
- check = def_[7:-1]
- constraints.append({"name": name, "check": check})
- domain_rec: ReflectedDomain = {
- "name": domain["name"],
- "schema": domain["schema"],
- "visible": domain["visible"],
- "type": attype,
- "nullable": domain["nullable"],
- "default": domain["default"],
- "constraints": constraints,
- "collation": domain["collname"],
- }
- domains.append(domain_rec)
- return domains
- def _set_backslash_escapes(self, connection):
- # this method is provided as an override hook for descendant
- # dialects (e.g. Redshift), so removing it may break them
- std_string = connection.exec_driver_sql(
- "show standard_conforming_strings"
- ).scalar()
- self._backslash_escapes = std_string == "off"
|