toimpl.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. # mypy: allow-untyped-defs, allow-incomplete-defs, allow-untyped-calls
  2. # mypy: no-warn-return-any, allow-any-generics
  3. from typing import TYPE_CHECKING
  4. from sqlalchemy import schema as sa_schema
  5. from . import ops
  6. from .base import Operations
  7. from ..util.sqla_compat import _copy
  8. from ..util.sqla_compat import sqla_2
  9. if TYPE_CHECKING:
  10. from sqlalchemy.sql.schema import Table
  11. @Operations.implementation_for(ops.AlterColumnOp)
  12. def alter_column(
  13. operations: "Operations", operation: "ops.AlterColumnOp"
  14. ) -> None:
  15. compiler = operations.impl.dialect.statement_compiler(
  16. operations.impl.dialect, None
  17. )
  18. existing_type = operation.existing_type
  19. existing_nullable = operation.existing_nullable
  20. existing_server_default = operation.existing_server_default
  21. type_ = operation.modify_type
  22. column_name = operation.column_name
  23. table_name = operation.table_name
  24. schema = operation.schema
  25. server_default = operation.modify_server_default
  26. new_column_name = operation.modify_name
  27. nullable = operation.modify_nullable
  28. comment = operation.modify_comment
  29. existing_comment = operation.existing_comment
  30. def _count_constraint(constraint):
  31. return not isinstance(constraint, sa_schema.PrimaryKeyConstraint) and (
  32. not constraint._create_rule or constraint._create_rule(compiler)
  33. )
  34. if existing_type and type_:
  35. t = operations.schema_obj.table(
  36. table_name,
  37. sa_schema.Column(column_name, existing_type),
  38. schema=schema,
  39. )
  40. for constraint in t.constraints:
  41. if _count_constraint(constraint):
  42. operations.impl.drop_constraint(constraint)
  43. operations.impl.alter_column(
  44. table_name,
  45. column_name,
  46. nullable=nullable,
  47. server_default=server_default,
  48. name=new_column_name,
  49. type_=type_,
  50. schema=schema,
  51. existing_type=existing_type,
  52. existing_server_default=existing_server_default,
  53. existing_nullable=existing_nullable,
  54. comment=comment,
  55. existing_comment=existing_comment,
  56. **operation.kw,
  57. )
  58. if type_:
  59. t = operations.schema_obj.table(
  60. table_name,
  61. operations.schema_obj.column(column_name, type_),
  62. schema=schema,
  63. )
  64. for constraint in t.constraints:
  65. if _count_constraint(constraint):
  66. operations.impl.add_constraint(constraint)
  67. @Operations.implementation_for(ops.DropTableOp)
  68. def drop_table(operations: "Operations", operation: "ops.DropTableOp") -> None:
  69. kw = {}
  70. if operation.if_exists is not None:
  71. kw["if_exists"] = operation.if_exists
  72. operations.impl.drop_table(
  73. operation.to_table(operations.migration_context), **kw
  74. )
  75. @Operations.implementation_for(ops.DropColumnOp)
  76. def drop_column(
  77. operations: "Operations", operation: "ops.DropColumnOp"
  78. ) -> None:
  79. column = operation.to_column(operations.migration_context)
  80. operations.impl.drop_column(
  81. operation.table_name,
  82. column,
  83. schema=operation.schema,
  84. if_exists=operation.if_exists,
  85. **operation.kw,
  86. )
  87. @Operations.implementation_for(ops.CreateIndexOp)
  88. def create_index(
  89. operations: "Operations", operation: "ops.CreateIndexOp"
  90. ) -> None:
  91. idx = operation.to_index(operations.migration_context)
  92. kw = {}
  93. if operation.if_not_exists is not None:
  94. kw["if_not_exists"] = operation.if_not_exists
  95. operations.impl.create_index(idx, **kw)
  96. @Operations.implementation_for(ops.DropIndexOp)
  97. def drop_index(operations: "Operations", operation: "ops.DropIndexOp") -> None:
  98. kw = {}
  99. if operation.if_exists is not None:
  100. kw["if_exists"] = operation.if_exists
  101. operations.impl.drop_index(
  102. operation.to_index(operations.migration_context),
  103. **kw,
  104. )
  105. @Operations.implementation_for(ops.CreateTableOp)
  106. def create_table(
  107. operations: "Operations", operation: "ops.CreateTableOp"
  108. ) -> "Table":
  109. kw = {}
  110. if operation.if_not_exists is not None:
  111. kw["if_not_exists"] = operation.if_not_exists
  112. table = operation.to_table(operations.migration_context)
  113. operations.impl.create_table(table, **kw)
  114. return table
  115. @Operations.implementation_for(ops.RenameTableOp)
  116. def rename_table(
  117. operations: "Operations", operation: "ops.RenameTableOp"
  118. ) -> None:
  119. operations.impl.rename_table(
  120. operation.table_name, operation.new_table_name, schema=operation.schema
  121. )
  122. @Operations.implementation_for(ops.CreateTableCommentOp)
  123. def create_table_comment(
  124. operations: "Operations", operation: "ops.CreateTableCommentOp"
  125. ) -> None:
  126. table = operation.to_table(operations.migration_context)
  127. operations.impl.create_table_comment(table)
  128. @Operations.implementation_for(ops.DropTableCommentOp)
  129. def drop_table_comment(
  130. operations: "Operations", operation: "ops.DropTableCommentOp"
  131. ) -> None:
  132. table = operation.to_table(operations.migration_context)
  133. operations.impl.drop_table_comment(table)
  134. @Operations.implementation_for(ops.AddColumnOp)
  135. def add_column(operations: "Operations", operation: "ops.AddColumnOp") -> None:
  136. table_name = operation.table_name
  137. column = operation.column
  138. schema = operation.schema
  139. kw = operation.kw
  140. if column.table is not None:
  141. column = _copy(column)
  142. t = operations.schema_obj.table(table_name, column, schema=schema)
  143. operations.impl.add_column(
  144. table_name,
  145. column,
  146. schema=schema,
  147. if_not_exists=operation.if_not_exists,
  148. **kw,
  149. )
  150. for constraint in t.constraints:
  151. if not isinstance(constraint, sa_schema.PrimaryKeyConstraint):
  152. operations.impl.add_constraint(constraint)
  153. for index in t.indexes:
  154. operations.impl.create_index(index)
  155. with_comment = (
  156. operations.impl.dialect.supports_comments
  157. and not operations.impl.dialect.inline_comments
  158. )
  159. comment = column.comment
  160. if comment and with_comment:
  161. operations.impl.create_column_comment(column)
  162. @Operations.implementation_for(ops.AddConstraintOp)
  163. def create_constraint(
  164. operations: "Operations", operation: "ops.AddConstraintOp"
  165. ) -> None:
  166. operations.impl.add_constraint(
  167. operation.to_constraint(operations.migration_context)
  168. )
  169. @Operations.implementation_for(ops.DropConstraintOp)
  170. def drop_constraint(
  171. operations: "Operations", operation: "ops.DropConstraintOp"
  172. ) -> None:
  173. kw = {}
  174. if operation.if_exists is not None:
  175. if not sqla_2:
  176. raise NotImplementedError("SQLAlchemy 2.0 required")
  177. kw["if_exists"] = operation.if_exists
  178. operations.impl.drop_constraint(
  179. operations.schema_obj.generic_constraint(
  180. operation.constraint_name,
  181. operation.table_name,
  182. operation.constraint_type,
  183. schema=operation.schema,
  184. ),
  185. **kw,
  186. )
  187. @Operations.implementation_for(ops.BulkInsertOp)
  188. def bulk_insert(
  189. operations: "Operations", operation: "ops.BulkInsertOp"
  190. ) -> None:
  191. operations.impl.bulk_insert( # type: ignore[union-attr]
  192. operation.table, operation.rows, multiinsert=operation.multiinsert
  193. )
  194. @Operations.implementation_for(ops.ExecuteSQLOp)
  195. def execute_sql(
  196. operations: "Operations", operation: "ops.ExecuteSQLOp"
  197. ) -> None:
  198. operations.migration_context.impl.execute(
  199. operation.sqltext, execution_options=operation.execution_options
  200. )