diff --git a/pyeudiw/federation/__init__.py b/pyeudiw/federation/__init__.py index afdc418d..e6c380cb 100644 --- a/pyeudiw/federation/__init__.py +++ b/pyeudiw/federation/__init__.py @@ -1,10 +1,11 @@ from pyeudiw.federation.schemas.entity_configuration import EntityStatementPayload, EntityConfigurationPayload + def is_es(payload: dict) -> bool: """ - Determines if payload dict is an Entity Statement + Determines if payload dict is a Subordinate Entity Statement - :param payload: the object to determine if is an Entity Statement + :param payload: the object to determine if is a Subordinate Entity Statement :type payload: dict :returns: True if is an Entity Statement and False otherwise @@ -34,4 +35,4 @@ def is_ec(payload: dict) -> bool: EntityConfigurationPayload(**payload) return True except Exception as e: - return False \ No newline at end of file + return False diff --git a/pyeudiw/federation/statements.py b/pyeudiw/federation/statements.py index b74012fa..afe9f27a 100644 --- a/pyeudiw/federation/statements.py +++ b/pyeudiw/federation/statements.py @@ -1,5 +1,3 @@ -from __future__ import annotations - from copy import deepcopy from pyeudiw.federation.exceptions import ( UnknownKid, @@ -13,22 +11,15 @@ EntityConfigurationHeader, EntityStatementPayload ) +from pydantic import ValidationError from pyeudiw.jwt.utils import decode_jwt_payload, decode_jwt_header from pyeudiw.jwt import JWSHelper -from pyeudiw.tools.utils import get_http_url -from pydantic import ValidationError - from pyeudiw.jwk import find_jwk +from pyeudiw.tools.utils import get_http_url import json import logging -try: - pass -except ImportError: # pragma: no cover - pass - - OIDCFED_FEDERATION_WELLKNOWN_URL = ".well-known/openid-federation" logger = logging.getLogger(__name__) @@ -67,7 +58,6 @@ def get_federation_jwks(jwt_payload: dict) -> list[dict]: jwks = jwt_payload.get("jwks", {}) keys = jwks.get("keys", []) - return keys @@ -87,7 +77,6 @@ def get_entity_statements(urls: list[str] | str, httpc_params: dict, http_async: """ urls = urls if isinstance(urls, list) else [urls] - for url in urls: logger.debug(f"Starting Entity Statement Request to {url}") diff --git a/pyeudiw/federation/trust_chain_validator.py b/pyeudiw/federation/trust_chain_validator.py index 6426d9b8..6b6026b6 100644 --- a/pyeudiw/federation/trust_chain_validator.py +++ b/pyeudiw/federation/trust_chain_validator.py @@ -132,18 +132,27 @@ def validate(self) -> bool: ) if not ta_jwk: + logger.error( + f"Trust chain validation error: TA jwks not found." + ) return False # Validate the last statement with ta_jwk jwsh = JWSHelper(ta_jwk) if not jwsh.verify(last_element): + logger.error( + f"Trust chain signature validation error: {last_element} using {ta_jwk}" + ) return False # then go ahead with other checks self.exp = es_payload["exp"] if self._check_expired(self.exp): + logger.error( + f"Trust chain validation error, statement expired: {es_payload}" + ) return False fed_jwks = es_payload["jwks"]["keys"] @@ -160,10 +169,16 @@ def validate(self) -> bool: st_header.get("kid", None), fed_jwks ) except (KidNotFoundError, InvalidKid): + logger.error( + f"Trust chain validation KidNotFoundError: {st_header} not in {fed_jwks}" + ) return False jwsh = JWSHelper(jwk) if not jwsh.verify(st): + logger.error( + f"Trust chain signature validation error: {st} using {jwk}" + ) return False else: fed_jwks = st_payload["jwks"]["keys"] @@ -184,8 +199,9 @@ def _retrieve_ec(self, iss: str) -> str: """ jwt = get_entity_configurations(iss, self.httpc_params) if not jwt: - raise HttpError( - f"Cannot get the Entity Configuration from {iss}") + _msg = f"Cannot get the Entity Configuration from {iss}" + logger.error(_msg) + raise HttpError(_msg) # is something weird these will raise their Exceptions return jwt[0] @@ -204,9 +220,9 @@ def _retrieve_es(self, download_url: str, iss: str) -> str: """ jwt = get_entity_statements(download_url, self.httpc_params) if not jwt: - logger.warning( - f"Cannot fast refresh Entity Statement {iss}" - ) + _msg = f"Cannot fast refresh Entity Statement {iss}" + logger.warning(_msg) + raise HttpError(_msg) if isinstance(jwt, list) and jwt: return jwt[0] return jwt diff --git a/pyeudiw/oauth2/dpop/__init__.py b/pyeudiw/oauth2/dpop/__init__.py index 986efd44..edfdffdf 100644 --- a/pyeudiw/oauth2/dpop/__init__.py +++ b/pyeudiw/oauth2/dpop/__init__.py @@ -101,6 +101,8 @@ def __init__( "Jwk validation error, " f"{e.__class__.__name__}: {e}" ) + raise ValueError("JWK schema validation error during DPoP init") + # If the jwt is invalid, this will raise an exception try: decode_jwt_header(http_header_dpop) diff --git a/pyeudiw/openid4vp/direct_post_response.py b/pyeudiw/openid4vp/direct_post_response.py index a51ad414..97d1e3a7 100644 --- a/pyeudiw/openid4vp/direct_post_response.py +++ b/pyeudiw/openid4vp/direct_post_response.py @@ -12,6 +12,7 @@ from pyeudiw.openid4vp.vp import Vp from pydantic import ValidationError + class DirectPostResponse: """ Helper class for generate Direct Post Response. @@ -90,8 +91,10 @@ def _validate_vp(self, vp: dict) -> bool: ) VPTokenPayload(**vp.payload) VPTokenHeader(**vp.headers) - except ValidationError: - return False + except ValidationError as e: + raise InvalidVPToken( + f"VP is not valid, {e}: {vp.headers}.{vp.payload}" + ) return True @@ -102,12 +105,19 @@ def validate(self) -> bool: :returns: True if all VP are valid, False otherwhise. :rtype: bool """ - + all_valid = None for vp in self.get_presentation_vps(): - if not self._validate_vp(vp): - return False - - return True + try: + self._validate_vp(vp) + if all_valid == None: + all_valid = True + except Exception as e: + logger.error( + + ) + all_valid = False + + return all_valid def get_presentation_vps(self) -> list[Vp]: """ @@ -125,16 +135,18 @@ def get_presentation_vps(self) -> list[Vp]: vps = [_vps] if isinstance(_vps, str) else _vps if not vps: - raise VPNotFound(f"Vps are empty for response with nonce \"{self.nonce}\"") + raise VPNotFound( + f'Vps are empty for response with nonce "{self.nonce}"' + ) for vp in vps: + # TODO - add an exception handling here _vp = Vp(vp) self._vps.append(_vp) cred_iss = _vp.credential_payload['iss'] if not self.credentials_by_issuer.get(cred_iss, None): self.credentials_by_issuer[cred_iss] = [] - self.credentials_by_issuer[cred_iss].append(_vp.payload['vp']) return self._vps @@ -151,4 +163,4 @@ def payload(self) -> dict: """Returns the decoded payload of presentation""" if not self._payload: self._decode_payload() - return self._payload \ No newline at end of file + return self._payload diff --git a/pyeudiw/satosa/dpop.py b/pyeudiw/satosa/dpop.py index 778b0696..9e4da208 100644 --- a/pyeudiw/satosa/dpop.py +++ b/pyeudiw/satosa/dpop.py @@ -12,6 +12,7 @@ from pyeudiw.tools.base_logger import BaseLogger from .base_http_error_handler import BaseHTTPErrorHandler + class BackendDPoP(BaseHTTPErrorHandler, BaseLogger): """ Backend DPoP class. @@ -50,9 +51,13 @@ def _request_endpoint_dpop(self, context: Context, *args) -> Union[JsonResponse, try: WalletInstanceAttestationPayload(**wia) except ValidationError as e: - self._log_warning(context, message=f"[FOUND WIA] Invalid WIA: {wia}! \nValidation error: {e}") + _msg = f"[FOUND WIA] Invalid WIA: {wia}! \nValidation error: {e}" + self._log_warning(context, message=_msg) + # return self._handle_401(context, _msg, e) except Exception as e: - self._log_warning(context, message=f"[FOUND WIA] Invalid WIA: {wia}! \nUnexpected error: {e}") + _msg = f"[FOUND WIA] Invalid WIA: {wia}! \nUnexpected error: {e}" + self._log_warning(context, message=_msg) + # return self._handle_401(context, _msg, e) try: self._validate_trust(context, dpop_jws) @@ -84,7 +89,7 @@ def _request_endpoint_dpop(self, context: Context, *args) -> Union[JsonResponse, else: _msg = ( - "The Wallet Instance doesn't provide a valid Wallet Instance Attestation " + "The Wallet Instance doesn't provide a valid Wallet Attestation " "a default set of capabilities and a low security level are applied." ) - self._log_warning(context, message=_msg) \ No newline at end of file + self._log_warning(context, message=_msg) diff --git a/pyeudiw/satosa/trust.py b/pyeudiw/satosa/trust.py index 86540b31..a3232444 100644 --- a/pyeudiw/satosa/trust.py +++ b/pyeudiw/satosa/trust.py @@ -16,6 +16,7 @@ from pyeudiw.tools.base_logger import BaseLogger + class BackendTrust(BaseLogger): """ Backend Trust class. @@ -41,7 +42,10 @@ def init_trust_resources(self) -> None: try: self.get_backend_trust_chain() except Exception as e: - self._log_critical("Backend Trust", f"Cannot fetch the trust anchor configuration: {e}") + self._log_critical( + "Backend Trust", + f"Cannot fetch the trust anchor configuration: {e}" + ) self.db_engine.close() self._db_engine = None @@ -57,10 +61,9 @@ def entity_configuration_endpoint(self, context: Context) -> Response: :rtype: Response """ - data = self.entity_configuration_as_dict if context.qs_params.get('format', '') == 'json': return Response( - json.dumps(data), + json.dumps(self.entity_configuration_as_dict), status="200", content="application/json" ) @@ -101,21 +104,21 @@ def get_backend_trust_chain(self) -> list[str]: """ try: trust_evaluation_helper = TrustEvaluationHelper.build_trust_chain_for_entity_id( - storage=self.db_engine, - entity_id=self.client_id, - entity_configuration=self.entity_configuration, - httpc_params=self.config['network']['httpc_params'] + storage = self.db_engine, + entity_id = self.client_id, + entity_configuration = self.entity_configuration, + httpc_params = self.config['network']['httpc_params'] ) self.db_engine.add_or_update_trust_attestation( - entity_id=self.client_id, - attestation=trust_evaluation_helper.trust_chain, - exp=trust_evaluation_helper.exp + entity_id = self.client_id, + attestation = trust_evaluation_helper.trust_chain, + exp = trust_evaluation_helper.exp ) return trust_evaluation_helper.trust_chain except (DiscoveryFailedError, EntryNotFound, Exception) as e: message = ( - f"Error while building trust chain for client with id: {self.client_id}\n" + f"Error while building trust chain for client with id: {self.client_id}. " f"{e.__class__.__name__}: {e}" ) self._log_warning("Trust Chain", message) @@ -154,7 +157,6 @@ def _validate_trust(self, context: Context, jws: str) -> TrustEvaluationHelper: f"{trust_eval.entity_id}" ) self._log_error(context, message) - raise NotTrustedFederationError( f"{trust_eval.entity_id} not found for Trust evaluation." ) @@ -164,7 +166,6 @@ def _validate_trust(self, context: Context, jws: str) -> TrustEvaluationHelper: f"{trust_eval.entity_id}: {e}" ) self._log_error(context, message) - raise NotTrustedFederationError( f"{trust_eval.entity_id} is not trusted." ) @@ -208,4 +209,4 @@ def entity_configuration(self) -> dict: "typ": "entity-statement+jwt" }, plain_dict=data - ) \ No newline at end of file + ) diff --git a/pyeudiw/x509/verify.py b/pyeudiw/x509/verify.py index a5835318..768f73f2 100644 --- a/pyeudiw/x509/verify.py +++ b/pyeudiw/x509/verify.py @@ -9,6 +9,7 @@ logger = logging.getLogger(__name__) + def _verify_x509_certificate_chain(pems: list[str]): """ Verify the x509 certificate chain. @@ -21,25 +22,28 @@ def _verify_x509_certificate_chain(pems: list[str]): """ try: store = crypto.X509Store() - - x509_certs = [crypto.load_certificate(crypto.FILETYPE_PEM, str(pem)) for pem in pems] + x509_certs = [ + crypto.load_certificate(crypto.FILETYPE_PEM, str(pem)) + for pem in pems + ] for cert in x509_certs[:-1]: store.add_cert(cert) store_ctx = crypto.X509StoreContext(store, x509_certs[-1]) - store_ctx.verify_certificate() return True except crypto.Error as e: _message = f"cert's chain result invalid for the following reason -> {e}" logging.warning(LOG_ERROR.format(_message)) + return False except Exception as e: _message = f"cert's chain cannot be validated for error -> {e}" logging.warning(LOG_ERROR.format(e)) return False - + + def _check_chain_len(pems: list) -> bool: """ Check the x509 certificate chain lenght. @@ -50,16 +54,15 @@ def _check_chain_len(pems: list) -> bool: :returns: True if the x509 certificate chain lenght is valid else False :rtype: bool """ - chain_len = len(pems) - if chain_len < 2: message = f"invalid chain lenght -> minimum expected 2 found {chain_len}" logging.warning(LOG_ERROR.format(message)) return False return True - + + def _check_datetime(exp: datetime | None): """ Check the x509 certificate chain expiration date. @@ -80,6 +83,7 @@ def _check_datetime(exp: datetime | None): return True + def verify_x509_attestation_chain(x5c: list[bytes], exp: datetime | None = None) -> bool: """ Verify the x509 attestation certificate chain. @@ -99,7 +103,8 @@ def verify_x509_attestation_chain(x5c: list[bytes], exp: datetime | None = None) pems = [DER_cert_to_PEM_cert(cert) for cert in x5c] return _verify_x509_certificate_chain(pems) - + + def verify_x509_anchor(pem_str: str, exp: datetime | None = None) -> bool: """ Verify the x509 anchor certificate. @@ -113,15 +118,18 @@ def verify_x509_anchor(pem_str: str, exp: datetime | None = None) -> bool: :rtype: bool """ if not _check_datetime(exp): + logging.error(LOG_ERROR.format("check datetime failed")) return False pems = [str(cert) for cert in pem.parse(pem_str)] if not _check_chain_len(pems): + logging.error(LOG_ERROR.format("check chain len failed")) return False return _verify_x509_certificate_chain(pems) + def get_issuer_from_x5c(x5c: list[bytes]) -> str: """ Get the issuer from the x509 certificate chain. @@ -135,6 +143,7 @@ def get_issuer_from_x5c(x5c: list[bytes]) -> str: cert = load_der_x509_certificate(x5c[-1]) return cert.subject.rfc4514_string().split("=")[1] + def is_der_format(cert: bytes) -> str: """ Check if the certificate is in DER format. @@ -149,5 +158,6 @@ def is_der_format(cert: bytes) -> str: pem = DER_cert_to_PEM_cert(cert) crypto.load_certificate(crypto.FILETYPE_PEM, str(pem)) return True - except crypto.Error: + except crypto.Error as e: + logging.error(LOG_ERROR.format(e)) return False