testing.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. from __future__ import annotations
  2. import importlib.metadata
  3. import typing as t
  4. from contextlib import contextmanager
  5. from contextlib import ExitStack
  6. from copy import copy
  7. from types import TracebackType
  8. from urllib.parse import urlsplit
  9. import werkzeug.test
  10. from click.testing import CliRunner
  11. from click.testing import Result
  12. from werkzeug.test import Client
  13. from werkzeug.wrappers import Request as BaseRequest
  14. from .cli import ScriptInfo
  15. from .sessions import SessionMixin
  16. if t.TYPE_CHECKING: # pragma: no cover
  17. from _typeshed.wsgi import WSGIEnvironment
  18. from werkzeug.test import TestResponse
  19. from .app import Flask
  20. class EnvironBuilder(werkzeug.test.EnvironBuilder):
  21. """An :class:`~werkzeug.test.EnvironBuilder`, that takes defaults from the
  22. application.
  23. :param app: The Flask application to configure the environment from.
  24. :param path: URL path being requested.
  25. :param base_url: Base URL where the app is being served, which
  26. ``path`` is relative to. If not given, built from
  27. :data:`PREFERRED_URL_SCHEME`, ``subdomain``,
  28. :data:`SERVER_NAME`, and :data:`APPLICATION_ROOT`.
  29. :param subdomain: Subdomain name to append to :data:`SERVER_NAME`.
  30. :param url_scheme: Scheme to use instead of
  31. :data:`PREFERRED_URL_SCHEME`.
  32. :param json: If given, this is serialized as JSON and passed as
  33. ``data``. Also defaults ``content_type`` to
  34. ``application/json``.
  35. :param args: other positional arguments passed to
  36. :class:`~werkzeug.test.EnvironBuilder`.
  37. :param kwargs: other keyword arguments passed to
  38. :class:`~werkzeug.test.EnvironBuilder`.
  39. """
  40. def __init__(
  41. self,
  42. app: Flask,
  43. path: str = "/",
  44. base_url: str | None = None,
  45. subdomain: str | None = None,
  46. url_scheme: str | None = None,
  47. *args: t.Any,
  48. **kwargs: t.Any,
  49. ) -> None:
  50. assert not (base_url or subdomain or url_scheme) or (
  51. base_url is not None
  52. ) != bool(subdomain or url_scheme), (
  53. 'Cannot pass "subdomain" or "url_scheme" with "base_url".'
  54. )
  55. if base_url is None:
  56. http_host = app.config.get("SERVER_NAME") or "localhost"
  57. app_root = app.config["APPLICATION_ROOT"]
  58. if subdomain:
  59. http_host = f"{subdomain}.{http_host}"
  60. if url_scheme is None:
  61. url_scheme = app.config["PREFERRED_URL_SCHEME"]
  62. url = urlsplit(path)
  63. base_url = (
  64. f"{url.scheme or url_scheme}://{url.netloc or http_host}"
  65. f"/{app_root.lstrip('/')}"
  66. )
  67. path = url.path
  68. if url.query:
  69. path = f"{path}?{url.query}"
  70. self.app = app
  71. super().__init__(path, base_url, *args, **kwargs)
  72. def json_dumps(self, obj: t.Any, **kwargs: t.Any) -> str:
  73. """Serialize ``obj`` to a JSON-formatted string.
  74. The serialization will be configured according to the config associated
  75. with this EnvironBuilder's ``app``.
  76. """
  77. return self.app.json.dumps(obj, **kwargs)
  78. _werkzeug_version = ""
  79. def _get_werkzeug_version() -> str:
  80. global _werkzeug_version
  81. if not _werkzeug_version:
  82. _werkzeug_version = importlib.metadata.version("werkzeug")
  83. return _werkzeug_version
  84. class FlaskClient(Client):
  85. """Works like a regular Werkzeug test client but has knowledge about
  86. Flask's contexts to defer the cleanup of the request context until
  87. the end of a ``with`` block. For general information about how to
  88. use this class refer to :class:`werkzeug.test.Client`.
  89. .. versionchanged:: 0.12
  90. `app.test_client()` includes preset default environment, which can be
  91. set after instantiation of the `app.test_client()` object in
  92. `client.environ_base`.
  93. Basic usage is outlined in the :doc:`/testing` chapter.
  94. """
  95. application: Flask
  96. def __init__(self, *args: t.Any, **kwargs: t.Any) -> None:
  97. super().__init__(*args, **kwargs)
  98. self.preserve_context = False
  99. self._new_contexts: list[t.ContextManager[t.Any]] = []
  100. self._context_stack = ExitStack()
  101. self.environ_base = {
  102. "REMOTE_ADDR": "127.0.0.1",
  103. "HTTP_USER_AGENT": f"Werkzeug/{_get_werkzeug_version()}",
  104. }
  105. @contextmanager
  106. def session_transaction(
  107. self, *args: t.Any, **kwargs: t.Any
  108. ) -> t.Iterator[SessionMixin]:
  109. """When used in combination with a ``with`` statement this opens a
  110. session transaction. This can be used to modify the session that
  111. the test client uses. Once the ``with`` block is left the session is
  112. stored back.
  113. ::
  114. with client.session_transaction() as session:
  115. session['value'] = 42
  116. Internally this is implemented by going through a temporary test
  117. request context and since session handling could depend on
  118. request variables this function accepts the same arguments as
  119. :meth:`~flask.Flask.test_request_context` which are directly
  120. passed through.
  121. """
  122. if self._cookies is None:
  123. raise TypeError(
  124. "Cookies are disabled. Create a client with 'use_cookies=True'."
  125. )
  126. app = self.application
  127. ctx = app.test_request_context(*args, **kwargs)
  128. self._add_cookies_to_wsgi(ctx.request.environ)
  129. with ctx:
  130. sess = app.session_interface.open_session(app, ctx.request)
  131. if sess is None:
  132. raise RuntimeError("Session backend did not open a session.")
  133. yield sess
  134. resp = app.response_class()
  135. if app.session_interface.is_null_session(sess):
  136. return
  137. with ctx:
  138. app.session_interface.save_session(app, sess, resp)
  139. self._update_cookies_from_response(
  140. ctx.request.host.partition(":")[0],
  141. ctx.request.path,
  142. resp.headers.getlist("Set-Cookie"),
  143. )
  144. def _copy_environ(self, other: WSGIEnvironment) -> WSGIEnvironment:
  145. out = {**self.environ_base, **other}
  146. if self.preserve_context:
  147. out["werkzeug.debug.preserve_context"] = self._new_contexts.append
  148. return out
  149. def _request_from_builder_args(
  150. self, args: tuple[t.Any, ...], kwargs: dict[str, t.Any]
  151. ) -> BaseRequest:
  152. kwargs["environ_base"] = self._copy_environ(kwargs.get("environ_base", {}))
  153. builder = EnvironBuilder(self.application, *args, **kwargs)
  154. try:
  155. return builder.get_request()
  156. finally:
  157. builder.close()
  158. def open(
  159. self,
  160. *args: t.Any,
  161. buffered: bool = False,
  162. follow_redirects: bool = False,
  163. **kwargs: t.Any,
  164. ) -> TestResponse:
  165. if args and isinstance(
  166. args[0], (werkzeug.test.EnvironBuilder, dict, BaseRequest)
  167. ):
  168. if isinstance(args[0], werkzeug.test.EnvironBuilder):
  169. builder = copy(args[0])
  170. builder.environ_base = self._copy_environ(builder.environ_base or {}) # type: ignore[arg-type]
  171. request = builder.get_request()
  172. elif isinstance(args[0], dict):
  173. request = EnvironBuilder.from_environ(
  174. args[0], app=self.application, environ_base=self._copy_environ({})
  175. ).get_request()
  176. else:
  177. # isinstance(args[0], BaseRequest)
  178. request = copy(args[0])
  179. request.environ = self._copy_environ(request.environ)
  180. else:
  181. # request is None
  182. request = self._request_from_builder_args(args, kwargs)
  183. # Pop any previously preserved contexts. This prevents contexts
  184. # from being preserved across redirects or multiple requests
  185. # within a single block.
  186. self._context_stack.close()
  187. response = super().open(
  188. request,
  189. buffered=buffered,
  190. follow_redirects=follow_redirects,
  191. )
  192. response.json_module = self.application.json # type: ignore[assignment]
  193. # Re-push contexts that were preserved during the request.
  194. for cm in self._new_contexts:
  195. self._context_stack.enter_context(cm)
  196. self._new_contexts.clear()
  197. return response
  198. def __enter__(self) -> FlaskClient:
  199. if self.preserve_context:
  200. raise RuntimeError("Cannot nest client invocations")
  201. self.preserve_context = True
  202. return self
  203. def __exit__(
  204. self,
  205. exc_type: type | None,
  206. exc_value: BaseException | None,
  207. tb: TracebackType | None,
  208. ) -> None:
  209. self.preserve_context = False
  210. self._context_stack.close()
  211. class FlaskCliRunner(CliRunner):
  212. """A :class:`~click.testing.CliRunner` for testing a Flask app's
  213. CLI commands. Typically created using
  214. :meth:`~flask.Flask.test_cli_runner`. See :ref:`testing-cli`.
  215. """
  216. def __init__(self, app: Flask, **kwargs: t.Any) -> None:
  217. self.app = app
  218. super().__init__(**kwargs)
  219. def invoke( # type: ignore
  220. self, cli: t.Any = None, args: t.Any = None, **kwargs: t.Any
  221. ) -> Result:
  222. """Invokes a CLI command in an isolated environment. See
  223. :meth:`CliRunner.invoke <click.testing.CliRunner.invoke>` for
  224. full method documentation. See :ref:`testing-cli` for examples.
  225. If the ``obj`` argument is not given, passes an instance of
  226. :class:`~flask.cli.ScriptInfo` that knows how to load the Flask
  227. app being tested.
  228. :param cli: Command object to invoke. Default is the app's
  229. :attr:`~flask.app.Flask.cli` group.
  230. :param args: List of strings to invoke the command with.
  231. :return: a :class:`~click.testing.Result` object.
  232. """
  233. if cli is None:
  234. cli = self.app.cli
  235. if "obj" not in kwargs:
  236. kwargs["obj"] = ScriptInfo(create_app=lambda: self.app)
  237. return super().invoke(cli, args, **kwargs)