Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add specific exception handling for VP token #194

Merged
merged 1 commit into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions pyeudiw/satosa/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from pyeudiw.openid4vp.direct_post_response import DirectPostResponse
from pyeudiw.openid4vp.exceptions import (
KIDNotFound,
InvalidVPToken
InvalidVPToken, VPNotFound, NoNonceInVPToken, VPInvalidNonce
)
from pyeudiw.storage.db_engine import DBEngine
from pyeudiw.storage.exceptions import StorageWriteError
Expand Down Expand Up @@ -428,11 +428,37 @@ def redirect_endpoint(self, context, *args):
err_code="400"
)

# TODO: handle vp token ops exceptions
try:
vpt.load_nonce(stored_session['nonce'])
vps: list = vpt.get_presentation_vps()
vpt.validate()
except VPNotFound as e:
_msg = "Error while retrieving VP. Payload 'vp_token' is empty or has an unexpected value."
return self.handle_error(
context=context,
message="invalid_request",
troubleshoot=_msg,
err=f"{e.__class__.__name__}: {e}",
err_code="400"
)
except NoNonceInVPToken as e:
_msg = "Error while validating VP: vp has no nonce."
return self.handle_error(
context=context,
message="invalid_request",
troubleshoot=_msg,
err=f"{e.__class__.__name__}: {e}",
err_code="400"
)
except VPInvalidNonce as e:
_msg = "Error while validating VP: unexpected value."
return self.handle_error(
context=context,
message="invalid_request",
troubleshoot=_msg,
err=f"{e.__class__.__name__}: {e}",
err_code="400"
)
except Exception as e:
_msg = (
"DirectPostResponse content parse and validation error. "
Expand Down
153 changes: 153 additions & 0 deletions pyeudiw/tests/satosa/test_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,159 @@ def test_pre_request_endpoint_mobile(self, context):
assert qs["request_uri"][0].startswith(
CONFIG["metadata"]["request_uris"][0])

def test_vp_validation_in_redirect_endpoint(self, context):
self.backend.register_endpoints()

issuer_jwk = JWK(leaf_cred_jwk_prot.serialize(private=True))
holder_jwk = JWK(leaf_wallet_jwk.serialize(private=True))

settings = ISSUER_CONF
settings['issuer'] = "https://issuer.example.com"
settings['default_exp'] = CONFIG['jwt']['default_exp']

sd_specification = load_specification_from_yaml_string(
settings["sd_specification"])

issued_jwt = issue_sd_jwt(
sd_specification,
settings,
issuer_jwk,
holder_jwk,
trust_chain=trust_chain_issuer
)

_adapt_keys(issuer_jwk, holder_jwk)

sdjwt_at_holder = SDJWTHolder(
issued_jwt["issuance"],
serialization_format="compact",
)

nonce = str(uuid.uuid4())
sdjwt_at_holder.create_presentation(
{},
nonce,
str(uuid.uuid4()),
import_pyca_pri_rsa(holder_jwk.key.priv_key, kid=holder_jwk.kid) if sd_specification.get(
"key_binding", False) else None,
sign_alg=DEFAULT_SIG_KTY_MAP[holder_jwk.key.kty],
)

data = {
"iss": "https://wallet-provider.example.org/instance/vbeXJksM45xphtANnCiG6mCyuU4jfGNzopGuKvogg9c",
"jti": str(uuid.uuid4()),
"aud": "https://verifier.example.org/callback",
"iat": iat_now(),
"exp": exp_from_now(minutes=15),
"nonce": nonce,
"vp": sdjwt_at_holder.sd_jwt_presentation,
}

vp_token = JWSHelper(leaf_wallet_jwk.serialize(private=True)).sign(
data,
protected={"typ": "JWT"}
)

context.request_method = "POST"
context.request_uri = CONFIG["metadata"]["redirect_uris"][0].removeprefix(CONFIG["base_url"])

state = str(uuid.uuid4())
response = {
"nonce": nonce,
"state": state,
"vp_token": vp_token,
"presentation_submission": {
"definition_id": "32f54163-7166-48f1-93d8-ff217bdb0653",
"id": "04a98be3-7fb0-4cf5-af9a-31579c8b0e7d",
"descriptor_map": [
{
"id": "pid-sd-jwt:unique_id+given_name+family_name",
"path": "$.vp_token.verified_claims.claims._sd[0]",
"format": "vc+sd-jwt"
}
]
}
}
session_id = context.state["SESSION_ID"]
self.backend.db_engine.init_session(
state=state,
session_id=session_id
)
doc_id = self.backend.db_engine.get_by_state(state)["document_id"]

# Put a different nonce in the stored request object.
# This will trigger a `VPInvalidNonce` error
self.backend.db_engine.update_request_object(
document_id=doc_id,
request_object={"nonce": str(uuid.uuid4()), "state": state})

encrypted_response = JWEHelper(
JWK(CONFIG["metadata_jwks"][1])).encrypt(response)
context.request = {
"response": encrypted_response
}
redirect_endpoint = self.backend.redirect_endpoint(context)
assert redirect_endpoint.status == "400"
msg = json.loads(redirect_endpoint.message)
assert msg["error"] == "invalid_request"
assert msg["error_description"] == "Error while validating VP: unexpected value."

# Recreate data without nonce
# This will trigger a `NoNonceInVPToken` error
data = {
"iss": "https://wallet-provider.example.org/instance/vbeXJksM45xphtANnCiG6mCyuU4jfGNzopGuKvogg9c",
"jti": str(uuid.uuid4()),
"aud": "https://verifier.example.org/callback",
"iat": iat_now(),
"exp": exp_from_now(minutes=15),
"vp": sdjwt_at_holder.sd_jwt_presentation,
}

vp_token = JWSHelper(leaf_wallet_jwk.serialize(private=True)).sign(
data,
protected={"typ": "JWT"}
)
response = {
"nonce": nonce,
"state": state,
"vp_token": vp_token,
"presentation_submission": {
"definition_id": "32f54163-7166-48f1-93d8-ff217bdb0653",
"id": "04a98be3-7fb0-4cf5-af9a-31579c8b0e7d",
"descriptor_map": [
{
"id": "pid-sd-jwt:unique_id+given_name+family_name",
"path": "$.vp_token.verified_claims.claims._sd[0]",
"format": "vc+sd-jwt"
}
]
}
}
encrypted_response = JWEHelper(
JWK(CONFIG["metadata_jwks"][1])).encrypt(response)
context.request = {
"response": encrypted_response
}
redirect_endpoint = self.backend.redirect_endpoint(context)
assert redirect_endpoint.status == "400"
msg = json.loads(redirect_endpoint.message)
assert msg["error"] == "invalid_request"
assert msg["error_description"] == "Error while validating VP: vp has no nonce."

# This will trigger a `UnicodeDecodeError` which will be caught by the generic `Exception case`.
response["vp_token"] = "asd.fgh.jkl"
encrypted_response = JWEHelper(
JWK(CONFIG["metadata_jwks"][1])).encrypt(response)
context.request = {
"response": encrypted_response
}
redirect_endpoint = self.backend.redirect_endpoint(context)
assert redirect_endpoint.status == "400"
msg = json.loads(redirect_endpoint.message)
assert msg["error"] == "invalid_request"
assert msg["error_description"] == "DirectPostResponse content parse and validation error. Single VPs are faulty."


def test_redirect_endpoint(self, context):
self.backend.register_endpoints()

Expand Down
Loading