messaging.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. from __future__ import annotations
  2. from collections.abc import Iterable
  3. from contextlib import contextmanager
  4. import logging
  5. import sys
  6. import textwrap
  7. from typing import Iterator
  8. from typing import Optional
  9. from typing import TextIO
  10. from typing import Union
  11. import warnings
  12. from sqlalchemy.engine import url
  13. log = logging.getLogger(__name__)
  14. # disable "no handler found" errors
  15. logging.getLogger("alembic").addHandler(logging.NullHandler())
  16. try:
  17. import fcntl
  18. import termios
  19. import struct
  20. ioctl = fcntl.ioctl(0, termios.TIOCGWINSZ, struct.pack("HHHH", 0, 0, 0, 0))
  21. _h, TERMWIDTH, _hp, _wp = struct.unpack("HHHH", ioctl)
  22. if TERMWIDTH <= 0: # can occur if running in emacs pseudo-tty
  23. TERMWIDTH = None
  24. except (ImportError, OSError):
  25. TERMWIDTH = None
  26. def write_outstream(
  27. stream: TextIO, *text: Union[str, bytes], quiet: bool = False
  28. ) -> None:
  29. if quiet:
  30. return
  31. encoding = getattr(stream, "encoding", "ascii") or "ascii"
  32. for t in text:
  33. if not isinstance(t, bytes):
  34. t = t.encode(encoding, "replace")
  35. t = t.decode(encoding)
  36. try:
  37. stream.write(t)
  38. except OSError:
  39. # suppress "broken pipe" errors.
  40. # no known way to handle this on Python 3 however
  41. # as the exception is "ignored" (noisily) in TextIOWrapper.
  42. break
  43. @contextmanager
  44. def status(
  45. status_msg: str, newline: bool = False, quiet: bool = False
  46. ) -> Iterator[None]:
  47. msg(status_msg + " ...", newline, flush=True, quiet=quiet)
  48. try:
  49. yield
  50. except:
  51. if not quiet:
  52. write_outstream(sys.stdout, " FAILED\n")
  53. raise
  54. else:
  55. if not quiet:
  56. write_outstream(sys.stdout, " done\n")
  57. def err(message: str, quiet: bool = False) -> None:
  58. log.error(message)
  59. msg(f"FAILED: {message}", quiet=quiet)
  60. sys.exit(-1)
  61. def obfuscate_url_pw(input_url: str) -> str:
  62. return url.make_url(input_url).render_as_string(hide_password=True)
  63. def warn(msg: str, stacklevel: int = 2) -> None:
  64. warnings.warn(msg, UserWarning, stacklevel=stacklevel)
  65. def warn_deprecated(msg: str, stacklevel: int = 2) -> None:
  66. warnings.warn(msg, DeprecationWarning, stacklevel=stacklevel)
  67. def msg(
  68. msg: str, newline: bool = True, flush: bool = False, quiet: bool = False
  69. ) -> None:
  70. if quiet:
  71. return
  72. if TERMWIDTH is None:
  73. write_outstream(sys.stdout, msg)
  74. if newline:
  75. write_outstream(sys.stdout, "\n")
  76. else:
  77. # left indent output lines
  78. indent = " "
  79. lines = textwrap.wrap(
  80. msg,
  81. TERMWIDTH,
  82. initial_indent=indent,
  83. subsequent_indent=indent,
  84. )
  85. if len(lines) > 1:
  86. for line in lines[0:-1]:
  87. write_outstream(sys.stdout, line, "\n")
  88. write_outstream(sys.stdout, lines[-1], ("\n" if newline else ""))
  89. if flush:
  90. sys.stdout.flush()
  91. def format_as_comma(value: Optional[Union[str, Iterable[str]]]) -> str:
  92. if value is None:
  93. return ""
  94. elif isinstance(value, str):
  95. return value
  96. elif isinstance(value, Iterable):
  97. return ", ".join(value)
  98. else:
  99. raise ValueError("Don't know how to comma-format %r" % value)