From 6b84b0892b2480d27e20e8e00994a46904d2d59a Mon Sep 17 00:00:00 2001 From: Salvatore Laiso <32564922+salvatorelaiso@users.noreply.github.com> Date: Fri, 1 Dec 2023 16:33:20 +0100 Subject: [PATCH] fix: add specific exception handling for VP token (#194) Co-authored-by: Salvatore Laiso --- pyeudiw/satosa/backend.py | 30 +++++- pyeudiw/tests/satosa/test_backend.py | 153 +++++++++++++++++++++++++++ 2 files changed, 181 insertions(+), 2 deletions(-) diff --git a/pyeudiw/satosa/backend.py b/pyeudiw/satosa/backend.py index c32aa47e..45ff40d8 100644 --- a/pyeudiw/satosa/backend.py +++ b/pyeudiw/satosa/backend.py @@ -29,7 +29,7 @@ from pyeudiw.openid4vp.direct_post_response import DirectPostResponse from pyeudiw.openid4vp.exceptions import ( KIDNotFound, - InvalidVPToken + InvalidVPToken, VPNotFound, NoNonceInVPToken, VPInvalidNonce ) from pyeudiw.storage.db_engine import DBEngine from pyeudiw.storage.exceptions import StorageWriteError @@ -428,11 +428,37 @@ def redirect_endpoint(self, context, *args): err_code="400" ) - # TODO: handle vp token ops exceptions try: vpt.load_nonce(stored_session['nonce']) vps: list = vpt.get_presentation_vps() vpt.validate() + except VPNotFound as e: + _msg = "Error while retrieving VP. Payload 'vp_token' is empty or has an unexpected value." + return self.handle_error( + context=context, + message="invalid_request", + troubleshoot=_msg, + err=f"{e.__class__.__name__}: {e}", + err_code="400" + ) + except NoNonceInVPToken as e: + _msg = "Error while validating VP: vp has no nonce." + return self.handle_error( + context=context, + message="invalid_request", + troubleshoot=_msg, + err=f"{e.__class__.__name__}: {e}", + err_code="400" + ) + except VPInvalidNonce as e: + _msg = "Error while validating VP: unexpected value." + return self.handle_error( + context=context, + message="invalid_request", + troubleshoot=_msg, + err=f"{e.__class__.__name__}: {e}", + err_code="400" + ) except Exception as e: _msg = ( "DirectPostResponse content parse and validation error. " diff --git a/pyeudiw/tests/satosa/test_backend.py b/pyeudiw/tests/satosa/test_backend.py index 7bc5a0b0..15b9540e 100644 --- a/pyeudiw/tests/satosa/test_backend.py +++ b/pyeudiw/tests/satosa/test_backend.py @@ -155,6 +155,159 @@ def test_pre_request_endpoint_mobile(self, context): assert qs["request_uri"][0].startswith( CONFIG["metadata"]["request_uris"][0]) + def test_vp_validation_in_redirect_endpoint(self, context): + self.backend.register_endpoints() + + issuer_jwk = JWK(leaf_cred_jwk_prot.serialize(private=True)) + holder_jwk = JWK(leaf_wallet_jwk.serialize(private=True)) + + settings = ISSUER_CONF + settings['issuer'] = "https://issuer.example.com" + settings['default_exp'] = CONFIG['jwt']['default_exp'] + + sd_specification = load_specification_from_yaml_string( + settings["sd_specification"]) + + issued_jwt = issue_sd_jwt( + sd_specification, + settings, + issuer_jwk, + holder_jwk, + trust_chain=trust_chain_issuer + ) + + _adapt_keys(issuer_jwk, holder_jwk) + + sdjwt_at_holder = SDJWTHolder( + issued_jwt["issuance"], + serialization_format="compact", + ) + + nonce = str(uuid.uuid4()) + sdjwt_at_holder.create_presentation( + {}, + nonce, + str(uuid.uuid4()), + import_pyca_pri_rsa(holder_jwk.key.priv_key, kid=holder_jwk.kid) if sd_specification.get( + "key_binding", False) else None, + sign_alg=DEFAULT_SIG_KTY_MAP[holder_jwk.key.kty], + ) + + data = { + "iss": "https://wallet-provider.example.org/instance/vbeXJksM45xphtANnCiG6mCyuU4jfGNzopGuKvogg9c", + "jti": str(uuid.uuid4()), + "aud": "https://verifier.example.org/callback", + "iat": iat_now(), + "exp": exp_from_now(minutes=15), + "nonce": nonce, + "vp": sdjwt_at_holder.sd_jwt_presentation, + } + + vp_token = JWSHelper(leaf_wallet_jwk.serialize(private=True)).sign( + data, + protected={"typ": "JWT"} + ) + + context.request_method = "POST" + context.request_uri = CONFIG["metadata"]["redirect_uris"][0].removeprefix(CONFIG["base_url"]) + + state = str(uuid.uuid4()) + response = { + "nonce": nonce, + "state": state, + "vp_token": vp_token, + "presentation_submission": { + "definition_id": "32f54163-7166-48f1-93d8-ff217bdb0653", + "id": "04a98be3-7fb0-4cf5-af9a-31579c8b0e7d", + "descriptor_map": [ + { + "id": "pid-sd-jwt:unique_id+given_name+family_name", + "path": "$.vp_token.verified_claims.claims._sd[0]", + "format": "vc+sd-jwt" + } + ] + } + } + session_id = context.state["SESSION_ID"] + self.backend.db_engine.init_session( + state=state, + session_id=session_id + ) + doc_id = self.backend.db_engine.get_by_state(state)["document_id"] + + # Put a different nonce in the stored request object. + # This will trigger a `VPInvalidNonce` error + self.backend.db_engine.update_request_object( + document_id=doc_id, + request_object={"nonce": str(uuid.uuid4()), "state": state}) + + encrypted_response = JWEHelper( + JWK(CONFIG["metadata_jwks"][1])).encrypt(response) + context.request = { + "response": encrypted_response + } + redirect_endpoint = self.backend.redirect_endpoint(context) + assert redirect_endpoint.status == "400" + msg = json.loads(redirect_endpoint.message) + assert msg["error"] == "invalid_request" + assert msg["error_description"] == "Error while validating VP: unexpected value." + + # Recreate data without nonce + # This will trigger a `NoNonceInVPToken` error + data = { + "iss": "https://wallet-provider.example.org/instance/vbeXJksM45xphtANnCiG6mCyuU4jfGNzopGuKvogg9c", + "jti": str(uuid.uuid4()), + "aud": "https://verifier.example.org/callback", + "iat": iat_now(), + "exp": exp_from_now(minutes=15), + "vp": sdjwt_at_holder.sd_jwt_presentation, + } + + vp_token = JWSHelper(leaf_wallet_jwk.serialize(private=True)).sign( + data, + protected={"typ": "JWT"} + ) + response = { + "nonce": nonce, + "state": state, + "vp_token": vp_token, + "presentation_submission": { + "definition_id": "32f54163-7166-48f1-93d8-ff217bdb0653", + "id": "04a98be3-7fb0-4cf5-af9a-31579c8b0e7d", + "descriptor_map": [ + { + "id": "pid-sd-jwt:unique_id+given_name+family_name", + "path": "$.vp_token.verified_claims.claims._sd[0]", + "format": "vc+sd-jwt" + } + ] + } + } + encrypted_response = JWEHelper( + JWK(CONFIG["metadata_jwks"][1])).encrypt(response) + context.request = { + "response": encrypted_response + } + redirect_endpoint = self.backend.redirect_endpoint(context) + assert redirect_endpoint.status == "400" + msg = json.loads(redirect_endpoint.message) + assert msg["error"] == "invalid_request" + assert msg["error_description"] == "Error while validating VP: vp has no nonce." + + # This will trigger a `UnicodeDecodeError` which will be caught by the generic `Exception case`. + response["vp_token"] = "asd.fgh.jkl" + encrypted_response = JWEHelper( + JWK(CONFIG["metadata_jwks"][1])).encrypt(response) + context.request = { + "response": encrypted_response + } + redirect_endpoint = self.backend.redirect_endpoint(context) + assert redirect_endpoint.status == "400" + msg = json.loads(redirect_endpoint.message) + assert msg["error"] == "invalid_request" + assert msg["error_description"] == "DirectPostResponse content parse and validation error. Single VPs are faulty." + + def test_redirect_endpoint(self, context): self.backend.register_endpoints()