From 960600f80fd8739419c48dc2a299e4005896f995 Mon Sep 17 00:00:00 2001 From: LadyCodesItBetter Date: Thu, 9 Jan 2025 15:17:21 +0100 Subject: [PATCH 01/10] fix: remove duplicated code --- pyeudiw/jwt/jws_helper.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/pyeudiw/jwt/jws_helper.py b/pyeudiw/jwt/jws_helper.py index 7608ba85..d78188f1 100644 --- a/pyeudiw/jwt/jws_helper.py +++ b/pyeudiw/jwt/jws_helper.py @@ -223,9 +223,6 @@ def verify(self, jwt: str) -> (str | Any | bytes): "unexpected verification state: found a valid verifying key," f"but its kid {obtained_kid} does not match token header kid {expected_kid}") ) - - verifier = JWS(alg=header["alg"]) - msg = verifier.verify_compact(jwt, [key_from_jwk_dict(verifying_key)]) # Verify the JWS compact signature verifier = JWS(alg=header["alg"]) From c5b6ab301b615138d32d15d991fca9abce2bc6b1 Mon Sep 17 00:00:00 2001 From: LadyCodesItBetter Date: Thu, 9 Jan 2025 17:20:11 +0100 Subject: [PATCH 02/10] fix: redundant controls --- pyeudiw/jwt/jws_helper.py | 79 +++++---------------------------------- 1 file changed, 10 insertions(+), 69 deletions(-) diff --git a/pyeudiw/jwt/jws_helper.py b/pyeudiw/jwt/jws_helper.py index d78188f1..efd2ff43 100644 --- a/pyeudiw/jwt/jws_helper.py +++ b/pyeudiw/jwt/jws_helper.py @@ -27,21 +27,6 @@ "EC": "ES256" } -DEFAULT_SIG_ALG_MAP = { - "RSA": "RS256", - "EC": "ES256" -} - -DEFAULT_ENC_ALG_MAP = { - "RSA": "RSA-OAEP", - "EC": "ECDH-ES+A256KW" -} - -DEFAULT_ENC_ENC_MAP = { - "RSA": "A256CBC-HS512", - "EC": "A256GCM" -} - class JWSHelper(JWHelperInterface): """ Helper class for working with JWS, extended to support SD-JWT. @@ -98,10 +83,6 @@ def sign( protected = {} if unprotected is None: unprotected = {} - - # Add SD-JWT claims if the payload matches the criteria - if isinstance(plain_dict, dict) and self._is_sd_jwt_payload(plain_dict): - plain_dict = self._add_sd_jwt_claims(plain_dict) # Select the signing key signing_key = self._select_signing_key((protected, unprotected), signing_kid) # TODO: check that singing key is either private or symmetric @@ -110,7 +91,7 @@ def sign( header_kid = protected.get("kid") signer_kid = signing_key.get("kid") if header_kid and signer_kid and (header_kid != signer_kid): - raise JWSSigningError(f"Token header contains kid {header_kid}, which does not match the signing key kid {signer_kid}.") + raise JWSSigningError(f"token header contains a kid {header_kid} that does not match the signing key kid {signer_kid}") payload = serialize_payload(plain_dict) @@ -134,14 +115,19 @@ def sign( signing_key.pop("kid", None) signer = JWS(payload, alg=signing_alg) - if serialization_format == "compact": try: - signed = signer.sign_compact([key_from_jwk_dict(signing_key)], protected=protected, **kwargs) + signed = signer.sign_compact( + [key_from_jwk_dict(signing_key)], protected=protected, **kwargs + ) return signed except Exception as e: raise JWSSigningError("Signing error: error in step", e) - return signer.sign_json(keys=[key_from_jwk_dict(signing_key)], headers=[(protected, unprotected)], flatten=True) + return signer.sign_json( + keys=[key_from_jwk_dict(signing_key)], + headers=[(protected, unprotected)], + flatten=True, + ) def _select_signing_key(self, headers: tuple[dict, dict], signing_kid: str = "") -> dict: if len(self.jwks) == 0: @@ -227,25 +213,7 @@ def verify(self, jwt: str) -> (str | Any | bytes): # Verify the JWS compact signature verifier = JWS(alg=header["alg"]) msg = verifier.verify_compact(jwt, [key_from_jwk_dict(verifying_key)]) - - # Handle the payload - if isinstance(msg, (str, bytes)): - try: - # Try to interpret as JSON - decoded_payload = json.loads(msg) - except json.JSONDecodeError: - # If not JSON, assume it's a simple string (non-SD-JWT) - decoded_payload = msg - elif isinstance(msg, dict): - decoded_payload = msg - else: - raise JWSVerificationError("Unexpected type for the JWS payload.") - - # Perform SD-JWT specific validations if applicable - if self._is_sd_jwt_payload(decoded_payload): - self._validate_sd_jwt(decoded_payload) - - return decoded_payload + return msg def _select_verifying_key(self, header: dict) -> dict | None: available_keys = [key.to_dict() for key in self.jwks] @@ -268,18 +236,6 @@ def _select_verifying_key(self, header: dict) -> dict | None: return self.jwks[0].to_dict() return None - def _is_sd_jwt_payload(self, payload: dict) -> bool: - """ - Determines if the payload corresponds to an SD-JWT. - - :param payload: The payload to inspect. - :returns: True if the payload contains SD-JWT-specific claims, False otherwise. - """ - if not isinstance(payload, dict): - return False - return payload.get("typ") == "sd-jwt" - - def _add_sd_jwt_claims(self, payload: dict) -> dict: """ Adds SD-JWT specific claims to the payload. @@ -289,18 +245,3 @@ def _add_sd_jwt_claims(self, payload: dict) -> dict: payload["iat"] = payload.get("iat", iat_now()) payload["typ"] = "sd-jwt" return payload - - def _validate_sd_jwt(self, payload: dict) -> None: - """ - Validates an SD-JWT payload. - - :param payload: The payload to validate. - - :raises JWSVerificationError: If the payload is invalid. - """ - if payload.get("typ") != "sd-jwt": - raise JWSVerificationError("The token is not a valid SD-JWT.") - if is_payload_expired(payload): - raise JWSVerificationError("The SD-JWT token has expired.") - - From 7536ce0cf0c60b7a34f6074a3c3b4a98834d9f67 Mon Sep 17 00:00:00 2001 From: LadyCodesItBetter Date: Thu, 9 Jan 2025 18:12:16 +0100 Subject: [PATCH 03/10] feat: add keys selection for verify issuer token --- pyeudiw/jwt/jws_helper.py | 40 +++++++++++++++++++++- pyeudiw/openid4vp/vp_sd_jwt_vc.py | 2 +- pyeudiw/satosa/default/response_handler.py | 5 +-- pyeudiw/sd_jwt/sd_jwt.py | 6 ++-- 4 files changed, 47 insertions(+), 6 deletions(-) diff --git a/pyeudiw/jwt/jws_helper.py b/pyeudiw/jwt/jws_helper.py index efd2ff43..80303af6 100644 --- a/pyeudiw/jwt/jws_helper.py +++ b/pyeudiw/jwt/jws_helper.py @@ -27,6 +27,21 @@ "EC": "ES256" } +DEFAULT_SIG_ALG_MAP = { + "RSA": "RS256", + "EC": "ES256" +} + +DEFAULT_ENC_ALG_MAP = { + "RSA": "RSA-OAEP", + "EC": "ECDH-ES+A256KW" +} + +DEFAULT_ENC_ENC_MAP = { + "RSA": "A256CBC-HS512", + "EC": "A256GCM" +} + class JWSHelper(JWHelperInterface): """ Helper class for working with JWS, extended to support SD-JWT. @@ -101,7 +116,7 @@ def sign( # Add "typ" header if not present if "typ" not in protected: - protected["typ"] = "sd-jwt" if self._is_sd_jwt_payload(plain_dict) else "JWT" + protected["typ"] = "sd-jwt" if self.is_sd_jwt(plain_dict) else "JWT" # Include the signing key's kid in the header if required if kid_in_header and signer_kid: @@ -245,3 +260,26 @@ def _select_verifying_key(self, header: dict) -> dict | None: payload["iat"] = payload.get("iat", iat_now()) payload["typ"] = "sd-jwt" return payload + + def is_sd_jwt(self, token: str) -> bool: + """ + Determines if the provided JWT is an SD-JWT. + + :param token: The JWT token to inspect. + :type token: str + :returns: True if the token is an SD-JWT, False otherwise. + :rtype: bool + """ + if not token: + return False + + try: + # Decode the JWT header to inspect the 'typ' field + header = decode_jwt_header(token) + + # Check if 'typ' field exists and is equal to 'sd-jwt' + return header.get("typ") == "sd-jwt" + except Exception as e: + # Log or handle errors (optional) + logger.warning(f"Unable to determine if token is SD-JWT: {e}") + return False diff --git a/pyeudiw/openid4vp/vp_sd_jwt_vc.py b/pyeudiw/openid4vp/vp_sd_jwt_vc.py index a2849268..2f8a950b 100644 --- a/pyeudiw/openid4vp/vp_sd_jwt_vc.py +++ b/pyeudiw/openid4vp/vp_sd_jwt_vc.py @@ -30,7 +30,7 @@ def get_issuer_name(self) -> str: def get_credentials(self) -> dict: return self.sdjwt.get_disclosed_claims() - def get_signing_key(self) -> ECKey | RSAKey | dict | KeyIdentifier_T: + def get_signing_key(self) -> ECKey | RSAKey | dict | KeyIdentifier_T |ECKey | RSAKey | dict | KeyIdentifier_T: return extract_key_identifier(self.sdjwt.issuer_jwt.header) def is_revoked(self) -> bool: diff --git a/pyeudiw/satosa/default/response_handler.py b/pyeudiw/satosa/default/response_handler.py index 05aa9fc8..311bd833 100644 --- a/pyeudiw/satosa/default/response_handler.py +++ b/pyeudiw/satosa/default/response_handler.py @@ -190,9 +190,10 @@ def response_endpoint(self, context: Context, *args: tuple) -> Redirect | JsonRe pub_jwk = find_vp_token_key(token_parser, self.trust_evaluator) except NoCriptographicMaterial as e: return self._handle_400(context, f"VP parsing error: {e}") - + token_issuer = token_parser.get_issuer_name() + whitelisted_keys = self.trust_evaluator.get_public_keys(token_issuer) try: - token_verifier.verify_signature(pub_jwk) + token_verifier.verify_signature(whitelisted_keys) except Exception as e: return self._handle_400(context, f"VP parsing error: {e}") diff --git a/pyeudiw/sd_jwt/sd_jwt.py b/pyeudiw/sd_jwt/sd_jwt.py index 85c82675..f5e7c4f5 100644 --- a/pyeudiw/sd_jwt/sd_jwt.py +++ b/pyeudiw/sd_jwt/sd_jwt.py @@ -2,6 +2,7 @@ from hashlib import sha256 import json from typing import Any, Callable, TypeVar +from pyeudiw.jwt.jws_helper import JWSHelper import pyeudiw.sd_jwt.common as sd_jwtcommon from pyeudiw.sd_jwt.common import SDJWTCommon @@ -81,8 +82,9 @@ def get_sd_alg(self) -> str: def has_key_binding(self) -> bool: return self.holder_kb is not None - def verify_issuer_jwt_signature(self, key: ECKey | RSAKey | dict) -> None: - verify_jws_with_key(self.issuer_jwt.jwt, key) + def verify_issuer_jwt_signature(self, keys: list[ECKey | RSAKey | dict] | ECKey | RSAKey | dict) -> None: + jws_verifier = JWSHelper(keys) + jws_verifier.verify(self.issuer_jwt.jwt) def verify_holder_kb_jwt(self, challenge: VerifierChallenge) -> None: """ From 1f745656b3569183179bd60d9e13895dd24dda02 Mon Sep 17 00:00:00 2001 From: LadyCodesItBetter Date: Thu, 9 Jan 2025 18:18:24 +0100 Subject: [PATCH 04/10] fix: duplicates --- pyeudiw/openid4vp/vp_sd_jwt_vc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyeudiw/openid4vp/vp_sd_jwt_vc.py b/pyeudiw/openid4vp/vp_sd_jwt_vc.py index 2f8a950b..a2849268 100644 --- a/pyeudiw/openid4vp/vp_sd_jwt_vc.py +++ b/pyeudiw/openid4vp/vp_sd_jwt_vc.py @@ -30,7 +30,7 @@ def get_issuer_name(self) -> str: def get_credentials(self) -> dict: return self.sdjwt.get_disclosed_claims() - def get_signing_key(self) -> ECKey | RSAKey | dict | KeyIdentifier_T |ECKey | RSAKey | dict | KeyIdentifier_T: + def get_signing_key(self) -> ECKey | RSAKey | dict | KeyIdentifier_T: return extract_key_identifier(self.sdjwt.issuer_jwt.header) def is_revoked(self) -> bool: From ecd49559c8533c88f328c9248806d49b5a84a978 Mon Sep 17 00:00:00 2001 From: LadyCodesItBetter Date: Thu, 9 Jan 2025 18:26:05 +0100 Subject: [PATCH 05/10] fix: dead code removed --- pyeudiw/jwt/jws_helper.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/pyeudiw/jwt/jws_helper.py b/pyeudiw/jwt/jws_helper.py index 80303af6..a33a2ee3 100644 --- a/pyeudiw/jwt/jws_helper.py +++ b/pyeudiw/jwt/jws_helper.py @@ -250,16 +250,6 @@ def _select_verifying_key(self, header: dict) -> dict | None: if len(self.jwks) == 1: return self.jwks[0].to_dict() return None - - """ - Adds SD-JWT specific claims to the payload. - - :param payload: The original payload. - :returns: The payload with added SD-JWT claims. - """ - payload["iat"] = payload.get("iat", iat_now()) - payload["typ"] = "sd-jwt" - return payload def is_sd_jwt(self, token: str) -> bool: """ From 4267f0310bc5381c458f49d9c72e7bdd32e397a8 Mon Sep 17 00:00:00 2001 From: LadyCodesItBetter Date: Thu, 9 Jan 2025 18:55:18 +0100 Subject: [PATCH 06/10] feat: JWT and SD-JWT check iat/exp and nbf #311 --- pyeudiw/jwt/jws_helper.py | 30 ++++++++++++++++++++++++ pyeudiw/sd_jwt/verifier.py | 32 ++++++++++++++++++++++++-- pyeudiw/tests/jwt/test_verification.py | 3 ++- 3 files changed, 62 insertions(+), 3 deletions(-) diff --git a/pyeudiw/jwt/jws_helper.py b/pyeudiw/jwt/jws_helper.py index a33a2ee3..dea0022e 100644 --- a/pyeudiw/jwt/jws_helper.py +++ b/pyeudiw/jwt/jws_helper.py @@ -1,5 +1,6 @@ import binascii from copy import deepcopy +import datetime import json import logging from typing import Any, Literal, Union @@ -228,6 +229,13 @@ def verify(self, jwt: str) -> (str | Any | bytes): # Verify the JWS compact signature verifier = JWS(alg=header["alg"]) msg = verifier.verify_compact(jwt, [key_from_jwk_dict(verifying_key)]) + + # Validate JWT claims + try: + self.validate_jwt_claims(msg) + except ValueError as e: + raise JWSVerificationError(f"Invalid JWT claims: {e}") + return msg def _select_verifying_key(self, header: dict) -> dict | None: @@ -273,3 +281,25 @@ def is_sd_jwt(self, token: str) -> bool: # Log or handle errors (optional) logger.warning(f"Unable to determine if token is SD-JWT: {e}") return False + + def validate_jwt_claims(self, payload: dict) -> None: + """ + Validates the 'iat', 'exp', and 'nbf' claims in a JWT payload. + + :param payload: The decoded JWT payload. + :type payload: dict + :raises ValueError: If any of the claims are invalid. + """ + current_time = iat_now() + + if 'iat' in payload: + if payload['iat'] > current_time: + raise ValueError("Future issue time, token is invalid.") + + if 'exp' in payload: + if payload['exp'] <= current_time: + raise ValueError("The token has expired.") + + if 'nbf' in payload: + if payload['nbf'] > current_time: + raise ValueError("The token is not yet valid.") \ No newline at end of file diff --git a/pyeudiw/sd_jwt/verifier.py b/pyeudiw/sd_jwt/verifier.py index ba72e5f2..2e85eb4f 100644 --- a/pyeudiw/sd_jwt/verifier.py +++ b/pyeudiw/sd_jwt/verifier.py @@ -1,5 +1,6 @@ import logging +from pyeudiw.jwt.exceptions import JWSVerificationError from pyeudiw.jwt.jws_helper import JWSHelper from pyeudiw.sd_jwt.common import ( SDJWTCommon, @@ -18,6 +19,7 @@ from cryptojwt.jws.jws import JWS from pyeudiw.jwt.utils import decode_jwt_payload, decode_jwt_header +from pyeudiw.tools.utils import iat_now logger = logging.getLogger(__name__) @@ -112,8 +114,12 @@ def _verify_sd_jwt( keys=issuer_public_key, sigalg=sign_alg ) - # self._sd_jwt_payload = loads(parsed_input_sd_jwt.payload.decode("utf-8")) - # TODO: Check exp/nbf/iat + + try: + self.validate_claims(self._sd_jwt_payload) + except ValueError as e: + raise JWSVerificationError(f"Invalid JWT claims: {e}") + else: raise ValueError( f"Unsupported serialization format: {self._serialization_format}" @@ -121,6 +127,28 @@ def _verify_sd_jwt( self._holder_public_key_payload = self._sd_jwt_payload.get("cnf", None) + def validate_claims(self, payload: dict) -> None: + """ + Validates the 'iat', 'exp', and 'nbf' claims in a SD-JWT payload. + + :param payload: The decoded JWT payload. + :type payload: dict + :raises ValueError: If any of the claims are invalid. + """ + current_time = iat_now() + + if 'iat' in payload: + if payload['iat'] > current_time: + raise ValueError("Future issue time, token is invalid.") + + if 'exp' in payload: + if payload['exp'] <= current_time: + raise ValueError("The token has expired.") + + if 'nbf' in payload: + if payload['nbf'] > current_time: + raise ValueError("The token is not yet valid.") + def _verify_key_binding_jwt( self, expected_aud: Union[str, None] = None, diff --git a/pyeudiw/tests/jwt/test_verification.py b/pyeudiw/tests/jwt/test_verification.py index 994e97b4..3ee35e97 100644 --- a/pyeudiw/tests/jwt/test_verification.py +++ b/pyeudiw/tests/jwt/test_verification.py @@ -5,6 +5,7 @@ from cryptojwt.jwk.ec import new_ec_key from pyeudiw.jwt.verification import verify_jws_with_key +from pyeudiw.tools.utils import iat_now def test_is_jwt_expired(): jwk = new_ec_key('P-256') @@ -26,7 +27,7 @@ def test_is_jwt_not_expired(): def test_verify_jws_with_key(): jwk = new_ec_key('P-256') - payload = {"exp": 1516239022} + payload = {"exp": iat_now()+5000} helper = JWSHelper(jwk) jws = helper.sign(payload) From bbd47b9e51b4fd4c38b2ac4d23dbd54643c8e4b9 Mon Sep 17 00:00:00 2001 From: LadyCodesItBetter Date: Fri, 10 Jan 2025 11:44:18 +0100 Subject: [PATCH 07/10] fix: move multiple function call to single variable --- pyeudiw/jwt/jws_helper.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pyeudiw/jwt/jws_helper.py b/pyeudiw/jwt/jws_helper.py index dea0022e..c490f91e 100644 --- a/pyeudiw/jwt/jws_helper.py +++ b/pyeudiw/jwt/jws_helper.py @@ -131,16 +131,18 @@ def sign( signing_key.pop("kid", None) signer = JWS(payload, alg=signing_alg) + keys = [key_from_jwk_dict(signing_key)] + if serialization_format == "compact": try: signed = signer.sign_compact( - [key_from_jwk_dict(signing_key)], protected=protected, **kwargs + keys, protected=protected, **kwargs ) return signed except Exception as e: raise JWSSigningError("Signing error: error in step", e) return signer.sign_json( - keys=[key_from_jwk_dict(signing_key)], + keys=keys, headers=[(protected, unprotected)], flatten=True, ) From 7556861ec852bd366394b76a524a7df444c18976 Mon Sep 17 00:00:00 2001 From: LadyCodesItBetter <114922565+LadyCodesItBetter@users.noreply.github.com> Date: Fri, 10 Jan 2025 12:29:37 +0100 Subject: [PATCH 08/10] Update pyeudiw/jwt/jws_helper.py Co-authored-by: Giuseppe De Marco --- pyeudiw/jwt/jws_helper.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyeudiw/jwt/jws_helper.py b/pyeudiw/jwt/jws_helper.py index dea0022e..79ec6f1e 100644 --- a/pyeudiw/jwt/jws_helper.py +++ b/pyeudiw/jwt/jws_helper.py @@ -228,7 +228,7 @@ def verify(self, jwt: str) -> (str | Any | bytes): # Verify the JWS compact signature verifier = JWS(alg=header["alg"]) - msg = verifier.verify_compact(jwt, [key_from_jwk_dict(verifying_key)]) + msg: dict = verifier.verify_compact(jwt, [key_from_jwk_dict(verifying_key)]) # Validate JWT claims try: From c54c1209ec1d9d24c3a02fa995769711629c6514 Mon Sep 17 00:00:00 2001 From: LadyCodesItBetter Date: Fri, 10 Jan 2025 12:48:01 +0100 Subject: [PATCH 09/10] refactor: consolidate lifetime validation logic into a reusable function --- pyeudiw/jwt/helper.py | 23 ++++++++++++++++++++++- pyeudiw/jwt/jws_helper.py | 26 ++------------------------ pyeudiw/sd_jwt/verifier.py | 28 ++-------------------------- 3 files changed, 26 insertions(+), 51 deletions(-) diff --git a/pyeudiw/jwt/helper.py b/pyeudiw/jwt/helper.py index 80a1c719..b3607f44 100644 --- a/pyeudiw/jwt/helper.py +++ b/pyeudiw/jwt/helper.py @@ -96,7 +96,28 @@ def is_payload_expired(token_payload: dict) -> bool: return True return False - def is_jwt_expired(token: str) -> bool: payload = decode_jwt_payload(token) return is_payload_expired(payload) + +def validate_jwt_timestamps_claims(payload: dict) -> None: + """ + Validates the 'iat', 'exp', and 'nbf' claims in a JWT payload. + + :param payload: The decoded JWT payload. + :type payload: dict + :raises ValueError: If any of the claims are invalid. + """ + current_time = iat_now() + + if 'iat' in payload: + if payload['iat'] > current_time: + raise ValueError("Future issue time, token is invalid.") + + if 'exp' in payload: + if payload['exp'] <= current_time: + raise ValueError("The token has expired.") + + if 'nbf' in payload: + if payload['nbf'] > current_time: + raise ValueError("The token is not yet valid.") \ No newline at end of file diff --git a/pyeudiw/jwt/jws_helper.py b/pyeudiw/jwt/jws_helper.py index 89ea5c23..413965a6 100644 --- a/pyeudiw/jwt/jws_helper.py +++ b/pyeudiw/jwt/jws_helper.py @@ -11,7 +11,7 @@ from pyeudiw.jwk.exceptions import KidError from pyeudiw.jwk.jwks import find_jwk_by_kid, find_jwk_by_thumbprint from pyeudiw.jwt.exceptions import JWEEncryptionError, JWSSigningError, JWSVerificationError -from pyeudiw.jwt.helper import JWHelperInterface, find_self_contained_key, is_payload_expired, serialize_payload +from pyeudiw.jwt.helper import JWHelperInterface, find_self_contained_key, is_payload_expired, serialize_payload, validate_jwt_timestamps_claims from pyeudiw.jwk import JWK from pyeudiw.jwt.utils import decode_jwt_header @@ -234,7 +234,7 @@ def verify(self, jwt: str) -> (str | Any | bytes): # Validate JWT claims try: - self.validate_jwt_claims(msg) + validate_jwt_timestamps_claims(msg) except ValueError as e: raise JWSVerificationError(f"Invalid JWT claims: {e}") @@ -283,25 +283,3 @@ def is_sd_jwt(self, token: str) -> bool: # Log or handle errors (optional) logger.warning(f"Unable to determine if token is SD-JWT: {e}") return False - - def validate_jwt_claims(self, payload: dict) -> None: - """ - Validates the 'iat', 'exp', and 'nbf' claims in a JWT payload. - - :param payload: The decoded JWT payload. - :type payload: dict - :raises ValueError: If any of the claims are invalid. - """ - current_time = iat_now() - - if 'iat' in payload: - if payload['iat'] > current_time: - raise ValueError("Future issue time, token is invalid.") - - if 'exp' in payload: - if payload['exp'] <= current_time: - raise ValueError("The token has expired.") - - if 'nbf' in payload: - if payload['nbf'] > current_time: - raise ValueError("The token is not yet valid.") \ No newline at end of file diff --git a/pyeudiw/sd_jwt/verifier.py b/pyeudiw/sd_jwt/verifier.py index 2e85eb4f..ca1dcb36 100644 --- a/pyeudiw/sd_jwt/verifier.py +++ b/pyeudiw/sd_jwt/verifier.py @@ -1,6 +1,7 @@ import logging from pyeudiw.jwt.exceptions import JWSVerificationError +from pyeudiw.jwt.helper import validate_jwt_timestamps_claims from pyeudiw.jwt.jws_helper import JWSHelper from pyeudiw.sd_jwt.common import ( SDJWTCommon, @@ -11,15 +12,12 @@ KB_DIGEST_KEY, ) -from json import dumps, loads from typing import Dict, List, Union, Callable -from cryptojwt.jwk import JWK from cryptojwt.jwk.jwk import key_from_jwk_dict from cryptojwt.jws.jws import JWS from pyeudiw.jwt.utils import decode_jwt_payload, decode_jwt_header -from pyeudiw.tools.utils import iat_now logger = logging.getLogger(__name__) @@ -116,7 +114,7 @@ def _verify_sd_jwt( ) try: - self.validate_claims(self._sd_jwt_payload) + validate_jwt_timestamps_claims(self._sd_jwt_payload) except ValueError as e: raise JWSVerificationError(f"Invalid JWT claims: {e}") @@ -126,28 +124,6 @@ def _verify_sd_jwt( ) self._holder_public_key_payload = self._sd_jwt_payload.get("cnf", None) - - def validate_claims(self, payload: dict) -> None: - """ - Validates the 'iat', 'exp', and 'nbf' claims in a SD-JWT payload. - - :param payload: The decoded JWT payload. - :type payload: dict - :raises ValueError: If any of the claims are invalid. - """ - current_time = iat_now() - - if 'iat' in payload: - if payload['iat'] > current_time: - raise ValueError("Future issue time, token is invalid.") - - if 'exp' in payload: - if payload['exp'] <= current_time: - raise ValueError("The token has expired.") - - if 'nbf' in payload: - if payload['nbf'] > current_time: - raise ValueError("The token is not yet valid.") def _verify_key_binding_jwt( self, From f4d74c75f7ef586e9da7e26b6ef198b81a186864 Mon Sep 17 00:00:00 2001 From: LadyCodesItBetter Date: Fri, 10 Jan 2025 12:55:12 +0100 Subject: [PATCH 10/10] refactor: Add `LifetimeException` and centralized JWT timestamp validation --- pyeudiw/jwt/helper.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/pyeudiw/jwt/helper.py b/pyeudiw/jwt/helper.py index b3607f44..40c1c7b6 100644 --- a/pyeudiw/jwt/helper.py +++ b/pyeudiw/jwt/helper.py @@ -1,4 +1,6 @@ import json + +from pydantic import ValidationError from pyeudiw.jwk import JWK from pyeudiw.jwk.parse import parse_key_from_x5c @@ -100,6 +102,10 @@ def is_jwt_expired(token: str) -> bool: payload = decode_jwt_payload(token) return is_payload_expired(payload) +class LifetimeException(ValidationError): + """Exception raised for errors related to lifetime validation.""" + pass + def validate_jwt_timestamps_claims(payload: dict) -> None: """ Validates the 'iat', 'exp', and 'nbf' claims in a JWT payload. @@ -112,12 +118,12 @@ def validate_jwt_timestamps_claims(payload: dict) -> None: if 'iat' in payload: if payload['iat'] > current_time: - raise ValueError("Future issue time, token is invalid.") + raise LifetimeException("Future issue time, token is invalid.") if 'exp' in payload: if payload['exp'] <= current_time: - raise ValueError("The token has expired.") + raise LifetimeException("Token has expired.") if 'nbf' in payload: if payload['nbf'] > current_time: - raise ValueError("The token is not yet valid.") \ No newline at end of file + raise LifetimeException("Token not yet valid.") \ No newline at end of file