assertsql.py 16 KB


  1. # testing/assertsql.py
  2. # Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: https://www.opensource.org/licenses/mit-license.php
  7. # mypy: ignore-errors
  8. from __future__ import annotations
  9. import collections
  10. import contextlib
  11. import itertools
  12. import re
  13. from .. import event
  14. from ..engine import url
  15. from ..engine.default import DefaultDialect
  16. from ..schema import BaseDDLElement
  17. class AssertRule:
  18. is_consumed = False
  19. errormessage = None
  20. consume_statement = True
  21. def process_statement(self, execute_observed):
  22. pass
  23. def no_more_statements(self):
  24. assert False, (
  25. "All statements are complete, but pending "
  26. "assertion rules remain"
  27. )
  28. class SQLMatchRule(AssertRule):
  29. pass
  30. class CursorSQL(SQLMatchRule):
  31. def __init__(self, statement, params=None, consume_statement=True):
  32. self.statement = statement
  33. self.params = params
  34. self.consume_statement = consume_statement
  35. def process_statement(self, execute_observed):
  36. stmt = execute_observed.statements[0]
  37. if self.statement != stmt.statement or (
  38. self.params is not None and self.params != stmt.parameters
  39. ):
  40. self.consume_statement = True
  41. self.errormessage = (
  42. "Testing for exact SQL %s parameters %s received %s %s"
  43. % (
  44. self.statement,
  45. self.params,
  46. stmt.statement,
  47. stmt.parameters,
  48. )
  49. )
  50. else:
  51. execute_observed.statements.pop(0)
  52. self.is_consumed = True
  53. if not execute_observed.statements:
  54. self.consume_statement = True
  55. class CompiledSQL(SQLMatchRule):
  56. def __init__(
  57. self, statement, params=None, dialect="default", enable_returning=True
  58. ):
  59. self.statement = statement
  60. self.params = params
  61. self.dialect = dialect
  62. self.enable_returning = enable_returning
  63. def _compare_sql(self, execute_observed, received_statement):
  64. stmt = re.sub(r"[\n\t]", "", self.statement)
  65. return received_statement == stmt
  66. def _compile_dialect(self, execute_observed):
  67. if self.dialect == "default":
  68. dialect = DefaultDialect()
  69. # this is currently what tests are expecting
  70. # dialect.supports_default_values = True
  71. dialect.supports_default_metavalue = True
  72. if self.enable_returning:
  73. dialect.insert_returning = dialect.update_returning = (
  74. dialect.delete_returning
  75. ) = True
  76. dialect.use_insertmanyvalues = True
  77. dialect.supports_multivalues_insert = True
  78. dialect.update_returning_multifrom = True
  79. dialect.delete_returning_multifrom = True
  80. # dialect.favor_returning_over_lastrowid = True
  81. # dialect.insert_null_pk_still_autoincrements = True
  82. # this is calculated but we need it to be True for this
  83. # to look like all the current RETURNING dialects
  84. assert dialect.insert_executemany_returning
  85. return dialect
  86. else:
  87. return url.URL.create(self.dialect).get_dialect()()
  88. def _received_statement(self, execute_observed):
  89. """reconstruct the statement and params in terms
  90. of a target dialect, which for CompiledSQL is just DefaultDialect."""
  91. context = execute_observed.context
  92. compare_dialect = self._compile_dialect(execute_observed)
  93. # received_statement runs a full compile(). we should not need to
  94. # consider extracted_parameters; if we do this indicates some state
  95. # is being sent from a previous cached query, which some misbehaviors
  96. # in the ORM can cause, see #6881
  97. cache_key = None # execute_observed.context.compiled.cache_key
  98. extracted_parameters = (
  99. None # execute_observed.context.extracted_parameters
  100. )
  101. if "schema_translate_map" in context.execution_options:
  102. map_ = context.execution_options["schema_translate_map"]
  103. else:
  104. map_ = None
  105. if isinstance(execute_observed.clauseelement, BaseDDLElement):
  106. compiled = execute_observed.clauseelement.compile(
  107. dialect=compare_dialect,
  108. schema_translate_map=map_,
  109. )
  110. else:
  111. compiled = execute_observed.clauseelement.compile(
  112. cache_key=cache_key,
  113. dialect=compare_dialect,
  114. column_keys=context.compiled.column_keys,
  115. for_executemany=context.compiled.for_executemany,
  116. schema_translate_map=map_,
  117. )
  118. _received_statement = re.sub(r"[\n\t]", "", str(compiled))
  119. parameters = execute_observed.parameters
  120. if not parameters:
  121. _received_parameters = [
  122. compiled.construct_params(
  123. extracted_parameters=extracted_parameters
  124. )
  125. ]
  126. else:
  127. _received_parameters = [
  128. compiled.construct_params(
  129. m, extracted_parameters=extracted_parameters
  130. )
  131. for m in parameters
  132. ]
  133. return _received_statement, _received_parameters
  134. def process_statement(self, execute_observed):
  135. context = execute_observed.context
  136. _received_statement, _received_parameters = self._received_statement(
  137. execute_observed
  138. )
  139. params = self._all_params(context)
  140. equivalent = self._compare_sql(execute_observed, _received_statement)
  141. if equivalent:
  142. if params is not None:
  143. all_params = list(params)
  144. all_received = list(_received_parameters)
  145. while all_params and all_received:
  146. param = dict(all_params.pop(0))
  147. for idx, received in enumerate(list(all_received)):
  148. # do a positive compare only
  149. for param_key in param:
  150. # a key in param did not match current
  151. # 'received'
  152. if (
  153. param_key not in received
  154. or received[param_key] != param[param_key]
  155. ):
  156. break
  157. else:
  158. # all keys in param matched 'received';
  159. # onto next param
  160. del all_received[idx]
  161. break
  162. else:
  163. # param did not match any entry
  164. # in all_received
  165. equivalent = False
  166. break
  167. if all_params or all_received:
  168. equivalent = False
  169. if equivalent:
  170. self.is_consumed = True
  171. self.errormessage = None
  172. else:
  173. self.errormessage = self._failure_message(
  174. execute_observed, params
  175. ) % {
  176. "received_statement": _received_statement,
  177. "received_parameters": _received_parameters,
  178. }
  179. def _all_params(self, context):
  180. if self.params:
  181. if callable(self.params):
  182. params = self.params(context)
  183. else:
  184. params = self.params
  185. if not isinstance(params, list):
  186. params = [params]
  187. return params
  188. else:
  189. return None
  190. def _failure_message(self, execute_observed, expected_params):
  191. return (
  192. "Testing for compiled statement\n%r partial params %s, "
  193. "received\n%%(received_statement)r with params "
  194. "%%(received_parameters)r"
  195. % (
  196. self.statement.replace("%", "%%"),
  197. repr(expected_params).replace("%", "%%"),
  198. )
  199. )
  200. class RegexSQL(CompiledSQL):
  201. def __init__(
  202. self, regex, params=None, dialect="default", enable_returning=False
  203. ):
  204. SQLMatchRule.__init__(self)
  205. self.regex = re.compile(regex)
  206. self.orig_regex = regex
  207. self.params = params
  208. self.dialect = dialect
  209. self.enable_returning = enable_returning
  210. def _failure_message(self, execute_observed, expected_params):
  211. return (
  212. "Testing for compiled statement ~%r partial params %s, "
  213. "received %%(received_statement)r with params "
  214. "%%(received_parameters)r"
  215. % (
  216. self.orig_regex.replace("%", "%%"),
  217. repr(expected_params).replace("%", "%%"),
  218. )
  219. )
  220. def _compare_sql(self, execute_observed, received_statement):
  221. return bool(self.regex.match(received_statement))
  222. class DialectSQL(CompiledSQL):
  223. def _compile_dialect(self, execute_observed):
  224. return execute_observed.context.dialect
  225. def _compare_no_space(self, real_stmt, received_stmt):
  226. stmt = re.sub(r"[\n\t]", "", real_stmt)
  227. return received_stmt == stmt
  228. def _received_statement(self, execute_observed):
  229. received_stmt, received_params = super()._received_statement(
  230. execute_observed
  231. )
  232. # TODO: why do we need this part?
  233. for real_stmt in execute_observed.statements:
  234. if self._compare_no_space(real_stmt.statement, received_stmt):
  235. break
  236. else:
  237. raise AssertionError(
  238. "Can't locate compiled statement %r in list of "
  239. "statements actually invoked" % received_stmt
  240. )
  241. return received_stmt, execute_observed.context.compiled_parameters
  242. def _dialect_adjusted_statement(self, dialect):
  243. paramstyle = dialect.paramstyle
  244. stmt = re.sub(r"[\n\t]", "", self.statement)
  245. # temporarily escape out PG double colons
  246. stmt = stmt.replace("::", "!!")
  247. if paramstyle == "pyformat":
  248. stmt = re.sub(r":([\w_]+)", r"%(\1)s", stmt)
  249. else:
  250. # positional params
  251. repl = None
  252. if paramstyle == "qmark":
  253. repl = "?"
  254. elif paramstyle == "format":
  255. repl = r"%s"
  256. elif paramstyle.startswith("numeric"):
  257. counter = itertools.count(1)
  258. num_identifier = "$" if paramstyle == "numeric_dollar" else ":"
  259. def repl(m):
  260. return f"{num_identifier}{next(counter)}"
  261. stmt = re.sub(r":([\w_]+)", repl, stmt)
  262. # put them back
  263. stmt = stmt.replace("!!", "::")
  264. return stmt
  265. def _compare_sql(self, execute_observed, received_statement):
  266. stmt = self._dialect_adjusted_statement(
  267. execute_observed.context.dialect
  268. )
  269. return received_statement == stmt
  270. def _failure_message(self, execute_observed, expected_params):
  271. return (
  272. "Testing for compiled statement\n%r partial params %s, "
  273. "received\n%%(received_statement)r with params "
  274. "%%(received_parameters)r"
  275. % (
  276. self._dialect_adjusted_statement(
  277. execute_observed.context.dialect
  278. ).replace("%", "%%"),
  279. repr(expected_params).replace("%", "%%"),
  280. )
  281. )
  282. class CountStatements(AssertRule):
  283. def __init__(self, count):
  284. self.count = count
  285. self._statement_count = 0
  286. def process_statement(self, execute_observed):
  287. self._statement_count += 1
  288. def no_more_statements(self):
  289. if self.count != self._statement_count:
  290. assert False, "desired statement count %d does not match %d" % (
  291. self.count,
  292. self._statement_count,
  293. )
  294. class AllOf(AssertRule):
  295. def __init__(self, *rules):
  296. self.rules = set(rules)
  297. def process_statement(self, execute_observed):
  298. for rule in list(self.rules):
  299. rule.errormessage = None
  300. rule.process_statement(execute_observed)
  301. if rule.is_consumed:
  302. self.rules.discard(rule)
  303. if not self.rules:
  304. self.is_consumed = True
  305. break
  306. elif not rule.errormessage:
  307. # rule is not done yet
  308. self.errormessage = None
  309. break
  310. else:
  311. self.errormessage = list(self.rules)[0].errormessage
  312. class EachOf(AssertRule):
  313. def __init__(self, *rules):
  314. self.rules = list(rules)
  315. def process_statement(self, execute_observed):
  316. if not self.rules:
  317. self.is_consumed = True
  318. self.consume_statement = False
  319. while self.rules:
  320. rule = self.rules[0]
  321. rule.process_statement(execute_observed)
  322. if rule.is_consumed:
  323. self.rules.pop(0)
  324. elif rule.errormessage:
  325. self.errormessage = rule.errormessage
  326. if rule.consume_statement:
  327. break
  328. if not self.rules:
  329. self.is_consumed = True
  330. def no_more_statements(self):
  331. if self.rules and not self.rules[0].is_consumed:
  332. self.rules[0].no_more_statements()
  333. elif self.rules:
  334. super().no_more_statements()
  335. class Conditional(EachOf):
  336. def __init__(self, condition, rules, else_rules):
  337. if condition:
  338. super().__init__(*rules)
  339. else:
  340. super().__init__(*else_rules)
  341. class Or(AllOf):
  342. def process_statement(self, execute_observed):
  343. for rule in self.rules:
  344. rule.process_statement(execute_observed)
  345. if rule.is_consumed:
  346. self.is_consumed = True
  347. break
  348. else:
  349. self.errormessage = list(self.rules)[0].errormessage
  350. class SQLExecuteObserved:
  351. def __init__(self, context, clauseelement, multiparams, params):
  352. self.context = context
  353. self.clauseelement = clauseelement
  354. if multiparams:
  355. self.parameters = multiparams
  356. elif params:
  357. self.parameters = [params]
  358. else:
  359. self.parameters = []
  360. self.statements = []
  361. def __repr__(self):
  362. return str(self.statements)
  363. class SQLCursorExecuteObserved(
  364. collections.namedtuple(
  365. "SQLCursorExecuteObserved",
  366. ["statement", "parameters", "context", "executemany"],
  367. )
  368. ):
  369. pass
  370. class SQLAsserter:
  371. def __init__(self):
  372. self.accumulated = []
  373. def _close(self):
  374. self._final = self.accumulated
  375. del self.accumulated
  376. def assert_(self, *rules):
  377. rule = EachOf(*rules)
  378. observed = list(self._final)
  379. while observed:
  380. statement = observed.pop(0)
  381. rule.process_statement(statement)
  382. if rule.is_consumed:
  383. break
  384. elif rule.errormessage:
  385. assert False, rule.errormessage
  386. if observed:
  387. assert False, "Additional SQL statements remain:\n%s" % observed
  388. elif not rule.is_consumed:
  389. rule.no_more_statements()
  390. @contextlib.contextmanager
  391. def assert_engine(engine):
  392. asserter = SQLAsserter()
  393. orig = []
  394. @event.listens_for(engine, "before_execute")
  395. def connection_execute(
  396. conn, clauseelement, multiparams, params, execution_options
  397. ):
  398. # grab the original statement + params before any cursor
  399. # execution
  400. orig[:] = clauseelement, multiparams, params
  401. @event.listens_for(engine, "after_cursor_execute")
  402. def cursor_execute(
  403. conn, cursor, statement, parameters, context, executemany
  404. ):
  405. if not context:
  406. return
  407. # then grab real cursor statements and associate them all
  408. # around a single context
  409. if (
  410. asserter.accumulated
  411. and asserter.accumulated[-1].context is context
  412. ):
  413. obs = asserter.accumulated[-1]
  414. else:
  415. obs = SQLExecuteObserved(context, orig[0], orig[1], orig[2])
  416. asserter.accumulated.append(obs)
  417. obs.statements.append(
  418. SQLCursorExecuteObserved(
  419. statement, parameters, context, executemany
  420. )
  421. )
  422. try:
  423. yield asserter
  424. finally:
  425. event.remove(engine, "after_cursor_execute", cursor_execute)
  426. event.remove(engine, "before_execute", connection_execute)
  427. asserter._close()