Skip to content

Commit

Permalink
Merge branch 'datahub-project:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
anshbansal authored Jan 8, 2025
2 parents ddb52f4 + a4c47fa commit 694e304
Show file tree
Hide file tree
Showing 21 changed files with 315 additions and 158 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/airflow-plugin.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ jobs:
flags: airflow-${{ matrix.python-version }}-${{ matrix.extra_pip_extras }}
name: pytest-airflow
verbose: true
- name: Upload test results to Codecov
if: ${{ !cancelled() }}
uses: codecov/test-results-action@v1
with:
token: ${{ secrets.CODECOV_TOKEN }}

event-file:
runs-on: ubuntu-latest
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/build-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ jobs:
flags: ${{ matrix.timezone }}
name: ${{ matrix.command }}
verbose: true
- name: Upload test results to Codecov
if: ${{ !cancelled() }}
uses: codecov/test-results-action@v1
with:
token: ${{ secrets.CODECOV_TOKEN }}

quickstart-compose-validation:
runs-on: ubuntu-latest
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/dagster-plugin.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ jobs:
flags: dagster-${{ matrix.python-version }}-${{ matrix.extraPythonRequirement }}
name: pytest-dagster
verbose: true
- name: Upload test results to Codecov
if: ${{ !cancelled() }}
uses: codecov/test-results-action@v1
with:
token: ${{ secrets.CODECOV_TOKEN }}

event-file:
runs-on: ubuntu-latest
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/gx-plugin.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ jobs:
flags: gx-${{ matrix.python-version }}-${{ matrix.extraPythonRequirement }}
name: pytest-gx
verbose: true
- name: Upload test results to Codecov
if: ${{ !cancelled() }}
uses: codecov/test-results-action@v1
with:
token: ${{ secrets.CODECOV_TOKEN }}

event-file:
runs-on: ubuntu-latest
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/metadata-ingestion.yml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ jobs:
flags: ingestion-${{ matrix.python-version }}-${{ matrix.command }}
name: pytest-ingestion
verbose: true
- name: Upload test results to Codecov
if: ${{ !cancelled() }}
uses: codecov/test-results-action@v1
with:
token: ${{ secrets.CODECOV_TOKEN }}

event-file:
runs-on: ubuntu-latest
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/metadata-io.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ jobs:
fail_ci_if_error: false
name: metadata-io-test
verbose: true
- name: Upload test results to Codecov
if: ${{ !cancelled() }}
uses: codecov/test-results-action@v1
with:
token: ${{ secrets.CODECOV_TOKEN }}

event-file:
runs-on: ubuntu-latest
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/prefect-plugin.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ jobs:
flags: prefect-${{ matrix.python-version }}
name: pytest-prefect
verbose: true
- name: Upload test results to Codecov
if: ${{ !cancelled() }}
uses: codecov/test-results-action@v1
with:
token: ${{ secrets.CODECOV_TOKEN }}

event-file:
runs-on: ubuntu-latest
Expand Down
13 changes: 12 additions & 1 deletion metadata-ingestion/src/datahub/cli/cli_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import time
import typing
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple, Type, Union
from typing import Any, Dict, List, Optional, Tuple, Type, TypeVar, Union

import click
import requests
Expand Down Expand Up @@ -33,6 +33,15 @@ def first_non_null(ls: List[Optional[str]]) -> Optional[str]:
return next((el for el in ls if el is not None and el.strip() != ""), None)


_T = TypeVar("_T")


def get_or_else(value: Optional[_T], default: _T) -> _T:
    """Return ``value`` unless it is ``None``, in which case return ``default``.

    This is deliberately not ``value or default``: a value that is falsy but
    present (``0``, ``""``, ``[]``, ``False``) is returned unchanged.
    """
    if value is None:
        return default
    return value


def parse_run_restli_response(response: requests.Response) -> dict:
response_json = response.json()
if response.status_code != 200:
Expand Down Expand Up @@ -321,6 +330,8 @@ def get_frontend_session_login_as(
def _ensure_valid_gms_url_acryl_cloud(url: str) -> str:
if "acryl.io" not in url:
return url
if url.endswith(":8080"):
url = url.replace(":8080", "")
if url.startswith("http://"):
url = url.replace("http://", "https://")
if url.endswith("acryl.io"):
Expand Down
209 changes: 125 additions & 84 deletions metadata-ingestion/src/datahub/emitter/rest_emitter.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
from __future__ import annotations

import functools
import json
import logging
import os
from json.decoder import JSONDecodeError
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Sequence, Union
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
List,
Optional,
Sequence,
Tuple,
Union,
)

import requests
from deprecated import deprecated
Expand All @@ -12,9 +24,13 @@

