diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py index 759aebcfd46b0a..4aa937639e9590 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/client.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py @@ -67,6 +67,7 @@ SystemMetadataClass, TelemetryClientIdClass, ) +from datahub.telemetry.telemetry import telemetry_instance from datahub.utilities.perf_timer import PerfTimer from datahub.utilities.str_enum import StrEnum from datahub.utilities.urns.urn import Urn, guess_entity_type @@ -1819,4 +1820,5 @@ def get_default_graph() -> DataHubGraph: graph_config = config_utils.load_client_config() graph = DataHubGraph(graph_config) graph.test_connection() + telemetry_instance.set_context(server=graph) return graph diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py index 7c3a42c3e08931..667129ff83584a 100644 --- a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py +++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py @@ -44,7 +44,8 @@ ) from datahub.ingestion.transformer.transform_registry import transform_registry from datahub.metadata.schema_classes import MetadataChangeProposalClass -from datahub.telemetry import stats, telemetry +from datahub.telemetry import stats +from datahub.telemetry.telemetry import telemetry_instance from datahub.utilities._custom_package_loader import model_version_name from datahub.utilities.global_warning_util import ( clear_global_warnings, @@ -273,8 +274,9 @@ def __init__( if self.graph is None and isinstance(self.sink, DatahubRestSink): with _add_init_error_context("setup default datahub client"): self.graph = self.sink.emitter.to_graph() + self.graph.test_connection() self.ctx.graph = self.graph - telemetry.telemetry_instance.update_capture_exception_context(server=self.graph) + telemetry_instance.set_context(server=self.graph) with set_graph_context(self.graph): with _add_init_error_context("configure reporters"): @@ -615,7 +617,7 @@ def log_ingestion_stats(self) -> None: sink_warnings = len(self.sink.get_report().warnings) global_warnings = len(get_global_warnings()) - telemetry.telemetry_instance.ping( + telemetry_instance.ping( "ingest_stats", { "source_type": self.source_type, @@ -637,7 +639,6 @@ def log_ingestion_stats(self) -> None: ), "has_pipeline_name": bool(self.config.pipeline_name), }, - self.ctx.graph, ) def _approx_all_vals(self, d: LossyList[Any]) -> int: diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py b/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py index c4b4186f45fc38..52807ca2a3f026 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py +++ b/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py @@ -144,15 +144,32 @@ def get_workunits_internal( self, ) -> Iterable[MetadataWorkUnit]: if self.config.cleanup_expired_tokens: - self.revoke_expired_tokens() + try: + self.revoke_expired_tokens() + except Exception as e: + self.report.failure("While trying to cleanup expired token ", exc=e) if self.config.truncate_indices: - self.truncate_indices() + try: + self.truncate_indices() + except Exception as e: + self.report.failure("While trying to truncate indices ", exc=e) if self.dataprocess_cleanup: - yield from self.dataprocess_cleanup.get_workunits_internal() + try: + yield from self.dataprocess_cleanup.get_workunits_internal() + except Exception as e: + self.report.failure("While trying to cleanup data process ", exc=e) if self.soft_deleted_entities_cleanup: - self.soft_deleted_entities_cleanup.cleanup_soft_deleted_entities() + try: + self.soft_deleted_entities_cleanup.cleanup_soft_deleted_entities() + except Exception as e: + self.report.failure( + "While trying to cleanup soft deleted entities ", exc=e + ) if self.execution_request_cleanup: - self.execution_request_cleanup.run() + try: + self.execution_request_cleanup.run() + except Exception as e: + self.report.failure("While trying to cleanup execution request ", exc=e) yield from [] def truncate_indices(self) -> None: diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py b/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py index 130f2c9c2e12fc..0f35e1a67fede7 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py +++ b/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py @@ -404,7 +404,9 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: try: self.delete_dpi_from_datajobs(datajob_entity) except Exception as e: - logger.error(f"While trying to delete {datajob_entity} got {e}") + self.report.failure( + f"While trying to delete {datajob_entity} ", exc=e + ) if ( datajob_entity.total_runs == 0 and self.config.delete_empty_data_jobs diff --git a/metadata-ingestion/src/datahub/telemetry/telemetry.py b/metadata-ingestion/src/datahub/telemetry/telemetry.py index 4faf04ee2d2c76..22b2cb6a101af9 100644 --- a/metadata-ingestion/src/datahub/telemetry/telemetry.py +++ b/metadata-ingestion/src/datahub/telemetry/telemetry.py @@ -7,7 +7,7 @@ import uuid from functools import wraps from pathlib import Path -from typing import Any, Callable, Dict, List, Optional, TypeVar +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, TypeVar from mixpanel import Consumer, Mixpanel from typing_extensions import ParamSpec @@ -16,10 +16,12 @@ from datahub.cli.config_utils import DATAHUB_ROOT_FOLDER from datahub.cli.env_utils import get_boolean_env_variable from datahub.configuration.common import ExceptionWithProps -from datahub.ingestion.graph.client import DataHubGraph from datahub.metadata.schema_classes import _custom_package_path from datahub.utilities.perf_timer import PerfTimer +if TYPE_CHECKING: + from datahub.ingestion.graph.client import DataHubGraph + logger = logging.getLogger(__name__) DATAHUB_FOLDER = Path(DATAHUB_ROOT_FOLDER) @@ -117,7 +119,11 @@ class Telemetry: tracking_init: bool = False sentry_enabled: bool = False + context_properties: Dict[str, Any] = {} + def __init__(self): + self.context_properties = {} + if SENTRY_DSN: self.sentry_enabled = True try: @@ -157,6 +163,9 @@ def __init__(self): except Exception as e: logger.debug(f"Error connecting to mixpanel: {e}") + # Initialize the default properties for all events. + self.set_context() + def update_config(self) -> bool: """ Update the config file with the current client ID and enabled status. @@ -238,18 +247,22 @@ def load_config(self) -> bool: return False - def update_capture_exception_context( + def set_context( self, - server: Optional[DataHubGraph] = None, + server: Optional["DataHubGraph"] = None, properties: Optional[Dict[str, Any]] = None, ) -> None: + self.context_properties = { + **self._server_props(server), + **(properties or {}), + } + if self.sentry_enabled: from sentry_sdk import set_tag properties = { **_default_telemetry_properties(), - **self._server_props(server), - **(properties or {}), + **self.context_properties, } for key in properties: @@ -297,7 +310,6 @@ def ping( self, event_name: str, properties: Optional[Dict[str, Any]] = None, - server: Optional[DataHubGraph] = None, ) -> None: """ Send a single telemetry event. @@ -323,14 +335,15 @@ def ping( properties = { **_default_telemetry_properties(), - **self._server_props(server), + **self.context_properties, **properties, } self.mp.track(self.client_id, event_name, properties) except Exception as e: logger.debug(f"Error reporting telemetry: {e}") - def _server_props(self, server: Optional[DataHubGraph]) -> Dict[str, str]: + @classmethod + def _server_props(cls, server: Optional["DataHubGraph"]) -> Dict[str, str]: if not server: return { "server_type": "n/a", @@ -435,6 +448,7 @@ def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _T: **call_props, "status": "error", **_error_props(e), + "code": e.code, }, ) telemetry_instance.capture_exception(e)