| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206 |
- # SPDX-FileCopyrightText: 2015 Eric Larson
- #
- # SPDX-License-Identifier: Apache-2.0
- from __future__ import annotations
- import io
- from typing import IO, TYPE_CHECKING, Any, Mapping, cast
- from pip._vendor import msgpack
- from pip._vendor.requests.structures import CaseInsensitiveDict
- from pip._vendor.urllib3 import HTTPResponse
- if TYPE_CHECKING:
- from pip._vendor.requests import PreparedRequest
- class Serializer:
- serde_version = "4"
- def dumps(
- self,
- request: PreparedRequest,
- response: HTTPResponse,
- body: bytes | None = None,
- ) -> bytes:
- response_headers: CaseInsensitiveDict[str] = CaseInsensitiveDict(
- response.headers
- )
- if body is None:
- # When a body isn't passed in, we'll read the response. We
- # also update the response with a new file handler to be
- # sure it acts as though it was never read.
- body = response.read(decode_content=False)
- response._fp = io.BytesIO(body) # type: ignore[attr-defined]
- response.length_remaining = len(body)
- data = {
- "response": {
- "body": body, # Empty bytestring if body is stored separately
- "headers": {str(k): str(v) for k, v in response.headers.items()}, # type: ignore[no-untyped-call]
- "status": response.status,
- "version": response.version,
- "reason": str(response.reason),
- "decode_content": response.decode_content,
- }
- }
- # Construct our vary headers
- data["vary"] = {}
- if "vary" in response_headers:
- varied_headers = response_headers["vary"].split(",")
- for header in varied_headers:
- header = str(header).strip()
- header_value = request.headers.get(header, None)
- if header_value is not None:
- header_value = str(header_value)
- data["vary"][header] = header_value
- return b",".join([f"cc={self.serde_version}".encode(), self.serialize(data)])
- def serialize(self, data: dict[str, Any]) -> bytes:
- return cast(bytes, msgpack.dumps(data, use_bin_type=True))
- def loads(
- self,
- request: PreparedRequest,
- data: bytes,
- body_file: IO[bytes] | None = None,
- ) -> HTTPResponse | None:
- # Short circuit if we've been given an empty set of data
- if not data:
- return None
- # Determine what version of the serializer the data was serialized
- # with
- try:
- ver, data = data.split(b",", 1)
- except ValueError:
- ver = b"cc=0"
- # Make sure that our "ver" is actually a version and isn't a false
- # positive from a , being in the data stream.
- if ver[:3] != b"cc=":
- data = ver + data
- ver = b"cc=0"
- # Get the version number out of the cc=N
- verstr = ver.split(b"=", 1)[-1].decode("ascii")
- # Dispatch to the actual load method for the given version
- try:
- return getattr(self, f"_loads_v{verstr}")(request, data, body_file) # type: ignore[no-any-return]
- except AttributeError:
- # This is a version we don't have a loads function for, so we'll
- # just treat it as a miss and return None
- return None
- def prepare_response(
- self,
- request: PreparedRequest,
- cached: Mapping[str, Any],
- body_file: IO[bytes] | None = None,
- ) -> HTTPResponse | None:
- """Verify our vary headers match and construct a real urllib3
- HTTPResponse object.
- """
- # Special case the '*' Vary value as it means we cannot actually
- # determine if the cached response is suitable for this request.
- # This case is also handled in the controller code when creating
- # a cache entry, but is left here for backwards compatibility.
- if "*" in cached.get("vary", {}):
- return None
- # Ensure that the Vary headers for the cached response match our
- # request
- for header, value in cached.get("vary", {}).items():
- if request.headers.get(header, None) != value:
- return None
- body_raw = cached["response"].pop("body")
- headers: CaseInsensitiveDict[str] = CaseInsensitiveDict(
- data=cached["response"]["headers"]
- )
- if headers.get("transfer-encoding", "") == "chunked":
- headers.pop("transfer-encoding")
- cached["response"]["headers"] = headers
- try:
- body: IO[bytes]
- if body_file is None:
- body = io.BytesIO(body_raw)
- else:
- body = body_file
- except TypeError:
- # This can happen if cachecontrol serialized to v1 format (pickle)
- # using Python 2. A Python 2 str(byte string) will be unpickled as
- # a Python 3 str (unicode string), which will cause the above to
- # fail with:
- #
- # TypeError: 'str' does not support the buffer interface
- body = io.BytesIO(body_raw.encode("utf8"))
- # Discard any `strict` parameter serialized by older version of cachecontrol.
- cached["response"].pop("strict", None)
- return HTTPResponse(body=body, preload_content=False, **cached["response"])
- def _loads_v0(
- self,
- request: PreparedRequest,
- data: bytes,
- body_file: IO[bytes] | None = None,
- ) -> None:
- # The original legacy cache data. This doesn't contain enough
- # information to construct everything we need, so we'll treat this as
- # a miss.
- return None
- def _loads_v1(
- self,
- request: PreparedRequest,
- data: bytes,
- body_file: IO[bytes] | None = None,
- ) -> HTTPResponse | None:
- # The "v1" pickled cache format. This is no longer supported
- # for security reasons, so we treat it as a miss.
- return None
- def _loads_v2(
- self,
- request: PreparedRequest,
- data: bytes,
- body_file: IO[bytes] | None = None,
- ) -> HTTPResponse | None:
- # The "v2" compressed base64 cache format.
- # This has been removed due to age and poor size/performance
- # characteristics, so we treat it as a miss.
- return None
- def _loads_v3(
- self,
- request: PreparedRequest,
- data: bytes,
- body_file: IO[bytes] | None = None,
- ) -> None:
- # Due to Python 2 encoding issues, it's impossible to know for sure
- # exactly how to load v3 entries, thus we'll treat these as a miss so
- # that they get rewritten out as v4 entries.
- return None
- def _loads_v4(
- self,
- request: PreparedRequest,
- data: bytes,
- body_file: IO[bytes] | None = None,
- ) -> HTTPResponse | None:
- try:
- cached = msgpack.loads(data, raw=False)
- except ValueError:
- return None
- return self.prepare_response(request, cached, body_file)
|