| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
27722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762
77727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106310731083109311031113112311331143115311631173118311931203121312231233124312531263127312831293130313131323133313431353136313731383139314031413142314331443145314631473148314931503151315231533154315531563157315831593160316131623163316431653166316731683169317031713172317331743175317631773178317931803181318231833184318531863187318831893190319131923193319431953196319731983199320032013202320332043205320632073208320932103211321232133214321532163217321832193220322132223223322432253226322732283229323032313232323332343235323632373238323932403241324232433244324532463247324832493250325132523253325432553256325732583259326032613262326332643265326632673268326932703271327232733274327532763
27732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342334333443345334633473348334933503351335233533354335533563357335833593360336133623363336433653366336733683369337033713372337333743375337633773378337933803381338233833384338533863387338833893390339133923393339433953396339733983399340034013402340334043405340634073408340934103411341234133414341534163417341834193420342134223423342434253426342734283429343034313432343334343435343634373438343934403441344234433444344534463447344834493450345134523453345434553456345734583459346034613462346334643465346634673468346934703471347234733474347534763477347834793480348134823483348434853486348734883489349034913492349334943495349634973498349935003501350235033504350535063507350835093510351135123513351435153516351735183519352035213522352335243525352635273528352935303531353235333534353535363537353835393540354135423543354435453546354735483549355035513552355335543555355635573558355935603561356235633564356535663567356835693570357135723573357435753576357735783579358035813582358335843585358635873588358935903591359235933594359535963597359835993600360136023603360436053606360736083609361036113612361336143615361636173618361936203621362236233624362536263627362836293630363136323633363436353636363736383639364036413642364336443645364636473648364936503651365236533654365536563657365836593660366136623663366436653666366736683669367036713672367336743675367636773678367936803681368236833684368536863687368836893690369136923693369436953696369736983699370037013702370337043705370637073708370937103711371237133714371537163717371837193720372137223723372437253726372737283729373037313732373337343735373637373738373937403741374237433744374537463747374837493750375137523753375437553756375737583759376037613762376337643765376637673768376937703771377237733774377537763
7773778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802 |
- # dialects/oracle/base.py
- # Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
- # <see AUTHORS file>
- #
- # This module is part of SQLAlchemy and is released under
- # the MIT License: https://www.opensource.org/licenses/mit-license.php
- # mypy: ignore-errors
- r"""
- .. dialect:: oracle
- :name: Oracle Database
- :normal_support: 11+
- :best_effort: 9+
- Auto Increment Behavior
- -----------------------
- SQLAlchemy Table objects which include integer primary keys are usually assumed
- to have "autoincrementing" behavior, meaning they can generate their own
- primary key values upon INSERT. For use within Oracle Database, two options are
- available, which are the use of IDENTITY columns (Oracle Database 12 and above
- only) or the association of a SEQUENCE with the column.
- Specifying GENERATED AS IDENTITY (Oracle Database 12 and above)
- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
- Starting from version 12, Oracle Database can make use of identity columns
- using the :class:`_sql.Identity` to specify the autoincrementing behavior::
- t = Table(
- "mytable",
- metadata,
- Column("id", Integer, Identity(start=3), primary_key=True),
- Column(...),
- ...,
- )
- The CREATE TABLE for the above :class:`_schema.Table` object would be:
- .. sourcecode:: sql
- CREATE TABLE mytable (
- id INTEGER GENERATED BY DEFAULT AS IDENTITY (START WITH 3),
- ...,
- PRIMARY KEY (id)
- )
- The :class:`_schema.Identity` object supports many options to control the
- "autoincrementing" behavior of the column, like the starting value, the
- incrementing value, etc. In addition to the standard options, Oracle Database
- supports setting :paramref:`_schema.Identity.always` to ``None`` to use the
- default generated mode, rendering GENERATED AS IDENTITY in the DDL. It also supports
- setting :paramref:`_schema.Identity.on_null` to ``True`` to specify ON NULL
- in conjunction with a 'BY DEFAULT' identity column.
- Using a SEQUENCE (all Oracle Database versions)
- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
- Older versions of Oracle Database had no "autoincrement" feature: SQLAlchemy
- relies upon sequences to produce these values. With the older Oracle Database
- versions, *a sequence must always be explicitly specified to enable
- autoincrement*. This diverges from the majority of documentation examples
- which assume the usage of an autoincrement-capable database. To specify
- sequences, use the sqlalchemy.schema.Sequence object which is passed to a
- Column construct::
- t = Table(
- "mytable",
- metadata,
- Column("id", Integer, Sequence("id_seq", start=1), primary_key=True),
- Column(...),
- ...,
- )
- This step is also required when using table reflection, i.e. autoload_with=engine::
- t = Table(
- "mytable",
- metadata,
- Column("id", Integer, Sequence("id_seq", start=1), primary_key=True),
- autoload_with=engine,
- )
- .. versionchanged:: 1.4 Added :class:`_schema.Identity` construct
- in a :class:`_schema.Column` to specify the option of an autoincrementing
- column.
- .. _oracle_isolation_level:
- Transaction Isolation Level / Autocommit
- ----------------------------------------
- Oracle Database supports "READ COMMITTED" and "SERIALIZABLE" modes of
- isolation. The AUTOCOMMIT isolation level is also supported by the
- python-oracledb and cx_Oracle dialects.
- To set using per-connection execution options::
- connection = engine.connect()
- connection = connection.execution_options(isolation_level="AUTOCOMMIT")
- For ``READ COMMITTED`` and ``SERIALIZABLE``, the Oracle Database dialects set
- the level at the session level using ``ALTER SESSION``, which is reverted back
- to its default setting when the connection is returned to the connection pool.
- Valid values for ``isolation_level`` include:
- * ``READ COMMITTED``
- * ``AUTOCOMMIT``
- * ``SERIALIZABLE``
- .. note:: The implementation of the
- :meth:`_engine.Connection.get_isolation_level` method used by the
- Oracle Database dialects necessarily forces the start of a transaction using the
- Oracle Database DBMS_TRANSACTION.LOCAL_TRANSACTION_ID function; otherwise no
- level is normally readable.
- Additionally, the :meth:`_engine.Connection.get_isolation_level` method will
- raise an exception if the ``v$transaction`` view is not available due to
- permissions or other reasons, which is a common occurrence in Oracle Database
- installations.
- The python-oracledb and cx_Oracle dialects attempt to call the
- :meth:`_engine.Connection.get_isolation_level` method when the dialect makes
- its first connection to the database in order to acquire the
- "default" isolation level. This default level is necessary so that the level
- can be reset on a connection after it has been temporarily modified using
- the :meth:`_engine.Connection.execution_options` method. In the common event
- that the :meth:`_engine.Connection.get_isolation_level` method raises an
- exception due to ``v$transaction`` not being readable as well as any other
- database-related failure, the level is assumed to be "READ COMMITTED". No
- warning is emitted for this initial first-connect condition as it is
- expected to be a common restriction on Oracle databases.
- .. versionadded:: 1.3.16 added support for AUTOCOMMIT to the cx_Oracle dialect
- as well as the notion of a default isolation level
- .. versionadded:: 1.3.21 Added support for SERIALIZABLE as well as live
- reading of the isolation level.
- .. versionchanged:: 1.3.22 In the event that the default isolation
- level cannot be read due to permissions on the v$transaction view as
- is common in Oracle installations, the default isolation level is hardcoded
- to "READ COMMITTED" which was the behavior prior to 1.3.21.
- .. seealso::
- :ref:`dbapi_autocommit`
- Identifier Casing
- -----------------
- In Oracle Database, the data dictionary represents all case insensitive
- identifier names using UPPERCASE text. This is in contradiction to the
- expectations of SQLAlchemy, which assume a case insensitive name is represented
- as lowercase text.
- As an example of case insensitive identifier names, consider the following table:
- .. sourcecode:: sql
- CREATE TABLE MyTable (Identifier INTEGER PRIMARY KEY)
- If you were to ask Oracle Database for information about this table, the
- table name would be reported as ``MYTABLE`` and the column name would
- be reported as ``IDENTIFIER``. Compare to most other databases such as
- PostgreSQL and MySQL which would report these names as ``mytable`` and
- ``identifier``. The names are **not quoted, therefore are case insensitive**.
- The special casing of ``MyTable`` and ``Identifier`` would only be maintained
- if they were quoted in the table definition:
- .. sourcecode:: sql
- CREATE TABLE "MyTable" ("Identifier" INTEGER PRIMARY KEY)
- When constructing a SQLAlchemy :class:`.Table` object, **an all lowercase name
- is considered to be case insensitive**. So the following table assumes
- case insensitive names::
- Table("mytable", metadata, Column("identifier", Integer, primary_key=True))
- Whereas when mixed case or UPPERCASE names are used, case sensitivity is
- assumed::
- Table("MyTable", metadata, Column("Identifier", Integer, primary_key=True))
- A similar situation occurs at the database driver level when emitting a
- textual SQL SELECT statement and looking at column names in the DBAPI
- ``cursor.description`` attribute. A database like PostgreSQL will normalize
- case insensitive names to be lowercase::
- >>> pg_engine = create_engine("postgresql://scott:tiger@localhost/test")
- >>> pg_connection = pg_engine.connect()
- >>> result = pg_connection.exec_driver_sql("SELECT 1 AS SomeName")
- >>> result.cursor.description
- (Column(name='somename', type_code=23),)
- Whereas Oracle normalizes them to UPPERCASE::
- >>> oracle_engine = create_engine("oracle+oracledb://scott:tiger@oracle18c/xe")
- >>> oracle_connection = oracle_engine.connect()
- >>> result = oracle_connection.exec_driver_sql(
- ... "SELECT 1 AS SomeName FROM DUAL"
- ... )
- >>> result.cursor.description
- [('SOMENAME', <DbType DB_TYPE_NUMBER>, 127, None, 0, -127, True)]
- In order to achieve cross-database parity for the two cases of a. table
- reflection and b. textual-only SQL statement round trips, SQLAlchemy performs a step
- called **name normalization** when using the Oracle dialect. This process may
- also apply to other third party dialects that have similar UPPERCASE handling
- of case insensitive names.
- When using name normalization, SQLAlchemy attempts to detect if a name is
- case insensitive by checking if all characters are UPPERCASE letters only;
- if so, then it assumes this is a case insensitive name and is delivered as
- a lowercase name.
- For table reflection, a tablename that is seen represented as all UPPERCASE
- in Oracle Database's catalog tables will be assumed to have a case insensitive
- name. This is what allows the ``Table`` definition to use lower case names
- and be equally compatible from a reflection point of view on Oracle Database
- and all other databases such as PostgreSQL and MySQL::
- # matches a table created with CREATE TABLE mytable
- Table("mytable", metadata, autoload_with=some_engine)
- Above, the all lowercase name ``"mytable"`` is case insensitive; it will match
- a table reported by PostgreSQL as ``"mytable"`` and a table reported by
- Oracle as ``"MYTABLE"``. If name normalization were not present, it would
- not be possible for the above :class:`.Table` definition to be introspectable
- in a cross-database way, since we are dealing with a case insensitive name
- that is not reported by each database in the same way.
- Case sensitivity can be forced on in this case, such as if we wanted to represent
- the quoted tablename ``"MYTABLE"`` with that exact casing, most simply by using
- that casing directly, which will be seen as a case sensitive name::
- # matches a table created with CREATE TABLE "MYTABLE"
- Table("MYTABLE", metadata, autoload_with=some_engine)
- For the unusual case of a quoted all-lowercase name, the :class:`.quoted_name`
- construct may be used::
- from sqlalchemy import quoted_name
- # matches a table created with CREATE TABLE "mytable"
- Table(
- quoted_name("mytable", quote=True), metadata, autoload_with=some_engine
- )
- Name normalization also takes place when handling result sets from **purely
- textual SQL strings**, that have no other :class:`.Table` or :class:`.Column`
- metadata associated with them. This includes SQL strings executed using
- :meth:`.Connection.exec_driver_sql` and SQL strings executed using the
- :func:`.text` construct which do not include :class:`.Column` metadata.
- Returning to the Oracle Database SELECT statement, we see that even though
- ``cursor.description`` reports the column name as ``SOMENAME``, SQLAlchemy
- name normalizes this to ``somename``::
- >>> oracle_engine = create_engine("oracle+oracledb://scott:tiger@oracle18c/xe")
- >>> oracle_connection = oracle_engine.connect()
- >>> result = oracle_connection.exec_driver_sql(
- ... "SELECT 1 AS SomeName FROM DUAL"
- ... )
- >>> result.cursor.description
- [('SOMENAME', <DbType DB_TYPE_NUMBER>, 127, None, 0, -127, True)]
- >>> result.keys()
- RMKeyView(['somename'])
- The single scenario where the above behavior produces inaccurate results
- is when using an all-uppercase, quoted name. SQLAlchemy has no way to determine
- whether a particular name in ``cursor.description`` was quoted, and is therefore
- case sensitive, or was not quoted, and should be name normalized::
- >>> result = oracle_connection.exec_driver_sql(
- ... 'SELECT 1 AS "SOMENAME" FROM DUAL'
- ... )
- >>> result.cursor.description
- [('SOMENAME', <DbType DB_TYPE_NUMBER>, 127, None, 0, -127, True)]
- >>> result.keys()
- RMKeyView(['somename'])
- For this case, a new feature will be available in SQLAlchemy 2.1 to disable
- the name normalization behavior in specific cases.
- .. _oracle_max_identifier_lengths:
- Maximum Identifier Lengths
- --------------------------
- SQLAlchemy is sensitive to the maximum identifier length supported by Oracle
- Database. This affects generated SQL label names as well as the generation of
- constraint names, particularly in the case where the constraint naming
- convention feature described at :ref:`constraint_naming_conventions` is being
- used.
- Oracle Database 12.2 increased the default maximum identifier length from 30 to
- 128. As of SQLAlchemy 1.4, the default maximum identifier length for the Oracle
- dialects is 128 characters. Upon first connection, the maximum length actually
- supported by the database is obtained. In all cases, setting the
- :paramref:`_sa.create_engine.max_identifier_length` parameter will bypass this
- change and the value given will be used as is::
- engine = create_engine(
- "oracle+oracledb://scott:tiger@localhost:1521?service_name=freepdb1",
- max_identifier_length=30,
- )
- If :paramref:`_sa.create_engine.max_identifier_length` is not set, the oracledb
- dialect internally uses the ``max_identifier_length`` attribute available on
- driver connections since python-oracledb version 2.5. When using an older
- driver version, or using the cx_Oracle dialect, SQLAlchemy will instead attempt
- to use the query ``SELECT value FROM v$parameter WHERE name = 'compatible'``
- upon first connect in order to determine the effective compatibility version of
- the database. The "compatibility" version is a version number that is
- independent of the actual database version. It is used to assist database
- migration. It is configured by an Oracle Database initialization parameter. The
- compatibility version then determines the maximum allowed identifier length for
- the database. If the V$ view is not available, the database version information
- is used instead.
- The maximum identifier length comes into play when generating anonymized
- SQL labels in SELECT statements, but more crucially when generating constraint
- names from a naming convention. It is this area that has created the need for
- SQLAlchemy to change this default conservatively. For example, the following
- naming convention produces two very different constraint names based on the
- identifier length::
- from sqlalchemy import Column
- from sqlalchemy import Index
- from sqlalchemy import Integer
- from sqlalchemy import MetaData
- from sqlalchemy import Table
- from sqlalchemy.dialects import oracle
- from sqlalchemy.schema import CreateIndex
- m = MetaData(naming_convention={"ix": "ix_%(column_0N_name)s"})
- t = Table(
- "t",
- m,
- Column("some_column_name_1", Integer),
- Column("some_column_name_2", Integer),
- Column("some_column_name_3", Integer),
- )
- ix = Index(
- None,
- t.c.some_column_name_1,
- t.c.some_column_name_2,
- t.c.some_column_name_3,
- )
- oracle_dialect = oracle.dialect(max_identifier_length=30)
- print(CreateIndex(ix).compile(dialect=oracle_dialect))
- With an identifier length of 30, the above CREATE INDEX looks like:
- .. sourcecode:: sql
- CREATE INDEX ix_some_column_name_1s_70cd ON t
- (some_column_name_1, some_column_name_2, some_column_name_3)
- However with a length of 128, it becomes:
- .. sourcecode:: sql
- CREATE INDEX ix_some_column_name_1some_column_name_2some_column_name_3 ON t
- (some_column_name_1, some_column_name_2, some_column_name_3)
- Applications which have run versions of SQLAlchemy prior to 1.4 on Oracle
- Database version 12.2 or greater are therefore subject to the scenario of a
- database migration that wishes to "DROP CONSTRAINT" on a name that was
- previously generated with the shorter length. This migration will fail when
- the identifier length is changed without the name of the index or constraint
- first being adjusted. Such applications are strongly advised to make use of
- :paramref:`_sa.create_engine.max_identifier_length` in order to maintain
- control of the generation of truncated names, and to fully review and test all
- database migrations in a staging environment when changing this value to ensure
- that the impact of this change has been mitigated.
- .. versionchanged:: 1.4 the default max_identifier_length for Oracle Database
- is 128 characters, which is adjusted down to 30 upon first connect if the
- Oracle Database, or its compatibility setting, are lower than version 12.2.
- LIMIT/OFFSET/FETCH Support
- --------------------------
- Methods like :meth:`_sql.Select.limit` and :meth:`_sql.Select.offset` make use
- of ``FETCH FIRST N ROW / OFFSET N ROWS`` syntax assuming Oracle Database 12c or
- above, and assuming the SELECT statement is not embedded within a compound
- statement like UNION. This syntax is also available directly by using the
- :meth:`_sql.Select.fetch` method.
- .. versionchanged:: 2.0 the Oracle Database dialects now use ``FETCH FIRST N
- ROW / OFFSET N ROWS`` for all :meth:`_sql.Select.limit` and
- :meth:`_sql.Select.offset` usage including within the ORM and legacy
- :class:`_orm.Query`. To force the legacy behavior using window functions,
- specify the ``enable_offset_fetch=False`` dialect parameter to
- :func:`_sa.create_engine`.
- The use of ``FETCH FIRST / OFFSET`` may be disabled on any Oracle Database
- version by passing ``enable_offset_fetch=False`` to :func:`_sa.create_engine`,
- which will force the use of "legacy" mode that makes use of window functions.
- This mode is also selected automatically when using a version of Oracle
- Database prior to 12c.
- When using legacy mode, or when a :class:`.Select` statement with limit/offset
- is embedded in a compound statement, an emulated approach for LIMIT / OFFSET
- based on window functions is used, which involves creation of a subquery using
- ``ROW_NUMBER`` that is prone to performance issues as well as SQL construction
- issues for complex statements. However, this approach is supported by all
- Oracle Database versions. See notes below.
- Notes on LIMIT / OFFSET emulation (when fetch() method cannot be used)
- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
- If using :meth:`_sql.Select.limit` and :meth:`_sql.Select.offset`, or with the
- ORM the :meth:`_orm.Query.limit` and :meth:`_orm.Query.offset` methods on an
- Oracle Database version prior to 12c, the following notes apply:
- * SQLAlchemy currently makes use of ROWNUM to achieve
- LIMIT/OFFSET; the exact methodology is taken from
- https://blogs.oracle.com/oraclemagazine/on-rownum-and-limiting-results .
- * the "FIRST_ROWS()" optimization keyword is not used by default. To enable
- the usage of this optimization directive, specify ``optimize_limits=True``
- to :func:`_sa.create_engine`.
- .. versionchanged:: 1.4
- The Oracle Database dialect renders limit/offset integer values using a
- "post compile" scheme which renders the integer directly before passing
- the statement to the cursor for execution. The ``use_binds_for_limits``
- flag no longer has an effect.
- .. seealso::
- :ref:`change_4808`.
- .. _oracle_returning:
- RETURNING Support
- -----------------
- Oracle Database supports RETURNING fully for INSERT, UPDATE and DELETE
- statements that are invoked with a single collection of bound parameters (that
- is, a ``cursor.execute()`` style statement; SQLAlchemy does not generally
- support RETURNING with :term:`executemany` statements). Multiple rows may be
- returned as well.
- .. versionchanged:: 2.0 the Oracle Database backend has full support for
- RETURNING on parity with other backends.
- ON UPDATE CASCADE
- -----------------
- Oracle Database doesn't have native ON UPDATE CASCADE functionality. A trigger
- based solution is available at
- https://web.archive.org/web/20090317041251/https://asktom.oracle.com/tkyte/update_cascade/index.html
- When using the SQLAlchemy ORM, the ORM has limited ability to manually issue
- cascading updates - specify ForeignKey objects using the
- "deferrable=True, initially='deferred'" keyword arguments,
- and specify "passive_updates=False" on each relationship().
- Oracle Database 8 Compatibility
- -------------------------------
- .. warning:: The status of Oracle Database 8 compatibility is not known for
- SQLAlchemy 2.0.
- When Oracle Database 8 is detected, the dialect internally configures itself to
- the following behaviors:
- * the use_ansi flag is set to False. This has the effect of converting all
- JOIN phrases into the WHERE clause, and in the case of LEFT OUTER JOIN
- makes use of Oracle's (+) operator.
- * the NVARCHAR2 and NCLOB datatypes are no longer generated as DDL when
- the :class:`~sqlalchemy.types.Unicode` is used - VARCHAR2 and CLOB are issued
- instead. This is because these types don't seem to work correctly on Oracle 8
- even though they are available. The :class:`~sqlalchemy.types.NVARCHAR` and
- :class:`~sqlalchemy.dialects.oracle.NCLOB` types will always generate
- NVARCHAR2 and NCLOB.
- Synonym/DBLINK Reflection
- -------------------------
- When using reflection with Table objects, the dialect can optionally search
- for tables indicated by synonyms, either in local or remote schemas or
- accessed over DBLINK, by passing the flag ``oracle_resolve_synonyms=True`` as
- a keyword argument to the :class:`_schema.Table` construct::
- some_table = Table(
- "some_table", autoload_with=some_engine, oracle_resolve_synonyms=True
- )
- When this flag is set, the given name (such as ``some_table`` above) will be
- searched not just in the ``ALL_TABLES`` view, but also within the
- ``ALL_SYNONYMS`` view to see if this name is actually a synonym to another
- name. If the synonym is located and refers to a DBLINK, the Oracle Database
- dialects know how to locate the table's information using DBLINK syntax (e.g.
- ``@dblink``).
- ``oracle_resolve_synonyms`` is accepted wherever reflection arguments are
- accepted, including methods such as :meth:`_schema.MetaData.reflect` and
- :meth:`_reflection.Inspector.get_columns`.
- If synonyms are not in use, this flag should be left disabled.
- .. _oracle_constraint_reflection:
- Constraint Reflection
- ---------------------
- The Oracle Database dialects can return information about foreign key, unique,
- and CHECK constraints, as well as indexes on tables.
- Raw information regarding these constraints can be acquired using
- :meth:`_reflection.Inspector.get_foreign_keys`,
- :meth:`_reflection.Inspector.get_unique_constraints`,
- :meth:`_reflection.Inspector.get_check_constraints`, and
- :meth:`_reflection.Inspector.get_indexes`.
- .. versionchanged:: 1.2 The Oracle Database dialect can now reflect UNIQUE and
- CHECK constraints.
- When using reflection at the :class:`_schema.Table` level, the
- :class:`_schema.Table`
- will also include these constraints.
- Note the following caveats:
- * When using the :meth:`_reflection.Inspector.get_check_constraints` method,
- Oracle Database builds a special "IS NOT NULL" constraint for columns that
- specify "NOT NULL". This constraint is **not** returned by default; to
- include the "IS NOT NULL" constraints, pass the flag ``include_all=True``::
- from sqlalchemy import create_engine, inspect
- engine = create_engine(
- "oracle+oracledb://scott:tiger@localhost:1521?service_name=freepdb1"
- )
- inspector = inspect(engine)
- all_check_constraints = inspector.get_check_constraints(
- "some_table", include_all=True
- )
- * in most cases, when reflecting a :class:`_schema.Table`, a UNIQUE constraint
- will **not** be available as a :class:`.UniqueConstraint` object, as Oracle
- Database mirrors unique constraints with a UNIQUE index in most cases (the
- exception seems to be when two or more unique constraints represent the same
- columns); the :class:`_schema.Table` will instead represent these using
- :class:`.Index` with the ``unique=True`` flag set.
- * Oracle Database creates an implicit index for the primary key of a table;
- this index is **excluded** from all index results.
- * the list of columns reflected for an index will not include column names
- that start with SYS_NC.
- Table names with SYSTEM/SYSAUX tablespaces
- -------------------------------------------
- The :meth:`_reflection.Inspector.get_table_names` and
- :meth:`_reflection.Inspector.get_temp_table_names`
- methods each return a list of table names for the current engine. These methods
- are also part of the reflection which occurs within an operation such as
- :meth:`_schema.MetaData.reflect`. By default,
- these operations exclude the ``SYSTEM``
- and ``SYSAUX`` tablespaces from the operation. In order to change this, the
- default list of tablespaces excluded can be changed at the engine level using
- the ``exclude_tablespaces`` parameter::
- # exclude SYSAUX and SOME_TABLESPACE, but not SYSTEM
- e = create_engine(
- "oracle+oracledb://scott:tiger@localhost:1521/?service_name=freepdb1",
- exclude_tablespaces=["SYSAUX", "SOME_TABLESPACE"],
- )
- .. _oracle_float_support:
- FLOAT / DOUBLE Support and Behaviors
- ------------------------------------
- The SQLAlchemy :class:`.Float` and :class:`.Double` datatypes are generic
- datatypes that resolve to the "least surprising" datatype for a given backend.
- For Oracle Database, this means they resolve to the ``FLOAT`` and ``DOUBLE``
- types::
- >>> from sqlalchemy import cast, literal, Float
- >>> from sqlalchemy.dialects import oracle
- >>> float_datatype = Float()
- >>> print(cast(literal(5.0), float_datatype).compile(dialect=oracle.dialect()))
- CAST(:param_1 AS FLOAT)
- Oracle's ``FLOAT`` / ``DOUBLE`` datatypes are aliases for ``NUMBER``. Oracle
- Database stores ``NUMBER`` values with full precision, not floating point
- precision, which means that ``FLOAT`` / ``DOUBLE`` do not actually behave like
- native FP values. Oracle Database instead offers special datatypes
- ``BINARY_FLOAT`` and ``BINARY_DOUBLE`` to deliver real 4- and 8- byte FP
- values.
- SQLAlchemy supports these datatypes directly using :class:`.BINARY_FLOAT` and
- :class:`.BINARY_DOUBLE`. To use the :class:`.Float` or :class:`.Double`
- datatypes in a database agnostic way, while allowing Oracle backends to utilize
- one of these types, use the :meth:`.TypeEngine.with_variant` method to set up a
- variant::
- >>> from sqlalchemy import cast, literal, Float
- >>> from sqlalchemy.dialects import oracle
- >>> float_datatype = Float().with_variant(oracle.BINARY_FLOAT(), "oracle")
- >>> print(cast(literal(5.0), float_datatype).compile(dialect=oracle.dialect()))
- CAST(:param_1 AS BINARY_FLOAT)
- E.g. to use this datatype in a :class:`.Table` definition::
- my_table = Table(
- "my_table",
- metadata,
- Column(
- "fp_data", Float().with_variant(oracle.BINARY_FLOAT(), "oracle")
- ),
- )
- DateTime Compatibility
- ----------------------
- Oracle Database has no datatype known as ``DATETIME``, it instead has only
- ``DATE``, which can actually store a date and time value. For this reason, the
- Oracle Database dialects provide a type :class:`_oracle.DATE` which is a
- subclass of :class:`.DateTime`. This type has no special behavior, and is only
- present as a "marker" for this type; additionally, when a database column is
- reflected and the type is reported as ``DATE``, the time-supporting
- :class:`_oracle.DATE` type is used.
- .. _oracle_table_options:
- Oracle Database Table Options
- -----------------------------
- The CREATE TABLE phrase supports the following options with Oracle Database
- dialects in conjunction with the :class:`_schema.Table` construct:
- * ``ON COMMIT``::
- Table(
- "some_table",
- metadata,
- ...,
- prefixes=["GLOBAL TEMPORARY"],
- oracle_on_commit="PRESERVE ROWS",
- )
- *
- ``COMPRESS``::
- Table(
- "mytable", metadata, Column("data", String(32)), oracle_compress=True
- )
- Table("mytable", metadata, Column("data", String(32)), oracle_compress=6)
- The ``oracle_compress`` parameter accepts either an integer compression
- level, or ``True`` to use the default compression level.
- *
- ``TABLESPACE``::
- Table("mytable", metadata, ..., oracle_tablespace="EXAMPLE_TABLESPACE")
- The ``oracle_tablespace`` parameter specifies the tablespace in which the
- table is to be created. This is useful when you want to create a table in a
- tablespace other than the default tablespace of the user.
- .. versionadded:: 2.0.37
- .. _oracle_index_options:
- Oracle Database Specific Index Options
- --------------------------------------
- Bitmap Indexes
- ~~~~~~~~~~~~~~
- You can specify the ``oracle_bitmap`` parameter to create a bitmap index
- instead of a B-tree index::
- Index("my_index", my_table.c.data, oracle_bitmap=True)
- Bitmap indexes cannot be unique and cannot be compressed. SQLAlchemy will not
- check for such limitations, only the database will.
- Index compression
- ~~~~~~~~~~~~~~~~~
- Oracle Database has a more efficient storage mode for indexes containing lots
- of repeated values. Use the ``oracle_compress`` parameter to turn on key
- compression::
- Index("my_index", my_table.c.data, oracle_compress=True)
- Index(
- "my_index",
- my_table.c.data1,
- my_table.c.data2,
- unique=True,
- oracle_compress=1,
- )
- The ``oracle_compress`` parameter accepts either an integer specifying the
- number of prefix columns to compress, or ``True`` to use the default (all
- columns for non-unique indexes, all but the last column for unique indexes).
- .. _oracle_vector_datatype:
- VECTOR Datatype
- ---------------
- Oracle Database 23ai introduced a new VECTOR datatype for artificial intelligence
- and machine learning search operations. The VECTOR datatype is a homogeneous array
- of 8-bit signed integers, 8-bit unsigned integers (binary), 32-bit floating-point
- numbers, or 64-bit floating-point numbers.
- A vector's storage type can be either DENSE or SPARSE. A dense vector contains
- meaningful values in most or all of its dimensions. In contrast, a sparse vector
- has non-zero values in only a few dimensions, with the majority being zero.
- Sparse vectors are represented by the total number of vector dimensions, an array
- of indices, and an array of values where each value’s location in the vector is
- indicated by the corresponding indices array position. All other vector values are
- treated as zero.
- The storage formats that can be used with sparse vectors are float32, float64, and
- int8. Note that the binary storage format cannot be used with sparse vectors.
- Sparse vectors are supported when you are using Oracle Database 23.7 or later.
- .. seealso::
- `Using VECTOR Data
- <https://python-oracledb.readthedocs.io/en/latest/user_guide/vector_data_type.html>`_ - in the documentation
- for the :ref:`oracledb` driver.
- .. versionadded:: 2.0.41 - Added VECTOR datatype
- .. versionadded:: 2.0.43 - Added DENSE/SPARSE support
- CREATE TABLE support for VECTOR
- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
- With the :class:`.VECTOR` datatype, you can specify the number of dimensions,
- the storage format, and the storage type for the data. Valid values for the
- storage format are enum members of :class:`.VectorStorageFormat`. Valid values
- for the storage type are enum members of :class:`.VectorStorageType`. If
- storage type is not specified, a DENSE vector is created by default.
- To create a table that includes a :class:`.VECTOR` column::
- from sqlalchemy.dialects.oracle import (
- VECTOR,
- VectorStorageFormat,
- VectorStorageType,
- )
- t = Table(
- "t1",
- metadata,
- Column("id", Integer, primary_key=True),
- Column(
- "embedding",
- VECTOR(
- dim=3,
- storage_format=VectorStorageFormat.FLOAT32,
- storage_type=VectorStorageType.SPARSE,
- ),
- ),
- Column(...),
- ...,
- )
- Vectors can also be defined with an arbitrary number of dimensions and formats.
- This allows you to specify vectors of different dimensions with the various
- storage formats mentioned below.
- **Examples**
- * In this case, the storage format is flexible, allowing any vector type data to be
- inserted, such as INT8 or BINARY etc::
- vector_col: Mapped[array.array] = mapped_column(VECTOR(dim=3))
- * The dimension is flexible in this case, meaning that any dimension vector can
- be used::
- vector_col: Mapped[array.array] = mapped_column(
- VECTOR(storage_format=VectorStorageFormat.INT8)
- )
- * Both the dimensions and the storage format are flexible. It creates a DENSE vector::
- vector_col: Mapped[array.array] = mapped_column(VECTOR)
- * To create a SPARSE vector with both dimensions and the storage format as flexible,
- use the :attr:`.VectorStorageType.SPARSE` storage type::
- vector_col: Mapped[array.array] = mapped_column(
- VECTOR(storage_type=VectorStorageType.SPARSE)
- )
- Python Datatypes for VECTOR
- ~~~~~~~~~~~~~~~~~~~~~~~~~~~
- VECTOR data can be inserted using Python list or Python ``array.array()`` objects.
- Python arrays of type FLOAT (32-bit), DOUBLE (64-bit), INT (8-bit signed integers),
- or BINARY (8-bit unsigned integers) are used as bind values when inserting
- VECTOR columns::
- from sqlalchemy import insert, select
- with engine.begin() as conn:
- conn.execute(
- insert(t1),
- {"id": 1, "embedding": [1, 2, 3]},
- )
- Data can be inserted into a sparse vector using the :class:`_oracle.SparseVector`
- class, creating an object consisting of the number of dimensions, an array of indices, and a
- corresponding array of values::
- from sqlalchemy import insert, select
- from sqlalchemy.dialects.oracle import SparseVector
- sparse_val = SparseVector(10, [1, 2], array.array("d", [23.45, 221.22]))
- with engine.begin() as conn:
- conn.execute(
- insert(t1),
- {"id": 1, "embedding": sparse_val},
- )
- VECTOR Indexes
- ~~~~~~~~~~~~~~
- The VECTOR feature supports an Oracle-specific parameter ``oracle_vector``
- on the :class:`.Index` construct, which allows the construction of VECTOR
- indexes.
- SPARSE vectors cannot be used in the creation of vector indexes.
- To utilize VECTOR indexing, set the ``oracle_vector`` parameter to True to use
- the default values provided by Oracle. HNSW is the default indexing method::
- from sqlalchemy import Index
- Index(
- "vector_index",
- t1.c.embedding,
- oracle_vector=True,
- )
- The full range of parameters for vector indexes are available by using the
- :class:`.VectorIndexConfig` dataclass in place of a boolean; this dataclass
- allows full configuration of the index::
- Index(
- "hnsw_vector_index",
- t1.c.embedding,
- oracle_vector=VectorIndexConfig(
- index_type=VectorIndexType.HNSW,
- distance=VectorDistanceType.COSINE,
- accuracy=90,
- hnsw_neighbors=5,
- hnsw_efconstruction=20,
- parallel=10,
- ),
- )
- Index(
- "ivf_vector_index",
- t1.c.embedding,
- oracle_vector=VectorIndexConfig(
- index_type=VectorIndexType.IVF,
- distance=VectorDistanceType.DOT,
- accuracy=90,
- ivf_neighbor_partitions=5,
- ),
- )
- For a complete explanation of these parameters, see the Oracle documentation linked
- below.
- .. seealso::
- `CREATE VECTOR INDEX <https://www.oracle.com/pls/topic/lookup?ctx=dblatest&id=GUID-B396C369-54BB-4098-A0DD-7C54B3A0D66F>`_ - in the Oracle documentation
- Similarity Searching
- ~~~~~~~~~~~~~~~~~~~~
- When using the :class:`_oracle.VECTOR` datatype with a :class:`.Column` or similar
- ORM mapped construct, additional comparison functions are available, including:
- * ``l2_distance``
- * ``cosine_distance``
- * ``inner_product``
- Example Usage::
- result_vector = connection.scalars(
- select(t1).order_by(t1.embedding.l2_distance([2, 3, 4])).limit(3)
- )
- for user in result_vector:
- print(user.id, user.embedding)
- FETCH APPROXIMATE support
- ~~~~~~~~~~~~~~~~~~~~~~~~~
- Approximate vector search can only be performed when all syntax and semantic
- rules are satisfied, the corresponding vector index is available, and the
- query optimizer determines to perform it. If any of these conditions are
- unmet, then an approximate search is not performed. In this case the query
- returns exact results.
- To enable approximate searching during similarity searches on VECTORS, the
- ``oracle_fetch_approximate`` parameter may be used with the :meth:`.Select.fetch`
- clause to add ``FETCH APPROX`` to the SELECT statement::
- select(users_table).fetch(5, oracle_fetch_approximate=True)
- """ # noqa
- from __future__ import annotations
- from collections import defaultdict
- from dataclasses import fields
- from functools import lru_cache
- from functools import wraps
- import re
- from . import dictionary
- from .types import _OracleBoolean
- from .types import _OracleDate
- from .types import BFILE
- from .types import BINARY_DOUBLE
- from .types import BINARY_FLOAT
- from .types import DATE
- from .types import FLOAT
- from .types import INTERVAL
- from .types import LONG
- from .types import NCLOB
- from .types import NUMBER
- from .types import NVARCHAR2 # noqa
- from .types import OracleRaw # noqa
- from .types import RAW
- from .types import ROWID # noqa
- from .types import TIMESTAMP
- from .types import VARCHAR2 # noqa
- from .vector import VECTOR
- from .vector import VectorIndexConfig
- from .vector import VectorIndexType
- from ... import Computed
- from ... import exc
- from ... import schema as sa_schema
- from ... import sql
- from ... import util
- from ...engine import default
- from ...engine import ObjectKind
- from ...engine import ObjectScope
- from ...engine import reflection
- from ...engine.reflection import ReflectionDefaults
- from ...sql import and_
- from ...sql import bindparam
- from ...sql import compiler
- from ...sql import expression
- from ...sql import func
- from ...sql import null
- from ...sql import or_
- from ...sql import select
- from ...sql import selectable as sa_selectable
- from ...sql import sqltypes
- from ...sql import util as sql_util
- from ...sql import visitors
- from ...sql.visitors import InternalTraversal
- from ...types import BLOB
- from ...types import CHAR
- from ...types import CLOB
- from ...types import DOUBLE_PRECISION
- from ...types import INTEGER
- from ...types import NCHAR
- from ...types import NVARCHAR
- from ...types import REAL
- from ...types import VARCHAR
# Upper-case keyword set, produced by splitting a whitespace-separated
# listing; kept as a plain mutable ``set``.
RESERVED_WORDS = {
    *(
        "SHARE RAW DROP BETWEEN FROM DESC OPTION PRIOR LONG THEN "
        "DEFAULT ALTER IS INTO MINUS INTEGER NUMBER GRANT IDENTIFIED "
        "ALL TO ORDER ON FLOAT DATE HAVING CLUSTER NOWAIT RESOURCE "
        "ANY TABLE INDEX FOR UPDATE WHERE CHECK SMALLINT WITH DELETE "
        "BY ASC REVOKE LIKE SIZE RENAME NOCOMPRESS NULL GROUP VALUES "
        "AS IN VIEW EXCLUSIVE COMPRESS SYNONYM SELECT INSERT EXISTS "
        "NOT TRIGGER ELSE CREATE INTERSECT PCTFREE DISTINCT USER "
        "CONNECT SET MODE OF UNIQUE VARCHAR2 VARCHAR LOCK OR CHAR "
        "DECIMAL UNION PUBLIC AND START UID COMMENT CURRENT LEVEL"
    ).split()
}

