Skip to content

Commit

Permalink
Add way to not add schema to the destination/source urn.
Browse files Browse the repository at this point in the history
Kafka connector adds connector name to schema which should not be part of the urn.
  • Loading branch information
treff7es committed Jan 10, 2025
1 parent b86bbf7 commit f22db80
Show file tree
Hide file tree
Showing 5 changed files with 725 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ class PlatformDetail(ConfigModel):
description="The database that all assets produced by this connector belong to. "
"For destinations, this defaults to the fivetran log config's database.",
)
include_schema: bool = pydantic.Field(
default=True,
description="Include schema in the dataset URN. In some cases, the schema is not relevant to the dataset URN and Fivetran sets it to the connector name.",
)


class FivetranSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,21 +119,31 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> Dict[str, s
)

for lineage in connector.lineage:
source_table = (
lineage.source_table
if source_details.include_schema
else lineage.source_table.split(".")[1]
)
input_dataset_urn = DatasetUrn.create_from_ids(
platform_id=source_details.platform,
table_name=(
f"{source_details.database.lower()}.{lineage.source_table}"
f"{source_details.database.lower()}.{source_table}"
if source_details.database
else lineage.source_table
else source_table
),
env=source_details.env,
platform_instance=source_details.platform_instance,
)
input_dataset_urn_list.append(input_dataset_urn)

destination_table = (
lineage.destination_table
if destination_details.include_schema
else lineage.destination_table.split(".")[1]
)
output_dataset_urn = DatasetUrn.create_from_ids(
platform_id=destination_details.platform,
table_name=f"{destination_details.database.lower()}.{lineage.destination_table}",
table_name=f"{destination_details.database.lower()}.{destination_table}",
env=destination_details.env,
platform_instance=destination_details.platform_instance,
)
Expand Down Expand Up @@ -176,12 +186,12 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> Dict[str, s
**{
f"source.{k}": str(v)
for k, v in source_details.dict().items()
if v is not None
if v is not None and not isinstance(v, bool)
},
**{
f"destination.{k}": str(v)
for k, v in destination_details.dict().items()
if v is not None
if v is not None and not isinstance(v, bool)
},
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,323 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD)",
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"json": {
"customProperties": {},
"name": "confluent_cloud",
"env": "PROD"
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD)",
"changeType": "UPSERT",
"aspectName": "ownership",
"aspect": {
"json": {
"owners": [],
"ownerTypes": {},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:fivetran"
}
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD)",
"changeType": "UPSERT",
"aspectName": "globalTags",
"aspect": {
"json": {
"tags": []
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {
"connector_id": "my_confluent_cloud_connector_id",
"connector_type": "confluent_cloud",
"paused": "False",
"sync_frequency": "1440",
"destination_id": "interval_unconstitutional",
"source.platform": "kafka",
"source.env": "PROD",
"source.database": "kafka_prod",
"destination.platform": "snowflake",
"destination.env": "PROD",
"destination.database": "test_database"
},
"name": "confluent_cloud",
"type": {
"string": "COMMAND"
},
"env": "PROD"
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)",
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"json": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:kafka,kafka_prod.my-source-topic,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:snowflake,test_database.confluent_cloud.my-destination-topic,PROD)"
],
"inputDatajobs": [],
"fineGrainedLineages": []
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)",
"changeType": "UPSERT",
"aspectName": "ownership",
"aspect": {
"json": {
"owners": [],
"ownerTypes": {},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:fivetran"
}
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)",
"changeType": "UPSERT",
"aspectName": "globalTags",
"aspect": {
"json": {
"tags": []
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceProperties",
"aspect": {
"json": {
"customProperties": {},
"name": "d9a03d6-eded-4422-a46a-163266e58244",
"type": "BATCH_SCHEDULED",
"created": {
"time": 1695191853000,
"actor": "urn:li:corpuser:datahub"
}
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceRelationships",
"aspect": {
"json": {
"parentTemplate": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)",
"upstreamInstances": []
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceInput",
"aspect": {
"json": {
"inputs": [
"urn:li:dataset:(urn:li:dataPlatform:kafka,kafka_prod.my-source-topic,PROD)"
]
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceOutput",
"aspect": {
"json": {
"outputs": [
"urn:li:dataset:(urn:li:dataPlatform:snowflake,test_database.confluent_cloud.my-destination-topic,PROD)"
]
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1695191853000,
"partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "STARTED"
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1695191885000,
"partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "COMPLETE",
"result": {
"type": "SUCCESS",
"nativeResultType": "fivetran"
}
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a",
Expand Down
Loading

0 comments on commit f22db80

Please sign in to comment.