
Commit

code review updates.
brock-acryl committed Jan 17, 2025
1 parent 18770a9 commit f726f38
Showing 3 changed files with 76 additions and 90 deletions.
@@ -278,11 +278,6 @@ class SnowflakeV2Config(
description="If enabled, streams will be ingested as separate entities from tables/views.",
)

stream_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns for streams to filter in ingestion.",
)

structured_property_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description=(
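The removed stream_pattern field was an AllowDenyPattern, the same regex-based filter type as the remaining structured_property_pattern. As an illustrative sketch (not part of this commit's diff; the should_ingest helper is hypothetical, and the import path is assumed to be DataHub's usual one), this is roughly how such a pattern field is evaluated during ingestion:

from datahub.configuration.common import AllowDenyPattern

# Assumed standard API: allow/deny regex lists plus an allowed() check.
pattern = AllowDenyPattern(allow=[r"PROD_DB\..*"], deny=[r".*\.TMP_.*"])

def should_ingest(qualified_name: str) -> bool:
    # allowed() passes names that match an allow regex and no deny regex.
    return pattern.allowed(qualified_name)

assert should_ingest("PROD_DB.PUBLIC.ORDERS")
assert not should_ingest("PROD_DB.PUBLIC.TMP_ORDERS")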
@@ -59,7 +59,7 @@
SnowflakeIdentifierBuilder,
SnowflakeStructuredReportMixin,
SnowsightUrlBuilder,
_split_qualified_name,
split_qualified_name,
)
from datahub.ingestion.source.sql.sql_utils import (
add_table_to_schema_container,
@@ -886,48 +886,42 @@ def get_dataset_properties(
custom_properties = {}

if isinstance(table, SnowflakeTable):
if table.clustering_key:
custom_properties["CLUSTERING_KEY"] = table.clustering_key

if table.is_hybrid:
custom_properties["IS_HYBRID"] = "true"

if table.is_dynamic:
custom_properties["IS_DYNAMIC"] = "true"

if table.is_iceberg:
custom_properties["IS_ICEBERG"] = "true"
custom_properties.update(
{
k: v
for k, v in {
"CLUSTERING_KEY": table.clustering_key,
"IS_HYBRID": "true" if table.is_hybrid else None,
"IS_DYNAMIC": "true" if table.is_dynamic else None,
"IS_ICEBERG": "true" if table.is_iceberg else None,
}.items()
if v
}
)

if isinstance(table, SnowflakeView) and table.is_secure:
custom_properties["IS_SECURE"] = "true"

elif isinstance(table, SnowflakeStream):
if table.source_type:
custom_properties["SOURCE_TYPE"] = table.source_type

if table.type:
custom_properties["TYPE"] = table.type

if table.stale:
custom_properties["STALE"] = table.stale

if table.mode:
custom_properties["MODE"] = table.mode

if table.invalid_reason:
custom_properties["INVALID_REASON"] = table.invalid_reason

if table.owner_role_type:
custom_properties["OWNER_ROLE_TYPE"] = table.owner_role_type

if table.table_name:
custom_properties["TABLE_NAME"] = table.table_name

if table.base_tables:
custom_properties["BASE_TABLES"] = table.base_tables

if table.stale_after:
custom_properties["STALE_AFTER"] = table.stale_after.isoformat()
custom_properties.update(
{
k: v
for k, v in {
"SOURCE_TYPE": table.source_type,
"TYPE": table.type,
"STALE": table.stale,
"MODE": table.mode,
"INVALID_REASON": table.invalid_reason,
"OWNER_ROLE_TYPE": table.owner_role_type,
"TABLE_NAME": table.table_name,
"BASE_TABLES": table.base_tables,
"STALE_AFTER": table.stale_after.isoformat()
if table.stale_after
else None,
}.items()
if v
}
)

return DatasetProperties(
name=table.name,
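The rewritten get_dataset_properties collapses the per-field if blocks into a single dict comprehension that drops empty values before merging into custom_properties. A minimal standalone sketch of the same pattern (illustrative only; merge_non_empty is not a name from the source):

def merge_non_empty(target: dict, candidates: dict) -> dict:
    # Keep only truthy values, mirroring the custom_properties.update({...})
    # comprehension in the diff above.
    target.update({k: v for k, v in candidates.items() if v})
    return target

props = merge_non_empty({}, {"IS_HYBRID": "true", "CLUSTERING_KEY": None})
assert props == {"IS_HYBRID": "true"}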
@@ -1386,7 +1380,7 @@ def get_columns_for_stream(
"""
columns: List[SnowflakeColumn] = []

source_parts = _split_qualified_name(source_object)
source_parts = split_qualified_name(source_object)

source_db, source_schema, source_name = source_parts

@@ -1446,48 +1440,45 @@ def populate_stream_upstreams(
"""
Populate Streams upstream tables excluding the metadata columns
"""
if self.aggregator:
source_parts = _split_qualified_name(stream.table_name)
source_db, source_schema, source_name = source_parts
source_parts = split_qualified_name(stream.table_name)
source_db, source_schema, source_name = source_parts

downstream_identifier = self.identifiers.get_dataset_identifier(
stream.name, schema_name, db_name
)
downstream_urn = self.identifiers.gen_dataset_urn(downstream_identifier)
downstream_identifier = self.identifiers.get_dataset_identifier(
stream.name, schema_name, db_name
)
downstream_urn = self.identifiers.gen_dataset_urn(downstream_identifier)

upstream_identifier = self.identifiers.get_dataset_identifier(
source_name, source_schema, source_db
)
upstream_urn = self.identifiers.gen_dataset_urn(upstream_identifier)

column_lineage = []
for col in stream.columns:
if not col.name.startswith("METADATA$"):
column_lineage.append(
ColumnLineageInfo(
downstream=DownstreamColumnRef(
dataset=downstream_urn,
upstream_identifier = self.identifiers.get_dataset_identifier(
source_name, source_schema, source_db
)
upstream_urn = self.identifiers.gen_dataset_urn(upstream_identifier)

column_lineage = []
for col in stream.columns:
if not col.name.startswith("METADATA$"):
column_lineage.append(
ColumnLineageInfo(
downstream=DownstreamColumnRef(
dataset=downstream_urn,
column=self.identifiers.snowflake_identifier(col.name),
),
upstreams=[
ColumnRef(
table=upstream_urn,
column=self.identifiers.snowflake_identifier(col.name),
),
upstreams=[
ColumnRef(
table=upstream_urn,
column=self.identifiers.snowflake_identifier(
col.name
),
)
],
)
)
],
)
)

if column_lineage:
self.aggregator.add_known_query_lineage(
known_query_lineage=KnownQueryLineageInfo(
query_id=f"stream_lineage_{stream.name}",
query_text="",
upstreams=[upstream_urn],
downstream=downstream_urn,
column_lineage=column_lineage,
timestamp=stream.created if stream.created else None,
)
if column_lineage:
self.aggregator.add_known_query_lineage(
known_query_lineage=KnownQueryLineageInfo(
query_id=f"stream_lineage_{stream.name}",
query_text="",
upstreams=[upstream_urn],
downstream=downstream_urn,
column_lineage=column_lineage,
timestamp=stream.created if stream.created else None,
)
)
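populate_stream_upstreams maps each non-metadata stream column one-to-one onto the identically named column of the stream's source object, then registers the result as known query lineage. A simplified sketch of the column-filtering step (illustrative only; the real code builds ColumnLineageInfo/ColumnRef objects and hands them to the aggregator rather than returning plain tuples):

from typing import List, Tuple

def stream_column_pairs(
    columns: List[str], upstream_urn: str, downstream_urn: str
) -> List[Tuple[str, str]]:
    # Skip Snowflake stream bookkeeping columns such as METADATA$ACTION,
    # METADATA$ISUPDATE, and METADATA$ROW_ID.
    return [
        (f"{upstream_urn}:{col}", f"{downstream_urn}:{col}")
        for col in columns
        if not col.startswith("METADATA$")
    ]

pairs = stream_column_pairs(["ID", "NAME", "METADATA$ACTION"], "src", "stream")
assert pairs == [("src:ID", "stream:ID"), ("src:NAME", "stream:NAME")]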
@@ -131,7 +131,7 @@ def is_dataset_pattern_allowed(
if _is_sys_table(dataset_name):
return False

dataset_params = _split_qualified_name(dataset_name)
dataset_params = split_qualified_name(dataset_name)
if len(dataset_params) != 3:
self.structured_reporter.info(
title="Unexpected dataset pattern",
@@ -193,17 +193,17 @@ def _is_sys_table(table_name: str) -> bool:
return table_name.lower().startswith("sys$")


def _split_qualified_name(qualified_name: str) -> List[str]:
def split_qualified_name(qualified_name: str) -> List[str]:
"""
Split a qualified name into its constituent parts.
>>> _split_qualified_name("db.my_schema.my_table")
>>> split_qualified_name("db.my_schema.my_table")
['db', 'my_schema', 'my_table']
>>> _split_qualified_name('"db"."my_schema"."my_table"')
>>> split_qualified_name('"db"."my_schema"."my_table"')
['db', 'my_schema', 'my_table']
>>> _split_qualified_name('TEST_DB.TEST_SCHEMA."TABLE.WITH.DOTS"')
>>> split_qualified_name('TEST_DB.TEST_SCHEMA."TABLE.WITH.DOTS"')
['TEST_DB', 'TEST_SCHEMA', 'TABLE.WITH.DOTS']
>>> _split_qualified_name('TEST_DB."SCHEMA.WITH.DOTS".MY_TABLE')
>>> split_qualified_name('TEST_DB."SCHEMA.WITH.DOTS".MY_TABLE')
['TEST_DB', 'SCHEMA.WITH.DOTS', 'MY_TABLE']
"""

@@ -241,7 +241,7 @@ def _split_qualified_name(qualified_name: str) -> List[str]:
def _cleanup_qualified_name(
qualified_name: str, structured_reporter: SourceReport
) -> str:
name_parts = _split_qualified_name(qualified_name)
name_parts = split_qualified_name(qualified_name)
if len(name_parts) != 3:
if not _is_sys_table(qualified_name):
structured_reporter.info(