# Upper-case function names; ``OracleCompiler.function_argspec`` renders no
# argument list for a function with one of these names and no clauses.
NO_ARG_FNS = {
    *"UID CURRENT_DATE SYSDATE USER CURRENT_TIME CURRENT_TIMESTAMP".split()
}
# Maps generic ``sqltypes`` classes to the Oracle-specific type classes
# imported from ``.types``.
# NOTE(review): presumably consumed by the dialect to substitute type
# implementations -- confirm against the dialect class.
colspecs = {
    sqltypes.Boolean: _OracleBoolean,
    sqltypes.Interval: INTERVAL,
    sqltypes.DateTime: DATE,
    sqltypes.Date: _OracleDate,
}

# Maps type-name strings to type classes.  All three TIMESTAMP spellings
# share the single TIMESTAMP class, and VARCHAR2 / NVARCHAR2 map to the
# generic VARCHAR / NVARCHAR classes.
# NOTE(review): presumably used to resolve type names during reflection --
# confirm against the reflection code.
ischema_names = {
    "VARCHAR2": VARCHAR,
    "NVARCHAR2": NVARCHAR,
    "CHAR": CHAR,
    "NCHAR": NCHAR,
    "DATE": DATE,
    "NUMBER": NUMBER,
    "BLOB": BLOB,
    "BFILE": BFILE,
    "CLOB": CLOB,
    "NCLOB": NCLOB,
    "TIMESTAMP": TIMESTAMP,
    "TIMESTAMP WITH TIME ZONE": TIMESTAMP,
    "TIMESTAMP WITH LOCAL TIME ZONE": TIMESTAMP,
    "INTERVAL DAY TO SECOND": INTERVAL,
    "RAW": RAW,
    "FLOAT": FLOAT,
    "DOUBLE PRECISION": DOUBLE_PRECISION,
    "REAL": REAL,
    "LONG": LONG,
    "BINARY_DOUBLE": BINARY_DOUBLE,
    "BINARY_FLOAT": BINARY_FLOAT,
    "ROWID": ROWID,
    "VECTOR": VECTOR,
}
class OracleTypeCompiler(compiler.GenericTypeCompiler):
    """Type compiler producing Oracle Database type-name strings."""

    # Note:
    # Oracle DATE == DATETIME
    # Oracle does not allow milliseconds in DATE
    # Oracle does not support TIME columns

    def visit_datetime(self, type_, **kw):
        """Delegate the generic datetime type to ``visit_DATE``."""
        return self.visit_DATE(type_, **kw)

    def visit_float(self, type_, **kw):
        """Delegate the generic float type to ``visit_FLOAT``."""
        return self.visit_FLOAT(type_, **kw)

    def visit_double(self, type_, **kw):
        """Delegate the generic double type to ``visit_DOUBLE_PRECISION``."""
        return self.visit_DOUBLE_PRECISION(type_, **kw)

    def visit_unicode(self, type_, **kw):
        """Render as NVARCHAR2 or VARCHAR2 depending on the dialect flag."""
        visitor = (
            self.visit_NVARCHAR2
            if self.dialect._use_nchar_for_unicode
            else self.visit_VARCHAR2
        )
        return visitor(type_, **kw)

    def visit_INTERVAL(self, type_, **kw):
        """Render ``INTERVAL DAY TO SECOND`` with optional precisions.

        A precision of ``None`` is omitted; any other value is rendered in
        parentheses directly after ``DAY`` / ``SECOND``.
        """
        day = type_.day_precision
        day_spec = "" if day is None else "(%d)" % day
        second = type_.second_precision
        second_spec = "" if second is None else "(%d)" % second
        return "INTERVAL DAY%s TO SECOND%s" % (day_spec, second_spec)

    def visit_LONG(self, type_, **kw):
        """Render the ``LONG`` type name."""
        return "LONG"

    def visit_TIMESTAMP(self, type_, **kw):
        """Render TIMESTAMP, optionally with a (local) time zone suffix.

        ``local_timezone`` is looked up with ``getattr`` since not every
        type object passed here is required to define it; it takes
        priority over ``timezone``.
        """
        if getattr(type_, "local_timezone", False):
            return "TIMESTAMP WITH LOCAL TIME ZONE"
        if type_.timezone:
            return "TIMESTAMP WITH TIME ZONE"
        return "TIMESTAMP"

    def visit_DOUBLE_PRECISION(self, type_, **kw):
        """Render ``DOUBLE PRECISION`` via the shared numeric renderer."""
        return self._generate_numeric(type_, "DOUBLE PRECISION", **kw)

    def visit_BINARY_DOUBLE(self, type_, **kw):
        """Render ``BINARY_DOUBLE`` via the shared numeric renderer."""
        return self._generate_numeric(type_, "BINARY_DOUBLE", **kw)

    def visit_BINARY_FLOAT(self, type_, **kw):
        """Render ``BINARY_FLOAT`` via the shared numeric renderer."""
        return self._generate_numeric(type_, "BINARY_FLOAT", **kw)

    def visit_FLOAT(self, type_, **kw):
        """Render ``FLOAT``, requiring binary rather than decimal precision."""
        kw.update(_requires_binary_precision=True)
        return self._generate_numeric(type_, "FLOAT", **kw)

    def visit_NUMBER(self, type_, **kw):
        """Render ``NUMBER`` via the shared numeric renderer."""
        return self._generate_numeric(type_, "NUMBER", **kw)

    def _generate_numeric(
        self,
        type_,
        name,
        precision=None,
        scale=None,
        _requires_binary_precision=False,
        **kw,
    ):
        """Render ``name``, ``name(precision)`` or ``name(precision, scale)``.

        :param type_: type object; ``precision``, ``binary_precision`` and
         ``scale`` attributes are read with ``getattr`` when present.
        :param name: type name to render.
        :param precision: explicit precision; falls back to
         ``type_.precision`` when ``None``.
        :param scale: explicit scale; falls back to ``type_.scale`` when
         ``None``.
        :param _requires_binary_precision: when true, the rendered
         precision is taken from ``type_.binary_precision`` instead.
        :raises ArgumentError: if binary precision is required, a truthy
         decimal precision is present and no ``binary_precision`` is set.
        """
        if precision is None:
            precision = getattr(type_, "precision", None)

        if _requires_binary_precision:
            binary_precision = getattr(type_, "binary_precision", None)
            if precision and binary_precision is None:
                # https://www.oracletutorial.com/oracle-basics/oracle-float/
                # the estimate is only used as a hint in the error message
                estimated_binary_precision = int(precision / 0.30103)
                raise exc.ArgumentError(
                    "Oracle Database FLOAT types use 'binary precision', "
                    "which does not convert cleanly from decimal "
                    "'precision'. Please specify "
                    "this type with a separate Oracle Database variant, such "
                    f"as {type_.__class__.__name__}(precision={precision})."
                    f"with_variant(oracle.FLOAT"
                    f"(binary_precision="
                    f"{estimated_binary_precision}), 'oracle'), so that the "
                    "Oracle Database specific 'binary_precision' may be "
                    "specified accurately."
                )
            precision = binary_precision

        if scale is None:
            scale = getattr(type_, "scale", None)

        if precision is None:
            return name
        if scale is None:
            return "%(name)s(%(precision)s)" % {
                "name": name,
                "precision": precision,
            }
        return "%(name)s(%(precision)s, %(scale)s)" % {
            "name": name,
            "precision": precision,
            "scale": scale,
        }

    def visit_string(self, type_, **kw):
        """Delegate the generic string type to ``visit_VARCHAR2``."""
        return self.visit_VARCHAR2(type_, **kw)

    def visit_VARCHAR2(self, type_, **kw):
        """Render ``VARCHAR2``."""
        return self._visit_varchar(type_, "", "2")

    def visit_NVARCHAR2(self, type_, **kw):
        """Render ``NVARCHAR2``."""
        return self._visit_varchar(type_, "N", "2")

    # NVARCHAR is rendered identically to NVARCHAR2
    visit_NVARCHAR = visit_NVARCHAR2

    def visit_VARCHAR(self, type_, **kw):
        """Render ``VARCHAR``."""
        return self._visit_varchar(type_, "", "")

    def _visit_varchar(self, type_, n, num):
        """Build a VARCHAR-family type name.

        :param type_: type object whose ``length`` is rendered when truthy.
        :param n: name prefix, ``"N"`` or ``""``.
        :param num: name suffix, ``"2"`` or ``""``.

        The ``CHAR`` length qualifier is emitted only for the non-"N"
        variants and only when the dialect reports
        ``_supports_char_length``.
        """
        if not type_.length:
            return "%(n)sVARCHAR%(two)s" % {"two": num, "n": n}

        if n or not self.dialect._supports_char_length:
            varchar = "%(n)sVARCHAR%(two)s(%(length)s)"
            return varchar % {"length": type_.length, "two": num, "n": n}

        varchar = "VARCHAR%(two)s(%(length)s CHAR)"
        return varchar % {"length": type_.length, "two": num}

    def visit_text(self, type_, **kw):
        """Delegate the generic text type to ``visit_CLOB``."""
        return self.visit_CLOB(type_, **kw)

    def visit_unicode_text(self, type_, **kw):
        """Render as NCLOB or CLOB depending on the dialect flag."""
        visitor = (
            self.visit_NCLOB
            if self.dialect._use_nchar_for_unicode
            else self.visit_CLOB
        )
        return visitor(type_, **kw)

    def visit_large_binary(self, type_, **kw):
        """Delegate the generic large binary type to ``visit_BLOB``."""
        return self.visit_BLOB(type_, **kw)

    def visit_big_integer(self, type_, **kw):
        """Render the generic big integer type as ``NUMBER(19)``."""
        return self.visit_NUMBER(type_, precision=19, **kw)

    def visit_boolean(self, type_, **kw):
        """Delegate the generic boolean type to ``visit_SMALLINT``."""
        return self.visit_SMALLINT(type_, **kw)

    def visit_RAW(self, type_, **kw):
        """Render ``RAW``, with the length in parentheses when truthy."""
        if not type_.length:
            return "RAW"
        return "RAW(%(length)s)" % {"length": type_.length}

    def visit_ROWID(self, type_, **kw):
        """Render the ``ROWID`` type name."""
        return "ROWID"

    def visit_VECTOR(self, type_, **kw):
        """Render ``VECTOR(dim,storage_format,storage_type)``.

        Any of the three settings left as ``None`` is rendered as ``*``.
        """
        dim = "*"
        if type_.dim is not None:
            dim = type_.dim

        storage_format = "*"
        if type_.storage_format is not None:
            storage_format = type_.storage_format.value

        storage_type = "*"
        if type_.storage_type is not None:
            storage_type = type_.storage_type.value

        return f"VECTOR({dim},{storage_format},{storage_type})"
