From 1082e068124236808bb7420fe562c3e59cdcf4a6 Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Mon, 23 Oct 2023 18:59:07 -0400 Subject: [PATCH] fix(ingest/bigquery): Correctly apply table pattern to read events; fix end time calculation --- .../ingestion/source/bigquery_v2/bigquery_config.py | 7 +++---- .../datahub/ingestion/source/bigquery_v2/lineage.py | 2 +- .../src/datahub/ingestion/source/bigquery_v2/usage.py | 10 +++++++--- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py index 944814b6936a45..a6a740385cf5c3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py @@ -119,8 +119,8 @@ class BigQueryV2Config( ) match_fully_qualified_names: bool = Field( - default=False, - description="Whether `dataset_pattern` is matched against fully qualified dataset name `.`.", + default=True, + description="[deprecated] Whether `dataset_pattern` is matched against fully qualified dataset name `.`.", ) include_external_url: bool = Field( @@ -327,8 +327,7 @@ def backward_compatibility_configs_set(cls, values: Dict) -> Dict: ): logger.warning( "Please update `dataset_pattern` to match against fully qualified schema name `.` and set config `match_fully_qualified_names : True`." - "Current default `match_fully_qualified_names: False` is only to maintain backward compatibility. " - "The config option `match_fully_qualified_names` will be deprecated in future and the default behavior will assume `match_fully_qualified_names: True`." + "The config option `match_fully_qualified_names` is deprecated and will be removed in a future release." ) return values diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py index 98c8cbaf85eec5..aa462435b81055 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py @@ -548,7 +548,7 @@ def _get_parsed_audit_log_events(self, project_id: str) -> Iterable[QueryEvent]: # handle the case where the read happens within our time range but the query # completion event is delayed and happens after the configured end time. corrected_start_time = self.start_time - self.config.max_query_duration - corrected_end_time = self.end_time + -self.config.max_query_duration + corrected_end_time = self.end_time + self.config.max_query_duration self.report.log_entry_start_time = corrected_start_time self.report.log_entry_end_time = corrected_end_time diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py index 201567e104a510..7fc38991e59284 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py @@ -335,8 +335,12 @@ def get_time_window(self) -> Tuple[datetime, datetime]: def _is_table_allowed(self, table_ref: Optional[BigQueryTableRef]) -> bool: return ( table_ref is not None - and self.config.dataset_pattern.allowed(table_ref.table_identifier.dataset) - and self.config.table_pattern.allowed(table_ref.table_identifier.table) + and self.config.dataset_pattern.allowed( + f"{table_ref.table_identifier.project_id}.{table_ref.table_identifier.dataset}" + if self.config.match_fully_qualified_names + else table_ref.table_identifier.dataset + ) + and self.config.table_pattern.allowed(str(table_ref.table_identifier)) ) def _should_ingest_usage(self) -> bool: @@ -844,7 +848,7 @@ def _get_parsed_bigquery_log_events( # handle the case where the read happens within our time range but the query # completion event is delayed and happens after the configured end time. corrected_start_time = self.start_time - self.config.max_query_duration - corrected_end_time = self.end_time + -self.config.max_query_duration + corrected_end_time = self.end_time + self.config.max_query_duration self.report.audit_start_time = corrected_start_time self.report.audit_end_time = corrected_end_time