From 455fcde47a7d7085f7d9b15bc05ce46907e61d0e Mon Sep 17 00:00:00 2001 From: Mayuri N Date: Mon, 23 Dec 2024 16:03:51 +0530 Subject: [PATCH] fix(ingest/snowflake): handle empty snowflake column upstreams --- .../source/snowflake/snowflake_lineage_v2.py | 6 ++--- .../unit/snowflake/test_snowflake_source.py | 24 +++++++++++++++++++ 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py index 69f28a0e6e595..b815a6584379a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py @@ -4,7 +4,7 @@ from datetime import datetime from typing import Any, Collection, Iterable, List, Optional, Set, Tuple, Type -from pydantic import BaseModel, validator +from pydantic import BaseModel, Field, validator from datahub.configuration.datetimes import parse_absolute_time from datahub.ingestion.api.closeable import Closeable @@ -72,8 +72,8 @@ class ColumnUpstreamJob(BaseModel): class ColumnUpstreamLineage(BaseModel): - column_name: str - upstreams: List[ColumnUpstreamJob] + column_name: Optional[str] + upstreams: List[ColumnUpstreamJob] = Field(default_factory=list) class UpstreamTableNode(BaseModel): diff --git a/metadata-ingestion/tests/unit/snowflake/test_snowflake_source.py b/metadata-ingestion/tests/unit/snowflake/test_snowflake_source.py index c735feb539608..2ff85a08f052f 100644 --- a/metadata-ingestion/tests/unit/snowflake/test_snowflake_source.py +++ b/metadata-ingestion/tests/unit/snowflake/test_snowflake_source.py @@ -18,6 +18,7 @@ DEFAULT_TEMP_TABLES_PATTERNS, SnowflakeV2Config, ) +from datahub.ingestion.source.snowflake.snowflake_lineage_v2 import UpstreamLineageEdge from datahub.ingestion.source.snowflake.snowflake_query import ( SnowflakeQuery, create_deny_regex_sql_filter, @@ -664,3 +665,26 @@ def test_create_snowsight_base_url_ap_northeast_1(): def test_snowflake_utils() -> None: assert_doctest(datahub.ingestion.source.snowflake.snowflake_utils) + + +def test_snowflake_query_result_parsing(): + db_row = { + "DOWNSTREAM_TABLE_NAME": "db.schema.downstream_table", + "DOWNSTREAM_TABLE_DOMAIN": "Table", + "UPSTREAM_TABLES": [ + { + "query_id": "01b92f61-0611-c826-000d-0103cf9b5db7", + "upstream_object_domain": "Table", + "upstream_object_name": "db.schema.upstream_table", + } + ], + "UPSTREAM_COLUMNS": [{}], + "QUERIES": [ + { + "query_id": "01b92f61-0611-c826-000d-0103cf9b5db7", + "query_text": "Query test", + "start_time": "2022-12-01 19:56:34", + } + ], + } + assert UpstreamLineageEdge.parse_obj(db_row)