sqlite.py 7.8 KB


  1. # mypy: allow-untyped-defs, allow-incomplete-defs, allow-untyped-calls
  2. # mypy: no-warn-return-any, allow-any-generics
  3. from __future__ import annotations
  4. import re
  5. from typing import Any
  6. from typing import Dict
  7. from typing import Optional
  8. from typing import TYPE_CHECKING
  9. from typing import Union
  10. from sqlalchemy import cast
  11. from sqlalchemy import Computed
  12. from sqlalchemy import JSON
  13. from sqlalchemy import schema
  14. from sqlalchemy import sql
  15. from .base import alter_table
  16. from .base import ColumnName
  17. from .base import format_column_name
  18. from .base import format_table_name
  19. from .base import RenameTable
  20. from .impl import DefaultImpl
  21. from .. import util
  22. from ..util.sqla_compat import compiles
  23. if TYPE_CHECKING:
  24. from sqlalchemy.engine.reflection import Inspector
  25. from sqlalchemy.sql.compiler import DDLCompiler
  26. from sqlalchemy.sql.elements import Cast
  27. from sqlalchemy.sql.elements import ClauseElement
  28. from sqlalchemy.sql.schema import Column
  29. from sqlalchemy.sql.schema import Constraint
  30. from sqlalchemy.sql.schema import Table
  31. from sqlalchemy.sql.type_api import TypeEngine
  32. from ..operations.batch import BatchOperationsImpl
  33. class SQLiteImpl(DefaultImpl):
  34. __dialect__ = "sqlite"
  35. transactional_ddl = False
  36. """SQLite supports transactional DDL, but pysqlite does not:
  37. see: http://bugs.python.org/issue10740
  38. """
  39. def requires_recreate_in_batch(
  40. self, batch_op: BatchOperationsImpl
  41. ) -> bool:
  42. """Return True if the given :class:`.BatchOperationsImpl`
  43. would need the table to be recreated and copied in order to
  44. proceed.
  45. Normally, only returns True on SQLite when operations other
  46. than add_column are present.
  47. """
  48. for op in batch_op.batch:
  49. if op[0] == "add_column":
  50. col = op[1][1]
  51. if isinstance(
  52. col.server_default, schema.DefaultClause
  53. ) and isinstance(col.server_default.arg, sql.ClauseElement):
  54. return True
  55. elif (
  56. isinstance(col.server_default, Computed)
  57. and col.server_default.persisted
  58. ):
  59. return True
  60. elif op[0] not in ("create_index", "drop_index"):
  61. return True
  62. else:
  63. return False
  64. def add_constraint(self, const: Constraint):
  65. # attempt to distinguish between an
  66. # auto-gen constraint and an explicit one
  67. if const._create_rule is None:
  68. raise NotImplementedError(
  69. "No support for ALTER of constraints in SQLite dialect. "
  70. "Please refer to the batch mode feature which allows for "
  71. "SQLite migrations using a copy-and-move strategy."
  72. )
  73. elif const._create_rule(self):
  74. util.warn(
  75. "Skipping unsupported ALTER for "
  76. "creation of implicit constraint. "
  77. "Please refer to the batch mode feature which allows for "
  78. "SQLite migrations using a copy-and-move strategy."
  79. )
  80. def drop_constraint(self, const: Constraint, **kw: Any):
  81. if const._create_rule is None:
  82. raise NotImplementedError(
  83. "No support for ALTER of constraints in SQLite dialect. "
  84. "Please refer to the batch mode feature which allows for "
  85. "SQLite migrations using a copy-and-move strategy."
  86. )
  87. def compare_server_default(
  88. self,
  89. inspector_column: Column[Any],
  90. metadata_column: Column[Any],
  91. rendered_metadata_default: Optional[str],
  92. rendered_inspector_default: Optional[str],
  93. ) -> bool:
  94. if rendered_metadata_default is not None:
  95. rendered_metadata_default = re.sub(
  96. r"^\((.+)\)$", r"\1", rendered_metadata_default
  97. )
  98. rendered_metadata_default = re.sub(
  99. r"^\"?'(.+)'\"?$", r"\1", rendered_metadata_default
  100. )
  101. if rendered_inspector_default is not None:
  102. rendered_inspector_default = re.sub(
  103. r"^\((.+)\)$", r"\1", rendered_inspector_default
  104. )
  105. rendered_inspector_default = re.sub(
  106. r"^\"?'(.+)'\"?$", r"\1", rendered_inspector_default
  107. )
  108. return rendered_inspector_default != rendered_metadata_default
  109. def _guess_if_default_is_unparenthesized_sql_expr(
  110. self, expr: Optional[str]
  111. ) -> bool:
  112. """Determine if a server default is a SQL expression or a constant.
  113. There are too many assertions that expect server defaults to round-trip
  114. identically without parenthesis added so we will add parens only in
  115. very specific cases.
  116. """
  117. if not expr:
  118. return False
  119. elif re.match(r"^[0-9\.]$", expr):
  120. return False
  121. elif re.match(r"^'.+'$", expr):
  122. return False
  123. elif re.match(r"^\(.+\)$", expr):
  124. return False
  125. else:
  126. return True
  127. def autogen_column_reflect(
  128. self,
  129. inspector: Inspector,
  130. table: Table,
  131. column_info: Dict[str, Any],
  132. ) -> None:
  133. # SQLite expression defaults require parenthesis when sent
  134. # as DDL
  135. if self._guess_if_default_is_unparenthesized_sql_expr(
  136. column_info.get("default", None)
  137. ):
  138. column_info["default"] = "(%s)" % (column_info["default"],)
  139. def render_ddl_sql_expr(
  140. self, expr: ClauseElement, is_server_default: bool = False, **kw
  141. ) -> str:
  142. # SQLite expression defaults require parenthesis when sent
  143. # as DDL
  144. str_expr = super().render_ddl_sql_expr(
  145. expr, is_server_default=is_server_default, **kw
  146. )
  147. if (
  148. is_server_default
  149. and self._guess_if_default_is_unparenthesized_sql_expr(str_expr)
  150. ):
  151. str_expr = "(%s)" % (str_expr,)
  152. return str_expr
  153. def cast_for_batch_migrate(
  154. self,
  155. existing: Column[Any],
  156. existing_transfer: Dict[str, Union[TypeEngine, Cast]],
  157. new_type: TypeEngine,
  158. ) -> None:
  159. if (
  160. existing.type._type_affinity is not new_type._type_affinity
  161. and not isinstance(new_type, JSON)
  162. ):
  163. existing_transfer["expr"] = cast(
  164. existing_transfer["expr"], new_type
  165. )
  166. def correct_for_autogen_constraints(
  167. self,
  168. conn_unique_constraints,
  169. conn_indexes,
  170. metadata_unique_constraints,
  171. metadata_indexes,
  172. ):
  173. self._skip_functional_indexes(metadata_indexes, conn_indexes)
  174. @compiles(RenameTable, "sqlite")
  175. def visit_rename_table(
  176. element: RenameTable, compiler: DDLCompiler, **kw
  177. ) -> str:
  178. return "%s RENAME TO %s" % (
  179. alter_table(compiler, element.table_name, element.schema),
  180. format_table_name(compiler, element.new_table_name, None),
  181. )
  182. @compiles(ColumnName, "sqlite")
  183. def visit_column_name(element: ColumnName, compiler: DDLCompiler, **kw) -> str:
  184. return "%s RENAME COLUMN %s TO %s" % (
  185. alter_table(compiler, element.table_name, element.schema),
  186. format_column_name(compiler, element.column_name),
  187. format_column_name(compiler, element.newname),
  188. )
  189. # @compiles(AddColumn, 'sqlite')
  190. # def visit_add_column(element, compiler, **kw):
  191. # return "%s %s" % (
  192. # alter_table(compiler, element.table_name, element.schema),
  193. # add_column(compiler, element.column, **kw)
  194. # )
  195. # def add_column(compiler, column, **kw):
  196. # text = "ADD COLUMN %s" % compiler.get_column_specification(column, **kw)
  197. # need to modify SQLAlchemy so that the CHECK associated with a Boolean
  198. # or Enum gets placed as part of the column constraints, not the Table
  199. # see ticket 98
  200. # for const in column.constraints:
  201. # text += compiler.process(AddConstraint(const))
  202. # return text