| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400 |
- import io
- import logging
- import os
- import pathlib
- import shutil
- import sys
- import tempfile
- from collections import OrderedDict
- from contextlib import contextmanager
- from typing import IO, Dict, Iterable, Iterator, Mapping, Optional, Tuple, Union
- from .parser import Binding, parse_stream
- from .variables import parse_variables
- # A type alias for a string path to be used for the paths in this file.
- # These paths may flow to `open()` and `shutil.move()`; `shutil.move()`
- # only accepts string paths, not byte paths or file descriptors. See
- # https://github.com/python/typeshed/pull/6832.
- StrPath = Union[str, "os.PathLike[str]"]
- logger = logging.getLogger(__name__)
- def with_warn_for_invalid_lines(mappings: Iterator[Binding]) -> Iterator[Binding]:
- for mapping in mappings:
- if mapping.error:
- logger.warning(
- "python-dotenv could not parse statement starting at line %s",
- mapping.original.line,
- )
- yield mapping
- class DotEnv:
- def __init__(
- self,
- dotenv_path: Optional[StrPath],
- stream: Optional[IO[str]] = None,
- verbose: bool = False,
- encoding: Optional[str] = None,
- interpolate: bool = True,
- override: bool = True,
- ) -> None:
- self.dotenv_path: Optional[StrPath] = dotenv_path
- self.stream: Optional[IO[str]] = stream
- self._dict: Optional[Dict[str, Optional[str]]] = None
- self.verbose: bool = verbose
- self.encoding: Optional[str] = encoding
- self.interpolate: bool = interpolate
- self.override: bool = override
- @contextmanager
- def _get_stream(self) -> Iterator[IO[str]]:
- if self.dotenv_path and os.path.isfile(self.dotenv_path):
- with open(self.dotenv_path, encoding=self.encoding) as stream:
- yield stream
- elif self.stream is not None:
- yield self.stream
- else:
- if self.verbose:
- logger.info(
- "python-dotenv could not find configuration file %s.",
- self.dotenv_path or ".env",
- )
- yield io.StringIO("")
- def dict(self) -> Dict[str, Optional[str]]:
- """Return dotenv as dict"""
- if self._dict:
- return self._dict
- raw_values = self.parse()
- if self.interpolate:
- self._dict = OrderedDict(
- resolve_variables(raw_values, override=self.override)
- )
- else:
- self._dict = OrderedDict(raw_values)
- return self._dict
- def parse(self) -> Iterator[Tuple[str, Optional[str]]]:
- with self._get_stream() as stream:
- for mapping in with_warn_for_invalid_lines(parse_stream(stream)):
- if mapping.key is not None:
- yield mapping.key, mapping.value
- def set_as_environment_variables(self) -> bool:
- """
- Load the current dotenv as system environment variable.
- """
- if not self.dict():
- return False
- for k, v in self.dict().items():
- if k in os.environ and not self.override:
- continue
- if v is not None:
- os.environ[k] = v
- return True
- def get(self, key: str) -> Optional[str]:
- """ """
- data = self.dict()
- if key in data:
- return data[key]
- if self.verbose:
- logger.warning("Key %s not found in %s.", key, self.dotenv_path)
- return None
- def get_key(
- dotenv_path: StrPath,
- key_to_get: str,
- encoding: Optional[str] = "utf-8",
- ) -> Optional[str]:
- """
- Get the value of a given key from the given .env.
- Returns `None` if the key isn't found or doesn't have a value.
- """
- return DotEnv(dotenv_path, verbose=True, encoding=encoding).get(key_to_get)
- @contextmanager
- def rewrite(
- path: StrPath,
- encoding: Optional[str],
- ) -> Iterator[Tuple[IO[str], IO[str]]]:
- pathlib.Path(path).touch()
- with tempfile.NamedTemporaryFile(mode="w", encoding=encoding, delete=False) as dest:
- error = None
- try:
- with open(path, encoding=encoding) as source:
- yield (source, dest)
- except BaseException as err:
- error = err
- if error is None:
- shutil.move(dest.name, path)
- else:
- os.unlink(dest.name)
- raise error from None
- def set_key(
- dotenv_path: StrPath,
- key_to_set: str,
- value_to_set: str,
- quote_mode: str = "always",
- export: bool = False,
- encoding: Optional[str] = "utf-8",
- ) -> Tuple[Optional[bool], str, str]:
- """
- Adds or Updates a key/value to the given .env
- If the .env path given doesn't exist, fails instead of risking creating
- an orphan .env somewhere in the filesystem
- """
- if quote_mode not in ("always", "auto", "never"):
- raise ValueError(f"Unknown quote_mode: {quote_mode}")
- quote = quote_mode == "always" or (
- quote_mode == "auto" and not value_to_set.isalnum()
- )
- if quote:
- value_out = "'{}'".format(value_to_set.replace("'", "\\'"))
- else:
- value_out = value_to_set
- if export:
- line_out = f"export {key_to_set}={value_out}\n"
- else:
- line_out = f"{key_to_set}={value_out}\n"
- with rewrite(dotenv_path, encoding=encoding) as (source, dest):
- replaced = False
- missing_newline = False
- for mapping in with_warn_for_invalid_lines(parse_stream(source)):
- if mapping.key == key_to_set:
- dest.write(line_out)
- replaced = True
- else:
- dest.write(mapping.original.string)
- missing_newline = not mapping.original.string.endswith("\n")
- if not replaced:
- if missing_newline:
- dest.write("\n")
- dest.write(line_out)
- return True, key_to_set, value_to_set
- def unset_key(
- dotenv_path: StrPath,
- key_to_unset: str,
- quote_mode: str = "always",
- encoding: Optional[str] = "utf-8",
- ) -> Tuple[Optional[bool], str]:
- """
- Removes a given key from the given `.env` file.
- If the .env path given doesn't exist, fails.
- If the given key doesn't exist in the .env, fails.
- """
- if not os.path.exists(dotenv_path):
- logger.warning("Can't delete from %s - it doesn't exist.", dotenv_path)
- return None, key_to_unset
- removed = False
- with rewrite(dotenv_path, encoding=encoding) as (source, dest):
- for mapping in with_warn_for_invalid_lines(parse_stream(source)):
- if mapping.key == key_to_unset:
- removed = True
- else:
- dest.write(mapping.original.string)
- if not removed:
- logger.warning(
- "Key %s not removed from %s - key doesn't exist.", key_to_unset, dotenv_path
- )
- return None, key_to_unset
- return removed, key_to_unset
- def resolve_variables(
- values: Iterable[Tuple[str, Optional[str]]],
- override: bool,
- ) -> Mapping[str, Optional[str]]:
- new_values: Dict[str, Optional[str]] = {}
- for name, value in values:
- if value is None:
- result = None
- else:
- atoms = parse_variables(value)
- env: Dict[str, Optional[str]] = {}
- if override:
- env.update(os.environ) # type: ignore
- env.update(new_values)
- else:
- env.update(new_values)
- env.update(os.environ) # type: ignore
- result = "".join(atom.resolve(env) for atom in atoms)
- new_values[name] = result
- return new_values
- def _walk_to_root(path: str) -> Iterator[str]:
- """
- Yield directories starting from the given directory up to the root
- """
- if not os.path.exists(path):
- raise IOError("Starting path not found")
- if os.path.isfile(path):
- path = os.path.dirname(path)
- last_dir = None
- current_dir = os.path.abspath(path)
- while last_dir != current_dir:
- yield current_dir
- parent_dir = os.path.abspath(os.path.join(current_dir, os.path.pardir))
- last_dir, current_dir = current_dir, parent_dir
- def find_dotenv(
- filename: str = ".env",
- raise_error_if_not_found: bool = False,
- usecwd: bool = False,
- ) -> str:
- """
- Search in increasingly higher folders for the given file
- Returns path to the file if found, or an empty string otherwise
- """
- def _is_interactive():
- """Decide whether this is running in a REPL or IPython notebook"""
- if hasattr(sys, "ps1") or hasattr(sys, "ps2"):
- return True
- try:
- main = __import__("__main__", None, None, fromlist=["__file__"])
- except ModuleNotFoundError:
- return False
- return not hasattr(main, "__file__")
- def _is_debugger():
- return sys.gettrace() is not None
- if usecwd or _is_interactive() or _is_debugger() or getattr(sys, "frozen", False):
- # Should work without __file__, e.g. in REPL or IPython notebook.
- path = os.getcwd()
- else:
- # will work for .py files
- frame = sys._getframe()
- current_file = __file__
- while frame.f_code.co_filename == current_file or not os.path.exists(
- frame.f_code.co_filename
- ):
- assert frame.f_back is not None
- frame = frame.f_back
- frame_filename = frame.f_code.co_filename
- path = os.path.dirname(os.path.abspath(frame_filename))
- for dirname in _walk_to_root(path):
- check_path = os.path.join(dirname, filename)
- if os.path.isfile(check_path):
- return check_path
- if raise_error_if_not_found:
- raise IOError("File not found")
- return ""
- def load_dotenv(
- dotenv_path: Optional[StrPath] = None,
- stream: Optional[IO[str]] = None,
- verbose: bool = False,
- override: bool = False,
- interpolate: bool = True,
- encoding: Optional[str] = "utf-8",
- ) -> bool:
- """Parse a .env file and then load all the variables found as environment variables.
- Parameters:
- dotenv_path: Absolute or relative path to .env file.
- stream: Text stream (such as `io.StringIO`) with .env content, used if
- `dotenv_path` is `None`.
- verbose: Whether to output a warning the .env file is missing.
- override: Whether to override the system environment variables with the variables
- from the `.env` file.
- encoding: Encoding to be used to read the file.
- Returns:
- Bool: True if at least one environment variable is set else False
- If both `dotenv_path` and `stream` are `None`, `find_dotenv()` is used to find the
- .env file with it's default parameters. If you need to change the default parameters
- of `find_dotenv()`, you can explicitly call `find_dotenv()` and pass the result
- to this function as `dotenv_path`.
- """
- if dotenv_path is None and stream is None:
- dotenv_path = find_dotenv()
- dotenv = DotEnv(
- dotenv_path=dotenv_path,
- stream=stream,
- verbose=verbose,
- interpolate=interpolate,
- override=override,
- encoding=encoding,
- )
- return dotenv.set_as_environment_variables()
- def dotenv_values(
- dotenv_path: Optional[StrPath] = None,
- stream: Optional[IO[str]] = None,
- verbose: bool = False,
- interpolate: bool = True,
- encoding: Optional[str] = "utf-8",
- ) -> Dict[str, Optional[str]]:
- """
- Parse a .env file and return its content as a dict.
- The returned dict will have `None` values for keys without values in the .env file.
- For example, `foo=bar` results in `{"foo": "bar"}` whereas `foo` alone results in
- `{"foo": None}`
- Parameters:
- dotenv_path: Absolute or relative path to the .env file.
- stream: `StringIO` object with .env content, used if `dotenv_path` is `None`.
- verbose: Whether to output a warning if the .env file is missing.
- encoding: Encoding to be used to read the file.
- If both `dotenv_path` and `stream` are `None`, `find_dotenv()` is used to find the
- .env file.
- """
- if dotenv_path is None and stream is None:
- dotenv_path = find_dotenv()
- return DotEnv(
- dotenv_path=dotenv_path,
- stream=stream,
- verbose=verbose,
- interpolate=interpolate,
- override=True,
- encoding=encoding,
- ).dict()
|