From afa94a588754c28c8e11f5aa0963808ba5ee6599 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 7 Jan 2025 17:00:13 -0500 Subject: [PATCH 1/8] fix(ingest): better correctness on the emitter -> graph conversion (#12272) --- .../src/datahub/cli/cli_utils.py | 11 +- .../src/datahub/emitter/rest_emitter.py | 209 +++++++++++------- .../src/datahub/ingestion/graph/client.py | 25 ++- .../src/datahub/ingestion/graph/config.py | 2 +- .../tests/unit/sdk/test_rest_emitter.py | 32 +-- 5 files changed, 167 insertions(+), 112 deletions(-) diff --git a/metadata-ingestion/src/datahub/cli/cli_utils.py b/metadata-ingestion/src/datahub/cli/cli_utils.py index f80181192ba583..ca4a11b41925e5 100644 --- a/metadata-ingestion/src/datahub/cli/cli_utils.py +++ b/metadata-ingestion/src/datahub/cli/cli_utils.py @@ -3,7 +3,7 @@ import time import typing from datetime import datetime -from typing import Any, Dict, List, Optional, Tuple, Type, Union +from typing import Any, Dict, List, Optional, Tuple, Type, TypeVar, Union import click import requests @@ -33,6 +33,15 @@ def first_non_null(ls: List[Optional[str]]) -> Optional[str]: return next((el for el in ls if el is not None and el.strip() != ""), None) +_T = TypeVar("_T") + + +def get_or_else(value: Optional[_T], default: _T) -> _T: + # Normally we'd use `value or default`. However, that runs into issues + # when value is falsey but not None. + return value if value is not None else default + + def parse_run_restli_response(response: requests.Response) -> dict: response_json = response.json() if response.status_code != 200: diff --git a/metadata-ingestion/src/datahub/emitter/rest_emitter.py b/metadata-ingestion/src/datahub/emitter/rest_emitter.py index 7c67349c74db10..74b8ade7da445b 100644 --- a/metadata-ingestion/src/datahub/emitter/rest_emitter.py +++ b/metadata-ingestion/src/datahub/emitter/rest_emitter.py @@ -1,9 +1,21 @@ +from __future__ import annotations + import functools import json import logging import os from json.decoder import JSONDecodeError -from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Sequence, Union +from typing import ( + TYPE_CHECKING, + Any, + Callable, + Dict, + List, + Optional, + Sequence, + Tuple, + Union, +) import requests from deprecated import deprecated @@ -12,9 +24,13 @@ from datahub import nice_version_name from datahub.cli import config_utils -from datahub.cli.cli_utils import ensure_has_system_metadata, fixup_gms_url +from datahub.cli.cli_utils import ensure_has_system_metadata, fixup_gms_url, get_or_else from datahub.cli.env_utils import get_boolean_env_variable -from datahub.configuration.common import ConfigurationError, OperationalError +from datahub.configuration.common import ( + ConfigModel, + ConfigurationError, + OperationalError, +) from datahub.emitter.generic_emitter import Emitter from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.request_helper import make_curl_command @@ -31,10 +47,8 @@ logger = logging.getLogger(__name__) -_DEFAULT_CONNECT_TIMEOUT_SEC = 30 # 30 seconds should be plenty to connect -_DEFAULT_READ_TIMEOUT_SEC = ( - 30 # Any ingest call taking longer than 30 seconds should be abandoned -) +_DEFAULT_TIMEOUT_SEC = 30 # 30 seconds should be plenty to connect +_TIMEOUT_LOWER_BOUND_SEC = 1 # if below this, we log a warning _DEFAULT_RETRY_STATUS_CODES = [ # Additional status codes to retry on 429, 500, @@ -63,15 +77,76 @@ ) +class RequestsSessionConfig(ConfigModel): + timeout: Union[float, Tuple[float, float], None] = _DEFAULT_TIMEOUT_SEC + + 
retry_status_codes: List[int] = _DEFAULT_RETRY_STATUS_CODES + retry_methods: List[str] = _DEFAULT_RETRY_METHODS + retry_max_times: int = _DEFAULT_RETRY_MAX_TIMES + + extra_headers: Dict[str, str] = {} + + ca_certificate_path: Optional[str] = None + client_certificate_path: Optional[str] = None + disable_ssl_verification: bool = False + + def build_session(self) -> requests.Session: + session = requests.Session() + + if self.extra_headers: + session.headers.update(self.extra_headers) + + if self.client_certificate_path: + session.cert = self.client_certificate_path + + if self.ca_certificate_path: + session.verify = self.ca_certificate_path + + if self.disable_ssl_verification: + session.verify = False + + try: + # Set raise_on_status to False to propagate errors: + # https://stackoverflow.com/questions/70189330/determine-status-code-from-python-retry-exception + # Must call `raise_for_status` after making a request, which we do + retry_strategy = Retry( + total=self.retry_max_times, + status_forcelist=self.retry_status_codes, + backoff_factor=2, + allowed_methods=self.retry_methods, + raise_on_status=False, + ) + except TypeError: + # Prior to urllib3 1.26, the Retry class used `method_whitelist` instead of `allowed_methods`. + retry_strategy = Retry( + total=self.retry_max_times, + status_forcelist=self.retry_status_codes, + backoff_factor=2, + method_whitelist=self.retry_methods, + raise_on_status=False, + ) + + adapter = HTTPAdapter( + pool_connections=100, pool_maxsize=100, max_retries=retry_strategy + ) + session.mount("http://", adapter) + session.mount("https://", adapter) + + if self.timeout is not None: + # Shim session.request to apply default timeout values. + # Via https://stackoverflow.com/a/59317604. + session.request = functools.partial( # type: ignore + session.request, + timeout=self.timeout, + ) + + return session + + class DataHubRestEmitter(Closeable, Emitter): _gms_server: str _token: Optional[str] _session: requests.Session - _connect_timeout_sec: float = _DEFAULT_CONNECT_TIMEOUT_SEC - _read_timeout_sec: float = _DEFAULT_READ_TIMEOUT_SEC - _retry_status_codes: List[int] = _DEFAULT_RETRY_STATUS_CODES - _retry_methods: List[str] = _DEFAULT_RETRY_METHODS - _retry_max_times: int = _DEFAULT_RETRY_MAX_TIMES def __init__( self, @@ -102,15 +177,13 @@ def __init__( self._session = requests.Session() - self._session.headers.update( - { - "X-RestLi-Protocol-Version": "2.0.0", - "X-DataHub-Py-Cli-Version": nice_version_name(), - "Content-Type": "application/json", - } - ) + headers = { + "X-RestLi-Protocol-Version": "2.0.0", + "X-DataHub-Py-Cli-Version": nice_version_name(), + "Content-Type": "application/json", + } if token: - self._session.headers.update({"Authorization": f"Bearer {token}"}) + headers["Authorization"] = f"Bearer {token}" else: # HACK: When no token is provided but system auth env variables are set, we use them. # Ideally this should simply get passed in as config, instead of being sneakily injected @@ -119,75 +192,43 @@ def __init__( # rest emitter, and the rest sink uses the rest emitter under the hood. 
system_auth = config_utils.get_system_auth() if system_auth is not None: - self._session.headers.update({"Authorization": system_auth}) - - if extra_headers: - self._session.headers.update(extra_headers) - - if client_certificate_path: - self._session.cert = client_certificate_path - - if ca_certificate_path: - self._session.verify = ca_certificate_path - - if disable_ssl_verification: - self._session.verify = False - - self._connect_timeout_sec = ( - connect_timeout_sec or timeout_sec or _DEFAULT_CONNECT_TIMEOUT_SEC - ) - self._read_timeout_sec = ( - read_timeout_sec or timeout_sec or _DEFAULT_READ_TIMEOUT_SEC - ) - - if self._connect_timeout_sec < 1 or self._read_timeout_sec < 1: - logger.warning( - f"Setting timeout values lower than 1 second is not recommended. Your configuration is connect_timeout:{self._connect_timeout_sec}s, read_timeout:{self._read_timeout_sec}s" - ) - - if retry_status_codes is not None: # Only if missing. Empty list is allowed - self._retry_status_codes = retry_status_codes - - if retry_methods is not None: - self._retry_methods = retry_methods - - if retry_max_times: - self._retry_max_times = retry_max_times + headers["Authorization"] = system_auth - try: - # Set raise_on_status to False to propagate errors: - # https://stackoverflow.com/questions/70189330/determine-status-code-from-python-retry-exception - # Must call `raise_for_status` after making a request, which we do - retry_strategy = Retry( - total=self._retry_max_times, - status_forcelist=self._retry_status_codes, - backoff_factor=2, - allowed_methods=self._retry_methods, - raise_on_status=False, - ) - except TypeError: - # Prior to urllib3 1.26, the Retry class used `method_whitelist` instead of `allowed_methods`. - retry_strategy = Retry( - total=self._retry_max_times, - status_forcelist=self._retry_status_codes, - backoff_factor=2, - method_whitelist=self._retry_methods, - raise_on_status=False, + timeout: float | tuple[float, float] + if connect_timeout_sec is not None or read_timeout_sec is not None: + timeout = ( + connect_timeout_sec or timeout_sec or _DEFAULT_TIMEOUT_SEC, + read_timeout_sec or timeout_sec or _DEFAULT_TIMEOUT_SEC, ) + if ( + timeout[0] < _TIMEOUT_LOWER_BOUND_SEC + or timeout[1] < _TIMEOUT_LOWER_BOUND_SEC + ): + logger.warning( + f"Setting timeout values lower than {_TIMEOUT_LOWER_BOUND_SEC} second is not recommended. Your configuration is (connect_timeout, read_timeout) = {timeout} seconds" + ) + else: + timeout = get_or_else(timeout_sec, _DEFAULT_TIMEOUT_SEC) + if timeout < _TIMEOUT_LOWER_BOUND_SEC: + logger.warning( + f"Setting timeout values lower than {_TIMEOUT_LOWER_BOUND_SEC} second is not recommended. Your configuration is timeout = {timeout} seconds" + ) - adapter = HTTPAdapter( - pool_connections=100, pool_maxsize=100, max_retries=retry_strategy - ) - self._session.mount("http://", adapter) - self._session.mount("https://", adapter) - - # Shim session.request to apply default timeout values. - # Via https://stackoverflow.com/a/59317604. 
- self._session.request = functools.partial( # type: ignore - self._session.request, - timeout=(self._connect_timeout_sec, self._read_timeout_sec), + self._session_config = RequestsSessionConfig( + timeout=timeout, + retry_status_codes=get_or_else( + retry_status_codes, _DEFAULT_RETRY_STATUS_CODES + ), + retry_methods=get_or_else(retry_methods, _DEFAULT_RETRY_METHODS), + retry_max_times=get_or_else(retry_max_times, _DEFAULT_RETRY_MAX_TIMES), + extra_headers={**headers, **(extra_headers or {})}, + ca_certificate_path=ca_certificate_path, + client_certificate_path=client_certificate_path, + disable_ssl_verification=disable_ssl_verification, ) + self._session = self._session_config.build_session() + def test_connection(self) -> None: url = f"{self._gms_server}/config" response = self._session.get(url) diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py index ca9a41172e5b6e..7de6e8130a7ab6 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/client.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py @@ -179,21 +179,24 @@ def frontend_base_url(self) -> str: @classmethod def from_emitter(cls, emitter: DatahubRestEmitter) -> "DataHubGraph": + session_config = emitter._session_config + if isinstance(session_config.timeout, tuple): + # TODO: This is slightly lossy. Eventually, we want to modify the emitter + # to accept a tuple for timeout_sec, and then we'll be able to remove this. + timeout_sec: Optional[float] = session_config.timeout[0] + else: + timeout_sec = session_config.timeout return cls( DatahubClientConfig( server=emitter._gms_server, token=emitter._token, - timeout_sec=emitter._read_timeout_sec, - retry_status_codes=emitter._retry_status_codes, - retry_max_times=emitter._retry_max_times, - extra_headers=emitter._session.headers, - disable_ssl_verification=emitter._session.verify is False, - ca_certificate_path=( - emitter._session.verify - if isinstance(emitter._session.verify, str) - else None - ), - client_certificate_path=emitter._session.cert, + timeout_sec=timeout_sec, + retry_status_codes=session_config.retry_status_codes, + retry_max_times=session_config.retry_max_times, + extra_headers=session_config.extra_headers, + disable_ssl_verification=session_config.disable_ssl_verification, + ca_certificate_path=session_config.ca_certificate_path, + client_certificate_path=session_config.client_certificate_path, ) ) diff --git a/metadata-ingestion/src/datahub/ingestion/graph/config.py b/metadata-ingestion/src/datahub/ingestion/graph/config.py index 5f269e14e1a4af..8f0a5844c97c4b 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/config.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/config.py @@ -10,7 +10,7 @@ class DatahubClientConfig(ConfigModel): # by callers / the CLI, but the actual client should not have any magic. 
server: str token: Optional[str] = None - timeout_sec: Optional[int] = None + timeout_sec: Optional[float] = None retry_status_codes: Optional[List[int]] = None retry_max_times: Optional[int] = None extra_headers: Optional[Dict[str, str]] = None diff --git a/metadata-ingestion/tests/unit/sdk/test_rest_emitter.py b/metadata-ingestion/tests/unit/sdk/test_rest_emitter.py index b4d7cb17b66f5c..81120dfc87aba3 100644 --- a/metadata-ingestion/tests/unit/sdk/test_rest_emitter.py +++ b/metadata-ingestion/tests/unit/sdk/test_rest_emitter.py @@ -4,39 +4,41 @@ MOCK_GMS_ENDPOINT = "http://fakegmshost:8080" -def test_datahub_rest_emitter_construction(): +def test_datahub_rest_emitter_construction() -> None: emitter = DatahubRestEmitter(MOCK_GMS_ENDPOINT) - assert emitter._connect_timeout_sec == rest_emitter._DEFAULT_CONNECT_TIMEOUT_SEC - assert emitter._read_timeout_sec == rest_emitter._DEFAULT_READ_TIMEOUT_SEC - assert emitter._retry_status_codes == rest_emitter._DEFAULT_RETRY_STATUS_CODES - assert emitter._retry_max_times == rest_emitter._DEFAULT_RETRY_MAX_TIMES + assert emitter._session_config.timeout == rest_emitter._DEFAULT_TIMEOUT_SEC + assert ( + emitter._session_config.retry_status_codes + == rest_emitter._DEFAULT_RETRY_STATUS_CODES + ) + assert ( + emitter._session_config.retry_max_times == rest_emitter._DEFAULT_RETRY_MAX_TIMES + ) -def test_datahub_rest_emitter_timeout_construction(): +def test_datahub_rest_emitter_timeout_construction() -> None: emitter = DatahubRestEmitter( MOCK_GMS_ENDPOINT, connect_timeout_sec=2, read_timeout_sec=4 ) - assert emitter._connect_timeout_sec == 2 - assert emitter._read_timeout_sec == 4 + assert emitter._session_config.timeout == (2, 4) -def test_datahub_rest_emitter_general_timeout_construction(): +def test_datahub_rest_emitter_general_timeout_construction() -> None: emitter = DatahubRestEmitter(MOCK_GMS_ENDPOINT, timeout_sec=2, read_timeout_sec=4) - assert emitter._connect_timeout_sec == 2 - assert emitter._read_timeout_sec == 4 + assert emitter._session_config.timeout == (2, 4) -def test_datahub_rest_emitter_retry_construction(): +def test_datahub_rest_emitter_retry_construction() -> None: emitter = DatahubRestEmitter( MOCK_GMS_ENDPOINT, retry_status_codes=[418], retry_max_times=42, ) - assert emitter._retry_status_codes == [418] - assert emitter._retry_max_times == 42 + assert emitter._session_config.retry_status_codes == [418] + assert emitter._session_config.retry_max_times == 42 -def test_datahub_rest_emitter_extra_params(): +def test_datahub_rest_emitter_extra_params() -> None: emitter = DatahubRestEmitter( MOCK_GMS_ENDPOINT, extra_headers={"key1": "value1", "key2": "value2"} ) From cbb36bbe590812b525e6f92608279c624123333c Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 7 Jan 2025 19:23:58 -0500 Subject: [PATCH 2/8] feat(ingest): configurable query generation in combined sources (#12284) --- .../src/datahub/ingestion/source/bigquery_v2/bigquery.py | 2 ++ .../ingestion/source/bigquery_v2/bigquery_config.py | 8 ++++++++ .../ingestion/source/snowflake/snowflake_config.py | 8 ++++++++ .../datahub/ingestion/source/snowflake/snowflake_v2.py | 2 ++ 4 files changed, 20 insertions(+) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index 38eab3606b7e95..db7b0540e49e71 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -281,6 +281,8 
@@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: include_lineage=self.config.include_table_lineage, include_usage_statistics=self.config.include_usage_statistics, include_operations=self.config.usage.include_operational_stats, + include_queries=self.config.include_queries, + include_query_usage_statistics=self.config.include_query_usage_statistics, top_n_queries=self.config.usage.top_n_queries, region_qualifiers=self.config.region_qualifiers, ), diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py index ef323260b014e6..afbe919df4dcae 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py @@ -447,6 +447,14 @@ class BigQueryV2Config( default=False, description="If enabled, uses the new queries extractor to extract queries from bigquery.", ) + include_queries: bool = Field( + default=True, + description="If enabled, generate query entities associated with lineage edges. Only applicable if `use_queries_v2` is enabled.", + ) + include_query_usage_statistics: bool = Field( + default=True, + description="If enabled, generate query popularity statistics. Only applicable if `use_queries_v2` is enabled.", + ) @property def have_table_data_read_permission(self) -> bool: diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py index 12e5fb72b00de8..2d61ce59857778 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py @@ -221,6 +221,14 @@ class SnowflakeV2Config( default=False, description="If enabled, uses the new queries extractor to extract queries from snowflake.", ) + include_queries: bool = Field( + default=True, + description="If enabled, generate query entities associated with lineage edges. Only applicable if `use_queries_v2` is enabled.", + ) + include_query_usage_statistics: bool = Field( + default=True, + description="If enabled, generate query popularity statistics. 
Only applicable if `use_queries_v2` is enabled.", + ) lazy_schema_resolver: bool = Field( default=True, diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py index 954e8a29c1a1bd..aede3d056709a2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py @@ -528,6 +528,8 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: include_lineage=self.config.include_table_lineage, include_usage_statistics=self.config.include_usage_stats, include_operations=self.config.include_operational_stats, + include_queries=self.config.include_queries, + include_query_usage_statistics=self.config.include_query_usage_statistics, user_email_pattern=self.config.user_email_pattern, ), structured_report=self.report, From 98a5a2c086df1667a1b669410efaeafbeb5e3d8b Mon Sep 17 00:00:00 2001 From: david-leifker <114954101+david-leifker@users.noreply.github.com> Date: Wed, 8 Jan 2025 06:34:10 -0600 Subject: [PATCH 3/8] fix(javaEntityClient): correct config parameter (#12287) --- .../java/com/linkedin/metadata/client/JavaEntityClient.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java b/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java index 3d35f5956b0f4f..35d133c74c0692 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java @@ -775,7 +775,8 @@ public List batchIngestProposals( List updatedUrns = new ArrayList<>(); Iterators.partition( - metadataChangeProposals.iterator(), Math.max(1, entityClientConfig.getBatchGetV2Size())) + metadataChangeProposals.iterator(), + Math.max(1, entityClientConfig.getBatchIngestSize())) .forEachRemaining( batch -> { AspectsBatch aspectsBatch = From c0b13f087aaff9898ab8377259fb0b691b128ca0 Mon Sep 17 00:00:00 2001 From: Aseem Bansal Date: Wed, 8 Jan 2025 18:40:19 +0530 Subject: [PATCH 4/8] ci: upload test coverage to codecov (#12291) --- .github/workflows/airflow-plugin.yml | 5 +++++ .github/workflows/build-and-test.yml | 5 +++++ .github/workflows/dagster-plugin.yml | 5 +++++ .github/workflows/gx-plugin.yml | 5 +++++ .github/workflows/metadata-ingestion.yml | 5 +++++ .github/workflows/metadata-io.yml | 5 +++++ .github/workflows/prefect-plugin.yml | 5 +++++ 7 files changed, 35 insertions(+) diff --git a/.github/workflows/airflow-plugin.yml b/.github/workflows/airflow-plugin.yml index b824a21be63f8f..89e0c9e2513d8b 100644 --- a/.github/workflows/airflow-plugin.yml +++ b/.github/workflows/airflow-plugin.yml @@ -87,6 +87,11 @@ jobs: flags: airflow-${{ matrix.python-version }}-${{ matrix.extra_pip_extras }} name: pytest-airflow verbose: true + - name: Upload test results to Codecov + if: ${{ !cancelled() }} + uses: codecov/test-results-action@v1 + with: + token: ${{ secrets.CODECOV_TOKEN }} event-file: runs-on: ubuntu-latest diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml index 0cca80c8fdf982..058ac4a5c9b1e5 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/build-and-test.yml @@ -134,6 +134,11 @@ jobs: flags: ${{ matrix.timezone }} name: ${{ matrix.command }} verbose: true + - name: Upload test results to Codecov + if: ${{ !cancelled() }} + uses: 
codecov/test-results-action@v1 + with: + token: ${{ secrets.CODECOV_TOKEN }} quickstart-compose-validation: runs-on: ubuntu-latest diff --git a/.github/workflows/dagster-plugin.yml b/.github/workflows/dagster-plugin.yml index ae9a0b1605cdf3..c29e72367c53c5 100644 --- a/.github/workflows/dagster-plugin.yml +++ b/.github/workflows/dagster-plugin.yml @@ -74,6 +74,11 @@ jobs: flags: dagster-${{ matrix.python-version }}-${{ matrix.extraPythonRequirement }} name: pytest-dagster verbose: true + - name: Upload test results to Codecov + if: ${{ !cancelled() }} + uses: codecov/test-results-action@v1 + with: + token: ${{ secrets.CODECOV_TOKEN }} event-file: runs-on: ubuntu-latest diff --git a/.github/workflows/gx-plugin.yml b/.github/workflows/gx-plugin.yml index 2fd814a0764858..825f8beda2f561 100644 --- a/.github/workflows/gx-plugin.yml +++ b/.github/workflows/gx-plugin.yml @@ -78,6 +78,11 @@ jobs: flags: gx-${{ matrix.python-version }}-${{ matrix.extraPythonRequirement }} name: pytest-gx verbose: true + - name: Upload test results to Codecov + if: ${{ !cancelled() }} + uses: codecov/test-results-action@v1 + with: + token: ${{ secrets.CODECOV_TOKEN }} event-file: runs-on: ubuntu-latest diff --git a/.github/workflows/metadata-ingestion.yml b/.github/workflows/metadata-ingestion.yml index f4d87b361b5edc..aa404c4c35c505 100644 --- a/.github/workflows/metadata-ingestion.yml +++ b/.github/workflows/metadata-ingestion.yml @@ -98,6 +98,11 @@ jobs: flags: ingestion-${{ matrix.python-version }}-${{ matrix.command }} name: pytest-ingestion verbose: true + - name: Upload test results to Codecov + if: ${{ !cancelled() }} + uses: codecov/test-results-action@v1 + with: + token: ${{ secrets.CODECOV_TOKEN }} event-file: runs-on: ubuntu-latest diff --git a/.github/workflows/metadata-io.yml b/.github/workflows/metadata-io.yml index aedcd9257d83ba..bcadc641ee2f7c 100644 --- a/.github/workflows/metadata-io.yml +++ b/.github/workflows/metadata-io.yml @@ -90,6 +90,11 @@ jobs: fail_ci_if_error: false name: metadata-io-test verbose: true + - name: Upload test results to Codecov + if: ${{ !cancelled() }} + uses: codecov/test-results-action@v1 + with: + token: ${{ secrets.CODECOV_TOKEN }} event-file: runs-on: ubuntu-latest diff --git a/.github/workflows/prefect-plugin.yml b/.github/workflows/prefect-plugin.yml index 879df032409f28..0bce4d5ef19f31 100644 --- a/.github/workflows/prefect-plugin.yml +++ b/.github/workflows/prefect-plugin.yml @@ -70,6 +70,11 @@ jobs: flags: prefect-${{ matrix.python-version }} name: pytest-prefect verbose: true + - name: Upload test results to Codecov + if: ${{ !cancelled() }} + uses: codecov/test-results-action@v1 + with: + token: ${{ secrets.CODECOV_TOKEN }} event-file: runs-on: ubuntu-latest From 333445326a627a28353f1def04955f5812dc17bb Mon Sep 17 00:00:00 2001 From: Aseem Bansal Date: Wed, 8 Jan 2025 18:50:07 +0530 Subject: [PATCH 5/8] log(elastic/index builder): add est time remaining (#12280) --- .../indexbuilder/ESIndexBuilder.java | 25 ++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/ESIndexBuilder.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/ESIndexBuilder.java index 6de79b6c4b181e..792e67e69f2da6 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/ESIndexBuilder.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/ESIndexBuilder.java @@ -411,6 
+411,8 @@ private void reindex(ReindexConfig indexState) throws Throwable { boolean reindexTaskCompleted = false; Pair documentCounts = getDocumentCounts(indexState.name(), tempIndexName); long documentCountsLastUpdated = System.currentTimeMillis(); + long previousDocCount = documentCounts.getSecond(); + long estimatedMinutesRemaining = 0; while (System.currentTimeMillis() < timeoutAt) { log.info( @@ -421,8 +423,22 @@ private void reindex(ReindexConfig indexState) throws Throwable { Pair tempDocumentsCount = getDocumentCounts(indexState.name(), tempIndexName); if (!tempDocumentsCount.equals(documentCounts)) { - documentCountsLastUpdated = System.currentTimeMillis(); + long currentTime = System.currentTimeMillis(); + long timeElapsed = currentTime - documentCountsLastUpdated; + long docsIndexed = tempDocumentsCount.getSecond() - previousDocCount; + + // Calculate indexing rate (docs per millisecond) + double indexingRate = timeElapsed > 0 ? (double) docsIndexed / timeElapsed : 0; + + // Calculate remaining docs and estimated time + long remainingDocs = tempDocumentsCount.getFirst() - tempDocumentsCount.getSecond(); + long estimatedMillisRemaining = + indexingRate > 0 ? (long) (remainingDocs / indexingRate) : 0; + estimatedMinutesRemaining = estimatedMillisRemaining / (1000 * 60); + + documentCountsLastUpdated = currentTime; documentCounts = tempDocumentsCount; + previousDocCount = documentCounts.getSecond(); } if (documentCounts.getFirst().equals(documentCounts.getSecond())) { @@ -435,12 +451,15 @@ private void reindex(ReindexConfig indexState) throws Throwable { break; } else { + float progressPercentage = + 100 * (1.0f * documentCounts.getSecond()) / documentCounts.getFirst(); log.warn( - "Task: {} - Document counts do not match {} != {}. Complete: {}%", + "Task: {} - Document counts do not match {} != {}. Complete: {}%. 
Estimated time remaining: {} minutes", parentTaskId, documentCounts.getFirst(), documentCounts.getSecond(), - 100 * (1.0f * documentCounts.getSecond()) / documentCounts.getFirst()); + progressPercentage, + estimatedMinutesRemaining); long lastUpdateDelta = System.currentTimeMillis() - documentCountsLastUpdated; if (lastUpdateDelta > (300 * 1000)) { From 99c30f2b3c80ed55a7c39448ffc8fad3bfc010f3 Mon Sep 17 00:00:00 2001 From: Aseem Bansal Date: Wed, 8 Jan 2025 19:04:19 +0530 Subject: [PATCH 6/8] fix(ingest/glue): don't fail on profile (#12288) --- .../src/datahub/ingestion/source/aws/glue.py | 87 +++++++++++-------- 1 file changed, 52 insertions(+), 35 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py index 7a5ed154d40bc7..a0bed4ae9a7581 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py +++ b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py @@ -1054,49 +1054,66 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: yield from self.gen_database_containers(database) for table in tables: - database_name = table["DatabaseName"] table_name = table["Name"] - full_table_name = f"{database_name}.{table_name}" - self.report.report_table_scanned() - if not self.source_config.database_pattern.allowed( - database_name - ) or not self.source_config.table_pattern.allowed(full_table_name): - self.report.report_table_dropped(full_table_name) - continue + try: + yield from self._gen_table_wu(table=table) + except KeyError as e: + self.report.report_failure( + message="Failed to extract workunit for table", + context=f"Table: {table_name}", + exc=e, + ) + if self.extract_transforms: + yield from self._transform_extraction() - dataset_urn = make_dataset_urn_with_platform_instance( - platform=self.platform, - name=full_table_name, - env=self.env, - platform_instance=self.source_config.platform_instance, - ) + def _gen_table_wu(self, table: Dict) -> Iterable[MetadataWorkUnit]: + database_name = table["DatabaseName"] + table_name = table["Name"] + full_table_name = f"{database_name}.{table_name}" + self.report.report_table_scanned() + if not self.source_config.database_pattern.allowed( + database_name + ) or not self.source_config.table_pattern.allowed(full_table_name): + self.report.report_table_dropped(full_table_name) + return + + dataset_urn = make_dataset_urn_with_platform_instance( + platform=self.platform, + name=full_table_name, + env=self.env, + platform_instance=self.source_config.platform_instance, + ) - mce = self._extract_record(dataset_urn, table, full_table_name) - yield MetadataWorkUnit(full_table_name, mce=mce) + mce = self._extract_record(dataset_urn, table, full_table_name) + yield MetadataWorkUnit(full_table_name, mce=mce) - # We also want to assign "table" subType to the dataset representing glue table - unfortunately it is not - # possible via Dataset snapshot embedded in a mce, so we have to generate a mcp. - yield MetadataChangeProposalWrapper( - entityUrn=dataset_urn, - aspect=SubTypes(typeNames=[DatasetSubTypes.TABLE]), - ).as_workunit() + # We also want to assign "table" subType to the dataset representing glue table - unfortunately it is not + # possible via Dataset snapshot embedded in a mce, so we have to generate a mcp. 
+ yield MetadataChangeProposalWrapper( + entityUrn=dataset_urn, + aspect=SubTypes(typeNames=[DatasetSubTypes.TABLE]), + ).as_workunit() - yield from self._get_domain_wu( - dataset_name=full_table_name, - entity_urn=dataset_urn, - ) - yield from self.add_table_to_database_container( - dataset_urn=dataset_urn, db_name=database_name - ) + yield from self._get_domain_wu( + dataset_name=full_table_name, + entity_urn=dataset_urn, + ) + yield from self.add_table_to_database_container( + dataset_urn=dataset_urn, db_name=database_name + ) - wu = self.get_lineage_if_enabled(mce) - if wu: - yield wu + wu = self.get_lineage_if_enabled(mce) + if wu: + yield wu + try: yield from self.get_profile_if_enabled(mce, database_name, table_name) - - if self.extract_transforms: - yield from self._transform_extraction() + except KeyError as e: + self.report.report_failure( + message="Failed to extract profile for table", + context=f"Table: {dataset_urn}", + exc=e, + ) def _transform_extraction(self) -> Iterable[MetadataWorkUnit]: dags: Dict[str, Optional[Dict[str, Any]]] = {} From 0fe4163332eec5cf527ee0a0110507eb8934c4c9 Mon Sep 17 00:00:00 2001 From: Aseem Bansal Date: Wed, 8 Jan 2025 19:13:31 +0530 Subject: [PATCH 7/8] fix(ingest/gc): also query data process instance (#12292) --- .../source/gc/soft_deleted_entity_cleanup.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py b/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py index 32243106bb53f6..0a52b7e17bf714 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py +++ b/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py @@ -19,8 +19,8 @@ logger = logging.getLogger(__name__) -QUERY_QUERY_ENTITY = """ -query listQueries($input: ScrollAcrossEntitiesInput!) { +QUERY_ENTITIES = """ +query listEntities($input: ScrollAcrossEntitiesInput!) { scrollAcrossEntities(input: $input) { nextScrollId count @@ -29,6 +29,9 @@ ... on QueryEntity { urn } + ... 
on DataProcessInstance { + urn + } } } } @@ -225,16 +228,16 @@ def _process_futures(self, futures: Dict[Future, str]) -> Dict[Future, str]: time.sleep(self.config.delay) return futures - def _get_soft_deleted_queries(self) -> Iterable[str]: + def _get_soft_deleted(self, graphql_query: str, entity_type: str) -> Iterable[str]: assert self.ctx.graph scroll_id: Optional[str] = None while True: try: result = self.ctx.graph.execute_graphql( - QUERY_QUERY_ENTITY, + graphql_query, { "input": { - "types": ["QUERY"], + "types": [entity_type], "query": "*", "scrollId": scroll_id if scroll_id else None, "count": self.config.batch_size, @@ -254,7 +257,7 @@ def _get_soft_deleted_queries(self) -> Iterable[str]: ) except Exception as e: self.report.failure( - f"While trying to get queries with {scroll_id}", exc=e + f"While trying to get {entity_type} with {scroll_id}", exc=e ) break scroll_across_entities = result.get("scrollAcrossEntities") @@ -275,7 +278,8 @@ def _get_urns(self) -> Iterable[str]: status=RemovedStatusFilter.ONLY_SOFT_DELETED, batch_size=self.config.batch_size, ) - yield from self._get_soft_deleted_queries() + yield from self._get_soft_deleted(QUERY_ENTITIES, "QUERY") + yield from self._get_soft_deleted(QUERY_ENTITIES, "DATA_PROCESS_INSTANCE") def _times_up(self) -> bool: if ( From a4c47fa343cec4e6bc7addc11c553bace0a852a9 Mon Sep 17 00:00:00 2001 From: Aseem Bansal Date: Wed, 8 Jan 2025 19:46:57 +0530 Subject: [PATCH 8/8] fix(cli): correct url ending with acryl.io:8080 (#12289) --- metadata-ingestion/src/datahub/cli/cli_utils.py | 2 ++ metadata-ingestion/tests/unit/cli/test_cli_utils.py | 4 ++++ 2 files changed, 6 insertions(+) diff --git a/metadata-ingestion/src/datahub/cli/cli_utils.py b/metadata-ingestion/src/datahub/cli/cli_utils.py index ca4a11b41925e5..f6b5ba6176c59d 100644 --- a/metadata-ingestion/src/datahub/cli/cli_utils.py +++ b/metadata-ingestion/src/datahub/cli/cli_utils.py @@ -330,6 +330,8 @@ def get_frontend_session_login_as( def _ensure_valid_gms_url_acryl_cloud(url: str) -> str: if "acryl.io" not in url: return url + if url.endswith(":8080"): + url = url.replace(":8080", "") if url.startswith("http://"): url = url.replace("http://", "https://") if url.endswith("acryl.io"): diff --git a/metadata-ingestion/tests/unit/cli/test_cli_utils.py b/metadata-ingestion/tests/unit/cli/test_cli_utils.py index c9693c75d96fe9..c430f585200e5a 100644 --- a/metadata-ingestion/tests/unit/cli/test_cli_utils.py +++ b/metadata-ingestion/tests/unit/cli/test_cli_utils.py @@ -70,6 +70,10 @@ def test_fixup_gms_url(): cli_utils.fixup_gms_url("http://abc.acryl.io/api/gms") == "https://abc.acryl.io/gms" ) + assert ( + cli_utils.fixup_gms_url("http://abcd.acryl.io:8080") + == "https://abcd.acryl.io/gms" + ) def test_guess_frontend_url_from_gms_url():
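
Reviewer note on PATCH 1/8: the emitter now funnels all HTTP/session options into the new `RequestsSessionConfig`, and `DataHubGraph.from_emitter` reads that config instead of poking at private session attributes. Below is a minimal sketch, not part of the applied diff, of how the change can be exercised locally; it assumes the import paths shown in the diff, and the GMS endpoint is a placeholder.

    # Minimal sketch: exercises the emitter -> graph conversion from PATCH 1/8.
    # Assumes the import paths shown in the diff; "http://localhost:8080" is a
    # placeholder GMS endpoint (no request is made at construction time).
    from datahub.emitter.rest_emitter import DatahubRestEmitter

    emitter = DatahubRestEmitter(
        "http://localhost:8080",
        connect_timeout_sec=2,
        read_timeout_sec=10,
        retry_max_times=3,
    )

    # All HTTP settings now live on a single RequestsSessionConfig object.
    print(emitter._session_config.timeout)          # connect/read pair, e.g. (2.0, 10.0)
    print(emitter._session_config.retry_max_times)  # 3

    # from_emitter() copies the same config into a DatahubClientConfig; per the
    # TODO in the diff, only the first element of a timeout tuple survives as
    # timeout_sec. Building the graph client contacts the server, so it is left
    # commented out here:
    # from datahub.ingestion.graph.client import DataHubGraph
    # graph = DataHubGraph.from_emitter(emitter)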
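
Reviewer note on PATCH 8/8: the new branch strips a trailing `:8080` from acryl.io URLs before the existing scheme and `/gms` normalization. A small sketch of the resulting behavior, mirroring the assertions in the updated test and assuming `fixup_gms_url` keeps its current import path:

    # Sketch: URL normalization behavior after PATCH 8/8, mirroring the
    # assertions in tests/unit/cli/test_cli_utils.py.
    from datahub.cli.cli_utils import fixup_gms_url

    assert fixup_gms_url("http://abcd.acryl.io:8080") == "https://abcd.acryl.io/gms"
    assert fixup_gms_url("http://abc.acryl.io/api/gms") == "https://abc.acryl.io/gms"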