From e4ea993df17b6bee7d04c581fbe2bd9bbcf34927 Mon Sep 17 00:00:00 2001
From: Shirshanka Das
Date: Mon, 9 Dec 2024 16:40:31 -0800
Subject: [PATCH] fix(py-sdk): DataJobPatchBuilder handling timestamps, output
 edges (#12067)

Co-authored-by: Harshal Sheth
---
 docs/how/updating-datahub.md                  |   9 +-
 .../src/datahub/specific/datajob.py           |  14 +--
 .../nifi/nifi_mces_golden_standalone.json     |  40 +-----
 .../tests/unit/patch/test_patch_builder.py    | 117 ++++++++++++------
 .../tests/patch/test_datajob_patches.py       |  94 ++++++++++++++
 5 files changed, 190 insertions(+), 84 deletions(-)

diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md
index 3f9de1ff2f7f9e..bcc89332cc1c1b 100644
--- a/docs/how/updating-datahub.md
+++ b/docs/how/updating-datahub.md
@@ -48,7 +48,14 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
 - #11619 - schema field/column paths can no longer be duplicated within the schema
 - #11570 - The `DatahubClientConfig`'s server field no longer defaults to `http://localhost:8080`. Be sure to explicitly set this.
 - #11570 - If a `datahub_api` is explicitly passed to a stateful ingestion config provider, it will be used. We previously ignored it if the pipeline context also had a graph object.
-- #11518 - DataHub Garbage Collection: Various entities that are soft-deleted (after 10d) or are timeseries *entities* (dataprocess, execution requests) will be removed automatically using logic in the `datahub-gc` ingestion source.
+- #11518 - DataHub Garbage Collection: Various entities that are soft-deleted
+  (after 10d) or are timeseries *entities* (dataprocess, execution requests)
+  will be removed automatically using logic in the `datahub-gc` ingestion
+  source.
+- #12067 - The default behavior of `DataJobPatchBuilder` in the Python SDK has
+  changed: it no longer fills in `created` and `lastModified` audit stamps on
+  input and output dataset edges. This should have no user-observable impact
+  (time-based lineage visualization still works from observed times), but it may break client code that assumed these stamps were always set.
 
 ### Potential Downtime
 
diff --git a/metadata-ingestion/src/datahub/specific/datajob.py b/metadata-ingestion/src/datahub/specific/datajob.py
index fb7b0ae7816f17..6ff4741b09c26a 100644
--- a/metadata-ingestion/src/datahub/specific/datajob.py
+++ b/metadata-ingestion/src/datahub/specific/datajob.py
@@ -102,7 +102,7 @@ def add_input_datajob(self, input: Union[Edge, Urn, str]) -> "DataJobPatchBuilde
 
         Notes:
             If `input` is an Edge object, it is used directly. If `input` is a Urn object or string,
-            it is converted to an Edge object and added with default audit stamps.
+            it is converted to an Edge object and added without any audit stamps.
         """
         if isinstance(input, Edge):
             input_urn: str = input.destinationUrn
@@ -114,8 +114,6 @@ def add_input_datajob(self, input: Union[Edge, Urn, str]) -> "DataJobPatchBuilde
 
             input_edge = Edge(
                 destinationUrn=input_urn,
-                created=self._mint_auditstamp(),
-                lastModified=self._mint_auditstamp(),
             )
 
         self._ensure_urn_type("dataJob", [input_edge], "add_input_datajob")
@@ -185,7 +183,7 @@ def add_input_dataset(self, input: Union[Edge, Urn, str]) -> "DataJobPatchBuilde
 
         Notes:
             If `input` is an Edge object, it is used directly. If `input` is a Urn object or string,
-            it is converted to an Edge object and added with default audit stamps.
+            it is converted to an Edge object and added without any audit stamps.
""" if isinstance(input, Edge): input_urn: str = input.destinationUrn @@ -197,8 +195,6 @@ def add_input_dataset(self, input: Union[Edge, Urn, str]) -> "DataJobPatchBuilde input_edge = Edge( destinationUrn=input_urn, - created=self._mint_auditstamp(), - lastModified=self._mint_auditstamp(), ) self._ensure_urn_type("dataset", [input_edge], "add_input_dataset") @@ -270,7 +266,7 @@ def add_output_dataset( Notes: If `output` is an Edge object, it is used directly. If `output` is a Urn object or string, - it is converted to an Edge object and added with default audit stamps. + it is converted to an Edge object and added without any audit stamps. """ if isinstance(output, Edge): output_urn: str = output.destinationUrn @@ -282,15 +278,13 @@ def add_output_dataset( output_edge = Edge( destinationUrn=output_urn, - created=self._mint_auditstamp(), - lastModified=self._mint_auditstamp(), ) self._ensure_urn_type("dataset", [output_edge], "add_output_dataset") self._add_patch( DataJobInputOutput.ASPECT_NAME, "add", - path=f"/outputDatasetEdges/{self.quote(str(output))}", + path=f"/outputDatasetEdges/{self.quote(output_urn)}", value=output_edge, ) return self diff --git a/metadata-ingestion/tests/integration/nifi/nifi_mces_golden_standalone.json b/metadata-ingestion/tests/integration/nifi/nifi_mces_golden_standalone.json index f820efc7399492..e026664a78e0be 100644 --- a/metadata-ingestion/tests/integration/nifi/nifi_mces_golden_standalone.json +++ b/metadata-ingestion/tests/integration/nifi/nifi_mces_golden_standalone.json @@ -60,15 +60,7 @@ "op": "add", "path": "/inputDatajobEdges/urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,PROD),91d59f03-1c2b-3f3f-48bc-f89296a328bd)", "value": { - "destinationUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,PROD),91d59f03-1c2b-3f3f-48bc-f89296a328bd)", - "created": { - "time": 1638532800000, - "actor": "urn:li:corpuser:datahub" - }, - "lastModified": { - "time": 1638532800000, - "actor": "urn:li:corpuser:datahub" - } + "destinationUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,PROD),91d59f03-1c2b-3f3f-48bc-f89296a328bd)" } } ] @@ -178,30 +170,14 @@ "op": "add", "path": "/inputDatasetEdges/urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)", "value": { - "destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)", - "created": { - "time": 1638532800000, - "actor": "urn:li:corpuser:datahub" - }, - "lastModified": { - "time": 1638532800000, - "actor": "urn:li:corpuser:datahub" - } + "destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)" } }, { "op": "add", "path": "/inputDatajobEdges/urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,PROD),cb7693ed-f93b-3340-3776-fe80e6283ddc)", "value": { - "destinationUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,PROD),cb7693ed-f93b-3340-3776-fe80e6283ddc)", - "created": { - "time": 1638532800000, - "actor": "urn:li:corpuser:datahub" - }, - "lastModified": { - "time": 1638532800000, - "actor": "urn:li:corpuser:datahub" - } + "destinationUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,PROD),cb7693ed-f93b-3340-3776-fe80e6283ddc)" } } ] @@ -287,15 +263,7 @@ "op": "add", "path": "/inputDatasetEdges/urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)", "value": { - "destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)", - "created": 
{ - "time": 1638532800000, - "actor": "urn:li:corpuser:datahub" - }, - "lastModified": { - "time": 1638532800000, - "actor": "urn:li:corpuser:datahub" - } + "destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)" } } ] diff --git a/metadata-ingestion/tests/unit/patch/test_patch_builder.py b/metadata-ingestion/tests/unit/patch/test_patch_builder.py index 267da6cdd5d205..f4bf501e0714d0 100644 --- a/metadata-ingestion/tests/unit/patch/test_patch_builder.py +++ b/metadata-ingestion/tests/unit/patch/test_patch_builder.py @@ -1,5 +1,6 @@ import json import pathlib +from typing import Any, Dict, Union import pytest from freezegun.api import freeze_time @@ -15,7 +16,9 @@ ) from datahub.ingestion.sink.file import write_metadata_file from datahub.metadata.schema_classes import ( + AuditStampClass, DatasetLineageTypeClass, + EdgeClass, FineGrainedLineageClass, FineGrainedLineageDownstreamTypeClass, FineGrainedLineageUpstreamTypeClass, @@ -182,8 +185,66 @@ def test_basic_dashboard_patch_builder(): ] +@pytest.mark.parametrize( + "created_on,last_modified,expected_actor", + [ + (1586847600000, 1586847600000, "urn:li:corpuser:datahub"), + (None, None, "urn:li:corpuser:datahub"), + (1586847600000, None, "urn:li:corpuser:datahub"), + (None, 1586847600000, "urn:li:corpuser:datahub"), + ], + ids=["both_timestamps", "no_timestamps", "only_created", "only_modified"], +) @freeze_time("2020-04-14 07:00:00") -def test_datajob_patch_builder(): +def test_datajob_patch_builder(created_on, last_modified, expected_actor): + def make_edge_or_urn(urn: str) -> Union[EdgeClass, str]: + if created_on or last_modified: + return EdgeClass( + destinationUrn=str(urn), + created=( + AuditStampClass( + time=created_on, + actor=expected_actor, + ) + if created_on + else None + ), + lastModified=( + AuditStampClass( + time=last_modified, + actor=expected_actor, + ) + if last_modified + else None + ), + ) + return urn + + def get_edge_expectation(urn: str) -> Dict[str, Any]: + if created_on or last_modified: + expected = { + "destinationUrn": str(urn), + "created": ( + AuditStampClass( + time=created_on, + actor=expected_actor, + ).to_obj() + if created_on + else None + ), + "lastModified": ( + AuditStampClass( + time=last_modified, + actor=expected_actor, + ).to_obj() + if last_modified + else None + ), + } + # filter out None values + return {k: v for k, v in expected.items() if v is not None} + return {"destinationUrn": str(urn)} + flow_urn = make_data_flow_urn( orchestrator="nifi", flow_id="252C34e5af19-0192-1000-b248-b1abee565b5d" ) @@ -193,13 +254,19 @@ def test_datajob_patch_builder(): patcher = DataJobPatchBuilder(job_urn) patcher.add_output_dataset( - "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder1,DEV)" + make_edge_or_urn( + "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder1,DEV)" + ) ) patcher.add_output_dataset( - "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder3,DEV)" + make_edge_or_urn( + "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder3,DEV)" + ) ) patcher.add_output_dataset( - "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder2,DEV)" + make_edge_or_urn( + "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder2,DEV)" + ) ) assert patcher.build() == [ @@ -214,47 +281,23 @@ def test_datajob_patch_builder(): { "op": "add", "path": "/outputDatasetEdges/urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket~1folder1,DEV)", - "value": { - "destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder1,DEV)", - 
"created": { - "time": 1586847600000, - "actor": "urn:li:corpuser:datahub", - }, - "lastModified": { - "time": 1586847600000, - "actor": "urn:li:corpuser:datahub", - }, - }, + "value": get_edge_expectation( + "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder1,DEV)" + ), }, { "op": "add", "path": "/outputDatasetEdges/urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket~1folder3,DEV)", - "value": { - "destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder3,DEV)", - "created": { - "time": 1586847600000, - "actor": "urn:li:corpuser:datahub", - }, - "lastModified": { - "time": 1586847600000, - "actor": "urn:li:corpuser:datahub", - }, - }, + "value": get_edge_expectation( + "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder3,DEV)" + ), }, { "op": "add", "path": "/outputDatasetEdges/urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket~1folder2,DEV)", - "value": { - "destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder2,DEV)", - "created": { - "time": 1586847600000, - "actor": "urn:li:corpuser:datahub", - }, - "lastModified": { - "time": 1586847600000, - "actor": "urn:li:corpuser:datahub", - }, - }, + "value": get_edge_expectation( + "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder2,DEV)" + ), }, ] ).encode("utf-8"), diff --git a/smoke-test/tests/patch/test_datajob_patches.py b/smoke-test/tests/patch/test_datajob_patches.py index eb129e1e032125..e025c8a2aebeb5 100644 --- a/smoke-test/tests/patch/test_datajob_patches.py +++ b/smoke-test/tests/patch/test_datajob_patches.py @@ -1,5 +1,7 @@ +import time import uuid +import datahub.metadata.schema_classes as models from datahub.emitter.mce_builder import make_data_job_urn, make_dataset_urn from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.metadata.schema_classes import ( @@ -136,3 +138,95 @@ def test_datajob_inputoutput_dataset_patch(graph_client): inputoutput_lineage_read.inputDatasetEdges[0].destinationUrn == other_dataset_urn ) + + +def test_datajob_multiple_inputoutput_dataset_patch(graph_client): + """Test creating a data job with multiple input and output datasets and verifying the aspects.""" + # Create the data job + datajob_urn = "urn:li:dataJob:(urn:li:dataFlow:(airflow,training,default),training)" + + # Create input and output dataset URNs + input_datasets = ["input_data_1", "input_data_2"] + output_datasets = ["output_data_1", "output_data_2"] + + input_dataset_urns = [ + make_dataset_urn(platform="s3", name=f"test_patch_{dataset}", env="PROD") + for dataset in input_datasets + ] + output_dataset_urns = [ + make_dataset_urn(platform="s3", name=f"test_patch_{dataset}", env="PROD") + for dataset in output_datasets + ] + + # Create edges for datasets + def make_edge(urn, generate_auditstamp=False): + audit_stamp = models.AuditStampClass( + time=int(time.time() * 1000.0), + actor="urn:li:corpuser:datahub", + ) + return EdgeClass( + destinationUrn=str(urn), + lastModified=audit_stamp if generate_auditstamp else None, + ) + + # Initialize empty input/output lineage + initial_lineage = DataJobInputOutputClass( + inputDatasets=[], outputDatasets=[], inputDatasetEdges=[], outputDatasetEdges=[] + ) + + # Emit initial lineage + mcpw = MetadataChangeProposalWrapper(entityUrn=datajob_urn, aspect=initial_lineage) + graph_client.emit_mcp(mcpw) + + # Create patches for input and output datasets + patch_builder = DataJobPatchBuilder(datajob_urn) + for input_urn in input_dataset_urns: + patch_builder.add_input_dataset(make_edge(input_urn)) + for 
+    for output_urn in output_dataset_urns:
+        patch_builder.add_output_dataset(make_edge(output_urn))
+
+    # Apply patches
+    for patch_mcp in patch_builder.build():
+        graph_client.emit_mcp(patch_mcp)
+
+    # Verify the lineage was correctly applied
+    lineage_aspect = graph_client.get_aspect(
+        entity_urn=datajob_urn,
+        aspect_type=DataJobInputOutputClass,
+    )
+
+    # Assert lineage was created
+    assert lineage_aspect is not None
+    assert lineage_aspect.inputDatasetEdges is not None
+    assert lineage_aspect.outputDatasetEdges is not None
+
+    # Verify input datasets
+    assert len(lineage_aspect.inputDatasetEdges) == len(input_datasets)
+    input_urns = {edge.destinationUrn for edge in lineage_aspect.inputDatasetEdges}
+    expected_input_urns = {str(urn) for urn in input_dataset_urns}
+    assert input_urns == expected_input_urns
+
+    # Verify output datasets
+    assert len(lineage_aspect.outputDatasetEdges) == len(output_datasets)
+    output_urns = {edge.destinationUrn for edge in lineage_aspect.outputDatasetEdges}
+    expected_output_urns = {str(urn) for urn in output_dataset_urns}
+    assert output_urns == expected_output_urns
+
+    # Test updating the same datasets again (idempotency)
+    patch_builder = DataJobPatchBuilder(datajob_urn)
+    for input_urn in input_dataset_urns:
+        patch_builder.add_input_dataset(make_edge(input_urn))
+    for output_urn in output_dataset_urns:
+        patch_builder.add_output_dataset(make_edge(output_urn))
+
+    for patch_mcp in patch_builder.build():
+        graph_client.emit_mcp(patch_mcp)
+
+    # Verify the aspect hasn't changed
+    updated_lineage_aspect = graph_client.get_aspect(
+        entity_urn=datajob_urn,
+        aspect_type=DataJobInputOutputClass,
+    )
+
+    assert updated_lineage_aspect is not None
+    assert updated_lineage_aspect.to_obj() == lineage_aspect.to_obj()
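
Usage note (not part of the diff): a minimal sketch of what the new default
means for SDK callers, based on the builder and smoke-test code in this patch.
The URNs and actor below are illustrative placeholders, not values from this
change; only DataJobPatchBuilder, EdgeClass, and AuditStampClass are the real
APIs touched here.

    import time

    from datahub.emitter.mce_builder import make_data_job_urn, make_dataset_urn
    from datahub.metadata.schema_classes import AuditStampClass, EdgeClass
    from datahub.specific.datajob import DataJobPatchBuilder

    # Illustrative job URN; any existing dataJob URN works the same way.
    job_urn = make_data_job_urn(
        orchestrator="airflow", flow_id="training", job_id="training"
    )
    patch_builder = DataJobPatchBuilder(job_urn)

    # After this patch, passing a plain URN adds an edge with no audit stamps;
    # previously the builder minted created/lastModified automatically.
    patch_builder.add_output_dataset(
        make_dataset_urn(platform="s3", name="example-bucket/output", env="PROD")
    )

    # Clients that still want created/lastModified must now pass an Edge
    # carrying explicit AuditStamps, as the smoke test above does.
    stamp = AuditStampClass(
        time=int(time.time() * 1000), actor="urn:li:corpuser:datahub"
    )
    patch_builder.add_input_dataset(
        EdgeClass(
            destinationUrn=make_dataset_urn(
                platform="s3", name="example-bucket/input", env="PROD"
            ),
            created=stamp,
            lastModified=stamp,
        )
    )

    # build() yields MetadataChangeProposals; emit them with your usual
    # emitter (e.g. DataHubGraph.emit_mcp, as the smoke test does).
    for mcp in patch_builder.build():
        print(mcp)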