From 837d00d391a9f3c6d79cbed9969fe4003e36f580 Mon Sep 17 00:00:00 2001 From: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com> Date: Wed, 11 Sep 2024 00:47:23 +0530 Subject: [PATCH] fix(ingest/bq): fix ordering of queries for use_queries_v2 (#11333) --- .../source/bigquery_v2/queries_extractor.py | 39 ++++++++++++++----- ...ineage_via_temp_table_disordered_add.json} | 0 .../unit/sql_parsing/test_sql_aggregator.py | 7 +++- 3 files changed, 35 insertions(+), 11 deletions(-) rename metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/{test_lineage_via_temp_table_disordered_add.json => test_table_lineage_via_temp_table_disordered_add.json} (100%) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py index 23106ce7d2f868..1e2a12c6adb87e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py @@ -250,14 +250,29 @@ def get_workunits_internal( with self.report.audit_log_preprocessing_timer: # Preprocessing stage that deduplicates the queries using query hash per usage bucket - queries_deduped: FileBackedDict[Dict[int, ObservedQuery]] + # Using regular dictionary with + # key: usage bucket + # value: File backed dictionary with query hash as key and observed query as value + # This structure is chosen in order to maintain order of execution of queries + + queries_deduped: Dict[int, FileBackedDict[ObservedQuery]] queries_deduped = self.deduplicate_queries(queries) - self.report.num_unique_queries = len(queries_deduped) + self.report.num_unique_queries = len( + set( + query_hash + for bucket in queries_deduped.values() + for query_hash in bucket + ) + ) with self.report.audit_log_load_timer: i = 0 - for query_instances in queries_deduped.values(): - for _, query in query_instances.items(): + for queries_in_bucket in queries_deduped.values(): + # Ordering is essential for column-level lineage via temporary table + for row in queries_in_bucket.sql_query_iterator( + "select value from data order by last_query_timestamp asc", + ): + query = queries_in_bucket.deserializer(row["value"]) if i > 0 and i % 10000 == 0: logger.info(f"Added {i} query log entries to SQL aggregator") @@ -268,7 +283,7 @@ def get_workunits_internal( def deduplicate_queries( self, queries: FileBackedList[ObservedQuery] - ) -> FileBackedDict[Dict[int, ObservedQuery]]: + ) -> Dict[int, FileBackedDict[ObservedQuery]]: # This fingerprint based deduplication is done here to reduce performance hit due to # repetitive sql parsing while adding observed query to aggregator that would otherwise @@ -276,7 +291,7 @@ def deduplicate_queries( # With current implementation, it is possible that "Operation"(e.g. INSERT) is reported # only once per day, although it may have happened multiple times throughout the day. - queries_deduped: FileBackedDict[Dict[int, ObservedQuery]] = FileBackedDict() + queries_deduped: Dict[int, FileBackedDict[ObservedQuery]] = dict() for i, query in enumerate(queries): if i > 0 and i % 10000 == 0: @@ -295,14 +310,20 @@ def deduplicate_queries( query.query, self.identifiers.platform, fast=True ) - query_instances = queries_deduped.setdefault(query.query_hash, {}) + if time_bucket not in queries_deduped: + # TODO: Cleanup, etc as required for file backed dicts after use + queries_deduped[time_bucket] = FileBackedDict[ObservedQuery]( + extra_columns={"last_query_timestamp": lambda e: e.timestamp} + ) - observed_query = query_instances.setdefault(time_bucket, query) + observed_query = queries_deduped[time_bucket].get(query.query_hash) # If the query already exists for this time bucket, update its attributes - if observed_query is not query: + if observed_query is not None: observed_query.usage_multiplier += 1 observed_query.timestamp = query.timestamp + else: + queries_deduped[time_bucket][query.query_hash] = query return queries_deduped diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_lineage_via_temp_table_disordered_add.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_lineage_via_temp_table_disordered_add.json similarity index 100% rename from metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_lineage_via_temp_table_disordered_add.json rename to metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_lineage_via_temp_table_disordered_add.json diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py b/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py index c730b4ee35e552..0d21936a74d072 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py +++ b/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py @@ -579,7 +579,9 @@ def test_create_table_query_mcps(pytestconfig: pytest.Config) -> None: @freeze_time(FROZEN_TIME) -def test_lineage_via_temp_table_disordered_add(pytestconfig: pytest.Config) -> None: +def test_table_lineage_via_temp_table_disordered_add( + pytestconfig: pytest.Config, +) -> None: aggregator = SqlParsingAggregator( platform="redshift", generate_lineage=True, @@ -607,7 +609,8 @@ def test_lineage_via_temp_table_disordered_add(pytestconfig: pytest.Config) -> N mce_helpers.check_goldens_stream( pytestconfig, outputs=mcps, - golden_path=RESOURCE_DIR / "test_lineage_via_temp_table_disordered_add.json", + golden_path=RESOURCE_DIR + / "test_table_lineage_via_temp_table_disordered_add.json", )