- class OracleCompiler(compiler.SQLCompiler):
- """Oracle compiler modifies the lexical structure of Select
- statements to work under non-ANSI configured Oracle databases, if
- the use_ansi flag is False.
- """
- compound_keywords = util.update_copy(
- compiler.SQLCompiler.compound_keywords,
- {expression.CompoundSelect.EXCEPT: "MINUS"},
- )
def __init__(self, *args, **kwargs):
    """Create the private ``__wheres`` dict, then run the base initializer.

    All positional and keyword arguments are passed through unchanged.
    """
    # assigned before delegating so the attribute already exists while the
    # parent constructor runs
    self.__wheres = {}
    super().__init__(*args, **kwargs)
def visit_mod_binary(self, binary, operator, **kw):
    """Render a modulus expression as a ``mod(left, right)`` call.

    Both operands are compiled with the same keyword arguments; the
    ``operator`` argument is not consulted.
    """
    left_sql = self.process(binary.left, **kw)
    right_sql = self.process(binary.right, **kw)
    return "mod(%s, %s)" % (left_sql, right_sql)
def visit_now_func(self, fn, **kw):
    """Render the "now" function as the ``CURRENT_TIMESTAMP`` keyword.

    Neither ``fn`` nor the keyword arguments influence the output.
    """
    now_keyword = "CURRENT_TIMESTAMP"
    return now_keyword
def visit_char_length_func(self, fn, **kw):
    """Render the char_length function as ``LENGTH`` plus its argspec."""
    argspec = self.function_argspec(fn, **kw)
    return "LENGTH" + argspec
def visit_match_op_binary(self, binary, operator, **kw):
    """Render a MATCH expression as ``CONTAINS (left, right)``.

    :param binary: binary expression whose ``left`` and ``right`` operands
     are compiled.
    :param operator: not consulted.
    :param kw: compilation keyword arguments; forwarded to both operands
     so that per-compile flags reach them, as ``visit_mod_binary`` does.
    """
    return "CONTAINS (%s, %s)" % (
        self.process(binary.left, **kw),
        self.process(binary.right, **kw),
    )
def visit_true(self, expr, **kw):
    """Render the boolean true constant as the literal ``1``."""
    true_literal = "1"
    return true_literal
def visit_false(self, expr, **kw):
    """Render the boolean false constant as the literal ``0``."""
    false_literal = "0"
    return false_literal
def get_cte_preamble(self, recursive):
    """Return the keyword that introduces CTEs.

    Always ``WITH``; the ``recursive`` flag is ignored.
    """
    preamble = "WITH"
    return preamble
def get_select_hint_text(self, byfroms):
    """Render each hint in ``byfroms`` as a ``/*+ ... */`` comment.

    Only the mapping's values (the hint texts) are used; the comments are
    joined with single spaces in the mapping's iteration order.
    """
    hint_comments = ["/*+ %s */" % hint for hint in byfroms.values()]
    return " ".join(hint_comments)
def function_argspec(self, fn, **kw):
    """Render the argument list of ``fn``.

    A function with no clauses whose upper-cased name is in
    ``NO_ARG_FNS`` gets an empty string (no parentheses); every other
    function is rendered by the base ``SQLCompiler`` implementation.
    """
    renders_bare = len(fn.clauses) == 0 and fn.name.upper() in NO_ARG_FNS
    if renders_bare:
        return ""
    return compiler.SQLCompiler.function_argspec(self, fn, **kw)
def visit_function(self, func, **kw):
    """Render a function, wrapping it in ``TABLE (...)`` inside FROM.

    The wrapper is added only when compiling with a truthy ``asfrom``
    keyword and the function is not itself named ``table``
    (case-insensitive).
    """
    rendered = super().visit_function(func, **kw)
    if not kw.get("asfrom", False) or func.name.lower() == "table":
        return rendered
    return "TABLE (%s)" % rendered
def visit_table_valued_column(self, element, **kw):
    """Append ``.COLUMN_VALUE`` to the base rendering of ``element``."""
    base_sql = super().visit_table_valued_column(element, **kw)
    return base_sql + ".COLUMN_VALUE"
def default_from(self):
    """Return the FROM text used when a ``SELECT`` has no froms.

    Called when no ``FROM`` clause would otherwise be appended; the
    Oracle compiler supplies ``FROM DUAL`` (with a leading space).

    """
    dual_from = " FROM DUAL"
    return dual_from
def visit_join(self, join, from_linter=None, **kwargs):
    """Render a join, either in ANSI form or as a comma-separated list.

    With ``dialect.use_ansi`` set, the base ``SQLCompiler`` rendering is
    used.  Otherwise both sides are rendered as FROM elements separated
    by ``", "``; the ON clause is not rendered here.
    """
    if self.dialect.use_ansi:
        return compiler.SQLCompiler.visit_join(
            self, join, from_linter=from_linter, **kwargs
        )

    if from_linter:
        # record the join as an edge between its two sides
        from_linter.edges.add((join.left, join.right))

    kwargs["asfrom"] = True

    # a grouped right side is unwrapped and its inner element rendered
    right_side = join.right
    if isinstance(right_side, expression.FromGrouping):
        right_side = right_side.element

    left_sql = self.process(join.left, from_linter=from_linter, **kwargs)
    right_sql = self.process(right_side, from_linter=from_linter, **kwargs)
    return left_sql + ", " + right_sql
