1 """Provide a wrapper around JWT that caches decoding tokens.
3 Since we decode the same tokens over and over again
4 we can cache the result of the decode of valid tokens
5 to speed up the process.
8 from __future__
import annotations
10 from datetime
import timedelta
11 from functools
import lru_cache, partial
12 from typing
import Any
14 from jwt
import DecodeError, PyJWS, PyJWT
# Upper bound on the number of entries kept by each LRU cache in this module.
JWT_TOKEN_CACHE_SIZE = 16

# Names of the individual checks; each one is turned on or off through a
# matching ``verify_<name>`` entry in the options dictionaries built below.
_VERIFY_KEYS = (
    "signature",
    "exp",
    "nbf",
    "iat",
    "aud",
    "iss",
    "sub",
    "jti",
)
23 _VERIFY_OPTIONS: dict[str, Any] = {f
"verify_{key}":
True for key
in _VERIFY_KEYS} | {
# Options mapping with every ``verify_<name>`` flag switched off.
_NO_VERIFY_OPTIONS = dict.fromkeys(
    (f"verify_{key}" for key in _VERIFY_KEYS),
    False,
)
30 """PyJWS with a dedicated load implementation."""
@lru_cache(maxsize=JWT_TOKEN_CACHE_SIZE)
def _load(self, jwt: str | bytes) -> tuple[bytes, bytes, dict, bytes]:
    """Load a JWS via the parent implementation, memoising the result.

    The surrounding LRU cache holds at most ``JWT_TOKEN_CACHE_SIZE`` entries
    and is keyed on both ``self`` and ``jwt``.
    """
    # NOTE(review): an LRU cache on a method keeps every instance it has seen
    # alive for the cache's lifetime; presumably only one shared instance of
    # this class is ever created -- confirm.
    loaded = super()._load(jwt)
    return loaded
44 @lru_cache(maxsize=JWT_TOKEN_CACHE_SIZE)
46 """Decode the payload from a JWS dictionary."""
49 except ValueError
as err:
50 raise DecodeError(f
"Invalid payload string: {err}")
from err
51 if not isinstance(payload, dict):
52 raise DecodeError(
"Invalid payload string: must be a json object")
57 """PyJWT with a fast decode implementation."""
60 self, jwt: str, key: str, options: dict[str, Any], algorithms: list[str]
62 """Decode a JWT's payload."""
63 if len(jwt) > MAX_TOKEN_SIZE:
65 raise DecodeError(
"Token too large")
70 algorithms=algorithms,
def verify_and_decode(
    self,
    jwt: str,
    key: str,
    algorithms: list[str],
    issuer: str | None = None,
    leeway: float | timedelta = 0,
    options: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Verify a JWT's signature and claims.

    Args:
        jwt: Encoded token, passed to ``decode_payload``.
        key: Key passed to ``decode_payload``.
        algorithms: Algorithms passed to ``decode_payload``.
        issuer: Forwarded to ``_validate_claims``.
        leeway: Forwarded to ``_validate_claims``.
        options: Overrides merged on top of ``_VERIFY_OPTIONS``; entries
            given here win over the defaults.

    Returns:
        The decoded payload.

    Raises:
        DecodeError: If the decoded payload has no ``exp`` or ``iat`` claim.
    """
    merged_options = {**_VERIFY_OPTIONS, **(options or {})}
    payload = self.decode_payload(
        jwt=jwt,
        key=key,
        options=merged_options,
        algorithms=algorithms,
    )
    # Raise explicitly instead of using ``assert``: assertions are removed
    # when Python runs with -O, which would silently drop this safeguard,
    # and an AssertionError is not a JWT error that callers can handle.
    for claim in ("exp", "iat"):
        if claim not in payload:
            raise DecodeError(f"{claim} claim is required")
    self._validate_claims(
        payload=payload,
        options=merged_options,
        issuer=issuer,
        leeway=leeway,
    )
    return payload
# Module-level alias for the bound ``verify_and_decode`` method of ``_jwt``.
verify_and_decode = _jwt.verify_and_decode

# ``decode_payload`` pre-bound to an empty key, the HS256 algorithm and the
# options that switch every verification flag off; results are memoised in an
# LRU cache of at most ``JWT_TOKEN_CACHE_SIZE`` entries.
unverified_hs256_token_decode = lru_cache(maxsize=JWT_TOKEN_CACHE_SIZE)(
    partial(
        _jwt.decode_payload,
        options=_NO_VERIFY_OPTIONS,
        algorithms=["HS256"],
        key="",
    )
)
115 "unverified_hs256_token_decode",
tuple[bytes, bytes, dict, bytes] _load(self, str|bytes jwt)
dict[str, Any] verify_and_decode(self, str jwt, str key, list[str] algorithms, str|None issuer=None, float|timedelta leeway=0, dict[str, Any]|None options=None)
dict[str, Any] decode_payload(self, str jwt, str key, dict[str, Any] options, list[str] algorithms)
dict[str, Any] _decode_payload(str json_payload)