test_autogen_identity.py 5.7 KB


  1. import sqlalchemy as sa
  2. from sqlalchemy import Column
  3. from sqlalchemy import Integer
  4. from sqlalchemy import MetaData
  5. from sqlalchemy import Table
  6. from alembic.util import sqla_compat
  7. from ._autogen_fixtures import AutogenFixtureTest
  8. from ... import testing
  9. from ...testing import config
  10. from ...testing import eq_
  11. from ...testing import is_true
  12. from ...testing import TestBase
  13. class AutogenerateIdentityTest(AutogenFixtureTest, TestBase):
  14. __requires__ = ("identity_columns",)
  15. __backend__ = True
  16. def test_add_identity_column(self):
  17. m1 = MetaData()
  18. m2 = MetaData()
  19. Table("user", m1, Column("other", sa.Text))
  20. Table(
  21. "user",
  22. m2,
  23. Column("other", sa.Text),
  24. Column(
  25. "id",
  26. Integer,
  27. sa.Identity(start=5, increment=7),
  28. primary_key=True,
  29. ),
  30. )
  31. diffs = self._fixture(m1, m2)
  32. eq_(diffs[0][0], "add_column")
  33. eq_(diffs[0][2], "user")
  34. eq_(diffs[0][3].name, "id")
  35. i = diffs[0][3].identity
  36. is_true(isinstance(i, sa.Identity))
  37. eq_(i.start, 5)
  38. eq_(i.increment, 7)
  39. def test_remove_identity_column(self):
  40. m1 = MetaData()
  41. m2 = MetaData()
  42. Table(
  43. "user",
  44. m1,
  45. Column(
  46. "id",
  47. Integer,
  48. sa.Identity(start=2, increment=3),
  49. primary_key=True,
  50. ),
  51. )
  52. Table("user", m2)
  53. diffs = self._fixture(m1, m2)
  54. eq_(diffs[0][0], "remove_column")
  55. eq_(diffs[0][2], "user")
  56. c = diffs[0][3]
  57. eq_(c.name, "id")
  58. is_true(isinstance(c.identity, sa.Identity))
  59. eq_(c.identity.start, 2)
  60. eq_(c.identity.increment, 3)
  61. def test_no_change_identity_column(self):
  62. m1 = MetaData()
  63. m2 = MetaData()
  64. for m in (m1, m2):
  65. id_ = sa.Identity(start=2)
  66. Table("user", m, Column("id", Integer, id_))
  67. diffs = self._fixture(m1, m2)
  68. eq_(diffs, [])
  69. def test_dialect_kwargs_changes(self):
  70. m1 = MetaData()
  71. m2 = MetaData()
  72. if sqla_compat.identity_has_dialect_kwargs:
  73. args = {"oracle_on_null": True, "oracle_order": True}
  74. else:
  75. args = {"on_null": True, "order": True}
  76. Table("user", m1, Column("id", Integer, sa.Identity(start=2)))
  77. id_ = sa.Identity(start=2, **args)
  78. Table("user", m2, Column("id", Integer, id_))
  79. diffs = self._fixture(m1, m2)
  80. if config.db.name == "oracle":
  81. is_true(len(diffs), 1)
  82. eq_(diffs[0][0][0], "modify_default")
  83. else:
  84. eq_(diffs, [])
  85. @testing.combinations(
  86. (None, dict(start=2)),
  87. (dict(start=2), None),
  88. (dict(start=2), dict(start=2, increment=7)),
  89. (dict(always=False), dict(always=True)),
  90. (
  91. dict(start=1, minvalue=0, maxvalue=100, cycle=True),
  92. dict(start=1, minvalue=0, maxvalue=100, cycle=False),
  93. ),
  94. (
  95. dict(start=10, increment=3, maxvalue=9999),
  96. dict(start=10, increment=1, maxvalue=3333),
  97. ),
  98. )
  99. @config.requirements.identity_columns_alter
  100. def test_change_identity(self, before, after):
  101. arg_before = (sa.Identity(**before),) if before else ()
  102. arg_after = (sa.Identity(**after),) if after else ()
  103. m1 = MetaData()
  104. m2 = MetaData()
  105. Table(
  106. "user",
  107. m1,
  108. Column("id", Integer, *arg_before),
  109. Column("other", sa.Text),
  110. )
  111. Table(
  112. "user",
  113. m2,
  114. Column("id", Integer, *arg_after),
  115. Column("other", sa.Text),
  116. )
  117. diffs = self._fixture(m1, m2)
  118. eq_(len(diffs[0]), 1)
  119. diffs = diffs[0][0]
  120. eq_(diffs[0], "modify_default")
  121. eq_(diffs[2], "user")
  122. eq_(diffs[3], "id")
  123. old = diffs[5]
  124. new = diffs[6]
  125. def check(kw, idt):
  126. if kw:
  127. is_true(isinstance(idt, sa.Identity))
  128. for k, v in kw.items():
  129. eq_(getattr(idt, k), v)
  130. else:
  131. is_true(idt in (None, False))
  132. check(before, old)
  133. check(after, new)
  134. def test_add_identity_to_column(self):
  135. m1 = MetaData()
  136. m2 = MetaData()
  137. Table(
  138. "user",
  139. m1,
  140. Column("id", Integer),
  141. Column("other", sa.Text),
  142. )
  143. Table(
  144. "user",
  145. m2,
  146. Column("id", Integer, sa.Identity(start=2, maxvalue=1000)),
  147. Column("other", sa.Text),
  148. )
  149. diffs = self._fixture(m1, m2)
  150. eq_(len(diffs[0]), 1)
  151. diffs = diffs[0][0]
  152. eq_(diffs[0], "modify_default")
  153. eq_(diffs[2], "user")
  154. eq_(diffs[3], "id")
  155. eq_(diffs[5], None)
  156. added = diffs[6]
  157. is_true(isinstance(added, sa.Identity))
  158. eq_(added.start, 2)
  159. eq_(added.maxvalue, 1000)
  160. def test_remove_identity_from_column(self):
  161. m1 = MetaData()
  162. m2 = MetaData()
  163. Table(
  164. "user",
  165. m1,
  166. Column("id", Integer, sa.Identity(start=2, maxvalue=1000)),
  167. Column("other", sa.Text),
  168. )
  169. Table(
  170. "user",
  171. m2,
  172. Column("id", Integer),
  173. Column("other", sa.Text),
  174. )
  175. diffs = self._fixture(m1, m2)
  176. eq_(len(diffs[0]), 1)
  177. diffs = diffs[0][0]
  178. eq_(diffs[0], "modify_default")
  179. eq_(diffs[2], "user")
  180. eq_(diffs[3], "id")
  181. eq_(diffs[6], None)
  182. removed = diffs[5]
  183. is_true(isinstance(removed, sa.Identity))