env.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. import asyncio
  2. import logging
  3. from logging.config import fileConfig
  4. from sqlalchemy import MetaData
  5. from flask import current_app
  6. from alembic import context
  7. USE_TWOPHASE = False
  8. # this is the Alembic Config object, which provides
  9. # access to the values within the .ini file in use.
  10. config = context.config
  11. # Interpret the config file for Python logging.
  12. # This line sets up loggers basically.
  13. fileConfig(config.config_file_name)
  14. logger = logging.getLogger('alembic.env')
  15. def get_engine(bind_key=None):
  16. try:
  17. # this works with Flask-SQLAlchemy<3 and Alchemical
  18. return current_app.extensions['migrate'].db.get_engine(bind=bind_key)
  19. except (TypeError, AttributeError):
  20. # this works with Flask-SQLAlchemy>=3
  21. return current_app.extensions['migrate'].db.engines.get(bind_key)
  22. def get_engine_url(bind_key=None):
  23. try:
  24. return get_engine(bind_key).url.render_as_string(
  25. hide_password=False).replace('%', '%%')
  26. except AttributeError:
  27. return str(get_engine(bind_key).url).replace('%', '%%')
  28. # add your model's MetaData object here
  29. # for 'autogenerate' support
  30. # from myapp import mymodel
  31. # target_metadata = mymodel.Base.metadata
  32. config.set_main_option('sqlalchemy.url', get_engine_url())
  33. bind_names = []
  34. if current_app.config.get('SQLALCHEMY_BINDS') is not None:
  35. bind_names = list(current_app.config['SQLALCHEMY_BINDS'].keys())
  36. else:
  37. get_bind_names = getattr(current_app.extensions['migrate'].db,
  38. 'bind_names', None)
  39. if get_bind_names:
  40. bind_names = get_bind_names()
  41. for bind in bind_names:
  42. context.config.set_section_option(
  43. bind, "sqlalchemy.url", get_engine_url(bind_key=bind))
  44. target_db = current_app.extensions['migrate'].db
  45. # other values from the config, defined by the needs of env.py,
  46. # can be acquired:
  47. # my_important_option = config.get_main_option("my_important_option")
  48. # ... etc.
  49. def get_metadata(bind):
  50. """Return the metadata for a bind."""
  51. if bind == '':
  52. bind = None
  53. if hasattr(target_db, 'metadatas'):
  54. return target_db.metadatas[bind]
  55. # legacy, less flexible implementation
  56. m = MetaData()
  57. for t in target_db.metadata.tables.values():
  58. if t.info.get('bind_key') == bind:
  59. t.tometadata(m)
  60. return m
  61. def run_migrations_offline():
  62. """Run migrations in 'offline' mode.
  63. This configures the context with just a URL
  64. and not an Engine, though an Engine is acceptable
  65. here as well. By skipping the Engine creation
  66. we don't even need a DBAPI to be available.
  67. Calls to context.execute() here emit the given string to the
  68. script output.
  69. """
  70. # for the --sql use case, run migrations for each URL into
  71. # individual files.
  72. engines = {
  73. '': {
  74. 'url': context.config.get_main_option('sqlalchemy.url')
  75. }
  76. }
  77. for name in bind_names:
  78. engines[name] = rec = {}
  79. rec['url'] = context.config.get_section_option(name, "sqlalchemy.url")
  80. for name, rec in engines.items():
  81. logger.info("Migrating database %s" % (name or '<default>'))
  82. file_ = "%s.sql" % name
  83. logger.info("Writing output to %s" % file_)
  84. with open(file_, 'w') as buffer:
  85. context.configure(
  86. url=rec['url'],
  87. output_buffer=buffer,
  88. target_metadata=get_metadata(name),
  89. literal_binds=True,
  90. )
  91. with context.begin_transaction():
  92. context.run_migrations(engine_name=name)
  93. def do_run_migrations(_, engines):
  94. # this callback is used to prevent an auto-migration from being generated
  95. # when there are no changes to the schema
  96. # reference: http://alembic.zzzcomputing.com/en/latest/cookbook.html
  97. def process_revision_directives(context, revision, directives):
  98. if getattr(config.cmd_opts, 'autogenerate', False):
  99. script = directives[0]
  100. if len(script.upgrade_ops_list) >= len(bind_names) + 1:
  101. empty = True
  102. for upgrade_ops in script.upgrade_ops_list:
  103. if not upgrade_ops.is_empty():
  104. empty = False
  105. if empty:
  106. directives[:] = []
  107. logger.info('No changes in schema detected.')
  108. conf_args = current_app.extensions['migrate'].configure_args
  109. if conf_args.get("process_revision_directives") is None:
  110. conf_args["process_revision_directives"] = process_revision_directives
  111. for name, rec in engines.items():
  112. rec['sync_connection'] = conn = rec['connection']._sync_connection()
  113. if USE_TWOPHASE:
  114. rec['transaction'] = conn.begin_twophase()
  115. else:
  116. rec['transaction'] = conn.begin()
  117. try:
  118. for name, rec in engines.items():
  119. logger.info("Migrating database %s" % (name or '<default>'))
  120. context.configure(
  121. connection=rec['sync_connection'],
  122. upgrade_token="%s_upgrades" % name,
  123. downgrade_token="%s_downgrades" % name,
  124. target_metadata=get_metadata(name),
  125. **conf_args
  126. )
  127. context.run_migrations(engine_name=name)
  128. if USE_TWOPHASE:
  129. for rec in engines.values():
  130. rec['transaction'].prepare()
  131. for rec in engines.values():
  132. rec['transaction'].commit()
  133. except: # noqa: E722
  134. for rec in engines.values():
  135. rec['transaction'].rollback()
  136. raise
  137. finally:
  138. for rec in engines.values():
  139. rec['sync_connection'].close()
  140. async def run_migrations_online():
  141. """Run migrations in 'online' mode.
  142. In this scenario we need to create an Engine
  143. and associate a connection with the context.
  144. """
  145. # for the direct-to-DB use case, start a transaction on all
  146. # engines, then run all migrations, then commit all transactions.
  147. engines = {
  148. '': {'engine': get_engine()}
  149. }
  150. for name in bind_names:
  151. engines[name] = rec = {}
  152. rec['engine'] = get_engine(bind_key=name)
  153. for name, rec in engines.items():
  154. engine = rec['engine']
  155. rec['connection'] = await engine.connect().start()
  156. await engines['']['connection'].run_sync(do_run_migrations, engines)
  157. for rec in engines.values():
  158. await rec['connection'].close()
  159. if context.is_offline_mode():
  160. run_migrations_offline()
  161. else:
  162. asyncio.get_event_loop().run_until_complete(run_migrations_online())