test_environment.py 12 KB


  1. import io
  2. from ...migration import MigrationContext
  3. from ...testing import assert_raises
  4. from ...testing import config
  5. from ...testing import eq_
  6. from ...testing import is_
  7. from ...testing import is_false
  8. from ...testing import is_not_
  9. from ...testing import is_true
  10. from ...testing import ne_
  11. from ...testing.fixtures import TestBase
  12. class MigrationTransactionTest(TestBase):
  13. __backend__ = True
  14. conn = None
  15. def _fixture(self, opts):
  16. self.conn = conn = config.db.connect()
  17. if opts.get("as_sql", False):
  18. self.context = MigrationContext.configure(
  19. dialect=conn.dialect, opts=opts
  20. )
  21. self.context.output_buffer = self.context.impl.output_buffer = (
  22. io.StringIO()
  23. )
  24. else:
  25. self.context = MigrationContext.configure(
  26. connection=conn, opts=opts
  27. )
  28. return self.context
  29. def teardown_method(self):
  30. if self.conn:
  31. self.conn.close()
  32. def test_proxy_transaction_rollback(self):
  33. context = self._fixture(
  34. {"transaction_per_migration": True, "transactional_ddl": True}
  35. )
  36. is_false(self.conn.in_transaction())
  37. proxy = context.begin_transaction(_per_migration=True)
  38. is_true(self.conn.in_transaction())
  39. proxy.rollback()
  40. is_false(self.conn.in_transaction())
  41. def test_proxy_transaction_commit(self):
  42. context = self._fixture(
  43. {"transaction_per_migration": True, "transactional_ddl": True}
  44. )
  45. proxy = context.begin_transaction(_per_migration=True)
  46. is_true(self.conn.in_transaction())
  47. proxy.commit()
  48. is_false(self.conn.in_transaction())
  49. def test_proxy_transaction_contextmanager_commit(self):
  50. context = self._fixture(
  51. {"transaction_per_migration": True, "transactional_ddl": True}
  52. )
  53. proxy = context.begin_transaction(_per_migration=True)
  54. is_true(self.conn.in_transaction())
  55. with proxy:
  56. pass
  57. is_false(self.conn.in_transaction())
  58. def test_proxy_transaction_contextmanager_rollback(self):
  59. context = self._fixture(
  60. {"transaction_per_migration": True, "transactional_ddl": True}
  61. )
  62. proxy = context.begin_transaction(_per_migration=True)
  63. is_true(self.conn.in_transaction())
  64. def go():
  65. with proxy:
  66. raise Exception("hi")
  67. assert_raises(Exception, go)
  68. is_false(self.conn.in_transaction())
  69. def test_proxy_transaction_contextmanager_explicit_rollback(self):
  70. context = self._fixture(
  71. {"transaction_per_migration": True, "transactional_ddl": True}
  72. )
  73. proxy = context.begin_transaction(_per_migration=True)
  74. is_true(self.conn.in_transaction())
  75. with proxy:
  76. is_true(self.conn.in_transaction())
  77. proxy.rollback()
  78. is_false(self.conn.in_transaction())
  79. is_false(self.conn.in_transaction())
  80. def test_proxy_transaction_contextmanager_explicit_commit(self):
  81. context = self._fixture(
  82. {"transaction_per_migration": True, "transactional_ddl": True}
  83. )
  84. proxy = context.begin_transaction(_per_migration=True)
  85. is_true(self.conn.in_transaction())
  86. with proxy:
  87. is_true(self.conn.in_transaction())
  88. proxy.commit()
  89. is_false(self.conn.in_transaction())
  90. is_false(self.conn.in_transaction())
  91. def test_transaction_per_migration_transactional_ddl(self):
  92. context = self._fixture(
  93. {"transaction_per_migration": True, "transactional_ddl": True}
  94. )
  95. is_false(self.conn.in_transaction())
  96. with context.begin_transaction():
  97. is_false(self.conn.in_transaction())
  98. with context.begin_transaction(_per_migration=True):
  99. is_true(self.conn.in_transaction())
  100. is_false(self.conn.in_transaction())
  101. is_false(self.conn.in_transaction())
  102. def test_transaction_per_migration_non_transactional_ddl(self):
  103. context = self._fixture(
  104. {"transaction_per_migration": True, "transactional_ddl": False}
  105. )
  106. is_false(self.conn.in_transaction())
  107. with context.begin_transaction():
  108. is_false(self.conn.in_transaction())
  109. with context.begin_transaction(_per_migration=True):
  110. is_true(self.conn.in_transaction())
  111. is_false(self.conn.in_transaction())
  112. is_false(self.conn.in_transaction())
  113. def test_transaction_per_all_transactional_ddl(self):
  114. context = self._fixture({"transactional_ddl": True})
  115. is_false(self.conn.in_transaction())
  116. with context.begin_transaction():
  117. is_true(self.conn.in_transaction())
  118. with context.begin_transaction(_per_migration=True):
  119. is_true(self.conn.in_transaction())
  120. is_true(self.conn.in_transaction())
  121. is_false(self.conn.in_transaction())
  122. def test_transaction_per_all_non_transactional_ddl(self):
  123. context = self._fixture({"transactional_ddl": False})
  124. is_false(self.conn.in_transaction())
  125. with context.begin_transaction():
  126. is_false(self.conn.in_transaction())
  127. with context.begin_transaction(_per_migration=True):
  128. is_true(self.conn.in_transaction())
  129. is_false(self.conn.in_transaction())
  130. is_false(self.conn.in_transaction())
  131. def test_transaction_per_all_sqlmode(self):
  132. context = self._fixture({"as_sql": True})
  133. context.execute("step 1")
  134. with context.begin_transaction():
  135. context.execute("step 2")
  136. with context.begin_transaction(_per_migration=True):
  137. context.execute("step 3")
  138. context.execute("step 4")
  139. context.execute("step 5")
  140. if context.impl.transactional_ddl:
  141. self._assert_impl_steps(
  142. "step 1",
  143. "BEGIN",
  144. "step 2",
  145. "step 3",
  146. "step 4",
  147. "COMMIT",
  148. "step 5",
  149. )
  150. else:
  151. self._assert_impl_steps(
  152. "step 1", "step 2", "step 3", "step 4", "step 5"
  153. )
  154. def test_transaction_per_migration_sqlmode(self):
  155. context = self._fixture(
  156. {"as_sql": True, "transaction_per_migration": True}
  157. )
  158. context.execute("step 1")
  159. with context.begin_transaction():
  160. context.execute("step 2")
  161. with context.begin_transaction(_per_migration=True):
  162. context.execute("step 3")
  163. context.execute("step 4")
  164. context.execute("step 5")
  165. if context.impl.transactional_ddl:
  166. self._assert_impl_steps(
  167. "step 1",
  168. "step 2",
  169. "BEGIN",
  170. "step 3",
  171. "COMMIT",
  172. "step 4",
  173. "step 5",
  174. )
  175. else:
  176. self._assert_impl_steps(
  177. "step 1", "step 2", "step 3", "step 4", "step 5"
  178. )
  179. @config.requirements.autocommit_isolation
  180. def test_autocommit_block(self):
  181. context = self._fixture({"transaction_per_migration": True})
  182. is_false(self.conn.in_transaction())
  183. with context.begin_transaction():
  184. is_false(self.conn.in_transaction())
  185. with context.begin_transaction(_per_migration=True):
  186. is_true(self.conn.in_transaction())
  187. with context.autocommit_block():
  188. # in 1.x, self.conn is separate due to the
  189. # execution_options call. however for future they are the
  190. # same connection and there is a "transaction" block
  191. # despite autocommit
  192. if self.is_sqlalchemy_future:
  193. is_(context.connection, self.conn)
  194. else:
  195. is_not_(context.connection, self.conn)
  196. is_false(self.conn.in_transaction())
  197. eq_(
  198. context.connection._execution_options[
  199. "isolation_level"
  200. ],
  201. "AUTOCOMMIT",
  202. )
  203. ne_(
  204. context.connection._execution_options.get(
  205. "isolation_level", None
  206. ),
  207. "AUTOCOMMIT",
  208. )
  209. is_true(self.conn.in_transaction())
  210. is_false(self.conn.in_transaction())
  211. is_false(self.conn.in_transaction())
  212. @config.requirements.autocommit_isolation
  213. def test_autocommit_block_no_transaction(self):
  214. context = self._fixture({"transaction_per_migration": True})
  215. is_false(self.conn.in_transaction())
  216. with context.autocommit_block():
  217. is_true(context.connection.in_transaction())
  218. # in 1.x, self.conn is separate due to the execution_options
  219. # call. however for future they are the same connection and there
  220. # is a "transaction" block despite autocommit
  221. if self.is_sqlalchemy_future:
  222. is_(context.connection, self.conn)
  223. else:
  224. is_not_(context.connection, self.conn)
  225. is_false(self.conn.in_transaction())
  226. eq_(
  227. context.connection._execution_options["isolation_level"],
  228. "AUTOCOMMIT",
  229. )
  230. ne_(
  231. context.connection._execution_options.get("isolation_level", None),
  232. "AUTOCOMMIT",
  233. )
  234. is_false(self.conn.in_transaction())
  235. def test_autocommit_block_transactional_ddl_sqlmode(self):
  236. context = self._fixture(
  237. {
  238. "transaction_per_migration": True,
  239. "transactional_ddl": True,
  240. "as_sql": True,
  241. }
  242. )
  243. with context.begin_transaction():
  244. context.execute("step 1")
  245. with context.begin_transaction(_per_migration=True):
  246. context.execute("step 2")
  247. with context.autocommit_block():
  248. context.execute("step 3")
  249. context.execute("step 4")
  250. context.execute("step 5")
  251. self._assert_impl_steps(
  252. "step 1",
  253. "BEGIN",
  254. "step 2",
  255. "COMMIT",
  256. "step 3",
  257. "BEGIN",
  258. "step 4",
  259. "COMMIT",
  260. "step 5",
  261. )
  262. def test_autocommit_block_nontransactional_ddl_sqlmode(self):
  263. context = self._fixture(
  264. {
  265. "transaction_per_migration": True,
  266. "transactional_ddl": False,
  267. "as_sql": True,
  268. }
  269. )
  270. with context.begin_transaction():
  271. context.execute("step 1")
  272. with context.begin_transaction(_per_migration=True):
  273. context.execute("step 2")
  274. with context.autocommit_block():
  275. context.execute("step 3")
  276. context.execute("step 4")
  277. context.execute("step 5")
  278. self._assert_impl_steps(
  279. "step 1", "step 2", "step 3", "step 4", "step 5"
  280. )
  281. def _assert_impl_steps(self, *steps):
  282. to_check = self.context.output_buffer.getvalue()
  283. self.context.impl.output_buffer = buf = io.StringIO()
  284. for step in steps:
  285. if step == "BEGIN":
  286. self.context.impl.emit_begin()
  287. elif step == "COMMIT":
  288. self.context.impl.emit_commit()
  289. else:
  290. self.context.impl._exec(step)
  291. eq_(to_check, buf.getvalue())