From 799138cba228eb9aee10372c6f1944670f399999 Mon Sep 17 00:00:00 2001 From: Salvatore Laiso <32564922+salvatorelaiso@users.noreply.github.com> Date: Fri, 3 Nov 2023 14:55:09 +0100 Subject: [PATCH] Handle expired request (#140) * feat: 403 on expired request * chore: linting --------- Co-authored-by: Salvatore Laiso --- pyeudiw/satosa/backend.py | 30 ++++++++++++++++++---------- pyeudiw/sd_jwt/__init__.py | 16 +++++++-------- pyeudiw/tests/satosa/test_backend.py | 12 ++++++++++- 3 files changed, 39 insertions(+), 19 deletions(-) diff --git a/pyeudiw/satosa/backend.py b/pyeudiw/satosa/backend.py index 731aba98..475664b5 100644 --- a/pyeudiw/satosa/backend.py +++ b/pyeudiw/satosa/backend.py @@ -417,7 +417,7 @@ def redirect_endpoint(self, context, *args): err=f"{e.__class__.__name__}: {e}", err_code="400" ) - + if stored_session["finalized"]: _msg = f"Session already finalized" return self.handle_error( @@ -427,7 +427,7 @@ def redirect_endpoint(self, context, *args): err=_msg, err_code="400" ) - + # TODO: handle vp token ops exceptions try: vpt.load_nonce(stored_session['nonce']) @@ -486,16 +486,17 @@ def redirect_endpoint(self, context, *args): # the trust is established to the credential issuer, then we can get the disclosed user attributes # TODO - what if the credential is different from sd-jwt? -> generalyze within Vp class - + try: vp.verify_sdjwt( - issuer_jwks_by_kid={i['kid']: i for i in vp.credential_jwks} + issuer_jwks_by_kid={ + i['kid']: i for i in vp.credential_jwks} ) except Exception as e: return self.handle_error( - context=context, - message="invalid_request", - troubleshoot=f"VP SD-JWT validation error: {e}", + context=context, + message="invalid_request", + troubleshoot=f"VP SD-JWT validation error: {e}", err_code="400" ) @@ -755,7 +756,7 @@ def get_response_endpoint(self, context): troubleshoot="session not found or invalid", err_code="400" ) - + _now = iat_now() _exp = finalized_session['request_object']['exp'] if _exp < _now: @@ -765,7 +766,7 @@ def get_response_endpoint(self, context): troubleshoot=f"session expired, request object exp is {_exp} while now is {_now}", err_code="400" ) - + internal_response = InternalData() resp = internal_response.from_dict( finalized_session['internal_response'] @@ -819,7 +820,16 @@ def status_endpoint(self, context): err_code="401" ) - # TODO: if the request is expired -> return 403 + request_object = session.get("request_object", None) + if request_object: + if request_object["exp"] >= iat_now(): + return self.handle_error( + context=context, + message="expired", + troubleshoot=f"Request object expired", + err_code="403" + ) + if session["finalized"]: # return Redirect( # self.registered_get_response_endpoint diff --git a/pyeudiw/sd_jwt/__init__.py b/pyeudiw/sd_jwt/__init__.py index f103adc2..14cd1d89 100644 --- a/pyeudiw/sd_jwt/__init__.py +++ b/pyeudiw/sd_jwt/__init__.py @@ -159,9 +159,9 @@ def _cb_get_issuer_key(issuer: str, settings: dict, adapted_keys: dict, *args, * def verify_sd_jwt( - sd_jwt_presentation: str, - issuer_key: JWK, - holder_key: JWK, + sd_jwt_presentation: str, + issuer_key: JWK, + holder_key: JWK, settings: dict = {'key_binding': True} ) -> dict: @@ -175,18 +175,18 @@ def verify_sd_jwt( "holder_key": jwcrypto.jwk.JWK(**holder_key.as_dict()), "issuer_public_key": jwcrypto.jwk.JWK(**issuer_key.as_dict()) } - + serialization_format = "compact" sdjwt_at_verifier = SDJWTVerifier( sd_jwt_presentation, - cb_get_issuer_key = ( + cb_get_issuer_key=( lambda x, unverified_header_parameters: _cb_get_issuer_key( x, settings, adapted_keys, **unverified_header_parameters ) ), - expected_aud = None, - expected_nonce = None, - serialization_format = serialization_format, + expected_aud=None, + expected_nonce=None, + serialization_format=serialization_format, ) return sdjwt_at_verifier.get_verified_payload() diff --git a/pyeudiw/tests/satosa/test_backend.py b/pyeudiw/tests/satosa/test_backend.py index 9bbf743f..5d6a3da6 100644 --- a/pyeudiw/tests/satosa/test_backend.py +++ b/pyeudiw/tests/satosa/test_backend.py @@ -3,7 +3,7 @@ import json import urllib.parse import uuid -from unittest.mock import Mock +from unittest.mock import Mock, patch import pytest from bs4 import BeautifulSoup @@ -337,6 +337,16 @@ def test_request_endpoint(self, context): assert payload["client_id"] == CONFIG["metadata"]["client_id"] assert payload["response_uri"] == CONFIG["metadata"]["redirect_uris"][0] + datetime_mock = Mock(wraps=datetime.datetime) + datetime_mock.now.return_value = datetime.datetime(1999, 1, 1) + with patch('datetime.datetime', new=datetime_mock): + self.backend.status_endpoint(context) + state_endpoint_response = self.backend.status_endpoint(context) + assert state_endpoint_response.status == "403" + assert state_endpoint_response.message + err = json.loads(state_endpoint_response.message) + assert err["error"] == "expired" + # TODO - the authentication is successful ONLY if redirect_endpoints gets a valid presentation! # state_endpoint_response = self.backend.status_endpoint(context) # assert state_endpoint_response.status == "302"