| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902 |
- # mypy: allow-untyped-defs, allow-incomplete-defs, allow-untyped-calls
- # mypy: no-warn-return-any, allow-any-generics
- from __future__ import annotations
- import logging
- import re
- from typing import Any
- from typing import Callable
- from typing import Dict
- from typing import Iterable
- from typing import List
- from typing import Mapping
- from typing import NamedTuple
- from typing import Optional
- from typing import Sequence
- from typing import Set
- from typing import Tuple
- from typing import Type
- from typing import TYPE_CHECKING
- from typing import Union
- from sqlalchemy import cast
- from sqlalchemy import Column
- from sqlalchemy import MetaData
- from sqlalchemy import PrimaryKeyConstraint
- from sqlalchemy import schema
- from sqlalchemy import String
- from sqlalchemy import Table
- from sqlalchemy import text
- from . import _autogen
- from . import base
- from ._autogen import _constraint_sig as _constraint_sig
- from ._autogen import ComparisonResult as ComparisonResult
- from .. import util
- from ..util import sqla_compat
- if TYPE_CHECKING:
- from typing import Literal
- from typing import TextIO
- from sqlalchemy.engine import Connection
- from sqlalchemy.engine import Dialect
- from sqlalchemy.engine.cursor import CursorResult
- from sqlalchemy.engine.reflection import Inspector
- from sqlalchemy.sql import ClauseElement
- from sqlalchemy.sql import Executable
- from sqlalchemy.sql.elements import quoted_name
- from sqlalchemy.sql.schema import Constraint
- from sqlalchemy.sql.schema import ForeignKeyConstraint
- from sqlalchemy.sql.schema import Index
- from sqlalchemy.sql.schema import UniqueConstraint
- from sqlalchemy.sql.selectable import TableClause
- from sqlalchemy.sql.type_api import TypeEngine
- from .base import _ServerDefault
- from ..autogenerate.api import AutogenContext
- from ..operations.batch import ApplyBatchImpl
- from ..operations.batch import BatchOperationsImpl
- log = logging.getLogger(__name__)
- class ImplMeta(type):
- def __init__(
- cls,
- classname: str,
- bases: Tuple[Type[DefaultImpl]],
- dict_: Dict[str, Any],
- ):
- newtype = type.__init__(cls, classname, bases, dict_)
- if "__dialect__" in dict_:
- _impls[dict_["__dialect__"]] = cls # type: ignore[assignment]
- return newtype
- _impls: Dict[str, Type[DefaultImpl]] = {}
- class DefaultImpl(metaclass=ImplMeta):
- """Provide the entrypoint for major migration operations,
- including database-specific behavioral variances.
- While individual SQL/DDL constructs already provide
- for database-specific implementations, variances here
- allow for entirely different sequences of operations
- to take place for a particular migration, such as
- SQL Server's special 'IDENTITY INSERT' step for
- bulk inserts.
- """
- __dialect__ = "default"
- transactional_ddl = False
- command_terminator = ";"
- type_synonyms: Tuple[Set[str], ...] = ({"NUMERIC", "DECIMAL"},)
- type_arg_extract: Sequence[str] = ()
- # These attributes are deprecated in SQLAlchemy via #10247. They need to
- # be ignored to support older version that did not use dialect kwargs.
- # They only apply to Oracle and are replaced by oracle_order,
- # oracle_on_null
- identity_attrs_ignore: Tuple[str, ...] = ("order", "on_null")
- def __init__(
- self,
- dialect: Dialect,
- connection: Optional[Connection],
- as_sql: bool,
- transactional_ddl: Optional[bool],
- output_buffer: Optional[TextIO],
- context_opts: Dict[str, Any],
- ) -> None:
- self.dialect = dialect
- self.connection = connection
- self.as_sql = as_sql
- self.literal_binds = context_opts.get("literal_binds", False)
- self.output_buffer = output_buffer
- self.memo: dict = {}
- self.context_opts = context_opts
- if transactional_ddl is not None:
- self.transactional_ddl = transactional_ddl
- if self.literal_binds:
- if not self.as_sql:
- raise util.CommandError(
- "Can't use literal_binds setting without as_sql mode"
- )
- @classmethod
- def get_by_dialect(cls, dialect: Dialect) -> Type[DefaultImpl]:
- return _impls[dialect.name]
- def static_output(self, text: str) -> None:
- assert self.output_buffer is not None
- self.output_buffer.write(text + "\n\n")
- self.output_buffer.flush()
- def version_table_impl(
- self,
- *,
- version_table: str,
- version_table_schema: Optional[str],
- version_table_pk: bool,
- **kw: Any,
- ) -> Table:
- """Generate a :class:`.Table` object which will be used as the
- structure for the Alembic version table.
- Third party dialects may override this hook to provide an alternate
- structure for this :class:`.Table`; requirements are only that it
- be named based on the ``version_table`` parameter and contains
- at least a single string-holding column named ``version_num``.
- .. versionadded:: 1.14
- """
- vt = Table(
- version_table,
- MetaData(),
- Column("version_num", String(32), nullable=False),
- schema=version_table_schema,
- )
- if version_table_pk:
- vt.append_constraint(
- PrimaryKeyConstraint(
- "version_num", name=f"{version_table}_pkc"
- )
- )
- return vt
- def requires_recreate_in_batch(
- self, batch_op: BatchOperationsImpl
- ) -> bool:
- """Return True if the given :class:`.BatchOperationsImpl`
- would need the table to be recreated and copied in order to
- proceed.
- Normally, only returns True on SQLite when operations other
- than add_column are present.
- """
- return False
- def prep_table_for_batch(
- self, batch_impl: ApplyBatchImpl, table: Table
- ) -> None:
- """perform any operations needed on a table before a new
- one is created to replace it in batch mode.
- the PG dialect uses this to drop constraints on the table
- before the new one uses those same names.
- """
- @property
- def bind(self) -> Optional[Connection]:
- return self.connection
- def _exec(
- self,
- construct: Union[Executable, str],
- execution_options: Optional[Mapping[str, Any]] = None,
- multiparams: Optional[Sequence[Mapping[str, Any]]] = None,
- params: Mapping[str, Any] = util.immutabledict(),
- ) -> Optional[CursorResult]:
- if isinstance(construct, str):
- construct = text(construct)
- if self.as_sql:
- if multiparams is not None or params:
- raise TypeError("SQL parameters not allowed with as_sql")
- compile_kw: dict[str, Any]
- if self.literal_binds and not isinstance(
- construct, schema.DDLElement
- ):
- compile_kw = dict(compile_kwargs={"literal_binds": True})
- else:
- compile_kw = {}
- if TYPE_CHECKING:
- assert isinstance(construct, ClauseElement)
- compiled = construct.compile(dialect=self.dialect, **compile_kw)
- self.static_output(
- str(compiled).replace("\t", " ").strip()
- + self.command_terminator
- )
- return None
- else:
- conn = self.connection
- assert conn is not None
- if execution_options:
- conn = conn.execution_options(**execution_options)
- if params and multiparams is not None:
- raise TypeError(
- "Can't send params and multiparams at the same time"
- )
- if multiparams:
- return conn.execute(construct, multiparams)
- else:
- return conn.execute(construct, params)
- def execute(
- self,
- sql: Union[Executable, str],
- execution_options: Optional[dict[str, Any]] = None,
- ) -> None:
- self._exec(sql, execution_options)
- def alter_column(
- self,
- table_name: str,
- column_name: str,
- *,
- nullable: Optional[bool] = None,
- server_default: Optional[
- Union[_ServerDefault, Literal[False]]
- ] = False,
- name: Optional[str] = None,
- type_: Optional[TypeEngine] = None,
- schema: Optional[str] = None,
- autoincrement: Optional[bool] = None,
- comment: Optional[Union[str, Literal[False]]] = False,
- existing_comment: Optional[str] = None,
- existing_type: Optional[TypeEngine] = None,
- existing_server_default: Optional[_ServerDefault] = None,
- existing_nullable: Optional[bool] = None,
- existing_autoincrement: Optional[bool] = None,
- **kw: Any,
- ) -> None:
- if autoincrement is not None or existing_autoincrement is not None:
- util.warn(
- "autoincrement and existing_autoincrement "
- "only make sense for MySQL",
- stacklevel=3,
- )
- if nullable is not None:
- self._exec(
- base.ColumnNullable(
- table_name,
- column_name,
- nullable,
- schema=schema,
- existing_type=existing_type,
- existing_server_default=existing_server_default,
- existing_nullable=existing_nullable,
- existing_comment=existing_comment,
- )
- )
- if server_default is not False:
- kw = {}
- cls_: Type[
- Union[
- base.ComputedColumnDefault,
- base.IdentityColumnDefault,
- base.ColumnDefault,
- ]
- ]
- if sqla_compat._server_default_is_computed(
- server_default, existing_server_default
- ):
- cls_ = base.ComputedColumnDefault
- elif sqla_compat._server_default_is_identity(
- server_default, existing_server_default
- ):
- cls_ = base.IdentityColumnDefault
- kw["impl"] = self
- else:
- cls_ = base.ColumnDefault
- self._exec(
- cls_(
- table_name,
- column_name,
- server_default, # type:ignore[arg-type]
- schema=schema,
- existing_type=existing_type,
- existing_server_default=existing_server_default,
- existing_nullable=existing_nullable,
- existing_comment=existing_comment,
- **kw,
- )
- )
- if type_ is not None:
- self._exec(
- base.ColumnType(
- table_name,
- column_name,
- type_,
- schema=schema,
- existing_type=existing_type,
- existing_server_default=existing_server_default,
- existing_nullable=existing_nullable,
- existing_comment=existing_comment,
- )
- )
- if comment is not False:
- self._exec(
- base.ColumnComment(
- table_name,
- column_name,
- comment,
- schema=schema,
- existing_type=existing_type,
- existing_server_default=existing_server_default,
- existing_nullable=existing_nullable,
- existing_comment=existing_comment,
- )
- )
- # do the new name last ;)
- if name is not None:
- self._exec(
- base.ColumnName(
- table_name,
- column_name,
- name,
- schema=schema,
- existing_type=existing_type,
- existing_server_default=existing_server_default,
- existing_nullable=existing_nullable,
- )
- )
- def add_column(
- self,
- table_name: str,
- column: Column[Any],
- *,
- schema: Optional[Union[str, quoted_name]] = None,
- if_not_exists: Optional[bool] = None,
- ) -> None:
- self._exec(
- base.AddColumn(
- table_name,
- column,
- schema=schema,
- if_not_exists=if_not_exists,
- )
- )
- def drop_column(
- self,
- table_name: str,
- column: Column[Any],
- *,
- schema: Optional[str] = None,
- if_exists: Optional[bool] = None,
- **kw,
- ) -> None:
- self._exec(
- base.DropColumn(
- table_name, column, schema=schema, if_exists=if_exists
- )
- )
- def add_constraint(self, const: Any) -> None:
- if const._create_rule is None or const._create_rule(self):
- self._exec(schema.AddConstraint(const))
- def drop_constraint(self, const: Constraint, **kw: Any) -> None:
- self._exec(schema.DropConstraint(const, **kw))
- def rename_table(
- self,
- old_table_name: str,
- new_table_name: Union[str, quoted_name],
- schema: Optional[Union[str, quoted_name]] = None,
- ) -> None:
- self._exec(
- base.RenameTable(old_table_name, new_table_name, schema=schema)
- )
- def create_table(self, table: Table, **kw: Any) -> None:
- table.dispatch.before_create(
- table, self.connection, checkfirst=False, _ddl_runner=self
- )
- self._exec(schema.CreateTable(table, **kw))
- table.dispatch.after_create(
- table, self.connection, checkfirst=False, _ddl_runner=self
- )
- for index in table.indexes:
- self._exec(schema.CreateIndex(index))
- with_comment = (
- self.dialect.supports_comments and not self.dialect.inline_comments
- )
- comment = table.comment
- if comment and with_comment:
- self.create_table_comment(table)
- for column in table.columns:
- comment = column.comment
- if comment and with_comment:
- self.create_column_comment(column)
- def drop_table(self, table: Table, **kw: Any) -> None:
- table.dispatch.before_drop(
- table, self.connection, checkfirst=False, _ddl_runner=self
- )
- self._exec(schema.DropTable(table, **kw))
- table.dispatch.after_drop(
- table, self.connection, checkfirst=False, _ddl_runner=self
- )
- def create_index(self, index: Index, **kw: Any) -> None:
- self._exec(schema.CreateIndex(index, **kw))
- def create_table_comment(self, table: Table) -> None:
- self._exec(schema.SetTableComment(table))
- def drop_table_comment(self, table: Table) -> None:
- self._exec(schema.DropTableComment(table))
- def create_column_comment(self, column: Column[Any]) -> None:
- self._exec(schema.SetColumnComment(column))
- def drop_index(self, index: Index, **kw: Any) -> None:
- self._exec(schema.DropIndex(index, **kw))
- def bulk_insert(
- self,
- table: Union[TableClause, Table],
- rows: List[dict],
- multiinsert: bool = True,
- ) -> None:
- if not isinstance(rows, list):
- raise TypeError("List expected")
- elif rows and not isinstance(rows[0], dict):
- raise TypeError("List of dictionaries expected")
- if self.as_sql:
- for row in rows:
- self._exec(
- table.insert()
- .inline()
- .values(
- **{
- k: (
- sqla_compat._literal_bindparam(
- k, v, type_=table.c[k].type
- )
- if not isinstance(
- v, sqla_compat._literal_bindparam
- )
- else v
- )
- for k, v in row.items()
- }
- )
- )
- else:
- if rows:
- if multiinsert:
- self._exec(table.insert().inline(), multiparams=rows)
- else:
- for row in rows:
- self._exec(table.insert().inline().values(**row))
- def _tokenize_column_type(self, column: Column) -> Params:
- definition: str
- definition = self.dialect.type_compiler.process(column.type).lower()
- # tokenize the SQLAlchemy-generated version of a type, so that
- # the two can be compared.
- #
- # examples:
- # NUMERIC(10, 5)
- # TIMESTAMP WITH TIMEZONE
- # INTEGER UNSIGNED
- # INTEGER (10) UNSIGNED
- # INTEGER(10) UNSIGNED
- # varchar character set utf8
- #
- tokens: List[str] = re.findall(r"[\w\-_]+|\(.+?\)", definition)
- term_tokens: List[str] = []
- paren_term = None
- for token in tokens:
- if re.match(r"^\(.*\)$", token):
- paren_term = token
- else:
- term_tokens.append(token)
- params = Params(term_tokens[0], term_tokens[1:], [], {})
- if paren_term:
- term: str
- for term in re.findall("[^(),]+", paren_term):
- if "=" in term:
- key, val = term.split("=")
- params.kwargs[key.strip()] = val.strip()
- else:
- params.args.append(term.strip())
- return params
- def _column_types_match(
- self, inspector_params: Params, metadata_params: Params
- ) -> bool:
- if inspector_params.token0 == metadata_params.token0:
- return True
- synonyms = [{t.lower() for t in batch} for batch in self.type_synonyms]
- inspector_all_terms = " ".join(
- [inspector_params.token0] + inspector_params.tokens
- )
- metadata_all_terms = " ".join(
- [metadata_params.token0] + metadata_params.tokens
- )
- for batch in synonyms:
- if {inspector_all_terms, metadata_all_terms}.issubset(batch) or {
- inspector_params.token0,
- metadata_params.token0,
- }.issubset(batch):
- return True
- return False
- def _column_args_match(
- self, inspected_params: Params, meta_params: Params
- ) -> bool:
- """We want to compare column parameters. However, we only want
- to compare parameters that are set. If they both have `collation`,
- we want to make sure they are the same. However, if only one
- specifies it, dont flag it for being less specific
- """
- if (
- len(meta_params.tokens) == len(inspected_params.tokens)
- and meta_params.tokens != inspected_params.tokens
- ):
- return False
- if (
- len(meta_params.args) == len(inspected_params.args)
- and meta_params.args != inspected_params.args
- ):
- return False
- insp = " ".join(inspected_params.tokens).lower()
- meta = " ".join(meta_params.tokens).lower()
- for reg in self.type_arg_extract:
- mi = re.search(reg, insp)
- mm = re.search(reg, meta)
- if mi and mm and mi.group(1) != mm.group(1):
- return False
- return True
- def compare_type(
- self, inspector_column: Column[Any], metadata_column: Column
- ) -> bool:
- """Returns True if there ARE differences between the types of the two
- columns. Takes impl.type_synonyms into account between retrospected
- and metadata types
- """
- inspector_params = self._tokenize_column_type(inspector_column)
- metadata_params = self._tokenize_column_type(metadata_column)
- if not self._column_types_match(inspector_params, metadata_params):
- return True
- if not self._column_args_match(inspector_params, metadata_params):
- return True
- return False
- def compare_server_default(
- self,
- inspector_column,
- metadata_column,
- rendered_metadata_default,
- rendered_inspector_default,
- ):
- return rendered_inspector_default != rendered_metadata_default
- def correct_for_autogen_constraints(
- self,
- conn_uniques: Set[UniqueConstraint],
- conn_indexes: Set[Index],
- metadata_unique_constraints: Set[UniqueConstraint],
- metadata_indexes: Set[Index],
- ) -> None:
- pass
- def cast_for_batch_migrate(self, existing, existing_transfer, new_type):
- if existing.type._type_affinity is not new_type._type_affinity:
- existing_transfer["expr"] = cast(
- existing_transfer["expr"], new_type
- )
- def render_ddl_sql_expr(
- self, expr: ClauseElement, is_server_default: bool = False, **kw: Any
- ) -> str:
- """Render a SQL expression that is typically a server default,
- index expression, etc.
- """
- compile_kw = {"literal_binds": True, "include_table": False}
- return str(
- expr.compile(dialect=self.dialect, compile_kwargs=compile_kw)
- )
- def _compat_autogen_column_reflect(self, inspector: Inspector) -> Callable:
- return self.autogen_column_reflect
- def correct_for_autogen_foreignkeys(
- self,
- conn_fks: Set[ForeignKeyConstraint],
- metadata_fks: Set[ForeignKeyConstraint],
- ) -> None:
- pass
- def autogen_column_reflect(self, inspector, table, column_info):
- """A hook that is attached to the 'column_reflect' event for when
- a Table is reflected from the database during the autogenerate
- process.
- Dialects can elect to modify the information gathered here.
- """
- def start_migrations(self) -> None:
- """A hook called when :meth:`.EnvironmentContext.run_migrations`
- is called.
- Implementations can set up per-migration-run state here.
- """
- def emit_begin(self) -> None:
- """Emit the string ``BEGIN``, or the backend-specific
- equivalent, on the current connection context.
- This is used in offline mode and typically
- via :meth:`.EnvironmentContext.begin_transaction`.
- """
- self.static_output("BEGIN" + self.command_terminator)
- def emit_commit(self) -> None:
- """Emit the string ``COMMIT``, or the backend-specific
- equivalent, on the current connection context.
- This is used in offline mode and typically
- via :meth:`.EnvironmentContext.begin_transaction`.
- """
- self.static_output("COMMIT" + self.command_terminator)
- def render_type(
- self, type_obj: TypeEngine, autogen_context: AutogenContext
- ) -> Union[str, Literal[False]]:
- return False
- def _compare_identity_default(self, metadata_identity, inspector_identity):
- # ignored contains the attributes that were not considered
- # because assumed to their default values in the db.
- diff, ignored = _compare_identity_options(
- metadata_identity,
- inspector_identity,
- schema.Identity(),
- skip={"always"},
- )
- meta_always = getattr(metadata_identity, "always", None)
- inspector_always = getattr(inspector_identity, "always", None)
- # None and False are the same in this comparison
- if bool(meta_always) != bool(inspector_always):
- diff.add("always")
- diff.difference_update(self.identity_attrs_ignore)
- # returns 3 values:
- return (
- # different identity attributes
- diff,
- # ignored identity attributes
- ignored,
- # if the two identity should be considered different
- bool(diff) or bool(metadata_identity) != bool(inspector_identity),
- )
- def _compare_index_unique(
- self, metadata_index: Index, reflected_index: Index
- ) -> Optional[str]:
- conn_unique = bool(reflected_index.unique)
- meta_unique = bool(metadata_index.unique)
- if conn_unique != meta_unique:
- return f"unique={conn_unique} to unique={meta_unique}"
- else:
- return None
- def _create_metadata_constraint_sig(
- self, constraint: _autogen._C, **opts: Any
- ) -> _constraint_sig[_autogen._C]:
- return _constraint_sig.from_constraint(True, self, constraint, **opts)
- def _create_reflected_constraint_sig(
- self, constraint: _autogen._C, **opts: Any
- ) -> _constraint_sig[_autogen._C]:
- return _constraint_sig.from_constraint(False, self, constraint, **opts)
- def compare_indexes(
- self,
- metadata_index: Index,
- reflected_index: Index,
- ) -> ComparisonResult:
- """Compare two indexes by comparing the signature generated by
- ``create_index_sig``.
- This method returns a ``ComparisonResult``.
- """
- msg: List[str] = []
- unique_msg = self._compare_index_unique(
- metadata_index, reflected_index
- )
- if unique_msg:
- msg.append(unique_msg)
- m_sig = self._create_metadata_constraint_sig(metadata_index)
- r_sig = self._create_reflected_constraint_sig(reflected_index)
- assert _autogen.is_index_sig(m_sig)
- assert _autogen.is_index_sig(r_sig)
- # The assumption is that the index have no expression
- for sig in m_sig, r_sig:
- if sig.has_expressions:
- log.warning(
- "Generating approximate signature for index %s. "
- "The dialect "
- "implementation should either skip expression indexes "
- "or provide a custom implementation.",
- sig.const,
- )
- if m_sig.column_names != r_sig.column_names:
- msg.append(
- f"expression {r_sig.column_names} to {m_sig.column_names}"
- )
- if msg:
- return ComparisonResult.Different(msg)
- else:
- return ComparisonResult.Equal()
- def compare_unique_constraint(
- self,
- metadata_constraint: UniqueConstraint,
- reflected_constraint: UniqueConstraint,
- ) -> ComparisonResult:
- """Compare two unique constraints by comparing the two signatures.
- The arguments are two tuples that contain the unique constraint and
- the signatures generated by ``create_unique_constraint_sig``.
- This method returns a ``ComparisonResult``.
- """
- metadata_tup = self._create_metadata_constraint_sig(
- metadata_constraint
- )
- reflected_tup = self._create_reflected_constraint_sig(
- reflected_constraint
- )
- meta_sig = metadata_tup.unnamed
- conn_sig = reflected_tup.unnamed
- if conn_sig != meta_sig:
- return ComparisonResult.Different(
- f"expression {conn_sig} to {meta_sig}"
- )
- else:
- return ComparisonResult.Equal()
- def _skip_functional_indexes(self, metadata_indexes, conn_indexes):
- conn_indexes_by_name = {c.name: c for c in conn_indexes}
- for idx in list(metadata_indexes):
- if idx.name in conn_indexes_by_name:
- continue
- iex = sqla_compat.is_expression_index(idx)
- if iex:
- util.warn(
- "autogenerate skipping metadata-specified "
- "expression-based index "
- f"{idx.name!r}; dialect {self.__dialect__!r} under "
- f"SQLAlchemy {sqla_compat.sqlalchemy_version} can't "
- "reflect these indexes so they can't be compared"
- )
- metadata_indexes.discard(idx)
- def adjust_reflected_dialect_options(
- self, reflected_object: Dict[str, Any], kind: str
- ) -> Dict[str, Any]:
- return reflected_object.get("dialect_options", {})
- class Params(NamedTuple):
- token0: str
- tokens: List[str]
- args: List[str]
- kwargs: Dict[str, str]
- def _compare_identity_options(
- metadata_io: Union[schema.Identity, schema.Sequence, None],
- inspector_io: Union[schema.Identity, schema.Sequence, None],
- default_io: Union[schema.Identity, schema.Sequence],
- skip: Set[str],
- ):
- # this can be used for identity or sequence compare.
- # default_io is an instance of IdentityOption with all attributes to the
- # default value.
- meta_d = sqla_compat._get_identity_options_dict(metadata_io)
- insp_d = sqla_compat._get_identity_options_dict(inspector_io)
- diff = set()
- ignored_attr = set()
- def check_dicts(
- meta_dict: Mapping[str, Any],
- insp_dict: Mapping[str, Any],
- default_dict: Mapping[str, Any],
- attrs: Iterable[str],
- ):
- for attr in set(attrs).difference(skip):
- meta_value = meta_dict.get(attr)
- insp_value = insp_dict.get(attr)
- if insp_value != meta_value:
- default_value = default_dict.get(attr)
- if meta_value == default_value:
- ignored_attr.add(attr)
- else:
- diff.add(attr)
- check_dicts(
- meta_d,
- insp_d,
- sqla_compat._get_identity_options_dict(default_io),
- set(meta_d).union(insp_d),
- )
- if sqla_compat.identity_has_dialect_kwargs:
- assert hasattr(default_io, "dialect_kwargs")
- # use only the dialect kwargs in inspector_io since metadata_io
- # can have options for many backends
- check_dicts(
- getattr(metadata_io, "dialect_kwargs", {}),
- getattr(inspector_io, "dialect_kwargs", {}),
- default_io.dialect_kwargs,
- getattr(inspector_io, "dialect_kwargs", {}),
- )
- return diff, ignored_attr
|