Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest/dbt): Prevent lineage cycles when parsing sql of dbt models #11666

Merged
merged 2 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1989,6 +1989,11 @@ def _translate_dbt_name_to_upstream_urn(dbt_name: str) -> str:
time=mce_builder.get_sys_time(),
actor=_DEFAULT_ACTOR,
)
sibling_urn = node.get_urn(
self.config.target_platform,
self.config.env,
self.config.target_platform_instance,
)
return UpstreamLineageClass(
upstreams=[
UpstreamClass(
Expand All @@ -1997,6 +2002,7 @@ def _translate_dbt_name_to_upstream_urn(dbt_name: str) -> str:
auditStamp=auditStamp,
)
for upstream in upstream_urns
if not (node.node_type == "model" and upstream == sibling_urn)
],
fineGrainedLineages=(
(cll or None) if self.config.include_column_lineage else None
Expand Down
42 changes: 42 additions & 0 deletions metadata-ingestion/tests/unit/test_dbt_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.dbt import dbt_cloud
from datahub.ingestion.source.dbt.dbt_cloud import DBTCloudConfig
from datahub.ingestion.source.dbt.dbt_common import DBTNode
from datahub.ingestion.source.dbt.dbt_core import (
DBTCoreConfig,
DBTCoreSource,
Expand Down Expand Up @@ -253,6 +254,47 @@ def test_dbt_config_prefer_sql_parser_lineage():
assert config.prefer_sql_parser_lineage is True


def test_dbt_prefer_sql_parser_lineage_no_self_reference():
ctx = PipelineContext(run_id="test-run-id")
config = DBTCoreConfig.parse_obj(
{
**create_base_dbt_config(),
"skip_sources_in_lineage": True,
"prefer_sql_parser_lineage": True,
}
)
source: DBTCoreSource = DBTCoreSource(config, ctx, "dbt")
all_nodes_map = {
"model1": DBTNode(
name="model1",
database=None,
schema=None,
alias=None,
comment="",
description="",
language=None,
raw_code=None,
dbt_adapter="postgres",
dbt_name="model1",
dbt_file_path=None,
dbt_package_name=None,
node_type="model",
materialization="table",
max_loaded_at=None,
catalog_type=None,
missing_from_catalog=False,
owner=None,
compiled_code="SELECT d FROM results WHERE d > (SELECT MAX(d) FROM model1)",
),
}
source._infer_schemas_and_update_cll(all_nodes_map)
upstream_lineage = source._create_lineage_aspect_for_dbt_node(
all_nodes_map["model1"], all_nodes_map
)
assert upstream_lineage is not None
assert len(upstream_lineage.upstreams) == 1


def test_dbt_s3_config():
# test missing aws config
config_dict: dict = {
Expand Down
Loading