From f7e69fc59448024223c18240e01ae3f5ded6ea6e Mon Sep 17 00:00:00 2001
From: Sergio Gómez Villamor
Date: Mon, 20 Jan 2025 16:41:26 +0100
Subject: [PATCH] fix(ingestion): groupby_unsorted

---
 .../source/bigquery_v2/bigquery_schema_gen.py      |  4 ++--
 .../datahub/ingestion/source/dbt/dbt_common.py     |  4 ++--
 .../ingestion/source/sql/hive_metastore.py         |  6 +++---
 .../datahub/ingestion/source/sql/teradata.py       |  4 ++--
 .../sql_parsing/sql_parsing_aggregator.py          |  6 +++---
 .../src/datahub/utilities/groupby.py               | 17 +++++++++++++++++
 .../test_overlapping_inserts.json                  |  8 ++++----
 ...st_overlapping_inserts_from_temp_tables.json    | 16 ++++++++--------
 .../tests/unit/utilities/test_utilities.py         | 11 +++++++++++
 9 files changed, 52 insertions(+), 24 deletions(-)
 create mode 100644 metadata-ingestion/src/datahub/utilities/groupby.py

diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py
index ebfbbf0639c38c..1cace31a5e4c49 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py
@@ -2,7 +2,6 @@
 import re
 from base64 import b32decode
 from collections import defaultdict
-from itertools import groupby
 from typing import Dict, Iterable, List, Optional, Set, Type, Union, cast
 
 from google.cloud.bigquery.table import TableListItem
@@ -101,6 +100,7 @@
 from datahub.metadata.urns import TagUrn
 from datahub.sql_parsing.schema_resolver import SchemaResolver
 from datahub.utilities.file_backed_collections import FileBackedDict
+from datahub.utilities.groupby import groupby_unsorted
 from datahub.utilities.hive_schema_to_avro import (
     HiveColumnToAvroConverter,
     get_schema_fields_for_hive_column,
@@ -730,7 +730,7 @@ def gen_foreign_keys(
         foreign_keys: List[BigqueryTableConstraint] = list(
             filter(lambda x: x.type == "FOREIGN KEY", table.constraints)
         )
-        for key, group in groupby(
+        for key, group in groupby_unsorted(
             foreign_keys,
             lambda x: f"{x.referenced_project_id}.{x.referenced_dataset}.{x.referenced_table_name}",
         ):
diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
index fa85308b325979..99fa09b8b20b9b 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
@@ -1,4 +1,3 @@
-import itertools
 import logging
 import re
 from abc import abstractmethod
@@ -111,6 +110,7 @@
     parse_statements_and_pick,
     try_format_query,
 )
+from datahub.utilities.groupby import groupby_unsorted
 from datahub.utilities.lossy_collections import LossyList
 from datahub.utilities.mapping import Constants, OperationProcessor
 from datahub.utilities.time import datetime_to_ts_millis
@@ -1929,7 +1929,7 @@ def _translate_dbt_name_to_upstream_urn(dbt_name: str) -> str:
                     else None
                 ),
             )
-            for downstream, upstreams in itertools.groupby(
+            for downstream, upstreams in groupby_unsorted(
                 node.upstream_cll, lambda x: x.downstream_col
             )
         ]
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hive_metastore.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hive_metastore.py
index 60ecbaf38838a6..82b22d896698a8 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/hive_metastore.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hive_metastore.py
@@ -2,7 +2,6 @@
 import json
 import logging
 from collections import namedtuple
-from itertools import groupby
 from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
 
 from pydantic.dataclasses import dataclass
@@ -58,6 +57,7 @@
     SubTypesClass,
     ViewPropertiesClass,
 )
+from datahub.utilities.groupby import groupby_unsorted
 from datahub.utilities.hive_schema_to_avro import get_schema_fields_for_hive_column
 from datahub.utilities.str_enum import StrEnum
 
