diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index cd28cc1b20f311..9d46fe606fa564 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -1,11 +1,38 @@ # Updating DataHub + + This file documents any backwards-incompatible changes in DataHub and assists people when migrating to a new version. ## Next ### Breaking Changes +- #9934 - Stateful ingestion is now enabled by default if datahub-rest sink is used or if a `datahub_api` is specified. It will still be disabled by default when any other sink type is used. +- #10002 - The `DataHubGraph` client no longer makes a request to the backend during initialization. If you want to preserve the old behavior, call `graph.test_connection()` after constructing the client. + +### Potential Downtime + +### Deprecations + +### Other Notable Changes + +## 0.13.0 + +### Breaking Changes + - Updating MySQL version for quickstarts to 8.2, may cause quickstart issues for existing instances. - Neo4j 5.x, may require migration from 4.x - Build requires JDK17 (Runtime Java 11) @@ -36,7 +63,6 @@ This file documents any backwards-incompatible changes in DataHub and assists pe - #9601 - The Unity Catalog(UC) ingestion source config `include_hive_metastore` is now enabled by default. This requires config `warehouse_id` to be set. You can disable `include_hive_metastore` by setting it to `False` to avoid ingesting legacy hive metastore catalog in Databricks. - #9904 - The default Redshift `table_lineage_mode` is now MIXED, instead of `STL_SCAN_BASED`. Improved lineage generation is also available by enabling `use_lineaege_v2`. This v2 implementation will become the default in a future release. -- #9934 - The stateful_ingestion is now enabled by default, if datahub-rest sink is used or if a `datahub_api` is specified ### Potential Downtime diff --git a/metadata-ingestion/src/datahub/cli/specific/user_cli.py b/metadata-ingestion/src/datahub/cli/specific/user_cli.py index a7afe94a141061..740e870d0f49b2 100644 --- a/metadata-ingestion/src/datahub/cli/specific/user_cli.py +++ b/metadata-ingestion/src/datahub/cli/specific/user_cli.py @@ -42,12 +42,14 @@ def upsert(file: Path, override_editable: bool) -> None: for user_config in user_configs: try: datahub_user: CorpUser = CorpUser.parse_obj(user_config) - for mcp in datahub_user.generate_mcp( - generation_config=CorpUserGenerationConfig( - override_editable=override_editable + + emitter.emit_all( + datahub_user.generate_mcp( + generation_config=CorpUserGenerationConfig( + override_editable=override_editable + ) ) - ): - emitter.emit(mcp) + ) click.secho(f"Update succeeded for urn {datahub_user.urn}.", fg="green") except Exception as e: click.secho( diff --git a/metadata-ingestion/src/datahub/configuration/source_common.py b/metadata-ingestion/src/datahub/configuration/source_common.py index 80b6ceb576c1cc..39d6bee9b30621 100644 --- a/metadata-ingestion/src/datahub/configuration/source_common.py +++ b/metadata-ingestion/src/datahub/configuration/source_common.py @@ -38,7 +38,8 @@ class EnvConfigMixin(ConfigModel): _env_deprecation = pydantic_field_deprecated( "env", - message="env is deprecated and will be removed in a future release. Please use platform_instance instead.", + message="We recommend using platform_instance instead of env. " + "While specifying env does still work, we intend to deprecate it in the future.", ) @validator("env") diff --git a/metadata-ingestion/src/datahub/emitter/mcp.py b/metadata-ingestion/src/datahub/emitter/mcp.py index 47717f3c1ed190..c6fcfad2e0abaa 100644 --- a/metadata-ingestion/src/datahub/emitter/mcp.py +++ b/metadata-ingestion/src/datahub/emitter/mcp.py @@ -2,7 +2,7 @@ import json from typing import TYPE_CHECKING, List, Optional, Sequence, Tuple, Union -from datahub.emitter.aspect import ASPECT_MAP, JSON_CONTENT_TYPE, TIMESERIES_ASPECT_MAP +from datahub.emitter.aspect import ASPECT_MAP, JSON_CONTENT_TYPE from datahub.emitter.serialization_helper import post_json_transform, pre_json_transform from datahub.metadata.schema_classes import ( ChangeTypeClass, @@ -244,21 +244,9 @@ def as_workunit( ) -> "MetadataWorkUnit": from datahub.ingestion.api.workunit import MetadataWorkUnit - if self.aspect and self.aspectName in TIMESERIES_ASPECT_MAP: - # TODO: Make this a cleaner interface. - ts = getattr(self.aspect, "timestampMillis", None) - assert ts is not None - - # If the aspect is a timeseries aspect, include the timestampMillis in the ID. - return MetadataWorkUnit( - id=f"{self.entityUrn}-{self.aspectName}-{ts}", - mcp=self, - treat_errors_as_warnings=treat_errors_as_warnings, - is_primary_source=is_primary_source, - ) - + id = MetadataWorkUnit.generate_workunit_id(self) return MetadataWorkUnit( - id=f"{self.entityUrn}-{self.aspectName}", + id=id, mcp=self, treat_errors_as_warnings=treat_errors_as_warnings, is_primary_source=is_primary_source, diff --git a/metadata-ingestion/src/datahub/emitter/rest_emitter.py b/metadata-ingestion/src/datahub/emitter/rest_emitter.py index 4598c7faa21058..b2c1f685e288c7 100644 --- a/metadata-ingestion/src/datahub/emitter/rest_emitter.py +++ b/metadata-ingestion/src/datahub/emitter/rest_emitter.py @@ -158,14 +158,14 @@ def __init__( timeout=(self._connect_timeout_sec, self._read_timeout_sec), ) - def test_connection(self) -> dict: + def test_connection(self) -> None: url = f"{self._gms_server}/config" response = self._session.get(url) if response.status_code == 200: config: dict = response.json() if config.get("noCode") == "true": self.server_config = config - return config + return else: # Looks like we either connected to an old GMS or to some other service. Let's see if we can determine which before raising an error @@ -195,6 +195,10 @@ def test_connection(self) -> dict: message += "\nPlease check your configuration and make sure you are talking to the DataHub GMS (usually :8080) or Frontend GMS API (usually :9002/api/gms)." raise ConfigurationError(message) + def get_server_config(self) -> dict: + self.test_connection() + return self.server_config + def to_graph(self) -> "DataHubGraph": from datahub.ingestion.graph.client import DataHubGraph diff --git a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py index 499fab58420769..a528508e5944b0 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py @@ -55,7 +55,10 @@ def auto_workunit( for item in stream: if isinstance(item, MetadataChangeEventClass): - yield MetadataWorkUnit(id=f"{item.proposedSnapshot.urn}/mce", mce=item) + yield MetadataWorkUnit( + id=MetadataWorkUnit.generate_workunit_id(item), + mce=item, + ) else: yield item.as_workunit() diff --git a/metadata-ingestion/src/datahub/ingestion/api/workunit.py b/metadata-ingestion/src/datahub/ingestion/api/workunit.py index b1c003ee27e125..82aefda920cb86 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/workunit.py +++ b/metadata-ingestion/src/datahub/ingestion/api/workunit.py @@ -4,6 +4,7 @@ from deprecated import deprecated +from datahub.emitter.aspect import TIMESERIES_ASPECT_MAP from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.common import WorkUnit from datahub.metadata.com.linkedin.pegasus2avro.mxe import ( @@ -97,6 +98,28 @@ def get_urn(self) -> str: assert self.metadata.entityUrn return self.metadata.entityUrn + @classmethod + def generate_workunit_id( + cls, + item: Union[ + MetadataChangeEvent, MetadataChangeProposal, MetadataChangeProposalWrapper + ], + ) -> str: + if isinstance(item, MetadataChangeEvent): + return f"{item.proposedSnapshot.urn}/mce" + elif isinstance(item, (MetadataChangeProposalWrapper, MetadataChangeProposal)): + if item.aspect and item.aspectName in TIMESERIES_ASPECT_MAP: + # TODO: Make this a cleaner interface. + ts = getattr(item.aspect, "timestampMillis", None) + assert ts is not None + + # If the aspect is a timeseries aspect, include the timestampMillis in the ID. + return f"{item.entityUrn}-{item.aspectName}-{ts}" + + return f"{item.entityUrn}-{item.aspectName}" + else: + raise ValueError(f"Unexpected type {type(item)}") + def get_aspect_of_type(self, aspect_cls: Type[T_Aspect]) -> Optional[T_Aspect]: aspects: list if isinstance(self.metadata, MetadataChangeEvent): diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py index 35ddc727dadbe0..f5d7c50427f47a 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/client.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py @@ -1,3 +1,4 @@ +import contextlib import enum import functools import json @@ -7,7 +8,18 @@ from dataclasses import dataclass from datetime import datetime from json.decoder import JSONDecodeError -from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple, Type +from typing import ( + TYPE_CHECKING, + Any, + Dict, + Iterable, + Iterator, + List, + Optional, + Tuple, + Type, + Union, +) from avro.schema import RecordSchema from deprecated import deprecated @@ -26,6 +38,10 @@ generate_filter, ) from datahub.ingestion.source.state.checkpoint import Checkpoint +from datahub.metadata.com.linkedin.pegasus2avro.mxe import ( + MetadataChangeEvent, + MetadataChangeProposal, +) from datahub.metadata.schema_classes import ( ASPECT_NAME_MAP, KEY_ASPECTS, @@ -47,6 +63,7 @@ from datahub.utilities.urns.urn import Urn, guess_entity_type if TYPE_CHECKING: + from datahub.ingestion.sink.datahub_rest import DatahubRestSink from datahub.ingestion.source.state.entity_removal_state import ( GenericCheckpointState, ) @@ -58,6 +75,8 @@ logger = logging.getLogger(__name__) +_MISSING_SERVER_ID = "missing" +_GRAPH_DUMMY_RUN_ID = "__datahub-graph-client" class DatahubClientConfig(ConfigModel): @@ -122,21 +141,25 @@ def __init__(self, config: DatahubClientConfig) -> None: client_certificate_path=self.config.client_certificate_path, disable_ssl_verification=self.config.disable_ssl_verification, ) - self.test_connection() + + self.server_id = _MISSING_SERVER_ID + + def test_connection(self) -> None: + super().test_connection() # Cache the server id for telemetry. from datahub.telemetry.telemetry import telemetry_instance if not telemetry_instance.enabled: - self.server_id = "missing" + self.server_id = _MISSING_SERVER_ID return try: client_id: Optional[TelemetryClientIdClass] = self.get_aspect( "urn:li:telemetry:clientId", TelemetryClientIdClass ) - self.server_id = client_id.clientId if client_id else "missing" + self.server_id = client_id.clientId if client_id else _MISSING_SERVER_ID except Exception as e: - self.server_id = "missing" + self.server_id = _MISSING_SERVER_ID logger.debug(f"Failed to get server id due to {e}") @classmethod @@ -179,6 +202,56 @@ def _get_generic(self, url: str, params: Optional[Dict] = None) -> Dict: def _post_generic(self, url: str, payload_dict: Dict) -> Dict: return self._send_restli_request("POST", url, json=payload_dict) + @contextlib.contextmanager + def make_rest_sink( + self, run_id: str = _GRAPH_DUMMY_RUN_ID + ) -> Iterator["DatahubRestSink"]: + from datahub.ingestion.api.common import PipelineContext + from datahub.ingestion.sink.datahub_rest import ( + DatahubRestSink, + DatahubRestSinkConfig, + SyncOrAsync, + ) + + # This is a bit convoluted - this DataHubGraph class is a subclass of DatahubRestEmitter, + # but initializing the rest sink creates another rest emitter. + # TODO: We should refactor out the multithreading functionality of the sink + # into a separate class that can be used by both the sink and the graph client + # e.g. a DatahubBulkRestEmitter that both the sink and the graph client use. + sink_config = DatahubRestSinkConfig( + **self.config.dict(), mode=SyncOrAsync.ASYNC + ) + + with DatahubRestSink(PipelineContext(run_id=run_id), sink_config) as sink: + yield sink + if sink.report.failures: + raise OperationalError( + f"Failed to emit {len(sink.report.failures)} records", + info=sink.report.as_obj(), + ) + + def emit_all( + self, + items: Iterable[ + Union[ + MetadataChangeEvent, + MetadataChangeProposal, + MetadataChangeProposalWrapper, + ] + ], + run_id: str = _GRAPH_DUMMY_RUN_ID, + ) -> None: + """Emit all items in the iterable using multiple threads.""" + + with self.make_rest_sink(run_id=run_id) as sink: + for item in items: + sink.emit_async(item) + if sink.report.failures: + raise OperationalError( + f"Failed to emit {len(sink.report.failures)} records", + info=sink.report.as_obj(), + ) + def get_aspect( self, entity_urn: str, @@ -861,7 +934,7 @@ def exists(self, entity_urn: str) -> bool: def soft_delete_entity( self, urn: str, - run_id: str = "__datahub-graph-client", + run_id: str = _GRAPH_DUMMY_RUN_ID, deletion_timestamp: Optional[int] = None, ) -> None: """Soft-delete an entity by urn. @@ -873,7 +946,7 @@ def soft_delete_entity( assert urn deletion_timestamp = deletion_timestamp or int(time.time() * 1000) - self.emit_mcp( + self.emit( MetadataChangeProposalWrapper( entityUrn=urn, aspect=StatusClass(removed=True), @@ -1098,4 +1171,6 @@ def close(self) -> None: def get_default_graph() -> DataHubGraph: (url, token) = get_url_and_token() - return DataHubGraph(DatahubClientConfig(server=url, token=token)) + graph = DataHubGraph(DatahubClientConfig(server=url, token=token)) + graph.test_connection() + return graph diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py index 70ff6992645e71..7990700c7f805b 100644 --- a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py +++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py @@ -213,6 +213,7 @@ def __init__( with _add_init_error_context("connect to DataHub"): if self.config.datahub_api: self.graph = DataHubGraph(self.config.datahub_api) + self.graph.test_connection() telemetry.telemetry_instance.update_capture_exception_context( server=self.graph diff --git a/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py b/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py index d8524c90ddfad8..a37f6ad8d279ea 100644 --- a/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py +++ b/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py @@ -16,7 +16,12 @@ from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.rest_emitter import DatahubRestEmitter from datahub.ingestion.api.common import RecordEnvelope, WorkUnit -from datahub.ingestion.api.sink import Sink, SinkReport, WriteCallback +from datahub.ingestion.api.sink import ( + NoopWriteCallback, + Sink, + SinkReport, + WriteCallback, +) from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.graph.client import DatahubClientConfig from datahub.metadata.com.linkedin.pegasus2avro.mxe import ( @@ -91,12 +96,11 @@ def __post_init__(self) -> None: disable_ssl_verification=self.config.disable_ssl_verification, ) try: - gms_config = self.emitter.test_connection() + gms_config = self.emitter.get_server_config() except Exception as exc: raise ConfigurationError( - f"💥 Failed to connect to DataHub@{self.config.server} (token:{'XXX-redacted' if self.config.token else 'empty'}) over REST", - exc, - ) + f"💥 Failed to connect to DataHub with {repr(self.emitter)}" + ) from exc self.report.gms_version = ( gms_config.get("versions", {}) @@ -205,6 +209,17 @@ def write_record_async( except Exception as e: write_callback.on_failure(record_envelope, e, failure_metadata={}) + def emit_async( + self, + item: Union[ + MetadataChangeEvent, MetadataChangeProposal, MetadataChangeProposalWrapper + ], + ) -> None: + return self.write_record_async( + RecordEnvelope(item, metadata={}), + NoopWriteCallback(), + ) + def close(self): self.executor.shutdown()