from datahub import nice_version_name
from datahub.cli import config_utils
from datahub.cli.cli_utils import ensure_has_system_metadata, fixup_gms_url
from datahub.cli.cli_utils import ensure_has_system_metadata, fixup_gms_url, get_or_else
from datahub.cli.env_utils import get_boolean_env_variable
from datahub.configuration.common import ConfigurationError, OperationalError
from datahub.configuration.common import (
ConfigModel,
ConfigurationError,
OperationalError,
)
from datahub.emitter.generic_emitter import Emitter
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.request_helper import make_curl_command
Expand All @@ -31,10 +47,8 @@

logger = logging.getLogger(__name__)

_DEFAULT_CONNECT_TIMEOUT_SEC = 30 # 30 seconds should be plenty to connect
_DEFAULT_READ_TIMEOUT_SEC = (
30 # Any ingest call taking longer than 30 seconds should be abandoned
)
_DEFAULT_TIMEOUT_SEC = 30  # default timeout; used for the connect and the read timeout alike
_TIMEOUT_LOWER_BOUND_SEC = 1  # if below this, we log a warning
_DEFAULT_RETRY_STATUS_CODES = [ # Additional status codes to retry on
429,
500,
Expand Down Expand Up @@ -63,15 +77,76 @@
)


class RequestsSessionConfig(ConfigModel):
    # Declarative settings for a `requests.Session`. Call `build_session()`
    # to obtain a session with these settings applied.

    # Default timeout applied to every request made through the built session:
    # a single number, a (connect, read) pair, or None to install no default.
    timeout: Union[float, Tuple[float, float], None] = _DEFAULT_TIMEOUT_SEC

    # Retry policy handed to urllib3's `Retry`.
    retry_status_codes: List[int] = _DEFAULT_RETRY_STATUS_CODES
    retry_methods: List[str] = _DEFAULT_RETRY_METHODS
    retry_max_times: int = _DEFAULT_RETRY_MAX_TIMES

    # Headers merged into the session's default headers.
    extra_headers: Dict[str, str] = {}

    # TLS settings.
    ca_certificate_path: Optional[str] = None
    client_certificate_path: Optional[str] = None
    disable_ssl_verification: bool = False

    def build_session(self) -> requests.Session:
        """Create a new `requests.Session` configured from this object.

        The session gets the extra headers, client certificate, TLS
        verification settings, a retrying pooled adapter for both http and
        https, and (unless `timeout` is None) a default request timeout.
        """
        http_session = requests.Session()

        if self.extra_headers:
            http_session.headers.update(self.extra_headers)

        if self.client_certificate_path:
            http_session.cert = self.client_certificate_path

        # Disabling verification takes precedence over a custom CA bundle.
        if self.disable_ssl_verification:
            http_session.verify = False
        elif self.ca_certificate_path:
            http_session.verify = self.ca_certificate_path

        # Set raise_on_status to False to propagate errors:
        # https://stackoverflow.com/questions/70189330/determine-status-code-from-python-retry-exception
        # Must call `raise_for_status` after making a request, which we do
        shared_retry_kwargs: Dict[str, Any] = dict(
            total=self.retry_max_times,
            status_forcelist=self.retry_status_codes,
            backoff_factor=2,
            raise_on_status=False,
        )
        try:
            retries = Retry(allowed_methods=self.retry_methods, **shared_retry_kwargs)
        except TypeError:
            # Prior to urllib3 1.26, the Retry class used `method_whitelist` instead of `allowed_methods`.
            retries = Retry(method_whitelist=self.retry_methods, **shared_retry_kwargs)

        pooled_adapter = HTTPAdapter(
            pool_connections=100, pool_maxsize=100, max_retries=retries
        )
        for scheme_prefix in ("http://", "https://"):
            http_session.mount(scheme_prefix, pooled_adapter)

        if self.timeout is not None:
            # Shim session.request to apply default timeout values.
            # Via https://stackoverflow.com/a/59317604.
            http_session.request = functools.partial(  # type: ignore
                http_session.request,
                timeout=self.timeout,
            )

        return http_session


class DataHubRestEmitter(Closeable, Emitter):
_gms_server: str
_token: Optional[str]
_session: requests.Session
_connect_timeout_sec: float = _DEFAULT_CONNECT_TIMEOUT_SEC
_read_timeout_sec: float = _DEFAULT_READ_TIMEOUT_SEC
_retry_status_codes: List[int] = _DEFAULT_RETRY_STATUS_CODES
_retry_methods: List[str] = _DEFAULT_RETRY_METHODS
_retry_max_times: int = _DEFAULT_RETRY_MAX_TIMES