def _get_nonansi_join_whereclause(self, froms):
    """Build a WHERE criterion from the ON clauses of joins in ``froms``.

    Every ``Join`` found in ``froms`` is walked recursively, including
    joins nested on either side and joins wrapped in a ``FromGrouping``.
    Inner-join ON clauses are collected as-is.  For outer joins a cloned
    copy of the ON clause is collected in which column operands derived
    from the join's right side are wrapped in ``_OuterJoinColumn``.

    :return: the collected clauses combined with ``and_()``, or ``None``
     when no join was found.
    """
    collected = []

    def collect_onclauses(join_elem):
        if not join_elem.isouter:
            collected.append(join_elem.onclause)
        else:
            # https://docs.oracle.com/database/121/SQLRF/queries006.htm#SQLRF52354
            # "apply the outer join operator (+) to all columns of B in
            # the join condition in the WHERE clause" - that is,
            # unconditionally regardless of operator or the other side
            def tag_outer_operand(binary):
                if isinstance(
                    binary.left, expression.ColumnClause
                ) and join_elem.right.is_derived_from(binary.left.table):
                    binary.left = _OuterJoinColumn(binary.left)
                elif isinstance(
                    binary.right, expression.ColumnClause
                ) and join_elem.right.is_derived_from(binary.right.table):
                    binary.right = _OuterJoinColumn(binary.right)

            # traverse a clone so the join's own ON clause is not mutated
            tagged_onclause = visitors.cloned_traverse(
                join_elem.onclause, {}, {"binary": tag_outer_operand}
            )
            collected.append(tagged_onclause)

        for side in (join_elem.left, join_elem.right):
            if isinstance(side, expression.Join):
                collect_onclauses(side)
            elif isinstance(side, expression.FromGrouping):
                collect_onclauses(side.element)

    for from_obj in froms:
        if isinstance(from_obj, expression.Join):
            collect_onclauses(from_obj)

    return sql.and_(*collected) if collected else None
def visit_outer_join_column(self, vc, **kw):
    """Render the wrapped column followed by the ``(+)`` marker."""
    column_sql = self.process(vc.column, **kw)
    return column_sql + "(+)"
def visit_sequence(self, seq, **kw):
    """Render a sequence as its formatted name plus ``.nextval``."""
    sequence_name = self.preparer.format_sequence(seq)
    return sequence_name + ".nextval"
def get_render_as_alias_suffix(self, alias_name_text):
    """Return the alias suffix without ``AS``.

    Oracle doesn't like ``FROM table AS alias``, so only a single space
    followed by the alias name is produced.
    """
    separator = " "
    return separator + alias_name_text
def returning_clause(
    self, stmt, returning_cols, *, populate_result_map, **kw
):
    """Render ``RETURNING <cols> INTO <binds>`` for a DML statement.

    Each returned column gets its own out parameter named ``ret_<n>``,
    registered in ``self.binds``; the rendered bind strings form the
    ``INTO`` list.

    :param stmt: not consulted here.
    :param returning_cols: columns to return; expanded via
     ``expression._select_iterables``.
    :param populate_result_map: when true, a result map entry is added
     for every returned column.
    :raises InvalidRequestError: if ``self.has_out_parameters`` is set,
     i.e. the statement also carried explicit ``outparam()`` objects.
    """
    rendered_cols = []
    into_binds = []

    selected = expression._select_iterables(returning_cols)
    for position, column in enumerate(selected):
        stale_computed_value = (
            self.isupdate
            and isinstance(column, sa_schema.Column)
            and isinstance(column.server_default, Computed)
            and not self.dialect._supports_update_returning_computed_cols
        )
        if stale_computed_value:
            util.warn(
                "Computed columns don't work with Oracle Database UPDATE "
                "statements that use RETURNING; the value of the column "
                "*before* the UPDATE takes place is returned.   It is "
                "advised to not use RETURNING with an Oracle Database "
                "computed column.  Consider setting implicit_returning "
                "to False on the Table object in order to avoid implicit "
                "RETURNING clauses from being generated for this Table."
            )

        # let the type substitute its own column expression if it has one
        col_expr = (
            column.type.column_expression(column)
            if column.type._has_column_expression
            else column
        )

        outparam = sql.outparam("ret_%d" % position, type_=column.type)
        self.binds[outparam.key] = outparam
        into_binds.append(
            self.bindparam_string(self._truncate_bindparam(outparam))
        )

        # has_out_parameters would in a normal case be set to True
        # as a result of the compiler visiting an outparam() object.
        # in this case, the above outparam() objects are not being
        # visited.   Ensure the statement itself didn't have other
        # outparam() objects independently.
        # technically, this could be supported, but as it would be
        # a very strange use case without a clear rationale, disallow it
        if self.has_out_parameters:
            raise exc.InvalidRequestError(
                "Using explicit outparam() objects with "
                "UpdateBase.returning() in the same Core DML statement "
                "is not supported in the Oracle Database dialects."
            )

        self._oracle_returning = True

        rendered_cols.append(
            self.process(col_expr, within_columns_clause=False)
        )
        if populate_result_map:
            # the same label serves as both leading arguments
            result_name = getattr(
                col_expr, "name", col_expr._anon_name_label
            )
            self._add_to_result_map(
                result_name,
                result_name,
                (
                    column,
                    getattr(column, "name", None),
                    getattr(column, "key", None),
                ),
                column.type,
            )

    columns_sql = ", ".join(rendered_cols)
    binds_sql = ", ".join(into_binds)
    return "RETURNING " + columns_sql + " INTO " + binds_sql
def _row_limit_clause(self, select, **kw):
    """Render the row-limiting clause of a SELECT.

    When the statement has no explicit FETCH clause and the dialect
    supports OFFSET / FETCH (Oracle Database 12c and later), the LIMIT is
    rendered through :meth:`fetch_clause` instead of a ROW_NUMBER style
    subquery.  Otherwise the parent implementation is used.
    """
    if select._fetch_clause is None and self.dialect._supports_offset_fetch:
        return self.fetch_clause(
            select,
            fetch_clause=self._get_limit_or_fetch(select),
            use_literal_execute_for_simple_int=True,
            **kw,
        )

    return super()._row_limit_clause(
        select, use_literal_execute_for_simple_int=True, **kw
    )
def _get_limit_or_fetch(self, select):
    """Return the statement's FETCH clause, or its LIMIT clause if unset."""
    fetch = select._fetch_clause
    return select._limit_clause if fetch is None else fetch
def fetch_clause(
    self,
    select,
    fetch_clause=None,
    require_offset=False,
    use_literal_execute_for_simple_int=False,
    **kw,
):
    """Render the FETCH clause, honouring ``oracle_fetch_approximate``.

    The parent implementation produces the text; when the statement's
    ``fetch_approximate`` dialect option is set, ``FETCH FIRST`` is
    rewritten to ``FETCH APPROX FIRST``.
    """
    rendered = super().fetch_clause(
        select,
        fetch_clause=fetch_clause,
        require_offset=require_offset,
        use_literal_execute_for_simple_int=use_literal_execute_for_simple_int,
        **kw,
    )
    approximate = select.dialect_options["oracle"]["fetch_approximate"]
    if not approximate:
        return rendered
    return re.sub("FETCH FIRST", "FETCH APPROX FIRST", rendered)
def translate_select_structure(self, select_stmt, **kwargs):
    """Rewrite a SELECT into a form older Oracle Database versions accept.

    Two independent rewrites may be applied, at most once per statement
    (guarded by the ``_oracle_visit`` marker):

    * when the dialect is not using ANSI joins, the join criteria are
      moved into the WHERE clause;
    * when the statement has LIMIT / OFFSET, carries no explicit FETCH
      clause and the dialect does not support OFFSET / FETCH, the
      statement is wrapped in ROWNUM based subqueries.

    :param select_stmt: the SELECT being compiled.
    :param kwargs: compiler keyword arguments; ``asfrom`` is consulted
     and all of them are forwarded when rendering the optimizer hint.
    :return: the statement to compile, which is ``select_stmt`` itself
     when no rewrite applies.
    """
    select = select_stmt

    if not getattr(select, "_oracle_visit", None):
        if not self.dialect.use_ansi:
            froms = self._display_froms_for_select(
                select, kwargs.get("asfrom", False)
            )
            whereclause = self._get_nonansi_join_whereclause(froms)
            if whereclause is not None:
                select = select.where(whereclause)
                select._oracle_visit = True

        # if fetch is used this is not needed
        if (
            select._has_row_limiting_clause
            and not self.dialect._supports_offset_fetch
            and select._fetch_clause is None
        ):
            limit_clause = select._limit_clause
            offset_clause = select._offset_clause

            # plain integers are rendered inline at execution time
            if select._simple_int_clause(limit_clause):
                limit_clause = limit_clause.render_literal_execute()

            if select._simple_int_clause(offset_clause):
                offset_clause = offset_clause.render_literal_execute()

            # currently using form at:
            # https://blogs.oracle.com/oraclemagazine/\
            # on-rownum-and-limiting-results

            orig_select = select
            select = select._generate()
            select._oracle_visit = True

            # add expressions to accommodate FOR UPDATE OF
            for_update = select._for_update_arg
            if for_update is not None and for_update.of:
                # work on a copy so the caller's FOR UPDATE is untouched
                for_update = for_update._clone()
                for_update._copy_internals()

                for elem in for_update.of:
                    if not select.selected_columns.contains_column(elem):
                        select = select.add_columns(elem)

            # Wrap the middle select and add the hint
            inner_subquery = select.alias()
            limitselect = sql.select(
                *[
                    c
                    for c in inner_subquery.c
                    if orig_select.selected_columns.corresponding_column(c)
                    is not None
                ]
            )

            if (
                limit_clause is not None
                and self.dialect.optimize_limits
                and select._simple_int_clause(limit_clause)
            ):
                limitselect = limitselect.prefix_with(
                    expression.text(
                        "/*+ FIRST_ROWS(%s) */"
                        % self.process(limit_clause, **kwargs)
                    )
                )

            limitselect._oracle_visit = True
            limitselect._is_wrapper = True

            # add expressions to accommodate FOR UPDATE OF
            if for_update is not None and for_update.of:
                adapter = sql_util.ClauseAdapter(inner_subquery)
                for_update.of = [
                    adapter.traverse(elem) for elem in for_update.of
                ]

            # If needed, add the limiting clause
            if limit_clause is not None:
                # The upper ROWNUM bound is limit (+ offset).  The
                # original code computed this identically in both arms
                # of a simple-int / expression conditional, so a single
                # computation is equivalent.
                max_row = limit_clause
                if offset_clause is not None:
                    max_row = max_row + offset_clause
                limitselect = limitselect.where(
                    sql.literal_column("ROWNUM") <= max_row
                )

            # If needed, add the ora_rn, and wrap again with offset.
            if offset_clause is None:
                limitselect._for_update_arg = for_update
                select = limitselect
            else:
                limitselect = limitselect.add_columns(
                    sql.literal_column("ROWNUM").label("ora_rn")
                )
                limitselect._oracle_visit = True
                limitselect._is_wrapper = True

                if for_update is not None and for_update.of:
                    limitselect_cols = limitselect.selected_columns
                    for elem in for_update.of:
                        if (
                            limitselect_cols.corresponding_column(elem)
                            is None
                        ):
                            limitselect = limitselect.add_columns(elem)

                limit_subquery = limitselect.alias()
                origselect_cols = orig_select.selected_columns
                offsetselect = sql.select(
                    *[
                        c
                        for c in limit_subquery.c
                        if origselect_cols.corresponding_column(c)
                        is not None
                    ]
                )

                offsetselect._oracle_visit = True
                offsetselect._is_wrapper = True

                if for_update is not None and for_update.of:
                    adapter = sql_util.ClauseAdapter(limit_subquery)
                    for_update.of = [
                        adapter.traverse(elem) for elem in for_update.of
                    ]

                offsetselect = offsetselect.where(
                    sql.literal_column("ora_rn") > offset_clause
                )

                offsetselect._for_update_arg = for_update
                select = offsetselect

    return select
def limit_clause(self, select, **kw):
    """Render nothing; this compiler never emits a LIMIT clause here."""
    empty = ""
    return empty
def visit_empty_set_expr(self, type_, **kw):
    """Return a SELECT against DUAL whose WHERE clause is never true."""
    never_matches = "SELECT 1 FROM DUAL WHERE 1!=1"
    return never_matches
def for_update_clause(self, select, **kw):
    """Render ``FOR UPDATE [OF ...] [NOWAIT] [SKIP LOCKED]``.

    Nothing is rendered while compiling a subquery.
    """
    if self.is_subquery():
        return ""

    lock = select._for_update_arg
    pieces = [" FOR UPDATE"]

    if lock.of:
        targets = ", ".join(self.process(elem, **kw) for elem in lock.of)
        pieces.append(" OF " + targets)
    if lock.nowait:
        pieces.append(" NOWAIT")
    if lock.skip_locked:
        pieces.append(" SKIP LOCKED")

    return "".join(pieces)
def visit_is_distinct_from_binary(self, binary, operator, **kw):
    """Render IS DISTINCT FROM as ``DECODE(left, right, 0, 1) = 1``."""
    lhs = self.process(binary.left)
    rhs = self.process(binary.right)
    return "DECODE(%s, %s, 0, 1) = 1" % (lhs, rhs)
def visit_is_not_distinct_from_binary(self, binary, operator, **kw):
    """Render IS NOT DISTINCT FROM as ``DECODE(left, right, 0, 1) = 0``."""
    lhs = self.process(binary.left)
    rhs = self.process(binary.right)
    return "DECODE(%s, %s, 0, 1) = 0" % (lhs, rhs)
def visit_regexp_match_op_binary(self, binary, operator, **kw):
    """Render a regular expression match as ``REGEXP_LIKE(...)``.

    The ``flags`` modifier, when present, is rendered as a literal third
    argument.
    """
    arguments = [
        self.process(binary.left, **kw),
        self.process(binary.right, **kw),
    ]
    flags = binary.modifiers["flags"]
    if flags is not None:
        arguments.append(
            self.render_literal_value(flags, sqltypes.STRINGTYPE)
        )
        return "REGEXP_LIKE(%s, %s, %s)" % tuple(arguments)
    return "REGEXP_LIKE(%s, %s)" % tuple(arguments)
def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
    """Render a negated regular expression match as ``NOT REGEXP_LIKE(...)``."""
    positive = self.visit_regexp_match_op_binary(binary, operator, **kw)
    return "NOT %s" % positive
def visit_regexp_replace_op_binary(self, binary, operator, **kw):
    """Render a regular expression replace as ``REGEXP_REPLACE(...)``.

    The ``flags`` modifier, when present, is rendered as a literal
    trailing argument.
    """
    arguments = [
        self.process(binary.left, **kw),
        self.process(binary.right, **kw),
    ]
    flags = binary.modifiers["flags"]
    if flags is not None:
        arguments.append(
            self.render_literal_value(flags, sqltypes.STRINGTYPE)
        )
        return "REGEXP_REPLACE(%s, %s, %s)" % tuple(arguments)
    return "REGEXP_REPLACE(%s, %s)" % tuple(arguments)
def visit_aggregate_strings_func(self, fn, **kw):
    """Render the string aggregation function as ``LISTAGG(...)``."""
    argspec = self.function_argspec(fn, **kw)
    return "LISTAGG%s" % argspec
def _visit_bitwise(self, binary, fn_name, custom_right=None, **kw):
    """Render a bitwise operator as the two-argument call ``fn_name(l, r)``.

    ``custom_right``, when not ``None``, replaces ``binary.right`` as the
    second operand.
    """
    right_operand = binary.right if custom_right is None else custom_right
    left = self.process(binary.left, **kw)
    right = self.process(right_operand, **kw)
    return "%s(%s, %s)" % (fn_name, left, right)
def visit_bitwise_xor_op_binary(self, binary, operator, **kw):
    """Render bitwise XOR through the ``BITXOR`` function."""
    function_name = "BITXOR"
    return self._visit_bitwise(binary, function_name, **kw)
def visit_bitwise_or_op_binary(self, binary, operator, **kw):
    """Render bitwise OR through the ``BITOR`` function."""
    function_name = "BITOR"
    return self._visit_bitwise(binary, function_name, **kw)
def visit_bitwise_and_op_binary(self, binary, operator, **kw):
    """Render bitwise AND through the ``BITAND`` function."""
    function_name = "BITAND"
    return self._visit_bitwise(binary, function_name, **kw)
def visit_bitwise_rshift_op_binary(self, binary, operator, **kw):
    """Reject bitwise right shift; always raises ``CompileError``."""
    message = "Cannot compile bitwise_rshift in oracle"
    raise exc.CompileError(message)
def visit_bitwise_lshift_op_binary(self, binary, operator, **kw):
    """Reject bitwise left shift; always raises ``CompileError``."""
    message = "Cannot compile bitwise_lshift in oracle"
    raise exc.CompileError(message)
def visit_bitwise_not_op_unary_operator(self, element, operator, **kw):
    """Reject bitwise NOT; always raises ``CompileError``."""
    message = "Cannot compile bitwise_not in oracle"
    raise exc.CompileError(message)
