base.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371
  1. # testing/fixtures/base.py
  2. # Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: https://www.opensource.org/licenses/mit-license.php
  7. # mypy: ignore-errors
  8. from __future__ import annotations
  9. import sqlalchemy as sa
  10. from .. import assertions
  11. from .. import config
  12. from ..assertions import eq_
  13. from ..util import drop_all_tables_from_metadata
  14. from ..util import picklers
  15. from ... import Column
  16. from ... import func
  17. from ... import Integer
  18. from ... import select
  19. from ... import Table
  20. from ...orm import DeclarativeBase
  21. from ...orm import MappedAsDataclass
  22. from ...orm import registry
  23. @config.mark_base_test_class()
  24. class TestBase:
  25. # A sequence of requirement names matching testing.requires decorators
  26. __requires__ = ()
  27. # A sequence of dialect names to exclude from the test class.
  28. __unsupported_on__ = ()
  29. # If present, test class is only runnable for the *single* specified
  30. # dialect. If you need multiple, use __unsupported_on__ and invert.
  31. __only_on__ = None
  32. # A sequence of no-arg callables. If any are True, the entire testcase is
  33. # skipped.
  34. __skip_if__ = None
  35. # if True, the testing reaper will not attempt to touch connection
  36. # state after a test is completed and before the outer teardown
  37. # starts
  38. __leave_connections_for_teardown__ = False
  39. def assert_(self, val, msg=None):
  40. assert val, msg
  41. @config.fixture()
  42. def nocache(self):
  43. _cache = config.db._compiled_cache
  44. config.db._compiled_cache = None
  45. yield
  46. config.db._compiled_cache = _cache
  47. @config.fixture()
  48. def connection_no_trans(self):
  49. eng = getattr(self, "bind", None) or config.db
  50. with eng.connect() as conn:
  51. yield conn
  52. @config.fixture()
  53. def connection(self):
  54. global _connection_fixture_connection
  55. eng = getattr(self, "bind", None) or config.db
  56. conn = eng.connect()
  57. trans = conn.begin()
  58. _connection_fixture_connection = conn
  59. yield conn
  60. _connection_fixture_connection = None
  61. if trans.is_active:
  62. trans.rollback()
  63. # trans would not be active here if the test is using
  64. # the legacy @provide_metadata decorator still, as it will
  65. # run a close all connections.
  66. conn.close()
  67. @config.fixture()
  68. def close_result_when_finished(self):
  69. to_close = []
  70. to_consume = []
  71. def go(result, consume=False):
  72. to_close.append(result)
  73. if consume:
  74. to_consume.append(result)
  75. yield go
  76. for r in to_consume:
  77. try:
  78. r.all()
  79. except:
  80. pass
  81. for r in to_close:
  82. try:
  83. r.close()
  84. except:
  85. pass
  86. @config.fixture()
  87. def registry(self, metadata):
  88. reg = registry(
  89. metadata=metadata,
  90. type_annotation_map={
  91. str: sa.String().with_variant(
  92. sa.String(50), "mysql", "mariadb", "oracle"
  93. )
  94. },
  95. )
  96. yield reg
  97. reg.dispose()
  98. @config.fixture
  99. def decl_base(self, metadata):
  100. _md = metadata
  101. class Base(DeclarativeBase):
  102. metadata = _md
  103. type_annotation_map = {
  104. str: sa.String().with_variant(
  105. sa.String(50), "mysql", "mariadb", "oracle"
  106. )
  107. }
  108. yield Base
  109. Base.registry.dispose()
  110. @config.fixture
  111. def dc_decl_base(self, metadata):
  112. _md = metadata
  113. class Base(MappedAsDataclass, DeclarativeBase):
  114. metadata = _md
  115. type_annotation_map = {
  116. str: sa.String().with_variant(
  117. sa.String(50), "mysql", "mariadb"
  118. )
  119. }
  120. yield Base
  121. Base.registry.dispose()
  122. @config.fixture()
  123. def future_connection(self, future_engine, connection):
  124. # integrate the future_engine and connection fixtures so
  125. # that users of the "connection" fixture will get at the
  126. # "future" connection
  127. yield connection
  128. @config.fixture()
  129. def future_engine(self):
  130. yield
  131. @config.fixture()
  132. def testing_engine(self):
  133. from .. import engines
  134. def gen_testing_engine(
  135. url=None,
  136. options=None,
  137. future=None,
  138. asyncio=False,
  139. transfer_staticpool=False,
  140. share_pool=False,
  141. ):
  142. if options is None:
  143. options = {}
  144. options["scope"] = "fixture"
  145. return engines.testing_engine(
  146. url=url,
  147. options=options,
  148. asyncio=asyncio,
  149. transfer_staticpool=transfer_staticpool,
  150. share_pool=share_pool,
  151. )
  152. yield gen_testing_engine
  153. engines.testing_reaper._drop_testing_engines("fixture")
  154. @config.fixture()
  155. def async_testing_engine(self, testing_engine):
  156. def go(**kw):
  157. kw["asyncio"] = True
  158. return testing_engine(**kw)
  159. return go
  160. @config.fixture(params=picklers())
  161. def picklers(self, request):
  162. yield request.param
  163. @config.fixture()
  164. def metadata(self, request):
  165. """Provide bound MetaData for a single test, dropping afterwards."""
  166. from ...sql import schema
  167. metadata = schema.MetaData()
  168. request.instance.metadata = metadata
  169. yield metadata
  170. del request.instance.metadata
  171. if (
  172. _connection_fixture_connection
  173. and _connection_fixture_connection.in_transaction()
  174. ):
  175. trans = _connection_fixture_connection.get_transaction()
  176. trans.rollback()
  177. with _connection_fixture_connection.begin():
  178. drop_all_tables_from_metadata(
  179. metadata, _connection_fixture_connection
  180. )
  181. else:
  182. drop_all_tables_from_metadata(metadata, config.db)
  183. @config.fixture(
  184. params=[
  185. (rollback, second_operation, begin_nested)
  186. for rollback in (True, False)
  187. for second_operation in ("none", "execute", "begin")
  188. for begin_nested in (
  189. True,
  190. False,
  191. )
  192. ]
  193. )
  194. def trans_ctx_manager_fixture(self, request, metadata):
  195. rollback, second_operation, begin_nested = request.param
  196. t = Table("test", metadata, Column("data", Integer))
  197. eng = getattr(self, "bind", None) or config.db
  198. t.create(eng)
  199. def run_test(subject, trans_on_subject, execute_on_subject):
  200. with subject.begin() as trans:
  201. if begin_nested:
  202. if not config.requirements.savepoints.enabled:
  203. config.skip_test("savepoints not enabled")
  204. if execute_on_subject:
  205. nested_trans = subject.begin_nested()
  206. else:
  207. nested_trans = trans.begin_nested()
  208. with nested_trans:
  209. if execute_on_subject:
  210. subject.execute(t.insert(), {"data": 10})
  211. else:
  212. trans.execute(t.insert(), {"data": 10})
  213. # for nested trans, we always commit/rollback on the
  214. # "nested trans" object itself.
  215. # only Session(future=False) will affect savepoint
  216. # transaction for session.commit/rollback
  217. if rollback:
  218. nested_trans.rollback()
  219. else:
  220. nested_trans.commit()
  221. if second_operation != "none":
  222. with assertions.expect_raises_message(
  223. sa.exc.InvalidRequestError,
  224. "Can't operate on closed transaction "
  225. "inside context "
  226. "manager. Please complete the context "
  227. "manager "
  228. "before emitting further commands.",
  229. ):
  230. if second_operation == "execute":
  231. if execute_on_subject:
  232. subject.execute(
  233. t.insert(), {"data": 12}
  234. )
  235. else:
  236. trans.execute(t.insert(), {"data": 12})
  237. elif second_operation == "begin":
  238. if execute_on_subject:
  239. subject.begin_nested()
  240. else:
  241. trans.begin_nested()
  242. # outside the nested trans block, but still inside the
  243. # transaction block, we can run SQL, and it will be
  244. # committed
  245. if execute_on_subject:
  246. subject.execute(t.insert(), {"data": 14})
  247. else:
  248. trans.execute(t.insert(), {"data": 14})
  249. else:
  250. if execute_on_subject:
  251. subject.execute(t.insert(), {"data": 10})
  252. else:
  253. trans.execute(t.insert(), {"data": 10})
  254. if trans_on_subject:
  255. if rollback:
  256. subject.rollback()
  257. else:
  258. subject.commit()
  259. else:
  260. if rollback:
  261. trans.rollback()
  262. else:
  263. trans.commit()
  264. if second_operation != "none":
  265. with assertions.expect_raises_message(
  266. sa.exc.InvalidRequestError,
  267. "Can't operate on closed transaction inside "
  268. "context "
  269. "manager. Please complete the context manager "
  270. "before emitting further commands.",
  271. ):
  272. if second_operation == "execute":
  273. if execute_on_subject:
  274. subject.execute(t.insert(), {"data": 12})
  275. else:
  276. trans.execute(t.insert(), {"data": 12})
  277. elif second_operation == "begin":
  278. if hasattr(trans, "begin"):
  279. trans.begin()
  280. else:
  281. subject.begin()
  282. elif second_operation == "begin_nested":
  283. if execute_on_subject:
  284. subject.begin_nested()
  285. else:
  286. trans.begin_nested()
  287. expected_committed = 0
  288. if begin_nested:
  289. # begin_nested variant, we inserted a row after the nested
  290. # block
  291. expected_committed += 1
  292. if not rollback:
  293. # not rollback variant, our row inserted in the target
  294. # block itself would be committed
  295. expected_committed += 1
  296. if execute_on_subject:
  297. eq_(
  298. subject.scalar(select(func.count()).select_from(t)),
  299. expected_committed,
  300. )
  301. else:
  302. with subject.connect() as conn:
  303. eq_(
  304. conn.scalar(select(func.count()).select_from(t)),
  305. expected_committed,
  306. )
  307. return run_test
# Connection currently handed out by the ``TestBase.connection`` fixture, or
# None when that fixture is not active.  The ``metadata`` fixture reads it at
# teardown to decide which connection to drop tables on.
_connection_fixture_connection = None
class FutureEngineMixin:
    """Mixin retained because alembic's suite is still using this."""