From 7eab2eb8d98824e1f580ac80e604f1a8e92cac44 Mon Sep 17 00:00:00 2001 From: Tamas Nemeth Date: Mon, 20 Jan 2025 13:38:12 +0100 Subject: [PATCH] fix(ingest/fivetran): Add way to not add schema to the destination/source urn. (#12314) --- .../ingestion/source/fivetran/config.py | 4 + .../ingestion/source/fivetran/fivetran.py | 20 +- ...nowflake_empty_connection_user_golden.json | 317 +++++++++++++++++ .../fivetran/fivetran_snowflake_golden.json | 325 ++++++++++++++++++ .../integration/fivetran/test_fivetran.py | 73 +++- 5 files changed, 725 insertions(+), 14 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py index 86826ae7bedc09..b048b6157fbb9f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py @@ -167,6 +167,10 @@ class PlatformDetail(ConfigModel): description="The database that all assets produced by this connector belong to. " "For destinations, this defaults to the fivetran log config's database.", ) + include_schema_in_urn: bool = pydantic.Field( + default=True, + description="Include schema in the dataset URN. In some cases, the schema is not relevant to the dataset URN and Fivetran sets it to the source and destination table names in the connector.", + ) class FivetranSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin): diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py index d8ebbe5b63d1ae..a3469696699f17 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py @@ -119,21 +119,31 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> Dict[str, s ) for lineage in connector.lineage: + source_table = ( + lineage.source_table + if source_details.include_schema_in_urn + else lineage.source_table.split(".", 1)[1] + ) input_dataset_urn = DatasetUrn.create_from_ids( platform_id=source_details.platform, table_name=( - f"{source_details.database.lower()}.{lineage.source_table}" + f"{source_details.database.lower()}.{source_table}" if source_details.database - else lineage.source_table + else source_table ), env=source_details.env, platform_instance=source_details.platform_instance, ) input_dataset_urn_list.append(input_dataset_urn) + destination_table = ( + lineage.destination_table + if destination_details.include_schema_in_urn + else lineage.destination_table.split(".", 1)[1] + ) output_dataset_urn = DatasetUrn.create_from_ids( platform_id=destination_details.platform, - table_name=f"{destination_details.database.lower()}.{lineage.destination_table}", + table_name=f"{destination_details.database.lower()}.{destination_table}", env=destination_details.env, platform_instance=destination_details.platform_instance, ) @@ -176,12 +186,12 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> Dict[str, s **{ f"source.{k}": str(v) for k, v in source_details.dict().items() - if v is not None + if v is not None and not isinstance(v, bool) }, **{ f"destination.{k}": str(v) for k, v in destination_details.dict().items() - if v is not None + if v is not None and not isinstance(v, bool) }, ) diff --git a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json index 7934153051c60b..cf77ab39cbcbe7 100644 --- a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json +++ b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json @@ -595,6 +595,323 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD)", + "changeType": "UPSERT", + "aspectName": "dataFlowInfo", + "aspect": { + "json": { + "customProperties": {}, + "name": "confluent_cloud", + "env": "PROD" + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:fivetran" + } + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)", + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "json": { + "customProperties": { + "connector_id": "my_confluent_cloud_connector_id", + "connector_type": "confluent_cloud", + "paused": "False", + "sync_frequency": "1440", + "destination_id": "interval_unconstitutional", + "source.platform": "kafka", + "source.env": "PROD", + "source.database": "kafka_prod", + "destination.platform": "snowflake", + "destination.env": "PROD", + "destination.database": "test_database" + }, + "name": "confluent_cloud", + "type": { + "string": "COMMAND" + }, + "env": "PROD" + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)", + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "json": { + "inputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:kafka,kafka_prod.my-source-topic,PROD)" + ], + "outputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_database.confluent_cloud.my-destination-topic,PROD)" + ], + "inputDatajobs": [], + "fineGrainedLineages": [] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:fivetran" + } + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceProperties", + "aspect": { + "json": { + "customProperties": {}, + "name": "d9a03d6-eded-4422-a46a-163266e58244", + "type": "BATCH_SCHEDULED", + "created": { + "time": 1695191853000, + "actor": "urn:li:corpuser:datahub" + } + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRelationships", + "aspect": { + "json": { + "parentTemplate": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)", + "upstreamInstances": [] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceInput", + "aspect": { + "json": { + "inputs": [ + "urn:li:dataset:(urn:li:dataPlatform:kafka,kafka_prod.my-source-topic,PROD)" + ] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceOutput", + "aspect": { + "json": { + "outputs": [ + "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_database.confluent_cloud.my-destination-topic,PROD)" + ] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1695191853000, + "partitionSpec": { + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" + }, + "status": "STARTED" + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1695191885000, + "partitionSpec": { + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" + }, + "status": "COMPLETE", + "result": { + "type": "SUCCESS", + "nativeResultType": "fivetran" + } + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataProcessInstance", "entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a", diff --git a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json index c46bd8fb65d876..cec1ef1b6221d3 100644 --- a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json +++ b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json @@ -603,6 +603,331 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD)", + "changeType": "UPSERT", + "aspectName": "dataFlowInfo", + "aspect": { + "json": { + "customProperties": {}, + "name": "confluent_cloud", + "env": "PROD" + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:fivetran" + } + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)", + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "json": { + "customProperties": { + "connector_id": "my_confluent_cloud_connector_id", + "connector_type": "confluent_cloud", + "paused": "False", + "sync_frequency": "1440", + "destination_id": "my_confluent_cloud_connector_id", + "source.platform": "kafka", + "source.env": "PROD", + "source.database": "kafka_prod", + "destination.platform": "kafka", + "destination.env": "PROD", + "destination.database": "kafka_prod" + }, + "name": "confluent_cloud", + "type": { + "string": "COMMAND" + }, + "env": "PROD" + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)", + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "json": { + "inputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:kafka,kafka_prod.my-source-topic,PROD)" + ], + "outputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:kafka,kafka_prod.my-destination-topic,PROD)" + ], + "inputDatajobs": [], + "fineGrainedLineages": [] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [ + { + "owner": "urn:li:corpuser:abc.xyz@email.com", + "type": "DEVELOPER", + "source": { + "type": "SERVICE" + } + } + ], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:fivetran" + } + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceProperties", + "aspect": { + "json": { + "customProperties": {}, + "name": "d9a03d6-eded-4422-a46a-163266e58244", + "type": "BATCH_SCHEDULED", + "created": { + "time": 1695191853000, + "actor": "urn:li:corpuser:datahub" + } + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRelationships", + "aspect": { + "json": { + "parentTemplate": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)", + "upstreamInstances": [] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceInput", + "aspect": { + "json": { + "inputs": [ + "urn:li:dataset:(urn:li:dataPlatform:kafka,kafka_prod.my-source-topic,PROD)" + ] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceOutput", + "aspect": { + "json": { + "outputs": [ + "urn:li:dataset:(urn:li:dataPlatform:kafka,kafka_prod.my-destination-topic,PROD)" + ] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1695191853000, + "partitionSpec": { + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" + }, + "status": "STARTED" + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1695191885000, + "partitionSpec": { + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" + }, + "status": "COMPLETE", + "result": { + "type": "SUCCESS", + "nativeResultType": "fivetran" + } + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataProcessInstance", "entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a", diff --git a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py index 2e6c2b1370d166..9f53ae2382d406 100644 --- a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py +++ b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py @@ -32,6 +32,15 @@ "sync_frequency": 1440, "destination_id": "interval_unconstitutional", }, + { + "connector_id": "my_confluent_cloud_connector_id", + "connecting_user_id": "reapply_phone", + "connector_type_id": "confluent_cloud", + "connector_name": "confluent_cloud", + "paused": False, + "sync_frequency": 1440, + "destination_id": "my_confluent_cloud_connector_id", + }, ] @@ -45,7 +54,7 @@ def default_query_results( elif query == fivetran_log_query.get_connectors_query(): return connector_query_results elif query == fivetran_log_query.get_table_lineage_query( - connector_ids=["calendar_elected"] + connector_ids=["calendar_elected", "my_confluent_cloud_connector_id"] ): return [ { @@ -66,9 +75,18 @@ def default_query_results( "destination_table_name": "company", "destination_schema_name": "postgres_public", }, + { + "connector_id": "my_confluent_cloud_connector_id", + "source_table_id": "10042", + "source_table_name": "my-source-topic", + "source_schema_name": "confluent_cloud", + "destination_table_id": "7781", + "destination_table_name": "my-destination-topic", + "destination_schema_name": "confluent_cloud", + }, ] elif query == fivetran_log_query.get_column_lineage_query( - connector_ids=["calendar_elected"] + connector_ids=["calendar_elected", "my_confluent_cloud_connector_id"] ): return [ { @@ -107,7 +125,7 @@ def default_query_results( ] elif query == fivetran_log_query.get_sync_logs_query( syncs_interval=7, - connector_ids=["calendar_elected"], + connector_ids=["calendar_elected", "my_confluent_cloud_connector_id"], ): return [ { @@ -131,6 +149,13 @@ def default_query_results( "end_time": datetime.datetime(2023, 10, 3, 14, 36, 29, 678000), "end_message_data": '"{\\"reason\\":\\"java.lang.RuntimeException: FATAL: too many connections for role \\\\\\"hxwraqld\\\\\\"\\",\\"taskType\\":\\"reconnect\\",\\"status\\":\\"FAILURE_WITH_TASK\\"}"', }, + { + "connector_id": "my_confluent_cloud_connector_id", + "sync_id": "d9a03d6-eded-4422-a46a-163266e58244", + "start_time": datetime.datetime(2023, 9, 20, 6, 37, 32, 606000), + "end_time": datetime.datetime(2023, 9, 20, 6, 38, 5, 56000), + "end_message_data": '"{\\"status\\":\\"SUCCESSFUL\\"}"', + }, ] # Unreachable code raise Exception(f"Unknown query {query}") @@ -172,19 +197,30 @@ def test_fivetran_with_snowflake_dest(pytestconfig, tmp_path): }, }, "connector_patterns": { - "allow": [ - "postgres", - ] + "allow": ["postgres", "confluent_cloud"] }, "destination_patterns": { "allow": [ "interval_unconstitutional", + "my_confluent_cloud_connector_id", ] }, "sources_to_platform_instance": { "calendar_elected": { "database": "postgres_db", "env": "DEV", + }, + "my_confluent_cloud_connector_id": { + "platform": "kafka", + "include_schema_in_urn": False, + "database": "kafka_prod", + }, + }, + "destination_to_platform_instance": { + "my_confluent_cloud_connector_id": { + "platform": "kafka", + "include_schema_in_urn": False, + "database": "kafka_prod", } }, }, @@ -234,6 +270,15 @@ def test_fivetran_with_snowflake_dest_and_null_connector_user(pytestconfig, tmp_ "sync_frequency": 1440, "destination_id": "interval_unconstitutional", }, + { + "connector_id": "my_confluent_cloud_connector_id", + "connecting_user_id": None, + "connector_type_id": "confluent_cloud", + "connector_name": "confluent_cloud", + "paused": False, + "sync_frequency": 1440, + "destination_id": "interval_unconstitutional", + }, ] connection_magic_mock.execute.side_effect = partial( @@ -261,9 +306,7 @@ def test_fivetran_with_snowflake_dest_and_null_connector_user(pytestconfig, tmp_ }, }, "connector_patterns": { - "allow": [ - "postgres", - ] + "allow": ["postgres", "confluent_cloud"] }, "destination_patterns": { "allow": [ @@ -275,6 +318,18 @@ def test_fivetran_with_snowflake_dest_and_null_connector_user(pytestconfig, tmp_ "platform": "postgres", "env": "DEV", "database": "postgres_db", + }, + "my_confluent_cloud_connector_id": { + "platform": "kafka", + "database": "kafka_prod", + "include_schema_in_urn": False, + }, + }, + "destination_to_platform_instance": { + "my_confluent_cloud_connector_id": { + "platform": "kafka", + "database": "kafka_prod", + "include_schema_in_urn": False, } }, },