render.py 36 KB


  1. # mypy: allow-untyped-defs, allow-incomplete-defs, allow-untyped-calls
  2. # mypy: no-warn-return-any, allow-any-generics
  3. from __future__ import annotations
  4. from io import StringIO
  5. import re
  6. from typing import Any
  7. from typing import cast
  8. from typing import Dict
  9. from typing import List
  10. from typing import Optional
  11. from typing import Tuple
  12. from typing import TYPE_CHECKING
  13. from typing import Union
  14. from mako.pygen import PythonPrinter
  15. from sqlalchemy import schema as sa_schema
  16. from sqlalchemy import sql
  17. from sqlalchemy import types as sqltypes
  18. from sqlalchemy.sql.base import _DialectArgView
  19. from sqlalchemy.sql.elements import conv
  20. from sqlalchemy.sql.elements import Label
  21. from sqlalchemy.sql.elements import quoted_name
  22. from .. import util
  23. from ..operations import ops
  24. from ..util import sqla_compat
  25. if TYPE_CHECKING:
  26. from typing import Literal
  27. from sqlalchemy import Computed
  28. from sqlalchemy import Identity
  29. from sqlalchemy.sql.elements import ColumnElement
  30. from sqlalchemy.sql.elements import TextClause
  31. from sqlalchemy.sql.schema import CheckConstraint
  32. from sqlalchemy.sql.schema import Column
  33. from sqlalchemy.sql.schema import Constraint
  34. from sqlalchemy.sql.schema import FetchedValue
  35. from sqlalchemy.sql.schema import ForeignKey
  36. from sqlalchemy.sql.schema import ForeignKeyConstraint
  37. from sqlalchemy.sql.schema import Index
  38. from sqlalchemy.sql.schema import MetaData
  39. from sqlalchemy.sql.schema import PrimaryKeyConstraint
  40. from sqlalchemy.sql.schema import UniqueConstraint
  41. from sqlalchemy.sql.sqltypes import ARRAY
  42. from sqlalchemy.sql.type_api import TypeEngine
  43. from alembic.autogenerate.api import AutogenContext
  44. from alembic.config import Config
  45. from alembic.operations.ops import MigrationScript
  46. from alembic.operations.ops import ModifyTableOps
  47. MAX_PYTHON_ARGS = 255
  48. def _render_gen_name(
  49. autogen_context: AutogenContext,
  50. name: sqla_compat._ConstraintName,
  51. ) -> Optional[Union[quoted_name, str, _f_name]]:
  52. if isinstance(name, conv):
  53. return _f_name(_alembic_autogenerate_prefix(autogen_context), name)
  54. else:
  55. return sqla_compat.constraint_name_or_none(name)
  56. def _indent(text: str) -> str:
  57. text = re.compile(r"^", re.M).sub(" ", text).strip()
  58. text = re.compile(r" +$", re.M).sub("", text)
  59. return text
  60. def _render_python_into_templatevars(
  61. autogen_context: AutogenContext,
  62. migration_script: MigrationScript,
  63. template_args: Dict[str, Union[str, Config]],
  64. ) -> None:
  65. imports = autogen_context.imports
  66. for upgrade_ops, downgrade_ops in zip(
  67. migration_script.upgrade_ops_list, migration_script.downgrade_ops_list
  68. ):
  69. template_args[upgrade_ops.upgrade_token] = _indent(
  70. _render_cmd_body(upgrade_ops, autogen_context)
  71. )
  72. template_args[downgrade_ops.downgrade_token] = _indent(
  73. _render_cmd_body(downgrade_ops, autogen_context)
  74. )
  75. template_args["imports"] = "\n".join(sorted(imports))
  76. default_renderers = renderers = util.Dispatcher()
  77. def _render_cmd_body(
  78. op_container: ops.OpContainer,
  79. autogen_context: AutogenContext,
  80. ) -> str:
  81. buf = StringIO()
  82. printer = PythonPrinter(buf)
  83. printer.writeline(
  84. "# ### commands auto generated by Alembic - please adjust! ###"
  85. )
  86. has_lines = False
  87. for op in op_container.ops:
  88. lines = render_op(autogen_context, op)
  89. has_lines = has_lines or bool(lines)
  90. for line in lines:
  91. printer.writeline(line)
  92. if not has_lines:
  93. printer.writeline("pass")
  94. printer.writeline("# ### end Alembic commands ###")
  95. return buf.getvalue()
  96. def render_op(
  97. autogen_context: AutogenContext, op: ops.MigrateOperation
  98. ) -> List[str]:
  99. renderer = renderers.dispatch(op)
  100. lines = util.to_list(renderer(autogen_context, op))
  101. return lines
  102. def render_op_text(
  103. autogen_context: AutogenContext, op: ops.MigrateOperation
  104. ) -> str:
  105. return "\n".join(render_op(autogen_context, op))
  106. @renderers.dispatch_for(ops.ModifyTableOps)
  107. def _render_modify_table(
  108. autogen_context: AutogenContext, op: ModifyTableOps
  109. ) -> List[str]:
  110. opts = autogen_context.opts
  111. render_as_batch = opts.get("render_as_batch", False)
  112. if op.ops:
  113. lines = []
  114. if render_as_batch:
  115. with autogen_context._within_batch():
  116. lines.append(
  117. "with op.batch_alter_table(%r, schema=%r) as batch_op:"
  118. % (op.table_name, op.schema)
  119. )
  120. for t_op in op.ops:
  121. t_lines = render_op(autogen_context, t_op)
  122. lines.extend(t_lines)
  123. lines.append("")
  124. else:
  125. for t_op in op.ops:
  126. t_lines = render_op(autogen_context, t_op)
  127. lines.extend(t_lines)
  128. return lines
  129. else:
  130. return []
  131. @renderers.dispatch_for(ops.CreateTableCommentOp)
  132. def _render_create_table_comment(
  133. autogen_context: AutogenContext, op: ops.CreateTableCommentOp
  134. ) -> str:
  135. if autogen_context._has_batch:
  136. templ = (
  137. "{prefix}create_table_comment(\n"
  138. "{indent}{comment},\n"
  139. "{indent}existing_comment={existing}\n"
  140. ")"
  141. )
  142. else:
  143. templ = (
  144. "{prefix}create_table_comment(\n"
  145. "{indent}'{tname}',\n"
  146. "{indent}{comment},\n"
  147. "{indent}existing_comment={existing},\n"
  148. "{indent}schema={schema}\n"
  149. ")"
  150. )
  151. return templ.format(
  152. prefix=_alembic_autogenerate_prefix(autogen_context),
  153. tname=op.table_name,
  154. comment="%r" % op.comment if op.comment is not None else None,
  155. existing=(
  156. "%r" % op.existing_comment
  157. if op.existing_comment is not None
  158. else None
  159. ),
  160. schema="'%s'" % op.schema if op.schema is not None else None,
  161. indent=" ",
  162. )
  163. @renderers.dispatch_for(ops.DropTableCommentOp)
  164. def _render_drop_table_comment(
  165. autogen_context: AutogenContext, op: ops.DropTableCommentOp
  166. ) -> str:
  167. if autogen_context._has_batch:
  168. templ = (
  169. "{prefix}drop_table_comment(\n"
  170. "{indent}existing_comment={existing}\n"
  171. ")"
  172. )
  173. else:
  174. templ = (
  175. "{prefix}drop_table_comment(\n"
  176. "{indent}'{tname}',\n"
  177. "{indent}existing_comment={existing},\n"
  178. "{indent}schema={schema}\n"
  179. ")"
  180. )
  181. return templ.format(
  182. prefix=_alembic_autogenerate_prefix(autogen_context),
  183. tname=op.table_name,
  184. existing=(
  185. "%r" % op.existing_comment
  186. if op.existing_comment is not None
  187. else None
  188. ),
  189. schema="'%s'" % op.schema if op.schema is not None else None,
  190. indent=" ",
  191. )
  192. @renderers.dispatch_for(ops.CreateTableOp)
  193. def _add_table(autogen_context: AutogenContext, op: ops.CreateTableOp) -> str:
  194. table = op.to_table()
  195. args = [
  196. col
  197. for col in [
  198. _render_column(col, autogen_context) for col in table.columns
  199. ]
  200. if col
  201. ] + sorted(
  202. [
  203. rcons
  204. for rcons in [
  205. _render_constraint(
  206. cons, autogen_context, op._namespace_metadata
  207. )
  208. for cons in table.constraints
  209. ]
  210. if rcons is not None
  211. ]
  212. )
  213. if len(args) > MAX_PYTHON_ARGS:
  214. args_str = "*[" + ",\n".join(args) + "]"
  215. else:
  216. args_str = ",\n".join(args)
  217. text = "%(prefix)screate_table(%(tablename)r,\n%(args)s" % {
  218. "tablename": _ident(op.table_name),
  219. "prefix": _alembic_autogenerate_prefix(autogen_context),
  220. "args": args_str,
  221. }
  222. if op.schema:
  223. text += ",\nschema=%r" % _ident(op.schema)
  224. comment = table.comment
  225. if comment:
  226. text += ",\ncomment=%r" % _ident(comment)
  227. info = table.info
  228. if info:
  229. text += f",\ninfo={info!r}"
  230. for k in sorted(op.kw):
  231. text += ",\n%s=%r" % (k.replace(" ", "_"), op.kw[k])
  232. if table._prefixes:
  233. prefixes = ", ".join("'%s'" % p for p in table._prefixes)
  234. text += ",\nprefixes=[%s]" % prefixes
  235. if op.if_not_exists is not None:
  236. text += ",\nif_not_exists=%r" % bool(op.if_not_exists)
  237. text += "\n)"
  238. return text
  239. @renderers.dispatch_for(ops.DropTableOp)
  240. def _drop_table(autogen_context: AutogenContext, op: ops.DropTableOp) -> str:
  241. text = "%(prefix)sdrop_table(%(tname)r" % {
  242. "prefix": _alembic_autogenerate_prefix(autogen_context),
  243. "tname": _ident(op.table_name),
  244. }
  245. if op.schema:
  246. text += ", schema=%r" % _ident(op.schema)
  247. if op.if_exists is not None:
  248. text += ", if_exists=%r" % bool(op.if_exists)
  249. text += ")"
  250. return text
  251. def _render_dialect_kwargs_items(
  252. autogen_context: AutogenContext, dialect_kwargs: _DialectArgView
  253. ) -> list[str]:
  254. return [
  255. f"{key}={_render_potential_expr(val, autogen_context)}"
  256. for key, val in dialect_kwargs.items()
  257. ]
  258. @renderers.dispatch_for(ops.CreateIndexOp)
  259. def _add_index(autogen_context: AutogenContext, op: ops.CreateIndexOp) -> str:
  260. index = op.to_index()
  261. has_batch = autogen_context._has_batch
  262. if has_batch:
  263. tmpl = (
  264. "%(prefix)screate_index(%(name)r, [%(columns)s], "
  265. "unique=%(unique)r%(kwargs)s)"
  266. )
  267. else:
  268. tmpl = (
  269. "%(prefix)screate_index(%(name)r, %(table)r, [%(columns)s], "
  270. "unique=%(unique)r%(schema)s%(kwargs)s)"
  271. )
  272. assert index.table is not None
  273. opts = _render_dialect_kwargs_items(autogen_context, index.dialect_kwargs)
  274. if op.if_not_exists is not None:
  275. opts.append("if_not_exists=%r" % bool(op.if_not_exists))
  276. text = tmpl % {
  277. "prefix": _alembic_autogenerate_prefix(autogen_context),
  278. "name": _render_gen_name(autogen_context, index.name),
  279. "table": _ident(index.table.name),
  280. "columns": ", ".join(
  281. _get_index_rendered_expressions(index, autogen_context)
  282. ),
  283. "unique": index.unique or False,
  284. "schema": (
  285. (", schema=%r" % _ident(index.table.schema))
  286. if index.table.schema
  287. else ""
  288. ),
  289. "kwargs": ", " + ", ".join(opts) if opts else "",
  290. }
  291. return text
  292. @renderers.dispatch_for(ops.DropIndexOp)
  293. def _drop_index(autogen_context: AutogenContext, op: ops.DropIndexOp) -> str:
  294. index = op.to_index()
  295. has_batch = autogen_context._has_batch
  296. if has_batch:
  297. tmpl = "%(prefix)sdrop_index(%(name)r%(kwargs)s)"
  298. else:
  299. tmpl = (
  300. "%(prefix)sdrop_index(%(name)r, "
  301. "table_name=%(table_name)r%(schema)s%(kwargs)s)"
  302. )
  303. opts = _render_dialect_kwargs_items(autogen_context, index.dialect_kwargs)
  304. if op.if_exists is not None:
  305. opts.append("if_exists=%r" % bool(op.if_exists))
  306. text = tmpl % {
  307. "prefix": _alembic_autogenerate_prefix(autogen_context),
  308. "name": _render_gen_name(autogen_context, op.index_name),
  309. "table_name": _ident(op.table_name),
  310. "schema": ((", schema=%r" % _ident(op.schema)) if op.schema else ""),
  311. "kwargs": ", " + ", ".join(opts) if opts else "",
  312. }
  313. return text
  314. @renderers.dispatch_for(ops.CreateUniqueConstraintOp)
  315. def _add_unique_constraint(
  316. autogen_context: AutogenContext, op: ops.CreateUniqueConstraintOp
  317. ) -> List[str]:
  318. return [_uq_constraint(op.to_constraint(), autogen_context, True)]
  319. @renderers.dispatch_for(ops.CreateForeignKeyOp)
  320. def _add_fk_constraint(
  321. autogen_context: AutogenContext, op: ops.CreateForeignKeyOp
  322. ) -> str:
  323. constraint = op.to_constraint()
  324. args = [repr(_render_gen_name(autogen_context, op.constraint_name))]
  325. if not autogen_context._has_batch:
  326. args.append(repr(_ident(op.source_table)))
  327. args.extend(
  328. [
  329. repr(_ident(op.referent_table)),
  330. repr([_ident(col) for col in op.local_cols]),
  331. repr([_ident(col) for col in op.remote_cols]),
  332. ]
  333. )
  334. kwargs = [
  335. "referent_schema",
  336. "onupdate",
  337. "ondelete",
  338. "initially",
  339. "deferrable",
  340. "use_alter",
  341. "match",
  342. ]
  343. if not autogen_context._has_batch:
  344. kwargs.insert(0, "source_schema")
  345. for k in kwargs:
  346. if k in op.kw:
  347. value = op.kw[k]
  348. if value is not None:
  349. args.append("%s=%r" % (k, value))
  350. dialect_kwargs = _render_dialect_kwargs_items(
  351. autogen_context, constraint.dialect_kwargs
  352. )
  353. return "%(prefix)screate_foreign_key(%(args)s%(dialect_kwargs)s)" % {
  354. "prefix": _alembic_autogenerate_prefix(autogen_context),
  355. "args": ", ".join(args),
  356. "dialect_kwargs": (
  357. ", " + ", ".join(dialect_kwargs) if dialect_kwargs else ""
  358. ),
  359. }
  360. @renderers.dispatch_for(ops.CreatePrimaryKeyOp)
  361. def _add_pk_constraint(constraint, autogen_context):
  362. raise NotImplementedError()
  363. @renderers.dispatch_for(ops.CreateCheckConstraintOp)
  364. def _add_check_constraint(constraint, autogen_context):
  365. raise NotImplementedError()
  366. @renderers.dispatch_for(ops.DropConstraintOp)
  367. def _drop_constraint(
  368. autogen_context: AutogenContext, op: ops.DropConstraintOp
  369. ) -> str:
  370. prefix = _alembic_autogenerate_prefix(autogen_context)
  371. name = _render_gen_name(autogen_context, op.constraint_name)
  372. schema = _ident(op.schema) if op.schema else None
  373. type_ = _ident(op.constraint_type) if op.constraint_type else None
  374. if_exists = op.if_exists
  375. params_strs = []
  376. params_strs.append(repr(name))
  377. if not autogen_context._has_batch:
  378. params_strs.append(repr(_ident(op.table_name)))
  379. if schema is not None:
  380. params_strs.append(f"schema={schema!r}")
  381. if type_ is not None:
  382. params_strs.append(f"type_={type_!r}")
  383. if if_exists is not None:
  384. params_strs.append(f"if_exists={if_exists}")
  385. return f"{prefix}drop_constraint({', '.join(params_strs)})"
  386. @renderers.dispatch_for(ops.AddColumnOp)
  387. def _add_column(autogen_context: AutogenContext, op: ops.AddColumnOp) -> str:
  388. schema, tname, column, if_not_exists = (
  389. op.schema,
  390. op.table_name,
  391. op.column,
  392. op.if_not_exists,
  393. )
  394. if autogen_context._has_batch:
  395. template = "%(prefix)sadd_column(%(column)s)"
  396. else:
  397. template = "%(prefix)sadd_column(%(tname)r, %(column)s"
  398. if schema:
  399. template += ", schema=%(schema)r"
  400. if if_not_exists is not None:
  401. template += ", if_not_exists=%(if_not_exists)r"
  402. template += ")"
  403. text = template % {
  404. "prefix": _alembic_autogenerate_prefix(autogen_context),
  405. "tname": tname,
  406. "column": _render_column(column, autogen_context),
  407. "schema": schema,
  408. "if_not_exists": if_not_exists,
  409. }
  410. return text
  411. @renderers.dispatch_for(ops.DropColumnOp)
  412. def _drop_column(autogen_context: AutogenContext, op: ops.DropColumnOp) -> str:
  413. schema, tname, column_name, if_exists = (
  414. op.schema,
  415. op.table_name,
  416. op.column_name,
  417. op.if_exists,
  418. )
  419. if autogen_context._has_batch:
  420. template = "%(prefix)sdrop_column(%(cname)r)"
  421. else:
  422. template = "%(prefix)sdrop_column(%(tname)r, %(cname)r"
  423. if schema:
  424. template += ", schema=%(schema)r"
  425. if if_exists is not None:
  426. template += ", if_exists=%(if_exists)r"
  427. template += ")"
  428. text = template % {
  429. "prefix": _alembic_autogenerate_prefix(autogen_context),
  430. "tname": _ident(tname),
  431. "cname": _ident(column_name),
  432. "schema": _ident(schema),
  433. "if_exists": if_exists,
  434. }
  435. return text
  436. @renderers.dispatch_for(ops.AlterColumnOp)
  437. def _alter_column(
  438. autogen_context: AutogenContext, op: ops.AlterColumnOp
  439. ) -> str:
  440. tname = op.table_name
  441. cname = op.column_name
  442. server_default = op.modify_server_default
  443. type_ = op.modify_type
  444. nullable = op.modify_nullable
  445. comment = op.modify_comment
  446. newname = op.modify_name
  447. autoincrement = op.kw.get("autoincrement", None)
  448. existing_type = op.existing_type
  449. existing_nullable = op.existing_nullable
  450. existing_comment = op.existing_comment
  451. existing_server_default = op.existing_server_default
  452. schema = op.schema
  453. indent = " " * 11
  454. if autogen_context._has_batch:
  455. template = "%(prefix)salter_column(%(cname)r"
  456. else:
  457. template = "%(prefix)salter_column(%(tname)r, %(cname)r"
  458. text = template % {
  459. "prefix": _alembic_autogenerate_prefix(autogen_context),
  460. "tname": tname,
  461. "cname": cname,
  462. }
  463. if existing_type is not None:
  464. text += ",\n%sexisting_type=%s" % (
  465. indent,
  466. _repr_type(existing_type, autogen_context),
  467. )
  468. if server_default is not False:
  469. rendered = _render_server_default(server_default, autogen_context)
  470. text += ",\n%sserver_default=%s" % (indent, rendered)
  471. if newname is not None:
  472. text += ",\n%snew_column_name=%r" % (indent, newname)
  473. if type_ is not None:
  474. text += ",\n%stype_=%s" % (indent, _repr_type(type_, autogen_context))
  475. if nullable is not None:
  476. text += ",\n%snullable=%r" % (indent, nullable)
  477. if comment is not False:
  478. text += ",\n%scomment=%r" % (indent, comment)
  479. if existing_comment is not None:
  480. text += ",\n%sexisting_comment=%r" % (indent, existing_comment)
  481. if nullable is None and existing_nullable is not None:
  482. text += ",\n%sexisting_nullable=%r" % (indent, existing_nullable)
  483. if autoincrement is not None:
  484. text += ",\n%sautoincrement=%r" % (indent, autoincrement)
  485. if server_default is False and existing_server_default:
  486. rendered = _render_server_default(
  487. existing_server_default, autogen_context
  488. )
  489. text += ",\n%sexisting_server_default=%s" % (indent, rendered)
  490. if schema and not autogen_context._has_batch:
  491. text += ",\n%sschema=%r" % (indent, schema)
  492. text += ")"
  493. return text
  494. class _f_name:
  495. def __init__(self, prefix: str, name: conv) -> None:
  496. self.prefix = prefix
  497. self.name = name
  498. def __repr__(self) -> str:
  499. return "%sf(%r)" % (self.prefix, _ident(self.name))
  500. def _ident(name: Optional[Union[quoted_name, str]]) -> Optional[str]:
  501. """produce a __repr__() object for a string identifier that may
  502. use quoted_name() in SQLAlchemy 0.9 and greater.
  503. The issue worked around here is that quoted_name() doesn't have
  504. very good repr() behavior by itself when unicode is involved.
  505. """
  506. if name is None:
  507. return name
  508. elif isinstance(name, quoted_name):
  509. return str(name)
  510. elif isinstance(name, str):
  511. return name
  512. def _render_potential_expr(
  513. value: Any,
  514. autogen_context: AutogenContext,
  515. *,
  516. wrap_in_element: bool = True,
  517. is_server_default: bool = False,
  518. is_index: bool = False,
  519. ) -> str:
  520. if isinstance(value, sql.ClauseElement):
  521. sql_text = autogen_context.migration_context.impl.render_ddl_sql_expr(
  522. value, is_server_default=is_server_default, is_index=is_index
  523. )
  524. if wrap_in_element:
  525. prefix = _sqlalchemy_autogenerate_prefix(autogen_context)
  526. element = "literal_column" if is_index else "text"
  527. value_str = f"{prefix}{element}({sql_text!r})"
  528. if (
  529. is_index
  530. and isinstance(value, Label)
  531. and type(value.name) is str
  532. ):
  533. return value_str + f".label({value.name!r})"
  534. else:
  535. return value_str
  536. else:
  537. return repr(sql_text)
  538. else:
  539. return repr(value)
  540. def _get_index_rendered_expressions(
  541. idx: Index, autogen_context: AutogenContext
  542. ) -> List[str]:
  543. return [
  544. (
  545. repr(_ident(getattr(exp, "name", None)))
  546. if isinstance(exp, sa_schema.Column)
  547. else _render_potential_expr(exp, autogen_context, is_index=True)
  548. )
  549. for exp in idx.expressions
  550. ]
  551. def _uq_constraint(
  552. constraint: UniqueConstraint,
  553. autogen_context: AutogenContext,
  554. alter: bool,
  555. ) -> str:
  556. opts: List[Tuple[str, Any]] = []
  557. has_batch = autogen_context._has_batch
  558. if constraint.deferrable:
  559. opts.append(("deferrable", constraint.deferrable))
  560. if constraint.initially:
  561. opts.append(("initially", constraint.initially))
  562. if not has_batch and alter and constraint.table.schema:
  563. opts.append(("schema", _ident(constraint.table.schema)))
  564. if not alter and constraint.name:
  565. opts.append(
  566. ("name", _render_gen_name(autogen_context, constraint.name))
  567. )
  568. dialect_options = _render_dialect_kwargs_items(
  569. autogen_context, constraint.dialect_kwargs
  570. )
  571. if alter:
  572. args = [repr(_render_gen_name(autogen_context, constraint.name))]
  573. if not has_batch:
  574. args += [repr(_ident(constraint.table.name))]
  575. args.append(repr([_ident(col.name) for col in constraint.columns]))
  576. args.extend(["%s=%r" % (k, v) for k, v in opts])
  577. args.extend(dialect_options)
  578. return "%(prefix)screate_unique_constraint(%(args)s)" % {
  579. "prefix": _alembic_autogenerate_prefix(autogen_context),
  580. "args": ", ".join(args),
  581. }
  582. else:
  583. args = [repr(_ident(col.name)) for col in constraint.columns]
  584. args.extend(["%s=%r" % (k, v) for k, v in opts])
  585. args.extend(dialect_options)
  586. return "%(prefix)sUniqueConstraint(%(args)s)" % {
  587. "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
  588. "args": ", ".join(args),
  589. }
  590. def _user_autogenerate_prefix(autogen_context, target):
  591. prefix = autogen_context.opts["user_module_prefix"]
  592. if prefix is None:
  593. return "%s." % target.__module__
  594. else:
  595. return prefix
  596. def _sqlalchemy_autogenerate_prefix(autogen_context: AutogenContext) -> str:
  597. return autogen_context.opts["sqlalchemy_module_prefix"] or ""
  598. def _alembic_autogenerate_prefix(autogen_context: AutogenContext) -> str:
  599. if autogen_context._has_batch:
  600. return "batch_op."
  601. else:
  602. return autogen_context.opts["alembic_module_prefix"] or ""
  603. def _user_defined_render(
  604. type_: str, object_: Any, autogen_context: AutogenContext
  605. ) -> Union[str, Literal[False]]:
  606. if "render_item" in autogen_context.opts:
  607. render = autogen_context.opts["render_item"]
  608. if render:
  609. rendered = render(type_, object_, autogen_context)
  610. if rendered is not False:
  611. return rendered
  612. return False
  613. def _render_column(
  614. column: Column[Any], autogen_context: AutogenContext
  615. ) -> str:
  616. rendered = _user_defined_render("column", column, autogen_context)
  617. if rendered is not False:
  618. return rendered
  619. args: List[str] = []
  620. opts: List[Tuple[str, Any]] = []
  621. if column.server_default:
  622. rendered = _render_server_default( # type:ignore[assignment]
  623. column.server_default, autogen_context
  624. )
  625. if rendered:
  626. if _should_render_server_default_positionally(
  627. column.server_default
  628. ):
  629. args.append(rendered)
  630. else:
  631. opts.append(("server_default", rendered))
  632. if (
  633. column.autoincrement is not None
  634. and column.autoincrement != sqla_compat.AUTOINCREMENT_DEFAULT
  635. ):
  636. opts.append(("autoincrement", column.autoincrement))
  637. if column.nullable is not None:
  638. opts.append(("nullable", column.nullable))
  639. if column.system:
  640. opts.append(("system", column.system))
  641. comment = column.comment
  642. if comment:
  643. opts.append(("comment", "%r" % comment))
  644. # TODO: for non-ascii colname, assign a "key"
  645. return "%(prefix)sColumn(%(name)r, %(type)s, %(args)s%(kwargs)s)" % {
  646. "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
  647. "name": _ident(column.name),
  648. "type": _repr_type(column.type, autogen_context),
  649. "args": ", ".join([str(arg) for arg in args]) + ", " if args else "",
  650. "kwargs": (
  651. ", ".join(
  652. ["%s=%s" % (kwname, val) for kwname, val in opts]
  653. + [
  654. "%s=%s"
  655. % (key, _render_potential_expr(val, autogen_context))
  656. for key, val in column.kwargs.items()
  657. ]
  658. )
  659. ),
  660. }
  661. def _should_render_server_default_positionally(server_default: Any) -> bool:
  662. return sqla_compat._server_default_is_computed(
  663. server_default
  664. ) or sqla_compat._server_default_is_identity(server_default)
  665. def _render_server_default(
  666. default: Optional[
  667. Union[FetchedValue, str, TextClause, ColumnElement[Any]]
  668. ],
  669. autogen_context: AutogenContext,
  670. repr_: bool = True,
  671. ) -> Optional[str]:
  672. rendered = _user_defined_render("server_default", default, autogen_context)
  673. if rendered is not False:
  674. return rendered
  675. if sqla_compat._server_default_is_computed(default):
  676. return _render_computed(cast("Computed", default), autogen_context)
  677. elif sqla_compat._server_default_is_identity(default):
  678. return _render_identity(cast("Identity", default), autogen_context)
  679. elif isinstance(default, sa_schema.DefaultClause):
  680. if isinstance(default.arg, str):
  681. default = default.arg
  682. else:
  683. return _render_potential_expr(
  684. default.arg, autogen_context, is_server_default=True
  685. )
  686. elif isinstance(default, sa_schema.FetchedValue):
  687. return _render_fetched_value(autogen_context)
  688. if isinstance(default, str) and repr_:
  689. default = repr(re.sub(r"^'|'$", "", default))
  690. return cast(str, default)
  691. def _render_computed(
  692. computed: Computed, autogen_context: AutogenContext
  693. ) -> str:
  694. text = _render_potential_expr(
  695. computed.sqltext, autogen_context, wrap_in_element=False
  696. )
  697. kwargs = {}
  698. if computed.persisted is not None:
  699. kwargs["persisted"] = computed.persisted
  700. return "%(prefix)sComputed(%(text)s, %(kwargs)s)" % {
  701. "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
  702. "text": text,
  703. "kwargs": (", ".join("%s=%s" % pair for pair in kwargs.items())),
  704. }
  705. def _render_identity(
  706. identity: Identity, autogen_context: AutogenContext
  707. ) -> str:
  708. kwargs = sqla_compat._get_identity_options_dict(
  709. identity, dialect_kwargs=True
  710. )
  711. return "%(prefix)sIdentity(%(kwargs)s)" % {
  712. "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
  713. "kwargs": (", ".join("%s=%s" % pair for pair in kwargs.items())),
  714. }
  715. def _render_fetched_value(autogen_context: AutogenContext) -> str:
  716. return "%(prefix)sFetchedValue()" % {
  717. "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
  718. }
  719. def _repr_type(
  720. type_: TypeEngine,
  721. autogen_context: AutogenContext,
  722. _skip_variants: bool = False,
  723. ) -> str:
  724. rendered = _user_defined_render("type", type_, autogen_context)
  725. if rendered is not False:
  726. return rendered
  727. if hasattr(autogen_context.migration_context, "impl"):
  728. impl_rt = autogen_context.migration_context.impl.render_type(
  729. type_, autogen_context
  730. )
  731. else:
  732. impl_rt = None
  733. mod = type(type_).__module__
  734. imports = autogen_context.imports
  735. if not _skip_variants and sqla_compat._type_has_variants(type_):
  736. return _render_Variant_type(type_, autogen_context)
  737. elif mod.startswith("sqlalchemy.dialects"):
  738. match = re.match(r"sqlalchemy\.dialects\.(\w+)", mod)
  739. assert match is not None
  740. dname = match.group(1)
  741. if imports is not None:
  742. imports.add("from sqlalchemy.dialects import %s" % dname)
  743. if impl_rt:
  744. return impl_rt
  745. else:
  746. return "%s.%r" % (dname, type_)
  747. elif impl_rt:
  748. return impl_rt
  749. elif mod.startswith("sqlalchemy."):
  750. if "_render_%s_type" % type_.__visit_name__ in globals():
  751. fn = globals()["_render_%s_type" % type_.__visit_name__]
  752. return fn(type_, autogen_context)
  753. else:
  754. prefix = _sqlalchemy_autogenerate_prefix(autogen_context)
  755. return "%s%r" % (prefix, type_)
  756. else:
  757. prefix = _user_autogenerate_prefix(autogen_context, type_)
  758. return "%s%r" % (prefix, type_)
  759. def _render_ARRAY_type(type_: ARRAY, autogen_context: AutogenContext) -> str:
  760. return cast(
  761. str,
  762. _render_type_w_subtype(
  763. type_, autogen_context, "item_type", r"(.+?\()"
  764. ),
  765. )
  766. def _render_Variant_type(
  767. type_: TypeEngine, autogen_context: AutogenContext
  768. ) -> str:
  769. base_type, variant_mapping = sqla_compat._get_variant_mapping(type_)
  770. base = _repr_type(base_type, autogen_context, _skip_variants=True)
  771. assert base is not None and base is not False # type: ignore[comparison-overlap] # noqa:E501
  772. for dialect in sorted(variant_mapping):
  773. typ = variant_mapping[dialect]
  774. base += ".with_variant(%s, %r)" % (
  775. _repr_type(typ, autogen_context, _skip_variants=True),
  776. dialect,
  777. )
  778. return base
  779. def _render_type_w_subtype(
  780. type_: TypeEngine,
  781. autogen_context: AutogenContext,
  782. attrname: str,
  783. regexp: str,
  784. prefix: Optional[str] = None,
  785. ) -> Union[Optional[str], Literal[False]]:
  786. outer_repr = repr(type_)
  787. inner_type = getattr(type_, attrname, None)
  788. if inner_type is None:
  789. return False
  790. inner_repr = repr(inner_type)
  791. inner_repr = re.sub(r"([\(\)])", r"\\\1", inner_repr)
  792. sub_type = _repr_type(getattr(type_, attrname), autogen_context)
  793. outer_type = re.sub(regexp + inner_repr, r"\1%s" % sub_type, outer_repr)
  794. if prefix:
  795. return "%s%s" % (prefix, outer_type)
  796. mod = type(type_).__module__
  797. if mod.startswith("sqlalchemy.dialects"):
  798. match = re.match(r"sqlalchemy\.dialects\.(\w+)", mod)
  799. assert match is not None
  800. dname = match.group(1)
  801. return "%s.%s" % (dname, outer_type)
  802. elif mod.startswith("sqlalchemy"):
  803. prefix = _sqlalchemy_autogenerate_prefix(autogen_context)
  804. return "%s%s" % (prefix, outer_type)
  805. else:
  806. return None
  807. _constraint_renderers = util.Dispatcher()
  808. def _render_constraint(
  809. constraint: Constraint,
  810. autogen_context: AutogenContext,
  811. namespace_metadata: Optional[MetaData],
  812. ) -> Optional[str]:
  813. try:
  814. renderer = _constraint_renderers.dispatch(constraint)
  815. except ValueError:
  816. util.warn("No renderer is established for object %r" % constraint)
  817. return "[Unknown Python object %r]" % constraint
  818. else:
  819. return renderer(constraint, autogen_context, namespace_metadata)
  820. @_constraint_renderers.dispatch_for(sa_schema.PrimaryKeyConstraint)
  821. def _render_primary_key(
  822. constraint: PrimaryKeyConstraint,
  823. autogen_context: AutogenContext,
  824. namespace_metadata: Optional[MetaData],
  825. ) -> Optional[str]:
  826. rendered = _user_defined_render("primary_key", constraint, autogen_context)
  827. if rendered is not False:
  828. return rendered
  829. if not constraint.columns:
  830. return None
  831. opts = []
  832. if constraint.name:
  833. opts.append(
  834. ("name", repr(_render_gen_name(autogen_context, constraint.name)))
  835. )
  836. return "%(prefix)sPrimaryKeyConstraint(%(args)s)" % {
  837. "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
  838. "args": ", ".join(
  839. [repr(c.name) for c in constraint.columns]
  840. + ["%s=%s" % (kwname, val) for kwname, val in opts]
  841. ),
  842. }
  843. def _fk_colspec(
  844. fk: ForeignKey,
  845. metadata_schema: Optional[str],
  846. namespace_metadata: Optional[MetaData],
  847. ) -> str:
  848. """Implement a 'safe' version of ForeignKey._get_colspec() that
  849. won't fail if the remote table can't be resolved.
  850. """
  851. colspec = fk._get_colspec()
  852. tokens = colspec.split(".")
  853. tname, colname = tokens[-2:]
  854. if metadata_schema is not None and len(tokens) == 2:
  855. table_fullname = "%s.%s" % (metadata_schema, tname)
  856. else:
  857. table_fullname = ".".join(tokens[0:-1])
  858. if (
  859. not fk.link_to_name
  860. and fk.parent is not None
  861. and fk.parent.table is not None
  862. ):
  863. # try to resolve the remote table in order to adjust for column.key.
  864. # the FK constraint needs to be rendered in terms of the column
  865. # name.
  866. if (
  867. namespace_metadata is not None
  868. and table_fullname in namespace_metadata.tables
  869. ):
  870. col = namespace_metadata.tables[table_fullname].c.get(colname)
  871. if col is not None:
  872. colname = _ident(col.name) # type: ignore[assignment]
  873. colspec = "%s.%s" % (table_fullname, colname)
  874. return colspec
  875. def _populate_render_fk_opts(
  876. constraint: ForeignKeyConstraint, opts: List[Tuple[str, str]]
  877. ) -> None:
  878. if constraint.onupdate:
  879. opts.append(("onupdate", repr(constraint.onupdate)))
  880. if constraint.ondelete:
  881. opts.append(("ondelete", repr(constraint.ondelete)))
  882. if constraint.initially:
  883. opts.append(("initially", repr(constraint.initially)))
  884. if constraint.deferrable:
  885. opts.append(("deferrable", repr(constraint.deferrable)))
  886. if constraint.use_alter:
  887. opts.append(("use_alter", repr(constraint.use_alter)))
  888. if constraint.match:
  889. opts.append(("match", repr(constraint.match)))
  890. @_constraint_renderers.dispatch_for(sa_schema.ForeignKeyConstraint)
  891. def _render_foreign_key(
  892. constraint: ForeignKeyConstraint,
  893. autogen_context: AutogenContext,
  894. namespace_metadata: Optional[MetaData],
  895. ) -> Optional[str]:
  896. rendered = _user_defined_render("foreign_key", constraint, autogen_context)
  897. if rendered is not False:
  898. return rendered
  899. opts = []
  900. if constraint.name:
  901. opts.append(
  902. ("name", repr(_render_gen_name(autogen_context, constraint.name)))
  903. )
  904. _populate_render_fk_opts(constraint, opts)
  905. apply_metadata_schema = (
  906. namespace_metadata.schema if namespace_metadata is not None else None
  907. )
  908. return (
  909. "%(prefix)sForeignKeyConstraint([%(cols)s], "
  910. "[%(refcols)s], %(args)s)"
  911. % {
  912. "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
  913. "cols": ", ".join(
  914. repr(_ident(f.parent.name)) for f in constraint.elements
  915. ),
  916. "refcols": ", ".join(
  917. repr(_fk_colspec(f, apply_metadata_schema, namespace_metadata))
  918. for f in constraint.elements
  919. ),
  920. "args": ", ".join(
  921. ["%s=%s" % (kwname, val) for kwname, val in opts]
  922. ),
  923. }
  924. )
  925. @_constraint_renderers.dispatch_for(sa_schema.UniqueConstraint)
  926. def _render_unique_constraint(
  927. constraint: UniqueConstraint,
  928. autogen_context: AutogenContext,
  929. namespace_metadata: Optional[MetaData],
  930. ) -> str:
  931. rendered = _user_defined_render("unique", constraint, autogen_context)
  932. if rendered is not False:
  933. return rendered
  934. return _uq_constraint(constraint, autogen_context, False)
  935. @_constraint_renderers.dispatch_for(sa_schema.CheckConstraint)
  936. def _render_check_constraint(
  937. constraint: CheckConstraint,
  938. autogen_context: AutogenContext,
  939. namespace_metadata: Optional[MetaData],
  940. ) -> Optional[str]:
  941. rendered = _user_defined_render("check", constraint, autogen_context)
  942. if rendered is not False:
  943. return rendered
  944. # detect the constraint being part of
  945. # a parent type which is probably in the Table already.
  946. # ideally SQLAlchemy would give us more of a first class
  947. # way to detect this.
  948. if (
  949. constraint._create_rule
  950. and hasattr(constraint._create_rule, "target")
  951. and isinstance(
  952. constraint._create_rule.target,
  953. sqltypes.TypeEngine,
  954. )
  955. ):
  956. return None
  957. opts = []
  958. if constraint.name:
  959. opts.append(
  960. ("name", repr(_render_gen_name(autogen_context, constraint.name)))
  961. )
  962. return "%(prefix)sCheckConstraint(%(sqltext)s%(opts)s)" % {
  963. "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
  964. "opts": (
  965. ", " + (", ".join("%s=%s" % (k, v) for k, v in opts))
  966. if opts
  967. else ""
  968. ),
  969. "sqltext": _render_potential_expr(
  970. constraint.sqltext, autogen_context, wrap_in_element=False
  971. ),
  972. }
  973. @renderers.dispatch_for(ops.ExecuteSQLOp)
  974. def _execute_sql(autogen_context: AutogenContext, op: ops.ExecuteSQLOp) -> str:
  975. if not isinstance(op.sqltext, str):
  976. raise NotImplementedError(
  977. "Autogenerate rendering of SQL Expression language constructs "
  978. "not supported here; please use a plain SQL string"
  979. )
  980. return "{prefix}execute({sqltext!r})".format(
  981. prefix=_alembic_autogenerate_prefix(autogen_context),
  982. sqltext=op.sqltext,
  983. )
  984. renderers = default_renderers.branch()