From 7eab2eb8d98824e1f580ac80e604f1a8e92cac44 Mon Sep 17 00:00:00 2001 From: Tamas Nemeth Date: Mon, 20 Jan 2025 13:38:12 +0100 Subject: [PATCH 1/5] fix(ingest/fivetran): Add way to not add schema to the destination/source urn. (#12314) --- .../ingestion/source/fivetran/config.py | 4 + .../ingestion/source/fivetran/fivetran.py | 20 +- ...nowflake_empty_connection_user_golden.json | 317 +++++++++++++++++ .../fivetran/fivetran_snowflake_golden.json | 325 ++++++++++++++++++ .../integration/fivetran/test_fivetran.py | 73 +++- 5 files changed, 725 insertions(+), 14 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py index 86826ae7bedc09..b048b6157fbb9f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py @@ -167,6 +167,10 @@ class PlatformDetail(ConfigModel): description="The database that all assets produced by this connector belong to. " "For destinations, this defaults to the fivetran log config's database.", ) + include_schema_in_urn: bool = pydantic.Field( + default=True, + description="Include schema in the dataset URN. In some cases, the schema is not relevant to the dataset URN and Fivetran sets it to the source and destination table names in the connector.", + ) class FivetranSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin): diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py index d8ebbe5b63d1ae..a3469696699f17 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py @@ -119,21 +119,31 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> Dict[str, s ) for lineage in connector.lineage: + source_table = ( + lineage.source_table + if source_details.include_schema_in_urn + else lineage.source_table.split(".", 1)[1] + ) input_dataset_urn = DatasetUrn.create_from_ids( platform_id=source_details.platform, table_name=( - f"{source_details.database.lower()}.{lineage.source_table}" + f"{source_details.database.lower()}.{source_table}" if source_details.database - else lineage.source_table + else source_table ), env=source_details.env, platform_instance=source_details.platform_instance, ) input_dataset_urn_list.append(input_dataset_urn) + destination_table = ( + lineage.destination_table + if destination_details.include_schema_in_urn + else lineage.destination_table.split(".", 1)[1] + ) output_dataset_urn = DatasetUrn.create_from_ids( platform_id=destination_details.platform, - table_name=f"{destination_details.database.lower()}.{lineage.destination_table}", + table_name=f"{destination_details.database.lower()}.{destination_table}", env=destination_details.env, platform_instance=destination_details.platform_instance, ) @@ -176,12 +186,12 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> Dict[str, s **{ f"source.{k}": str(v) for k, v in source_details.dict().items() - if v is not None + if v is not None and not isinstance(v, bool) }, **{ f"destination.{k}": str(v) for k, v in destination_details.dict().items() - if v is not None + if v is not None and not isinstance(v, bool) }, ) diff --git a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json 
b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json index 7934153051c60b..cf77ab39cbcbe7 100644 --- a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json +++ b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json @@ -595,6 +595,323 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD)", + "changeType": "UPSERT", + "aspectName": "dataFlowInfo", + "aspect": { + "json": { + "customProperties": {}, + "name": "confluent_cloud", + "env": "PROD" + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:fivetran" + } + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)", + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "json": { + "customProperties": { + "connector_id": "my_confluent_cloud_connector_id", + "connector_type": "confluent_cloud", + "paused": "False", + "sync_frequency": "1440", + "destination_id": "interval_unconstitutional", + "source.platform": "kafka", + "source.env": "PROD", + "source.database": "kafka_prod", + "destination.platform": "snowflake", + "destination.env": "PROD", + "destination.database": "test_database" + }, + "name": "confluent_cloud", + "type": { + "string": "COMMAND" + }, + "env": "PROD" + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)", + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "json": { + "inputDatasets": [ + 
"urn:li:dataset:(urn:li:dataPlatform:kafka,kafka_prod.my-source-topic,PROD)" + ], + "outputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_database.confluent_cloud.my-destination-topic,PROD)" + ], + "inputDatajobs": [], + "fineGrainedLineages": [] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:fivetran" + } + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceProperties", + "aspect": { + "json": { + "customProperties": {}, + "name": "d9a03d6-eded-4422-a46a-163266e58244", + "type": "BATCH_SCHEDULED", + "created": { + "time": 1695191853000, + "actor": "urn:li:corpuser:datahub" + } + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRelationships", + "aspect": { + "json": { + "parentTemplate": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)", + "upstreamInstances": [] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceInput", + "aspect": { + "json": { + "inputs": [ + "urn:li:dataset:(urn:li:dataPlatform:kafka,kafka_prod.my-source-topic,PROD)" + ] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceOutput", + "aspect": { + "json": { + "outputs": [ + "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_database.confluent_cloud.my-destination-topic,PROD)" + ] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1695191853000, + "partitionSpec": { + 
"partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" + }, + "status": "STARTED" + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1695191885000, + "partitionSpec": { + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" + }, + "status": "COMPLETE", + "result": { + "type": "SUCCESS", + "nativeResultType": "fivetran" + } + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataProcessInstance", "entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a", diff --git a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json index c46bd8fb65d876..cec1ef1b6221d3 100644 --- a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json +++ b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json @@ -603,6 +603,331 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD)", + "changeType": "UPSERT", + "aspectName": "dataFlowInfo", + "aspect": { + "json": { + "customProperties": {}, + "name": "confluent_cloud", + "env": "PROD" + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:fivetran" + } + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)", + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "json": { + "customProperties": { + "connector_id": "my_confluent_cloud_connector_id", + 
"connector_type": "confluent_cloud", + "paused": "False", + "sync_frequency": "1440", + "destination_id": "my_confluent_cloud_connector_id", + "source.platform": "kafka", + "source.env": "PROD", + "source.database": "kafka_prod", + "destination.platform": "kafka", + "destination.env": "PROD", + "destination.database": "kafka_prod" + }, + "name": "confluent_cloud", + "type": { + "string": "COMMAND" + }, + "env": "PROD" + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)", + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "json": { + "inputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:kafka,kafka_prod.my-source-topic,PROD)" + ], + "outputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:kafka,kafka_prod.my-destination-topic,PROD)" + ], + "inputDatajobs": [], + "fineGrainedLineages": [] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [ + { + "owner": "urn:li:corpuser:abc.xyz@email.com", + "type": "DEVELOPER", + "source": { + "type": "SERVICE" + } + } + ], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:fivetran" + } + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceProperties", + "aspect": { + "json": { + "customProperties": {}, + "name": "d9a03d6-eded-4422-a46a-163266e58244", + "type": "BATCH_SCHEDULED", + "created": { + "time": 1695191853000, + "actor": "urn:li:corpuser:datahub" + } + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRelationships", + "aspect": { + "json": { + "parentTemplate": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)", + "upstreamInstances": [] + } + }, + 
"systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceInput", + "aspect": { + "json": { + "inputs": [ + "urn:li:dataset:(urn:li:dataPlatform:kafka,kafka_prod.my-source-topic,PROD)" + ] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceOutput", + "aspect": { + "json": { + "outputs": [ + "urn:li:dataset:(urn:li:dataPlatform:kafka,kafka_prod.my-destination-topic,PROD)" + ] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1695191853000, + "partitionSpec": { + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" + }, + "status": "STARTED" + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1695191885000, + "partitionSpec": { + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" + }, + "status": "COMPLETE", + "result": { + "type": "SUCCESS", + "nativeResultType": "fivetran" + } + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataProcessInstance", "entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a", diff --git a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py index 2e6c2b1370d166..9f53ae2382d406 100644 --- a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py +++ b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py @@ -32,6 +32,15 @@ "sync_frequency": 1440, "destination_id": "interval_unconstitutional", }, + { + "connector_id": "my_confluent_cloud_connector_id", + "connecting_user_id": "reapply_phone", + "connector_type_id": "confluent_cloud", + "connector_name": "confluent_cloud", + "paused": False, + "sync_frequency": 1440, + "destination_id": "my_confluent_cloud_connector_id", + }, ] @@ -45,7 +54,7 @@ def default_query_results( elif query == fivetran_log_query.get_connectors_query(): return connector_query_results elif query == fivetran_log_query.get_table_lineage_query( - connector_ids=["calendar_elected"] + connector_ids=["calendar_elected", 
"my_confluent_cloud_connector_id"] ): return [ { @@ -66,9 +75,18 @@ def default_query_results( "destination_table_name": "company", "destination_schema_name": "postgres_public", }, + { + "connector_id": "my_confluent_cloud_connector_id", + "source_table_id": "10042", + "source_table_name": "my-source-topic", + "source_schema_name": "confluent_cloud", + "destination_table_id": "7781", + "destination_table_name": "my-destination-topic", + "destination_schema_name": "confluent_cloud", + }, ] elif query == fivetran_log_query.get_column_lineage_query( - connector_ids=["calendar_elected"] + connector_ids=["calendar_elected", "my_confluent_cloud_connector_id"] ): return [ { @@ -107,7 +125,7 @@ def default_query_results( ] elif query == fivetran_log_query.get_sync_logs_query( syncs_interval=7, - connector_ids=["calendar_elected"], + connector_ids=["calendar_elected", "my_confluent_cloud_connector_id"], ): return [ { @@ -131,6 +149,13 @@ def default_query_results( "end_time": datetime.datetime(2023, 10, 3, 14, 36, 29, 678000), "end_message_data": '"{\\"reason\\":\\"java.lang.RuntimeException: FATAL: too many connections for role \\\\\\"hxwraqld\\\\\\"\\",\\"taskType\\":\\"reconnect\\",\\"status\\":\\"FAILURE_WITH_TASK\\"}"', }, + { + "connector_id": "my_confluent_cloud_connector_id", + "sync_id": "d9a03d6-eded-4422-a46a-163266e58244", + "start_time": datetime.datetime(2023, 9, 20, 6, 37, 32, 606000), + "end_time": datetime.datetime(2023, 9, 20, 6, 38, 5, 56000), + "end_message_data": '"{\\"status\\":\\"SUCCESSFUL\\"}"', + }, ] # Unreachable code raise Exception(f"Unknown query {query}") @@ -172,19 +197,30 @@ def test_fivetran_with_snowflake_dest(pytestconfig, tmp_path): }, }, "connector_patterns": { - "allow": [ - "postgres", - ] + "allow": ["postgres", "confluent_cloud"] }, "destination_patterns": { "allow": [ "interval_unconstitutional", + "my_confluent_cloud_connector_id", ] }, "sources_to_platform_instance": { "calendar_elected": { "database": "postgres_db", "env": "DEV", + }, + "my_confluent_cloud_connector_id": { + "platform": "kafka", + "include_schema_in_urn": False, + "database": "kafka_prod", + }, + }, + "destination_to_platform_instance": { + "my_confluent_cloud_connector_id": { + "platform": "kafka", + "include_schema_in_urn": False, + "database": "kafka_prod", } }, }, @@ -234,6 +270,15 @@ def test_fivetran_with_snowflake_dest_and_null_connector_user(pytestconfig, tmp_ "sync_frequency": 1440, "destination_id": "interval_unconstitutional", }, + { + "connector_id": "my_confluent_cloud_connector_id", + "connecting_user_id": None, + "connector_type_id": "confluent_cloud", + "connector_name": "confluent_cloud", + "paused": False, + "sync_frequency": 1440, + "destination_id": "interval_unconstitutional", + }, ] connection_magic_mock.execute.side_effect = partial( @@ -261,9 +306,7 @@ def test_fivetran_with_snowflake_dest_and_null_connector_user(pytestconfig, tmp_ }, }, "connector_patterns": { - "allow": [ - "postgres", - ] + "allow": ["postgres", "confluent_cloud"] }, "destination_patterns": { "allow": [ @@ -275,6 +318,18 @@ def test_fivetran_with_snowflake_dest_and_null_connector_user(pytestconfig, tmp_ "platform": "postgres", "env": "DEV", "database": "postgres_db", + }, + "my_confluent_cloud_connector_id": { + "platform": "kafka", + "database": "kafka_prod", + "include_schema_in_urn": False, + }, + }, + "destination_to_platform_instance": { + "my_confluent_cloud_connector_id": { + "platform": "kafka", + "database": "kafka_prod", + "include_schema_in_urn": False, } }, }, From 
68218768d3295b3c406bc8b5804dd646f9cc7a44 Mon Sep 17 00:00:00 2001 From: skrydal Date: Mon, 20 Jan 2025 14:43:12 +0100 Subject: [PATCH 2/5] fix(ingest/redshift): Fix query sequence duplication for serverless mode (#12353) --- .../ingestion/source/redshift/query.py | 124 +++++++++++------- 1 file changed, 77 insertions(+), 47 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py index 71a20890d35e88..62f7d0a3901c7a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py @@ -797,61 +797,91 @@ def stl_scan_based_lineage_query( db_name: str, start_time: datetime, end_time: datetime ) -> str: return """ - SELECT - distinct cluster, - target_schema, - target_table, - username, - source_schema, - source_table, - query_text AS ddl, - start_time AS timestamp - FROM - ( - SELECT - sti.schema AS target_schema, - sti.table AS target_table, - sti.database AS cluster, - qi.table_id AS target_table_id, - qi.query_id AS query_id, - qi.start_time AS start_time - FROM - SYS_QUERY_DETAIL qi - JOIN - SVV_TABLE_INFO sti on sti.table_id = qi.table_id - WHERE - start_time >= '{start_time}' and - start_time < '{end_time}' and - cluster = '{db_name}' and - step_name = 'insert' - ) AS target_tables - JOIN - ( + WITH queries AS ( SELECT - sti.schema AS source_schema, - sti.table AS source_table, - qs.table_id AS source_table_id, - qs.query_id AS query_id, - sui.user_name AS username, - LISTAGG(qt."text") WITHIN GROUP (ORDER BY sequence) AS query_text + sti.database as cluster, + sti.schema AS "schema", + sti.table AS "table", + qs.table_id AS table_id, + qs.query_id as query_id, + qs.step_name as step_name, + sui.user_name as username, + source, + MIN(qs.start_time) as "timestamp" -- multiple duplicate records with start_time increasing slightly by miliseconds FROM SYS_QUERY_DETAIL qs JOIN SVV_TABLE_INFO sti ON sti.table_id = qs.table_id LEFT JOIN - SYS_QUERY_TEXT qt ON qt.query_id = qs.query_id - LEFT JOIN SVV_USER_INFO sui ON qs.user_id = sui.user_id WHERE - qs.step_name = 'scan' AND - qs.source = 'Redshift(local)' AND - qt.sequence < 16 AND -- See https://stackoverflow.com/questions/72770890/redshift-result-size-exceeds-listagg-limit-on-svl-statementtext - sti.database = '{db_name}' AND -- this was required to not retrieve some internal redshift tables, try removing to see what happens - sui.user_name <> 'rdsdb' -- not entirely sure about this filter - GROUP BY sti.schema, sti.table, qs.table_id, qs.query_id, sui.user_name - ) AS source_tables ON target_tables.query_id = source_tables.query_id - WHERE source_tables.source_table_id <> target_tables.target_table_id - ORDER BY cluster, target_schema, target_table, start_time ASC + cluster = '{db_name}' AND + qs.user_id <> 1 AND -- this is user 'rdsdb' + qs.start_time >= '{start_time}' AND + qs.start_time < '{end_time}' + GROUP BY cluster, "schema", "table", qs.table_id, query_id, step_name, username, source -- to be sure we are not making duplicates ourselves the list of group by must match whatever we use in "group by" and "where" of subsequent queries ("cluster" is already set to single value in this query) + ), + unique_query_text AS ( + SELECT + query_id, + sequence, + text + FROM ( + SELECT + query_id, + "sequence", + text, + ROW_NUMBER() OVER ( + PARTITION BY query_id, sequence + ) as rn + FROM SYS_QUERY_TEXT + ) + WHERE rn = 1 + ), + scan_queries AS ( + SELECT + 
"schema" as source_schema, + "table" as source_table, + table_id as source_table_id, + queries.query_id as query_id, + username, + LISTAGG(qt."text") WITHIN GROUP (ORDER BY sequence) AS query_text + FROM + "queries" LEFT JOIN + unique_query_text qt ON qt.query_id = queries.query_id + WHERE + source = 'Redshift(local)' AND + step_name = 'scan' AND + qt.sequence < 16 -- truncating query to not exceed Redshift limit on LISTAGG function (each sequence has at most 4k characters, limit is 64k, divided by 4k gives 16, starts count from 0) + GROUP BY source_schema, source_table, source_table_id, queries.query_id, username + ), + insert_queries AS ( + SELECT + "schema" as target_schema, + "table" as target_table, + table_id as target_table_id, + query_id, + cluster, + min("timestamp") as "timestamp" + FROM + queries + WHERE + step_name = 'insert' + GROUP BY cluster, target_schema, target_table, target_table_id, query_id + ) + SELECT + cluster, + target_schema, + target_table, + username, + source_schema, + source_table, + query_text AS ddl, + "timestamp" + FROM scan_queries + JOIN insert_queries on insert_queries.query_id = scan_queries.query_id + WHERE source_table_id <> target_table_id + ORDER BY cluster, target_schema, target_table, "timestamp" ASC; """.format( # We need the original database name for filtering db_name=db_name, From 7ac6523b2de6f46edadd83d834f694c892a4c761 Mon Sep 17 00:00:00 2001 From: josges Date: Mon, 20 Jan 2025 15:07:12 +0100 Subject: [PATCH 3/5] fix(ingestion): fix stateful ingestion for GCS source (#11879) --- .../src/datahub/ingestion/source/gcs/gcs_source.py | 3 ++- metadata-ingestion/tests/unit/test_gcs_source.py | 7 ++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/gcs/gcs_source.py b/metadata-ingestion/src/datahub/ingestion/source/gcs/gcs_source.py index 5196c8ec5b998b..95490147d48204 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/gcs/gcs_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/gcs/gcs_source.py @@ -88,6 +88,7 @@ def __init__(self, config: GCSSourceConfig, ctx: PipelineContext): super().__init__(config, ctx) self.config = config self.report = GCSSourceReport() + self.platform: str = PLATFORM_GCS self.s3_source = self.create_equivalent_s3_source(ctx) @classmethod @@ -135,7 +136,7 @@ def create_equivalent_s3_path_specs(self): def create_equivalent_s3_source(self, ctx: PipelineContext) -> S3Source: config = self.create_equivalent_s3_config() - return self.s3_source_overrides(S3Source(config, ctx)) + return self.s3_source_overrides(S3Source(config, PipelineContext(ctx.run_id))) def s3_source_overrides(self, source: S3Source) -> S3Source: source.source_config.platform = PLATFORM_GCS diff --git a/metadata-ingestion/tests/unit/test_gcs_source.py b/metadata-ingestion/tests/unit/test_gcs_source.py index 9d5f4e915b18cf..5b99862c92a04b 100644 --- a/metadata-ingestion/tests/unit/test_gcs_source.py +++ b/metadata-ingestion/tests/unit/test_gcs_source.py @@ -1,13 +1,17 @@ +from unittest import mock + import pytest from pydantic import ValidationError from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.graph.client import DataHubGraph from datahub.ingestion.source.data_lake_common.data_lake_utils import PLATFORM_GCS from datahub.ingestion.source.gcs.gcs_source import GCSSource def test_gcs_source_setup(): - ctx = PipelineContext(run_id="test-gcs") + graph = mock.MagicMock(spec=DataHubGraph) + ctx = PipelineContext(run_id="test-gcs", graph=graph, 
pipeline_name="test-gcs") # Baseline: valid config source: dict = { @@ -18,6 +22,7 @@ def test_gcs_source_setup(): } ], "credential": {"hmac_access_id": "id", "hmac_access_secret": "secret"}, + "stateful_ingestion": {"enabled": "true"}, } gcs = GCSSource.create(source, ctx) assert gcs.s3_source.source_config.platform == PLATFORM_GCS From 2109abdc1a1de5fe711900ee47e5f5891d2c59dc Mon Sep 17 00:00:00 2001 From: Aseem Bansal Date: Mon, 20 Jan 2025 22:32:43 +0530 Subject: [PATCH 4/5] feat(cli/delete): add --urn-file option (#12247) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Sergio Gómez Villamor --- .../src/datahub/cli/delete_cli.py | 18 ++++++++++++++++-- .../cli/delete_cmd/test_timeseries_delete.py | 1 + 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/cli/delete_cli.py b/metadata-ingestion/src/datahub/cli/delete_cli.py index 8501cf71f0d544..f77156b4bd7544 100644 --- a/metadata-ingestion/src/datahub/cli/delete_cli.py +++ b/metadata-ingestion/src/datahub/cli/delete_cli.py @@ -265,6 +265,11 @@ def undo_by_filter( type=str, help="Urn of the entity to delete, for single entity deletion", ) +@click.option( + "--urn-file", + required=False, + help="Path of file with urns (one per line) to be deleted", +) @click.option( "-a", "--aspect", @@ -353,6 +358,7 @@ def undo_by_filter( @telemetry.with_telemetry() def by_filter( urn: Optional[str], + urn_file: Optional[str], aspect: Optional[str], force: bool, soft: bool, @@ -373,6 +379,7 @@ def by_filter( # Validate the cli arguments. _validate_user_urn_and_filters( urn=urn, + urn_file=urn_file, entity_type=entity_type, platform=platform, env=env, @@ -429,6 +436,12 @@ def by_filter( batch_size=batch_size, ) ) + elif urn_file: + with open(urn_file, "r") as r: + urns = [] + for line in r.readlines(): + urn = line.strip().strip('"') + urns.append(urn) else: urns = list( graph.get_urns_by_filter( @@ -537,6 +550,7 @@ def process_urn(urn): def _validate_user_urn_and_filters( urn: Optional[str], + urn_file: Optional[str], entity_type: Optional[str], platform: Optional[str], env: Optional[str], @@ -549,9 +563,9 @@ def _validate_user_urn_and_filters( raise click.UsageError( "You cannot provide both an urn and a filter rule (entity-type / platform / env / query)." ) - elif not urn and not (entity_type or platform or env or query): + elif not urn and not urn_file and not (entity_type or platform or env or query): raise click.UsageError( - "You must provide either an urn or at least one filter (entity-type / platform / env / query) in order to delete entities." + "You must provide either an urn or urn_file or at least one filter (entity-type / platform / env / query) in order to delete entities." 
) elif query: logger.warning( diff --git a/smoke-test/tests/cli/delete_cmd/test_timeseries_delete.py b/smoke-test/tests/cli/delete_cmd/test_timeseries_delete.py index 99ed790f25ffd7..745224a099fc53 100644 --- a/smoke-test/tests/cli/delete_cmd/test_timeseries_delete.py +++ b/smoke-test/tests/cli/delete_cmd/test_timeseries_delete.py @@ -97,6 +97,7 @@ def datahub_delete(auth_session, params: List[str]) -> None: args: List[str] = ["delete"] args.extend(params) args.append("--hard") + logger.info(f"Running delete command with args: {args}") delete_result: Result = runner.invoke( datahub, args, From a20f660ac109ed0d989893de26ad0902acf5b1d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20L=C3=BCdin?= <13187726+Masterchen09@users.noreply.github.com> Date: Mon, 20 Jan 2025 23:57:22 +0100 Subject: [PATCH 5/5] fix(graphql): removed duplicated entity in EntityTypeUrnMapper (#12406) --- .../types/entitytype/EntityTypeUrnMapper.java | 3 -- .../entitytype/EntityTypeMapperTest.java | 20 +++++++++++++ .../entitytype/EntityTypeUrnMapperTest.java | 30 +++++++++++++++++++ 3 files changed, 50 insertions(+), 3 deletions(-) create mode 100644 datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/types/entitytype/EntityTypeMapperTest.java create mode 100644 datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/types/entitytype/EntityTypeUrnMapperTest.java diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/entitytype/EntityTypeUrnMapper.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/entitytype/EntityTypeUrnMapper.java index 5b72c2b3c11c5e..334faf753cb8b5 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/entitytype/EntityTypeUrnMapper.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/entitytype/EntityTypeUrnMapper.java @@ -77,9 +77,6 @@ public class EntityTypeUrnMapper { .put( Constants.BUSINESS_ATTRIBUTE_ENTITY_NAME, "urn:li:entityType:datahub.businessAttribute") - .put( - Constants.DATA_PROCESS_INSTANCE_ENTITY_NAME, - "urn:li:entityType:datahub.dataProcessInstance") .build(); private static final Map ENTITY_TYPE_URN_TO_NAME = diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/types/entitytype/EntityTypeMapperTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/types/entitytype/EntityTypeMapperTest.java new file mode 100644 index 00000000000000..79cc7725b1fc7f --- /dev/null +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/types/entitytype/EntityTypeMapperTest.java @@ -0,0 +1,20 @@ +package com.linkedin.datahub.graphql.types.entitytype; + +import static org.testng.Assert.*; + +import com.linkedin.datahub.graphql.generated.EntityType; +import com.linkedin.metadata.Constants; +import org.testng.annotations.Test; + +public class EntityTypeMapperTest { + + @Test + public void testGetType() throws Exception { + assertEquals(EntityTypeMapper.getType(Constants.DATASET_ENTITY_NAME), EntityType.DATASET); + } + + @Test + public void testGetName() throws Exception { + assertEquals(EntityTypeMapper.getName(EntityType.DATASET), Constants.DATASET_ENTITY_NAME); + } +} diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/types/entitytype/EntityTypeUrnMapperTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/types/entitytype/EntityTypeUrnMapperTest.java new file mode 100644 index 00000000000000..ed16226d0685ee --- /dev/null +++ 
b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/types/entitytype/EntityTypeUrnMapperTest.java @@ -0,0 +1,30 @@ +package com.linkedin.datahub.graphql.types.entitytype; + +import static org.testng.Assert.*; + +import com.linkedin.datahub.graphql.generated.EntityType; +import com.linkedin.metadata.Constants; +import org.testng.annotations.Test; + +public class EntityTypeUrnMapperTest { + + @Test + public void testGetName() throws Exception { + assertEquals( + EntityTypeUrnMapper.getName("urn:li:entityType:datahub.dataset"), + Constants.DATASET_ENTITY_NAME); + } + + @Test + public void testGetEntityType() throws Exception { + assertEquals( + EntityTypeUrnMapper.getEntityType("urn:li:entityType:datahub.dataset"), EntityType.DATASET); + } + + @Test + public void testGetEntityTypeUrn() throws Exception { + assertEquals( + EntityTypeUrnMapper.getEntityTypeUrn(Constants.DATASET_ENTITY_NAME), + "urn:li:entityType:datahub.dataset"); + } +}
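
A minimal, standalone sketch (not part of any patch above) of the URN behaviour introduced in PATCH 1/5: when a connector's PlatformDetail sets include_schema_in_urn to False, the Fivetran source strips the "<schema>." prefix that Fivetran prepends to table names before building the dataset URN. The helper name build_dataset_urn and the import path are assumptions for illustration only; DatasetUrn.create_from_ids is the same call the patched fivetran.py uses, and the database/table values are taken from the Kafka golden files above.

from datahub.utilities.urns.dataset_urn import DatasetUrn  # import path assumed


def build_dataset_urn(
    platform: str, database: str, table: str, include_schema_in_urn: bool
) -> DatasetUrn:
    # Fivetran reports tables as "<schema>.<table>"; optionally drop the schema part,
    # exactly as the patched _extend_lineage does with split(".", 1)[1].
    if not include_schema_in_urn:
        table = table.split(".", 1)[1]
    return DatasetUrn.create_from_ids(
        platform_id=platform,
        table_name=f"{database.lower()}.{table}",
        env="PROD",
    )


# Schema kept (the default) vs. dropped; the second form matches the
# kafka_prod.my-source-topic URN seen in the golden files above.
print(build_dataset_urn("kafka", "kafka_prod", "confluent_cloud.my-source-topic", True))
# urn:li:dataset:(urn:li:dataPlatform:kafka,kafka_prod.confluent_cloud.my-source-topic,PROD)
print(build_dataset_urn("kafka", "kafka_prod", "confluent_cloud.my-source-topic", False))
# urn:li:dataset:(urn:li:dataPlatform:kafka,kafka_prod.my-source-topic,PROD)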