diff --git a/kapitan/refs/vault_resources.py b/kapitan/refs/vault_resources.py index 6720d9cc9..c361ae932 100644 --- a/kapitan/refs/vault_resources.py +++ b/kapitan/refs/vault_resources.py @@ -9,6 +9,7 @@ import os import hvac +from requests.exceptions import SSLError from kapitan.errors import KapitanError @@ -26,8 +27,9 @@ class VaultClient(hvac.Client): def __init__(self, vault_parameters): self.vault_parameters = vault_parameters - client_parameters = get_env(vault_parameters) - super().__init__(**client_parameters) + + self.client_parameters = self.get_env() + super().__init__(**self.client_parameters) self.authenticate() @@ -64,7 +66,11 @@ def read_token_from_file(self, token_file): def authenticate(self): # different login method based on authentication type - auth_type = self.vault_parameters["auth"] + auth_type = self.vault_parameters.get("auth") + if not auth_type: + raise VaultError( + "No authentication method defined. Please specify it in 'parameters.kapitan.secrets.vaultkv.auth'." + ) token = self.get_auth_token() username = os.getenv("VAULT_USERNAME") @@ -109,62 +115,73 @@ def authenticate(self): else: raise VaultError(f"Authentication type '{auth_type}' not supported") - if not self.is_authenticated(): + try: + authenticated = self.is_authenticated() + except SSLError as e: + ca_path = os.path.abspath(self.client_parameters["verify"]) + raise VaultError(f"Error while verifying ca-bundle at {ca_path}: {e.__context__.__context__}") + + if not authenticated: self.adapter.close() raise VaultError("Vault Authentication Error, Environment Variables defined?") - -def get_env(vault_parameters): - """ - The following variables need to be exported to the environment or defined in inventory. - * VAULT_ADDR: url for vault - * VAULT_SKIP_VERIFY: if set, do not verify presented TLS certificate before communicating with Vault server. - * VAULT_CLIENT_KEY: path to an unencrypted PEM-encoded private key matching the client certificate - * VAULT_CLIENT_CERT: path to a PEM-encoded client certificate for TLS authentication to the Vault server - * VAULT_CACERT: path to a PEM-encoded CA cert file to use to verify the Vault server TLS certificate - * VAULT_CAPATH: path to a directory of PEM-encoded CA cert files to verify the Vault server TLS certificate - * VAULT_NAMESPACE: specify the Vault Namespace, if you have one - """ - client_parameters = {} - - # fetch missing values from env - variables = ["ADDR", "NAMESPACE", "SKIP_VERIFY", "CACERT", "CAPATH", "CLIENT_KEY", "CLIENT_CERT"] - for var in variables: - var = "VAULT_" + var - if vault_parameters.get(var) is None and os.getenv(var) is not None: - vault_parameters[var] = os.getenv(var) - - # set vault adress - vault_address = vault_parameters.get("VAULT_ADDR") - if not vault_address: - raise VaultError("VAULT_ADDR has to be specified in inventory or env") - else: - client_parameters["url"] = vault_address - - # set vault namespace - vault_namespace = vault_parameters.get("VAULT_NAMESPACE") - if vault_namespace: - client_parameters["namespace"] = vault_namespace - - # set ca cert/path or skip verification - skip_verify = not (str(vault_parameters.get("VAULT_SKIP_VERIFY", False)).lower() == "false") - if skip_verify: - # TODO: surpress ssl warning - client_parameters["verify"] = False - else: - cert = vault_parameters.get("VAULT_CACERT") - cert_path = vault_parameters.get("VAULT_CAPATH") - if cert and os.path.isfile(cert): - client_parameters["verify"] = cert - elif cert_path and os.path.isdir(cert_path): - client_parameters["verify"] = cert_path + def get_env(self): + """ + The following variables need to be exported to the environment or defined in inventory. + * VAULT_ADDR: url for vault + * VAULT_SKIP_VERIFY: if set, do not verify presented TLS certificate before communicating with Vault server. + * VAULT_CLIENT_KEY: path to an unencrypted PEM-encoded private key matching the client certificate + * VAULT_CLIENT_CERT: path to a PEM-encoded client certificate for TLS authentication to the Vault server + * VAULT_CACERT: path to a PEM-encoded CA cert file to use to verify the Vault server TLS certificate + * VAULT_CAPATH: path to a directory of PEM-encoded CA cert files to verify the Vault server TLS certificate + * VAULT_NAMESPACE: specify the Vault Namespace, if you have one + """ + client_parameters = {} + + # fetch missing values from env + variables = ["ADDR", "NAMESPACE", "SKIP_VERIFY", "CACERT", "CAPATH", "CLIENT_KEY", "CLIENT_CERT"] + for var in variables: + var = "VAULT_" + var + if self.vault_parameters.get(var) is None and os.getenv(var) is not None: + self.vault_parameters[var] = os.getenv(var) + + # set vault adress + vault_address = self.vault_parameters.get("VAULT_ADDR") + if not vault_address: + raise VaultError("VAULT_ADDR has to be specified in inventory or env") + else: + client_parameters["url"] = vault_address + + # set vault namespace + vault_namespace = self.vault_parameters.get("VAULT_NAMESPACE") + if vault_namespace: + client_parameters["namespace"] = vault_namespace + + # set ca cert/path or skip verification + skip_verify_value = self.vault_parameters.get("VAULT_SKIP_VERIFY", False) + skip_verify_bool = not (str(skip_verify_value).lower() == "false") + if skip_verify_bool: + # disable ssl warnings + import urllib3 + + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + client_parameters["verify"] = False else: - raise VaultError("Neither VAULT_CACERT nor VAULT_CAPATH specified or exist") + cert = self.vault_parameters.get("VAULT_CACERT") + cert_path = self.vault_parameters.get("VAULT_CAPATH") + if cert and os.path.isfile(cert): + client_parameters["verify"] = cert + elif cert_path and os.path.isdir(cert_path): + client_parameters["verify"] = cert_path + else: + raise VaultError( + "Neither VAULT_CACERT nor VAULT_CAPATH specified or found. If you don't want to verify the requests, set VAULT_SKIP_VERIFY to 'true'." + ) - # set client cert for tls authentication - client_key = vault_parameters.get("VAULT_CLIENT_KEY") - client_cert = vault_parameters.get("VAULT_CLIENT_CERT") - if client_key and client_cert: - client_parameters["cert"] = (client_cert, client_key) + # set client cert for tls authentication + client_key = self.vault_parameters.get("VAULT_CLIENT_KEY") + client_cert = self.vault_parameters.get("VAULT_CLIENT_CERT") + if client_key and client_cert: + client_parameters["cert"] = (client_cert, client_key) - return client_parameters + return client_parameters