class OracleDDLCompiler(compiler.DDLCompiler):
    """DDL compiler producing Oracle Database flavoured DDL fragments."""

    def _build_vector_index_config(
        self, vector_index_config: VectorIndexConfig
    ) -> str:
        """Render the trailing options of a ``CREATE VECTOR INDEX``.

        The result joins, in order: the ORGANIZATION clause for the index
        type, DISTANCE, WITH TARGET ACCURACY, PARAMETERS (...) and
        PARALLEL; optional parts are left out when their value is
        ``None``.
        """
        cfg = vector_index_config
        # dataclass field name -> parameter name as spelled in the DDL
        ddl_names = {
            "hnsw_neighbors": "neighbors",
            "hnsw_efconstruction": "efconstruction",
            "ivf_neighbor_partitions": "neighbor partitions",
            "ivf_sample_per_partition": "sample_per_partition",
            "ivf_min_vectors_per_partition": "min_vectors_per_partition",
        }

        clauses = []
        if cfg.index_type == VectorIndexType.HNSW:
            clauses.append("ORGANIZATION INMEMORY NEIGHBOR GRAPH")
        elif cfg.index_type == VectorIndexType.IVF:
            clauses.append("ORGANIZATION NEIGHBOR PARTITIONS")

        if cfg.distance is not None:
            clauses.append(f"DISTANCE {cfg.distance.value}")

        if cfg.accuracy is not None:
            clauses.append(f"WITH TARGET ACCURACY {cfg.accuracy}")

        # only the fields prefixed with the index type name
        # (``hnsw_`` / ``ivf_``) apply to this index
        type_name = cfg.index_type.name
        field_prefix = type_name.lower() + "_"
        parameters = [f"type {type_name}"]
        for spec in fields(cfg):
            if not spec.name.startswith(field_prefix):
                continue
            ddl_name = ddl_names.get(spec.name)
            setting = getattr(cfg, spec.name)
            if setting is not None:
                parameters.append(f"{ddl_name} {setting}")
        joined_parameters = ", ".join(parameters)
        clauses.append(f"PARAMETERS ({joined_parameters})")

        if cfg.parallel is not None:
            clauses.append(f"PARALLEL {cfg.parallel}")

        return " ".join(clauses)

    def define_constraint_cascades(self, constraint):
        """Render ``ON DELETE`` for a foreign key; warn about ``onupdate``."""
        on_delete = constraint.ondelete
        text = "" if on_delete is None else " ON DELETE %s" % on_delete

        # Oracle has no ON UPDATE CASCADE; it is only available via
        # triggers:
        # https://web.archive.org/web/20090317041251/https://asktom.oracle.com/tkyte/update_cascade/index.html
        if constraint.onupdate is not None:
            util.warn(
                "Oracle Database does not contain native UPDATE CASCADE "
                "functionality - onupdates will not be rendered for foreign "
                "keys.  Consider using deferrable=True, initially='deferred' "
                "or triggers."
            )

        return text

    def visit_drop_table_comment(self, drop, **kw):
        """Render a table comment removal by setting the comment to ''."""
        table_name = self.preparer.format_table(drop.element)
        return "COMMENT ON TABLE %s IS ''" % table_name

    def visit_create_index(self, create, **kw):
        """Render ``CREATE [UNIQUE] [BITMAP] [VECTOR] INDEX ...``.

        The ``oracle`` dialect options ``bitmap``, ``compress`` and
        ``vector`` of the index are honoured.
        """
        index = create.element
        self._verify_index_table(index)
        preparer = self.preparer

        pieces = ["CREATE "]
        if index.unique:
            pieces.append("UNIQUE ")
        if index.dialect_options["oracle"]["bitmap"]:
            pieces.append("BITMAP ")
        vector_options = index.dialect_options["oracle"]["vector"]
        if vector_options:
            pieces.append("VECTOR ")

        pieces.append(
            "INDEX %s ON %s (%s)"
            % (
                self._prepared_index_name(index, include_schema=True),
                preparer.format_table(index.table, use_schema=True),
                ", ".join(
                    self.sql_compiler.process(
                        expr, include_table=False, literal_binds=True
                    )
                    for expr in index.expressions
                ),
            )
        )

        compress = index.dialect_options["oracle"]["compress"]
        if compress is True:
            pieces.append(" COMPRESS")
        elif compress is not False:
            pieces.append(
                " COMPRESS %d" % (index.dialect_options["oracle"]["compress"])
            )

        if vector_options:
            if vector_options is True:
                # plain ``True`` means "use the default configuration"
                vector_options = VectorIndexConfig()
            pieces.append(
                " " + self._build_vector_index_config(vector_options)
            )

        return "".join(pieces)

    def post_create_table(self, table):
        """Render the table options that follow the column list.

        Covers the ``on_commit``, ``compress`` and ``tablespace`` entries
        of the table's ``oracle`` dialect options.
        """
        opts = table.dialect_options["oracle"]
        suffixes = []

        if opts["on_commit"]:
            on_commit = opts["on_commit"].replace("_", " ").upper()
            suffixes.append("\n ON COMMIT %s" % on_commit)

        if opts["compress"]:
            if opts["compress"] is True:
                suffixes.append("\n COMPRESS")
            else:
                suffixes.append("\n COMPRESS FOR %s" % (opts["compress"]))

        if opts["tablespace"]:
            quoted = self.preparer.quote(opts["tablespace"])
            suffixes.append("\n TABLESPACE %s" % quoted)

        return "".join(suffixes)

    def get_identity_options(self, identity_options):
        """Render identity options using Oracle's one-word negations."""
        text = super().get_identity_options(identity_options)
        for generic, oracle_spelling in (
            ("NO MINVALUE", "NOMINVALUE"),
            ("NO MAXVALUE", "NOMAXVALUE"),
            ("NO CYCLE", "NOCYCLE"),
        ):
            text = text.replace(generic, oracle_spelling)

        order = identity_options.order
        if order is not None:
            text += " ORDER" if order else " NOORDER"
        return text.strip()

    def visit_computed_column(self, generated, **kw):
        """Render ``GENERATED ALWAYS AS (...) [VIRTUAL]``.

        :raises CompileError: when ``generated.persisted`` is ``True``.
        """
        expression_sql = self.sql_compiler.process(
            generated.sqltext, include_table=False, literal_binds=True
        )
        text = "GENERATED ALWAYS AS (%s)" % expression_sql

        persisted = generated.persisted
        if persisted is True:
            raise exc.CompileError(
                "Oracle Database computed columns do not support 'stored' "
                "persistence; set the 'persisted' flag to None or False for "
                "Oracle Database support."
            )
        if persisted is False:
            text += " VIRTUAL"
        return text

    def visit_identity_column(self, identity, **kw):
        """Render ``GENERATED ... [ON NULL] AS IDENTITY [(options)]``."""
        always = identity.always
        if always is None:
            kind = ""
        elif always:
            kind = "ALWAYS"
        else:
            kind = "BY DEFAULT"

        text = "GENERATED %s" % kind
        if identity.on_null:
            text += " ON NULL"
        text += " AS IDENTITY"

        options = self.get_identity_options(identity)
        if options:
            text += " (%s)" % options
        return text
class OracleIdentifierPreparer(compiler.IdentifierPreparer):
    """Identifier preparer carrying the Oracle reserved word list."""

    reserved_words = {word.lower() for word in RESERVED_WORDS}
    # digits, underscore and dollar sign
    illegal_initial_characters = {str(digit) for digit in range(0, 10)}.union(
        ["_", "$"]
    )

    def _bindparam_requires_quotes(self, value):
        """Return True if the given identifier requires quoting."""
        if value.lower() in self.reserved_words:
            return True
        if value[0] in self.illegal_initial_characters:
            return True
        return not self.legal_characters.match(str(value))

    def format_savepoint(self, savepoint):
        """Format a savepoint name with its leading underscores removed."""
        stripped_name = savepoint.ident.lstrip("_")
        return super().format_savepoint(savepoint, stripped_name)
class OracleExecutionContext(default.DefaultExecutionContext):
    """Execution context with sequence firing and db link substitution."""

    def fire_sequence(self, seq, type_):
        """Fetch the next value of ``seq`` by selecting ``.nextval``."""
        sequence_name = self.identifier_preparer.format_sequence(seq)
        statement = "SELECT " + sequence_name + ".nextval FROM DUAL"
        return self._execute_scalar(statement, type_)

    def pre_exec(self):
        """Replace the db link placeholder in the statement text.

        Only applies when there is a statement and the ``_oracle_dblink``
        execution option is present.
        """
        if not self.statement:
            return
        if "_oracle_dblink" not in self.execution_options:
            return
        self.statement = self.statement.replace(
            dictionary.DB_LINK_PLACEHOLDER,
            self.execution_options["_oracle_dblink"],
        )
class OracleDialect(default.DefaultDialect):
    # Dialect name; "oracle" is also the key used for the dialect option
    # lookups elsewhere in this file (``dialect_options["oracle"]``).
    name = "oracle"
    supports_statement_cache = True
    supports_alter = True
    # Class-level default; ``_check_max_identifier_length()`` returns 30
    # when the effective compatibility version is below 12.2.
    max_identifier_length = 128

    # Class-level default; reassigned from ``enable_offset_fetch`` in
    # ``__init__()`` and again from the server version in ``initialize()``.
    _supports_offset_fetch = True

    # RETURNING is declared as available for all three DML statements.
    insert_returning = True
    update_returning = True
    delete_returning = True

    div_is_floordiv = False

    supports_simple_order_by_label = False
    cte_follows_insert = True
    returns_native_bytes = True

    supports_sequences = True
    sequences_optional = False
    postfetch_lastrowid = False

    default_paramstyle = "named"
    # Module-level type maps of the same names, exposed on the dialect.
    colspecs = colspecs
    ischema_names = ischema_names
    requires_name_normalize = True

    supports_comments = True

    supports_default_values = False
    supports_default_metavalue = True
    supports_empty_insert = False
    # Class-level default; recomputed from the server version in
    # ``initialize()``.
    supports_identity_columns = True

    # Compiler / preparer / execution context classes used by this dialect.
    statement_compiler = OracleCompiler
    ddl_compiler = OracleDDLCompiler
    type_compiler_cls = OracleTypeCompiler
    preparer = OracleIdentifierPreparer
    execution_ctx_cls = OracleExecutionContext

    reflection_options = ("oracle_resolve_synonyms",)

    # Class-level default; set from ``use_nchar_for_unicode`` in
    # ``__init__()``.
    _use_nchar_for_unicode = False

    # Per-construct dialect options together with their default values;
    # the compilers in this file read them back through
    # ``<construct>.dialect_options["oracle"][<key>]``.
    construct_arguments = [
        (
            sa_schema.Table,
            {
                "resolve_synonyms": False,
                "on_commit": None,
                "compress": False,
                "tablespace": None,
            },
        ),
        (
            sa_schema.Index,
            {
                "bitmap": False,
                "compress": False,
                "vector": False,
            },
        ),
        (sa_selectable.Select, {"fetch_approximate": False}),
        (sa_selectable.CompoundSelect, {"fetch_approximate": False}),
    ]
@util.deprecated_params(
    use_binds_for_limits=(
        "1.4",
        "The ``use_binds_for_limits`` Oracle Database dialect parameter "
        "is deprecated. The dialect now renders LIMIT / OFFSET integers "
        "inline in all cases using a post-compilation hook, so that the "
        "value is still represented by a 'bound parameter' on the Core "
        "Expression side.",
    )
)
def __init__(
    self,
    use_ansi=True,
    optimize_limits=False,
    use_binds_for_limits=None,
    use_nchar_for_unicode=False,
    exclude_tablespaces=("SYSTEM", "SYSAUX"),
    enable_offset_fetch=True,
    **kwargs,
):
    """Construct the dialect and store its Oracle specific settings.

    :param use_ansi: stored as ``self.use_ansi``; when false the compiler
     moves join criteria into the WHERE clause.
    :param optimize_limits: stored as ``self.optimize_limits``; enables
     the ``FIRST_ROWS`` hint on ROWNUM based limiting.
    :param use_binds_for_limits: deprecated and ignored.
    :param use_nchar_for_unicode: stored as
     ``self._use_nchar_for_unicode``.
    :param exclude_tablespaces: tablespace names whose tables are left
     out of table name reflection.
    :param enable_offset_fetch: stored both as ``self.enable_offset_fetch``
     and as the initial ``self._supports_offset_fetch``.
    :param kwargs: forwarded to ``DefaultDialect.__init__``.
    """
    default.DefaultDialect.__init__(self, **kwargs)

    self._use_nchar_for_unicode = use_nchar_for_unicode
    self.use_ansi = use_ansi
    self.optimize_limits = optimize_limits
    self.exclude_tablespaces = exclude_tablespaces

    # the requested setting is kept separately from the effective one,
    # which initialize() narrows further using the server version
    self.enable_offset_fetch = enable_offset_fetch
    self._supports_offset_fetch = enable_offset_fetch
def initialize(self, connection):
    """Adjust dialect capabilities to the connected server version."""
    super().initialize(connection)

    # Oracle 8i has RETURNING:
    # https://docs.oracle.com/cd/A87860_01/doc/index.htm

    # so does Oracle8:
    # https://docs.oracle.com/cd/A64702_01/doc/index.htm

    if self._is_oracle_8:
        # use a private copy so the class-level mapping stays intact
        self.colspecs = self.colspecs.copy()
        self.colspecs.pop(sqltypes.Interval)
        self.use_ansi = False

    at_least_12 = self.server_version_info >= (12,)
    self.supports_identity_columns = at_least_12
    self._supports_offset_fetch = self.enable_offset_fetch and at_least_12
def _get_effective_compat_server_version_info(self, connection):
    """Return the version tuple the server is effectively compatible with.

    For servers at 12.2 or later the ``compatible`` initialization
    parameter is read from ``v$parameter`` and parsed into a tuple of
    integers.  ``self.server_version_info`` is returned instead when the
    server is older than 12.2, when the parameter cannot be queried, when
    it is empty, or when it cannot be parsed.

    :param connection: connection used to query ``v$parameter``.
    """
    # dialect does not need compat levels below 12.2, so don't query
    # in those cases
    if self.server_version_info < (12, 2):
        return self.server_version_info

    try:
        compat = connection.exec_driver_sql(
            "SELECT value FROM v$parameter WHERE name = 'compatible'"
        ).scalar()
    except exc.DBAPIError:
        compat = None

    if not compat:
        return self.server_version_info

    try:
        return tuple(int(x) for x in compat.split("."))
    except (AttributeError, TypeError, ValueError):
        # unparseable value: fall back to the server version.  Only the
        # parse failures are caught, so KeyboardInterrupt / SystemExit
        # are no longer swallowed as they were by the bare ``except``.
        return self.server_version_info
@property
def _is_oracle_8(self):
    """Truthy when the server version is known and is below 9."""
    version = self.server_version_info
    if not version:
        # unknown version: hand back the falsy value unchanged
        return version
    return version < (9,)
@property
def _supports_table_compression(self):
    """Truthy when the server version is known and is at least 10.1."""
    version = self.server_version_info
    if not version:
        # unknown version: hand back the falsy value unchanged
        return version
    return version >= (10, 1)
@property
def _supports_table_compress_for(self):
    """Truthy when the server version is known and is at least 11."""
    version = self.server_version_info
    if not version:
        # unknown version: hand back the falsy value unchanged
        return version
    return version >= (11,)
@property
def _supports_char_length(self):
    """True unless the server is detected as Oracle 8."""
    is_version_8 = self._is_oracle_8
    return not is_version_8
@property
def _supports_update_returning_computed_cols(self):
    """Truthy when the server version is known and is at least 18."""
    # On version 18 this error is no longer present, while it happens on
    # 11; it may also work on versions before 18.
    version = self.server_version_info
    if not version:
        # unknown version: hand back the falsy value unchanged
        return version
    return version >= (18,)
@property
def _supports_except_all(self):
    """Truthy when the server version is known and is at least 21."""
    version = self.server_version_info
    if not version:
        # unknown version: hand back the falsy value unchanged
        return version
    return version >= (21,)
def do_release_savepoint(self, connection, name):
    """Do nothing: Oracle does not support RELEASE SAVEPOINT."""
    return None
def _check_max_identifier_length(self, connection):
    """Return 30 for compatibility levels below 12.2, else ``None``.

    ``None`` means "use the default".
    """
    compat_version = self._get_effective_compat_server_version_info(
        connection
    )
    return 30 if compat_version < (12, 2) else None
def get_isolation_level_values(self, dbapi_connection):
    """Return the isolation level names accepted by this dialect."""
    levels = ["READ COMMITTED", "SERIALIZABLE"]
    return levels
def get_default_isolation_level(self, dbapi_conn):
    """Return the connection's isolation level, or ``"READ COMMITTED"``.

    ``NotImplementedError`` from ``get_isolation_level()`` is propagated.
    Any other ordinary error makes this method fall back to
    ``"READ COMMITTED"``.

    :param dbapi_conn: the DBAPI connection to inspect.
    :raises NotImplementedError: when ``get_isolation_level()`` raises it.
    """
    try:
        return self.get_isolation_level(dbapi_conn)
    except NotImplementedError:
        raise
    except Exception:
        # best-effort fallback.  ``Exception`` rather than a bare
        # ``except`` so KeyboardInterrupt / SystemExit still propagate.
        return "READ COMMITTED"
def _execute_reflection(
    self, connection, query, dblink, returns_long, params=None
):
    """Execute a reflection query, optionally through a db link.

    :param connection: connection the query is executed on.
    :param query: the query to execute.
    :param dblink: db link name or ``None``; a leading ``@`` is added
     when missing.
    :param returns_long: whether the query returns LONG columns.
    :param params: optional parameters passed to ``connection.execute``.
    :return: whatever ``connection.execute()`` returns.
    """
    link = dblink
    if link and not link.startswith("@"):
        link = "@" + link

    options = {
        # handle db links
        "_oracle_dblink": link or "",
        # override any schema translate map
        "schema_translate_map": None,
    }

    if link and returns_long:
        # Oracle seems to error with
        # "ORA-00997: illegal use of LONG datatype" when returning
        # LONG columns via a dblink in a query with bind params.
        # This type seems to be very hard to cast into something else,
        # so every bind parameter of a cloned query is flagged
        # ``literal_execute`` instead.
        def _flag_literal_execute(param):
            param.literal_execute = True

        query = visitors.cloned_traverse(
            query, {}, {"bindparam": _flag_literal_execute}
        )

    return connection.execute(query, params, execution_options=options)
@util.memoized_property
def _has_table_query(self):
    """Query selecting a table or view name by ``table_name`` / ``owner``.

    The query takes two bind parameters, ``table_name`` and ``owner``.
    """
    all_tables = dictionary.all_tables.c
    all_views = dictionary.all_views.c

    # materialized views are returned by all_tables
    table_rows = select(all_tables.table_name, all_tables.owner)
    view_rows = select(
        all_views.view_name.label("table_name"), all_views.owner
    )
    candidates = table_rows.union_all(view_rows).subquery("tables_and_views")

    return select(candidates.c.table_name).where(
        candidates.c.table_name == bindparam("table_name"),
        candidates.c.owner == bindparam("owner"),
    )
@reflection.cache
def has_table(
    self, connection, table_name, schema=None, dblink=None, **kw
):
    """Supported kw arguments are: ``dblink`` to reflect via a db link."""
    self._ensure_has_table_connection(connection)

    owner = schema or self.default_schema_name
    bind_values = {
        "table_name": self.denormalize_name(table_name),
        "owner": self.denormalize_schema_name(owner),
    }
    found = self._execute_reflection(
        connection,
        self._has_table_query,
        dblink,
        returns_long=False,
        params=bind_values,
    ).scalar()
    return bool(found)
@reflection.cache
def has_sequence(
    self, connection, sequence_name, schema=None, dblink=None, **kw
):
    """Return True if the named sequence exists in the given schema.

    Supported kw arguments are: ``dblink`` to reflect via a db link.

    :param connection: connection used for the lookup.
    :param sequence_name: name of the sequence to look for.
    :param schema: owner of the sequence; defaults to
     ``self.default_schema_name``.
    :param dblink: optional db link to reflect through.
    """
    if not schema:
        schema = self.default_schema_name

    # The sequence name goes through the plain name denormalization, as
    # has_table() does for table names.  denormalize_schema_name() special
    # cases the unquoted "public" *schema* and must not be applied to an
    # object name.
    query = select(dictionary.all_sequences.c.sequence_name).where(
        dictionary.all_sequences.c.sequence_name
        == self.denormalize_name(sequence_name),
        dictionary.all_sequences.c.sequence_owner
        == self.denormalize_schema_name(schema),
    )

    cursor = self._execute_reflection(
        connection, query, dblink, returns_long=False
    )
    return bool(cursor.scalar())
