provision.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. # dialects/mssql/provision.py
  2. # Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: https://www.opensource.org/licenses/mit-license.php
  7. # mypy: ignore-errors
  8. from sqlalchemy import inspect
  9. from sqlalchemy import Integer
  10. from ... import create_engine
  11. from ... import exc
  12. from ...schema import Column
  13. from ...schema import DropConstraint
  14. from ...schema import ForeignKeyConstraint
  15. from ...schema import MetaData
  16. from ...schema import Table
  17. from ...testing.provision import create_db
  18. from ...testing.provision import drop_all_schema_objects_pre_tables
  19. from ...testing.provision import drop_db
  20. from ...testing.provision import generate_driver_url
  21. from ...testing.provision import get_temp_table_name
  22. from ...testing.provision import log
  23. from ...testing.provision import normalize_sequence
  24. from ...testing.provision import post_configure_engine
  25. from ...testing.provision import run_reap_dbs
  26. from ...testing.provision import temp_table_keyword_args
  27. @post_configure_engine.for_db("mssql")
  28. def post_configure_engine(url, engine, follower_ident):
  29. if engine.driver == "pyodbc":
  30. engine.dialect.dbapi.pooling = False
  31. @generate_driver_url.for_db("mssql")
  32. def generate_driver_url(url, driver, query_str):
  33. backend = url.get_backend_name()
  34. new_url = url.set(drivername="%s+%s" % (backend, driver))
  35. if driver not in ("pyodbc", "aioodbc"):
  36. new_url = new_url.set(query="")
  37. if driver == "aioodbc":
  38. new_url = new_url.update_query_dict({"MARS_Connection": "Yes"})
  39. if query_str:
  40. new_url = new_url.update_query_string(query_str)
  41. try:
  42. new_url.get_dialect()
  43. except exc.NoSuchModuleError:
  44. return None
  45. else:
  46. return new_url
  47. @create_db.for_db("mssql")
  48. def _mssql_create_db(cfg, eng, ident):
  49. with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
  50. conn.exec_driver_sql("create database %s" % ident)
  51. conn.exec_driver_sql(
  52. "ALTER DATABASE %s SET ALLOW_SNAPSHOT_ISOLATION ON" % ident
  53. )
  54. conn.exec_driver_sql(
  55. "ALTER DATABASE %s SET READ_COMMITTED_SNAPSHOT ON" % ident
  56. )
  57. conn.exec_driver_sql("use %s" % ident)
  58. conn.exec_driver_sql("create schema test_schema")
  59. conn.exec_driver_sql("create schema test_schema_2")
  60. @drop_db.for_db("mssql")
  61. def _mssql_drop_db(cfg, eng, ident):
  62. with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
  63. _mssql_drop_ignore(conn, ident)
  64. def _mssql_drop_ignore(conn, ident):
  65. try:
  66. # typically when this happens, we can't KILL the session anyway,
  67. # so let the cleanup process drop the DBs
  68. # for row in conn.exec_driver_sql(
  69. # "select session_id from sys.dm_exec_sessions "
  70. # "where database_id=db_id('%s')" % ident):
  71. # log.info("killing SQL server session %s", row['session_id'])
  72. # conn.exec_driver_sql("kill %s" % row['session_id'])
  73. conn.exec_driver_sql("drop database %s" % ident)
  74. log.info("Reaped db: %s", ident)
  75. return True
  76. except exc.DatabaseError as err:
  77. log.warning("couldn't drop db: %s", err)
  78. return False
  79. @run_reap_dbs.for_db("mssql")
  80. def _reap_mssql_dbs(url, idents):
  81. log.info("db reaper connecting to %r", url)
  82. eng = create_engine(url)
  83. with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
  84. log.info("identifiers in file: %s", ", ".join(idents))
  85. to_reap = conn.exec_driver_sql(
  86. "select d.name from sys.databases as d where name "
  87. "like 'TEST_%' and not exists (select session_id "
  88. "from sys.dm_exec_sessions "
  89. "where database_id=d.database_id)"
  90. )
  91. all_names = {dbname.lower() for (dbname,) in to_reap}
  92. to_drop = set()
  93. for name in all_names:
  94. if name in idents:
  95. to_drop.add(name)
  96. dropped = total = 0
  97. for total, dbname in enumerate(to_drop, 1):
  98. if _mssql_drop_ignore(conn, dbname):
  99. dropped += 1
  100. log.info(
  101. "Dropped %d out of %d stale databases detected", dropped, total
  102. )
  103. @temp_table_keyword_args.for_db("mssql")
  104. def _mssql_temp_table_keyword_args(cfg, eng):
  105. return {}
  106. @get_temp_table_name.for_db("mssql")
  107. def _mssql_get_temp_table_name(cfg, eng, base_name):
  108. return "##" + base_name
  109. @drop_all_schema_objects_pre_tables.for_db("mssql")
  110. def drop_all_schema_objects_pre_tables(cfg, eng):
  111. with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
  112. inspector = inspect(conn)
  113. for schema in (None, "dbo", cfg.test_schema, cfg.test_schema_2):
  114. for tname in inspector.get_table_names(schema=schema):
  115. tb = Table(
  116. tname,
  117. MetaData(),
  118. Column("x", Integer),
  119. Column("y", Integer),
  120. schema=schema,
  121. )
  122. for fk in inspect(conn).get_foreign_keys(tname, schema=schema):
  123. conn.execute(
  124. DropConstraint(
  125. ForeignKeyConstraint(
  126. [tb.c.x], [tb.c.y], name=fk["name"]
  127. )
  128. )
  129. )
  130. @normalize_sequence.for_db("mssql")
  131. def normalize_sequence(cfg, sequence):
  132. if sequence.start is None:
  133. sequence.start = 1
  134. return sequence