schema.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. # testing/schema.py
  2. # Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: https://www.opensource.org/licenses/mit-license.php
  7. # mypy: ignore-errors
  8. from __future__ import annotations
  9. import sys
  10. from . import config
  11. from . import exclusions
  12. from .. import event
  13. from .. import schema
  14. from .. import types as sqltypes
  15. from ..orm import mapped_column as _orm_mapped_column
  16. from ..util import OrderedDict
# Names exported by ``from <this module> import *``.
__all__ = ["Table", "Column"]

# Extra keyword arguments merged into the keywords of every ``Table()``
# call made through this module (see ``kw.update(table_options)``).
table_options = {}
  19. def Table(*args, **kw) -> schema.Table:
  20. """A schema.Table wrapper/hook for dialect-specific tweaks."""
  21. test_opts = {k: kw.pop(k) for k in list(kw) if k.startswith("test_")}
  22. kw.update(table_options)
  23. if exclusions.against(config._current, "mysql"):
  24. if (
  25. "mysql_engine" not in kw
  26. and "mysql_type" not in kw
  27. and "autoload_with" not in kw
  28. ):
  29. if "test_needs_fk" in test_opts or "test_needs_acid" in test_opts:
  30. kw["mysql_engine"] = "InnoDB"
  31. else:
  32. # there are in fact test fixtures that rely upon MyISAM,
  33. # due to MySQL / MariaDB having poor FK behavior under innodb,
  34. # such as a self-referential table can't be deleted from at
  35. # once without attending to per-row dependencies. We'd need to
  36. # add special steps to some fixtures if we want to not
  37. # explicitly state MyISAM here
  38. kw["mysql_engine"] = "MyISAM"
  39. elif exclusions.against(config._current, "mariadb"):
  40. if (
  41. "mariadb_engine" not in kw
  42. and "mariadb_type" not in kw
  43. and "autoload_with" not in kw
  44. ):
  45. if "test_needs_fk" in test_opts or "test_needs_acid" in test_opts:
  46. kw["mariadb_engine"] = "InnoDB"
  47. else:
  48. kw["mariadb_engine"] = "MyISAM"
  49. return schema.Table(*args, **kw)
  50. def mapped_column(*args, **kw):
  51. """An orm.mapped_column wrapper/hook for dialect-specific tweaks."""
  52. return _schema_column(_orm_mapped_column, args, kw)
  53. def Column(*args, **kw):
  54. """A schema.Column wrapper/hook for dialect-specific tweaks."""
  55. return _schema_column(schema.Column, args, kw)
  56. def _schema_column(factory, args, kw):
  57. test_opts = {k: kw.pop(k) for k in list(kw) if k.startswith("test_")}
  58. if not config.requirements.foreign_key_ddl.enabled_for_config(config):
  59. args = [arg for arg in args if not isinstance(arg, schema.ForeignKey)]
  60. construct = factory(*args, **kw)
  61. if factory is schema.Column:
  62. col = construct
  63. else:
  64. col = construct.column
  65. if test_opts.get("test_needs_autoincrement", False) and kw.get(
  66. "primary_key", False
  67. ):
  68. if col.default is None and col.server_default is None:
  69. col.autoincrement = True
  70. # allow any test suite to pick up on this
  71. col.info["test_needs_autoincrement"] = True
  72. # hardcoded rule for oracle; this should
  73. # be moved out
  74. if exclusions.against(config._current, "oracle"):
  75. def add_seq(c, tbl):
  76. c._init_items(
  77. schema.Sequence(
  78. _truncate_name(
  79. config.db.dialect, tbl.name + "_" + c.name + "_seq"
  80. ),
  81. optional=True,
  82. )
  83. )
  84. event.listen(col, "after_parent_attach", add_seq, propagate=True)
  85. return construct
  86. class eq_type_affinity:
  87. """Helper to compare types inside of datastructures based on affinity.
  88. E.g.::
  89. eq_(
  90. inspect(connection).get_columns("foo"),
  91. [
  92. {
  93. "name": "id",
  94. "type": testing.eq_type_affinity(sqltypes.INTEGER),
  95. "nullable": False,
  96. "default": None,
  97. "autoincrement": False,
  98. },
  99. {
  100. "name": "data",
  101. "type": testing.eq_type_affinity(sqltypes.NullType),
  102. "nullable": True,
  103. "default": None,
  104. "autoincrement": False,
  105. },
  106. ],
  107. )
  108. """
  109. def __init__(self, target):
  110. self.target = sqltypes.to_instance(target)
  111. def __eq__(self, other):
  112. return self.target._type_affinity is other._type_affinity
  113. def __ne__(self, other):
  114. return self.target._type_affinity is not other._type_affinity
  115. class eq_compile_type:
  116. """similar to eq_type_affinity but uses compile"""
  117. def __init__(self, target):
  118. self.target = target
  119. def __eq__(self, other):
  120. return self.target == other.compile()
  121. def __ne__(self, other):
  122. return self.target != other.compile()
  123. class eq_clause_element:
  124. """Helper to compare SQL structures based on compare()"""
  125. def __init__(self, target):
  126. self.target = target
  127. def __eq__(self, other):
  128. return self.target.compare(other)
  129. def __ne__(self, other):
  130. return not self.target.compare(other)
  131. def _truncate_name(dialect, name):
  132. if len(name) > dialect.max_identifier_length:
  133. return (
  134. name[0 : max(dialect.max_identifier_length - 6, 0)]
  135. + "_"
  136. + hex(hash(name) % 64)[2:]
  137. )
  138. else:
  139. return name
  140. def pep435_enum(name):
  141. # Implements PEP 435 in the minimal fashion needed by SQLAlchemy
  142. __members__ = OrderedDict()
  143. def __init__(self, name, value, alias=None):
  144. self.name = name
  145. self.value = value
  146. self.__members__[name] = self
  147. value_to_member[value] = self
  148. setattr(self.__class__, name, self)
  149. if alias:
  150. self.__members__[alias] = self
  151. setattr(self.__class__, alias, self)
  152. value_to_member = {}
  153. @classmethod
  154. def get(cls, value):
  155. return value_to_member[value]
  156. someenum = type(
  157. name,
  158. (object,),
  159. {"__members__": __members__, "__init__": __init__, "get": get},
  160. )
  161. # getframe() trick for pickling I don't understand courtesy
  162. # Python namedtuple()
  163. try:
  164. module = sys._getframe(1).f_globals.get("__name__", "__main__")
  165. except (AttributeError, ValueError):
  166. pass
  167. if module is not None:
  168. someenum.__module__ = module
  169. return someenum