def _get_default_schema_name(self, connection):
    """Return the normalized ``current_schema`` of the session."""
    current_schema = connection.exec_driver_sql(
        "select sys_context( 'userenv', 'current_schema' ) from dual"
    ).scalar()
    return self.normalize_name(current_schema)
def denormalize_schema_name(self, name):
    """Denormalize a schema name, special casing the ``public`` schema.

    A name equal to ``"public"`` that carries no explicit quoting
    preference is returned as ``"PUBLIC"``; everything else goes through
    the parent's ``denormalize_name()``.
    """
    # a ``quote`` attribute is present on quoted_name objects
    quoting = getattr(name, "quote", None)
    unquoted_public = quoting is None and name == "public"
    if unquoted_public:
        # case insensitive, no quoting specified, "public"
        return "PUBLIC"
    return super().denormalize_name(name)
@reflection.flexi_cache(
    ("schema", InternalTraversal.dp_string),
    ("filter_names", InternalTraversal.dp_string_list),
    ("dblink", InternalTraversal.dp_string),
)
def _get_synonyms(self, connection, schema, filter_names, dblink, **kw):
    """Return the synonyms owned by ``schema`` as a list of mappings.

    Each row has the keys ``synonym_name``, ``table_name``,
    ``table_owner`` and ``db_link``.  ``filter_names``, when given,
    restricts the result to those synonym names.
    """
    synonyms = dictionary.all_synonyms.c
    owner = self.denormalize_schema_name(schema or self.default_schema_name)
    has_filter_names, params = self._prepare_filter_names(filter_names)

    query = select(
        synonyms.synonym_name,
        synonyms.table_name,
        synonyms.table_owner,
        synonyms.db_link,
    ).where(synonyms.owner == owner)
    if has_filter_names:
        query = query.where(
            synonyms.synonym_name.in_(params["filter_names"])
        )

    rows = self._execute_reflection(
        connection, query, dblink, returns_long=False
    )
    return rows.mappings().all()
@lru_cache()
def _all_objects_query(
    self, owner, scope, kind, has_filter_names, has_mat_views
):
    """Build the ``all_objects`` query selecting object names.

    :param owner: owner the objects must belong to.
    :param scope: ``ObjectScope`` selecting temporary / non-temporary
     objects.
    :param kind: ``ObjectKind`` flags selecting tables, views and
     materialized views.
    :param has_filter_names: adds an IN filter on the ``filter_names``
     bind parameter.
    :param has_mat_views: when tables are requested without materialized
     views, adds a NOT IN filter on the ``mat_views`` bind parameter.
    """
    objects = dictionary.all_objects
    query = (
        select(objects.c.object_name)
        .select_from(objects)
        .where(objects.c.owner == owner)
    )

    # NOTE: materialized views are listed in all_objects twice;
    # once as MATERIALIZED VIEW and once as TABLE
    if kind is ObjectKind.ANY:
        # materialized views are also listed as tables, so there is no
        # need to add them to the in_.
        wanted_types = ("TABLE", "VIEW")
    else:
        wants_tables = ObjectKind.TABLE in kind
        wants_mat_views = ObjectKind.MATERIALIZED_VIEW in kind

        wanted_types = []
        if ObjectKind.VIEW in kind:
            wanted_types.append("VIEW")
        if wants_mat_views and not wants_tables:
            # materialized views are also listed as tables, so there is
            # no need to add them to the in_ if also selecting tables.
            wanted_types.append("MATERIALIZED VIEW")
        if wants_tables:
            wanted_types.append("TABLE")
            if has_mat_views and not wants_mat_views:
                # materialized views are also listed as tables,
                # so they need to be filtered out.
                # EXCEPT ALL / MINUS profiles as faster than using
                # NOT EXISTS or NOT IN with a subquery, but it's in
                # general faster to get the mat view names and exclude
                # them only when needed
                query = query.where(
                    objects.c.object_name.not_in(bindparam("mat_views"))
                )
    query = query.where(objects.c.object_type.in_(wanted_types))

    # handles scope
    if scope is ObjectScope.DEFAULT:
        query = query.where(objects.c.temporary == "N")
    elif scope is ObjectScope.TEMPORARY:
        query = query.where(objects.c.temporary == "Y")

    if has_filter_names:
        query = query.where(
            objects.c.object_name.in_(bindparam("filter_names"))
        )
    return query
@reflection.flexi_cache(
    ("schema", InternalTraversal.dp_string),
    ("scope", InternalTraversal.dp_plain_obj),
    ("kind", InternalTraversal.dp_plain_obj),
    ("filter_names", InternalTraversal.dp_string_list),
    ("dblink", InternalTraversal.dp_string),
)
def _get_all_objects(
    self, connection, schema, scope, kind, filter_names, dblink, **kw
):
    """Return the names of the objects of ``schema`` matching scope / kind.

    When tables are requested without materialized views, the materialized
    view names are fetched first so the query can exclude them.
    """
    owner = self.denormalize_schema_name(schema or self.default_schema_name)
    has_filter_names, params = self._prepare_filter_names(filter_names)

    has_mat_views = False
    tables_only = (
        ObjectKind.TABLE in kind and ObjectKind.MATERIALIZED_VIEW not in kind
    )
    if tables_only:
        # see note in _all_objects_query
        mat_view_names = self.get_materialized_view_names(
            connection, schema, dblink, _normalize=False, **kw
        )
        if mat_view_names:
            params["mat_views"] = mat_view_names
            has_mat_views = True

    query = self._all_objects_query(
        owner, scope, kind, has_filter_names, has_mat_views
    )
    rows = self._execute_reflection(
        connection, query, dblink, returns_long=False, params=params
    )
    return rows.scalars().all()
def _handle_synonyms_decorator(fn):
    """Wrap ``fn`` so that calls are routed through ``_handle_synonyms``."""

    @wraps(fn)
    def routed(self, *args, **kwargs):
        handler = self._handle_synonyms
        return handler(fn, *args, **kwargs)

    return routed
def _handle_synonyms(self, fn, connection, *args, **kwargs):
    """Call ``fn``, resolving synonyms when requested.

    Without ``oracle_resolve_synonyms`` in ``kwargs``, or when the schema
    has no synonyms, ``fn`` is simply called with the original arguments.
    Otherwise ``fn`` is called once per ``(db link, table owner)`` pair
    the synonyms point to, and the results are re-keyed by
    ``(schema, synonym name)``.
    """
    if not kwargs.get("oracle_resolve_synonyms", False):
        return fn(self, connection, *args, **kwargs)

    original_kw = kwargs.copy()
    schema = kwargs.pop("schema", None)
    synonym_rows = self._get_synonyms(
        connection,
        schema=schema,
        filter_names=kwargs.pop("filter_names", None),
        dblink=kwargs.pop("dblink", None),
        info_cache=kwargs.get("info_cache", None),
    )

    # (db_link, table_owner) -> {normalized table name: synonym name}
    targets = defaultdict(dict)
    for row in synonym_rows:
        target = (row["db_link"], row["table_owner"])
        table_name = self.normalize_name(row["table_name"])
        targets[target][table_name] = row["synonym_name"]

    if not targets:
        # No synonym, do the plain thing
        return fn(self, connection, *args, **original_kw)

    resolved = {}
    for (dblink, table_owner), synonym_by_table in targets.items():
        call_kw = dict(original_kw)
        call_kw["schema"] = table_owner
        call_kw["dblink"] = self.normalize_name(dblink)
        call_kw["filter_names"] = synonym_by_table.keys()

        for (_, table_name), value in fn(
            self, connection, *args, **call_kw
        ):
            synonym_name = self.normalize_name(synonym_by_table[table_name])
            resolved[(schema, synonym_name)] = value
    return resolved.items()
@reflection.cache
def get_schema_names(self, connection, dblink=None, **kw):
    """Supported kw arguments are: ``dblink`` to reflect via a db link."""
    username = dictionary.all_users.c.username
    query = select(username).order_by(username)
    names = self._execute_reflection(
        connection, query, dblink, returns_long=False
    ).scalars()
    return [self.normalize_name(name) for name in names]
@reflection.cache
def get_table_names(self, connection, schema=None, dblink=None, **kw):
    """Supported kw arguments are: ``dblink`` to reflect via a db link."""
    # note that table_names() isn't loading DBLINKed or synonym'ed tables
    all_tables = dictionary.all_tables
    all_synonyms = dictionary.all_synonyms
    all_mviews = dictionary.all_mviews

    if schema is None:
        schema = self.default_schema_name
    den_schema = self.denormalize_schema_name(schema)

    if kw.get("oracle_resolve_synonyms", False):
        # synonyms are listed next to the tables, under the synonym name
        # and owner but with the attributes of the table they point to
        plain_tables = select(
            all_tables.c.table_name,
            all_tables.c.owner,
            all_tables.c.iot_name,
            all_tables.c.duration,
            all_tables.c.tablespace_name,
        )
        synonym_tables = (
            select(
                all_synonyms.c.synonym_name.label("table_name"),
                all_synonyms.c.owner,
                all_tables.c.iot_name,
                all_tables.c.duration,
                all_tables.c.tablespace_name,
            )
            .select_from(all_tables)
            .join(
                all_synonyms,
                and_(
                    all_tables.c.table_name == all_synonyms.c.table_name,
                    all_tables.c.owner
                    == func.coalesce(
                        all_synonyms.c.table_owner,
                        all_synonyms.c.owner,
                    ),
                ),
            )
        )
        tables = plain_tables.union_all(synonym_tables).subquery(
            "available_tables"
        )
    else:
        tables = all_tables

    query = select(tables.c.table_name)
    if self.exclude_tablespaces:
        tablespace = func.coalesce(tables.c.tablespace_name, "no tablespace")
        query = query.where(tablespace.not_in(self.exclude_tablespaces))
    query = query.where(
        tables.c.owner == den_schema,
        tables.c.iot_name.is_(null()),
        tables.c.duration.is_(null()),
    )

    # remove materialized views
    mat_query = select(all_mviews.c.mview_name.label("table_name")).where(
        all_mviews.c.owner == den_schema
    )
    if self._supports_except_all:
        query = query.except_all(mat_query)
    else:
        query = query.except_(mat_query)

    names = self._execute_reflection(
        connection, query, dblink, returns_long=False
    ).scalars()
    return [self.normalize_name(name) for name in names]
@reflection.cache
def get_temp_table_names(self, connection, dblink=None, **kw):
    """Supported kw arguments are: ``dblink`` to reflect via a db link."""
    all_tables = dictionary.all_tables.c
    owner = self.denormalize_schema_name(self.default_schema_name)

    query = select(all_tables.table_name)
    if self.exclude_tablespaces:
        tablespace = func.coalesce(
            all_tables.tablespace_name, "no tablespace"
        )
        query = query.where(tablespace.not_in(self.exclude_tablespaces))
    # temporary tables are the ones that have a duration
    query = query.where(
        all_tables.owner == owner,
        all_tables.iot_name.is_(null()),
        all_tables.duration.is_not(null()),
    )

    names = self._execute_reflection(
        connection, query, dblink, returns_long=False
    ).scalars()
    return [self.normalize_name(name) for name in names]
@reflection.cache
def get_materialized_view_names(
    self, connection, schema=None, dblink=None, _normalize=True, **kw
):
    """Supported kw arguments are: ``dblink`` to reflect via a db link."""
    mviews = dictionary.all_mviews.c
    owner = self.denormalize_schema_name(schema or self.default_schema_name)

    query = select(mviews.mview_name).where(mviews.owner == owner)
    names = self._execute_reflection(
        connection, query, dblink, returns_long=False
    ).scalars()

    if not _normalize:
        # names exactly as stored in the data dictionary
        return names.all()
    return [self.normalize_name(name) for name in names]
@reflection.cache
def get_view_names(self, connection, schema=None, dblink=None, **kw):
    """Supported kw arguments are: ``dblink`` to reflect via a db link."""
    views = dictionary.all_views.c
    owner = self.denormalize_schema_name(schema or self.default_schema_name)

    query = select(views.view_name).where(views.owner == owner)
    names = self._execute_reflection(
        connection, query, dblink, returns_long=False
    ).scalars()
    return [self.normalize_name(name) for name in names]
@reflection.cache
def get_sequence_names(self, connection, schema=None, dblink=None, **kw):
    """Supported kw arguments are: ``dblink`` to reflect via a db link."""
    sequences = dictionary.all_sequences.c
    owner = self.denormalize_schema_name(schema or self.default_schema_name)

    query = select(sequences.sequence_name).where(
        sequences.sequence_owner == owner
    )
    names = self._execute_reflection(
        connection, query, dblink, returns_long=False
    ).scalars()
    return [self.normalize_name(name) for name in names]
def _value_or_raise(self, data, table, schema):
    """Return the entry of ``data`` keyed by ``(schema, table)``.

    :param data: iterable of ``((schema, table), value)`` pairs.
    :param table: table name; normalized before the lookup.
    :param schema: schema part of the key, used as given.
    :raises NoSuchTableError: when there is no such entry.
    """
    table = self.normalize_name(str(table))
    wanted = (schema, table)
    try:
        return dict(data)[wanted]
    except KeyError:
        missing = f"{schema}.{table}" if schema else table
        raise exc.NoSuchTableError(missing) from None
- def _prepare_filter_names(self, filter_names):
- if filter_names:
- fn = [self.denormalize_name(name) for name in filter_names]
- return True, {"filter_names": fn}
- else:
- return False, {}
@reflection.cache
def get_table_options(self, connection, table_name, schema=None, **kw):
    """Return the reflected options of a single table.

    Delegates to ``get_multi_table_options`` restricted to
    ``table_name`` with any scope and any object kind, then picks the
    entry for that table; ``NoSuchTableError`` is raised by
    ``_value_or_raise`` when no entry exists.

    Supported kw arguments are: ``dblink`` to reflect via a db link;
    ``oracle_resolve_synonyms`` to resolve names to synonyms
    """
    per_table = self.get_multi_table_options(
        connection,
        filter_names=[table_name],
        schema=schema,
        kind=ObjectKind.ANY,
        scope=ObjectScope.ANY,
        **kw,
    )
    return self._value_or_raise(per_table, table_name, schema)
@lru_cache()
def _table_options_query(
    self, owner, scope, kind, has_filter_names, has_mat_views
):
    """Build the ``all_tables`` statement used to reflect table options.

    The statement selects table name, compression, compress_for and
    tablespace name for the tables of ``owner``.  The two compression
    columns are replaced by labelled NULLs when the corresponding
    ``_supports_table_*`` flag is false.  Bind parameters expected at
    execution time: ``filter_names`` when ``has_filter_names`` is set
    and ``mat_views`` when materialized views are filtered in or out.
    """
    tables = dictionary.all_tables
    wants_tables = ObjectKind.TABLE in kind
    wants_mviews = ObjectKind.MATERIALIZED_VIEW in kind

    if self._supports_table_compression:
        compression = tables.c.compression
    else:
        compression = sql.null().label("compression")
    if self._supports_table_compress_for:
        compress_for = tables.c.compress_for
    else:
        compress_for = sql.null().label("compress_for")

    criteria = [tables.c.owner == owner]
    if has_filter_names:
        criteria.append(tables.c.table_name.in_(bindparam("filter_names")))

    # a NULL duration identifies the non-temporary tables
    if scope is ObjectScope.DEFAULT:
        criteria.append(tables.c.duration.is_(null()))
    elif scope is ObjectScope.TEMPORARY:
        criteria.append(tables.c.duration.is_not(null()))

    if wants_tables and not wants_mviews:
        if has_mat_views:
            # cant use EXCEPT ALL / MINUS here because we don't have an
            # excludable row vs. the query above
            # outerjoin + where null works better on oracle 21 but 11
            # does not like it at all. this is the next best thing
            criteria.append(
                tables.c.table_name.not_in(bindparam("mat_views"))
            )
    elif wants_mviews and not wants_tables:
        criteria.append(tables.c.table_name.in_(bindparam("mat_views")))

    return select(
        tables.c.table_name,
        compression,
        compress_for,
        tables.c.tablespace_name,
    ).where(*criteria)
@_handle_synonyms_decorator
def get_multi_table_options(
    self,
    connection,
    *,
    schema,
    filter_names,
    scope,
    kind,
    dblink=None,
    **kw,
):
    """Return ``((schema, name), options)`` pairs for many objects.

    Tables and materialized views get ``oracle_compress`` (when
    compression is ``ENABLED``) and ``oracle_tablespace`` (when a
    tablespace is set) on top of the default options.  Plain views are
    added with default options only, and only for the default scope.

    Supported kw arguments are: ``dblink`` to reflect via a db link;
    ``oracle_resolve_synonyms`` to resolve names to synonyms
    """
    owner = self.denormalize_schema_name(
        schema or self.default_schema_name
    )
    has_filter_names, params = self._prepare_filter_names(filter_names)
    wants_tables = ObjectKind.TABLE in kind
    wants_mviews = ObjectKind.MATERIALIZED_VIEW in kind

    has_mat_views = False
    if wants_tables != wants_mviews:
        # exactly one of the two kinds was requested, so the names of
        # the materialized views are needed to include / exclude them;
        # see note in _table_options_query
        mat_views = self.get_materialized_view_names(
            connection, schema, dblink, _normalize=False, **kw
        )
        # when excluding, the parameter is only sent if there is
        # something to exclude
        if wants_mviews or mat_views:
            params["mat_views"] = mat_views
        has_mat_views = wants_tables and bool(mat_views)

    options = {}
    default = ReflectionDefaults.table_options

    if wants_tables or wants_mviews:
        query = self._table_options_query(
            owner, scope, kind, has_filter_names, has_mat_views
        )
        rows = self._execute_reflection(
            connection, query, dblink, returns_long=False, params=params
        )
        for table, compression, compress_for, tablespace in rows:
            entry = default()
            if compression == "ENABLED":
                entry["oracle_compress"] = compress_for
            if tablespace:
                entry["oracle_tablespace"] = tablespace
            options[(schema, self.normalize_name(table))] = entry

    if ObjectKind.VIEW in kind and ObjectScope.DEFAULT in scope:
        # add the views (no temporary views)
        for view in self.get_view_names(connection, schema, dblink, **kw):
            if filter_names and view not in filter_names:
                continue
            options[(schema, view)] = default()

    return options.items()
@reflection.cache
def get_columns(self, connection, table_name, schema=None, **kw):
    """Return the reflected columns of a single table.

    Delegates to ``get_multi_columns`` restricted to ``table_name``
    with any scope and any object kind, then picks the entry for that
    table; ``NoSuchTableError`` is raised by ``_value_or_raise`` when
    no entry exists.

    Supported kw arguments are: ``dblink`` to reflect via a db link;
    ``oracle_resolve_synonyms`` to resolve names to synonyms
    """
    per_table = self.get_multi_columns(
        connection,
        filter_names=[table_name],
        schema=schema,
        kind=ObjectKind.ANY,
        scope=ObjectScope.ANY,
        **kw,
    )
    return self._value_or_raise(per_table, table_name, schema)
