Skip to content

Commit

Permalink
fix(ingest/mode): add connection timeouts to avoid RemoteDisconnected…
Browse files Browse the repository at this point in the history
  • Loading branch information
sagar-salvi-apptware authored and sleeperdeep committed Dec 17, 2024
1 parent 4c16dc4 commit 5eb6e91
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 7 deletions.
23 changes: 19 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import yaml
from liquid import Template, Undefined
from pydantic import Field, validator
from requests.adapters import HTTPAdapter, Retry
from requests.exceptions import ConnectionError
from requests.models import HTTPBasicAuth, HTTPError
from sqllineage.runner import LineageRunner
from tenacity import retry_if_exception_type, stop_after_attempt, wait_exponential
Expand Down Expand Up @@ -127,6 +129,10 @@ class ModeAPIConfig(ConfigModel):
max_attempts: int = Field(
default=5, description="Maximum number of attempts to retry before failing"
)
timeout: int = Field(
default=40,
description="Timout setting, how long to wait for the Mode rest api to send data before giving up",
)


class ModeConfig(StatefulIngestionConfigBase, DatasetLineageProviderConfigBase):
Expand Down Expand Up @@ -299,7 +305,15 @@ def __init__(self, ctx: PipelineContext, config: ModeConfig):
self.report = ModeSourceReport()
self.ctx = ctx

self.session = requests.session()
self.session = requests.Session()
# Handling retry and backoff
retries = 3
backoff_factor = 10
retry = Retry(total=retries, backoff_factor=backoff_factor)
adapter = HTTPAdapter(max_retries=retry)
self.session.mount("http://", adapter)
self.session.mount("https://", adapter)

self.session.auth = HTTPBasicAuth(
self.config.token,
self.config.password.get_secret_value(),
Expand Down Expand Up @@ -1469,15 +1483,16 @@ def _get_request_json(self, url: str) -> Dict:
multiplier=self.config.api_options.retry_backoff_multiplier,
max=self.config.api_options.max_retry_interval,
),
retry=retry_if_exception_type(HTTPError429),
retry=retry_if_exception_type((HTTPError429, ConnectionError)),
stop=stop_after_attempt(self.config.api_options.max_attempts),
)

@r.wraps
def get_request():
try:
response = self.session.get(url)
response.raise_for_status()
response = self.session.get(
url, timeout=self.config.api_options.timeout
)
return response.json()
except HTTPError as http_error:
error_response = http_error.response
Expand Down
10 changes: 7 additions & 3 deletions metadata-ingestion/tests/integration/mode/test_mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,12 @@ def __init__(self, error_list, status_code):
def json(self):
return self.json_data

def get(self, url):
def mount(self, prefix, adaptor):
return self

def get(self, url, timeout=40):
self.url = url
self.timeout = timeout
response_json_path = f"{test_resources_dir}/setup/{JSON_RESPONSE_MAP.get(url)}"
with open(response_json_path) as file:
data = json.loads(file.read())
Expand Down Expand Up @@ -74,7 +78,7 @@ def mocked_requests_failure(*args, **kwargs):
@freeze_time(FROZEN_TIME)
def test_mode_ingest_success(pytestconfig, tmp_path):
with patch(
"datahub.ingestion.source.mode.requests.session",
"datahub.ingestion.source.mode.requests.Session",
side_effect=mocked_requests_sucess,
):
pipeline = Pipeline.create(
Expand Down Expand Up @@ -111,7 +115,7 @@ def test_mode_ingest_success(pytestconfig, tmp_path):
@freeze_time(FROZEN_TIME)
def test_mode_ingest_failure(pytestconfig, tmp_path):
with patch(
"datahub.ingestion.source.mode.requests.session",
"datahub.ingestion.source.mode.requests.Session",
side_effect=mocked_requests_failure,
):
global test_resources_dir
Expand Down

0 comments on commit 5eb6e91

Please sign in to comment.