mssql.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421
  1. # mypy: allow-untyped-defs, allow-incomplete-defs, allow-untyped-calls
  2. # mypy: no-warn-return-any, allow-any-generics
  3. from __future__ import annotations
  4. import re
  5. from typing import Any
  6. from typing import Dict
  7. from typing import List
  8. from typing import Optional
  9. from typing import TYPE_CHECKING
  10. from typing import Union
  11. from sqlalchemy import types as sqltypes
  12. from sqlalchemy.schema import Column
  13. from sqlalchemy.schema import CreateIndex
  14. from sqlalchemy.sql.base import Executable
  15. from sqlalchemy.sql.elements import ClauseElement
  16. from .base import AddColumn
  17. from .base import alter_column
  18. from .base import alter_table
  19. from .base import ColumnDefault
  20. from .base import ColumnName
  21. from .base import ColumnNullable
  22. from .base import ColumnType
  23. from .base import format_column_name
  24. from .base import format_server_default
  25. from .base import format_table_name
  26. from .base import format_type
  27. from .base import RenameTable
  28. from .impl import DefaultImpl
  29. from .. import util
  30. from ..util import sqla_compat
  31. from ..util.sqla_compat import compiles
  32. if TYPE_CHECKING:
  33. from typing import Literal
  34. from sqlalchemy.dialects.mssql.base import MSDDLCompiler
  35. from sqlalchemy.dialects.mssql.base import MSSQLCompiler
  36. from sqlalchemy.engine.cursor import CursorResult
  37. from sqlalchemy.sql.schema import Index
  38. from sqlalchemy.sql.schema import Table
  39. from sqlalchemy.sql.selectable import TableClause
  40. from sqlalchemy.sql.type_api import TypeEngine
  41. from .base import _ServerDefault
  42. class MSSQLImpl(DefaultImpl):
  43. __dialect__ = "mssql"
  44. transactional_ddl = True
  45. batch_separator = "GO"
  46. type_synonyms = DefaultImpl.type_synonyms + ({"VARCHAR", "NVARCHAR"},)
  47. identity_attrs_ignore = DefaultImpl.identity_attrs_ignore + (
  48. "minvalue",
  49. "maxvalue",
  50. "nominvalue",
  51. "nomaxvalue",
  52. "cycle",
  53. "cache",
  54. )
  55. def __init__(self, *arg, **kw) -> None:
  56. super().__init__(*arg, **kw)
  57. self.batch_separator = self.context_opts.get(
  58. "mssql_batch_separator", self.batch_separator
  59. )
  60. def _exec(self, construct: Any, *args, **kw) -> Optional[CursorResult]:
  61. result = super()._exec(construct, *args, **kw)
  62. if self.as_sql and self.batch_separator:
  63. self.static_output(self.batch_separator)
  64. return result
  65. def emit_begin(self) -> None:
  66. self.static_output("BEGIN TRANSACTION" + self.command_terminator)
  67. def emit_commit(self) -> None:
  68. super().emit_commit()
  69. if self.as_sql and self.batch_separator:
  70. self.static_output(self.batch_separator)
  71. def alter_column(
  72. self,
  73. table_name: str,
  74. column_name: str,
  75. *,
  76. nullable: Optional[bool] = None,
  77. server_default: Optional[
  78. Union[_ServerDefault, Literal[False]]
  79. ] = False,
  80. name: Optional[str] = None,
  81. type_: Optional[TypeEngine] = None,
  82. schema: Optional[str] = None,
  83. existing_type: Optional[TypeEngine] = None,
  84. existing_server_default: Optional[_ServerDefault] = None,
  85. existing_nullable: Optional[bool] = None,
  86. **kw: Any,
  87. ) -> None:
  88. if nullable is not None:
  89. if type_ is not None:
  90. # the NULL/NOT NULL alter will handle
  91. # the type alteration
  92. existing_type = type_
  93. type_ = None
  94. elif existing_type is None:
  95. raise util.CommandError(
  96. "MS-SQL ALTER COLUMN operations "
  97. "with NULL or NOT NULL require the "
  98. "existing_type or a new type_ be passed."
  99. )
  100. elif existing_nullable is not None and type_ is not None:
  101. nullable = existing_nullable
  102. # the NULL/NOT NULL alter will handle
  103. # the type alteration
  104. existing_type = type_
  105. type_ = None
  106. elif type_ is not None:
  107. util.warn(
  108. "MS-SQL ALTER COLUMN operations that specify type_= "
  109. "should also specify a nullable= or "
  110. "existing_nullable= argument to avoid implicit conversion "
  111. "of NOT NULL columns to NULL."
  112. )
  113. used_default = False
  114. if sqla_compat._server_default_is_identity(
  115. server_default, existing_server_default
  116. ) or sqla_compat._server_default_is_computed(
  117. server_default, existing_server_default
  118. ):
  119. used_default = True
  120. kw["server_default"] = server_default
  121. kw["existing_server_default"] = existing_server_default
  122. super().alter_column(
  123. table_name,
  124. column_name,
  125. nullable=nullable,
  126. type_=type_,
  127. schema=schema,
  128. existing_type=existing_type,
  129. existing_nullable=existing_nullable,
  130. **kw,
  131. )
  132. if server_default is not False and used_default is False:
  133. if existing_server_default is not False or server_default is None:
  134. self._exec(
  135. _ExecDropConstraint(
  136. table_name,
  137. column_name,
  138. "sys.default_constraints",
  139. schema,
  140. )
  141. )
  142. if server_default is not None:
  143. super().alter_column(
  144. table_name,
  145. column_name,
  146. schema=schema,
  147. server_default=server_default,
  148. )
  149. if name is not None:
  150. super().alter_column(
  151. table_name, column_name, schema=schema, name=name
  152. )
  153. def create_index(self, index: Index, **kw: Any) -> None:
  154. # this likely defaults to None if not present, so get()
  155. # should normally not return the default value. being
  156. # defensive in any case
  157. mssql_include = index.kwargs.get("mssql_include", None) or ()
  158. assert index.table is not None
  159. for col in mssql_include:
  160. if col not in index.table.c:
  161. index.table.append_column(Column(col, sqltypes.NullType))
  162. self._exec(CreateIndex(index, **kw))
  163. def bulk_insert( # type:ignore[override]
  164. self, table: Union[TableClause, Table], rows: List[dict], **kw: Any
  165. ) -> None:
  166. if self.as_sql:
  167. self._exec(
  168. "SET IDENTITY_INSERT %s ON"
  169. % self.dialect.identifier_preparer.format_table(table)
  170. )
  171. super().bulk_insert(table, rows, **kw)
  172. self._exec(
  173. "SET IDENTITY_INSERT %s OFF"
  174. % self.dialect.identifier_preparer.format_table(table)
  175. )
  176. else:
  177. super().bulk_insert(table, rows, **kw)
  178. def drop_column(
  179. self,
  180. table_name: str,
  181. column: Column[Any],
  182. *,
  183. schema: Optional[str] = None,
  184. **kw,
  185. ) -> None:
  186. drop_default = kw.pop("mssql_drop_default", False)
  187. if drop_default:
  188. self._exec(
  189. _ExecDropConstraint(
  190. table_name, column, "sys.default_constraints", schema
  191. )
  192. )
  193. drop_check = kw.pop("mssql_drop_check", False)
  194. if drop_check:
  195. self._exec(
  196. _ExecDropConstraint(
  197. table_name, column, "sys.check_constraints", schema
  198. )
  199. )
  200. drop_fks = kw.pop("mssql_drop_foreign_key", False)
  201. if drop_fks:
  202. self._exec(_ExecDropFKConstraint(table_name, column, schema))
  203. super().drop_column(table_name, column, schema=schema, **kw)
  204. def compare_server_default(
  205. self,
  206. inspector_column,
  207. metadata_column,
  208. rendered_metadata_default,
  209. rendered_inspector_default,
  210. ):
  211. if rendered_metadata_default is not None:
  212. rendered_metadata_default = re.sub(
  213. r"[\(\) \"\']", "", rendered_metadata_default
  214. )
  215. if rendered_inspector_default is not None:
  216. # SQL Server collapses whitespace and adds arbitrary parenthesis
  217. # within expressions. our only option is collapse all of it
  218. rendered_inspector_default = re.sub(
  219. r"[\(\) \"\']", "", rendered_inspector_default
  220. )
  221. return rendered_inspector_default != rendered_metadata_default
  222. def _compare_identity_default(self, metadata_identity, inspector_identity):
  223. diff, ignored, is_alter = super()._compare_identity_default(
  224. metadata_identity, inspector_identity
  225. )
  226. if (
  227. metadata_identity is None
  228. and inspector_identity is not None
  229. and not diff
  230. and inspector_identity.column is not None
  231. and inspector_identity.column.primary_key
  232. ):
  233. # mssql reflect primary keys with autoincrement as identity
  234. # columns. if no different attributes are present ignore them
  235. is_alter = False
  236. return diff, ignored, is_alter
  237. def adjust_reflected_dialect_options(
  238. self, reflected_object: Dict[str, Any], kind: str
  239. ) -> Dict[str, Any]:
  240. options: Dict[str, Any]
  241. options = reflected_object.get("dialect_options", {}).copy()
  242. if not options.get("mssql_include"):
  243. options.pop("mssql_include", None)
  244. if not options.get("mssql_clustered"):
  245. options.pop("mssql_clustered", None)
  246. return options
  247. class _ExecDropConstraint(Executable, ClauseElement):
  248. inherit_cache = False
  249. def __init__(
  250. self,
  251. tname: str,
  252. colname: Union[Column[Any], str],
  253. type_: str,
  254. schema: Optional[str],
  255. ) -> None:
  256. self.tname = tname
  257. self.colname = colname
  258. self.type_ = type_
  259. self.schema = schema
  260. class _ExecDropFKConstraint(Executable, ClauseElement):
  261. inherit_cache = False
  262. def __init__(
  263. self, tname: str, colname: Column[Any], schema: Optional[str]
  264. ) -> None:
  265. self.tname = tname
  266. self.colname = colname
  267. self.schema = schema
  268. @compiles(_ExecDropConstraint, "mssql")
  269. def _exec_drop_col_constraint(
  270. element: _ExecDropConstraint, compiler: MSSQLCompiler, **kw
  271. ) -> str:
  272. schema, tname, colname, type_ = (
  273. element.schema,
  274. element.tname,
  275. element.colname,
  276. element.type_,
  277. )
  278. # from http://www.mssqltips.com/sqlservertip/1425/\
  279. # working-with-default-constraints-in-sql-server/
  280. return """declare @const_name varchar(256)
  281. select @const_name = QUOTENAME([name]) from %(type)s
  282. where parent_object_id = object_id('%(schema_dot)s%(tname)s')
  283. and col_name(parent_object_id, parent_column_id) = '%(colname)s'
  284. exec('alter table %(tname_quoted)s drop constraint ' + @const_name)""" % {
  285. "type": type_,
  286. "tname": tname,
  287. "colname": colname,
  288. "tname_quoted": format_table_name(compiler, tname, schema),
  289. "schema_dot": schema + "." if schema else "",
  290. }
  291. @compiles(_ExecDropFKConstraint, "mssql")
  292. def _exec_drop_col_fk_constraint(
  293. element: _ExecDropFKConstraint, compiler: MSSQLCompiler, **kw
  294. ) -> str:
  295. schema, tname, colname = element.schema, element.tname, element.colname
  296. return """declare @const_name varchar(256)
  297. select @const_name = QUOTENAME([name]) from
  298. sys.foreign_keys fk join sys.foreign_key_columns fkc
  299. on fk.object_id=fkc.constraint_object_id
  300. where fkc.parent_object_id = object_id('%(schema_dot)s%(tname)s')
  301. and col_name(fkc.parent_object_id, fkc.parent_column_id) = '%(colname)s'
  302. exec('alter table %(tname_quoted)s drop constraint ' + @const_name)""" % {
  303. "tname": tname,
  304. "colname": colname,
  305. "tname_quoted": format_table_name(compiler, tname, schema),
  306. "schema_dot": schema + "." if schema else "",
  307. }
  308. @compiles(AddColumn, "mssql")
  309. def visit_add_column(element: AddColumn, compiler: MSDDLCompiler, **kw) -> str:
  310. return "%s %s" % (
  311. alter_table(compiler, element.table_name, element.schema),
  312. mssql_add_column(compiler, element.column, **kw),
  313. )
  314. def mssql_add_column(
  315. compiler: MSDDLCompiler, column: Column[Any], **kw
  316. ) -> str:
  317. return "ADD %s" % compiler.get_column_specification(column, **kw)
  318. @compiles(ColumnNullable, "mssql")
  319. def visit_column_nullable(
  320. element: ColumnNullable, compiler: MSDDLCompiler, **kw
  321. ) -> str:
  322. return "%s %s %s %s" % (
  323. alter_table(compiler, element.table_name, element.schema),
  324. alter_column(compiler, element.column_name),
  325. format_type(compiler, element.existing_type), # type: ignore[arg-type]
  326. "NULL" if element.nullable else "NOT NULL",
  327. )
  328. @compiles(ColumnDefault, "mssql")
  329. def visit_column_default(
  330. element: ColumnDefault, compiler: MSDDLCompiler, **kw
  331. ) -> str:
  332. # TODO: there can also be a named constraint
  333. # with ADD CONSTRAINT here
  334. return "%s ADD DEFAULT %s FOR %s" % (
  335. alter_table(compiler, element.table_name, element.schema),
  336. format_server_default(compiler, element.default),
  337. format_column_name(compiler, element.column_name),
  338. )
  339. @compiles(ColumnName, "mssql")
  340. def visit_rename_column(
  341. element: ColumnName, compiler: MSDDLCompiler, **kw
  342. ) -> str:
  343. return "EXEC sp_rename '%s.%s', %s, 'COLUMN'" % (
  344. format_table_name(compiler, element.table_name, element.schema),
  345. format_column_name(compiler, element.column_name),
  346. format_column_name(compiler, element.newname),
  347. )
  348. @compiles(ColumnType, "mssql")
  349. def visit_column_type(
  350. element: ColumnType, compiler: MSDDLCompiler, **kw
  351. ) -> str:
  352. return "%s %s %s" % (
  353. alter_table(compiler, element.table_name, element.schema),
  354. alter_column(compiler, element.column_name),
  355. format_type(compiler, element.type_),
  356. )
  357. @compiles(RenameTable, "mssql")
  358. def visit_rename_table(
  359. element: RenameTable, compiler: MSDDLCompiler, **kw
  360. ) -> str:
  361. return "EXEC sp_rename '%s', %s" % (
  362. format_table_name(compiler, element.table_name, element.schema),
  363. format_table_name(compiler, element.new_table_name, None),
  364. )