- def _run_batches(
- self, connection, query, dblink, returns_long, mappings, all_objects
- ):
- each_batch = 500
- batches = list(all_objects)
- while batches:
- batch = batches[0:each_batch]
- batches[0:each_batch] = []
- result = self._execute_reflection(
- connection,
- query,
- dblink,
- returns_long=returns_long,
- params={"all_objects": batch},
- )
- if mappings:
- yield from result.mappings()
- else:
- yield from result
@lru_cache()
def _column_query(self, owner):
    """Build the statement that reflects the columns of ``owner``.

    Column data comes from ``all_tab_cols`` outer joined to
    ``all_col_comments``.  When ``server_version_info`` is at least
    ``(12,)`` the identity data of ``all_tab_identity_cols`` is joined
    in as well; otherwise ``default_on_null`` and ``identity_options``
    are selected as NULL.  The names to reflect are passed at execution
    time through the ``all_objects`` bind parameter.
    """
    tab_cols = dictionary.all_tab_cols
    col_comments = dictionary.all_col_comments
    identity_cols = dictionary.all_tab_identity_cols

    def same_column_as(other):
        # join condition matching a row of ``other`` to its column
        return and_(
            tab_cols.c.table_name == other.c.table_name,
            tab_cols.c.column_name == other.c.column_name,
            tab_cols.c.owner == other.c.owner,
        )

    with_identity = self.server_version_info >= (12,)
    if with_identity:
        # NULL when the column has no identity row, otherwise
        # "<generation type>,<identity options>"
        identity_options = sql.case(
            (identity_cols.c.table_name.is_(None), sql.null()),
            else_=identity_cols.c.generation_type
            + ","
            + identity_cols.c.identity_options,
        ).label("identity_options")
        extra_cols = (tab_cols.c.default_on_null, identity_options)
    else:
        extra_cols = (
            sql.null().label("default_on_null"),
            sql.null().label("identity_options"),
        )

    # NOTE: on oracle cannot create tables/views without columns and
    # a table cannot have all column hidden:
    # ORA-54039: table must have at least one column that is not invisible
    # all_tab_cols returns data for tables/views/mat-views.
    # all_tab_cols does not return recycled tables
    stmt = select(
        tab_cols.c.table_name,
        tab_cols.c.column_name,
        tab_cols.c.data_type,
        tab_cols.c.char_length,
        tab_cols.c.data_precision,
        tab_cols.c.data_scale,
        tab_cols.c.nullable,
        tab_cols.c.data_default,
        col_comments.c.comments,
        tab_cols.c.virtual_column,
        *extra_cols,
    ).select_from(tab_cols)

    # NOTE: all_col_comments has a row for each column even if no
    # comment is present, so a join could be performed, but there
    # seems to be no difference compared to an outer join
    stmt = stmt.outerjoin(col_comments, same_column_as(col_comments))
    if with_identity:
        stmt = stmt.outerjoin(identity_cols, same_column_as(identity_cols))

    return stmt.where(
        tab_cols.c.table_name.in_(bindparam("all_objects")),
        tab_cols.c.hidden_column == "NO",
        tab_cols.c.owner == owner,
    ).order_by(tab_cols.c.table_name, tab_cols.c.column_id)
@_handle_synonyms_decorator
def get_multi_columns(
    self,
    connection,
    *,
    schema,
    filter_names,
    scope,
    kind,
    dblink=None,
    **kw,
):
    """Return ``((schema, table), [column dict, ...])`` pairs.

    Each column dictionary holds ``name``, ``type``, ``nullable``,
    ``default`` and ``comment``; ``quote``, ``computed`` and
    ``identity`` are added only when they apply.  Virtual and identity
    columns report ``default`` as ``None``.

    Supported kw arguments are: ``dblink`` to reflect via a db link;
    ``oracle_resolve_synonyms`` to resolve names to synonyms
    """
    owner = self.denormalize_schema_name(
        schema or self.default_schema_name
    )
    query = self._column_query(owner)

    unrestricted = kind is ObjectKind.ANY and scope is ObjectScope.ANY
    if filter_names and unrestricted:
        # the given names can be queried directly, no lookup is needed
        all_objects = [self.denormalize_name(n) for n in filter_names]
    else:
        all_objects = self._get_all_objects(
            connection, schema, scope, kind, filter_names, dblink, **kw
        )

    # all_tab_cols.data_default is LONG
    rows = self._run_batches(
        connection,
        query,
        dblink,
        returns_long=True,
        mappings=True,
        all_objects=all_objects,
    )

    size_suffix = re.compile(r"\(\d+\)")
    sized_char_types = ("VARCHAR2", "NVARCHAR2", "CHAR", "NCHAR")

    def as_int(number):
        # turn a float holding a whole number into an int; anything
        # else (None included) is passed through untouched
        if isinstance(number, float) and number.is_integer():
            return int(number)
        return number

    def reflect_type(row, column_label):
        # map the data dictionary type description to a type object
        type_name = row["data_type"]
        precision = as_int(row["data_precision"])

        if type_name == "NUMBER":
            scale = as_int(row["data_scale"])
            if precision is None and scale == 0:
                return INTEGER()
            return NUMBER(precision, scale)

        if type_name == "FLOAT":
            # https://docs.oracle.com/cd/B14117_01/server.101/b10758/sqlqr06.htm
            if precision == 126:
                # The DOUBLE PRECISION datatype is a floating-point
                # number with binary precision 126.
                return DOUBLE_PRECISION()
            if precision == 63:
                # The REAL datatype is a floating-point number with a
                # binary precision of 63, or 18 decimal.
                return REAL()
            # non standard precision
            return FLOAT(binary_precision=precision)

        if type_name in sized_char_types:
            char_length = as_int(row["char_length"])
            return self.ischema_names.get(type_name)(char_length)

        if "WITH TIME ZONE" in type_name:
            return TIMESTAMP(timezone=True)
        if "WITH LOCAL TIME ZONE" in type_name:
            return TIMESTAMP(local_timezone=True)

        # drop a "(n)" size specification before the name lookup
        bare_name = re.sub(size_suffix, "", type_name)
        try:
            return self.ischema_names[bare_name]
        except KeyError:
            util.warn(
                "Did not recognize type '%s' of column '%s'"
                % (bare_name, column_label)
            )
            return sqltypes.NULLTYPE

    columns = defaultdict(list)
    for row in rows:
        table_key = (schema, self.normalize_name(row["table_name"]))
        raw_name = row["column_name"]
        name = self.normalize_name(raw_name)
        column_type = reflect_type(row, name)

        default = row["data_default"]

        computed = None
        if row["virtual_column"] == "YES":
            # for a virtual column the "default" is its expression
            computed, default = {"sqltext": default}, None

        identity = None
        raw_identity = row["identity_options"]
        if raw_identity is not None:
            identity = self._parse_identity_options(
                raw_identity, row["default_on_null"]
            )
            default = None

        entry = {
            "name": name,
            "type": column_type,
            "nullable": row["nullable"] == "Y",
            "default": default,
            "comment": row["comments"],
        }
        if raw_name == raw_name.lower():
            # stored without any upper case character: flag as quoted
            entry["quote"] = True
        if computed is not None:
            entry["computed"] = computed
        if identity is not None:
            entry["identity"] = identity

        columns[table_key].append(entry)

    # NOTE: ReflectionDefaults.columns is not applied to the result
    # since all tables have columns
    return columns.items()
- def _parse_identity_options(self, identity_options, default_on_null):
- # identity_options is a string that starts with 'ALWAYS,' or
- # 'BY DEFAULT,' and continues with
- # START WITH: 1, INCREMENT BY: 1, MAX_VALUE: 123, MIN_VALUE: 1,
- # CYCLE_FLAG: N, CACHE_SIZE: 1, ORDER_FLAG: N, SCALE_FLAG: N,
- # EXTEND_FLAG: N, SESSION_FLAG: N, KEEP_VALUE: N
- parts = [p.strip() for p in identity_options.split(",")]
- identity = {
- "always": parts[0] == "ALWAYS",
- "on_null": default_on_null == "YES",
- }
- for part in parts[1:]:
- option, value = part.split(":")
- value = value.strip()
- if "START WITH" in option:
- identity["start"] = int(value)
- elif "INCREMENT BY" in option:
- identity["increment"] = int(value)
- elif "MAX_VALUE" in option:
- identity["maxvalue"] = int(value)
- elif "MIN_VALUE" in option:
- identity["minvalue"] = int(value)
- elif "CYCLE_FLAG" in option:
- identity["cycle"] = value == "Y"
- elif "CACHE_SIZE" in option:
- identity["cache"] = int(value)
- elif "ORDER_FLAG" in option:
- identity["order"] = value == "Y"
- return identity
@reflection.cache
def get_table_comment(self, connection, table_name, schema=None, **kw):
    """Return the reflected comment of a single table.

    Delegates to ``get_multi_table_comment`` restricted to
    ``table_name`` with any scope and any object kind, then picks the
    entry for that table; ``NoSuchTableError`` is raised by
    ``_value_or_raise`` when no entry exists.

    Supported kw arguments are: ``dblink`` to reflect via a db link;
    ``oracle_resolve_synonyms`` to resolve names to synonyms
    """
    per_table = self.get_multi_table_comment(
        connection,
        filter_names=[table_name],
        schema=schema,
        kind=ObjectKind.ANY,
        scope=ObjectScope.ANY,
        **kw,
    )
    return self._value_or_raise(per_table, table_name, schema)
@lru_cache()
def _comment_query(self, owner, scope, kind, has_filter_names):
    """Build the statement selecting ``table_name`` and ``comments``.

    Tables and plain views are read from ``all_tab_comments``,
    materialized views from ``all_mview_comments``; when more than one
    source is involved they are combined with UNION ALL.  Names like
    ``BIN$%`` are excluded.  For the default / temporary scopes the
    result is joined to ``all_objects`` to filter on its ``temporary``
    flag.  ``filter_names`` is expected as a bind parameter when
    ``has_filter_names`` is set.
    """
    # NOTE: all_tab_comments / all_mview_comments have a row for all
    # object even if they don't have comments
    tab_comments = dictionary.all_tab_comments
    mview_comments = dictionary.all_mview_comments
    all_objects = dictionary.all_objects

    wants_tables = ObjectKind.TABLE in kind
    wants_views = ObjectKind.VIEW in kind

    sources = []
    if wants_tables or wants_views:
        # all_tab_comments returns also plain views
        tables_and_views = select(
            tab_comments.c.table_name,
            tab_comments.c.comments,
        ).where(
            tab_comments.c.owner == owner,
            tab_comments.c.table_name.not_like("BIN$%"),
        )
        if wants_tables != wants_views:
            # only one of the two was requested: filter on the type
            only_type = "TABLE" if wants_tables else "VIEW"
            tables_and_views = tables_and_views.where(
                tab_comments.c.table_type == only_type
            )
        sources.append(tables_and_views)

    if ObjectKind.MATERIALIZED_VIEW in kind:
        sources.append(
            select(
                mview_comments.c.mview_name.label("table_name"),
                mview_comments.c.comments,
            ).where(
                mview_comments.c.owner == owner,
                mview_comments.c.mview_name.not_like("BIN$%"),
            )
        )

    if len(sources) == 1:
        (query,) = sources
    else:
        combined = sql.union_all(*sources).subquery("tables_and_views")
        query = select(combined.c.table_name, combined.c.comments)

    name_col = query.selected_columns.table_name

    if scope in (ObjectScope.DEFAULT, ObjectScope.TEMPORARY):
        temporary_flag = "Y" if scope is ObjectScope.TEMPORARY else "N"
        # need distinct since materialized view are listed also
        # as tables in all_objects
        same_object = and_(
            all_objects.c.owner == owner,
            all_objects.c.object_name == name_col,
            all_objects.c.temporary == temporary_flag,
        )
        query = query.distinct().join(all_objects, same_object)

    if has_filter_names:
        query = query.where(name_col.in_(bindparam("filter_names")))
    return query
@_handle_synonyms_decorator
def get_multi_table_comment(
    self,
    connection,
    *,
    schema,
    filter_names,
    scope,
    kind,
    dblink=None,
    **kw,
):
    """Return a generator of ``((schema, table), comment dict)`` pairs.

    The comment dictionary is ``{"text": comment}``; the default table
    comment is used when the comment is NULL or when it starts with
    ``"snapshot table for snapshot "``.

    Supported kw arguments are: ``dblink`` to reflect via a db link;
    ``oracle_resolve_synonyms`` to resolve names to synonyms
    """
    owner = self.denormalize_schema_name(
        schema or self.default_schema_name
    )
    has_filter_names, params = self._prepare_filter_names(filter_names)
    query = self._comment_query(owner, scope, kind, has_filter_names)
    rows = self._execute_reflection(
        connection, query, dblink, returns_long=False, params=params
    )

    default = ReflectionDefaults.table_comment
    # materialized views by default seem to have a comment like
    # "snapshot table for snapshot owner.mat_view_name"
    ignore_mat_view = "snapshot table for snapshot "

    def comment_entry(text):
        if text is None or text.startswith(ignore_mat_view):
            return default()
        return {"text": text}

    return (
        ((schema, self.normalize_name(table)), comment_entry(comment))
        for table, comment in rows
    )
@reflection.cache
def get_indexes(self, connection, table_name, schema=None, **kw):
    """Return the reflected indexes of a single table.

    Delegates to ``get_multi_indexes`` restricted to ``table_name``
    with any scope and any object kind, then picks the entry for that
    table; ``NoSuchTableError`` is raised by ``_value_or_raise`` when
    no entry exists.

    Supported kw arguments are: ``dblink`` to reflect via a db link;
    ``oracle_resolve_synonyms`` to resolve names to synonyms
    """
    per_table = self.get_multi_indexes(
        connection,
        filter_names=[table_name],
        schema=schema,
        kind=ObjectKind.ANY,
        scope=ObjectScope.ANY,
        **kw,
    )
    return self._value_or_raise(per_table, table_name, schema)
@lru_cache()
def _index_query(self, owner):
    """Build the statement that reflects the index columns of ``owner``.

    ``all_ind_columns`` is joined to ``all_indexes`` and outer joined
    to ``all_ind_expressions``; one row per index column is produced,
    ordered by index name and column position.  The table names to
    reflect are passed at execution time through the ``all_objects``
    bind parameter.
    """
    ind_columns = dictionary.all_ind_columns
    indexes = dictionary.all_indexes
    ind_expressions = dictionary.all_ind_expressions

    owning_index = sql.and_(
        ind_columns.c.index_name == indexes.c.index_name,
        ind_columns.c.index_owner == indexes.c.owner,
    )
    expression_of_column = sql.and_(
        ind_expressions.c.index_name == ind_columns.c.index_name,
        ind_expressions.c.index_owner == ind_columns.c.index_owner,
        ind_expressions.c.column_position == ind_columns.c.column_position,
    )

    stmt = select(
        ind_columns.c.table_name,
        ind_columns.c.index_name,
        ind_columns.c.column_name,
        indexes.c.index_type,
        indexes.c.uniqueness,
        indexes.c.compression,
        indexes.c.prefix_length,
        ind_columns.c.descend,
        ind_expressions.c.column_expression,
    ).select_from(ind_columns)
    stmt = stmt.join(indexes, owning_index)
    # NOTE: this adds about 20% to the query time. Using a
    # case expression with a scalar subquery only when needed
    # with the assumption that most indexes are not expression
    # would be faster but oracle does not like that with
    # LONG datatype. It errors with:
    # ORA-00997: illegal use of LONG datatype
    stmt = stmt.outerjoin(ind_expressions, expression_of_column)
    stmt = stmt.where(
        indexes.c.table_owner == owner,
        indexes.c.table_name.in_(bindparam("all_objects")),
    )
    return stmt.order_by(
        ind_columns.c.index_name,
        ind_columns.c.column_position,
    )
@reflection.flexi_cache(
    ("schema", InternalTraversal.dp_string),
    ("dblink", InternalTraversal.dp_string),
    ("all_objects", InternalTraversal.dp_string_list),
)
def _get_indexes_rows(self, connection, schema, dblink, all_objects, **kw):
    """Return the index column rows of ``all_objects`` as a list.

    Rows whose ``index_name`` equals the name of a primary key
    constraint (type ``P``) of the same objects are left out.
    """
    owner = self.denormalize_schema_name(
        schema or self.default_schema_name
    )
    query = self._index_query(owner)

    constraint_rows = self._get_all_constraint_rows(
        connection, schema, dblink, all_objects, **kw
    )
    pk_names = {
        row["constraint_name"]
        for row in constraint_rows
        if row["constraint_type"] == "P"
    }

    # all_ind_expressions.column_expression is LONG
    index_rows = self._run_batches(
        connection,
        query,
        dblink,
        returns_long=True,
        mappings=True,
        all_objects=all_objects,
    )
    return [row for row in index_rows if row["index_name"] not in pk_names]
@_handle_synonyms_decorator
def get_multi_indexes(
    self,
    connection,
    *,
    schema,
    filter_names,
    scope,
    kind,
    dblink=None,
    **kw,
):
    """Return a generator of ``((schema, table), [index dict, ...])``.

    One pair is produced for every object returned by
    ``_get_all_objects``; objects without index rows get the default
    value.  Expression columns are reported as ``None`` in
    ``column_names`` and spelled out in ``expressions``.

    Supported kw arguments are: ``dblink`` to reflect via a db link;
    ``oracle_resolve_synonyms`` to resolve names to synonyms
    """
    all_objects = self._get_all_objects(
        connection, schema, scope, kind, filter_names, dblink, **kw
    )

    bitmap_types = {"BITMAP", "FUNCTION-BASED BITMAP"}
    indexes = defaultdict(dict)

    for row in self._get_indexes_rows(
        connection, schema, dblink, all_objects, **kw
    ):
        index_name = self.normalize_name(row["index_name"])
        table_name = self.normalize_name(row["table_name"])
        per_table = indexes[(schema, table_name)]

        index = per_table.get(index_name)
        if index is None:
            # first row of this index: record the index level data
            dialect_options = {}
            if row["index_type"] in bitmap_types:
                dialect_options["oracle_bitmap"] = True
            if row["compression"] == "ENABLED":
                dialect_options["oracle_compress"] = row["prefix_length"]
            index = per_table[index_name] = {
                "name": index_name,
                "column_names": [],
                "dialect_options": dialect_options,
                "unique": row["uniqueness"] == "UNIQUE",
            }

        column_names = index["column_names"]
        direction = row["descend"].lower()
        expr = row["column_expression"]

        if expr is None:
            assert direction == "asc"
            column = self.normalize_name(row["column_name"])
            column_names.append(column)
            if "expressions" in index:
                index["expressions"].append(column)
            continue

        column_names.append(None)
        if "expressions" not in index:
            # first expression of the index: seed the list with the
            # plain columns collected so far
            index["expressions"] = column_names[:-1]
        index["expressions"].append(expr)

        if direction != "asc":
            assert direction == "desc"
            index.setdefault("column_sorting", {})[expr] = ("desc",)

    default = ReflectionDefaults.indexes
    keys = ((schema, self.normalize_name(name)) for name in all_objects)
    return (
        (key, list(indexes[key].values()) if key in indexes else default())
        for key in keys
    )
