Skip to content

Commit

Permalink
Handle expired request (#140)
Browse files Browse the repository at this point in the history
* feat: 403 on expired request

* chore: linting

---------

Co-authored-by: Salvatore Laiso <[email protected]>
  • Loading branch information
salvatorelaiso and Salvatore Laiso authored Nov 3, 2023
1 parent 9a32ad1 commit 799138c
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 19 deletions.
30 changes: 20 additions & 10 deletions pyeudiw/satosa/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ def redirect_endpoint(self, context, *args):
err=f"{e.__class__.__name__}: {e}",
err_code="400"
)

if stored_session["finalized"]:
_msg = f"Session already finalized"
return self.handle_error(
Expand All @@ -427,7 +427,7 @@ def redirect_endpoint(self, context, *args):
err=_msg,
err_code="400"
)

# TODO: handle vp token ops exceptions
try:
vpt.load_nonce(stored_session['nonce'])
Expand Down Expand Up @@ -486,16 +486,17 @@ def redirect_endpoint(self, context, *args):

# the trust is established to the credential issuer, then we can get the disclosed user attributes
# TODO - what if the credential is different from sd-jwt? -> generalyze within Vp class

try:
vp.verify_sdjwt(
issuer_jwks_by_kid={i['kid']: i for i in vp.credential_jwks}
issuer_jwks_by_kid={
i['kid']: i for i in vp.credential_jwks}
)
except Exception as e:
return self.handle_error(
context=context,
message="invalid_request",
troubleshoot=f"VP SD-JWT validation error: {e}",
context=context,
message="invalid_request",
troubleshoot=f"VP SD-JWT validation error: {e}",
err_code="400"
)

Expand Down Expand Up @@ -755,7 +756,7 @@ def get_response_endpoint(self, context):
troubleshoot="session not found or invalid",
err_code="400"
)

_now = iat_now()
_exp = finalized_session['request_object']['exp']
if _exp < _now:
Expand All @@ -765,7 +766,7 @@ def get_response_endpoint(self, context):
troubleshoot=f"session expired, request object exp is {_exp} while now is {_now}",
err_code="400"
)

internal_response = InternalData()
resp = internal_response.from_dict(
finalized_session['internal_response']
Expand Down Expand Up @@ -819,7 +820,16 @@ def status_endpoint(self, context):
err_code="401"
)

# TODO: if the request is expired -> return 403
request_object = session.get("request_object", None)
if request_object:
if request_object["exp"] >= iat_now():
return self.handle_error(
context=context,
message="expired",
troubleshoot=f"Request object expired",
err_code="403"
)

if session["finalized"]:
# return Redirect(
# self.registered_get_response_endpoint
Expand Down
16 changes: 8 additions & 8 deletions pyeudiw/sd_jwt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,9 @@ def _cb_get_issuer_key(issuer: str, settings: dict, adapted_keys: dict, *args, *


def verify_sd_jwt(
sd_jwt_presentation: str,
issuer_key: JWK,
holder_key: JWK,
sd_jwt_presentation: str,
issuer_key: JWK,
holder_key: JWK,
settings: dict = {'key_binding': True}
) -> dict:

Expand All @@ -175,18 +175,18 @@ def verify_sd_jwt(
"holder_key": jwcrypto.jwk.JWK(**holder_key.as_dict()),
"issuer_public_key": jwcrypto.jwk.JWK(**issuer_key.as_dict())
}

serialization_format = "compact"
sdjwt_at_verifier = SDJWTVerifier(
sd_jwt_presentation,
cb_get_issuer_key = (
cb_get_issuer_key=(
lambda x, unverified_header_parameters: _cb_get_issuer_key(
x, settings, adapted_keys, **unverified_header_parameters
)
),
expected_aud = None,
expected_nonce = None,
serialization_format = serialization_format,
expected_aud=None,
expected_nonce=None,
serialization_format=serialization_format,
)

return sdjwt_at_verifier.get_verified_payload()
12 changes: 11 additions & 1 deletion pyeudiw/tests/satosa/test_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import json
import urllib.parse
import uuid
from unittest.mock import Mock
from unittest.mock import Mock, patch

import pytest
from bs4 import BeautifulSoup
Expand Down Expand Up @@ -337,6 +337,16 @@ def test_request_endpoint(self, context):
assert payload["client_id"] == CONFIG["metadata"]["client_id"]
assert payload["response_uri"] == CONFIG["metadata"]["redirect_uris"][0]

datetime_mock = Mock(wraps=datetime.datetime)
datetime_mock.now.return_value = datetime.datetime(1999, 1, 1)
with patch('datetime.datetime', new=datetime_mock):
self.backend.status_endpoint(context)
state_endpoint_response = self.backend.status_endpoint(context)
assert state_endpoint_response.status == "403"
assert state_endpoint_response.message
err = json.loads(state_endpoint_response.message)
assert err["error"] == "expired"

# TODO - the authentication is successful ONLY if redirect_endpoints gets a valid presentation!
# state_endpoint_response = self.backend.status_endpoint(context)
# assert state_endpoint_response.status == "302"
Expand Down

0 comments on commit 799138c

Please sign in to comment.