diff --git a/misc/python/materialize/cloudtest/util/authentication.py b/misc/python/materialize/cloudtest/util/authentication.py deleted file mode 100644 index be878435dfeb0..0000000000000 --- a/misc/python/materialize/cloudtest/util/authentication.py +++ /dev/null @@ -1,124 +0,0 @@ -# Copyright Materialize, Inc. and contributors. All rights reserved. -# -# Use of this software is governed by the Business Source License -# included in the LICENSE file at the root of this repository. -# -# As of the Change Date specified in that file, in accordance with -# the Business Source License, use of this software will be governed -# by the Apache License, Version 2.0. -from __future__ import annotations - -from collections.abc import Callable -from dataclasses import dataclass, fields - -import requests -from requests.exceptions import ConnectionError, ReadTimeout - -from materialize.cloudtest.util.common import retry -from materialize.cloudtest.util.jwt_key import fetch_jwt - - -@dataclass -class AuthConfig: - organization_id: str - token: str - app_user: str | None - app_password: str | None - - refresh_fn: Callable[[AuthConfig], None] - - pgwire_ssl_mode: str = "require" - tls_ca_cert_path: str | None = None - - def refresh(self) -> None: - self.refresh_fn(self) - - -@dataclass -class TestUserConfig: - email: str - password: str - frontegg_host: str - - -DEFAULT_ORG_ID = "80b1a04a-2277-11ed-a1ce-5405dbb9e0f7" - - -# TODO: this retry loop should not be necessary, but we are seeing -# connections getting frequently (but sporadically) interrupted here - we -# should track this down and remove these retries -def create_auth( - user: TestUserConfig, - refresh_fn: Callable[[AuthConfig], None], -) -> AuthConfig: - config: AuthConfig = retry( - lambda: _create_auth( - user, - refresh_fn, - ), - 5, - [ConnectionError, ReadTimeout], - ) - return config - - -def _create_auth( - user: TestUserConfig, - refresh_fn: Callable[[AuthConfig], None], -) -> AuthConfig: - if user.frontegg_host.startswith("127.0.0.1"): - scheme = "http" - else: - scheme = "https" - token = fetch_jwt( - email=user.email, - password=user.password, - host=user.frontegg_host, - scheme=scheme, - ) - - identity_url = f"{scheme}://{user.frontegg_host}/identity/resources/users/v2/me" - response = requests.get( - identity_url, - headers={"authorization": f"Bearer {token}"}, - timeout=10, - ) - response.raise_for_status() - - organization_id = response.json()["tenantId"] - app_user = user.email - app_password = make_app_password(user.frontegg_host, token, scheme) - tls_ca_cert_path = None - - return AuthConfig( - organization_id=organization_id, - token=token, - app_user=app_user, - app_password=app_password, - refresh_fn=refresh_fn, - tls_ca_cert_path=tls_ca_cert_path, - ) - - -def update_auth( - user: TestUserConfig, - auth: AuthConfig, -) -> None: - new_auth = create_auth(user, auth.refresh_fn) - - for field in fields(new_auth): - setattr(auth, field.name, getattr(new_auth, field.name)) - - -def make_app_password(frontegg_host: str, token: str, scheme: str) -> str: - response = requests.post( - f"{scheme}://{frontegg_host}/identity/resources/users/api-tokens/v1", - json={"description": "e2e test password"}, - headers={"authorization": f"Bearer {token}"}, - timeout=10, - ) - response.raise_for_status() - data = response.json() - client_id = data["clientId"].replace("-", "") - secret = data["secret"].replace("-", "") - return f"mzp_{client_id}{secret}" diff --git a/misc/python/materialize/cloudtest/util/controller.py b/misc/python/materialize/cloudtest/util/controller.py deleted file mode 100644 index c3b11e3a750ae..0000000000000 --- a/misc/python/materialize/cloudtest/util/controller.py +++ /dev/null @@ -1,156 +0,0 @@ -# Copyright Materialize, Inc. and contributors. All rights reserved. -# -# Use of this software is governed by the Business Source License -# included in the LICENSE file at the root of this repository. -# -# As of the Change Date specified in that file, in accordance with -# the Business Source License, use of this software will be governed -# by the Apache License, Version 2.0. - -# pyright: reportMissingImports=false -import logging -import socket -import subprocess -import urllib.parse -from dataclasses import dataclass -from typing import Any - -from materialize.cloudtest.util.authentication import AuthConfig -from materialize.cloudtest.util.common import log_subprocess_error, retry -from materialize.cloudtest.util.web_request import WebRequests - -LOGGER = logging.getLogger(__name__) - - -@dataclass -class Endpoint: - scheme: str - host: str - port: int - - @property - def base_url(self) -> str: - return f"{self.scheme}://{self.host}:{self.port}" - - @property - def host_port(self) -> tuple[str, int]: - return (self.host, self.port) - - @classmethod - def parse(cls, s: str) -> "Endpoint": - u = parse_url(s) - assert u.hostname is not None and u.port is not None - return cls(scheme=u.scheme or "http", host=u.hostname, port=u.port) - - -@dataclass -class ControllerDefinition: - name: str - default_port: str - has_configurable_address: bool = True - endpoint: Endpoint | None = None - client_cert: tuple[str, str] | None = None - - def default_address(self) -> str: - return f"http://127.0.0.1:{self.default_port}" - - def configured_base_url(self) -> str: - if self.endpoint is None: - raise RuntimeError("Endpoint not configured") - - return self.endpoint.base_url - - def requests( - self, - auth: AuthConfig | None, - client_cert: tuple[str, str] | None = None, - additional_headers: dict[str, str] | None = None, - ) -> WebRequests: - return WebRequests( - auth, - self.configured_base_url(), - client_cert=client_cert, - additional_headers=additional_headers, - ) - - -def wait_for_connectable( - address: tuple[Any, int] | str, - max_attempts: int = 30, -) -> None: - def f() -> None: - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.connect(address) - - retry( - f, - max_attempts=max_attempts, - exception_types=[ConnectionRefusedError, socket.gaierror, socket.timeout], - message=f"Error connecting to {address}. Tried {max_attempts} times.", - ) - - -def parse_url(s: str) -> urllib.parse.ParseResult: - """ - >>> parse_url('127.0.0.1:8002').port - 8002 - >>> parse_url('127.0.0.1:8002').hostname - '127.0.0.1' - >>> parse_url('the men who stare at goats') - Traceback (most recent call last): - File "/nix/store/7awm88zrzq5c0qks8ypf8s8jblm4r3i2-python3-3.9.16/lib/python3.9/doctest.py", line 1334, in __run - exec(compile(example.source, filename, "single", - File "", line 1, in - parse_url('the men who stare at goats') - File "/Users/rami/Code/cloud/k8s_tests/util.py", line 343, in parse_url - raise ValueError(s) - ValueError: //the men who stare at goats - """ - try: - parsed = urllib.parse.urlparse(s) - assert parsed.netloc is not None and parsed.port is not None - except AssertionError: - try: - s = "//" + s - parsed = urllib.parse.urlparse(s) - assert parsed.netloc is not None and parsed.port is not None - except AssertionError as e: - raise ValueError(s) from e - return parsed - - -def launch_controllers(controller_names: list[str], docker_env: dict[str, str]) -> None: - try: - subprocess.run( - [ - "bin/compose", - "up", - "--wait", - *controller_names, - ], - capture_output=True, - check=True, - env=docker_env, - ) - except subprocess.CalledProcessError as e: - log_subprocess_error(e) - raise - - -def wait_for_controllers(*endpoints: Endpoint) -> None: - for endpoint in endpoints: - LOGGER.info(f"Waiting for {endpoint.host_port} to be connectable...") - wait_for_connectable(endpoint.host_port) - - -def cleanup_controllers(docker_env: dict[str, str]) -> None: - try: - subprocess.run( - ["bin/compose", "down", "-v"], - capture_output=True, - check=True, - env=docker_env, - ) - except subprocess.CalledProcessError as e: - log_subprocess_error(e) - raise diff --git a/misc/python/materialize/cloudtest/util/environment.py b/misc/python/materialize/cloudtest/util/environment.py deleted file mode 100644 index d1436cc62cc07..0000000000000 --- a/misc/python/materialize/cloudtest/util/environment.py +++ /dev/null @@ -1,204 +0,0 @@ -# Copyright Materialize, Inc. and contributors. All rights reserved. -# -# Use of this software is governed by the Business Source License -# included in the LICENSE file at the root of this repository. -# -# As of the Change Date specified in that file, in accordance with -# the Business Source License, use of this software will be governed -# by the Apache License, Version 2.0. - -from typing import Any - -import requests -from requests import Response - -from materialize.cloudtest.util.authentication import AuthConfig -from materialize.cloudtest.util.common import retry -from materialize.cloudtest.util.controller import wait_for_connectable -from materialize.cloudtest.util.kubectl import Kubectl -from materialize.cloudtest.util.web_request import WebRequests - - -class Environment: - def __init__( - self, - auth: AuthConfig, - region_api_server_base_url: str, - env_kubectl: Kubectl, - sys_kubectl: Kubectl, - ): - self.auth = auth - self.env_kubectl = env_kubectl - self.sys_kubectl = sys_kubectl - self.region_api_requests = WebRequests( - self.auth, region_api_server_base_url, default_timeout_in_sec=45 - ) - self.create_env_assignment_get_retries = 120 - self.envd_waiting_get_env_retries = 900 - - def create_environment_assignment( - self, - image: str | None = None, - ) -> dict[str, Any]: - environment_assignment = f"{self.auth.organization_id}-0" - environment = f"environment-{environment_assignment}" - - json: dict[str, Any] = {} - if image is not None: - json["environmentdImageRef"] = image - self.region_api_requests.patch( - "/api/region", - json, - ) - - self.env_kubectl.get_retry( - None, - "environment", - environment, - self.create_env_assignment_get_retries, - ) - return self.sys_kubectl.get( - None, - "environmentassignment", - environment_assignment, - ) - - def wait_for_environmentd(self, max_attempts: int = 600) -> dict[str, Any]: - def get_environment() -> Response: - response = self.region_api_requests.get( - "/api/region", - ) - region_info = response.json().get("regionInfo") - assert region_info - assert region_info.get("resolvable") - assert region_info.get("sqlAddress") - return response - - environment_json: dict[str, Any] = retry( - get_environment, self.envd_waiting_get_env_retries, [AssertionError] - ).json() - pgwire_url = environment_json["regionInfo"]["sqlAddress"] - (pgwire_host, pgwire_port) = pgwire_url.split(":") - wait_for_connectable((pgwire_host, int(pgwire_port)), max_attempts) - return environment_json - - def delete_environment_assignment(self) -> None: - environment_assignment = f"{self.auth.organization_id}-0" - environment = f"environment-{environment_assignment}" - - def delete_environment() -> None: - self.region_api_requests.delete( - "/api/region", - # we have a 60 second timeout in the region api's load balancer - # for this call and a 5 minute timeout in the region api (which - # is relevant when running in kind) - timeout_in_sec=305, - ) - - retry(delete_environment, 20, [requests.exceptions.HTTPError]) - - assert ( - self.env_kubectl.get_or_none( - namespace=None, - resource_type="namespace", - resource_name=environment, - ) - is None - ) - assert ( - self.env_kubectl.get_or_none( - namespace=None, - resource_type="environment", - resource_name=environment, - ) - is None - ) - assert ( - self.sys_kubectl.get_or_none( - namespace=None, - resource_type="environmentassignment", - resource_name=environment_assignment, - ) - is None - ) - - def wait_for_no_environmentd(self) -> None: - # Confirm the Region API is not returning the environment - def get_environment() -> None: - res = self.region_api_requests.get( - "/api/region", - ) - # a 204 indicates no region is found - if res.status_code != 204: - raise AssertionError() - - retry(get_environment, 600, [AssertionError]) - - # Confirm the environment resource is gone - environment_assignment = f"{self.auth.organization_id}-0" - environment = f"environment-{environment_assignment}" - - def get_k8s_environment() -> None: - assert ( - self.env_kubectl.get_or_none( - namespace=None, - resource_type="environment", - resource_name=environment, - ) - is None - ) - - retry(get_k8s_environment, 600, [AssertionError]) - - def await_environment_pod( - self, kubectl: Kubectl, namespace: str, pod_name: str - ) -> None: - # we can't just wait, since it errors if it doesn't exist yet - kubectl.get_retry( - namespace=namespace, - resource_type="pod", - resource_name=pod_name, - # Especially on dev stacks, this can take a little while - max_attempts=360, - ) - kubectl.wait( - namespace=namespace, - resource_type="pod", - resource_name=pod_name, - wait_for="condition=Ready", - # If we're unlucky with certificates, we can take up to 10 minutes :-( - # -- pad a bit just in case - timeout_secs=630, - ) - - def cleanup_crds( - self, - ) -> None: - environment_context = self.env_kubectl.context - system_context = self.sys_kubectl.context - - if system_context != "kind-mzcloud": - return - - assert "production" not in system_context - assert "production" not in environment_context - assert "staging" not in system_context - assert "staging" not in environment_context - - self.sys_kubectl.delete( - namespace=None, - resource_type="crd", - resource_name="environmentassignments.materialize.cloud", - ) - - self.env_kubectl.delete( - namespace=None, - resource_type="crd", - resource_name="environments.materialize.cloud", - ) - - self.env_kubectl.delete( - namespace=None, - resource_type="crd", - resource_name="vpcendpoints.materialize.cloud", - ) diff --git a/misc/python/materialize/cloudtest/util/kubectl.py b/misc/python/materialize/cloudtest/util/kubectl.py deleted file mode 100644 index 6efdc6e89f25e..0000000000000 --- a/misc/python/materialize/cloudtest/util/kubectl.py +++ /dev/null @@ -1,238 +0,0 @@ -# Copyright Materialize, Inc. and contributors. All rights reserved. -# -# Use of this software is governed by the Business Source License -# included in the LICENSE file at the root of this repository. -# -# As of the Change Date specified in that file, in accordance with -# the Business Source License, use of this software will be governed -# by the Apache License, Version 2.0. - -import json -import subprocess -from pathlib import Path -from typing import Any - -import yaml - -from materialize.cloudtest.util.common import retry -from materialize.cloudtest.util.wait import wait - - -class KubectlError(AssertionError): - def __init__(self, returncode: int, cmd: list[str], stdout: bytes, stderr: bytes): - self.returncode = returncode - self.cmd = cmd - self.stdout = stdout - self.stderr = stderr - - @classmethod - def from_subprocess_error(cls, e: subprocess.CalledProcessError) -> BaseException: - return cls(e.returncode, e.cmd, e.stdout, e.stderr) - - def __str__(self) -> str: - return "\n".join( - [ - f"error in kubectl command: {self.cmd} returned {self.returncode}", - f"stdout: {self.stdout.decode('utf-8')}", - f"stderr: {self.stderr.decode('utf-8')}", - ], - ) - - -class Kubectl: - def __init__(self, context: str): - self.context = context - - def patch( - self, - resource_type: str, - name: str, - namespace: str | None, - patch: Any, - ) -> None: - command = [ - "kubectl", - "--context", - self.context, - "patch", - resource_type, - name, - "-p", - json.dumps(patch), - "--type", - "merge", - ] - if namespace: - command.extend(["-n", namespace]) - subprocess.run( - command, - check=True, - ) - - def wait( - self, - namespace: str | None, - resource_type: str, - resource_name: str | None, - wait_for: str, - timeout_secs: int, - label: str | None = None, - ) -> None: - if resource_name is None and label is None: - raise RuntimeError("Either resource_name or label must be set") - - if resource_name is None: - resource = resource_type - else: - resource = f"{resource_type}/{resource_name}" - - wait( - wait_for, - resource, - timeout_secs, - self.context, - label=label, - namespace=namespace, - ) - - def delete( - self, - namespace: str | None, - resource_type: str, - resource_name: str, - ) -> None: - command = [ - "kubectl", - "--context", - self.context, - "delete", - resource_type, - resource_name, - "--wait=true", - "--cascade=foreground", - ] - if namespace: - command.extend(["-n", namespace]) - - try: - subprocess.run( - command, - capture_output=True, - check=True, - text=True, - ) - except subprocess.CalledProcessError as e: - if "NotFound" not in e.stderr: - raise KubectlError.from_subprocess_error(e) from e - - def get( - self, - namespace: str | None, - resource_type: str, - resource_name: str | None = None, - ) -> dict[str, Any]: - command = [ - "kubectl", - "--context", - self.context, - "get", - resource_type, - ] - if resource_name is not None: - command.append(resource_name) - if namespace: - command.extend(["-n", namespace]) - - command.extend(["-o", "yaml"]) - - try: - yaml_data: dict[str, Any] = yaml.safe_load( - subprocess.run( - command, - capture_output=True, - check=True, - ).stdout, - ) - return yaml_data - except subprocess.CalledProcessError as e: - raise KubectlError.from_subprocess_error(e) from e - - def get_retry( - self, - namespace: str | None, - resource_type: str, - resource_name: str, - max_attempts: int, - ) -> dict[str, Any]: - def f() -> dict[str, Any]: - return self.get(namespace, resource_type, resource_name) - - yaml_data: dict[str, Any] = retry( - f, - max_attempts=max_attempts, - exception_types=[KubectlError], - ) - return yaml_data - - def get_or_none( - self, - namespace: str | None, - resource_type: str, - resource_name: str | None = None, - ) -> dict[str, Any] | None: - try: - return self.get(namespace, resource_type, resource_name) - except KubectlError as e: - if b"NotFound" in e.stderr: - return None - raise - - def load_k8s_yaml( - self, - filepath: str, - tests_dir: str, - substitutions: dict[str, str] | None = None, - ) -> dict[str, Any]: - """ - Load a Kubernetes YAML specification to assert against. If `substitutions` - are given, find-and-replace in the YAML contents before parsing. - """ - contents = Path(tests_dir).joinpath(filepath).read_text() - for old, new in (substitutions or {}).items(): - contents = contents.replace(old, new) - yaml_data: dict[str, Any] = yaml.safe_load(contents) - return yaml_data - - def apply( - self, - filepath: str, - field_manager: str | None = None, - server_side: bool = False, - namespace: str | None = None, - ) -> None: - """ - Apply configuration to a resource from a file. - - :param filepath: The path to the config file being applied. - :param field_manager: The field manager that owns the resource being modified. - :param server_side: Whether to apply the update server-side, required to avoid field ownership conflicts in some cases. - :param namespace: The namespace to apply the update in. - """ - command = [ - "kubectl", - "--context", - self.context, - "apply", - "-f", - filepath, - ] - if field_manager: - command.extend(["--field-manager", field_manager]) - if namespace: - command.extend(["-n", namespace]) - if server_side: - command.extend(["--server-side"]) - subprocess.run( - args=command, - check=True, - ) diff --git a/misc/python/materialize/cloudtest/util/sql.py b/misc/python/materialize/cloudtest/util/sql.py deleted file mode 100644 index 6e7087c6b6b59..0000000000000 --- a/misc/python/materialize/cloudtest/util/sql.py +++ /dev/null @@ -1,119 +0,0 @@ -# Copyright Materialize, Inc. and contributors. All rights reserved. -# -# Use of this software is governed by the Business Source License -# included in the LICENSE file at the root of this repository. -# -# As of the Change Date specified in that file, in accordance with -# the Business Source License, use of this software will be governed -# by the Apache License, Version 2.0. - -import logging -from ipaddress import IPv4Address -from typing import Any - -import psycopg -from psycopg.abc import Params, Query -from psycopg.connection import Connection - -from materialize.cloudtest.util.authentication import AuthConfig -from materialize.cloudtest.util.environment import Environment -from materialize.cloudtest.util.web_request import WebRequests - -LOGGER = logging.getLogger(__name__) - - -def sql_query( - conn: Connection[Any], - query: Query, - vars: Params | None = None, -) -> list[list[Any]]: - cur = conn.cursor() - cur.execute(query, vars) - return [list(row) for row in cur] - - -def sql_execute( - conn: Connection[Any], - query: Query, - vars: Params | None = None, -) -> None: - cur = conn.cursor() - cur.execute(query, vars) - - -def sql_execute_ddl( - conn: Connection[Any], - query: Query, - vars: Params | None = None, -) -> None: - cur = psycopg.ClientCursor(conn) - cur.execute(query, vars) - - -def pgwire_sql_conn(auth: AuthConfig, environment: Environment) -> Connection[Any]: - environment_params = environment.wait_for_environmentd() - pgwire_url: str = environment_params["regionInfo"]["sqlAddress"] - (pgwire_host, pgwire_port) = pgwire_url.split(":") - conn = psycopg.connect( - dbname="materialize", - user=auth.app_user, - password=auth.app_password, - host=pgwire_host, - port=pgwire_port, - sslmode=auth.pgwire_ssl_mode, - sslrootcert=auth.tls_ca_cert_path, - ) - conn.autocommit = True - return conn - - -def sql_query_pgwire( - auth: AuthConfig, - environment: Environment, - query: Query, - vars: Params | None = None, -) -> list[list[Any]]: - with pgwire_sql_conn(auth, environment) as conn: - LOGGER.info(f"QUERY: {query}") - return sql_query(conn, query, vars) - - -def sql_execute_pgwire( - auth: AuthConfig, - environment: Environment, - query: Query, - vars: Params | None = None, -) -> None: - with pgwire_sql_conn(auth, environment) as conn: - LOGGER.info(f"QUERY: {query}") - return sql_execute(conn, query, vars) - - -def sql_query_http( - auth: AuthConfig, environment: Environment, query: str -) -> list[list[Any]]: - environment_params = environment.wait_for_environmentd() - environmentd_url: str = environment_params["regionInfo"]["httpAddress"] - override_ip = ( - IPv4Address("127.0.0.1") - if environment.env_kubectl.context == "kind-mzcloud" - else None - ) - schema = "http" if "127.0.0.1" in environmentd_url else "https" - verify = ( - "misc/kind/balancer/tls/ca-cert.pem" - if schema == "https" and override_ip is not None - else None - ) - envd_web_requests = WebRequests( - auth, - f"{schema}://{environmentd_url}", - override_ip=override_ip, - verify=verify, - ) - response = envd_web_requests.post( - "/api/sql", - {"query": query}, - ) - rows: list[list[Any]] = response.json()["results"][0]["rows"] - return rows diff --git a/misc/python/materialize/cloudtest/util/web_request.py b/misc/python/materialize/cloudtest/util/web_request.py deleted file mode 100644 index 5b507a7ef08c0..0000000000000 --- a/misc/python/materialize/cloudtest/util/web_request.py +++ /dev/null @@ -1,253 +0,0 @@ -# Copyright Materialize, Inc. and contributors. All rights reserved. -# -# Use of this software is governed by the Business Source License -# included in the LICENSE file at the root of this repository. -# -# As of the Change Date specified in that file, in accordance with -# the Business Source License, use of this software will be governed -# by the Apache License, Version 2.0. - -import logging -from collections.abc import Generator -from contextlib import contextmanager -from ipaddress import IPv4Address, IPv6Address -from textwrap import dedent -from typing import Any -from urllib.parse import urlparse - -import requests -from requests.adapters import DEFAULT_POOLBLOCK, HTTPAdapter, Retry - -from materialize.cloudtest.util.authentication import AuthConfig - -LOGGER = logging.getLogger(__name__) - - -@contextmanager -def verbose_http_errors() -> Generator[None, None, None]: - try: - yield - except requests.HTTPError as e: - LOGGER.error( - dedent( - f""" - response status: {e.response.status_code} - response reason: {e.response.reason} - response content: {e.response.content} - """ - ) - ) - raise - - -class DNSResolverHTTPSAdapter(HTTPAdapter): - def __init__(self, common_name, ip, **kwargs): - self.__common_name = common_name - self.__ip = str(ip) - super().__init__(**kwargs) - - def get_connection(self, url, proxies=None): - redirected_url = url.replace(self.__common_name.lower(), self.__ip) - LOGGER.info(f"original url: {url}") - LOGGER.info(f"redirected url: {redirected_url}") - return super().get_connection( - redirected_url, - proxies=proxies, - ) - - def init_poolmanager( - self, - connections, - maxsize, - block=DEFAULT_POOLBLOCK, - **pool_kwargs, - ): - pool_kwargs["assert_hostname"] = self.__common_name - pool_kwargs["server_hostname"] = self.__common_name - super().init_poolmanager( - connections, - maxsize, - block, - **pool_kwargs, - ) - - -class WebRequests: - def __init__( - self, - auth: AuthConfig | None, - base_url: str, - client_cert: tuple[str, str] | None = None, - additional_headers: dict[str, str] | None = None, - default_timeout_in_sec: int = 15, - override_ip: IPv4Address | IPv6Address | None = None, - verify: str | None = None, - ): - self.auth = auth - self.base_url = base_url - self.client_cert = client_cert - self.additional_headers = additional_headers - self.default_timeout_in_sec = default_timeout_in_sec - self.override_ip = override_ip - self.verify = verify - - def session(self) -> requests.Session: - session = requests.Session() - if self.override_ip is not None: - parsed_url = urlparse(self.base_url) - session.mount( - self.base_url.lower(), - DNSResolverHTTPSAdapter( - parsed_url.netloc.split(":", 1)[0], - self.override_ip, - ), - ) - return session - - def get( - self, - path: str, - timeout_in_sec: int | None = None, - ) -> requests.Response: - LOGGER.info(f"GET {self.base_url}{path}") - - def try_get() -> requests.Response: - with verbose_http_errors(): - headers = self._create_headers(self.auth) - s = self.session() - s.mount(self.base_url, HTTPAdapter(max_retries=Retry(3))) - response = s.get( - f"{self.base_url}{path}", - headers=headers, - timeout=self._timeout_or_default(timeout_in_sec), - cert=self.client_cert, - verify=self.verify, - ) - response.raise_for_status() - return response - - try: - response = try_get() - except requests.exceptions.HTTPError as e: - if self.auth and e.response.status_code == 401: - self.auth.refresh() - response = try_get() - else: - raise - - return response - - def post( - self, - path: str, - json: Any, - timeout_in_sec: int | None = None, - ) -> requests.Response: - LOGGER.info(f"POST {self.base_url}{path}") - - def try_post() -> requests.Response: - with verbose_http_errors(): - headers = self._create_headers(self.auth) - response = self.session().post( - f"{self.base_url}{path}", - headers=headers, - json=json, - timeout=self._timeout_or_default(timeout_in_sec), - cert=self.client_cert, - verify=self.verify, - ) - response.raise_for_status() - return response - - try: - response = try_post() - except requests.exceptions.HTTPError as e: - if self.auth and e.response.status_code == 401: - self.auth.refresh() - response = try_post() - else: - raise - - return response - - def patch( - self, - path: str, - json: Any, - timeout_in_sec: int | None = None, - ) -> requests.Response: - LOGGER.info(f"PATCH {self.base_url}{path}") - - def try_patch() -> requests.Response: - with verbose_http_errors(): - headers = self._create_headers(self.auth) - response = self.session().patch( - f"{self.base_url}{path}", - headers=headers, - json=json, - timeout=self._timeout_or_default(timeout_in_sec), - cert=self.client_cert, - verify=self.verify, - ) - response.raise_for_status() - return response - - try: - response = try_patch() - except requests.exceptions.HTTPError as e: - if self.auth and e.response.status_code == 401: - self.auth.refresh() - response = try_patch() - else: - raise - - return response - - def delete( - self, - path: str, - params: Any = None, - timeout_in_sec: int | None = None, - ) -> requests.Response: - LOGGER.info(f"DELETE {self.base_url}{path}") - - def try_delete() -> requests.Response: - with verbose_http_errors(): - headers = self._create_headers(self.auth) - response = self.session().delete( - f"{self.base_url}{path}", - headers=headers, - timeout=self._timeout_or_default(timeout_in_sec), - cert=self.client_cert, - verify=self.verify, - **( - { - "params": params, - } - if params is not None - else {} - ), - ) - response.raise_for_status() - return response - - try: - response = try_delete() - except requests.exceptions.HTTPError as e: - if self.auth and e.response.status_code == 401: - self.auth.refresh() - response = try_delete() - else: - raise - - return response - - def _create_headers(self, auth: AuthConfig | None) -> dict[str, Any]: - headers = self.additional_headers.copy() if self.additional_headers else {} - if auth: - headers["Authorization"] = f"Bearer {auth.token}" - - return headers - - def _timeout_or_default(self, timeout_in_sec: int | None) -> int: - return timeout_in_sec or self.default_timeout_in_sec