import asyncio
import logging
from logging.config import fileConfig

from sqlalchemy import MetaData
from flask import current_app

from alembic import context

USE_TWOPHASE = False
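# When USE_TWOPHASE is True, do_run_migrations() below opens each database's
# transaction with begin_twophase() and finishes with prepare()/commit(), so
# all bound databases commit atomically (the backends must support two-phase
# transactions for this to work).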

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
logger = logging.getLogger('alembic.env')


def get_engine(bind_key=None):
    try:
        # this works with Flask-SQLAlchemy<3 and Alchemical
        return current_app.extensions['migrate'].db.get_engine(bind=bind_key)
    except (TypeError, AttributeError):
        # this works with Flask-SQLAlchemy>=3
        return current_app.extensions['migrate'].db.engines.get(bind_key)


def get_engine_url(bind_key=None):
    try:
        return get_engine(bind_key).url.render_as_string(
            hide_password=False).replace('%', '%%')
    except AttributeError:
        return str(get_engine(bind_key).url).replace('%', '%%')
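
# Note: the '%' doubling above escapes the URL for Alembic's ConfigParser-based
# configuration, where a bare '%' (e.g. from a URL-encoded password) would be
# treated as an interpolation marker.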


# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
config.set_main_option('sqlalchemy.url', get_engine_url())
bind_names = []
if current_app.config.get('SQLALCHEMY_BINDS') is not None:
    bind_names = list(current_app.config['SQLALCHEMY_BINDS'].keys())
else:
    get_bind_names = getattr(current_app.extensions['migrate'].db,
                             'bind_names', None)
    if get_bind_names:
        bind_names = get_bind_names()
for bind in bind_names:
    context.config.set_section_option(
        bind, "sqlalchemy.url", get_engine_url(bind_key=bind))

target_db = current_app.extensions['migrate'].db

# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.


def get_metadata(bind):
    """Return the metadata for a bind."""
    if bind == '':
        bind = None
    if hasattr(target_db, 'metadatas'):
        return target_db.metadatas[bind]

    # legacy, less flexible implementation
    m = MetaData()
    for t in target_db.metadata.tables.values():
        if t.info.get('bind_key') == bind:
            t.tometadata(m)
    return m
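
# Note: Table.tometadata() used above was renamed to Table.to_metadata() in
# SQLAlchemy 1.4; the old spelling still works but emits a deprecation warning
# on newer releases.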


def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well. By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.
    """
    # for the --sql use case, run migrations for each URL into
    # individual files.
    engines = {
        '': {
            'url': context.config.get_main_option('sqlalchemy.url')
        }
    }
    for name in bind_names:
        engines[name] = rec = {}
        rec['url'] = context.config.get_section_option(name, "sqlalchemy.url")

    for name, rec in engines.items():
        logger.info("Migrating database %s" % (name or '<default>'))
        file_ = "%s.sql" % name
        logger.info("Writing output to %s" % file_)
        with open(file_, 'w') as buffer:
            context.configure(
                url=rec['url'],
                output_buffer=buffer,
                target_metadata=get_metadata(name),
                literal_binds=True,
            )
            with context.begin_transaction():
                context.run_migrations(engine_name=name)
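
# With the standard Flask-Migrate CLI, offline mode is typically reached with
# the --sql flag (for example `flask db upgrade --sql`); each database then
# gets its own script: '.sql' for the default bind and '<bind>.sql' for the
# others.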


def do_run_migrations(_, engines):
    # this callback is used to prevent an auto-migration from being generated
    # when there are no changes to the schema
    # reference: http://alembic.zzzcomputing.com/en/latest/cookbook.html
    def process_revision_directives(context, revision, directives):
        if getattr(config.cmd_opts, 'autogenerate', False):
            script = directives[0]
            if len(script.upgrade_ops_list) >= len(bind_names) + 1:
                empty = True
                for upgrade_ops in script.upgrade_ops_list:
                    if not upgrade_ops.is_empty():
                        empty = False
                if empty:
                    directives[:] = []
                    logger.info('No changes in schema detected.')

    conf_args = current_app.extensions['migrate'].configure_args
    if conf_args.get("process_revision_directives") is None:
        conf_args["process_revision_directives"] = process_revision_directives
    for name, rec in engines.items():
        rec['sync_connection'] = conn = rec['connection']._sync_connection()
        if USE_TWOPHASE:
            rec['transaction'] = conn.begin_twophase()
        else:
            rec['transaction'] = conn.begin()
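
    # Run the migrations on every database first and only commit (or, with
    # two-phase enabled, prepare and then commit) after all of them have
    # succeeded, so a failure on any bind rolls back every database.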
    try:
        for name, rec in engines.items():
            logger.info("Migrating database %s" % (name or '<default>'))
            context.configure(
                connection=rec['sync_connection'],
                upgrade_token="%s_upgrades" % name,
                downgrade_token="%s_downgrades" % name,
                target_metadata=get_metadata(name),
                **conf_args
            )
            context.run_migrations(engine_name=name)

        if USE_TWOPHASE:
            for rec in engines.values():
                rec['transaction'].prepare()

        for rec in engines.values():
            rec['transaction'].commit()
    except:  # noqa: E722
        for rec in engines.values():
            rec['transaction'].rollback()
        raise
    finally:
        for rec in engines.values():
            rec['sync_connection'].close()


async def run_migrations_online():
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.
    """
    # for the direct-to-DB use case, start a transaction on all
    # engines, then run all migrations, then commit all transactions.
    engines = {
        '': {'engine': get_engine()}
    }
    for name in bind_names:
        engines[name] = rec = {}
        rec['engine'] = get_engine(bind_key=name)

    for name, rec in engines.items():
        engine = rec['engine']
        rec['connection'] = await engine.connect().start()
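
    # run_sync() calls the synchronous do_run_migrations() with the underlying
    # sync connection as its first argument (ignored there in favour of the
    # engines dict), so all databases are migrated within this single call.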
    await engines['']['connection'].run_sync(do_run_migrations, engines)

    for rec in engines.values():
        await rec['connection'].close()


if context.is_offline_mode():
    run_migrations_offline()
else:
    asyncio.get_event_loop().run_until_complete(run_migrations_online())
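
# This module is loaded and executed by Alembic itself; it is not meant to be
# run directly. With Flask-Migrate the usual entry points are CLI commands
# such as `flask db migrate` and `flask db upgrade`.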