
Commit

Merge branch 'datahub-project:master' into master
hsheth2 authored Sep 10, 2024
2 parents 215a50d + 837d00d commit ca180e2
Showing 3 changed files with 35 additions and 11 deletions.
@@ -250,14 +250,29 @@ def get_workunits_internal(

         with self.report.audit_log_preprocessing_timer:
             # Preprocessing stage that deduplicates the queries using query hash per usage bucket
-            queries_deduped: FileBackedDict[Dict[int, ObservedQuery]]
+            # Using a regular dictionary with
+            #   key: usage bucket
+            #   value: file-backed dictionary with query hash as key and observed query as value
+            # This structure is chosen to maintain the order of execution of queries
+
+            queries_deduped: Dict[int, FileBackedDict[ObservedQuery]]
             queries_deduped = self.deduplicate_queries(queries)
-            self.report.num_unique_queries = len(queries_deduped)
+            self.report.num_unique_queries = len(
+                set(
+                    query_hash
+                    for bucket in queries_deduped.values()
+                    for query_hash in bucket
+                )
+            )

         with self.report.audit_log_load_timer:
             i = 0
-            for query_instances in queries_deduped.values():
-                for _, query in query_instances.items():
+            for queries_in_bucket in queries_deduped.values():
+                # Ordering is essential for column-level lineage via temporary tables
+                for row in queries_in_bucket.sql_query_iterator(
+                    "select value from data order by last_query_timestamp asc",
+                ):
+                    query = queries_in_bucket.deserializer(row["value"])
                     if i > 0 and i % 10000 == 0:
                         logger.info(f"Added {i} query log entries to SQL aggregator")

@@ -268,15 +283,15 @@ def get_workunits_internal(

     def deduplicate_queries(
         self, queries: FileBackedList[ObservedQuery]
-    ) -> FileBackedDict[Dict[int, ObservedQuery]]:
+    ) -> Dict[int, FileBackedDict[ObservedQuery]]:

         # This fingerprint-based deduplication is done here to reduce the performance hit of
         # repetitive SQL parsing when adding observed queries to the aggregator, which would
         # otherwise parse the same query multiple times. In the future, the aggregator may absorb this deduplication.
         # With the current implementation, it is possible that an "Operation" (e.g. INSERT) is reported
         # only once per day, although it may have happened multiple times throughout the day.

-        queries_deduped: FileBackedDict[Dict[int, ObservedQuery]] = FileBackedDict()
+        queries_deduped: Dict[int, FileBackedDict[ObservedQuery]] = dict()

         for i, query in enumerate(queries):
             if i > 0 and i % 10000 == 0:
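
The fingerprinting that drives this deduplication collapses textual variants of the same statement into a single hash. The toy normalize-and-hash function below conveys the idea only; it is a stand-in for the real fingerprint helper (whose call sits just above the visible hunk), which performs proper SQL-aware normalization.

    import hashlib
    import re

    def toy_fingerprint(sql: str) -> str:
        # Lowercase, collapse whitespace, and mask literals so trivially
        # different texts of the same query hash identically. Illustrative
        # only; not the real fingerprinting algorithm.
        normalized = re.sub(r"\s+", " ", sql.strip().lower())
        normalized = re.sub(r"'[^']*'", "?", normalized)
        normalized = re.sub(r"\b\d+\b", "?", normalized)
        return hashlib.sha256(normalized.encode()).hexdigest()[:16]

    assert toy_fingerprint("SELECT * FROM t WHERE id = 1") == toy_fingerprint(
        "select *  from t  where id = 42"
    )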
@@ -295,14 +310,20 @@ def deduplicate_queries(
                 query.query, self.identifiers.platform, fast=True
             )

-            query_instances = queries_deduped.setdefault(query.query_hash, {})
+            if time_bucket not in queries_deduped:
+                # TODO: Clean up the file-backed dicts after use, as required
+                queries_deduped[time_bucket] = FileBackedDict[ObservedQuery](
+                    extra_columns={"last_query_timestamp": lambda e: e.timestamp}
+                )

-            observed_query = query_instances.setdefault(time_bucket, query)
+            observed_query = queries_deduped[time_bucket].get(query.query_hash)

             # If the query already exists for this time bucket, update its attributes
-            if observed_query is not query:
+            if observed_query is not None:
                 observed_query.usage_multiplier += 1
                 observed_query.timestamp = query.timestamp
+            else:
+                queries_deduped[time_bucket][query.query_hash] = query

         return queries_deduped

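Putting the pieces together, here is a simplified pure-Python version of the dedup-then-count flow. Plain dicts stand in for FileBackedDict, and the ObservedQuery dataclass is a cut-down stand-in for DataHub's type; bucket_of and fingerprint are hypothetical parameters that abstract the time bucketing and fingerprinting seen in the diff.

    from dataclasses import dataclass
    from typing import Callable, Dict, Iterable


    @dataclass
    class ObservedQuery:  # cut-down stand-in for DataHub's ObservedQuery
        query: str
        timestamp: float
        query_hash: str = ""
        usage_multiplier: int = 1


    def deduplicate(
        queries: Iterable[ObservedQuery],
        bucket_of: Callable[[float], int],
        fingerprint: Callable[[str], str],
    ) -> Dict[int, Dict[str, ObservedQuery]]:
        # Plain dicts stand in for FileBackedDict[ObservedQuery].
        deduped: Dict[int, Dict[str, ObservedQuery]] = {}
        for q in queries:
            q.query_hash = fingerprint(q.query)
            bucket = deduped.setdefault(bucket_of(q.timestamp), {})
            existing = bucket.get(q.query_hash)
            if existing is not None:
                # Repeat within the bucket: bump the multiplier so usage
                # stats stay accurate, and keep the latest timestamp.
                existing.usage_multiplier += 1
                existing.timestamp = q.timestamp
            else:
                bucket[q.query_hash] = q
        return deduped


    deduped = deduplicate(
        [
            ObservedQuery("SELECT 1", 1.0),
            ObservedQuery("SELECT 1", 2.0),  # same hash, same bucket
            ObservedQuery("SELECT 2", 3.0),
        ],
        bucket_of=lambda ts: int(ts // 10),  # hypothetical bucketing rule
        fingerprint=lambda sql: sql.strip().lower(),  # toy fingerprint
    )
    # The same hash may appear in several buckets, so distinct hashes are
    # counted with a set, exactly as in the num_unique_queries change above.
    num_unique = len({h for bucket in deduped.values() for h in bucket})
    assert num_unique == 2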
@@ -579,7 +579,9 @@ def test_create_table_query_mcps(pytestconfig: pytest.Config) -> None:


 @freeze_time(FROZEN_TIME)
-def test_lineage_via_temp_table_disordered_add(pytestconfig: pytest.Config) -> None:
+def test_table_lineage_via_temp_table_disordered_add(
+    pytestconfig: pytest.Config,
+) -> None:
     aggregator = SqlParsingAggregator(
         platform="redshift",
         generate_lineage=True,
@@ -607,7 +609,8 @@ def test_lineage_via_temp_table_disordered_add(pytestconfig: pytest.Config) -> None:

     mce_helpers.check_goldens_stream(
         pytestconfig,
         outputs=mcps,
-        golden_path=RESOURCE_DIR / "test_lineage_via_temp_table_disordered_add.json",
+        golden_path=RESOURCE_DIR
+        / "test_table_lineage_via_temp_table_disordered_add.json",
     )
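
The golden path change just tracks the test rename, but the pattern is worth a note: the emitted MCPs are compared against a checked-in JSON snapshot. A generic sketch of that style of assertion, in the spirit of check_goldens_stream but not its actual implementation:

    import json
    import pathlib


    def check_golden(actual: list, golden_path: pathlib.Path) -> None:
        # Generic golden-file assertion (illustrative; not DataHub's
        # mce_helpers.check_goldens_stream): compare the emitted metadata
        # against a checked-in snapshot so lineage regressions surface
        # as test failures.
        expected = json.loads(golden_path.read_text())
        assert actual == expected, f"output diverges from golden file {golden_path}"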


