env.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557
  1. import importlib.machinery
  2. import os
  3. from pathlib import Path
  4. import shutil
  5. import textwrap
  6. from sqlalchemy.testing import config
  7. from sqlalchemy.testing import provision
  8. from . import util as testing_util
  9. from .. import command
  10. from .. import script
  11. from .. import util
  12. from ..script import Script
  13. from ..script import ScriptDirectory
  14. def _get_staging_directory():
  15. if provision.FOLLOWER_IDENT:
  16. return f"scratch_{provision.FOLLOWER_IDENT}"
  17. else:
  18. return "scratch"
  19. def staging_env(create=True, template="generic", sourceless=False):
  20. cfg = _testing_config()
  21. if create:
  22. path = _join_path(_get_staging_directory(), "scripts")
  23. assert not os.path.exists(path), (
  24. "staging directory %s already exists; poor cleanup?" % path
  25. )
  26. command.init(cfg, path, template=template)
  27. if sourceless:
  28. try:
  29. # do an import so that a .pyc/.pyo is generated.
  30. util.load_python_file(path, "env.py")
  31. except AttributeError:
  32. # we don't have the migration context set up yet
  33. # so running the .env py throws this exception.
  34. # theoretically we could be using py_compiler here to
  35. # generate .pyc/.pyo without importing but not really
  36. # worth it.
  37. pass
  38. assert sourceless in (
  39. "pep3147_envonly",
  40. "simple",
  41. "pep3147_everything",
  42. ), sourceless
  43. make_sourceless(
  44. _join_path(path, "env.py"),
  45. "pep3147" if "pep3147" in sourceless else "simple",
  46. )
  47. sc = script.ScriptDirectory.from_config(cfg)
  48. return sc
  49. def clear_staging_env():
  50. from sqlalchemy.testing import engines
  51. engines.testing_reaper.close_all()
  52. shutil.rmtree(_get_staging_directory(), True)
  53. def script_file_fixture(txt):
  54. dir_ = _join_path(_get_staging_directory(), "scripts")
  55. path = _join_path(dir_, "script.py.mako")
  56. with open(path, "w") as f:
  57. f.write(txt)
  58. def env_file_fixture(txt):
  59. dir_ = _join_path(_get_staging_directory(), "scripts")
  60. txt = (
  61. """
  62. from alembic import context
  63. config = context.config
  64. """
  65. + txt
  66. )
  67. path = _join_path(dir_, "env.py")
  68. pyc_path = util.pyc_file_from_path(path)
  69. if pyc_path:
  70. os.unlink(pyc_path)
  71. with open(path, "w") as f:
  72. f.write(txt)
  73. def _sqlite_file_db(tempname="foo.db", future=False, scope=None, **options):
  74. dir_ = _join_path(_get_staging_directory(), "scripts")
  75. url = "sqlite:///%s/%s" % (dir_, tempname)
  76. if scope:
  77. options["scope"] = scope
  78. return testing_util.testing_engine(url=url, future=future, options=options)
  79. def _sqlite_testing_config(sourceless=False, future=False):
  80. dir_ = _join_path(_get_staging_directory(), "scripts")
  81. url = f"sqlite:///{dir_}/foo.db"
  82. sqlalchemy_future = future or ("future" in config.db.__class__.__module__)
  83. return _write_config_file(
  84. f"""
  85. [alembic]
  86. script_location = {dir_}
  87. sqlalchemy.url = {url}
  88. sourceless = {"true" if sourceless else "false"}
  89. {"sqlalchemy.future = true" if sqlalchemy_future else ""}
  90. [loggers]
  91. keys = root,sqlalchemy
  92. [handlers]
  93. keys = console
  94. [logger_root]
  95. level = WARNING
  96. handlers = console
  97. qualname =
  98. [logger_sqlalchemy]
  99. level = DEBUG
  100. handlers =
  101. qualname = sqlalchemy.engine
  102. [handler_console]
  103. class = StreamHandler
  104. args = (sys.stderr,)
  105. level = NOTSET
  106. formatter = generic
  107. [formatters]
  108. keys = generic
  109. [formatter_generic]
  110. format = %%(levelname)-5.5s [%%(name)s] %%(message)s
  111. datefmt = %%H:%%M:%%S
  112. """
  113. )
  114. def _multi_dir_testing_config(sourceless=False, extra_version_location=""):
  115. dir_ = _join_path(_get_staging_directory(), "scripts")
  116. sqlalchemy_future = "future" in config.db.__class__.__module__
  117. url = "sqlite:///%s/foo.db" % dir_
  118. return _write_config_file(
  119. f"""
  120. [alembic]
  121. script_location = {dir_}
  122. sqlalchemy.url = {url}
  123. sqlalchemy.future = {"true" if sqlalchemy_future else "false"}
  124. sourceless = {"true" if sourceless else "false"}
  125. path_separator = space
  126. version_locations = %(here)s/model1/ %(here)s/model2/ %(here)s/model3/ \
  127. {extra_version_location}
  128. [loggers]
  129. keys = root
  130. [handlers]
  131. keys = console
  132. [logger_root]
  133. level = WARNING
  134. handlers = console
  135. qualname =
  136. [handler_console]
  137. class = StreamHandler
  138. args = (sys.stderr,)
  139. level = NOTSET
  140. formatter = generic
  141. [formatters]
  142. keys = generic
  143. [formatter_generic]
  144. format = %%(levelname)-5.5s [%%(name)s] %%(message)s
  145. datefmt = %%H:%%M:%%S
  146. """
  147. )
  148. def _no_sql_pyproject_config(dialect="postgresql", directives=""):
  149. """use a postgresql url with no host so that
  150. connections guaranteed to fail"""
  151. dir_ = _join_path(_get_staging_directory(), "scripts")
  152. return _write_toml_config(
  153. f"""
  154. [tool.alembic]
  155. script_location ="{dir_}"
  156. {textwrap.dedent(directives)}
  157. """,
  158. f"""
  159. [alembic]
  160. sqlalchemy.url = {dialect}://
  161. [loggers]
  162. keys = root
  163. [handlers]
  164. keys = console
  165. [logger_root]
  166. level = WARNING
  167. handlers = console
  168. qualname =
  169. [handler_console]
  170. class = StreamHandler
  171. args = (sys.stderr,)
  172. level = NOTSET
  173. formatter = generic
  174. [formatters]
  175. keys = generic
  176. [formatter_generic]
  177. format = %%(levelname)-5.5s [%%(name)s] %%(message)s
  178. datefmt = %%H:%%M:%%S
  179. """,
  180. )
  181. def _no_sql_testing_config(dialect="postgresql", directives=""):
  182. """use a postgresql url with no host so that
  183. connections guaranteed to fail"""
  184. dir_ = _join_path(_get_staging_directory(), "scripts")
  185. return _write_config_file(
  186. f"""
  187. [alembic]
  188. script_location ={dir_}
  189. sqlalchemy.url = {dialect}://
  190. {directives}
  191. [loggers]
  192. keys = root
  193. [handlers]
  194. keys = console
  195. [logger_root]
  196. level = WARNING
  197. handlers = console
  198. qualname =
  199. [handler_console]
  200. class = StreamHandler
  201. args = (sys.stderr,)
  202. level = NOTSET
  203. formatter = generic
  204. [formatters]
  205. keys = generic
  206. [formatter_generic]
  207. format = %%(levelname)-5.5s [%%(name)s] %%(message)s
  208. datefmt = %%H:%%M:%%S
  209. """
  210. )
  211. def _write_toml_config(tomltext, initext):
  212. cfg = _write_config_file(initext)
  213. with open(cfg.toml_file_name, "w") as f:
  214. f.write(tomltext)
  215. return cfg
  216. def _write_config_file(text):
  217. cfg = _testing_config()
  218. with open(cfg.config_file_name, "w") as f:
  219. f.write(text)
  220. return cfg
  221. def _testing_config():
  222. from alembic.config import Config
  223. if not os.access(_get_staging_directory(), os.F_OK):
  224. os.mkdir(_get_staging_directory())
  225. return Config(
  226. _join_path(_get_staging_directory(), "test_alembic.ini"),
  227. _join_path(_get_staging_directory(), "pyproject.toml"),
  228. )
  229. def write_script(
  230. scriptdir, rev_id, content, encoding="ascii", sourceless=False
  231. ):
  232. old = scriptdir.revision_map.get_revision(rev_id)
  233. path = old.path
  234. content = textwrap.dedent(content)
  235. if encoding:
  236. content = content.encode(encoding)
  237. with open(path, "wb") as fp:
  238. fp.write(content)
  239. pyc_path = util.pyc_file_from_path(path)
  240. if pyc_path:
  241. os.unlink(pyc_path)
  242. script = Script._from_path(scriptdir, path)
  243. old = scriptdir.revision_map.get_revision(script.revision)
  244. if old.down_revision != script.down_revision:
  245. raise Exception("Can't change down_revision on a refresh operation.")
  246. scriptdir.revision_map.add_revision(script, _replace=True)
  247. if sourceless:
  248. make_sourceless(
  249. path, "pep3147" if sourceless == "pep3147_everything" else "simple"
  250. )
  251. def make_sourceless(path, style):
  252. import py_compile
  253. py_compile.compile(path)
  254. if style == "simple":
  255. pyc_path = util.pyc_file_from_path(path)
  256. suffix = importlib.machinery.BYTECODE_SUFFIXES[0]
  257. filepath, ext = os.path.splitext(path)
  258. simple_pyc_path = filepath + suffix
  259. shutil.move(pyc_path, simple_pyc_path)
  260. pyc_path = simple_pyc_path
  261. else:
  262. assert style in ("pep3147", "simple")
  263. pyc_path = util.pyc_file_from_path(path)
  264. assert os.access(pyc_path, os.F_OK)
  265. os.unlink(path)
  266. def three_rev_fixture(cfg):
  267. a = util.rev_id()
  268. b = util.rev_id()
  269. c = util.rev_id()
  270. script = ScriptDirectory.from_config(cfg)
  271. script.generate_revision(a, "revision a", refresh=True, head="base")
  272. write_script(
  273. script,
  274. a,
  275. f"""\
  276. "Rev A"
  277. revision = '{a}'
  278. down_revision = None
  279. from alembic import op
  280. def upgrade():
  281. op.execute("CREATE STEP 1")
  282. def downgrade():
  283. op.execute("DROP STEP 1")
  284. """,
  285. )
  286. script.generate_revision(b, "revision b", refresh=True, head=a)
  287. write_script(
  288. script,
  289. b,
  290. f"""# coding: utf-8
  291. "Rev B, méil, %3"
  292. revision = '{b}'
  293. down_revision = '{a}'
  294. from alembic import op
  295. def upgrade():
  296. op.execute("CREATE STEP 2")
  297. def downgrade():
  298. op.execute("DROP STEP 2")
  299. """,
  300. encoding="utf-8",
  301. )
  302. script.generate_revision(c, "revision c", refresh=True, head=b)
  303. write_script(
  304. script,
  305. c,
  306. f"""\
  307. "Rev C"
  308. revision = '{c}'
  309. down_revision = '{b}'
  310. from alembic import op
  311. def upgrade():
  312. op.execute("CREATE STEP 3")
  313. def downgrade():
  314. op.execute("DROP STEP 3")
  315. """,
  316. )
  317. return a, b, c
  318. def multi_heads_fixture(cfg, a, b, c):
  319. """Create a multiple head fixture from the three-revs fixture"""
  320. # a->b->c
  321. # -> d -> e
  322. # -> f
  323. d = util.rev_id()
  324. e = util.rev_id()
  325. f = util.rev_id()
  326. script = ScriptDirectory.from_config(cfg)
  327. script.generate_revision(
  328. d, "revision d from b", head=b, splice=True, refresh=True
  329. )
  330. write_script(
  331. script,
  332. d,
  333. f"""\
  334. "Rev D"
  335. revision = '{d}'
  336. down_revision = '{b}'
  337. from alembic import op
  338. def upgrade():
  339. op.execute("CREATE STEP 4")
  340. def downgrade():
  341. op.execute("DROP STEP 4")
  342. """,
  343. )
  344. script.generate_revision(
  345. e, "revision e from d", head=d, splice=True, refresh=True
  346. )
  347. write_script(
  348. script,
  349. e,
  350. f"""\
  351. "Rev E"
  352. revision = '{e}'
  353. down_revision = '{d}'
  354. from alembic import op
  355. def upgrade():
  356. op.execute("CREATE STEP 5")
  357. def downgrade():
  358. op.execute("DROP STEP 5")
  359. """,
  360. )
  361. script.generate_revision(
  362. f, "revision f from b", head=b, splice=True, refresh=True
  363. )
  364. write_script(
  365. script,
  366. f,
  367. f"""\
  368. "Rev F"
  369. revision = '{f}'
  370. down_revision = '{b}'
  371. from alembic import op
  372. def upgrade():
  373. op.execute("CREATE STEP 6")
  374. def downgrade():
  375. op.execute("DROP STEP 6")
  376. """,
  377. )
  378. return d, e, f
  379. def _multidb_testing_config(engines):
  380. """alembic.ini fixture to work exactly with the 'multidb' template"""
  381. dir_ = _join_path(_get_staging_directory(), "scripts")
  382. sqlalchemy_future = "future" in config.db.__class__.__module__
  383. databases = ", ".join(engines.keys())
  384. engines = "\n\n".join(
  385. f"[{key}]\nsqlalchemy.url = {value.url}"
  386. for key, value in engines.items()
  387. )
  388. return _write_config_file(
  389. f"""
  390. [alembic]
  391. script_location = {dir_}
  392. sourceless = false
  393. sqlalchemy.future = {"true" if sqlalchemy_future else "false"}
  394. databases = {databases}
  395. {engines}
  396. [loggers]
  397. keys = root
  398. [handlers]
  399. keys = console
  400. [logger_root]
  401. level = WARNING
  402. handlers = console
  403. qualname =
  404. [handler_console]
  405. class = StreamHandler
  406. args = (sys.stderr,)
  407. level = NOTSET
  408. formatter = generic
  409. [formatters]
  410. keys = generic
  411. [formatter_generic]
  412. format = %%(levelname)-5.5s [%%(name)s] %%(message)s
  413. datefmt = %%H:%%M:%%S
  414. """
  415. )
  416. def _join_path(base: str, *more: str):
  417. return str(Path(base).joinpath(*more).as_posix())