Skip to content

Commit

Permalink
fix(ingest/fivetran): Add way to not add schema to the destination/source urn. (#12314)
Browse files Browse the repository at this point in the history
  • Loading branch information
treff7es authored Jan 20, 2025
1 parent ef36837 commit 7eab2eb
Show file tree
Hide file tree
Showing 5 changed files with 725 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ class PlatformDetail(ConfigModel):
description="The database that all assets produced by this connector belong to. "
"For destinations, this defaults to the fivetran log config's database.",
)
include_schema_in_urn: bool = pydantic.Field(
default=True,
description="Include schema in the dataset URN. In some cases, the schema is not relevant to the dataset URN and Fivetran sets it to the source and destination table names in the connector.",
)


class FivetranSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,21 +119,31 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> Dict[str, s
)

for lineage in connector.lineage:
source_table = (
lineage.source_table
if source_details.include_schema_in_urn
else lineage.source_table.split(".", 1)[1]
)
input_dataset_urn = DatasetUrn.create_from_ids(
platform_id=source_details.platform,
table_name=(
f"{source_details.database.lower()}.{lineage.source_table}"
f"{source_details.database.lower()}.{source_table}"
if source_details.database
else lineage.source_table
else source_table
),
env=source_details.env,
platform_instance=source_details.platform_instance,
)
input_dataset_urn_list.append(input_dataset_urn)

destination_table = (
lineage.destination_table
if destination_details.include_schema_in_urn
else lineage.destination_table.split(".", 1)[1]
)
output_dataset_urn = DatasetUrn.create_from_ids(
platform_id=destination_details.platform,
table_name=f"{destination_details.database.lower()}.{lineage.destination_table}",
table_name=f"{destination_details.database.lower()}.{destination_table}",
env=destination_details.env,
platform_instance=destination_details.platform_instance,
)
Expand Down Expand Up @@ -176,12 +186,12 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> Dict[str, s
**{
f"source.{k}": str(v)
for k, v in source_details.dict().items()
if v is not None
if v is not None and not isinstance(v, bool)
},
**{
f"destination.{k}": str(v)
for k, v in destination_details.dict().items()
if v is not None
if v is not None and not isinstance(v, bool)
},
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,323 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD)",
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"json": {
"customProperties": {},
"name": "confluent_cloud",
"env": "PROD"
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD)",
"changeType": "UPSERT",
"aspectName": "ownership",
"aspect": {
"json": {
"owners": [],
"ownerTypes": {},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:fivetran"
}
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD)",
"changeType": "UPSERT",
"aspectName": "globalTags",
"aspect": {
"json": {
"tags": []
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {
"connector_id": "my_confluent_cloud_connector_id",
"connector_type": "confluent_cloud",
"paused": "False",
"sync_frequency": "1440",
"destination_id": "interval_unconstitutional",
"source.platform": "kafka",
"source.env": "PROD",
"source.database": "kafka_prod",
"destination.platform": "snowflake",
"destination.env": "PROD",
"destination.database": "test_database"
},
"name": "confluent_cloud",
"type": {
"string": "COMMAND"
},
"env": "PROD"
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)",
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"json": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:kafka,kafka_prod.my-source-topic,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:snowflake,test_database.confluent_cloud.my-destination-topic,PROD)"
],
"inputDatajobs": [],
"fineGrainedLineages": []
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)",
"changeType": "UPSERT",
"aspectName": "ownership",
"aspect": {
"json": {
"owners": [],
"ownerTypes": {},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:fivetran"
}
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)",
"changeType": "UPSERT",
"aspectName": "globalTags",
"aspect": {
"json": {
"tags": []
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceProperties",
"aspect": {
"json": {
"customProperties": {},
"name": "d9a03d6-eded-4422-a46a-163266e58244",
"type": "BATCH_SCHEDULED",
"created": {
"time": 1695191853000,
"actor": "urn:li:corpuser:datahub"
}
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceRelationships",
"aspect": {
"json": {
"parentTemplate": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)",
"upstreamInstances": []
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceInput",
"aspect": {
"json": {
"inputs": [
"urn:li:dataset:(urn:li:dataPlatform:kafka,kafka_prod.my-source-topic,PROD)"
]
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceOutput",
"aspect": {
"json": {
"outputs": [
"urn:li:dataset:(urn:li:dataPlatform:snowflake,test_database.confluent_cloud.my-destination-topic,PROD)"
]
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1695191853000,
"partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "STARTED"
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1695191885000,
"partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "COMPLETE",
"result": {
"type": "SUCCESS",
"nativeResultType": "fivetran"
}
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a",
Expand Down
Loading

0 comments on commit 7eab2eb

Please sign in to comment.