test_autogen_identity.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. import sqlalchemy as sa
  2. from sqlalchemy import Column
  3. from sqlalchemy import Integer
  4. from sqlalchemy import MetaData
  5. from sqlalchemy import Table
  6. from alembic.util import sqla_compat
  7. from ._autogen_fixtures import AutogenFixtureTest
  8. from ... import testing
  9. from ...testing import config
  10. from ...testing import eq_
  11. from ...testing import is_true
  12. from ...testing import TestBase
  13. class AutogenerateIdentityTest(AutogenFixtureTest, TestBase):
  14. __requires__ = ("identity_columns",)
  15. __backend__ = True
  16. def test_add_identity_column(self):
  17. m1 = MetaData()
  18. m2 = MetaData()
  19. Table("user", m1, Column("other", sa.Text))
  20. Table(
  21. "user",
  22. m2,
  23. Column("other", sa.Text),
  24. Column(
  25. "id",
  26. Integer,
  27. sa.Identity(start=5, increment=7),
  28. primary_key=True,
  29. ),
  30. )
  31. diffs = self._fixture(m1, m2)
  32. eq_(diffs[0][0], "add_column")
  33. eq_(diffs[0][2], "user")
  34. eq_(diffs[0][3].name, "id")
  35. i = diffs[0][3].identity
  36. is_true(isinstance(i, sa.Identity))
  37. eq_(i.start, 5)
  38. eq_(i.increment, 7)
  39. def test_remove_identity_column(self):
  40. m1 = MetaData()
  41. m2 = MetaData()
  42. Table(
  43. "user",
  44. m1,
  45. Column(
  46. "id",
  47. Integer,
  48. sa.Identity(start=2, increment=3),
  49. primary_key=True,
  50. ),
  51. )
  52. Table("user", m2)
  53. diffs = self._fixture(m1, m2)
  54. eq_(diffs[0][0], "remove_column")
  55. eq_(diffs[0][2], "user")
  56. c = diffs[0][3]
  57. eq_(c.name, "id")
  58. is_true(isinstance(c.identity, sa.Identity))
  59. eq_(c.identity.start, 2)
  60. eq_(c.identity.increment, 3)
  61. def test_no_change_identity_column(self):
  62. m1 = MetaData()
  63. m2 = MetaData()
  64. for m in (m1, m2):
  65. id_ = sa.Identity(start=2)
  66. Table("user", m, Column("id", Integer, id_))
  67. diffs = self._fixture(m1, m2)
  68. eq_(diffs, [])
  69. def test_dialect_kwargs_changes(self):
  70. m1 = MetaData()
  71. m2 = MetaData()
  72. if sqla_compat.identity_has_dialect_kwargs:
  73. args = {"oracle_on_null": True, "oracle_order": True}
  74. else:
  75. args = {"on_null": True, "order": True}
  76. Table("user", m1, Column("id", Integer, sa.Identity(start=2)))
  77. id_ = sa.Identity(start=2, **args)
  78. Table("user", m2, Column("id", Integer, id_))
  79. diffs = self._fixture(m1, m2)
  80. if config.db.name == "oracle":
  81. is_true(len(diffs), 1)
  82. eq_(diffs[0][0][0], "modify_default")
  83. else:
  84. eq_(diffs, [])
  85. @testing.combinations(
  86. (None, dict(start=2)),
  87. (dict(start=2), None),
  88. (dict(start=2), dict(start=2, increment=7)),
  89. (dict(always=False), dict(always=True)),
  90. (
  91. dict(start=1, minvalue=0, maxvalue=100, cycle=True),
  92. dict(start=1, minvalue=0, maxvalue=100, cycle=False),
  93. ),
  94. (
  95. dict(start=10, increment=3, maxvalue=9999),
  96. dict(start=10, increment=1, maxvalue=3333),
  97. ),
  98. )
  99. @config.requirements.identity_columns_alter
  100. def test_change_identity(self, before, after):
  101. arg_before = (sa.Identity(**before),) if before else ()
  102. arg_after = (sa.Identity(**after),) if after else ()
  103. m1 = MetaData()
  104. m2 = MetaData()
  105. Table(
  106. "user",
  107. m1,
  108. Column("id", Integer, *arg_before),
  109. Column("other", sa.Text),
  110. )
  111. Table(
  112. "user",
  113. m2,
  114. Column("id", Integer, *arg_after),
  115. Column("other", sa.Text),
  116. )
  117. diffs = self._fixture(m1, m2)
  118. eq_(len(diffs[0]), 1)
  119. diffs = diffs[0][0]
  120. eq_(diffs[0], "modify_default")
  121. eq_(diffs[2], "user")
  122. eq_(diffs[3], "id")
  123. old = diffs[5]
  124. new = diffs[6]
  125. def check(kw, idt):
  126. if kw:
  127. is_true(isinstance(idt, sa.Identity))
  128. for k, v in kw.items():
  129. eq_(getattr(idt, k), v)
  130. else:
  131. is_true(idt in (None, False))
  132. check(before, old)
  133. check(after, new)
  134. def test_add_identity_to_column(self):
  135. m1 = MetaData()
  136. m2 = MetaData()
  137. Table(
  138. "user",
  139. m1,
  140. Column("id", Integer),
  141. Column("other", sa.Text),
  142. )
  143. Table(
  144. "user",
  145. m2,
  146. Column("id", Integer, sa.Identity(start=2, maxvalue=1000)),
  147. Column("other", sa.Text),
  148. )
  149. diffs = self._fixture(m1, m2)
  150. eq_(len(diffs[0]), 1)
  151. diffs = diffs[0][0]
  152. eq_(diffs[0], "modify_default")
  153. eq_(diffs[2], "user")
  154. eq_(diffs[3], "id")
  155. eq_(diffs[5], None)
  156. added = diffs[6]
  157. is_true(isinstance(added, sa.Identity))
  158. eq_(added.start, 2)
  159. eq_(added.maxvalue, 1000)
  160. def test_remove_identity_from_column(self):
  161. m1 = MetaData()
  162. m2 = MetaData()
  163. Table(
  164. "user",
  165. m1,
  166. Column("id", Integer, sa.Identity(start=2, maxvalue=1000)),
  167. Column("other", sa.Text),
  168. )
  169. Table(
  170. "user",
  171. m2,
  172. Column("id", Integer),
  173. Column("other", sa.Text),
  174. )
  175. diffs = self._fixture(m1, m2)
  176. eq_(len(diffs[0]), 1)
  177. diffs = diffs[0][0]
  178. eq_(diffs[0], "modify_default")
  179. eq_(diffs[2], "user")
  180. eq_(diffs[3], "id")
  181. eq_(diffs[6], None)
  182. removed = diffs[5]
  183. is_true(isinstance(removed, sa.Identity))