fix(ingest/fivetran): Add way to override destination platform if the audit log's platform … #11696

Closed
@@ -144,6 +144,10 @@ def report_connectors_dropped(self, model: str) -> None:


class PlatformDetail(ConfigModel):
platform_name: Optional[str] = pydantic.Field(
description="The name of the platform that all assets produced by this recipe belong to",
)

platform_instance: Optional[str] = pydantic.Field(
default=None,
description="The instance of the platform that all assets produced by this recipe belong to",
@@ -153,6 +157,11 @@ class PlatformDetail(ConfigModel):
description="The environment that all assets produced by DataHub platform ingestion source belong to",
)

database: Optional[str] = pydantic.Field(
default=None,
description="The database this platform instance belongs to. This override only need to set if the source wrongly gets the database",
)


class FivetranSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
fivetran_log_config: FivetranLogConfig = pydantic.Field(
@@ -172,9 +181,12 @@ class FivetranSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin
default=True,
description="Populates table->table column lineage.",
)

# Mapping of connector id to source database
sources_to_database: Dict[str, str] = pydantic.Field(
default={},
description="A mapping of the connector's all sources to its database. Use connector id as key.",
deprecated=True,
description="A mapping of the connector's all sources to its database. Use connector id as key. Use sources_to_platform_instance instead.",
)
# Configuration for stateful ingestion
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = pydantic.Field(
@@ -194,3 +206,8 @@ class FivetranSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin
7,
description="The number of days to look back when extracting connectors' sync history.",
)

platform_mapping: Dict[str, str] = pydantic.Field(
default=KNOWN_DATA_PLATFORM_MAPPING,
description="A mapping of the connector's platform to DataHub platform.",
)
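
For illustration, a minimal sketch of how the new PlatformDetail fields might be populated. Treating sources_to_platform_instance (mentioned only in the deprecation note above) as a connector-id-to-PlatformDetail mapping is an assumption, and the connector id, instance, and database names are hypothetical:

from datahub.ingestion.source.fivetran.config import PlatformDetail

# Hypothetical per-connector source override using the fields added in this change:
# platform_name overrides the detected source platform, database overrides the
# database otherwise inferred for the connector's source tables.
source_overrides = {
    "calendar_elected": PlatformDetail(
        platform_name="postgres",
        platform_instance="my_rds_instance",
        database="sales_db",
    ),
}

A recipe would supply such entries via sources_to_platform_instance rather than the now-deprecated sources_to_database, while platform_mapping allows the default KNOWN_DATA_PLATFORM_MAPPING to be extended or overridden, for example {"postgres_rds": "postgres"} (hypothetical key).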
@@ -20,7 +20,6 @@
from datahub.ingestion.api.source import MetadataWorkUnitProcessor, Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.fivetran.config import (
KNOWN_DATA_PLATFORM_MAPPING,
Constant,
FivetranSourceConfig,
FivetranSourceReport,
@@ -95,13 +94,18 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> None:

# Get database for connector source
# TODO: Once Fivetran exposes this, we shouldn't ask for it via config.
source_database: Optional[str] = self.config.sources_to_database.get(
connector.connector_id
source_database: Optional[str] = (
source_platform_detail.database if source_platform_detail else None
)
if not source_database:
    source_database = self.config.sources_to_database.get(
        connector.connector_id
    )

if connector.connector_type in KNOWN_DATA_PLATFORM_MAPPING:
source_platform = KNOWN_DATA_PLATFORM_MAPPING[connector.connector_type]
else:
source_platform = source_platform_detail.platform_name
if not source_platform:
source_platform = connector.connector_type
logger.info(
f"Fivetran connector source type: {connector.connector_type} is not supported to mapped with Datahub dataset entity."
@@ -128,9 +132,14 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> None:
)
input_dataset_urn_list.append(input_dataset_urn)

output_database = (
self.audit_log.fivetran_log_database.lower()
if not destination_platform_detail.database
else destination_platform_detail.database
)
output_dataset_urn = DatasetUrn.create_from_ids(
platform_id=self.config.fivetran_log_config.destination_platform,
table_name=f"{self.audit_log.fivetran_log_database.lower()}.{lineage.destination_table}",
table_name=f"{output_database}.{lineage.destination_table}",
env=destination_platform_detail.env,
platform_instance=destination_platform_detail.platform_instance,
)
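
To make the destination-side behaviour above concrete: the database set on the destination's PlatformDetail takes precedence, and only when it is unset does the connector fall back to the Fivetran audit-log database. A standalone sketch of that fallback, with illustrative names and values that are not part of the PR:

from typing import Optional

def resolve_output_database(override: Optional[str], audit_log_db: str) -> str:
    # Prefer the configured destination database; otherwise fall back to the
    # (lower-cased) Fivetran audit-log database, mirroring the diff above.
    return override if override else audit_log_db.lower()

assert resolve_output_database(None, "FIVETRAN_LOG") == "fivetran_log"
assert resolve_output_database("analytics_db", "FIVETRAN_LOG") == "analytics_db"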