def __init__(
self,
Expand Down Expand Up @@ -102,15 +177,13 @@ def __init__(

self._session = requests.Session()

self._session.headers.update(
{
"X-RestLi-Protocol-Version": "2.0.0",
"X-DataHub-Py-Cli-Version": nice_version_name(),
"Content-Type": "application/json",
}
)
headers = {
"X-RestLi-Protocol-Version": "2.0.0",
"X-DataHub-Py-Cli-Version": nice_version_name(),
"Content-Type": "application/json",
}
if token:
self._session.headers.update({"Authorization": f"Bearer {token}"})
headers["Authorization"] = f"Bearer {token}"
else:
# HACK: When no token is provided but system auth env variables are set, we use them.
# Ideally this should simply get passed in as config, instead of being sneakily injected
Expand All @@ -119,75 +192,43 @@ def __init__(
# rest emitter, and the rest sink uses the rest emitter under the hood.
system_auth = config_utils.get_system_auth()
if system_auth is not None:
self._session.headers.update({"Authorization": system_auth})

if extra_headers:
self._session.headers.update(extra_headers)

if client_certificate_path:
self._session.cert = client_certificate_path

if ca_certificate_path:
self._session.verify = ca_certificate_path

if disable_ssl_verification:
self._session.verify = False

self._connect_timeout_sec = (
connect_timeout_sec or timeout_sec or _DEFAULT_CONNECT_TIMEOUT_SEC
)
self._read_timeout_sec = (
read_timeout_sec or timeout_sec or _DEFAULT_READ_TIMEOUT_SEC
)

if self._connect_timeout_sec < 1 or self._read_timeout_sec < 1:
logger.warning(
f"Setting timeout values lower than 1 second is not recommended. Your configuration is connect_timeout:{self._connect_timeout_sec}s, read_timeout:{self._read_timeout_sec}s"
)

if retry_status_codes is not None: # Only if missing. Empty list is allowed
self._retry_status_codes = retry_status_codes

if retry_methods is not None:
self._retry_methods = retry_methods

if retry_max_times:
self._retry_max_times = retry_max_times
headers["Authorization"] = system_auth

try:
# Set raise_on_status to False to propagate errors:
# https://stackoverflow.com/questions/70189330/determine-status-code-from-python-retry-exception
# Must call `raise_for_status` after making a request, which we do
retry_strategy = Retry(
total=self._retry_max_times,
status_forcelist=self._retry_status_codes,
backoff_factor=2,
allowed_methods=self._retry_methods,
raise_on_status=False,
)
except TypeError:
# Prior to urllib3 1.26, the Retry class used `method_whitelist` instead of `allowed_methods`.
retry_strategy = Retry(
total=self._retry_max_times,
status_forcelist=self._retry_status_codes,
backoff_factor=2,
method_whitelist=self._retry_methods,
raise_on_status=False,
timeout: float | tuple[float, float]
if connect_timeout_sec is not None or read_timeout_sec is not None:
timeout = (
connect_timeout_sec or timeout_sec or _DEFAULT_TIMEOUT_SEC,
read_timeout_sec or timeout_sec or _DEFAULT_TIMEOUT_SEC,
)
if (
timeout[0] < _TIMEOUT_LOWER_BOUND_SEC
or timeout[1] < _TIMEOUT_LOWER_BOUND_SEC
):
logger.warning(
f"Setting timeout values lower than {_TIMEOUT_LOWER_BOUND_SEC} second is not recommended. Your configuration is (connect_timeout, read_timeout) = {timeout} seconds"
)
else:
timeout = get_or_else(timeout_sec, _DEFAULT_TIMEOUT_SEC)
if timeout < _TIMEOUT_LOWER_BOUND_SEC:
logger.warning(
f"Setting timeout values lower than {_TIMEOUT_LOWER_BOUND_SEC} second is not recommended. Your configuration is timeout = {timeout} seconds"
)

adapter = HTTPAdapter(
pool_connections=100, pool_maxsize=100, max_retries=retry_strategy
)
self._session.mount("http://", adapter)
self._session.mount("https://", adapter)

# Shim session.request to apply default timeout values.
# Via https://stackoverflow.com/a/59317604.
self._session.request = functools.partial( # type: ignore
self._session.request,
timeout=(self._connect_timeout_sec, self._read_timeout_sec),
self._session_config = RequestsSessionConfig(
timeout=timeout,
retry_status_codes=get_or_else(
retry_status_codes, _DEFAULT_RETRY_STATUS_CODES
),
retry_methods=get_or_else(retry_methods, _DEFAULT_RETRY_METHODS),
retry_max_times=get_or_else(retry_max_times, _DEFAULT_RETRY_MAX_TIMES),
extra_headers={**headers, **(extra_headers or {})},
ca_certificate_path=ca_certificate_path,
client_certificate_path=client_certificate_path,
disable_ssl_verification=disable_ssl_verification,
)

self._session = self._session_config.build_session()

def test_connection(self) -> None:
url = f"{self._gms_server}/config"
response = self._session.get(url)
Expand Down
Loading

0 comments on commit 694e304

Please sign in to comment.