| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122 |
- from __future__ import annotations
- from collections.abc import Iterable
- from contextlib import contextmanager
- import logging
- import sys
- import textwrap
- from typing import Iterator
- from typing import Optional
- from typing import TextIO
- from typing import Union
- import warnings
- from sqlalchemy.engine import url
- log = logging.getLogger(__name__)
- # disable "no handler found" errors
- logging.getLogger("alembic").addHandler(logging.NullHandler())
- try:
- import fcntl
- import termios
- import struct
- ioctl = fcntl.ioctl(0, termios.TIOCGWINSZ, struct.pack("HHHH", 0, 0, 0, 0))
- _h, TERMWIDTH, _hp, _wp = struct.unpack("HHHH", ioctl)
- if TERMWIDTH <= 0: # can occur if running in emacs pseudo-tty
- TERMWIDTH = None
- except (ImportError, OSError):
- TERMWIDTH = None
- def write_outstream(
- stream: TextIO, *text: Union[str, bytes], quiet: bool = False
- ) -> None:
- if quiet:
- return
- encoding = getattr(stream, "encoding", "ascii") or "ascii"
- for t in text:
- if not isinstance(t, bytes):
- t = t.encode(encoding, "replace")
- t = t.decode(encoding)
- try:
- stream.write(t)
- except OSError:
- # suppress "broken pipe" errors.
- # no known way to handle this on Python 3 however
- # as the exception is "ignored" (noisily) in TextIOWrapper.
- break
- @contextmanager
- def status(
- status_msg: str, newline: bool = False, quiet: bool = False
- ) -> Iterator[None]:
- msg(status_msg + " ...", newline, flush=True, quiet=quiet)
- try:
- yield
- except:
- if not quiet:
- write_outstream(sys.stdout, " FAILED\n")
- raise
- else:
- if not quiet:
- write_outstream(sys.stdout, " done\n")
- def err(message: str, quiet: bool = False) -> None:
- log.error(message)
- msg(f"FAILED: {message}", quiet=quiet)
- sys.exit(-1)
- def obfuscate_url_pw(input_url: str) -> str:
- return url.make_url(input_url).render_as_string(hide_password=True)
- def warn(msg: str, stacklevel: int = 2) -> None:
- warnings.warn(msg, UserWarning, stacklevel=stacklevel)
- def warn_deprecated(msg: str, stacklevel: int = 2) -> None:
- warnings.warn(msg, DeprecationWarning, stacklevel=stacklevel)
- def msg(
- msg: str, newline: bool = True, flush: bool = False, quiet: bool = False
- ) -> None:
- if quiet:
- return
- if TERMWIDTH is None:
- write_outstream(sys.stdout, msg)
- if newline:
- write_outstream(sys.stdout, "\n")
- else:
- # left indent output lines
- indent = " "
- lines = textwrap.wrap(
- msg,
- TERMWIDTH,
- initial_indent=indent,
- subsequent_indent=indent,
- )
- if len(lines) > 1:
- for line in lines[0:-1]:
- write_outstream(sys.stdout, line, "\n")
- write_outstream(sys.stdout, lines[-1], ("\n" if newline else ""))
- if flush:
- sys.stdout.flush()
- def format_as_comma(value: Optional[Union[str, Iterable[str]]]) -> str:
- if value is None:
- return ""
- elif isinstance(value, str):
- return value
- elif isinstance(value, Iterable):
- return ", ".join(value)
- else:
- raise ValueError("Don't know how to comma-format %r" % value)
|