Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest/snowflake): propagate table list from main to query extractor #11222

Merged
merged 1 commit into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class ClassificationReportMixin:
class ClassificationSourceConfigMixin(ConfigModel):
classification: ClassificationConfig = Field(
default=ClassificationConfig(),
description="For details, refer [Classification](../../../../metadata-ingestion/docs/dev_guides/classification.md).",
description="For details, refer to [Classification](../../../../metadata-ingestion/docs/dev_guides/classification.md).",
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ def _populate_external_lineage_from_copy_history(
def _process_external_lineage_result_row(
cls,
db_row: dict,
discovered_tables: Optional[List[str]],
discovered_tables: Optional[Collection[str]],
identifiers: SnowflakeIdentifierBuilder,
) -> Optional[KnownLineageMapping]:
# key is the down-stream table name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def __init__(
self.report = SnowflakeQueriesExtractorReport()
self.filters = filters
self.identifiers = identifiers
self.discovered_tables = discovered_tables
self.discovered_tables = set(discovered_tables) if discovered_tables else None

self._structured_report = structured_report

Expand Down Expand Up @@ -175,10 +175,24 @@ def local_temp_path(self) -> pathlib.Path:
return path

def is_temp_table(self, name: str) -> bool:
return any(
if any(
re.match(pattern, name, flags=re.IGNORECASE)
for pattern in self.config.temporary_tables_pattern
)
):
return True

# This is also a temp table if
# 1. this name would be allowed by the dataset patterns, and
# 2. we have a list of discovered tables, and
# 3. it's not in the discovered tables list
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't deleted tables as well fall into this category?
It might be ok to handle those as temp tables.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah that's fine - generally tables get deleted because they're no longer used, and so bridging over them to generate lineage still makes sense

if (
self.filters.is_dataset_pattern_allowed(name, SnowflakeObjectDomain.TABLE)
and self.discovered_tables
and name not in self.discovered_tables
):
return True

return False

def is_allowed_table(self, name: str) -> bool:
if self.discovered_tables and name not in self.discovered_tables:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
filters=self.filters,
identifiers=self.identifiers,
schema_resolver=schema_resolver,
discovered_tables=discovered_datasets,
)

# TODO: This is slightly suboptimal because we create two SqlParsingAggregator instances with different configs
Expand Down
Loading