@@ -490,7 +490,7 @@ def loop_tables(
 
         iter_res = self._alchemy_client.execute_query(statement)
 
-        for key, group in groupby(iter_res, self._get_table_key):
+        for key, group in groupby_unsorted(iter_res, self._get_table_key):
             schema_name = (
                 f"{db_name}.{key.schema}"
                 if self.config.include_catalog_name_in_ids
@@ -647,7 +647,7 @@ def get_hive_view_columns(self, inspector: Inspector) -> Iterable[ViewDataset]:
         )
 
         iter_res = self._alchemy_client.execute_query(statement)
-        for key, group in groupby(iter_res, self._get_table_key):
+        for key, group in groupby_unsorted(iter_res, self._get_table_key):
             db_name = self.get_db_name(inspector)
 
             schema_name = (
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py
index 84b65d6635e9d4..e6319f668ecb8c 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py
@@ -3,7 +3,6 @@
 from dataclasses import dataclass
 from datetime import datetime
 from functools import lru_cache
-from itertools import groupby
 from typing import (
     Any,
     Dict,
@@ -59,6 +58,7 @@
 from datahub.metadata.schema_classes import SchemaMetadataClass
 from datahub.sql_parsing.schema_resolver import SchemaResolver
 from datahub.sql_parsing.sqlglot_lineage import sqlglot_lineage
+from datahub.utilities.groupby import groupby_unsorted
 
 logger: logging.Logger = logging.getLogger(__name__)
 
@@ -286,7 +286,7 @@ def grouper(fk_row):
 
     # TODO: Check if there's a better way
     fk_dicts = list()
-    for constraint_info, constraint_cols in groupby(res, grouper):
+    for constraint_info, constraint_cols in groupby_unsorted(res, grouper):
         fk_dict = {
             "name": str(constraint_info["name"]),
             "constrained_columns": list(),
diff --git a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py
index 8637802f6b9fee..893e89f177094f 100644
--- a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py
+++ b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py
@@ -2,7 +2,6 @@
 import dataclasses
 import enum
 import functools
-import itertools
 import json
 import logging
 import os
@@ -63,6 +62,7 @@
     FileBackedDict,
     FileBackedList,
 )
+from datahub.utilities.groupby import groupby_unsorted
 from datahub.utilities.lossy_collections import LossyDict, LossyList
 from datahub.utilities.ordered_set import OrderedSet
 from datahub.utilities.perf_timer import PerfTimer
@@ -1314,8 +1314,8 @@ def _gen_lineage_for_downstream(
         upstream_aspect.fineGrainedLineages = []
         for downstream_column, all_upstream_columns in cll.items():
             # Group by query ID.
-            for query_id, upstream_columns_for_query in itertools.groupby(
-                sorted(all_upstream_columns.items(), key=lambda x: x[1]),
+            for query_id, upstream_columns_for_query in groupby_unsorted(
+                all_upstream_columns.items(), key=lambda x: x[1],
             ):
                 upstream_columns = [x[0] for x in upstream_columns_for_query]
 
diff --git a/metadata-ingestion/src/datahub/utilities/groupby.py b/metadata-ingestion/src/datahub/utilities/groupby.py
new file mode 100644
index 00000000000000..463bca44c1e49c
--- /dev/null
+++ b/metadata-ingestion/src/datahub/utilities/groupby.py
@@ -0,0 +1,17 @@
+import collections
+from typing import Callable, Iterable, Tuple, TypeVar
+
+T = TypeVar("T")
+K = TypeVar("K")
+
+
+def groupby_unsorted(
+    iterable: Iterable[T], key: Callable[[T], K]
+) -> Iterable[Tuple[K, Iterable[T]]]:
+    """Group items by key without itertools.groupby()'s pre-sorting requirement:
+    all items with equal keys are grouped together, and keys keep first-seen order."""
+
+    values = collections.defaultdict(list)
+    for v in iterable:
+        values[key(v)].append(v)
+    return values.items()
diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_overlapping_inserts.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_overlapping_inserts.json
index fcbe0ec5aeb839..fa693381854972 100644
--- a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_overlapping_inserts.json
+++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_overlapping_inserts.json
@@ -38,26 +38,26 @@
           {
             "upstreamType": "FIELD_SET",
             "upstreams": [
-              "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.upstream1,PROD),a)"
+              "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.upstream2,PROD),a)"
             ],
             "downstreamType": "FIELD",
             "downstreams": [
               "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.downstream,PROD),a)"
             ],
             "confidenceScore": 0.2,
-            "query": "urn:li:query:377a73bbf094c8b176b15157c24242cdfc7a0f407d78e52e63ded08c913468f1"
+            "query": "urn:li:query:c4b3a21ef8c262ebbe99a5bdb6c29cb0be646392bb4af10b6f4a758af881470e"
           },
           {
             "upstreamType": "FIELD_SET",
             "upstreams": [
-              "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.upstream2,PROD),a)"
+              "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.upstream1,PROD),a)"
             ],
             "downstreamType": "FIELD",
             "downstreams": [
               "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.downstream,PROD),a)"
             ],
             "confidenceScore": 0.2,
-            "query": "urn:li:query:c4b3a21ef8c262ebbe99a5bdb6c29cb0be646392bb4af10b6f4a758af881470e"
+            "query": "urn:li:query:377a73bbf094c8b176b15157c24242cdfc7a0f407d78e52e63ded08c913468f1"
           },
           {
             "upstreamType": "FIELD_SET",
diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_overlapping_inserts_from_temp_tables.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_overlapping_inserts_from_temp_tables.json
index ed8009477bf4db..0f9870cda322c2 100644
--- a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_overlapping_inserts_from_temp_tables.json
+++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_overlapping_inserts_from_temp_tables.json
@@ -64,26 +64,26 @@
           {
             "upstreamType": "FIELD_SET",
             "upstreams": [
-              "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.in_person_returns,PROD),customer_id)"
+              "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.online_returns,PROD),customer_id)"
             ],
             "downstreamType": "FIELD",
             "downstreams": [
               "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.all_returns,PROD),customer_id)"
             ],
             "confidenceScore": 0.2,
-            "query": "urn:li:query:composite_6b5a11e96e3d2b742e4e4ec3310bb538d0f5c0c6496b84e4bfe0e8014d5f5b45"
+            "query": "urn:li:query:composite_adc1c41c0ad37c643776d9d93d524e6c435a7e70633da1ce7e3222dda4bb9fb8"
           },
           {
             "upstreamType": "FIELD_SET",
             "upstreams": [
-              "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.online_returns,PROD),customer_id)"
+              "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.in_person_returns,PROD),customer_id)"
             ],
             "downstreamType": "FIELD",
             "downstreams": [
               "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.all_returns,PROD),customer_id)"
             ],
             "confidenceScore": 0.2,
-            "query": "urn:li:query:composite_adc1c41c0ad37c643776d9d93d524e6c435a7e70633da1ce7e3222dda4bb9fb8"
+            "query": "urn:li:query:composite_6b5a11e96e3d2b742e4e4ec3310bb538d0f5c0c6496b84e4bfe0e8014d5f5b45"
           },
           {
             "upstreamType": "FIELD_SET",
@@ -100,26 +100,26 @@
           {
             "upstreamType": "FIELD_SET",
             "upstreams": [
-              "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.in_person_returns,PROD),return_date)"
+              "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.online_returns,PROD),return_date)"
             ],
             "downstreamType": "FIELD",
             "downstreams": [
               "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.all_returns,PROD),return_date)"
             ],
             "confidenceScore": 0.2,
-            "query": "urn:li:query:composite_6b5a11e96e3d2b742e4e4ec3310bb538d0f5c0c6496b84e4bfe0e8014d5f5b45"
+            "query": "urn:li:query:composite_adc1c41c0ad37c643776d9d93d524e6c435a7e70633da1ce7e3222dda4bb9fb8"
           },
           {
             "upstreamType": "FIELD_SET",
             "upstreams": [
-              "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.online_returns,PROD),return_date)"
+              "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.in_person_returns,PROD),return_date)"
             ],
             "downstreamType": "FIELD",
             "downstreams": [
               "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.all_returns,PROD),return_date)"
             ],
             "confidenceScore": 0.2,
-            "query": "urn:li:query:composite_adc1c41c0ad37c643776d9d93d524e6c435a7e70633da1ce7e3222dda4bb9fb8"
+            "query": "urn:li:query:composite_6b5a11e96e3d2b742e4e4ec3310bb538d0f5c0c6496b84e4bfe0e8014d5f5b45"
           },
           {
             "upstreamType": "FIELD_SET",
diff --git a/metadata-ingestion/tests/unit/utilities/test_utilities.py b/metadata-ingestion/tests/unit/utilities/test_utilities.py
index c333ceb136fffc..52406d37de46a6 100644
--- a/metadata-ingestion/tests/unit/utilities/test_utilities.py
+++ b/metadata-ingestion/tests/unit/utilities/test_utilities.py
@@ -6,6 +6,7 @@
 from datahub.sql_parsing.sqlglot_lineage import sqlglot_lineage
 from datahub.testing.doctest import assert_doctest
 from datahub.utilities.delayed_iter import delayed_iter
+from datahub.utilities.groupby import groupby_unsorted
 from datahub.utilities.is_pytest import is_pytest_running
 from datahub.utilities.urns.dataset_urn import DatasetUrn
 
@@ -335,3 +336,13 @@ def test_logging_name_extraction() -> None:
 
 def test_is_pytest_running() -> None:
     assert is_pytest_running()
+
+
+def test_groupby_unsorted() -> None:
+    grouped = groupby_unsorted("ABCAC", key=lambda x: x)
+
+    assert list(grouped) == [
+        ("A", ["A", "A"]),
+        ("B", ["B"]),
+        ("C", ["C", "C"]),
+    ]