@reflection.cache
def get_pk_constraint(self, connection, table_name, schema=None, **kw):
    """Return the reflected primary key constraint of a single table.

    Delegates to ``get_multi_pk_constraint`` restricted to
    ``table_name`` with any scope and any object kind, then picks the
    entry for that table; ``NoSuchTableError`` is raised by
    ``_value_or_raise`` when no entry exists.

    Supported kw arguments are: ``dblink`` to reflect via a db link;
    ``oracle_resolve_synonyms`` to resolve names to synonyms
    """
    per_table = self.get_multi_pk_constraint(
        connection,
        filter_names=[table_name],
        schema=schema,
        kind=ObjectKind.ANY,
        scope=ObjectScope.ANY,
        **kw,
    )
    return self._value_or_raise(per_table, table_name, schema)
@lru_cache()
def _constraint_query(self, owner):
    """Build the statement that reflects the constraints of ``owner``.

    ``all_constraints`` is joined to ``all_cons_columns`` twice: once
    (alias ``local``) for the constrained columns and once (alias
    ``remote``, outer join) for the columns of the referenced
    constraint.  Only constraint types ``R``, ``P``, ``U`` and ``C``
    are selected.  The table names to reflect are passed at execution
    time through the ``all_objects`` bind parameter.
    """
    constraints = dictionary.all_constraints
    local = dictionary.all_cons_columns.alias("local")
    remote = dictionary.all_cons_columns.alias("remote")

    constrained_columns = and_(
        local.c.owner == constraints.c.owner,
        constraints.c.constraint_name == local.c.constraint_name,
    )
    referenced_columns = and_(
        constraints.c.r_owner == remote.c.owner,
        constraints.c.r_constraint_name == remote.c.constraint_name,
        or_(
            remote.c.position.is_(sql.null()),
            local.c.position == remote.c.position,
        ),
    )

    stmt = select(
        constraints.c.table_name,
        constraints.c.constraint_type,
        constraints.c.constraint_name,
        local.c.column_name.label("local_column"),
        remote.c.table_name.label("remote_table"),
        remote.c.column_name.label("remote_column"),
        remote.c.owner.label("remote_owner"),
        constraints.c.search_condition,
        constraints.c.delete_rule,
    ).select_from(constraints)
    stmt = stmt.join(local, constrained_columns)
    stmt = stmt.outerjoin(remote, referenced_columns)
    stmt = stmt.where(
        constraints.c.owner == owner,
        constraints.c.table_name.in_(bindparam("all_objects")),
        constraints.c.constraint_type.in_(("R", "P", "U", "C")),
    )
    return stmt.order_by(constraints.c.constraint_name, local.c.position)
@reflection.flexi_cache(
    ("schema", InternalTraversal.dp_string),
    ("dblink", InternalTraversal.dp_string),
    ("all_objects", InternalTraversal.dp_string_list),
)
def _get_all_constraint_rows(
    self, connection, schema, dblink, all_objects, **kw
):
    """Return the constraint rows of ``all_objects`` as a list.

    The rows are the mappings produced by the ``_constraint_query``
    statement, executed in batches.
    """
    owner = self.denormalize_schema_name(
        schema or self.default_schema_name
    )
    batches = self._run_batches(
        connection,
        self._constraint_query(owner),
        dblink,
        returns_long=False,
        mappings=True,
        all_objects=all_objects,
    )
    # since the result is cached a list must be created
    return list(batches)
@_handle_synonyms_decorator
def get_multi_pk_constraint(
    self,
    connection,
    *,
    scope,
    schema,
    filter_names,
    kind,
    dblink=None,
    **kw,
):
    """Return a generator of ``((schema, table), pk dict)`` pairs.

    The primary key dictionary holds ``name`` and
    ``constrained_columns``.  One pair is produced for every object
    returned by ``_get_all_objects``; objects without a primary key
    row get the default value.

    Supported kw arguments are: ``dblink`` to reflect via a db link;
    ``oracle_resolve_synonyms`` to resolve names to synonyms
    """
    all_objects = self._get_all_objects(
        connection, schema, scope, kind, filter_names, dblink, **kw
    )
    constraint_rows = self._get_all_constraint_rows(
        connection, schema, dblink, all_objects, **kw
    )

    primary_keys = defaultdict(dict)
    default = ReflectionDefaults.pk_constraint

    for row in constraint_rows:
        if row["constraint_type"] == "P":
            table_name = self.normalize_name(row["table_name"])
            constraint_name = self.normalize_name(row["constraint_name"])
            column_name = self.normalize_name(row["local_column"])

            pk = primary_keys[(schema, table_name)]
            if pk:
                pk["constrained_columns"].append(column_name)
            else:
                # first column of this primary key
                pk["name"] = constraint_name
                pk["constrained_columns"] = [column_name]

    keys = ((schema, self.normalize_name(name)) for name in all_objects)
    return (
        (key, primary_keys[key] if key in primary_keys else default())
        for key in keys
    )
@reflection.cache
def get_foreign_keys(
    self,
    connection,
    table_name,
    schema=None,
    **kw,
):
    """Return the reflected foreign keys of a single table.

    Delegates to ``get_multi_foreign_keys`` restricted to
    ``table_name`` with any scope and any object kind, then picks the
    entry for that table; ``NoSuchTableError`` is raised by
    ``_value_or_raise`` when no entry exists.

    Supported kw arguments are: ``dblink`` to reflect via a db link;
    ``oracle_resolve_synonyms`` to resolve names to synonyms
    """
    per_table = self.get_multi_foreign_keys(
        connection,
        filter_names=[table_name],
        schema=schema,
        kind=ObjectKind.ANY,
        scope=ObjectScope.ANY,
        **kw,
    )
    return self._value_or_raise(per_table, table_name, schema)
@_handle_synonyms_decorator
def get_multi_foreign_keys(
    self,
    connection,
    *,
    scope,
    schema,
    filter_names,
    kind,
    dblink=None,
    **kw,
):
    """Return a generator of ``((schema, table), [fk dict, ...])`` pairs.

    One pair is produced for every object returned by
    ``_get_all_objects``; objects without foreign key rows get the
    default value.  ``referred_schema`` is set only when ``schema`` was
    given or the referenced owner differs from the reflected owner.
    With ``oracle_resolve_synonyms`` the referred table (and schema)
    are replaced by a matching synonym found in ``all_synonyms``.

    Supported kw arguments are: ``dblink`` to reflect via a db link;
    ``oracle_resolve_synonyms`` to resolve names to synonyms
    """
    all_objects = self._get_all_objects(
        connection, schema, scope, kind, filter_names, dblink, **kw
    )
    resolve_synonyms = kw.get("oracle_resolve_synonyms", False)
    owner = self.denormalize_schema_name(
        schema or self.default_schema_name
    )

    all_remote_owners = set()
    fkeys = defaultdict(dict)

    for row in self._get_all_constraint_rows(
        connection, schema, dblink, all_objects, **kw
    ):
        if row["constraint_type"] != "R":
            continue

        table_name = self.normalize_name(row["table_name"])
        constraint_name = self.normalize_name(row["constraint_name"])
        per_table = fkeys[(schema, table_name)]
        assert constraint_name is not None

        local_column = self.normalize_name(row["local_column"])
        remote_table = self.normalize_name(row["remote_table"])
        remote_column = self.normalize_name(row["remote_column"])
        raw_remote_owner = row["remote_owner"]
        remote_owner = self.normalize_name(raw_remote_owner)
        if raw_remote_owner is not None:
            all_remote_owners.add(raw_remote_owner)

        if remote_table is None:
            # ticket 363
            if dblink and not dblink.startswith("@"):
                dblink = f"@{dblink}"
            util.warn(
                "Got 'None' querying 'table_name' from "
                f"all_cons_columns{dblink or ''} - does the user have "
                "proper rights to the table?"
            )
            continue

        fkey = per_table.get(constraint_name)
        if fkey is None:
            # first column of this constraint: record the constraint
            # level data
            delete_rule = row["delete_rule"]
            qualify = schema is not None or raw_remote_owner != owner
            fkey = per_table[constraint_name] = {
                "name": constraint_name,
                "constrained_columns": [],
                "referred_schema": remote_owner if qualify else None,
                "referred_table": remote_table,
                "referred_columns": [],
                "options": (
                    {}
                    if delete_rule == "NO ACTION"
                    else {"ondelete": delete_rule}
                ),
            }
            if resolve_synonyms:
                # will be removed below
                fkey["_ref_schema"] = remote_owner

        fkey["constrained_columns"].append(local_column)
        fkey["referred_columns"].append(remote_column)

    if resolve_synonyms and all_remote_owners:
        synonyms = dictionary.all_synonyms
        query = select(
            synonyms.c.owner,
            synonyms.c.table_name,
            synonyms.c.table_owner,
            synonyms.c.synonym_name,
        ).where(synonyms.c.owner.in_(all_remote_owners))
        synonym_rows = self._execute_reflection(
            connection, query, dblink, returns_long=False
        ).mappings()

        # (synonym owner, target table) -> (target owner, synonym name)
        remote_owners_lut = {
            (
                self.normalize_name(syn["owner"]),
                self.normalize_name(syn["table_name"]),
            ): (syn["table_owner"], syn["synonym_name"])
            for syn in synonym_rows
        }

        not_found = (None, None)
        every_fkey = (
            fkey
            for per_table in fkeys.values()
            for fkey in per_table.values()
        )
        for fkey in every_fkey:
            lookup = (fkey.pop("_ref_schema"), fkey["referred_table"])
            target_owner, synonym_name = remote_owners_lut.get(
                lookup, not_found
            )
            if not synonym_name:
                continue
            fkey["referred_table"] = self.normalize_name(synonym_name)
            if schema is not None or target_owner != owner:
                fkey["referred_schema"] = self.normalize_name(target_owner)
            else:
                fkey["referred_schema"] = None

    default = ReflectionDefaults.foreign_keys
    keys = ((schema, self.normalize_name(name)) for name in all_objects)
    return (
        (key, list(fkeys[key].values()) if key in fkeys else default())
        for key in keys
    )
@reflection.cache
def get_unique_constraints(
    self, connection, table_name, schema=None, **kw
):
    """Return the reflected unique constraints of a single table.

    Delegates to ``get_multi_unique_constraints`` restricted to
    ``table_name`` with any scope and any object kind, then picks the
    entry for that table; ``NoSuchTableError`` is raised by
    ``_value_or_raise`` when no entry exists.

    Supported kw arguments are: ``dblink`` to reflect via a db link;
    ``oracle_resolve_synonyms`` to resolve names to synonyms
    """
    per_table = self.get_multi_unique_constraints(
        connection,
        filter_names=[table_name],
        schema=schema,
        kind=ObjectKind.ANY,
        scope=ObjectScope.ANY,
        **kw,
    )
    return self._value_or_raise(per_table, table_name, schema)
@_handle_synonyms_decorator
def get_multi_unique_constraints(
    self,
    connection,
    *,
    scope,
    schema,
    filter_names,
    kind,
    dblink=None,
    **kw,
):
    """Return a generator of ``((schema, table), [uc dict, ...])`` pairs.

    Every unique constraint dictionary holds ``name``,
    ``column_names`` and ``duplicates_index``; the latter is the
    constraint name when an index with the same (raw) name exists,
    otherwise ``None``.  One pair is produced for every object returned
    by ``_get_all_objects``; objects without unique constraint rows get
    the default value.

    Supported kw arguments are: ``dblink`` to reflect via a db link;
    ``oracle_resolve_synonyms`` to resolve names to synonyms
    """
    all_objects = self._get_all_objects(
        connection, schema, scope, kind, filter_names, dblink, **kw
    )

    index_rows = self._get_indexes_rows(
        connection, schema, dblink, all_objects, **kw
    )
    index_names = {row["index_name"] for row in index_rows}

    unique_cons = defaultdict(dict)
    for row in self._get_all_constraint_rows(
        connection, schema, dblink, all_objects, **kw
    ):
        if row["constraint_type"] != "U":
            continue

        table_name = self.normalize_name(row["table_name"])
        raw_constraint_name = row["constraint_name"]
        constraint_name = self.normalize_name(raw_constraint_name)
        column_name = self.normalize_name(row["local_column"])

        per_table = unique_cons[(schema, table_name)]
        assert constraint_name is not None

        constraint = per_table.get(constraint_name)
        if constraint is None:
            # first column of this constraint
            if raw_constraint_name in index_names:
                duplicated = constraint_name
            else:
                duplicated = None
            constraint = per_table[constraint_name] = {
                "name": constraint_name,
                "column_names": [],
                "duplicates_index": duplicated,
            }
        constraint["column_names"].append(column_name)

    default = ReflectionDefaults.unique_constraints
    keys = ((schema, self.normalize_name(name)) for name in all_objects)
    return (
        (
            key,
            (
                list(unique_cons[key].values())
                if key in unique_cons
                else default()
            ),
        )
        for key in keys
    )
@reflection.cache
def get_view_definition(
    self,
    connection,
    view_name,
    schema=None,
    dblink=None,
    **kw,
):
    """Return the definition text of a view or materialized view.

    The text is the first value returned by a UNION ALL of
    ``all_views.text`` and ``all_mviews.query`` for the given name and
    owner.  With ``oracle_resolve_synonyms`` a synonym matching
    ``view_name`` replaces the db link, schema and view name used for
    the lookup.  Raises ``NoSuchTableError`` when no text is found.

    Supported kw arguments are: ``dblink`` to reflect via a db link;
    ``oracle_resolve_synonyms`` to resolve names to synonyms
    """
    if kw.get("oracle_resolve_synonyms", False):
        synonyms = self._get_synonyms(
            connection, schema, filter_names=[view_name], dblink=dblink
        )
        if synonyms:
            assert len(synonyms) == 1
            synonym = synonyms[0]
            dblink = self.normalize_name(synonym["db_link"])
            schema = synonym["table_owner"]
            view_name = synonym["table_name"]

    name = self.denormalize_name(view_name)
    owner = self.denormalize_schema_name(
        schema or self.default_schema_name
    )

    views = dictionary.all_views
    mviews = dictionary.all_mviews
    view_text = select(views.c.text).where(
        views.c.view_name == name,
        views.c.owner == owner,
    )
    mview_text = select(mviews.c.query).where(
        mviews.c.mview_name == name,
        mviews.c.owner == owner,
    )

    definition = self._execute_reflection(
        connection,
        view_text.union_all(mview_text),
        dblink,
        returns_long=False,
    ).scalar()
    if definition is not None:
        return definition
    raise exc.NoSuchTableError(
        f"{schema}.{view_name}" if schema else view_name
    )
@reflection.cache
def get_check_constraints(
    self, connection, table_name, schema=None, include_all=False, **kw
):
    """Return the reflected check constraints of a single table.

    Delegates to ``get_multi_check_constraints`` restricted to
    ``table_name`` with any scope and any object kind, forwarding
    ``include_all``, then picks the entry for that table;
    ``NoSuchTableError`` is raised by ``_value_or_raise`` when no entry
    exists.

    Supported kw arguments are: ``dblink`` to reflect via a db link;
    ``oracle_resolve_synonyms`` to resolve names to synonyms
    """
    per_table = self.get_multi_check_constraints(
        connection,
        filter_names=[table_name],
        schema=schema,
        kind=ObjectKind.ANY,
        scope=ObjectScope.ANY,
        include_all=include_all,
        **kw,
    )
    return self._value_or_raise(per_table, table_name, schema)
@_handle_synonyms_decorator
def get_multi_check_constraints(
    self,
    connection,
    *,
    schema,
    filter_names,
    dblink=None,
    scope,
    kind,
    include_all=False,
    **kw,
):
    """Return a generator of ``((schema, table), [check dict, ...])``.

    Every check dictionary holds ``name`` and ``sqltext``.  Unless
    ``include_all`` is true, constraints whose search condition matches
    the ``... IS NOT NULL`` pattern are left out.  One pair is produced
    for every object returned by ``_get_all_objects``; objects without
    check constraint rows get the default value.

    Supported kw arguments are: ``dblink`` to reflect via a db link;
    ``oracle_resolve_synonyms`` to resolve names to synonyms
    """
    all_objects = self._get_all_objects(
        connection, schema, scope, kind, filter_names, dblink, **kw
    )

    not_null = re.compile(r"..+?. IS NOT NULL$")
    check_constraints = defaultdict(list)

    for row in self._get_all_constraint_rows(
        connection, schema, dblink, all_objects, **kw
    ):
        if row["constraint_type"] != "C":
            continue

        table_name = self.normalize_name(row["table_name"])
        constraint_name = self.normalize_name(row["constraint_name"])
        search_condition = row["search_condition"]

        # looked up before filtering, so a table whose checks are all
        # skipped still ends up with an (empty) entry
        table_checks = check_constraints[(schema, table_name)]

        if constraint_name is None:
            continue
        if not include_all and not_null.match(search_condition):
            continue
        table_checks.append(
            {"name": constraint_name, "sqltext": search_condition}
        )

    default = ReflectionDefaults.check_constraints
    keys = ((schema, self.normalize_name(name)) for name in all_objects)
    return (
        (
            key,
            (
                check_constraints[key]
                if key in check_constraints
                else default()
            ),
        )
        for key in keys
    )
def _list_dblinks(self, connection, dblink=None):
    """Return the normalized names of the db links in ``all_db_links``.

    ``dblink`` is forwarded to ``_execute_reflection`` together with the
    query.
    """
    stmt = select(dictionary.all_db_links.c.db_link)
    rows = self._execute_reflection(
        connection, stmt, dblink, returns_long=False
    )
    return [self.normalize_name(name) for name in rows.scalars()]
class _OuterJoinColumn(sql.ClauseElement):
    """Clause element that wraps a single column.

    NOTE(review): presumably used to mark columns that take part in an
    Oracle-specific outer join rendering -- confirm against the
    compiler method handling ``outer_join_column``.
    """

    # name used to dispatch this element to its visitor
    __visit_name__ = "outer_join_column"

    def __init__(self, column):
        """Store the wrapped ``column``."""
        self.column = column
|