From f726f384d9ece3822519117e79d14a2699ff15c2 Mon Sep 17 00:00:00 2001
From: Brock Griffey <52086127+brock-acryl@users.noreply.github.com>
Date: Thu, 16 Jan 2025 20:37:12 -0500
Subject: [PATCH] code review updates.

---
 .../source/snowflake/snowflake_config.py      |   5 -
 .../source/snowflake/snowflake_schema_gen.py  | 147 ++++++++----------
 .../source/snowflake/snowflake_utils.py       |  14 +-
 3 files changed, 76 insertions(+), 90 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py
index 99a1fd606e3c5..f81e7ec4e31ed 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py
@@ -278,11 +278,6 @@ class SnowflakeV2Config(
         description="If enabled, streams will be ingested as separate entities from tables/views.",
     )
 
-    stream_pattern: AllowDenyPattern = Field(
-        default=AllowDenyPattern.allow_all(),
-        description="Regex patterns for streams to filter in ingestion.",
-    )
-
     structured_property_pattern: AllowDenyPattern = Field(
         default=AllowDenyPattern.allow_all(),
         description=(
diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py
index abb75c49c992b..3d6d04010bbc1 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py
@@ -59,7 +59,7 @@
     SnowflakeIdentifierBuilder,
     SnowflakeStructuredReportMixin,
     SnowsightUrlBuilder,
-    _split_qualified_name,
+    split_qualified_name,
 )
 from datahub.ingestion.source.sql.sql_utils import (
     add_table_to_schema_container,
@@ -886,48 +886,42 @@ def get_dataset_properties(
         custom_properties = {}
 
         if isinstance(table, SnowflakeTable):
-            if table.clustering_key:
-                custom_properties["CLUSTERING_KEY"] = table.clustering_key
-
-            if table.is_hybrid:
-                custom_properties["IS_HYBRID"] = "true"
-
-            if table.is_dynamic:
-                custom_properties["IS_DYNAMIC"] = "true"
-
-            if table.is_iceberg:
-                custom_properties["IS_ICEBERG"] = "true"
+            custom_properties.update(
+                {
+                    k: v
+                    for k, v in {
+                        "CLUSTERING_KEY": table.clustering_key,
+                        "IS_HYBRID": "true" if table.is_hybrid else None,
+                        "IS_DYNAMIC": "true" if table.is_dynamic else None,
+                        "IS_ICEBERG": "true" if table.is_iceberg else None,
+                    }.items()
+                    if v
+                }
+            )
 
         if isinstance(table, SnowflakeView) and table.is_secure:
             custom_properties["IS_SECURE"] = "true"
 
         elif isinstance(table, SnowflakeStream):
-            if table.source_type:
-                custom_properties["SOURCE_TYPE"] = table.source_type
-
-            if table.type:
-                custom_properties["TYPE"] = table.type
-
-            if table.stale:
-                custom_properties["STALE"] = table.stale
-
-            if table.mode:
-                custom_properties["MODE"] = table.mode
-
-            if table.invalid_reason:
-                custom_properties["INVALID_REASON"] = table.invalid_reason
-
-            if table.owner_role_type:
-                custom_properties["OWNER_ROLE_TYPE"] = table.owner_role_type
-
-            if table.table_name:
-                custom_properties["TABLE_NAME"] = table.table_name
-
-            if table.base_tables:
-                custom_properties["BASE_TABLES"] = table.base_tables
-
-            if table.stale_after:
-                custom_properties["STALE_AFTER"] = table.stale_after.isoformat()
+            custom_properties.update(
+                {
+                    k: v
+                    for k, v in {
+                        "SOURCE_TYPE": table.source_type,
+                        "TYPE": table.type,
+                        "STALE": table.stale,
+                        "MODE": table.mode,
+                        "INVALID_REASON": table.invalid_reason,
+                        "OWNER_ROLE_TYPE": table.owner_role_type,
+                        "TABLE_NAME": table.table_name,
+                        "BASE_TABLES": table.base_tables,
+                        "STALE_AFTER": table.stale_after.isoformat()
+                        if table.stale_after
+                        else None,
+                    }.items()
+                    if v
+                }
+            )
 
         return DatasetProperties(
             name=table.name,
@@ -1386,7 +1380,7 @@ def get_columns_for_stream(
         """
         columns: List[SnowflakeColumn] = []
 
-        source_parts = _split_qualified_name(source_object)
+        source_parts = split_qualified_name(source_object)
 
         source_db, source_schema, source_name = source_parts
 
@@ -1446,48 +1440,45 @@
         """
         Populate Streams upstream tables excluding the metadata columns
         """
-        if self.aggregator:
-            source_parts = _split_qualified_name(stream.table_name)
-            source_db, source_schema, source_name = source_parts
+        source_parts = split_qualified_name(stream.table_name)
+        source_db, source_schema, source_name = source_parts
 
-            downstream_identifier = self.identifiers.get_dataset_identifier(
-                stream.name, schema_name, db_name
-            )
-            downstream_urn = self.identifiers.gen_dataset_urn(downstream_identifier)
+        downstream_identifier = self.identifiers.get_dataset_identifier(
+            stream.name, schema_name, db_name
+        )
+        downstream_urn = self.identifiers.gen_dataset_urn(downstream_identifier)
 
-            upstream_identifier = self.identifiers.get_dataset_identifier(
-                source_name, source_schema, source_db
-            )
-            upstream_urn = self.identifiers.gen_dataset_urn(upstream_identifier)
-
-            column_lineage = []
-            for col in stream.columns:
-                if not col.name.startswith("METADATA$"):
-                    column_lineage.append(
-                        ColumnLineageInfo(
-                            downstream=DownstreamColumnRef(
-                                dataset=downstream_urn,
+        upstream_identifier = self.identifiers.get_dataset_identifier(
+            source_name, source_schema, source_db
+        )
+        upstream_urn = self.identifiers.gen_dataset_urn(upstream_identifier)
+
+        column_lineage = []
+        for col in stream.columns:
+            if not col.name.startswith("METADATA$"):
+                column_lineage.append(
+                    ColumnLineageInfo(
+                        downstream=DownstreamColumnRef(
+                            dataset=downstream_urn,
+                            column=self.identifiers.snowflake_identifier(col.name),
+                        ),
+                        upstreams=[
+                            ColumnRef(
+                                table=upstream_urn,
                                 column=self.identifiers.snowflake_identifier(col.name),
-                            ),
-                            upstreams=[
-                                ColumnRef(
-                                    table=upstream_urn,
-                                    column=self.identifiers.snowflake_identifier(
-                                        col.name
-                                    ),
-                                )
-                            ],
-                        )
+                            )
+                        ],
                     )
+                )
 
-            if column_lineage:
-                self.aggregator.add_known_query_lineage(
-                    known_query_lineage=KnownQueryLineageInfo(
-                        query_id=f"stream_lineage_{stream.name}",
-                        query_text="",
-                        upstreams=[upstream_urn],
-                        downstream=downstream_urn,
-                        column_lineage=column_lineage,
-                        timestamp=stream.created if stream.created else None,
-                    )
+        if column_lineage:
+            self.aggregator.add_known_query_lineage(
+                known_query_lineage=KnownQueryLineageInfo(
+                    query_id=f"stream_lineage_{stream.name}",
+                    query_text="",
+                    upstreams=[upstream_urn],
+                    downstream=downstream_urn,
+                    column_lineage=column_lineage,
+                    timestamp=stream.created if stream.created else None,
                 )
+            )
diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py
index 512c0270d8e3c..9e451c1c887fa 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py
@@ -131,7 +131,7 @@ def is_dataset_pattern_allowed(
         if _is_sys_table(dataset_name):
             return False
 
-        dataset_params = _split_qualified_name(dataset_name)
+        dataset_params = split_qualified_name(dataset_name)
         if len(dataset_params) != 3:
             self.structured_reporter.info(
                 title="Unexpected dataset pattern",
@@ -193,17 +193,17 @@ def _is_sys_table(table_name: str) -> bool:
     return table_name.lower().startswith("sys$")
 
 
-def _split_qualified_name(qualified_name: str) -> List[str]:
+def split_qualified_name(qualified_name: str) -> List[str]:
     """
     Split a qualified name into its constituent parts.
 
-    >>> _split_qualified_name("db.my_schema.my_table")
+    >>> split_qualified_name("db.my_schema.my_table")
     ['db', 'my_schema', 'my_table']
-    >>> _split_qualified_name('"db"."my_schema"."my_table"')
+    >>> split_qualified_name('"db"."my_schema"."my_table"')
     ['db', 'my_schema', 'my_table']
-    >>> _split_qualified_name('TEST_DB.TEST_SCHEMA."TABLE.WITH.DOTS"')
+    >>> split_qualified_name('TEST_DB.TEST_SCHEMA."TABLE.WITH.DOTS"')
     ['TEST_DB', 'TEST_SCHEMA', 'TABLE.WITH.DOTS']
-    >>> _split_qualified_name('TEST_DB."SCHEMA.WITH.DOTS".MY_TABLE')
+    >>> split_qualified_name('TEST_DB."SCHEMA.WITH.DOTS".MY_TABLE')
     ['TEST_DB', 'SCHEMA.WITH.DOTS', 'MY_TABLE']
     """
 
@@ -241,7 +241,7 @@
 def _cleanup_qualified_name(
     qualified_name: str, structured_reporter: SourceReport
 ) -> str:
-    name_parts = _split_qualified_name(qualified_name)
+    name_parts = split_qualified_name(qualified_name)
     if len(name_parts) != 3:
         if not _is_sys_table(qualified_name):
             structured_reporter.info(
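
Note for reviewers: the get_dataset_properties() change above collapses the
per-field "if value: custom_properties[KEY] = value" blocks into a single
dict-comprehension update that drops falsy entries. A minimal standalone
sketch of the same pattern follows; the function and field names here are
illustrative only, not taken from the codebase:

    from typing import Dict, Optional

    def build_custom_properties(
        clustering_key: Optional[str],
        is_hybrid: bool,
        source_type: Optional[str],
    ) -> Dict[str, str]:
        # Collect candidate values first; the comprehension then filters out
        # any falsy entry (None, "", False), mirroring the per-field guards
        # that the patch replaces.
        candidates = {
            "CLUSTERING_KEY": clustering_key,
            "IS_HYBRID": "true" if is_hybrid else None,
            "SOURCE_TYPE": source_type,
        }
        return {k: v for k, v in candidates.items() if v}

    # build_custom_properties("LINEAR(COL_A)", True, None)
    # -> {"CLUSTERING_KEY": "LINEAR(COL_A)", "IS_HYBRID": "true"}

The "if v" filter skips empty strings as well as None, which matches the
behavior of the original "if table.x:" guards, so the refactor is a pure
restructuring rather than a behavior change.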