provision.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. # dialects/mysql/provision.py
  2. # Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: https://www.opensource.org/licenses/mit-license.php
  7. # mypy: ignore-errors
  8. from ... import exc
  9. from ...testing.provision import configure_follower
  10. from ...testing.provision import create_db
  11. from ...testing.provision import drop_db
  12. from ...testing.provision import generate_driver_url
  13. from ...testing.provision import temp_table_keyword_args
  14. from ...testing.provision import upsert
  @generate_driver_url.for_db("mysql", "mariadb")
  def generate_driver_url(url, driver, query_str):
      """Derive a MySQL / MariaDB URL for ``driver`` from ``url``.

      The backend name comes from ``url.get_backend_name()`` and is switched
      from "mysql" to "mariadb" when the dialect class's
      ``_is_mariadb_from_url()`` check is true for the URL.  ``query_str`` is
      applied with ``update_query_string()``, then per-driver tweaks follow:
      "charset" is removed for mariadbconnector and a "collation" pair is
      set for mysqlconnector.

      :param url: URL object the new URL is derived from.
      :param driver: driver name placed after the ``+`` in the drivername.
      :param query_str: query string applied to the derived URL.
      :return: the derived URL, or ``None`` if looking up its dialect
       raises ``NoSuchModuleError``.
      """
      backend_name = url.get_backend_name()

      # NOTE: for now the test suite runs mariadbconnector against both
      # mariadb and mysql backends.  If that should be restricted, this is
      # the place to decide to reject a "mysql+mariadbconnector" URL.
      # Optionally also re-enable the module level
      # MySQLDialect_mariadbconnector.is_mysql flag, which must come with
      # a unit and/or functional test.

      # All the Jenkins tests have run the mysqlclient Python library built
      # against mariadb client drivers for years, against every MySQL /
      # MariaDB version going back to MySQL 5.6; currently they can talk
      # to MySQL databases without problems.

      # get_dialect() is only consulted for a "mysql" backend name, exactly
      # as the short-circuiting ``and`` expresses.
      if (
          backend_name == "mysql"
          and url.get_dialect()._is_mariadb_from_url(url)
      ):
          backend_name = "mariadb"

      drivername = "%s+%s" % (backend_name, driver)
      candidate = url.set(drivername=drivername)
      candidate = candidate.update_query_string(query_str)

      if driver == "mariadbconnector":
          candidate = candidate.difference_update_query(["charset"])
      elif driver == "mysqlconnector":
          collation_pair = ("collation", "utf8mb4_general_ci")
          candidate = candidate.update_query_pairs([collation_pair])

      # A URL whose dialect cannot be found is reported as None.
      try:
          candidate.get_dialect()
      except exc.NoSuchModuleError:
          return None
      return candidate
  @create_db.for_db("mysql", "mariadb")
  def _mysql_create_db(cfg, eng, ident):
      """Create the ``ident`` database plus its two test-schema databases.

      Any databases left over under the same names are dropped first on a
      best-effort basis, then ``ident``, ``ident_test_schema`` and
      ``ident_test_schema_2`` are created with the utf8mb4 character set.

      :param cfg: configuration object, forwarded to the drop hook.
      :param eng: engine used to open the transactions.
      :param ident: base name of the databases to create.
      """
      # The drop hook takes (cfg, eng, ident) and opens its own
      # ``eng.begin()`` block, so it is handed the engine itself.  Handing
      # it a connection that is already inside an ``eng.begin()`` block
      # makes its nested ``begin()`` call fail; the handler below would
      # hide that failure and no leftover database would ever be dropped.
      try:
          _mysql_drop_db(cfg, eng, ident)
      except Exception:
          # Deliberately best-effort: on a fresh server the databases do
          # not exist yet and the DROP statements are expected to fail.
          pass

      with eng.begin() as conn:
          for suffix in ("", "_test_schema", "_test_schema_2"):
              conn.exec_driver_sql(
                  "CREATE DATABASE %s%s CHARACTER SET utf8mb4"
                  % (ident, suffix)
              )
  @configure_follower.for_db("mysql", "mariadb")
  def _mysql_configure_follower(config, ident):
      """Point ``config`` at the two schema names derived from ``ident``.

      :param config: object whose ``test_schema`` and ``test_schema_2``
       attributes are assigned.
      :param ident: base name the schema names are built from.
      """
      primary_schema = "%s_test_schema" % ident
      secondary_schema = "%s_test_schema_2" % ident

      config.test_schema = primary_schema
      config.test_schema_2 = secondary_schema
  @drop_db.for_db("mysql", "mariadb")
  def _mysql_drop_db(cfg, eng, ident):
      """Drop the two test-schema databases and then ``ident`` itself.

      The statements run in order inside a single ``eng.begin()`` block; an
      error raised by one of them propagates and skips the remaining ones.

      :param cfg: configuration object; not used by this implementation.
      :param eng: engine used to open the transaction.
      :param ident: base name of the databases to drop.
      """
      # Order matters only in that it mirrors the original sequence:
      # the schema databases go first, the main database last.
      drop_templates = (
          "DROP DATABASE %s_test_schema",
          "DROP DATABASE %s_test_schema_2",
          "DROP DATABASE %s",
      )
      with eng.begin() as conn:
          for template in drop_templates:
              conn.exec_driver_sql(template % ident)
  @temp_table_keyword_args.for_db("mysql", "mariadb")
  def _mysql_temp_table_keyword_args(cfg, eng):
      """Return the keyword arguments used for temporary tables.

      :param cfg: configuration object; not used by this implementation.
      :param eng: engine; not used by this implementation.
      :return: a new dict with a single "prefixes" entry holding
       ``["TEMPORARY"]``.
      """
      table_prefixes = ["TEMPORARY"]
      return {"prefixes": table_prefixes}
  @upsert.for_db("mariadb")
  def _upsert(
      cfg, table, returning, *, set_lambda=None, sort_by_parameter_order=False
  ):
      """Build an INSERT .. ON DUPLICATE KEY UPDATE .. RETURNING statement.

      :param cfg: configuration object; not used by this implementation.
      :param table: table the MySQL-dialect ``insert()`` is built against.
      :param returning: iterable of expressions passed positionally to
       ``returning()``.
      :param set_lambda: optional callable; it receives ``stmt.inserted``
       and its result is expanded as keyword arguments to
       ``on_duplicate_key_update()``.  When falsy, the first primary key
       column is assigned to itself instead.
      :param sort_by_parameter_order: forwarded to ``returning()``.
      :return: the constructed statement.
      """
      from sqlalchemy.dialects.mysql import insert

      insert_stmt = insert(table)

      if not set_lambda:
          # No caller-supplied assignments: use the first primary key
          # column, keyed by its own ``key``, as the update clause.
          first_pk = table.primary_key.c[0]
          upsert_stmt = insert_stmt.on_duplicate_key_update(
              {first_pk.key: first_pk}
          )
      else:
          assignments = set_lambda(insert_stmt.inserted)
          upsert_stmt = insert_stmt.on_duplicate_key_update(**assignments)

      return upsert_stmt.returning(
          *returning, sort_by_parameter_order=sort_by_parameter_order
      )