test_reflection.py 111 KB


  1. # testing/suite/test_reflection.py
  2. # Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: https://www.opensource.org/licenses/mit-license.php
  7. # mypy: ignore-errors
  8. import contextlib
  9. import operator
  10. import re
  11. import sqlalchemy as sa
  12. from .. import config
  13. from .. import engines
  14. from .. import eq_
  15. from .. import eq_regex
  16. from .. import expect_raises
  17. from .. import expect_raises_message
  18. from .. import expect_warnings
  19. from .. import fixtures
  20. from .. import is_
  21. from ..provision import get_temp_table_name
  22. from ..provision import temp_table_keyword_args
  23. from ..schema import Column
  24. from ..schema import Table
  25. from ... import Boolean
  26. from ... import DateTime
  27. from ... import event
  28. from ... import ForeignKey
  29. from ... import func
  30. from ... import Identity
  31. from ... import inspect
  32. from ... import Integer
  33. from ... import MetaData
  34. from ... import String
  35. from ... import testing
  36. from ... import types as sql_types
  37. from ...engine import Inspector
  38. from ...engine import ObjectKind
  39. from ...engine import ObjectScope
  40. from ...exc import NoSuchTableError
  41. from ...exc import UnreflectableTableError
  42. from ...schema import DDL
  43. from ...schema import Index
  44. from ...sql.elements import quoted_name
  45. from ...sql.schema import BLANK_SCHEMA
  46. from ...testing import ComparesIndexes
  47. from ...testing import ComparesTables
  48. from ...testing import is_false
  49. from ...testing import is_true
  50. from ...testing import mock
  51. metadata, users = None, None
  52. class OneConnectionTablesTest(fixtures.TablesTest):
  53. @classmethod
  54. def setup_bind(cls):
  55. # TODO: when temp tables are subject to server reset,
  56. # this will also have to disable that server reset from
  57. # happening
  58. if config.requirements.independent_connections.enabled:
  59. from sqlalchemy import pool
  60. return engines.testing_engine(
  61. options=dict(poolclass=pool.StaticPool, scope="class"),
  62. )
  63. else:
  64. return config.db
  65. class HasTableTest(OneConnectionTablesTest):
  66. __backend__ = True
  67. @classmethod
  68. def define_tables(cls, metadata):
  69. Table(
  70. "test_table",
  71. metadata,
  72. Column("id", Integer, primary_key=True),
  73. Column("data", String(50)),
  74. )
  75. if testing.requires.schemas.enabled:
  76. Table(
  77. "test_table_s",
  78. metadata,
  79. Column("id", Integer, primary_key=True),
  80. Column("data", String(50)),
  81. schema=config.test_schema,
  82. )
  83. if testing.requires.view_reflection:
  84. cls.define_views(metadata)
  85. if testing.requires.has_temp_table.enabled:
  86. cls.define_temp_tables(metadata)
  87. @classmethod
  88. def define_views(cls, metadata):
  89. query = "CREATE VIEW vv AS SELECT id, data FROM test_table"
  90. event.listen(metadata, "after_create", DDL(query))
  91. event.listen(metadata, "before_drop", DDL("DROP VIEW vv"))
  92. if testing.requires.schemas.enabled:
  93. query = (
  94. "CREATE VIEW %s.vv AS SELECT id, data FROM %s.test_table_s"
  95. % (
  96. config.test_schema,
  97. config.test_schema,
  98. )
  99. )
  100. event.listen(metadata, "after_create", DDL(query))
  101. event.listen(
  102. metadata,
  103. "before_drop",
  104. DDL("DROP VIEW %s.vv" % (config.test_schema)),
  105. )
  106. @classmethod
  107. def temp_table_name(cls):
  108. return get_temp_table_name(
  109. config, config.db, f"user_tmp_{config.ident}"
  110. )
  111. @classmethod
  112. def define_temp_tables(cls, metadata):
  113. kw = temp_table_keyword_args(config, config.db)
  114. table_name = cls.temp_table_name()
  115. user_tmp = Table(
  116. table_name,
  117. metadata,
  118. Column("id", sa.INT, primary_key=True),
  119. Column("name", sa.VARCHAR(50)),
  120. **kw,
  121. )
  122. if (
  123. testing.requires.view_reflection.enabled
  124. and testing.requires.temporary_views.enabled
  125. ):
  126. event.listen(
  127. user_tmp,
  128. "after_create",
  129. DDL(
  130. "create temporary view user_tmp_v as "
  131. "select * from user_tmp_%s" % config.ident
  132. ),
  133. )
  134. event.listen(user_tmp, "before_drop", DDL("drop view user_tmp_v"))
  135. def test_has_table(self):
  136. with config.db.begin() as conn:
  137. is_true(config.db.dialect.has_table(conn, "test_table"))
  138. is_false(config.db.dialect.has_table(conn, "test_table_s"))
  139. is_false(config.db.dialect.has_table(conn, "nonexistent_table"))
  140. def test_has_table_cache(self, metadata):
  141. insp = inspect(config.db)
  142. is_true(insp.has_table("test_table"))
  143. nt = Table("new_table", metadata, Column("col", Integer))
  144. is_false(insp.has_table("new_table"))
  145. nt.create(config.db)
  146. try:
  147. is_false(insp.has_table("new_table"))
  148. insp.clear_cache()
  149. is_true(insp.has_table("new_table"))
  150. finally:
  151. nt.drop(config.db)
  152. @testing.requires.schemas
  153. def test_has_table_schema(self):
  154. with config.db.begin() as conn:
  155. is_false(
  156. config.db.dialect.has_table(
  157. conn, "test_table", schema=config.test_schema
  158. )
  159. )
  160. is_true(
  161. config.db.dialect.has_table(
  162. conn, "test_table_s", schema=config.test_schema
  163. )
  164. )
  165. is_false(
  166. config.db.dialect.has_table(
  167. conn, "nonexistent_table", schema=config.test_schema
  168. )
  169. )
  170. @testing.requires.schemas
  171. def test_has_table_nonexistent_schema(self):
  172. with config.db.begin() as conn:
  173. is_false(
  174. config.db.dialect.has_table(
  175. conn, "test_table", schema="nonexistent_schema"
  176. )
  177. )
  178. @testing.requires.views
  179. def test_has_table_view(self, connection):
  180. insp = inspect(connection)
  181. is_true(insp.has_table("vv"))
  182. @testing.requires.has_temp_table
  183. def test_has_table_temp_table(self, connection):
  184. insp = inspect(connection)
  185. temp_table_name = self.temp_table_name()
  186. is_true(insp.has_table(temp_table_name))
  187. @testing.requires.has_temp_table
  188. @testing.requires.view_reflection
  189. @testing.requires.temporary_views
  190. def test_has_table_temp_view(self, connection):
  191. insp = inspect(connection)
  192. is_true(insp.has_table("user_tmp_v"))
  193. @testing.requires.views
  194. @testing.requires.schemas
  195. def test_has_table_view_schema(self, connection):
  196. insp = inspect(connection)
  197. is_true(insp.has_table("vv", config.test_schema))
  198. class HasIndexTest(fixtures.TablesTest):
  199. __backend__ = True
  200. __requires__ = ("index_reflection",)
  201. @classmethod
  202. def define_tables(cls, metadata):
  203. tt = Table(
  204. "test_table",
  205. metadata,
  206. Column("id", Integer, primary_key=True),
  207. Column("data", String(50)),
  208. Column("data2", String(50)),
  209. )
  210. Index("my_idx", tt.c.data)
  211. if testing.requires.schemas.enabled:
  212. tt = Table(
  213. "test_table",
  214. metadata,
  215. Column("id", Integer, primary_key=True),
  216. Column("data", String(50)),
  217. schema=config.test_schema,
  218. )
  219. Index("my_idx_s", tt.c.data)
  220. kind = testing.combinations("dialect", "inspector", argnames="kind")
  221. def _has_index(self, kind, conn):
  222. if kind == "dialect":
  223. return lambda *a, **k: config.db.dialect.has_index(conn, *a, **k)
  224. else:
  225. return inspect(conn).has_index
  226. @kind
  227. def test_has_index(self, kind, connection, metadata):
  228. meth = self._has_index(kind, connection)
  229. assert meth("test_table", "my_idx")
  230. assert not meth("test_table", "my_idx_s")
  231. assert not meth("nonexistent_table", "my_idx")
  232. assert not meth("test_table", "nonexistent_idx")
  233. assert not meth("test_table", "my_idx_2")
  234. assert not meth("test_table_2", "my_idx_3")
  235. idx = Index("my_idx_2", self.tables.test_table.c.data2)
  236. tbl = Table(
  237. "test_table_2",
  238. metadata,
  239. Column("foo", Integer),
  240. Index("my_idx_3", "foo"),
  241. )
  242. idx.create(connection)
  243. tbl.create(connection)
  244. try:
  245. if kind == "inspector":
  246. assert not meth("test_table", "my_idx_2")
  247. assert not meth("test_table_2", "my_idx_3")
  248. meth.__self__.clear_cache()
  249. assert meth("test_table", "my_idx_2") is True
  250. assert meth("test_table_2", "my_idx_3") is True
  251. finally:
  252. tbl.drop(connection)
  253. idx.drop(connection)
  254. @testing.requires.schemas
  255. @kind
  256. def test_has_index_schema(self, kind, connection):
  257. meth = self._has_index(kind, connection)
  258. assert meth("test_table", "my_idx_s", schema=config.test_schema)
  259. assert not meth("test_table", "my_idx", schema=config.test_schema)
  260. assert not meth(
  261. "nonexistent_table", "my_idx_s", schema=config.test_schema
  262. )
  263. assert not meth(
  264. "test_table", "nonexistent_idx_s", schema=config.test_schema
  265. )
  266. class BizarroCharacterTest(fixtures.TestBase):
  267. __backend__ = True
  268. def column_names():
  269. return testing.combinations(
  270. ("plainname",),
  271. ("(3)",),
  272. ("col%p",),
  273. ("[brack]",),
  274. argnames="columnname",
  275. )
  276. def table_names():
  277. return testing.combinations(
  278. ("plain",),
  279. ("(2)",),
  280. ("per % cent",),
  281. ("[brackets]",),
  282. argnames="tablename",
  283. )
  284. @testing.variation("use_composite", [True, False])
  285. @column_names()
  286. @table_names()
  287. @testing.requires.foreign_key_constraint_reflection
  288. def test_fk_ref(
  289. self, connection, metadata, use_composite, tablename, columnname
  290. ):
  291. """tests for #10275"""
  292. tt = Table(
  293. tablename,
  294. metadata,
  295. Column(columnname, Integer, key="id", primary_key=True),
  296. test_needs_fk=True,
  297. )
  298. if use_composite:
  299. tt.append_column(Column("id2", Integer, primary_key=True))
  300. if use_composite:
  301. Table(
  302. "other",
  303. metadata,
  304. Column("id", Integer, primary_key=True),
  305. Column("ref", Integer),
  306. Column("ref2", Integer),
  307. sa.ForeignKeyConstraint(["ref", "ref2"], [tt.c.id, tt.c.id2]),
  308. test_needs_fk=True,
  309. )
  310. else:
  311. Table(
  312. "other",
  313. metadata,
  314. Column("id", Integer, primary_key=True),
  315. Column("ref", ForeignKey(tt.c.id)),
  316. test_needs_fk=True,
  317. )
  318. metadata.create_all(connection)
  319. m2 = MetaData()
  320. o2 = Table("other", m2, autoload_with=connection)
  321. t1 = m2.tables[tablename]
  322. assert o2.c.ref.references(t1.c[0])
  323. if use_composite:
  324. assert o2.c.ref2.references(t1.c[1])
  325. @column_names()
  326. @table_names()
  327. @testing.requires.identity_columns
  328. def test_reflect_identity(
  329. self, tablename, columnname, connection, metadata
  330. ):
  331. Table(
  332. tablename,
  333. metadata,
  334. Column(columnname, Integer, Identity(), primary_key=True),
  335. )
  336. metadata.create_all(connection)
  337. insp = inspect(connection)
  338. eq_(insp.get_columns(tablename)[0]["identity"]["start"], 1)
  339. @column_names()
  340. @table_names()
  341. @testing.requires.comment_reflection
  342. def test_reflect_comments(
  343. self, tablename, columnname, connection, metadata
  344. ):
  345. Table(
  346. tablename,
  347. metadata,
  348. Column("id", Integer, primary_key=True),
  349. Column(columnname, Integer, comment="some comment"),
  350. )
  351. metadata.create_all(connection)
  352. insp = inspect(connection)
  353. eq_(insp.get_columns(tablename)[1]["comment"], "some comment")
  354. class TempTableElementsTest(fixtures.TestBase):
  355. __backend__ = True
  356. __requires__ = ("temp_table_reflection",)
  357. @testing.fixture
  358. def tablename(self):
  359. return get_temp_table_name(
  360. config, config.db, f"ident_tmp_{config.ident}"
  361. )
  362. @testing.requires.identity_columns
  363. def test_reflect_identity(self, tablename, connection, metadata):
  364. Table(
  365. tablename,
  366. metadata,
  367. Column("id", Integer, Identity(), primary_key=True),
  368. )
  369. metadata.create_all(connection)
  370. insp = inspect(connection)
  371. eq_(insp.get_columns(tablename)[0]["identity"]["start"], 1)
  372. @testing.requires.temp_table_comment_reflection
  373. def test_reflect_comments(self, tablename, connection, metadata):
  374. Table(
  375. tablename,
  376. metadata,
  377. Column("id", Integer, primary_key=True),
  378. Column("foobar", Integer, comment="some comment"),
  379. )
  380. metadata.create_all(connection)
  381. insp = inspect(connection)
  382. eq_(insp.get_columns(tablename)[1]["comment"], "some comment")
  383. class QuotedNameArgumentTest(fixtures.TablesTest):
  384. run_create_tables = "once"
  385. __backend__ = True
  386. @classmethod
  387. def define_tables(cls, metadata):
  388. Table(
  389. "quote ' one",
  390. metadata,
  391. Column("id", Integer),
  392. Column("name", String(50)),
  393. Column("data", String(50)),
  394. Column("related_id", Integer),
  395. sa.PrimaryKeyConstraint("id", name="pk quote ' one"),
  396. sa.Index("ix quote ' one", "name"),
  397. sa.UniqueConstraint(
  398. "data",
  399. name="uq quote' one",
  400. ),
  401. sa.ForeignKeyConstraint(
  402. ["id"], ["related.id"], name="fk quote ' one"
  403. ),
  404. sa.CheckConstraint("name != 'foo'", name="ck quote ' one"),
  405. comment=r"""quote ' one comment""",
  406. test_needs_fk=True,
  407. )
  408. if testing.requires.symbol_names_w_double_quote.enabled:
  409. Table(
  410. 'quote " two',
  411. metadata,
  412. Column("id", Integer),
  413. Column("name", String(50)),
  414. Column("data", String(50)),
  415. Column("related_id", Integer),
  416. sa.PrimaryKeyConstraint("id", name='pk quote " two'),
  417. sa.Index('ix quote " two', "name"),
  418. sa.UniqueConstraint(
  419. "data",
  420. name='uq quote" two',
  421. ),
  422. sa.ForeignKeyConstraint(
  423. ["id"], ["related.id"], name='fk quote " two'
  424. ),
  425. sa.CheckConstraint("name != 'foo'", name='ck quote " two '),
  426. comment=r"""quote " two comment""",
  427. test_needs_fk=True,
  428. )
  429. Table(
  430. "related",
  431. metadata,
  432. Column("id", Integer, primary_key=True),
  433. Column("related", Integer),
  434. test_needs_fk=True,
  435. )
  436. if testing.requires.view_column_reflection.enabled:
  437. if testing.requires.symbol_names_w_double_quote.enabled:
  438. names = [
  439. "quote ' one",
  440. 'quote " two',
  441. ]
  442. else:
  443. names = [
  444. "quote ' one",
  445. ]
  446. for name in names:
  447. query = "CREATE VIEW %s AS SELECT * FROM %s" % (
  448. config.db.dialect.identifier_preparer.quote(
  449. "view %s" % name
  450. ),
  451. config.db.dialect.identifier_preparer.quote(name),
  452. )
  453. event.listen(metadata, "after_create", DDL(query))
  454. event.listen(
  455. metadata,
  456. "before_drop",
  457. DDL(
  458. "DROP VIEW %s"
  459. % config.db.dialect.identifier_preparer.quote(
  460. "view %s" % name
  461. )
  462. ),
  463. )
  464. def quote_fixtures(fn):
  465. return testing.combinations(
  466. ("quote ' one",),
  467. ('quote " two', testing.requires.symbol_names_w_double_quote),
  468. )(fn)
  469. @quote_fixtures
  470. def test_get_table_options(self, name):
  471. insp = inspect(config.db)
  472. if testing.requires.reflect_table_options.enabled:
  473. res = insp.get_table_options(name)
  474. is_true(isinstance(res, dict))
  475. else:
  476. with expect_raises(NotImplementedError):
  477. insp.get_table_options(name)
  478. @quote_fixtures
  479. @testing.requires.view_column_reflection
  480. def test_get_view_definition(self, name):
  481. insp = inspect(config.db)
  482. assert insp.get_view_definition("view %s" % name)
  483. @quote_fixtures
  484. def test_get_columns(self, name):
  485. insp = inspect(config.db)
  486. assert insp.get_columns(name)
  487. @quote_fixtures
  488. def test_get_pk_constraint(self, name):
  489. insp = inspect(config.db)
  490. assert insp.get_pk_constraint(name)
  491. @quote_fixtures
  492. @testing.requires.foreign_key_constraint_reflection
  493. def test_get_foreign_keys(self, name):
  494. insp = inspect(config.db)
  495. assert insp.get_foreign_keys(name)
  496. @quote_fixtures
  497. @testing.requires.index_reflection
  498. def test_get_indexes(self, name):
  499. insp = inspect(config.db)
  500. assert insp.get_indexes(name)
  501. @quote_fixtures
  502. @testing.requires.unique_constraint_reflection
  503. def test_get_unique_constraints(self, name):
  504. insp = inspect(config.db)
  505. assert insp.get_unique_constraints(name)
  506. @quote_fixtures
  507. @testing.requires.comment_reflection
  508. def test_get_table_comment(self, name):
  509. insp = inspect(config.db)
  510. assert insp.get_table_comment(name)
  511. @quote_fixtures
  512. @testing.requires.check_constraint_reflection
  513. def test_get_check_constraints(self, name):
  514. insp = inspect(config.db)
  515. assert insp.get_check_constraints(name)
  516. def _multi_combination(fn):
  517. schema = testing.combinations(
  518. None,
  519. (
  520. lambda: config.test_schema,
  521. testing.requires.schemas,
  522. ),
  523. argnames="schema",
  524. )
  525. scope = testing.combinations(
  526. ObjectScope.DEFAULT,
  527. ObjectScope.TEMPORARY,
  528. ObjectScope.ANY,
  529. argnames="scope",
  530. )
  531. kind = testing.combinations(
  532. ObjectKind.TABLE,
  533. ObjectKind.VIEW,
  534. ObjectKind.MATERIALIZED_VIEW,
  535. ObjectKind.ANY,
  536. ObjectKind.ANY_VIEW,
  537. ObjectKind.TABLE | ObjectKind.VIEW,
  538. ObjectKind.TABLE | ObjectKind.MATERIALIZED_VIEW,
  539. argnames="kind",
  540. )
  541. filter_names = testing.combinations(True, False, argnames="use_filter")
  542. return schema(scope(kind(filter_names(fn))))
  543. class ComponentReflectionTest(ComparesTables, OneConnectionTablesTest):
  544. run_inserts = run_deletes = None
  545. __backend__ = True
  546. @classmethod
  547. def define_tables(cls, metadata):
  548. cls.define_reflected_tables(metadata, None)
  549. if testing.requires.schemas.enabled:
  550. cls.define_reflected_tables(metadata, testing.config.test_schema)
    @classmethod
    def define_reflected_tables(cls, metadata, schema):
        """Declare the full set of fixture tables in *schema*.

        :param metadata: the ``MetaData`` the tables are attached to.
        :param schema: schema name to create the tables in, or ``None``
         for the default schema.

        Several groups of objects (cross-schema tables, indexes, views
        and temp tables) are declared only when the corresponding
        ``testing.requires`` flag is enabled.
        """
        # prefix used when spelling foreign key targets as strings
        if schema:
            schema_prefix = schema + "."
        else:
            schema_prefix = ""

        # the self-referential FK on users.parent_user_id is optional
        if testing.requires.self_referential_foreign_keys.enabled:
            parent_id_args = (
                ForeignKey(
                    "%susers.user_id" % schema_prefix, name="user_id_fk"
                ),
            )
        else:
            parent_id_args = ()
        users = Table(
            "users",
            metadata,
            Column("user_id", sa.INT, primary_key=True),
            Column("test1", sa.CHAR(5), nullable=False),
            Column("test2", sa.Float(), nullable=False),
            Column("parent_user_id", sa.Integer, *parent_id_args),
            sa.CheckConstraint(
                "test2 > 0",
                name="zz_test2_gt_zero",
                comment="users check constraint",
            ),
            sa.CheckConstraint("test2 <= 1000"),
            schema=schema,
            test_needs_fk=True,
        )

        Table(
            "dingalings",
            metadata,
            Column("dingaling_id", sa.Integer, primary_key=True),
            Column(
                "address_id",
                sa.Integer,
                ForeignKey(
                    "%semail_addresses.address_id" % schema_prefix,
                    name="zz_email_add_id_fg",
                    comment="di fk comment",
                ),
            ),
            Column(
                "id_user",
                sa.Integer,
                ForeignKey("%susers.user_id" % schema_prefix),
            ),
            Column("data", sa.String(30), unique=True),
            sa.CheckConstraint(
                "address_id > 0 AND address_id < 1000",
                name="address_id_gt_zero",
            ),
            sa.UniqueConstraint(
                "address_id",
                "dingaling_id",
                name="zz_dingalings_multiple",
                comment="di unique comment",
            ),
            schema=schema,
            test_needs_fk=True,
        )
        Table(
            "email_addresses",
            metadata,
            Column("address_id", sa.Integer),
            Column("remote_user_id", sa.Integer, ForeignKey(users.c.user_id)),
            Column("email_address", sa.String(20), index=True),
            sa.PrimaryKeyConstraint(
                "address_id", name="email_ad_pk", comment="ea pk comment"
            ),
            schema=schema,
            test_needs_fk=True,
        )
        # table and column comments deliberately contain quotes,
        # percent signs, backslashes and control characters
        Table(
            "comment_test",
            metadata,
            Column("id", sa.Integer, primary_key=True, comment="id comment"),
            Column("data", sa.String(20), comment="data % comment"),
            Column(
                "d2",
                sa.String(20),
                comment=r"""Comment types type speedily ' " \ '' Fun!""",
            ),
            Column("d3", sa.String(42), comment="Comment\nwith\rescapes"),
            schema=schema,
            comment=r"""the test % ' " \ table comment""",
        )

        Table(
            "no_constraints",
            metadata,
            Column("data", sa.String(20)),
            schema=schema,
            comment="no\nconstraints\rhas\fescaped\vcomment",
        )

        if testing.requires.cross_schema_fk_reflection.enabled:
            if schema is None:
                # default-schema side: local_table points into the
                # test schema
                Table(
                    "local_table",
                    metadata,
                    Column("id", sa.Integer, primary_key=True),
                    Column("data", sa.String(20)),
                    Column(
                        "remote_id",
                        ForeignKey(
                            "%s.remote_table_2.id" % testing.config.test_schema
                        ),
                    ),
                    test_needs_fk=True,
                    schema=config.db.dialect.default_schema_name,
                )
            else:
                # test-schema side: remote_table points back at
                # local_table in the default schema
                Table(
                    "remote_table",
                    metadata,
                    Column("id", sa.Integer, primary_key=True),
                    Column(
                        "local_id",
                        ForeignKey(
                            "%s.local_table.id"
                            % config.db.dialect.default_schema_name
                        ),
                    ),
                    Column("data", sa.String(20)),
                    schema=schema,
                    test_needs_fk=True,
                )
                Table(
                    "remote_table_2",
                    metadata,
                    Column("id", sa.Integer, primary_key=True),
                    Column("data", sa.String(20)),
                    schema=schema,
                    test_needs_fk=True,
                )

        if testing.requires.index_reflection.enabled:
            Index("users_t_idx", users.c.test1, users.c.test2, unique=True)
            Index(
                "users_all_idx", users.c.user_id, users.c.test2, users.c.test1
            )

            # these two tables exist only in the default schema
            if not schema:
                # test_needs_fk is at the moment to force MySQL InnoDB
                noncol_idx_test_nopk = Table(
                    "noncol_idx_test_nopk",
                    metadata,
                    Column("q", sa.String(5)),
                    test_needs_fk=True,
                )

                noncol_idx_test_pk = Table(
                    "noncol_idx_test_pk",
                    metadata,
                    Column("id", sa.Integer, primary_key=True),
                    Column("q", sa.String(5)),
                    test_needs_fk=True,
                )

                if (
                    testing.requires.indexes_with_ascdesc.enabled
                    and testing.requires.reflect_indexes_with_ascdesc.enabled
                ):
                    Index("noncol_idx_nopk", noncol_idx_test_nopk.c.q.desc())
                    Index("noncol_idx_pk", noncol_idx_test_pk.c.q.desc())

        if testing.requires.view_column_reflection.enabled:
            cls.define_views(metadata, schema)
        # temp tables are declared once, on the default-schema pass
        if not schema and testing.requires.temp_table_reflection.enabled:
            cls.define_temp_tables(metadata)
  716. @classmethod
  717. def temp_table_name(cls):
  718. return get_temp_table_name(
  719. config, config.db, f"user_tmp_{config.ident}"
  720. )
  721. @classmethod
  722. def define_temp_tables(cls, metadata):
  723. kw = temp_table_keyword_args(config, config.db)
  724. table_name = cls.temp_table_name()
  725. user_tmp = Table(
  726. table_name,
  727. metadata,
  728. Column("id", sa.INT, primary_key=True),
  729. Column("name", sa.VARCHAR(50)),
  730. Column("foo", sa.INT),
  731. # disambiguate temp table unique constraint names. this is
  732. # pretty arbitrary for a generic dialect however we are doing
  733. # it to suit SQL Server which will produce name conflicts for
  734. # unique constraints created against temp tables in different
  735. # databases.
  736. # https://www.arbinada.com/en/node/1645
  737. sa.UniqueConstraint("name", name=f"user_tmp_uq_{config.ident}"),
  738. sa.Index("user_tmp_ix", "foo"),
  739. **kw,
  740. )
  741. if (
  742. testing.requires.view_reflection.enabled
  743. and testing.requires.temporary_views.enabled
  744. ):
  745. event.listen(
  746. user_tmp,
  747. "after_create",
  748. DDL(
  749. "create temporary view user_tmp_v as "
  750. "select * from user_tmp_%s" % config.ident
  751. ),
  752. )
  753. event.listen(user_tmp, "before_drop", DDL("drop view user_tmp_v"))
  754. @classmethod
  755. def define_views(cls, metadata, schema):
  756. if testing.requires.materialized_views.enabled:
  757. materialized = {"dingalings"}
  758. else:
  759. materialized = set()
  760. for table_name in ("users", "email_addresses", "dingalings"):
  761. fullname = table_name
  762. if schema:
  763. fullname = f"{schema}.{table_name}"
  764. view_name = fullname + "_v"
  765. prefix = "MATERIALIZED " if table_name in materialized else ""
  766. query = (
  767. f"CREATE {prefix}VIEW {view_name} AS SELECT * FROM {fullname}"
  768. )
  769. event.listen(metadata, "after_create", DDL(query))
  770. if table_name in materialized:
  771. index_name = "mat_index"
  772. if schema and testing.against("oracle"):
  773. index_name = f"{schema}.{index_name}"
  774. idx = f"CREATE INDEX {index_name} ON {view_name}(data)"
  775. event.listen(metadata, "after_create", DDL(idx))
  776. event.listen(
  777. metadata, "before_drop", DDL(f"DROP {prefix}VIEW {view_name}")
  778. )
  779. def _resolve_kind(self, kind, tables, views, materialized):
  780. res = {}
  781. if ObjectKind.TABLE in kind:
  782. res.update(tables)
  783. if ObjectKind.VIEW in kind:
  784. res.update(views)
  785. if ObjectKind.MATERIALIZED_VIEW in kind:
  786. res.update(materialized)
  787. return res
  788. def _resolve_views(self, views, materialized):
  789. if not testing.requires.view_column_reflection.enabled:
  790. materialized.clear()
  791. views.clear()
  792. elif not testing.requires.materialized_views.enabled:
  793. views.update(materialized)
  794. materialized.clear()
  795. def _resolve_names(self, schema, scope, filter_names, values):
  796. scope_filter = lambda _: True # noqa: E731
  797. if scope is ObjectScope.DEFAULT:
  798. scope_filter = lambda k: "tmp" not in k[1] # noqa: E731
  799. if scope is ObjectScope.TEMPORARY:
  800. scope_filter = lambda k: "tmp" in k[1] # noqa: E731
  801. removed = {
  802. None: {"remote_table", "remote_table_2"},
  803. testing.config.test_schema: {
  804. "local_table",
  805. "noncol_idx_test_nopk",
  806. "noncol_idx_test_pk",
  807. "user_tmp_v",
  808. self.temp_table_name(),
  809. },
  810. }
  811. if not testing.requires.cross_schema_fk_reflection.enabled:
  812. removed[None].add("local_table")
  813. removed[testing.config.test_schema].update(
  814. ["remote_table", "remote_table_2"]
  815. )
  816. if not testing.requires.index_reflection.enabled:
  817. removed[None].update(
  818. ["noncol_idx_test_nopk", "noncol_idx_test_pk"]
  819. )
  820. if (
  821. not testing.requires.temp_table_reflection.enabled
  822. or not testing.requires.temp_table_names.enabled
  823. ):
  824. removed[None].update(["user_tmp_v", self.temp_table_name()])
  825. if not testing.requires.temporary_views.enabled:
  826. removed[None].update(["user_tmp_v"])
  827. res = {
  828. k: v
  829. for k, v in values.items()
  830. if scope_filter(k)
  831. and k[1] not in removed[schema]
  832. and (not filter_names or k[1] in filter_names)
  833. }
  834. return res
  835. def exp_options(
  836. self,
  837. schema=None,
  838. scope=ObjectScope.ANY,
  839. kind=ObjectKind.ANY,
  840. filter_names=None,
  841. ):
  842. materialized = {(schema, "dingalings_v"): mock.ANY}
  843. views = {
  844. (schema, "email_addresses_v"): mock.ANY,
  845. (schema, "users_v"): mock.ANY,
  846. (schema, "user_tmp_v"): mock.ANY,
  847. }
  848. self._resolve_views(views, materialized)
  849. tables = {
  850. (schema, "users"): mock.ANY,
  851. (schema, "dingalings"): mock.ANY,
  852. (schema, "email_addresses"): mock.ANY,
  853. (schema, "comment_test"): mock.ANY,
  854. (schema, "no_constraints"): mock.ANY,
  855. (schema, "local_table"): mock.ANY,
  856. (schema, "remote_table"): mock.ANY,
  857. (schema, "remote_table_2"): mock.ANY,
  858. (schema, "noncol_idx_test_nopk"): mock.ANY,
  859. (schema, "noncol_idx_test_pk"): mock.ANY,
  860. (schema, self.temp_table_name()): mock.ANY,
  861. }
  862. res = self._resolve_kind(kind, tables, views, materialized)
  863. res = self._resolve_names(schema, scope, filter_names, res)
  864. return res
  865. def exp_comments(
  866. self,
  867. schema=None,
  868. scope=ObjectScope.ANY,
  869. kind=ObjectKind.ANY,
  870. filter_names=None,
  871. ):
  872. empty = {"text": None}
  873. materialized = {(schema, "dingalings_v"): empty}
  874. views = {
  875. (schema, "email_addresses_v"): empty,
  876. (schema, "users_v"): empty,
  877. (schema, "user_tmp_v"): empty,
  878. }
  879. self._resolve_views(views, materialized)
  880. tables = {
  881. (schema, "users"): empty,
  882. (schema, "dingalings"): empty,
  883. (schema, "email_addresses"): empty,
  884. (schema, "comment_test"): {
  885. "text": r"""the test % ' " \ table comment"""
  886. },
  887. (schema, "no_constraints"): {
  888. "text": "no\nconstraints\rhas\fescaped\vcomment"
  889. },
  890. (schema, "local_table"): empty,
  891. (schema, "remote_table"): empty,
  892. (schema, "remote_table_2"): empty,
  893. (schema, "noncol_idx_test_nopk"): empty,
  894. (schema, "noncol_idx_test_pk"): empty,
  895. (schema, self.temp_table_name()): empty,
  896. }
  897. res = self._resolve_kind(kind, tables, views, materialized)
  898. res = self._resolve_names(schema, scope, filter_names, res)
  899. return res
  def exp_columns(
      self,
      schema=None,
      scope=ObjectScope.ANY,
      kind=ObjectKind.ANY,
      filter_names=None,
  ):
      """Return the expected column records for the fixture objects.

      Three maps keyed by ``(schema, object_name)`` are assembled, one each
      for materialized views, plain views and tables; every value is a list
      of column dictionaries.  The two view maps are handed to
      ``self._resolve_views`` and the three maps are then reduced through
      ``self._resolve_kind`` and ``self._resolve_names`` using ``kind``,
      ``schema``, ``scope`` and ``filter_names``.

      ``mock.ANY`` is used wherever no specific value is pinned.
      """

      def col(
          name, auto=False, default=mock.ANY, comment=None, nullable=True
      ):
          # ``auto="omit"`` leaves the "autoincrement" key out of the
          # record entirely rather than assigning it a value.
          record = {"name": name}
          if auto != "omit":
              record["autoincrement"] = auto
          record["type"] = mock.ANY
          record["default"] = default
          record["comment"] = comment
          record["nullable"] = nullable
          return record

      def pk(name, **kw):
          # primary key column defaults; explicit keywords take precedence
          settings = {"auto": True, "default": mock.ANY, "nullable": False}
          settings.update(kw)
          return col(name, **settings)

      def view_key_col(name):
          # leading column of a view: no "autoincrement" key, and any
          # nullability is accepted
          return col(name, auto="omit", nullable=mock.ANY)

      materialized = {
          (schema, "dingalings_v"): [
              view_key_col("dingaling_id"),
              col("address_id"),
              col("id_user"),
              col("data"),
          ]
      }
      views = {
          (schema, "email_addresses_v"): [
              view_key_col("address_id"),
              col("remote_user_id"),
              col("email_address"),
          ],
          (schema, "users_v"): [
              view_key_col("user_id"),
              col("test1", nullable=mock.ANY),
              col("test2", nullable=mock.ANY),
              col("parent_user_id"),
          ],
          (schema, "user_tmp_v"): [
              view_key_col("id"),
              col("name"),
              col("foo"),
          ],
      }
      self._resolve_views(views, materialized)

      tables = {
          (schema, "users"): [
              pk("user_id"),
              col("test1", nullable=False),
              col("test2", nullable=False),
              col("parent_user_id"),
          ],
          (schema, "dingalings"): [
              pk("dingaling_id"),
              col("address_id"),
              col("id_user"),
              col("data"),
          ],
          (schema, "email_addresses"): [
              pk("address_id"),
              col("remote_user_id"),
              col("email_address"),
          ],
          (schema, "comment_test"): [
              pk("id", comment="id comment"),
              col("data", comment="data % comment"),
              col(
                  "d2",
                  comment=r"""Comment types type speedily ' " \ '' Fun!""",
              ),
              col("d3", comment="Comment\nwith\rescapes"),
          ],
          (schema, "no_constraints"): [col("data")],
          (schema, "local_table"): [pk("id"), col("data"), col("remote_id")],
          (schema, "remote_table"): [pk("id"), col("local_id"), col("data")],
          (schema, "remote_table_2"): [pk("id"), col("data")],
          (schema, "noncol_idx_test_nopk"): [col("q")],
          (schema, "noncol_idx_test_pk"): [pk("id"), col("q")],
          (schema, self.temp_table_name()): [
              pk("id"),
              col("name"),
              col("foo"),
          ],
      }

      by_kind = self._resolve_kind(kind, tables, views, materialized)
      return self._resolve_names(schema, scope, filter_names, by_kind)
  993. @property
  994. def _required_column_keys(self):
  995. return {"name", "type", "nullable", "default"}
  def exp_pks(
      self,
      schema=None,
      scope=ObjectScope.ANY,
      kind=ObjectKind.ANY,
      filter_names=None,
  ):
      """Return the expected primary key record for each fixture object.

      Each value is one dictionary with "constrained_columns", "name" and
      "comment".  Objects with no primary key all share a single record
      that has no columns and a ``None`` name.  The maps are passed through
      ``self._resolve_views``, ``self._resolve_kind`` and
      ``self._resolve_names`` before being returned.
      """

      def pk(*cols, name=mock.ANY, comment=None):
          return dict(
              constrained_columns=list(cols), name=name, comment=comment
          )

      empty = pk(name=None)

      # the materialized view carries a pk only where the backend
      # declares the materialized_views_reflect_pk requirement
      reflects_mat_pk = testing.requires.materialized_views_reflect_pk.enabled
      materialized = {
          (schema, "dingalings_v"): (
              pk("dingaling_id") if reflects_mat_pk else empty
          )
      }
      views = dict.fromkeys(
          [
              (schema, "email_addresses_v"),
              (schema, "users_v"),
              (schema, "user_tmp_v"),
          ],
          empty,
      )
      self._resolve_views(views, materialized)

      tables = {
          (schema, "users"): pk("user_id"),
          (schema, "dingalings"): pk("dingaling_id"),
          (schema, "email_addresses"): pk(
              "address_id", name="email_ad_pk", comment="ea pk comment"
          ),
          (schema, "comment_test"): pk("id"),
          (schema, "no_constraints"): empty,
          (schema, "local_table"): pk("id"),
          (schema, "remote_table"): pk("id"),
          (schema, "remote_table_2"): pk("id"),
          (schema, "noncol_idx_test_nopk"): empty,
          (schema, "noncol_idx_test_pk"): pk("id"),
          (schema, self.temp_table_name()): pk("id"),
      }

      if not testing.requires.reflects_pk_names.enabled:
          # without pk name reflection, accept any name on records that
          # have one; the shared "empty" record keeps its None name
          named = [
              record
              for record in tables.values()
              if record["name"] is not None
          ]
          for record in named:
              record["name"] = mock.ANY

      by_kind = self._resolve_kind(kind, tables, views, materialized)
      return self._resolve_names(schema, scope, filter_names, by_kind)
  1042. @property
  1043. def _required_pk_keys(self):
  1044. return {"name", "constrained_columns"}
  def exp_fks(
      self,
      schema=None,
      scope=ObjectScope.ANY,
      kind=ObjectKind.ANY,
      filter_names=None,
  ):
      """Return the expected foreign key records for the fixture objects.

      Values are lists of dictionaries, one per foreign key; views and the
      materialized view expect none.  The maps are passed through
      ``self._resolve_views``, ``self._resolve_kind`` and
      ``self._resolve_names`` before being returned.
      """

      class DefaultSchemaOrNone:
          # compares equal to ``None`` and to the default schema name of
          # the dialect in use
          def __eq__(self, other):
              if other is None:
                  return True
              return config.db.dialect.default_schema_name == other

      def fk(
          cols,
          ref_col,
          ref_table,
          ref_schema=schema,
          name=mock.ANY,
          comment=None,
      ):
          if ref_schema is None:
              referred_schema = DefaultSchemaOrNone()
          else:
              referred_schema = ref_schema
          return {
              "constrained_columns": cols,
              "referred_columns": ref_col,
              "name": name,
              "options": mock.ANY,
              "referred_schema": referred_schema,
              "referred_table": ref_table,
              "comment": comment,
          }

      materialized = {(schema, "dingalings_v"): []}
      views = {
          (schema, view_name): []
          for view_name in ("email_addresses_v", "users_v", "user_tmp_v")
      }
      self._resolve_views(views, materialized)

      tables = {
          (schema, "users"): [
              fk(["parent_user_id"], ["user_id"], "users", name="user_id_fk")
          ],
          (schema, "dingalings"): [
              fk(["id_user"], ["user_id"], "users"),
              fk(
                  ["address_id"],
                  ["address_id"],
                  "email_addresses",
                  name="zz_email_add_id_fg",
                  comment="di fk comment",
              ),
          ],
          (schema, "email_addresses"): [
              fk(["remote_user_id"], ["user_id"], "users")
          ],
          (schema, "comment_test"): [],
          (schema, "no_constraints"): [],
          (schema, "local_table"): [
              fk(
                  ["remote_id"],
                  ["id"],
                  "remote_table_2",
                  ref_schema=config.test_schema,
              )
          ],
          (schema, "remote_table"): [
              fk(["local_id"], ["id"], "local_table", ref_schema=None)
          ],
          (schema, "remote_table_2"): [],
          (schema, "noncol_idx_test_nopk"): [],
          (schema, "noncol_idx_test_pk"): [],
          (schema, self.temp_table_name()): [],
      }

      if not testing.requires.self_referential_foreign_keys.enabled:
          tables[(schema, "users")].clear()

      if not testing.requires.named_constraints.enabled:
          # without named constraints, accept any reflected name
          every_fk = (
              record for records in tables.values() for record in records
          )
          for record in every_fk:
              if record["name"] is not mock.ANY:
                  record["name"] = mock.ANY

      by_kind = self._resolve_kind(kind, tables, views, materialized)
      return self._resolve_names(schema, scope, filter_names, by_kind)
  1129. @property
  1130. def _required_fk_keys(self):
  1131. return {
  1132. "name",
  1133. "constrained_columns",
  1134. "referred_schema",
  1135. "referred_table",
  1136. "referred_columns",
  1137. }
  def exp_indexes(
      self,
      schema=None,
      scope=ObjectScope.ANY,
      kind=ObjectKind.ANY,
      filter_names=None,
  ):
      """Return the expected index records for the fixture objects.

      Values are lists of index dictionaries.  Which entries are present
      depends on several ``testing.requires`` flags consulted below.  The
      maps are passed through ``self._resolve_views``,
      ``self._resolve_kind`` and ``self._resolve_names`` before being
      returned.
      """

      def idx(
          *cols,
          name,
          unique=False,
          column_sorting=None,
          duplicates=False,
          fk=False,
      ):
          # Returns a one-element list, or an empty tuple when the index
          # is not expected, so callers can always splat the result.
          fk_req = testing.requires.foreign_keys_reflect_as_index
          dup_req = testing.requires.unique_constraints_reflect_as_index
          sorting_expression = (
              testing.requires.reflect_indexes_with_ascdesc_as_expression
          )

          if fk and not fk_req.enabled:
              return ()
          if duplicates and not dup_req.enabled:
              return ()

          record = {
              "unique": unique,
              "column_names": list(cols),
              "name": name,
              "dialect_options": mock.ANY,
              "include_columns": [],
          }
          if column_sorting:
              record["column_sorting"] = column_sorting
              if sorting_expression.enabled:
                  # sorted columns are expected as expressions; their
                  # slot in "column_names" becomes None
                  plain_names = record["column_names"]
                  record["expressions"] = plain_names
                  record["column_names"] = [
                      None if c in column_sorting else c
                      for c in plain_names
                  ]

          if duplicates:
              record["duplicates_constraint"] = name
          return [record]

      materialized = {(schema, "dingalings_v"): []}
      views = {
          (schema, view_name): []
          for view_name in ("email_addresses_v", "users_v", "user_tmp_v")
      }
      self._resolve_views(views, materialized)
      if materialized:
          materialized[(schema, "dingalings_v")].extend(
              idx("data", name="mat_index")
          )

      tables = {
          (schema, "users"): [
              *idx("parent_user_id", name="user_id_fk", fk=True),
              *idx("user_id", "test2", "test1", name="users_all_idx"),
              *idx("test1", "test2", name="users_t_idx", unique=True),
          ],
          (schema, "dingalings"): [
              *idx("data", name=mock.ANY, unique=True, duplicates=True),
              *idx("id_user", name=mock.ANY, fk=True),
              *idx(
                  "address_id",
                  "dingaling_id",
                  name="zz_dingalings_multiple",
                  unique=True,
                  duplicates=True,
              ),
          ],
          (schema, "email_addresses"): [
              *idx("email_address", name=mock.ANY),
              *idx("remote_user_id", name=mock.ANY, fk=True),
          ],
          (schema, "comment_test"): [],
          (schema, "no_constraints"): [],
          (schema, "local_table"): [
              *idx("remote_id", name=mock.ANY, fk=True)
          ],
          (schema, "remote_table"): [
              *idx("local_id", name=mock.ANY, fk=True)
          ],
          (schema, "remote_table_2"): [],
          (schema, "noncol_idx_test_nopk"): [
              *idx(
                  "q",
                  name="noncol_idx_nopk",
                  column_sorting={"q": ("desc",)},
              )
          ],
          (schema, "noncol_idx_test_pk"): [
              *idx(
                  "q", name="noncol_idx_pk", column_sorting={"q": ("desc",)}
              )
          ],
          (schema, self.temp_table_name()): [
              *idx("foo", name="user_tmp_ix"),
              *idx(
                  "name",
                  name=f"user_tmp_uq_{config.ident}",
                  duplicates=True,
                  unique=True,
              ),
          ],
      }

      # the ASC/DESC indexes are expected only when both requirements hold
      if not (
          testing.requires.indexes_with_ascdesc.enabled
          and testing.requires.reflect_indexes_with_ascdesc.enabled
      ):
          for table_name in ("noncol_idx_test_nopk", "noncol_idx_test_pk"):
              tables[(schema, table_name)].clear()

      by_kind = self._resolve_kind(kind, tables, views, materialized)
      return self._resolve_names(schema, scope, filter_names, by_kind)
  1251. @property
  1252. def _required_index_keys(self):
  1253. return {"name", "column_names", "unique"}
  def exp_ucs(
      self,
      schema=None,
      scope=ObjectScope.ANY,
      kind=ObjectKind.ANY,
      filter_names=None,
      all_=False,
  ):
      """Return the expected unique constraint records for the fixtures.

      Values are lists of constraint dictionaries; views and the
      materialized view expect none.  With ``all_`` true the three maps are
      merged and returned directly, skipping ``self._resolve_kind`` and
      ``self._resolve_names``; otherwise the result is filtered through
      both.
      """

      def uc(
          *cols, name, duplicates_index=None, is_index=False, comment=None
      ):
          # Returns a one-element list, or an empty tuple when the entry
          # is not expected, so callers can always splat the result.
          req = testing.requires.unique_index_reflect_as_unique_constraints
          if is_index and not req.enabled:
              return ()
          record = dict(column_names=list(cols), name=name, comment=comment)
          if duplicates_index:
              record["duplicates_index"] = duplicates_index
          return [record]

      materialized = {(schema, "dingalings_v"): []}
      views = {
          (schema, view_name): []
          for view_name in ("email_addresses_v", "users_v", "user_tmp_v")
      }
      self._resolve_views(views, materialized)

      tables = {
          (schema, "users"): [
              *uc(
                  "test1",
                  "test2",
                  name="users_t_idx",
                  duplicates_index="users_t_idx",
                  is_index=True,
              )
          ],
          (schema, "dingalings"): [
              *uc("data", name=mock.ANY, duplicates_index=mock.ANY),
              *uc(
                  "address_id",
                  "dingaling_id",
                  name="zz_dingalings_multiple",
                  duplicates_index="zz_dingalings_multiple",
                  comment="di unique comment",
              ),
          ],
          (schema, "email_addresses"): [],
          (schema, "comment_test"): [],
          (schema, "no_constraints"): [],
          (schema, "local_table"): [],
          (schema, "remote_table"): [],
          (schema, "remote_table_2"): [],
          (schema, "noncol_idx_test_nopk"): [],
          (schema, "noncol_idx_test_pk"): [],
          (schema, self.temp_table_name()): [
              *uc("name", name=f"user_tmp_uq_{config.ident}")
          ],
      }

      if not all_:
          by_kind = self._resolve_kind(kind, tables, views, materialized)
          return self._resolve_names(schema, scope, filter_names, by_kind)

      # unfiltered result: later maps win on duplicate keys
      combined = {}
      for mapping in (materialized, views, tables):
          combined.update(mapping)
      return combined
  1321. @property
  1322. def _required_unique_cst_keys(self):
  1323. return {"name", "column_names"}
  def exp_ccs(
      self,
      schema=None,
      scope=ObjectScope.ANY,
      kind=ObjectKind.ANY,
      filter_names=None,
  ):
      """Return the expected check constraint records for the fixtures.

      Values are lists of dictionaries with "sqltext", "name" and
      "comment"; views and the materialized view expect none.  The maps are
      passed through ``self._resolve_views``, ``self._resolve_kind`` and
      ``self._resolve_names`` before being returned.
      """

      class LooseSqlText(str):
          # Equality is a containment test: the other string is
          # lower-cased and stripped of parentheses and backticks, then
          # must contain this text.
          def __eq__(self, other):
              normalized = other.lower()
              for noise in ("(", ")", "`"):
                  normalized = normalized.replace(noise, "")
              return self in normalized

      def cc(text, name, comment=None):
          return dict(sqltext=LooseSqlText(text), name=name, comment=comment)

      materialized = {(schema, "dingalings_v"): []}
      views = {
          (schema, view_name): []
          for view_name in ("email_addresses_v", "users_v", "user_tmp_v")
      }
      self._resolve_views(views, materialized)

      tables = {
          (schema, "users"): [
              cc("test2 <= 1000", mock.ANY),
              cc(
                  "test2 > 0",
                  "zz_test2_gt_zero",
                  comment="users check constraint",
              ),
          ],
          (schema, "dingalings"): [
              cc(
                  "address_id > 0 and address_id < 1000",
                  name="address_id_gt_zero",
              ),
          ],
          (schema, "email_addresses"): [],
          (schema, "comment_test"): [],
          (schema, "no_constraints"): [],
          (schema, "local_table"): [],
          (schema, "remote_table"): [],
          (schema, "remote_table_2"): [],
          (schema, "noncol_idx_test_nopk"): [],
          (schema, "noncol_idx_test_pk"): [],
          (schema, self.temp_table_name()): [],
      }

      by_kind = self._resolve_kind(kind, tables, views, materialized)
      return self._resolve_names(schema, scope, filter_names, by_kind)
  1379. @property
  1380. def _required_cc_keys(self):
  1381. return {"name", "sqltext"}
  @testing.requires.schema_reflection
  def test_get_schema_names(self, connection):
      """The configured test schema appears in ``get_schema_names()``."""
      schema_names = inspect(connection).get_schema_names()
      is_true(testing.config.test_schema in schema_names)
  @testing.requires.schema_reflection
  def test_has_schema(self, connection):
      """``has_schema()`` is true for the test schema, false for a fake."""
      inspector = inspect(connection)
      is_true(inspector.has_schema(testing.config.test_schema))
      is_false(inspector.has_schema("sa_fake_schema_foo"))
  @testing.requires.schema_reflection
  def test_get_schema_names_w_translate_map(self, connection):
      """test #7300

      ``get_schema_names()`` still lists the test schema when the
      connection carries a ``schema_translate_map`` execution option.
      """
      translate_map = {
          "foo": "bar",
          BLANK_SCHEMA: testing.config.test_schema,
      }
      connection = connection.execution_options(
          schema_translate_map=translate_map
      )
      schema_names = inspect(connection).get_schema_names()
      is_true(testing.config.test_schema in schema_names)
  @testing.requires.schema_reflection
  def test_has_schema_w_translate_map(self, connection):
      """``has_schema()`` answers as usual under a schema translate map."""
      translate_map = {
          "foo": "bar",
          BLANK_SCHEMA: testing.config.test_schema,
      }
      connection = connection.execution_options(
          schema_translate_map=translate_map
      )
      inspector = inspect(connection)
      is_true(inspector.has_schema(testing.config.test_schema))
      is_false(inspector.has_schema("sa_fake_schema_foo"))
  @testing.requires.schema_reflection
  @testing.requires.schema_create_delete
  def test_schema_cache(self, connection):
      """A new schema is only seen by the inspector after ``clear_cache()``.

      The schema is created after the inspector has already answered for
      it, so the same answers are expected until the cache is cleared.
      """
      insp = inspect(connection)

      def check_visibility(assertion):
          # run both schema lookups under the given assertion helper
          assertion("foo_bar" in insp.get_schema_names())
          assertion(insp.has_schema("foo_bar"))

      check_visibility(is_false)
      connection.execute(DDL("CREATE SCHEMA foo_bar"))
      try:
          # same inspector, no cache reset: schema still not reported
          check_visibility(is_false)
          insp.clear_cache()
          check_visibility(is_true)
      finally:
          connection.execute(DDL("DROP SCHEMA foo_bar"))
  @testing.requires.schema_reflection
  def test_dialect_initialize(self):
      """Inspecting a fresh engine leaves ``default_schema_name`` set."""
      fresh_engine = engines.testing_engine()
      # only the side effect of inspect() is of interest; result unused
      inspect(fresh_engine)
      assert hasattr(fresh_engine.dialect, "default_schema_name")
  @testing.requires.schema_reflection
  def test_get_default_schema_name(self, connection):
      """The inspector reports the dialect's default schema name."""
      reported = inspect(connection).default_schema_name
      eq_(reported, connection.dialect.default_schema_name)
  @testing.combinations(
      None,
      ("foreign_key", testing.requires.foreign_key_constraint_reflection),
      argnames="order_by",
  )
  @testing.combinations(
      (True, testing.requires.schemas), False, argnames="use_schema"
  )
  def test_get_table_names(self, connection, order_by, use_schema):
      """Table names are reflected, optionally in foreign key order.

      Tables named in the ignore set below are filtered out before the
      comparison.
      """
      schema = config.test_schema if use_schema else None

      ignored = {
          "comment_test",
          "noncol_idx_test_pk",
          "noncol_idx_test_nopk",
          "local_table",
          "remote_table",
          "remote_table_2",
          "no_constraints",
      }

      insp = inspect(connection)
      if order_by:
          # keep only entries whose first element (the name) is truthy
          first_elements = (
              entry[0]
              for entry in insp.get_sorted_table_and_fkc_names(schema)
          )
          tables = [name for name in first_elements if name]
      else:
          tables = insp.get_table_names(schema)

      table_names = [name for name in tables if name not in ignored]

      if order_by == "foreign_key":
          # order is significant here: referenced tables come first
          eq_(table_names, ["users", "email_addresses", "dingalings"])
      else:
          eq_(
              sorted(table_names),
              ["dingalings", "email_addresses", "users"],
          )
  @testing.combinations(
      (True, testing.requires.schemas), False, argnames="use_schema"
  )
  def test_get_view_names(self, connection, use_schema):
      """View names are reflected, materialized ones listed separately.

      When the materialized_views requirement is not enabled,
      ``dingalings_v`` is expected among the plain view names instead.
      """
      insp = inspect(connection)
      schema = config.test_schema if use_schema else None
      view_names = insp.get_view_names(schema)

      if not testing.requires.materialized_views.enabled:
          eq_(
              sorted(view_names),
              ["dingalings_v", "email_addresses_v", "users_v"],
          )
          return

      eq_(sorted(view_names), ["email_addresses_v", "users_v"])
      eq_(insp.get_materialized_view_names(schema), ["dingalings_v"])
  @testing.requires.temp_table_names
  def test_get_temp_table_names(self, connection):
      """Only the per-run ``user_tmp_<ident>`` temp table is reported."""
      reflected_names = inspect(connection).get_temp_table_names()
      eq_(sorted(reflected_names), [f"user_tmp_{config.ident}"])
  @testing.requires.view_reflection
  @testing.requires.temporary_views
  def test_get_temp_view_names(self, connection):
      """Only the ``user_tmp_v`` temporary view is reported."""
      reflected_names = inspect(connection).get_temp_view_names()
      eq_(sorted(reflected_names), ["user_tmp_v"])
  @testing.requires.comment_reflection
  def test_get_comments(self, connection):
      """Run the table comment checks without a schema."""
      self._test_get_comments(connection, schema=None)
  @testing.requires.comment_reflection
  @testing.requires.schemas
  def test_get_comments_with_schema(self, connection):
      """Run the table comment checks against the test schema."""
      self._test_get_comments(connection, schema=testing.config.test_schema)
  def _test_get_comments(self, connection, schema=None):
      """Compare reflected table comments with ``exp_comments()``.

      ``comment_test`` is looked up twice, once before and once after
      ``users``; both lookups are kept.
      """
      insp = inspect(connection)
      exp = self.exp_comments(schema=schema)

      def check(table_name):
          eq_(
              insp.get_table_comment(table_name, schema=schema),
              exp[(schema, table_name)],
          )

      for table_name in ("comment_test", "users", "comment_test"):
          check(table_name)

      check(self.tables.no_constraints.name)
  @testing.combinations(
      (False, False),
      (False, True, testing.requires.schemas),
      (True, False, testing.requires.view_reflection),
      (
          True,
          True,
          testing.requires.schemas + testing.requires.view_reflection,
      ),
      argnames="use_views,use_schema",
  )
  def test_get_columns(self, connection, use_views, use_schema):
      """Reflected columns match the fixture tables in name and type.

      For each reflected object the column names must line up, in order,
      with the columns of the corresponding fixture table, and each
      reflected type must share a generic base type with the declared one.
      Non primary key columns must reflect a ``None`` default.
      """
      schema = config.test_schema if use_schema else None
      fixture_tables = (self.tables.users, self.tables.email_addresses)

      if use_views:
          table_names = ["users_v", "email_addresses_v", "dingalings_v"]
      else:
          table_names = ["users", "email_addresses"]

      generic_types = [
          sql_types.Integer,
          sql_types.Numeric,
          sql_types.DateTime,
          sql_types.Date,
          sql_types.Time,
          sql_types.String,
          sql_types._Binary,
      ]

      insp = inspect(connection)

      # NOTE: zip() stops at the shorter input, so with two fixture tables
      # only the first two names are reflected here.
      for table_name, table in zip(table_names, fixture_tables):
          cols = insp.get_columns(table_name, schema=schema)
          is_true(len(cols) > 0, len(cols))

          # should be in order
          for position, col in enumerate(table.columns):
              reflected = cols[position]
              eq_(col.name, reflected["name"])

              ctype = reflected["type"].__class__
              ctype_def = col.type
              if isinstance(ctype_def, sa.types.TypeEngine):
                  ctype_def = ctype_def.__class__

              # Oracle returns Date for DateTime.
              if testing.against("oracle") and ctype_def in (
                  sql_types.Date,
                  sql_types.DateTime,
              ):
                  ctype_def = sql_types.Date

              # assert that the desired type and return type share
              # a base within one of the generic types.
              shared_generic = (
                  set(ctype.__mro__)
                  .intersection(ctype_def.__mro__)
                  .intersection(generic_types)
              )
              is_true(
                  len(shared_generic) > 0,
                  "%s(%s), %s(%s)"
                  % (col.name, col.type, reflected["name"], ctype),
              )

              if not col.primary_key:
                  assert reflected["default"] is None
  1592. # The case of a table with no column
  1593. # is tested below in TableNoColumnsTest
  @testing.requires.temp_table_reflection
  def test_reflect_table_temp_table(self, connection):
      """A temp table reflected via ``autoload_with`` equals its fixture.

      The comparison is made with ``strict_constraints=False``.
      """
      name = self.temp_table_name()
      declared = self.tables[name]
      reflected = Table(name, MetaData(), autoload_with=connection)
      self.assert_tables_equal(
          declared, reflected, strict_constraints=False
      )
  @testing.requires.temp_table_reflection
  def test_get_temp_table_columns(self, connection):
      """Temp table columns reflect with the declared names, in order."""
      name = self.temp_table_name()
      declared = self.tables[name]
      cols = inspect(connection).get_columns(name)
      is_true(len(cols) > 0, len(cols))
      for position, column in enumerate(declared.columns):
          eq_(column.name, cols[position]["name"])
  @testing.requires.temp_table_reflection
  @testing.requires.view_column_reflection
  @testing.requires.temporary_views
  def test_get_temp_view_columns(self, connection):
      """The ``user_tmp_v`` view reflects columns id, name, foo in order."""
      reflected = inspect(connection).get_columns("user_tmp_v")
      reflected_names = [column["name"] for column in reflected]
      eq_(reflected_names, ["id", "name", "foo"])
  @testing.combinations(
      (False,), (True, testing.requires.schemas), argnames="use_schema"
  )
  @testing.requires.primary_key_constraint_reflection
  def test_get_pk_constraint(self, connection, use_schema):
      """Primary key constraints reflect as described by ``exp_pks()``.

      ``users`` and ``no_constraints`` are compared through
      ``_check_list``; ``email_addresses`` is compared field by field so
      that the name check can be expected to fail on backends without the
      reflects_pk_names requirement.
      """
      schema = testing.config.test_schema if use_schema else None
      users, addresses = self.tables.users, self.tables.email_addresses
      insp = inspect(connection)
      exp = self.exp_pks(schema=schema)

      def check_whole_record(table_name):
          reflected = insp.get_pk_constraint(table_name, schema=schema)
          self._check_list(
              [reflected],
              [exp[(schema, table_name)]],
              self._required_pk_keys,
          )

      check_whole_record(users.name)

      addr_cons = insp.get_pk_constraint(addresses.name, schema=schema)
      eq_(
          addr_cons["constrained_columns"],
          exp[(schema, addresses.name)]["constrained_columns"],
      )
      with testing.requires.reflects_pk_names.fail_if():
          eq_(addr_cons["name"], "email_ad_pk")

      check_whole_record(self.tables.no_constraints.name)
  @testing.combinations(
      (False,), (True, testing.requires.schemas), argnames="use_schema"
  )
  @testing.requires.foreign_key_constraint_reflection
  def test_get_foreign_keys(self, connection, use_schema):
      """Foreign keys pointing at ``users`` reflect with the right parts.

      Covers the self-referential key on ``users`` (when that requirement
      is enabled), the key from ``email_addresses``, and the empty result
      for ``no_constraints``.
      """
      schema = config.test_schema if use_schema else None
      users, addresses = (self.tables.users, self.tables.email_addresses)
      insp = inspect(connection)
      expected_schema = schema

      def check_points_at_users(fkey, constrained):
          # everything except the constraint name
          eq_(fkey["referred_schema"], expected_schema)
          eq_(fkey["referred_table"], users.name)
          eq_(fkey["referred_columns"], ["user_id"])
          eq_(fkey["constrained_columns"], constrained)

      # users
      if testing.requires.self_referential_foreign_keys.enabled:
          self_ref = insp.get_foreign_keys(users.name, schema=schema)[0]
          with testing.requires.named_constraints.fail_if():
              eq_(self_ref["name"], "user_id_fk")
          check_points_at_users(self_ref, ["parent_user_id"])

      # addresses
      addr_fkey = insp.get_foreign_keys(addresses.name, schema=schema)[0]
      with testing.requires.implicitly_named_constraints.fail_if():
          is_true(addr_fkey["name"] is not None)
      check_points_at_users(addr_fkey, ["remote_user_id"])

      no_cst = self.tables.no_constraints.name
      eq_(insp.get_foreign_keys(no_cst, schema=schema), [])
  @testing.requires.cross_schema_fk_reflection
  @testing.requires.schemas
  def test_get_inter_schema_foreign_keys(self, connection):
      """Foreign keys that cross schemas reflect in both directions.

      ``local_table`` is reflected without a schema argument and refers
      into the test schema; ``remote_table`` is reflected from the test
      schema and refers back, where the referred schema may be ``None`` or
      the dialect's default schema name.
      """
      default_schema = connection.dialect.default_schema_name
      test_schema = testing.config.test_schema

      local_table, remote_table, remote_table_2 = self.tables(
          "%s.local_table" % default_schema,
          "%s.remote_table" % test_schema,
          "%s.remote_table_2" % test_schema,
      )

      insp = inspect(connection)

      local_fkeys = insp.get_foreign_keys(local_table.name)
      eq_(len(local_fkeys), 1)
      outbound = local_fkeys[0]
      eq_(outbound["referred_schema"], testing.config.test_schema)
      eq_(outbound["referred_table"], remote_table_2.name)
      eq_(outbound["referred_columns"], ["id"])
      eq_(outbound["constrained_columns"], ["remote_id"])

      remote_fkeys = insp.get_foreign_keys(
          remote_table.name, schema=testing.config.test_schema
      )
      eq_(len(remote_fkeys), 1)
      inbound = remote_fkeys[0]
      acceptable_schemas = (None, connection.dialect.default_schema_name)
      is_true(inbound["referred_schema"] in acceptable_schemas)
      eq_(inbound["referred_table"], local_table.name)
      eq_(inbound["referred_columns"], ["id"])
      eq_(inbound["constrained_columns"], ["local_id"])
  @testing.combinations(
      (False,), (True, testing.requires.schemas), argnames="use_schema"
  )
  @testing.requires.index_reflection
  def test_get_indexes(self, connection, use_schema):
      """Indexes of ``users`` and ``no_constraints`` match ``exp_indexes()``."""
      schema = config.test_schema if use_schema else None

      # The database may decide to create indexes for foreign keys, etc.
      # so there may be more indexes than expected.
      insp = inspect(connection)
      users_indexes = insp.get_indexes("users", schema=schema)
      exp = self.exp_indexes(schema=schema)
      required = self._required_index_keys
      self._check_list(users_indexes, exp[(schema, "users")], required)

      no_cst = self.tables.no_constraints.name
      self._check_list(
          insp.get_indexes(no_cst, schema=schema),
          exp[(schema, no_cst)],
          self._required_index_keys,
      )
  @testing.combinations(
      ("noncol_idx_test_nopk", "noncol_idx_nopk"),
      ("noncol_idx_test_pk", "noncol_idx_pk"),
      argnames="tname,ixname",
  )
  @testing.requires.index_reflection
  @testing.requires.indexes_with_ascdesc
  @testing.requires.reflect_indexes_with_ascdesc
  def test_get_noncol_index(self, connection, tname, ixname):
      """An index declared with a DESC column is reflected and attached.

      Checked both through ``Inspector.get_indexes`` and through a
      ``Table`` reflected with ``autoload_with``.
      """
      reflected_records = inspect(connection).get_indexes(tname)

      # reflecting an index that has "x DESC" in it as the column.
      # the DB may or may not give us "x", but make sure we get the index
      # back, it has a name, it's connected to the table.
      expected_records = self.exp_indexes()[(None, tname)]
      self._check_list(
          reflected_records, expected_records, self._required_index_keys
      )

      reflected_table = Table(tname, MetaData(), autoload_with=connection)
      eq_(len(reflected_table.indexes), 1)
      only_index = list(reflected_table.indexes)[0]
      is_(only_index.table, reflected_table)
      eq_(only_index.name, ixname)
  @testing.requires.temp_table_reflection
  @testing.requires.unique_constraint_reflection
  def test_get_temp_table_unique_constraints(self, connection):
      """Unique constraints of the temp table match ``exp_ucs()``.

      ``exp_ucs(all_=True)`` is used so the expectation is read from the
      unfiltered map, keyed without a schema.
      """
      insp = inspect(connection)
      name = self.temp_table_name()
      reflected = insp.get_unique_constraints(name)
      exp = self.exp_ucs(all_=True)[(None, name)]
      # These are unique constraint records, so compare them with the
      # unique constraint key set; the index key set additionally names
      # "unique", which the records built by exp_ucs() do not carry.
      self._check_list(reflected, exp, self._required_unique_cst_keys)
  @testing.requires.temp_table_reflect_indexes
  def test_get_temp_table_indexes(self, connection):
      """The ``user_tmp_ix`` index of the temp table is reflected.

      "dialect_options" is dropped from every reflected record before the
      comparison, and "include_columns" is expected only when the
      index_reflects_included_columns requirement is enabled.
      """
      reflected = inspect(connection).get_indexes(self.temp_table_name())
      for record in reflected:
          record.pop("dialect_options", None)

      expected_record = {
          "unique": False,
          "column_names": ["foo"],
          "name": "user_tmp_ix",
      }
      if testing.requires.index_reflects_included_columns.enabled:
          expected_record["include_columns"] = []

      matching = [
          record for record in reflected if record["name"] == "user_tmp_ix"
      ]
      eq_(matching, [expected_record])
  @testing.combinations(
      (True, testing.requires.schemas), (False,), argnames="use_schema"
  )
  @testing.requires.unique_constraint_reflection
  def test_get_unique_constraints(self, metadata, connection, use_schema):
      """Unique constraints round-trip through create and reflection.

      A table with several named unique constraints (including names with
      dots and spaces, and reserved-word columns) is created, reflected
      through the inspector and compared, then reflected as a ``Table`` to
      check that unique constraints and indexes are not duplicated.
      """
      # SQLite dialect needs to parse the names of the constraints
      # separately from what it gets from PRAGMA index_list(), and
      # then matches them up.  so same set of column_names in two
      # constraints will confuse it.    Perhaps we should no longer
      # bother with index_list() here since we have the whole
      # CREATE TABLE?

      schema = config.test_schema if use_schema else None
      by_name = operator.itemgetter("name")

      uniques = [
          {"name": "unique_a", "column_names": ["a"]},
          {"name": "unique_a_b_c", "column_names": ["a", "b", "c"]},
          {"name": "unique_c_a_b", "column_names": ["c", "a", "b"]},
          {"name": "unique_asc_key", "column_names": ["asc", "key"]},
          {"name": "i.have.dots", "column_names": ["b"]},
          {"name": "i have spaces", "column_names": ["c"]},
      ]
      uniques.sort(key=by_name)

      table = Table(
          "testtbl",
          metadata,
          Column("a", sa.String(20)),
          Column("b", sa.String(30)),
          Column("c", sa.Integer),
          # reserved identifiers
          Column("asc", sa.String(30)),
          Column("key", sa.String(30)),
          schema=schema,
      )
      for spec in uniques:
          constraint = sa.UniqueConstraint(
              *spec["column_names"], name=spec["name"]
          )
          table.append_constraint(constraint)
      table.create(connection)

      insp = inspect(connection)
      reflected = sorted(
          insp.get_unique_constraints("testtbl", schema=schema),
          key=by_name,
      )

      names_that_duplicate_index = set()

      eq_(len(uniques), len(reflected))

      for declared, found in zip(uniques, reflected):
          # Different dialects handle duplicate index and constraints
          # differently, so ignore this flag
          duplicated = found.pop("duplicates_index", None)
          if duplicated:
              names_that_duplicate_index.add(duplicated)
          eq_(found.pop("comment", None), None)
          # ignore dialect_options
          found.pop("dialect_options", None)
          eq_(declared, found)

      reflected_table = Table(
          "testtbl",
          MetaData(),
          autoload_with=connection,
          schema=schema,
      )

      # test "deduplicates for index" logic.   MySQL and Oracle
      # "unique constraints" are actually unique indexes (with possible
      # exception of a unique that is a dupe of another one in the case
      # of Oracle).  make sure they aren't duplicated.
      idx_names = {index.name for index in reflected_table.indexes}
      uq_names = {
          constraint.name
          for constraint in reflected_table.constraints
          if isinstance(constraint, sa.UniqueConstraint)
      }
      uq_names = uq_names.difference(["unique_c_a_b"])

      assert idx_names.isdisjoint(uq_names)
      if names_that_duplicate_index:
          eq_(names_that_duplicate_index, idx_names)
          eq_(uq_names, set())

      no_cst = self.tables.no_constraints.name
      eq_(insp.get_unique_constraints(no_cst, schema=schema), [])
  @testing.requires.view_reflection
  @testing.combinations(
      (False,), (True, testing.requires.schemas), argnames="use_schema"
  )
  def test_get_view_definition(self, connection, use_schema):
      """Check that each fixture view reflects a non-empty definition.

      Runs against the default schema and, for the ``use_schema``
      combination, against ``config.test_schema``.
      """
      target_schema = config.test_schema if use_schema else None
      inspector = inspect(connection)
      for view_name in ("users_v", "email_addresses_v", "dingalings_v"):
          definition = inspector.get_view_definition(
              view_name, schema=target_schema
          )
          # only truthiness is asserted; the definition text itself is
          # not compared
          is_true(bool(definition))
  @testing.requires.view_reflection
  def test_get_view_definition_does_not_exist(self, connection):
      """Requesting the definition of a non-view raises NoSuchTableError.

      Two names are tried: one that is not expected to exist at all,
      and ``users``, which names a table rather than a view.
      """
      inspector = inspect(connection)
      for not_a_view in ("view_does_not_exist", "users"):
          with expect_raises(NoSuchTableError):
              inspector.get_view_definition(not_a_view)
  @testing.requires.table_reflection
  def test_autoincrement_col(self, connection):
      """Test that 'autoincrement' is reflected according to sqla's policy.

      Don't mark this test as unsupported for any backend !

      (technically it fails with MySQL InnoDB since "id" comes before
      "id2")

      A backend is better off not returning "autoincrement" at all,
      instead of potentially returning "False" for an auto-incrementing
      primary key column.
      """
      inspector = inspect(connection)
      pk_column_by_table = {
          "users": "user_id",
          "email_addresses": "address_id",
          "dingalings": "dingaling_id",
      }
      for table_name, pk_name in pk_column_by_table.items():
          columns_by_name = {
              col["name"]: col for col in inspector.get_columns(table_name)
          }
          # a missing "autoincrement" key is acceptable; only an
          # explicit falsy value fails
          assert columns_by_name[pk_name].get("autoincrement", True)
  @testing.combinations(
      (True, testing.requires.schemas), (False,), argnames="use_schema"
  )
  def test_get_table_options(self, use_schema):
      """Check ``get_table_options()`` returns a dict, or is unimplemented.

      When the ``reflect_table_options`` requirement is not enabled the
      call is expected to raise ``NotImplementedError`` instead.
      """
      inspector = inspect(config.db)
      target_schema = config.test_schema if use_schema else None

      if not testing.requires.reflect_table_options.enabled:
          with expect_raises(NotImplementedError):
              inspector.get_table_options("users", schema=target_schema)
          return

      # NOTE: can't really create a table with no option, so only the
      # type of the result is verified for both tables
      for table_name in ("users", "no_constraints"):
          options = inspector.get_table_options(
              table_name, schema=target_schema
          )
          is_true(isinstance(options, dict))
  @testing.combinations((True, testing.requires.schemas), False)
  def test_multi_get_table_options(self, use_schema):
      """Compare ``get_multi_table_options()`` with per-table reflection.

      The expected mapping is assembled by calling
      ``get_table_options()`` for every name in ``get_table_names()``.
      Without the ``reflect_table_options`` requirement the multi call
      is expected to raise ``NotImplementedError``.
      """
      inspector = inspect(config.db)

      if not testing.requires.reflect_table_options.enabled:
          with expect_raises(NotImplementedError):
              inspector.get_multi_table_options()
          return

      target_schema = config.test_schema if use_schema else None
      actual = inspector.get_multi_table_options(schema=target_schema)
      expected = {}
      for table_name in inspector.get_table_names(schema=target_schema):
          expected[(target_schema, table_name)] = (
              inspector.get_table_options(table_name, schema=target_schema)
          )
      eq_(actual, expected)
  @testing.fixture
  def get_multi_exp(self, connection):
      """Fixture returning a factory for ``get_multi_*`` test inputs.

      The factory returns a 3-tuple ``(inspector, kwargs_variants,
      expected)``: a fresh inspector, the list of keyword-argument
      dicts to try against the ``get_multi_*`` method, and the value
      produced by ``exp_method`` for the same parameters.
      """

      def provide_fixture(
          schema, scope, kind, use_filter, single_reflect_fn, exp_method
      ):
          probe_inspector = inspect(connection)
          # call the reflection function at least once to avoid
          # "Unexpected success" errors if the result is actually empty
          # and NotImplementedError is not raised
          single_reflect_fn(probe_inspector, "email_addresses")

          call_kw = {"scope": scope, "kind": kind}
          # ``schema`` arrives as a callable (or a falsy value); resolve
          # it to the actual schema name
          if schema:
              schema = schema()

          names_by_kind = (
              (ObjectKind.TABLE, ["comment_test", "users", "does-not-exist"]),
              (ObjectKind.VIEW, ["email_addresses_v", "does-not-exist"]),
              (
                  ObjectKind.MATERIALIZED_VIEW,
                  ["dingalings_v", "does-not-exist"],
              ),
          )
          filter_names = []
          for kind_member, names in names_by_kind:
              if kind_member in kind:
                  filter_names.extend(names)

          if schema:
              call_kw["schema"] = schema
          if use_filter:
              call_kw["filter_names"] = filter_names

          expected = exp_method(
              schema=schema,
              scope=scope,
              kind=kind,
              filter_names=filter_names if use_filter else None,
          )

          # also exercise the call with "scope" / "kind" left out when
          # the requested value is DEFAULT / TABLE respectively
          variants = [call_kw]
          if scope == ObjectScope.DEFAULT:
              variants.append(
                  {k: v for k, v in call_kw.items() if k != "scope"}
              )
          if kind == ObjectKind.TABLE:
              variants.append(
                  {k: v for k, v in call_kw.items() if k != "kind"}
              )

          return inspect(connection), variants, expected

      return provide_fixture
  @testing.requires.reflect_table_options
  @_multi_combination
  def test_multi_get_table_options_tables(
      self, get_multi_exp, schema, scope, kind, use_filter
  ):
      """``get_multi_table_options()`` equals ``self.exp_options``."""
      inspector, call_variants, expected = get_multi_exp(
          schema,
          scope,
          kind,
          use_filter,
          Inspector.get_table_options,
          self.exp_options,
      )
      for call_kw in call_variants:
          # clear the inspector cache before each variant
          inspector.clear_cache()
          eq_(inspector.get_multi_table_options(**call_kw), expected)
  @testing.requires.comment_reflection
  @_multi_combination
  def test_get_multi_table_comment(
      self, get_multi_exp, schema, scope, kind, use_filter
  ):
      """``get_multi_table_comment()`` equals ``self.exp_comments``."""
      inspector, call_variants, expected = get_multi_exp(
          schema,
          scope,
          kind,
          use_filter,
          Inspector.get_table_comment,
          self.exp_comments,
      )
      for call_kw in call_variants:
          # clear the inspector cache before each variant
          inspector.clear_cache()
          reflected = inspector.get_multi_table_comment(**call_kw)
          eq_(reflected, expected)
  def _check_expressions(self, result, exp, err_msg):
      """Compare reflected SQL expressions with ``exp``, ignoring quoting.

      Each reflected expression has single quotes, double quotes and
      spaces removed and is lower-cased before comparison.  When
      ``exp`` is a dict, the expressions are the keys of ``result``;
      otherwise ``result`` is iterated as a sequence of expressions.
      """

      def _normalize(text: str):
          return re.sub(r"['\" ]", "", text).lower()

      if isinstance(exp, dict):
          normalized = {
              _normalize(expr): value for expr, value in result.items()
          }
      else:
          normalized = [_normalize(expr) for expr in result]
      eq_(normalized, exp, err_msg)
  def _check_list(self, result, exp, req_keys=None, msg=None):
      """Compare two lists of reflection dicts entry by entry.

      :param result: list of reflected dicts.
      :param exp: list of expected dicts, in the same order.
      :param req_keys: keys that must be compared for every entry.
       Keys outside this collection are compared only when present on
       both sides.  When ``None`` the two lists are compared whole.
      :param msg: prefix for the assertion message.
      """
      if req_keys is None:
          eq_(result, exp, msg)
          return

      eq_(len(result), len(exp), msg)
      for r, e in zip(result, exp):
          for k in set(r) | set(e):
              if k not in req_keys and not (k in r and k in e):
                  # optional key reported by only one side: ignore
                  continue
              err_msg = f"{msg} - {k} - {r}"
              # a required key missing from one side would otherwise
              # surface as a bare KeyError with no context
              assert k in r and k in e, (
                  f"{err_msg} - required key missing from "
                  f"{'result' if k not in r else 'expected'}"
              )
              if k in ("expressions", "column_sorting"):
                  self._check_expressions(r[k], e[k], err_msg)
              else:
                  eq_(r[k], e[k], err_msg)
  def _check_table_dict(self, result, exp, req_keys=None, make_lists=False):
      """Compare two mappings of table key -> reflected data.

      Both mappings must have the same key set.  Each pair of values
      is handed to ``_check_list``; with ``make_lists`` each value is
      first wrapped in a one-element list.
      """
      eq_(set(result.keys()), set(exp.keys()))
      for table_key, reflected in result.items():
          expected = exp[table_key]
          if make_lists:
              reflected, expected = [reflected], [expected]
          self._check_list(reflected, expected, req_keys, table_key)
  @_multi_combination
  def test_get_multi_columns(
      self, get_multi_exp, schema, scope, kind, use_filter
  ):
      """``get_multi_columns()`` matches ``self.exp_columns``."""
      inspector, call_variants, expected = get_multi_exp(
          schema,
          scope,
          kind,
          use_filter,
          Inspector.get_columns,
          self.exp_columns,
      )
      for call_kw in call_variants:
          # clear the inspector cache before each variant
          inspector.clear_cache()
          reflected = inspector.get_multi_columns(**call_kw)
          self._check_table_dict(
              reflected, expected, self._required_column_keys
          )
  @testing.requires.primary_key_constraint_reflection
  @_multi_combination
  def test_get_multi_pk_constraint(
      self, get_multi_exp, schema, scope, kind, use_filter
  ):
      """``get_multi_pk_constraint()`` matches ``self.exp_pks``."""
      inspector, call_variants, expected = get_multi_exp(
          schema,
          scope,
          kind,
          use_filter,
          Inspector.get_pk_constraint,
          self.exp_pks,
      )
      for call_kw in call_variants:
          # clear the inspector cache before each variant
          inspector.clear_cache()
          reflected = inspector.get_multi_pk_constraint(**call_kw)
          # make_lists=True wraps every per-table value in a list
          # before it is compared
          self._check_table_dict(
              reflected,
              expected,
              self._required_pk_keys,
              make_lists=True,
          )
  def _adjust_sort(self, result, expected, key):
      """Sort constraint lists in place when names cannot order them.

      Does nothing when the ``implicitly_named_constraints``
      requirement is enabled.  Otherwise, every list in ``result`` and
      ``expected`` that has more than one entry and at least one
      unnamed entry (``name`` is ``None`` or ``mock.ANY``) is sorted
      with ``key`` so both sides end up in the same order.

      :param result: mapping of table key -> list of reflected dicts.
      :param expected: mapping of table key -> list of expected dicts.
      :param key: sort key applied to each dict.
      """
      if testing.requires.implicitly_named_constraints.enabled:
          return

      def _is_unnamed(entry):
          name = entry.get("name")
          # identity checks on purpose: ``name in (None, mock.ANY)``
          # is true for every name, because mock.ANY compares equal
          # to any value
          return name is None or name is mock.ANY

      for mapping in (result, expected):
          for entries in mapping.values():
              if len(entries) > 1 and any(
                  _is_unnamed(entry) for entry in entries
              ):
                  entries.sort(key=key)
  @testing.requires.foreign_key_constraint_reflection
  @_multi_combination
  def test_get_multi_foreign_keys(
      self, get_multi_exp, schema, scope, kind, use_filter
  ):
      """``get_multi_foreign_keys()`` matches ``self.exp_fks``."""

      def by_constrained_columns(fk):
          return tuple(fk["constrained_columns"])

      inspector, call_variants, expected = get_multi_exp(
          schema,
          scope,
          kind,
          use_filter,
          Inspector.get_foreign_keys,
          self.exp_fks,
      )
      for call_kw in call_variants:
          # clear the inspector cache before each variant
          inspector.clear_cache()
          reflected = inspector.get_multi_foreign_keys(**call_kw)
          # align ordering of both sides before comparing
          self._adjust_sort(reflected, expected, by_constrained_columns)
          self._check_table_dict(
              reflected, expected, self._required_fk_keys
          )
  @testing.requires.index_reflection
  @_multi_combination
  def test_get_multi_indexes(
      self, get_multi_exp, schema, scope, kind, use_filter
  ):
      """``get_multi_indexes()`` matches ``self.exp_indexes``."""
      inspector, call_variants, expected = get_multi_exp(
          schema,
          scope,
          kind,
          use_filter,
          Inspector.get_indexes,
          self.exp_indexes,
      )
      for call_kw in call_variants:
          # clear the inspector cache before each variant
          inspector.clear_cache()
          reflected = inspector.get_multi_indexes(**call_kw)
          self._check_table_dict(
              reflected, expected, self._required_index_keys
          )
  @testing.requires.unique_constraint_reflection
  @_multi_combination
  def test_get_multi_unique_constraints(
      self, get_multi_exp, schema, scope, kind, use_filter
  ):
      """``get_multi_unique_constraints()`` matches ``self.exp_ucs``."""

      def by_column_names(uc):
          return tuple(uc["column_names"])

      inspector, call_variants, expected = get_multi_exp(
          schema,
          scope,
          kind,
          use_filter,
          Inspector.get_unique_constraints,
          self.exp_ucs,
      )
      for call_kw in call_variants:
          # clear the inspector cache before each variant
          inspector.clear_cache()
          reflected = inspector.get_multi_unique_constraints(**call_kw)
          # align ordering of both sides before comparing
          self._adjust_sort(reflected, expected, by_column_names)
          self._check_table_dict(
              reflected, expected, self._required_unique_cst_keys
          )
  @testing.requires.check_constraint_reflection
  @_multi_combination
  def test_get_multi_check_constraints(
      self, get_multi_exp, schema, scope, kind, use_filter
  ):
      """``get_multi_check_constraints()`` matches ``self.exp_ccs``."""

      def by_sqltext(cc):
          # tuple() over the sqltext value, exactly as the sort key
          # has always been computed here
          return tuple(cc["sqltext"])

      inspector, call_variants, expected = get_multi_exp(
          schema,
          scope,
          kind,
          use_filter,
          Inspector.get_check_constraints,
          self.exp_ccs,
      )
      for call_kw in call_variants:
          # clear the inspector cache before each variant
          inspector.clear_cache()
          reflected = inspector.get_multi_check_constraints(**call_kw)
          # align ordering of both sides before comparing
          self._adjust_sort(reflected, expected, by_sqltext)
          self._check_table_dict(
              reflected, expected, self._required_cc_keys
          )
  @testing.combinations(
      ("get_table_options", testing.requires.reflect_table_options),
      "get_columns",
      (
          "get_pk_constraint",
          testing.requires.primary_key_constraint_reflection,
      ),
      (
          "get_foreign_keys",
          testing.requires.foreign_key_constraint_reflection,
      ),
      ("get_indexes", testing.requires.index_reflection),
      (
          "get_unique_constraints",
          testing.requires.unique_constraint_reflection,
      ),
      (
          "get_check_constraints",
          testing.requires.check_constraint_reflection,
      ),
      ("get_table_comment", testing.requires.comment_reflection),
      argnames="method",
  )
  def test_not_existing_table(self, method, connection):
      """Each single-table reflection method raises NoSuchTableError
      when given a table name that is not expected to exist.
      """
      reflect = getattr(inspect(connection), method)
      with expect_raises(NoSuchTableError):
          reflect("table_does_not_exists")
  def test_unreflectable(self, connection):
      """An "unreflectable" entry recorded for a table is raised by
      ``reflect_table()``.

      ``Inspector.get_multi_columns`` is patched with a wrapper that
      stores an ``UnreflectableTableError`` under ``(None,
      "some_table")`` in the ``unreflectable`` keyword argument and
      then delegates to the real method.
      """
      original_get_multi_columns = Inspector.get_multi_columns

      def inject_unreflectable(*args, **kwargs):
          registry = kwargs.setdefault("unreflectable", {})
          registry[(None, "some_table")] = UnreflectableTableError("err")
          return original_get_multi_columns(*args, **kwargs)

      target_table = Table("some_table", MetaData())
      with mock.patch.object(
          Inspector, "get_multi_columns", inject_unreflectable
      ), expect_raises_message(UnreflectableTableError, "err"):
          inspect(connection).reflect_table(target_table, None)
  @testing.combinations(True, False, argnames="use_schema")
  @testing.combinations(
      (True, testing.requires.views), False, argnames="views"
  )
  def test_metadata(self, connection, use_schema, views):
      """``MetaData.reflect()`` loads exactly the names the inspector
      reports.

      The expected names are the table names plus, when ``views`` is
      set, view names and (where implemented) materialized view names.
      With a schema each expected name is prefixed ``"<schema>."``.
      """
      m = MetaData()
      schema = config.test_schema if use_schema else None
      m.reflect(connection, schema=schema, views=views, resolve_fks=False)

      insp = inspect(connection)
      # copy before extending: the list handed back by the inspector
      # may be an object it still holds on to (e.g. a cached result),
      # so it must not be grown in place
      tables = list(insp.get_table_names(schema))
      if views:
          tables.extend(insp.get_view_names(schema))
          try:
              tables.extend(insp.get_materialized_view_names(schema))
          except NotImplementedError:
              # deliberate best-effort: the materialized view lookup
              # is allowed to be unimplemented
              pass
      if schema:
          tables = [f"{schema}.{t}" for t in tables]
      eq_(sorted(m.tables), sorted(tables))
  @testing.requires.comment_reflection
  def test_comments_unicode(self, connection, metadata):
      """Non-ASCII table and column comments are reflected unchanged."""
      table_comment = "試蛇ẟΩ✨"
      column_comments = {"unicode": "é試蛇ẟΩ", "emoji": "☁️✨"}
      Table(
          "unicode_comments",
          metadata,
          Column("unicode", Integer, comment=column_comments["unicode"]),
          Column("emoji", Integer, comment=column_comments["emoji"]),
          comment=table_comment,
      )
      metadata.create_all(connection)

      inspector = inspect(connection)
      eq_(
          inspector.get_table_comment("unicode_comments"),
          {"text": table_comment},
      )
      reflected = {
          col["name"]: col["comment"]
          for col in inspector.get_columns("unicode_comments")
      }
      eq_(reflected, column_comments)
  @testing.requires.comment_reflection_full_unicode
  def test_comments_unicode_full(self, connection, metadata):
      """Emoji-only table and column comments are reflected unchanged."""
      column_comment = "🐍🧙🝝🧙‍♂️🧙‍♀️"
      table_comment = "🎩🁰🝑🤷‍♀️🤷‍♂️"
      Table(
          "unicode_comments",
          metadata,
          Column("emoji", Integer, comment=column_comment),
          comment=table_comment,
      )
      metadata.create_all(connection)

      inspector = inspect(connection)
      eq_(
          inspector.get_table_comment("unicode_comments"),
          {"text": table_comment},
      )
      first_column = inspector.get_columns("unicode_comments")[0]
      eq_(
          {first_column["name"]: first_column["comment"]},
          {"emoji": column_comment},
      )
  class TableNoColumnsTest(fixtures.TestBase):
      """Reflection of a table, and a view over it, that have no columns."""

      __requires__ = ("reflect_tables_no_columns",)
      __backend__ = True

      @testing.fixture
      def table_no_columns(self, connection, metadata):
          """Create the column-less table ``empty``."""
          Table("empty", metadata)
          metadata.create_all(connection)

      @testing.fixture
      def view_no_columns(self, connection, metadata):
          """Create ``empty`` plus the view ``empty_v`` selecting from it."""
          Table("empty", metadata)
          create_view = DDL("CREATE VIEW empty_v AS SELECT * FROM empty")
          drop_view = DDL("DROP VIEW IF EXISTS empty_v")

          event.listen(metadata, "after_create", create_view)
          # for transactional DDL the transaction is rolled back before
          # this drop statement is invoked
          event.listen(metadata, "before_drop", drop_view)

          metadata.create_all(connection)

      def test_reflect_table_no_columns(self, connection, table_no_columns):
          """Autoloading ``empty`` yields a Table with no columns."""
          reflected = Table("empty", MetaData(), autoload_with=connection)
          eq_(list(reflected.c), [])

      def test_get_columns_table_no_columns(
          self, connection, table_no_columns
      ):
          """Single and multi column reflection both report ``[]``."""
          inspector = inspect(connection)
          eq_(inspector.get_columns("empty"), [])
          eq_(inspector.get_multi_columns(), {(None, "empty"): []})

      def test_reflect_incl_table_no_columns(
          self, connection, table_no_columns
      ):
          """``MetaData.reflect()`` picks up the column-less table."""
          reflected_metadata = MetaData()
          reflected_metadata.reflect(connection)
          assert "empty" in set(reflected_metadata.tables)

      @testing.requires.views
      def test_reflect_view_no_columns(self, connection, view_no_columns):
          """Autoloading ``empty_v`` yields a Table with no columns."""
          reflected = Table("empty_v", MetaData(), autoload_with=connection)
          eq_(list(reflected.c), [])

      @testing.requires.views
      def test_get_columns_view_no_columns(
          self, connection, view_no_columns
      ):
          """Single and multi column reflection of the view report ``[]``."""
          inspector = inspect(connection)
          eq_(inspector.get_columns("empty_v"), [])
          eq_(
              inspector.get_multi_columns(kind=ObjectKind.VIEW),
              {(None, "empty_v"): []},
          )
  class ComponentReflectionTestExtra(ComparesIndexes, fixtures.TestBase):
      """Reflection tests that build their own tables per test.

      Covers check constraints, expression-based and covering indexes,
      type / nullability round trips, foreign key options and server
      defaults.
      """

      __backend__ = True

      @testing.fixture(params=[True, False])
      def use_schema_fixture(self, request):
          """Yield ``config.test_schema`` for one param, ``None`` for the
          other."""
          if request.param:
              return config.test_schema
          else:
              return None

      @testing.fixture()
      def inspect_for_table(self, metadata, connection, use_schema_fixture):
          """Return a context manager yielding ``(schema, inspector)``.

          The tables declared on ``metadata`` inside the ``with`` block
          are created when the block exits, so reflection calls belong
          after the block.
          """

          @contextlib.contextmanager
          def go(tablename):
              # ``tablename`` is accepted but not used by this helper
              yield use_schema_fixture, inspect(connection)
              metadata.create_all(connection)

          return go

      def ck_eq(self, reflected, expected):
          """Compare reflected check constraints against ``expected``.

          Both lists are ordered by ``sqltext`` first; the reflected
          ``sqltext`` is normalized before the comparison.
          """
          # trying to minimize effect of quoting, parenthesis, etc.
          # may need to add more to this as new dialects get CHECK
          # constraint reflection support

          def normalize(sqltext):
              return " ".join(
                  re.findall(r"and|\d|=|a|b|c|or|<|>", sqltext.lower(), re.I)
              )

          reflected = sorted(
              [
                  {
                      "name": item["name"],
                      "sqltext": normalize(item["sqltext"]),
                  }
                  for item in reflected
              ],
              key=lambda item: (item["sqltext"]),
          )

          expected = sorted(
              expected,
              key=lambda item: (item["sqltext"]),
          )
          eq_(reflected, expected)

      @testing.requires.check_constraint_reflection
      def test_check_constraint_no_constraint(
          self, metadata, inspect_for_table
      ):
          """A table without CHECK constraints reflects an empty list."""
          with inspect_for_table("no_constraints") as (schema, inspector):
              Table(
                  "no_constraints",
                  metadata,
                  Column("data", sa.String(20)),
                  schema=schema,
              )

          self.ck_eq(
              inspector.get_check_constraints("no_constraints", schema=schema),
              [],
          )

      @testing.requires.inline_check_constraint_reflection
      @testing.combinations(
          "my_inline", "MyInline", None, argnames="constraint_name"
      )
      def test_check_constraint_inline(
          self, metadata, inspect_for_table, constraint_name
      ):
          """A column-level CHECK constraint is reflected; an unnamed one
          may come back under any name."""
          with inspect_for_table("sa_cc") as (schema, inspector):
              Table(
                  "sa_cc",
                  metadata,
                  Column("id", Integer(), primary_key=True),
                  Column(
                      "a",
                      Integer(),
                      sa.CheckConstraint(
                          "a > 1 AND a < 5", name=constraint_name
                      ),
                  ),
                  Column("data", String(50)),
                  schema=schema,
              )

          reflected = inspector.get_check_constraints("sa_cc", schema=schema)

          self.ck_eq(
              reflected,
              [
                  {
                      "name": constraint_name or mock.ANY,
                      "sqltext": "a > 1 and a < 5",
                  },
              ],
          )

      @testing.requires.check_constraint_reflection
      @testing.combinations(
          "my_ck_const", "MyCkConst", None, argnames="constraint_name"
      )
      def test_check_constraint_standalone(
          self, metadata, inspect_for_table, constraint_name
      ):
          """A table-level CHECK constraint is reflected; an unnamed one
          may come back under any name."""
          with inspect_for_table("sa_cc") as (schema, inspector):
              Table(
                  "sa_cc",
                  metadata,
                  Column("a", Integer()),
                  sa.CheckConstraint(
                      "a = 1 OR (a > 2 AND a < 5)", name=constraint_name
                  ),
                  schema=schema,
              )

          reflected = inspector.get_check_constraints("sa_cc", schema=schema)

          self.ck_eq(
              reflected,
              [
                  {
                      "name": constraint_name or mock.ANY,
                      "sqltext": "a = 1 or a > 2 and a < 5",
                  },
              ],
          )

      @testing.requires.inline_check_constraint_reflection
      def test_check_constraint_mixed(self, metadata, inspect_for_table):
          """Inline, named-inline and table-level CHECK constraints are
          all reflected alongside unrelated UNIQUE constraints."""
          with inspect_for_table("sa_cc") as (schema, inspector):
              Table(
                  "sa_cc",
                  metadata,
                  Column("id", Integer(), primary_key=True),
                  Column(
                      "a", Integer(), sa.CheckConstraint("a > 1 AND a < 5")
                  ),
                  Column(
                      "b",
                      Integer(),
                      sa.CheckConstraint("b > 1 AND b < 5", name="my_inline"),
                  ),
                  Column("c", Integer()),
                  Column("data", String(50)),
                  sa.UniqueConstraint("data", name="some_uq"),
                  sa.CheckConstraint("c > 1 AND c < 5", name="cc1"),
                  sa.UniqueConstraint("c", name="some_c_uq"),
                  schema=schema,
              )

          reflected = inspector.get_check_constraints("sa_cc", schema=schema)

          self.ck_eq(
              reflected,
              [
                  {"name": "cc1", "sqltext": "c > 1 and c < 5"},
                  {"name": "my_inline", "sqltext": "b > 1 and b < 5"},
                  {"name": mock.ANY, "sqltext": "a > 1 and a < 5"},
              ],
          )

      @testing.requires.indexes_with_expressions
      def test_reflect_expression_based_indexes(self, metadata, connection):
          """Expression-based indexes are reflected, or skipped with a
          warning, depending on ``reflect_indexes_with_expressions``."""
          t = Table(
              "t",
              metadata,
              Column("x", String(30)),
              Column("y", String(30)),
              Column("z", String(30)),
          )

          Index("t_idx", func.lower(t.c.x), t.c.z, func.lower(t.c.y))

          long_str = "long string " * 100
          Index("t_idx_long", func.coalesce(t.c.x, long_str))

          Index("t_idx_2", t.c.x)

          metadata.create_all(connection)

          insp = inspect(connection)

          expected = [
              {
                  "name": "t_idx_2",
                  "column_names": ["x"],
                  "unique": False,
                  "dialect_options": {},
              }
          ]

          def complete_index(entry):
              # add the keys expected on backends that reflect included
              # columns; otherwise just ensure "dialect_options" exists
              if testing.requires.index_reflects_included_columns.enabled:
                  entry["include_columns"] = []
                  entry["dialect_options"] = {
                      f"{connection.engine.name}_include": []
                  }
              else:
                  entry.setdefault("dialect_options", {})

          complete_index(expected[0])

          class lower_index_str(str):
              def __eq__(self, other):
                  ol = other.lower()
                  # test that lower and x or y are in the string
                  return "lower" in ol and ("x" in ol or "y" in ol)

          class coalesce_index_str(str):
              def __eq__(self, other):
                  # test that coalesce and the string is in other
                  return "coalesce" in other.lower() and long_str in other

          if testing.requires.reflect_indexes_with_expressions.enabled:
              expr_index = {
                  "name": "t_idx",
                  "column_names": [None, "z", None],
                  "expressions": [
                      lower_index_str("lower(x)"),
                      "z",
                      lower_index_str("lower(y)"),
                  ],
                  "unique": False,
              }
              complete_index(expr_index)
              expected.insert(0, expr_index)

              expr_index_long = {
                  "name": "t_idx_long",
                  "column_names": [None],
                  "expressions": [
                      coalesce_index_str(f"coalesce(x, '{long_str}')")
                  ],
                  "unique": False,
              }
              complete_index(expr_index_long)
              expected.append(expr_index_long)

              # no warning is anticipated on this path
              reflect_ctx = contextlib.nullcontext()
          else:
              reflect_ctx = expect_warnings(
                  "Skipped unsupported reflection of expression-based "
                  "index t_idx"
              )

          # both paths run the same reflection steps; only the
          # surrounding warning expectation differs
          with reflect_ctx:
              eq_(insp.get_indexes("t"), expected)
              m2 = MetaData()
              t2 = Table("t", m2, autoload_with=connection)

          self.compare_table_index_with_expected(
              t2, expected, connection.engine.name
          )

      @testing.requires.index_reflects_included_columns
      def test_reflect_covering_index(self, metadata, connection):
          """Included (covering) columns of an index are reflected both
          by the inspector and on an autoloaded Table."""
          t = Table(
              "t",
              metadata,
              Column("x", String(30)),
              Column("y", String(30)),
          )
          idx = Index("t_idx", t.c.x)
          idx.dialect_options[connection.engine.name]["include"] = ["y"]

          metadata.create_all(connection)

          insp = inspect(connection)

          get_indexes = insp.get_indexes("t")
          eq_(
              get_indexes,
              [
                  {
                      "name": "t_idx",
                      "column_names": ["x"],
                      "include_columns": ["y"],
                      "unique": False,
                      "dialect_options": mock.ANY,
                  }
              ],
          )
          eq_(
              get_indexes[0]["dialect_options"][
                  f"{connection.engine.name}_include"
              ],
              ["y"],
          )

          t2 = Table("t", MetaData(), autoload_with=connection)
          eq_(
              list(t2.indexes)[0].dialect_options[connection.engine.name][
                  "include"
              ],
              ["y"],
          )

      def _type_round_trip(self, connection, metadata, *types):
          """Create table ``t`` with one column per type (``t0``,
          ``t1``, ...) and return the reflected column types."""
          t = Table(
              "t",
              metadata,
              *[Column(f"t{i}", type_) for i, type_ in enumerate(types)],
          )
          t.create(connection)

          return [c["type"] for c in inspect(connection).get_columns("t")]

      @testing.requires.table_reflection
      def test_numeric_reflection(self, connection, metadata):
          """Precision and scale of a Numeric column survive reflection."""
          for typ in self._type_round_trip(
              connection, metadata, sql_types.Numeric(18, 5)
          ):
              assert isinstance(typ, sql_types.Numeric)
              eq_(typ.precision, 18)
              eq_(typ.scale, 5)

      @testing.requires.table_reflection
      @testing.combinations(
          sql_types.String,
          sql_types.VARCHAR,
          sql_types.CHAR,
          (sql_types.NVARCHAR, testing.requires.nvarchar_types),
          (sql_types.NCHAR, testing.requires.nvarchar_types),
          argnames="type_",
      )
      def test_string_length_reflection(self, connection, metadata, type_):
          """String length, as an ``int``, survives reflection."""
          typ = self._type_round_trip(connection, metadata, type_(52))[0]
          if issubclass(type_, sql_types.VARCHAR):
              assert isinstance(typ, sql_types.VARCHAR)
          elif issubclass(type_, sql_types.CHAR):
              assert isinstance(typ, sql_types.CHAR)
          else:
              assert isinstance(typ, sql_types.String)

          eq_(typ.length, 52)
          assert isinstance(typ.length, int)

      @testing.requires.table_reflection
      def test_nullable_reflection(self, connection, metadata):
          """The ``nullable`` flag of each column is reflected."""
          t = Table(
              "t",
              metadata,
              Column("a", Integer, nullable=True),
              Column("b", Integer, nullable=False),
          )
          t.create(connection)
          eq_(
              {
                  col["name"]: col["nullable"]
                  for col in inspect(connection).get_columns("t")
              },
              {"a": True, "b": False},
          )

      @testing.combinations(
          (
              None,
              "CASCADE",
              None,
              testing.requires.foreign_key_constraint_option_reflection_ondelete,
          ),
          (
              None,
              None,
              "SET NULL",
              testing.requires.foreign_key_constraint_option_reflection_onupdate,
          ),
          (
              {},
              None,
              "NO ACTION",
              testing.requires.foreign_key_constraint_option_reflection_onupdate,
          ),
          (
              {},
              "NO ACTION",
              None,
              testing.requires.fk_constraint_option_reflection_ondelete_noaction,
          ),
          (
              None,
              None,
              "RESTRICT",
              testing.requires.fk_constraint_option_reflection_onupdate_restrict,
          ),
          (
              None,
              "RESTRICT",
              None,
              testing.requires.fk_constraint_option_reflection_ondelete_restrict,
          ),
          argnames="expected,ondelete,onupdate",
      )
      def test_get_foreign_key_options(
          self, connection, metadata, expected, ondelete, onupdate
      ):
          """ON DELETE / ON UPDATE settings are reflected under
          ``options``.

          An ``expected`` of ``None`` means the options passed to the
          constraint are expected back unchanged.
          """
          options = {}
          if ondelete:
              options["ondelete"] = ondelete
          if onupdate:
              options["onupdate"] = onupdate

          if expected is None:
              expected = options

          Table(
              "x",
              metadata,
              Column("id", Integer, primary_key=True),
              test_needs_fk=True,
          )

          Table(
              "table",
              metadata,
              Column("id", Integer, primary_key=True),
              Column("x_id", Integer, ForeignKey("x.id", name="xid")),
              Column("test", String(10)),
              test_needs_fk=True,
          )

          Table(
              "user",
              metadata,
              Column("id", Integer, primary_key=True),
              Column("name", String(50), nullable=False),
              Column("tid", Integer),
              sa.ForeignKeyConstraint(
                  ["tid"], ["table.id"], name="myfk", **options
              ),
              test_needs_fk=True,
          )

          metadata.create_all(connection)

          insp = inspect(connection)

          # test 'options' is always present for a backend
          # that can reflect these, since alembic looks for this
          opts = insp.get_foreign_keys("table")[0]["options"]

          eq_({k: opts[k] for k in opts if opts[k]}, {})

          opts = insp.get_foreign_keys("user")[0]["options"]
          eq_(opts, expected)

      @testing.combinations(
          (Integer, sa.text("10"), r"'?10'?"),
          (Integer, "10", r"'?10'?"),
          (Boolean, sa.true(), r"1|true"),
          (
              Integer,
              sa.text("3 + 5"),
              r"3\+5",
              testing.requires.expression_server_defaults,
          ),
          (
              Integer,
              sa.text("(3 * 5)"),
              r"3\*5",
              testing.requires.expression_server_defaults,
          ),
          (DateTime, func.now(), r"current_timestamp|now|getdate"),
          (
              Integer,
              sa.literal_column("3") + sa.literal_column("5"),
              r"3\+5",
              testing.requires.expression_server_defaults,
          ),
          argnames="datatype, default, expected_reg",
      )
      @testing.requires.server_defaults
      def test_server_defaults(
          self, metadata, connection, datatype, default, expected_reg
      ):
          """The reflected server default matches ``expected_reg`` once
          parentheses, spaces and single quotes are removed."""
          t = Table(
              "t",
              metadata,
              Column("id", Integer, primary_key=True),
              Column("thecol", datatype, server_default=default),
          )
          t.create(connection)

          reflected = inspect(connection).get_columns("t")[1]["default"]
          reflected_sanitized = re.sub(r"[\(\) \']", "", reflected)
          eq_regex(reflected_sanitized, expected_reg, flags=re.IGNORECASE)
  class NormalizedNameTest(fixtures.TablesTest):
      """Reflection of tables created with force-quoted lowercase names
      on backends flagged with the ``denormalized_names`` requirement."""

      __requires__ = ("denormalized_names",)
      __backend__ = True

      @classmethod
      def define_tables(cls, metadata):
          """Define ``t1`` and ``t2`` (``t2.t1id`` references ``t1.id``)."""
          parent_name = quoted_name("t1", quote=True)
          child_name = quoted_name("t2", quote=True)
          Table(
              parent_name,
              metadata,
              Column("id", Integer, primary_key=True),
          )
          Table(
              child_name,
              metadata,
              Column("id", Integer, primary_key=True),
              Column("t1id", ForeignKey("t1.id")),
          )

      def test_reflect_lowercase_forced_tables(self):
          """The foreign key between the quoted tables resolves both via
          a single-table autoload and via ``MetaData.reflect()``."""
          single_md = MetaData()
          child = Table(
              quoted_name("t2", quote=True),
              single_md,
              autoload_with=config.db,
          )
          # "t1" is looked up on the same MetaData after autoloading t2
          parent = single_md.tables["t1"]
          assert child.c.t1id.references(parent.c.id)

          def only_t1_t2(name, m):
              return name.lower() in ("t1", "t2")

          bulk_md = MetaData()
          bulk_md.reflect(config.db, only=only_t1_t2)
          assert bulk_md.tables["t2"].c.t1id.references(
              bulk_md.tables["t1"].c.id
          )

      def test_get_table_names(self):
          """Both reflected names compare equal under upper() and lower()."""
          matching = []
          for table_name in inspect(config.db).get_table_names():
              if table_name.lower() in ("t1", "t2"):
                  matching.append(table_name)

          # NOTE(review): presumably relies on the reflected names being
          # quoted_name objects whose upper()/lower() leave the value
          # unchanged -- confirm against quoted_name
          for position in (0, 1):
              reflected_name = matching[position]
              eq_(reflected_name.upper(), reflected_name.lower())
  2739. class ComputedReflectionTest(fixtures.ComputedReflectionFixtureTest):
  def test_computed_col_default_not_set(self):
      """Only ``with_default`` reports a ``default``; the ``normal`` and
      ``computed_col`` columns report ``None``."""
      inspector = inspect(config.db)
      columns_by_name = {
          col["name"]: col
          for col in inspector.get_columns("computed_default_table")
      }

      is_true("42" in columns_by_name["with_default"]["default"])
      for column_name in ("normal", "computed_col"):
          is_(columns_by_name[column_name]["default"], None)
  def test_get_column_returns_computed(self):
      """Only ``computed_col`` carries a ``computed`` entry.

      Its ``sqltext`` must normalize to ``normal+42``; the presence
      and value of ``persisted`` follow the
      ``computed_columns_reflect_persisted`` and
      ``computed_columns_default_persisted`` requirements.
      """
      inspector = inspect(config.db)
      columns_by_name = {
          col["name"]: col
          for col in inspector.get_columns("computed_default_table")
      }

      for plain_name in ("id", "normal", "with_default"):
          is_true("computed" not in columns_by_name[plain_name])

      computed_column = columns_by_name["computed_col"]
      is_true("computed" in computed_column)
      computed_info = computed_column["computed"]
      is_true("sqltext" in computed_info)
      eq_(self.normalize(computed_info["sqltext"]), "normal+42")

      reflects_persisted = (
          testing.requires.computed_columns_reflect_persisted.enabled
      )
      eq_("persisted" in computed_info, reflects_persisted)
      if reflects_persisted:
          eq_(
              computed_info["persisted"],
              testing.requires.computed_columns_default_persisted.enabled,
          )
  2766. def check_column(self, data, column, sqltext, persisted):
  2767. is_true("computed" in data[column])
  2768. compData = data[column]["computed"]
  2769. eq_(self.normalize(compData["sqltext"]), sqltext)
  2770. if testing.requires.computed_columns_reflect_persisted.enabled:
  2771. is_true("persisted" in compData)
  2772. is_(compData["persisted"], persisted)
  2773. def test_get_column_returns_persisted(self):
  2774. insp = inspect(config.db)
  2775. cols = insp.get_columns("computed_column_table")
  2776. data = {c["name"]: c for c in cols}
  2777. self.check_column(
  2778. data,
  2779. "computed_no_flag",
  2780. "normal+42",
  2781. testing.requires.computed_columns_default_persisted.enabled,
  2782. )
  2783. if testing.requires.computed_columns_virtual.enabled:
  2784. self.check_column(
  2785. data,
  2786. "computed_virtual",
  2787. "normal+2",
  2788. False,
  2789. )
  2790. if testing.requires.computed_columns_stored.enabled:
  2791. self.check_column(
  2792. data,
  2793. "computed_stored",
  2794. "normal-42",
  2795. True,
  2796. )
  2797. @testing.requires.schemas
  2798. def test_get_column_returns_persisted_with_schema(self):
  2799. insp = inspect(config.db)
  2800. cols = insp.get_columns(
  2801. "computed_column_table", schema=config.test_schema
  2802. )
  2803. data = {c["name"]: c for c in cols}
  2804. self.check_column(
  2805. data,
  2806. "computed_no_flag",
  2807. "normal/42",
  2808. testing.requires.computed_columns_default_persisted.enabled,
  2809. )
  2810. if testing.requires.computed_columns_virtual.enabled:
  2811. self.check_column(
  2812. data,
  2813. "computed_virtual",
  2814. "normal/2",
  2815. False,
  2816. )
  2817. if testing.requires.computed_columns_stored.enabled:
  2818. self.check_column(
  2819. data,
  2820. "computed_stored",
  2821. "normal*42",
  2822. True,
  2823. )
  2824. class IdentityReflectionTest(fixtures.TablesTest):
  2825. run_inserts = run_deletes = None
  2826. __backend__ = True
  2827. __requires__ = ("identity_columns", "table_reflection")
  2828. @classmethod
  2829. def define_tables(cls, metadata):
  2830. Table(
  2831. "t1",
  2832. metadata,
  2833. Column("normal", Integer),
  2834. Column("id1", Integer, Identity()),
  2835. )
  2836. Table(
  2837. "t2",
  2838. metadata,
  2839. Column(
  2840. "id2",
  2841. Integer,
  2842. Identity(
  2843. always=True,
  2844. start=2,
  2845. increment=3,
  2846. minvalue=-2,
  2847. maxvalue=42,
  2848. cycle=True,
  2849. cache=4,
  2850. ),
  2851. ),
  2852. )
  2853. if testing.requires.schemas.enabled:
  2854. Table(
  2855. "t1",
  2856. metadata,
  2857. Column("normal", Integer),
  2858. Column("id1", Integer, Identity(always=True, start=20)),
  2859. schema=config.test_schema,
  2860. )
  2861. def check(self, value, exp, approx):
  2862. if testing.requires.identity_columns_standard.enabled:
  2863. common_keys = (
  2864. "always",
  2865. "start",
  2866. "increment",
  2867. "minvalue",
  2868. "maxvalue",
  2869. "cycle",
  2870. "cache",
  2871. )
  2872. for k in list(value):
  2873. if k not in common_keys:
  2874. value.pop(k)
  2875. if approx:
  2876. eq_(len(value), len(exp))
  2877. for k in value:
  2878. if k == "minvalue":
  2879. is_true(value[k] <= exp[k])
  2880. elif k in {"maxvalue", "cache"}:
  2881. is_true(value[k] >= exp[k])
  2882. else:
  2883. eq_(value[k], exp[k], k)
  2884. else:
  2885. eq_(value, exp)
  2886. else:
  2887. eq_(value["start"], exp["start"])
  2888. eq_(value["increment"], exp["increment"])
  2889. def test_reflect_identity(self):
  2890. insp = inspect(config.db)
  2891. cols = insp.get_columns("t1") + insp.get_columns("t2")
  2892. for col in cols:
  2893. if col["name"] == "normal":
  2894. is_false("identity" in col)
  2895. elif col["name"] == "id1":
  2896. if "autoincrement" in col:
  2897. is_true(col["autoincrement"])
  2898. eq_(col["default"], None)
  2899. is_true("identity" in col)
  2900. self.check(
  2901. col["identity"],
  2902. dict(
  2903. always=False,
  2904. start=1,
  2905. increment=1,
  2906. minvalue=1,
  2907. maxvalue=2147483647,
  2908. cycle=False,
  2909. cache=1,
  2910. ),
  2911. approx=True,
  2912. )
  2913. elif col["name"] == "id2":
  2914. if "autoincrement" in col:
  2915. is_true(col["autoincrement"])
  2916. eq_(col["default"], None)
  2917. is_true("identity" in col)
  2918. self.check(
  2919. col["identity"],
  2920. dict(
  2921. always=True,
  2922. start=2,
  2923. increment=3,
  2924. minvalue=-2,
  2925. maxvalue=42,
  2926. cycle=True,
  2927. cache=4,
  2928. ),
  2929. approx=False,
  2930. )
  2931. @testing.requires.schemas
  2932. def test_reflect_identity_schema(self):
  2933. insp = inspect(config.db)
  2934. cols = insp.get_columns("t1", schema=config.test_schema)
  2935. for col in cols:
  2936. if col["name"] == "normal":
  2937. is_false("identity" in col)
  2938. elif col["name"] == "id1":
  2939. if "autoincrement" in col:
  2940. is_true(col["autoincrement"])
  2941. eq_(col["default"], None)
  2942. is_true("identity" in col)
  2943. self.check(
  2944. col["identity"],
  2945. dict(
  2946. always=True,
  2947. start=20,
  2948. increment=1,
  2949. minvalue=1,
  2950. maxvalue=2147483647,
  2951. cycle=False,
  2952. cache=1,
  2953. ),
  2954. approx=True,
  2955. )
  2956. class CompositeKeyReflectionTest(fixtures.TablesTest):
  2957. __backend__ = True
  2958. @classmethod
  2959. def define_tables(cls, metadata):
  2960. tb1 = Table(
  2961. "tb1",
  2962. metadata,
  2963. Column("id", Integer),
  2964. Column("attr", Integer),
  2965. Column("name", sql_types.VARCHAR(20)),
  2966. sa.PrimaryKeyConstraint("name", "id", "attr", name="pk_tb1"),
  2967. schema=None,
  2968. test_needs_fk=True,
  2969. )
  2970. Table(
  2971. "tb2",
  2972. metadata,
  2973. Column("id", Integer, primary_key=True),
  2974. Column("pid", Integer),
  2975. Column("pattr", Integer),
  2976. Column("pname", sql_types.VARCHAR(20)),
  2977. sa.ForeignKeyConstraint(
  2978. ["pname", "pid", "pattr"],
  2979. [tb1.c.name, tb1.c.id, tb1.c.attr],
  2980. name="fk_tb1_name_id_attr",
  2981. ),
  2982. schema=None,
  2983. test_needs_fk=True,
  2984. )
  2985. @testing.requires.primary_key_constraint_reflection
  2986. def test_pk_column_order(self, connection):
  2987. # test for issue #5661
  2988. insp = inspect(connection)
  2989. primary_key = insp.get_pk_constraint(self.tables.tb1.name)
  2990. eq_(primary_key.get("constrained_columns"), ["name", "id", "attr"])
  2991. @testing.requires.foreign_key_constraint_reflection
  2992. def test_fk_column_order(self, connection):
  2993. # test for issue #5661
  2994. insp = inspect(connection)
  2995. foreign_keys = insp.get_foreign_keys(self.tables.tb2.name)
  2996. eq_(len(foreign_keys), 1)
  2997. fkey1 = foreign_keys[0]
  2998. eq_(fkey1.get("referred_columns"), ["name", "id", "attr"])
  2999. eq_(fkey1.get("constrained_columns"), ["pname", "pid", "pattr"])
# Names exported by a star-import of this module.  Only the test classes
# listed here are exported; presumably this is how suites that star-import
# the module pick them up -- TODO confirm.
__all__ = (
    "ComponentReflectionTest",
    "ComponentReflectionTestExtra",
    "TableNoColumnsTest",
    "QuotedNameArgumentTest",
    "BizarroCharacterTest",
    "HasTableTest",
    "HasIndexTest",
    "NormalizedNameTest",
    "ComputedReflectionTest",
    "IdentityReflectionTest",
    "CompositeKeyReflectionTest",
    "TempTableElementsTest",
)