From ba7a43f530cfe21c179e32c8712eb040b83a4c13 Mon Sep 17 00:00:00 2001 From: Tamas Nemeth Date: Fri, 18 Oct 2024 20:16:08 +0200 Subject: [PATCH 1/9] fix(ingest/dagster): Fix JobSnapshot import is broken (#11672) --- .../datahub_dagster_plugin/client/dagster_generator.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/client/dagster_generator.py b/metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/client/dagster_generator.py index a2cf159dd12f6..df123b127e040 100644 --- a/metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/client/dagster_generator.py +++ b/metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/client/dagster_generator.py @@ -12,7 +12,13 @@ TableSchemaMetadataValue, ) from dagster._core.execution.stats import RunStepKeyStatsSnapshot, StepEventStatus -from dagster._core.snap import JobSnapshot + +try: + from dagster._core.snap import JobSnapshot # type: ignore[attr-defined] +except ImportError: + # Import changed since Dagster 1.8.12 to this -> https://github.com/dagster-io/dagster/commit/29a37d1f0260cfd112849633d1096ffc916d6c95 + from dagster._core.snap import JobSnap as JobSnapshot + from dagster._core.snap.node import OpDefSnap from dagster._core.storage.dagster_run import DagsterRun, DagsterRunStatsSnapshot from datahub.api.entities.datajob import DataFlow, DataJob From 72d1236669c4052488f9ff23517f4568edbe6610 Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Fri, 18 Oct 2024 12:01:39 -0700 Subject: [PATCH 2/9] feat(ingest/transformer/domain): Add support for on conflict do nothing to dataset domain transformers (#11649) --- .../docs/transformer/dataset_transformer.md | 28 +++---- .../src/datahub/ingestion/graph/client.py | 1 + .../ingestion/transformer/dataset_domain.py | 41 ++++++---- .../tests/unit/test_transform_dataset.py | 76 +++++++++++++++++++ 4 files changed, 119 insertions(+), 27 deletions(-) diff --git a/metadata-ingestion/docs/transformer/dataset_transformer.md b/metadata-ingestion/docs/transformer/dataset_transformer.md index d48c6d2c1ab5b..66274ce64a8d2 100644 --- a/metadata-ingestion/docs/transformer/dataset_transformer.md +++ b/metadata-ingestion/docs/transformer/dataset_transformer.md @@ -122,12 +122,13 @@ transformers: ``` ## Simple Add Dataset ownership ### Config Details -| Field | Required | Type | Default | Description | -|--------------------|----------|--------------|-------------|---------------------------------------------------------------------| -| `owner_urns` | ✅ | list[string] | | List of owner urns. | -| `ownership_type` | | string | "DATAOWNER" | ownership type of the owners (either as enum or ownership type urn) | -| `replace_existing` | | boolean | `false` | Whether to remove ownership from entity sent by ingestion source. | -| `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. | +| Field | Required | Type | Default | Description | +|--------------------|----------|--------------|-------------|------------------------------------------------------------------------------------------------------------| +| `owner_urns` | ✅ | list[string] | | List of owner urns. | +| `ownership_type` | | string | "DATAOWNER" | ownership type of the owners (either as enum or ownership type urn) | +| `replace_existing` | | boolean | `false` | Whether to remove ownership from entity sent by ingestion source. | +| `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. | +| `on_conflict` | | enum | `DO_UPDATE` | Whether to make changes if domains already exist. If set to DO_NOTHING, `semantics` setting is irrelevant. | For transformer behaviour on `replace_existing` and `semantics`, please refer section [Relationship Between replace_existing And semantics](#relationship-between-replace_existing-and-semantics). @@ -191,13 +192,14 @@ transformers: ## Pattern Add Dataset ownership ### Config Details -| Field | Required | Type | Default | Description | -|--------------------|----------|----------------------|-------------|-----------------------------------------------------------------------------------------| -| `owner_pattern` | ✅ | map[regx, list[urn]] | | entity urn with regular expression and list of owners urn apply to matching entity urn. | -| `ownership_type` | | string | "DATAOWNER" | ownership type of the owners (either as enum or ownership type urn) | -| `replace_existing` | | boolean | `false` | Whether to remove owners from entity sent by ingestion source. | -| `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. | -| `is_container` | | bool | `false` | Whether to also consider a container or not. If true, then ownership will be attached to both the dataset and its container. | +| Field | Required | Type | Default | Description | +|--------------------|----------|----------------------|-------------|------------------------------------------------------------------------------------------------------------------------------| +| `owner_pattern` | ✅ | map[regx, list[urn]] | | entity urn with regular expression and list of owners urn apply to matching entity urn. | +| `ownership_type` | | string | "DATAOWNER" | ownership type of the owners (either as enum or ownership type urn) | +| `replace_existing` | | boolean | `false` | Whether to remove owners from entity sent by ingestion source. | +| `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. | +| `is_container` | | bool | `false` | Whether to also consider a container or not. If true, then ownership will be attached to both the dataset and its container. | +| `on_conflict` | | enum | `DO_UPDATE` | Whether to make changes if domains already exist. If set to DO_NOTHING, `semantics` setting is irrelevant. | let’s suppose we’d like to append a series of users who we know to own a different dataset from a data source but aren't detected during normal ingestion. To do so, we can use the `pattern_add_dataset_ownership` module that’s included in the ingestion framework. This will match the pattern to `urn` of the dataset and assign the respective owners. diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py index e8fae6254ae88..1d2528a24c4e5 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/client.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py @@ -351,6 +351,7 @@ def get_tags(self, entity_urn: str) -> Optional[GlobalTagsClass]: def get_glossary_terms(self, entity_urn: str) -> Optional[GlossaryTermsClass]: return self.get_aspect(entity_urn=entity_urn, aspect_type=GlossaryTermsClass) + @functools.lru_cache(maxsize=1) def get_domain(self, entity_urn: str) -> Optional[DomainsClass]: return self.get_aspect(entity_urn=entity_urn, aspect_type=DomainsClass) diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py index 6a83824815265..6b78b71eaa78e 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py @@ -1,6 +1,8 @@ import logging +from enum import auto from typing import Callable, Dict, List, Optional, Sequence, Union, cast +from datahub.configuration._config_enum import ConfigEnum from datahub.configuration.common import ( ConfigurationError, KeyValuePattern, @@ -23,6 +25,13 @@ logger = logging.getLogger(__name__) +class TransformerOnConflict(ConfigEnum): + """Describes the behavior of the transformer when writing an aspect that already exists.""" + + DO_UPDATE = auto() # On conflict, apply the new aspect + DO_NOTHING = auto() # On conflict, do not apply the new aspect + + class AddDatasetDomainSemanticsConfig(TransformerSemanticsConfigModel): get_domains_to_add: Union[ Callable[[str], DomainsClass], @@ -32,10 +41,12 @@ class AddDatasetDomainSemanticsConfig(TransformerSemanticsConfigModel): _resolve_domain_fn = pydantic_resolve_key("get_domains_to_add") is_container: bool = False + on_conflict: TransformerOnConflict = TransformerOnConflict.DO_UPDATE class SimpleDatasetDomainSemanticsConfig(TransformerSemanticsConfigModel): domains: List[str] + on_conflict: TransformerOnConflict = TransformerOnConflict.DO_UPDATE class PatternDatasetDomainSemanticsConfig(TransformerSemanticsConfigModel): @@ -80,12 +91,13 @@ def get_domain_class( @staticmethod def _merge_with_server_domains( - graph: DataHubGraph, urn: str, mce_domain: Optional[DomainsClass] + graph: Optional[DataHubGraph], urn: str, mce_domain: Optional[DomainsClass] ) -> Optional[DomainsClass]: if not mce_domain or not mce_domain.domains: # nothing to add, no need to consult server return None + assert graph server_domain = graph.get_domain(entity_urn=urn) if server_domain: # compute patch @@ -155,7 +167,7 @@ def transform_aspect( self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect] ) -> Optional[Aspect]: in_domain_aspect: DomainsClass = cast(DomainsClass, aspect) - domain_aspect = DomainsClass(domains=[]) + domain_aspect: DomainsClass = DomainsClass(domains=[]) # Check if we have received existing aspect if in_domain_aspect is not None and self.config.replace_existing is False: domain_aspect.domains.extend(in_domain_aspect.domains) @@ -164,16 +176,18 @@ def transform_aspect( domain_aspect.domains.extend(domain_to_add.domains) - if self.config.semantics == TransformerSemantics.PATCH: - assert self.ctx.graph - patch_domain_aspect: Optional[ - DomainsClass - ] = AddDatasetDomain._merge_with_server_domains( - self.ctx.graph, entity_urn, domain_aspect - ) - return cast(Optional[Aspect], patch_domain_aspect) - - return cast(Optional[Aspect], domain_aspect) + final_aspect: Optional[DomainsClass] = domain_aspect + if domain_aspect.domains: + if self.config.on_conflict == TransformerOnConflict.DO_NOTHING: + assert self.ctx.graph + server_domain = self.ctx.graph.get_domain(entity_urn) + if server_domain and server_domain.domains: + return None + if self.config.semantics == TransformerSemantics.PATCH: + final_aspect = AddDatasetDomain._merge_with_server_domains( + self.ctx.graph, entity_urn, domain_aspect + ) + return cast(Optional[Aspect], final_aspect) class SimpleAddDatasetDomain(AddDatasetDomain): @@ -186,8 +200,7 @@ def __init__( domains = AddDatasetDomain.get_domain_class(ctx.graph, config.domains) generic_config = AddDatasetDomainSemanticsConfig( get_domains_to_add=lambda _: domains, - semantics=config.semantics, - replace_existing=config.replace_existing, + **config.dict(exclude={"domains"}), ) super().__init__(generic_config, ctx) diff --git a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py index 2e2e85b5d1811..4e9a38cb37ae6 100644 --- a/metadata-ingestion/tests/unit/test_transform_dataset.py +++ b/metadata-ingestion/tests/unit/test_transform_dataset.py @@ -56,6 +56,7 @@ from datahub.ingestion.transformer.dataset_domain import ( PatternAddDatasetDomain, SimpleAddDatasetDomain, + TransformerOnConflict, ) from datahub.ingestion.transformer.dataset_domain_based_on_tags import ( DatasetTagDomainMapper, @@ -2498,6 +2499,81 @@ def fake_get_domain(entity_urn: str) -> models.DomainsClass: assert server_domain in transformed_aspect.domains +def test_simple_add_dataset_domain_on_conflict_do_nothing( + pytestconfig, tmp_path, mock_time, mock_datahub_graph_instance +): + acryl_domain = builder.make_domain_urn("acryl.io") + datahub_domain = builder.make_domain_urn("datahubproject.io") + server_domain = builder.make_domain_urn("test.io") + + pipeline_context = PipelineContext(run_id="transformer_pipe_line") + pipeline_context.graph = mock_datahub_graph_instance + + # Return fake aspect to simulate server behaviour + def fake_get_domain(entity_urn: str) -> models.DomainsClass: + return models.DomainsClass(domains=[server_domain]) + + pipeline_context.graph.get_domain = fake_get_domain # type: ignore + + output = run_dataset_transformer_pipeline( + transformer_type=SimpleAddDatasetDomain, + aspect=models.DomainsClass(domains=[datahub_domain]), + config={ + "replace_existing": False, + "semantics": TransformerSemantics.PATCH, + "domains": [acryl_domain], + "on_conflict": TransformerOnConflict.DO_NOTHING, + }, + pipeline_context=pipeline_context, + ) + + assert len(output) == 1 + assert output[0] is not None + assert output[0].record is not None + assert isinstance(output[0].record, EndOfStream) + + +def test_simple_add_dataset_domain_on_conflict_do_nothing_no_conflict( + pytestconfig, tmp_path, mock_time, mock_datahub_graph_instance +): + acryl_domain = builder.make_domain_urn("acryl.io") + datahub_domain = builder.make_domain_urn("datahubproject.io") + irrelevant_domain = builder.make_domain_urn("test.io") + + pipeline_context = PipelineContext(run_id="transformer_pipe_line") + pipeline_context.graph = mock_datahub_graph_instance + + # Return fake aspect to simulate server behaviour + def fake_get_domain(entity_urn: str) -> models.DomainsClass: + return models.DomainsClass(domains=[]) + + pipeline_context.graph.get_domain = fake_get_domain # type: ignore + + output = run_dataset_transformer_pipeline( + transformer_type=SimpleAddDatasetDomain, + aspect=models.DomainsClass(domains=[datahub_domain]), + config={ + "replace_existing": False, + "semantics": TransformerSemantics.PATCH, + "domains": [acryl_domain], + "on_conflict": TransformerOnConflict.DO_NOTHING, + }, + pipeline_context=pipeline_context, + ) + + assert len(output) == 2 + assert output[0] is not None + assert output[0].record is not None + assert isinstance(output[0].record, MetadataChangeProposalWrapper) + assert output[0].record.aspect is not None + assert isinstance(output[0].record.aspect, models.DomainsClass) + transformed_aspect = cast(models.DomainsClass, output[0].record.aspect) + assert len(transformed_aspect.domains) == 2 + assert datahub_domain in transformed_aspect.domains + assert acryl_domain in transformed_aspect.domains + assert irrelevant_domain not in transformed_aspect.domains + + def test_pattern_add_dataset_domain_aspect_name(mock_datahub_graph_instance): pipeline_context: PipelineContext = PipelineContext( run_id="test_simple_add_dataset_domain" From 7e4d4aba122f3ec22825ef5005f759172be26678 Mon Sep 17 00:00:00 2001 From: Jay Feldman <8128360+feldjay@users.noreply.github.com> Date: Fri, 18 Oct 2024 15:58:52 -0400 Subject: [PATCH 3/9] fix(ingest/looker): Remove bad imports from looker_common (#11663) --- .../src/datahub/ingestion/source/looker/looker_common.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py index 3cbb13375229b..317b212f7fa96 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py @@ -929,7 +929,6 @@ def from_api( # noqa: C901 reporter: SourceReport, source_config: LookerDashboardSourceConfig, ) -> Optional["LookerExplore"]: # noqa: C901 - from datahub.ingestion.source.looker.lookml_source import _BASE_PROJECT_NAME try: explore = client.lookml_model_explore(model, explore_name) @@ -1194,7 +1193,6 @@ def _to_metadata_events( # noqa: C901 ) -> Optional[List[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]]]: # We only generate MCE-s for explores that contain from clauses and do NOT contain joins # All other explores (passthrough explores and joins) end in correct resolution of lineage, and don't need additional nodes in the graph. - from datahub.ingestion.source.looker.lookml_source import _BASE_PROJECT_NAME dataset_snapshot = DatasetSnapshot( urn=self.get_explore_urn(config), From dfd7293f8ded30f6db5a3a13b045a2c9b5fdf21d Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Fri, 18 Oct 2024 13:05:06 -0700 Subject: [PATCH 4/9] feat(ingest/looker): include project name in model/explore properties (#11664) Co-authored-by: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com> --- .../ingestion/source/looker/looker_common.py | 16 ++-- .../ingestion/source/looker/looker_source.py | 41 ++++++---- .../looker/golden_looker_mces.json | 7 ++ .../looker/golden_test_allow_ingest.json | 4 + ...olden_test_external_project_view_mces.json | 4 + .../looker/golden_test_file_path_ingest.json | 4 + ...olden_test_folder_path_pattern_ingest.json | 4 + .../golden_test_independent_look_ingest.json | 82 +++++++++++-------- .../looker/golden_test_ingest.json | 4 + .../looker/golden_test_ingest_joins.json | 4 + .../golden_test_ingest_unaliased_joins.json | 4 + ...en_test_non_personal_independent_look.json | 7 ++ .../looker_mces_golden_deleted_stateful.json | 16 ++-- .../looker/looker_mces_usage_history.json | 4 + 14 files changed, 135 insertions(+), 66 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py index 317b212f7fa96..3d1683100474e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py @@ -1205,15 +1205,19 @@ def _to_metadata_events( # noqa: C901 dataset_snapshot.aspects.append(browse_paths) dataset_snapshot.aspects.append(StatusClass(removed=False)) - custom_properties = {} - if self.label is not None: - custom_properties["looker.explore.label"] = str(self.label) - if self.source_file is not None: - custom_properties["looker.explore.file"] = str(self.source_file) + custom_properties = { + "project": self.project_name, + "model": self.model_name, + "looker.explore.label": self.label, + "looker.explore.name": self.name, + "looker.explore.file": self.source_file, + } dataset_props = DatasetPropertiesClass( name=str(self.label) if self.label else LookerUtil._display_name(self.name), description=self.description, - customProperties=custom_properties, + customProperties={ + k: str(v) for k, v in custom_properties.items() if v is not None + }, ) dataset_props.externalUrl = self._get_url(base_url) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py index f269ccf1cd98f..e42ac7b61c177 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py @@ -139,26 +139,21 @@ class LookerDashboardSource(TestableSource, StatefulIngestionSourceBase): """ platform = "looker" - source_config: LookerDashboardSourceConfig - reporter: LookerDashboardSourceReport - user_registry: LookerUserRegistry - reachable_look_registry: Set[ - str - ] # Keep track of look-id which are reachable from Dashboard def __init__(self, config: LookerDashboardSourceConfig, ctx: PipelineContext): super().__init__(config, ctx) - self.source_config = config - self.reporter = LookerDashboardSourceReport() + self.source_config: LookerDashboardSourceConfig = config + self.reporter: LookerDashboardSourceReport = LookerDashboardSourceReport() self.looker_api: LookerAPI = LookerAPI(self.source_config) - self.user_registry = LookerUserRegistry(self.looker_api) - self.explore_registry = LookerExploreRegistry( + self.user_registry: LookerUserRegistry = LookerUserRegistry(self.looker_api) + self.explore_registry: LookerExploreRegistry = LookerExploreRegistry( self.looker_api, self.reporter, self.source_config ) self.reporter._looker_explore_registry = self.explore_registry self.reporter._looker_api = self.looker_api - self.reachable_look_registry = set() + # Keep track of look-id which are reachable from Dashboard + self.reachable_look_registry: Set[str] = set() # (model, explore) -> list of charts/looks/dashboards that reference this explore # The list values are used purely for debugging purposes. @@ -868,21 +863,31 @@ def _make_explore_metadata_events( ) -> Iterable[ Union[MetadataChangeEvent, MetadataChangeProposalWrapper, MetadataWorkUnit] ]: - if self.source_config.emit_used_explores_only: - explores_to_fetch = list(self.reachable_explores.keys()) - else: + if not self.source_config.emit_used_explores_only: explores_to_fetch = list(self.list_all_explores()) + else: + # We don't keep track of project names for each explore right now. + # Because project names are just used for a custom property, it's + # fine to set them to None. + # TODO: Track project names for each explore. + explores_to_fetch = [ + (None, model, explore) + for (model, explore) in self.reachable_explores.keys() + ] explores_to_fetch.sort() processed_models: List[str] = [] - for model, _ in explores_to_fetch: + for project_name, model, _ in explores_to_fetch: if model not in processed_models: model_key = gen_model_key(self.source_config, model) yield from gen_containers( container_key=model_key, name=model, sub_types=[BIContainerSubTypes.LOOKML_MODEL], + extra_properties=( + {"project": project_name} if project_name is not None else None + ), ) yield MetadataChangeProposalWrapper( entityUrn=model_key.as_urn(), @@ -896,7 +901,7 @@ def _make_explore_metadata_events( self.reporter.total_explores = len(explores_to_fetch) for future in BackpressureAwareExecutor.map( self.fetch_one_explore, - ((model, explore) for (model, explore) in explores_to_fetch), + ((model, explore) for (_project, model, explore) in explores_to_fetch), max_workers=self.source_config.max_threads, ): events, explore_id, start_time, end_time = future.result() @@ -907,7 +912,7 @@ def _make_explore_metadata_events( f"Running time of fetch_one_explore for {explore_id}: {(end_time - start_time).total_seconds()}" ) - def list_all_explores(self) -> Iterable[Tuple[str, str]]: + def list_all_explores(self) -> Iterable[Tuple[Optional[str], str, str]]: # returns a list of (model, explore) tuples for model in self.looker_api.all_lookml_models(): @@ -916,7 +921,7 @@ def list_all_explores(self) -> Iterable[Tuple[str, str]]: for explore in model.explores: if explore.name is None: continue - yield (model.name, explore.name) + yield (model.project_name, model.name, explore.name) def fetch_one_explore( self, model: str, explore: str diff --git a/metadata-ingestion/tests/integration/looker/golden_looker_mces.json b/metadata-ingestion/tests/integration/looker/golden_looker_mces.json index 5cac7b1bb73b1..a9c445b5986ef 100644 --- a/metadata-ingestion/tests/integration/looker/golden_looker_mces.json +++ b/metadata-ingestion/tests/integration/looker/golden_looker_mces.json @@ -11,6 +11,7 @@ "description": "lorem ipsum", "charts": [], "datasets": [], + "dashboards": [], "lastModified": { "created": { "time": 1586847600000, @@ -440,7 +441,10 @@ { "com.linkedin.pegasus2avro.dataset.DatasetProperties": { "customProperties": { + "project": "lkml_samples", + "model": "bogus data", "looker.explore.label": "My Explore View", + "looker.explore.name": "my_view", "looker.explore.file": "test_source_file.lkml" }, "externalUrl": "https://looker.company.com/explore/bogus data/my_view", @@ -616,7 +620,10 @@ { "com.linkedin.pegasus2avro.dataset.DatasetProperties": { "customProperties": { + "project": "lkml_samples", + "model": "data", "looker.explore.label": "My Explore View", + "looker.explore.name": "my_view", "looker.explore.file": "test_source_file.lkml" }, "externalUrl": "https://looker.company.com/explore/data/my_view", diff --git a/metadata-ingestion/tests/integration/looker/golden_test_allow_ingest.json b/metadata-ingestion/tests/integration/looker/golden_test_allow_ingest.json index 24a738a815cda..af9c62a2a4180 100644 --- a/metadata-ingestion/tests/integration/looker/golden_test_allow_ingest.json +++ b/metadata-ingestion/tests/integration/looker/golden_test_allow_ingest.json @@ -11,6 +11,7 @@ "description": "lorem ipsum", "charts": [], "datasets": [], + "dashboards": [], "lastModified": { "created": { "time": 1586847600000, @@ -282,7 +283,10 @@ { "com.linkedin.pegasus2avro.dataset.DatasetProperties": { "customProperties": { + "project": "lkml_samples", + "model": "data", "looker.explore.label": "My Explore View", + "looker.explore.name": "my_view", "looker.explore.file": "test_source_file.lkml" }, "externalUrl": "https://looker.company.com/explore/data/my_view", diff --git a/metadata-ingestion/tests/integration/looker/golden_test_external_project_view_mces.json b/metadata-ingestion/tests/integration/looker/golden_test_external_project_view_mces.json index b1460779da4f5..b89bc356b48fd 100644 --- a/metadata-ingestion/tests/integration/looker/golden_test_external_project_view_mces.json +++ b/metadata-ingestion/tests/integration/looker/golden_test_external_project_view_mces.json @@ -202,6 +202,7 @@ "urn:li:chart:(looker,dashboard_elements.2)" ], "datasets": [], + "dashboards": [], "lastModified": { "created": { "time": 1586847600000, @@ -520,7 +521,10 @@ { "com.linkedin.pegasus2avro.dataset.DatasetProperties": { "customProperties": { + "project": "looker_hub", + "model": "data", "looker.explore.label": "My Explore View", + "looker.explore.name": "my_view", "looker.explore.file": "test_source_file.lkml" }, "externalUrl": "https://looker.company.com/explore/data/my_view", diff --git a/metadata-ingestion/tests/integration/looker/golden_test_file_path_ingest.json b/metadata-ingestion/tests/integration/looker/golden_test_file_path_ingest.json index 74400b9b5cc56..810fefd8f6cb8 100644 --- a/metadata-ingestion/tests/integration/looker/golden_test_file_path_ingest.json +++ b/metadata-ingestion/tests/integration/looker/golden_test_file_path_ingest.json @@ -202,6 +202,7 @@ "urn:li:chart:(looker,dashboard_elements.2)" ], "datasets": [], + "dashboards": [], "lastModified": { "created": { "time": 1586847600000, @@ -520,7 +521,10 @@ { "com.linkedin.pegasus2avro.dataset.DatasetProperties": { "customProperties": { + "project": "looker_hub", + "model": "data", "looker.explore.label": "My Explore View", + "looker.explore.name": "my_view", "looker.explore.file": "test_source_file.lkml" }, "externalUrl": "https://looker.company.com/explore/data/my_view", diff --git a/metadata-ingestion/tests/integration/looker/golden_test_folder_path_pattern_ingest.json b/metadata-ingestion/tests/integration/looker/golden_test_folder_path_pattern_ingest.json index 89241fb52fb63..3d78397f54a23 100644 --- a/metadata-ingestion/tests/integration/looker/golden_test_folder_path_pattern_ingest.json +++ b/metadata-ingestion/tests/integration/looker/golden_test_folder_path_pattern_ingest.json @@ -287,6 +287,7 @@ "description": "third", "charts": [], "datasets": [], + "dashboards": [], "lastModified": { "created": { "time": 1586847600000, @@ -613,7 +614,10 @@ { "com.linkedin.pegasus2avro.dataset.DatasetProperties": { "customProperties": { + "project": "lkml_samples", + "model": "data", "looker.explore.label": "My Explore View", + "looker.explore.name": "my_view", "looker.explore.file": "test_source_file.lkml" }, "externalUrl": "https://looker.company.com/explore/data/my_view", diff --git a/metadata-ingestion/tests/integration/looker/golden_test_independent_look_ingest.json b/metadata-ingestion/tests/integration/looker/golden_test_independent_look_ingest.json index f178e97e78fa0..5a540e61e768d 100644 --- a/metadata-ingestion/tests/integration/looker/golden_test_independent_look_ingest.json +++ b/metadata-ingestion/tests/integration/looker/golden_test_independent_look_ingest.json @@ -210,6 +210,7 @@ "urn:li:chart:(looker,dashboard_elements.2)" ], "datasets": [], + "dashboards": [], "lastModified": { "created": { "time": 1586847600000, @@ -1107,12 +1108,12 @@ { "proposedSnapshot": { "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { - "urn": "urn:li:dataset:(urn:li:dataPlatform:looker,data.explore.my_view,PROD)", + "urn": "urn:li:dataset:(urn:li:dataPlatform:looker,sales_model.explore.sales_explore,PROD)", "aspects": [ { "com.linkedin.pegasus2avro.common.BrowsePaths": { "paths": [ - "/Explore/data" + "/Explore/sales_model" ] } }, @@ -1124,10 +1125,13 @@ { "com.linkedin.pegasus2avro.dataset.DatasetProperties": { "customProperties": { + "project": "lkml_samples", + "model": "sales_model", "looker.explore.label": "My Explore View", + "looker.explore.name": "sales_explore", "looker.explore.file": "test_source_file.lkml" }, - "externalUrl": "https://looker.company.com/explore/data/my_view", + "externalUrl": "https://looker.company.com/explore/sales_model/sales_explore", "name": "My Explore View", "description": "lorem ipsum", "tags": [] @@ -1149,7 +1153,7 @@ }, { "com.linkedin.pegasus2avro.schema.SchemaMetadata": { - "schemaName": "my_view", + "schemaName": "sales_explore", "platform": "urn:li:dataPlatform:looker", "version": 0, "created": { @@ -1204,7 +1208,7 @@ }, { "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,data.explore.my_view,PROD)", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,sales_model.explore.sales_explore,PROD)", "changeType": "UPSERT", "aspectName": "subTypes", "aspect": { @@ -1223,12 +1227,12 @@ }, { "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,data.explore.my_view,PROD)", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,sales_model.explore.sales_explore,PROD)", "changeType": "UPSERT", "aspectName": "embed", "aspect": { "json": { - "renderUrl": "https://looker.company.com/embed/explore/data/my_view" + "renderUrl": "https://looker.company.com/embed/explore/sales_model/sales_explore" } }, "systemMetadata": { @@ -1240,12 +1244,12 @@ }, { "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,data.explore.my_view,PROD)", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,sales_model.explore.sales_explore,PROD)", "changeType": "UPSERT", "aspectName": "container", "aspect": { "json": { - "container": "urn:li:container:59a5aa45397364e6882e793f1bc77b42" + "container": "urn:li:container:d38ab60586a6e39b4cf63f14946969c5" } }, "systemMetadata": { @@ -1257,7 +1261,7 @@ }, { "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,data.explore.my_view,PROD)", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,sales_model.explore.sales_explore,PROD)", "changeType": "UPSERT", "aspectName": "browsePathsV2", "aspect": { @@ -1267,8 +1271,8 @@ "id": "Explore" }, { - "id": "urn:li:container:59a5aa45397364e6882e793f1bc77b42", - "urn": "urn:li:container:59a5aa45397364e6882e793f1bc77b42" + "id": "urn:li:container:d38ab60586a6e39b4cf63f14946969c5", + "urn": "urn:li:container:d38ab60586a6e39b4cf63f14946969c5" } ] } @@ -1283,12 +1287,12 @@ { "proposedSnapshot": { "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { - "urn": "urn:li:dataset:(urn:li:dataPlatform:looker,order_model.explore.order_explore,PROD)", + "urn": "urn:li:dataset:(urn:li:dataPlatform:looker,data.explore.my_view,PROD)", "aspects": [ { "com.linkedin.pegasus2avro.common.BrowsePaths": { "paths": [ - "/Explore/order_model" + "/Explore/data" ] } }, @@ -1300,10 +1304,13 @@ { "com.linkedin.pegasus2avro.dataset.DatasetProperties": { "customProperties": { + "project": "lkml_samples", + "model": "data", "looker.explore.label": "My Explore View", + "looker.explore.name": "my_view", "looker.explore.file": "test_source_file.lkml" }, - "externalUrl": "https://looker.company.com/explore/order_model/order_explore", + "externalUrl": "https://looker.company.com/explore/data/my_view", "name": "My Explore View", "description": "lorem ipsum", "tags": [] @@ -1325,7 +1332,7 @@ }, { "com.linkedin.pegasus2avro.schema.SchemaMetadata": { - "schemaName": "order_explore", + "schemaName": "my_view", "platform": "urn:li:dataPlatform:looker", "version": 0, "created": { @@ -1380,7 +1387,7 @@ }, { "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,order_model.explore.order_explore,PROD)", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,data.explore.my_view,PROD)", "changeType": "UPSERT", "aspectName": "subTypes", "aspect": { @@ -1399,12 +1406,12 @@ }, { "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,order_model.explore.order_explore,PROD)", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,data.explore.my_view,PROD)", "changeType": "UPSERT", "aspectName": "embed", "aspect": { "json": { - "renderUrl": "https://looker.company.com/embed/explore/order_model/order_explore" + "renderUrl": "https://looker.company.com/embed/explore/data/my_view" } }, "systemMetadata": { @@ -1416,12 +1423,12 @@ }, { "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,order_model.explore.order_explore,PROD)", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,data.explore.my_view,PROD)", "changeType": "UPSERT", "aspectName": "container", "aspect": { "json": { - "container": "urn:li:container:df4ee66abd19b668c88bfe4408f87e60" + "container": "urn:li:container:59a5aa45397364e6882e793f1bc77b42" } }, "systemMetadata": { @@ -1433,7 +1440,7 @@ }, { "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,order_model.explore.order_explore,PROD)", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,data.explore.my_view,PROD)", "changeType": "UPSERT", "aspectName": "browsePathsV2", "aspect": { @@ -1443,8 +1450,8 @@ "id": "Explore" }, { - "id": "urn:li:container:df4ee66abd19b668c88bfe4408f87e60", - "urn": "urn:li:container:df4ee66abd19b668c88bfe4408f87e60" + "id": "urn:li:container:59a5aa45397364e6882e793f1bc77b42", + "urn": "urn:li:container:59a5aa45397364e6882e793f1bc77b42" } ] } @@ -1459,12 +1466,12 @@ { "proposedSnapshot": { "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { - "urn": "urn:li:dataset:(urn:li:dataPlatform:looker,sales_model.explore.sales_explore,PROD)", + "urn": "urn:li:dataset:(urn:li:dataPlatform:looker,order_model.explore.order_explore,PROD)", "aspects": [ { "com.linkedin.pegasus2avro.common.BrowsePaths": { "paths": [ - "/Explore/sales_model" + "/Explore/order_model" ] } }, @@ -1476,10 +1483,13 @@ { "com.linkedin.pegasus2avro.dataset.DatasetProperties": { "customProperties": { + "project": "lkml_samples", + "model": "order_model", "looker.explore.label": "My Explore View", + "looker.explore.name": "order_explore", "looker.explore.file": "test_source_file.lkml" }, - "externalUrl": "https://looker.company.com/explore/sales_model/sales_explore", + "externalUrl": "https://looker.company.com/explore/order_model/order_explore", "name": "My Explore View", "description": "lorem ipsum", "tags": [] @@ -1501,7 +1511,7 @@ }, { "com.linkedin.pegasus2avro.schema.SchemaMetadata": { - "schemaName": "sales_explore", + "schemaName": "order_explore", "platform": "urn:li:dataPlatform:looker", "version": 0, "created": { @@ -1556,7 +1566,7 @@ }, { "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,sales_model.explore.sales_explore,PROD)", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,order_model.explore.order_explore,PROD)", "changeType": "UPSERT", "aspectName": "subTypes", "aspect": { @@ -1575,12 +1585,12 @@ }, { "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,sales_model.explore.sales_explore,PROD)", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,order_model.explore.order_explore,PROD)", "changeType": "UPSERT", "aspectName": "embed", "aspect": { "json": { - "renderUrl": "https://looker.company.com/embed/explore/sales_model/sales_explore" + "renderUrl": "https://looker.company.com/embed/explore/order_model/order_explore" } }, "systemMetadata": { @@ -1592,12 +1602,12 @@ }, { "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,sales_model.explore.sales_explore,PROD)", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,order_model.explore.order_explore,PROD)", "changeType": "UPSERT", "aspectName": "container", "aspect": { "json": { - "container": "urn:li:container:d38ab60586a6e39b4cf63f14946969c5" + "container": "urn:li:container:df4ee66abd19b668c88bfe4408f87e60" } }, "systemMetadata": { @@ -1609,7 +1619,7 @@ }, { "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,sales_model.explore.sales_explore,PROD)", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,order_model.explore.order_explore,PROD)", "changeType": "UPSERT", "aspectName": "browsePathsV2", "aspect": { @@ -1619,8 +1629,8 @@ "id": "Explore" }, { - "id": "urn:li:container:d38ab60586a6e39b4cf63f14946969c5", - "urn": "urn:li:container:d38ab60586a6e39b4cf63f14946969c5" + "id": "urn:li:container:df4ee66abd19b668c88bfe4408f87e60", + "urn": "urn:li:container:df4ee66abd19b668c88bfe4408f87e60" } ] } diff --git a/metadata-ingestion/tests/integration/looker/golden_test_ingest.json b/metadata-ingestion/tests/integration/looker/golden_test_ingest.json index d969ef62a96e5..9ac95b8482a47 100644 --- a/metadata-ingestion/tests/integration/looker/golden_test_ingest.json +++ b/metadata-ingestion/tests/integration/looker/golden_test_ingest.json @@ -229,6 +229,7 @@ "urn:li:chart:(looker,ap-south-1.dashboard_elements.2)" ], "datasets": [], + "dashboards": [], "lastModified": { "created": { "time": 1586847600000, @@ -574,7 +575,10 @@ { "com.linkedin.pegasus2avro.dataset.DatasetProperties": { "customProperties": { + "project": "lkml_samples", + "model": "data", "looker.explore.label": "My Explore View", + "looker.explore.name": "my_view", "looker.explore.file": "test_source_file.lkml" }, "externalUrl": "https://looker.company.com/explore/data/my_view", diff --git a/metadata-ingestion/tests/integration/looker/golden_test_ingest_joins.json b/metadata-ingestion/tests/integration/looker/golden_test_ingest_joins.json index 153db363c7828..3a2c6359ea63c 100644 --- a/metadata-ingestion/tests/integration/looker/golden_test_ingest_joins.json +++ b/metadata-ingestion/tests/integration/looker/golden_test_ingest_joins.json @@ -202,6 +202,7 @@ "urn:li:chart:(looker,dashboard_elements.2)" ], "datasets": [], + "dashboards": [], "lastModified": { "created": { "time": 1586847600000, @@ -520,7 +521,10 @@ { "com.linkedin.pegasus2avro.dataset.DatasetProperties": { "customProperties": { + "project": "lkml_samples", + "model": "data", "looker.explore.label": "My Explore View", + "looker.explore.name": "my_view", "looker.explore.file": "test_source_file.lkml" }, "externalUrl": "https://looker.company.com/explore/data/my_view", diff --git a/metadata-ingestion/tests/integration/looker/golden_test_ingest_unaliased_joins.json b/metadata-ingestion/tests/integration/looker/golden_test_ingest_unaliased_joins.json index 98adbdc5b829e..007eee348aeaf 100644 --- a/metadata-ingestion/tests/integration/looker/golden_test_ingest_unaliased_joins.json +++ b/metadata-ingestion/tests/integration/looker/golden_test_ingest_unaliased_joins.json @@ -11,6 +11,7 @@ "description": "lorem ipsum", "charts": [], "datasets": [], + "dashboards": [], "lastModified": { "created": { "time": 1586847600000, @@ -282,7 +283,10 @@ { "com.linkedin.pegasus2avro.dataset.DatasetProperties": { "customProperties": { + "project": "lkml_samples", + "model": "data", "looker.explore.label": "My Explore View", + "looker.explore.name": "my_view", "looker.explore.file": "test_source_file.lkml" }, "externalUrl": "https://looker.company.com/explore/data/my_view", diff --git a/metadata-ingestion/tests/integration/looker/golden_test_non_personal_independent_look.json b/metadata-ingestion/tests/integration/looker/golden_test_non_personal_independent_look.json index 63ffdda8c5b6f..859b9163d7aad 100644 --- a/metadata-ingestion/tests/integration/looker/golden_test_non_personal_independent_look.json +++ b/metadata-ingestion/tests/integration/looker/golden_test_non_personal_independent_look.json @@ -210,6 +210,7 @@ "urn:li:chart:(looker,dashboard_elements.2)" ], "datasets": [], + "dashboards": [], "lastModified": { "created": { "time": 1586847600000, @@ -783,7 +784,10 @@ { "com.linkedin.pegasus2avro.dataset.DatasetProperties": { "customProperties": { + "project": "lkml_samples", + "model": "data", "looker.explore.label": "My Explore View", + "looker.explore.name": "my_view", "looker.explore.file": "test_source_file.lkml" }, "externalUrl": "https://looker.company.com/explore/data/my_view", @@ -959,7 +963,10 @@ { "com.linkedin.pegasus2avro.dataset.DatasetProperties": { "customProperties": { + "project": "lkml_samples", + "model": "sales_model", "looker.explore.label": "My Explore View", + "looker.explore.name": "sales_explore", "looker.explore.file": "test_source_file.lkml" }, "externalUrl": "https://looker.company.com/explore/sales_model/sales_explore", diff --git a/metadata-ingestion/tests/integration/looker/looker_mces_golden_deleted_stateful.json b/metadata-ingestion/tests/integration/looker/looker_mces_golden_deleted_stateful.json index 567ab78a14754..8256c984afb27 100644 --- a/metadata-ingestion/tests/integration/looker/looker_mces_golden_deleted_stateful.json +++ b/metadata-ingestion/tests/integration/looker/looker_mces_golden_deleted_stateful.json @@ -210,6 +210,7 @@ "urn:li:chart:(looker,dashboard_elements.2)" ], "datasets": [], + "dashboards": [], "lastModified": { "created": { "time": 1586847600000, @@ -539,7 +540,10 @@ { "com.linkedin.pegasus2avro.dataset.DatasetProperties": { "customProperties": { + "project": "lkml_samples", + "model": "data", "looker.explore.label": "My Explore View", + "looker.explore.name": "my_view", "looker.explore.file": "test_source_file.lkml" }, "externalUrl": "https://looker.company.com/explore/data/my_view", @@ -810,8 +814,8 @@ } }, { - "entityType": "chart", - "entityUrn": "urn:li:chart:(looker,dashboard_elements.10)", + "entityType": "dashboard", + "entityUrn": "urn:li:dashboard:(looker,dashboards.11)", "changeType": "UPSERT", "aspectName": "status", "aspect": { @@ -827,8 +831,8 @@ } }, { - "entityType": "container", - "entityUrn": "urn:li:container:621eb6e00da9abece0f64522f81be0e7", + "entityType": "chart", + "entityUrn": "urn:li:chart:(looker,dashboard_elements.10)", "changeType": "UPSERT", "aspectName": "status", "aspect": { @@ -844,8 +848,8 @@ } }, { - "entityType": "dashboard", - "entityUrn": "urn:li:dashboard:(looker,dashboards.11)", + "entityType": "container", + "entityUrn": "urn:li:container:621eb6e00da9abece0f64522f81be0e7", "changeType": "UPSERT", "aspectName": "status", "aspect": { diff --git a/metadata-ingestion/tests/integration/looker/looker_mces_usage_history.json b/metadata-ingestion/tests/integration/looker/looker_mces_usage_history.json index 3befb62a631de..0b3530f9c2462 100644 --- a/metadata-ingestion/tests/integration/looker/looker_mces_usage_history.json +++ b/metadata-ingestion/tests/integration/looker/looker_mces_usage_history.json @@ -11,6 +11,7 @@ "description": "lorem ipsum", "charts": [], "datasets": [], + "dashboards": [], "lastModified": { "created": { "time": 1586847600000, @@ -234,7 +235,10 @@ { "com.linkedin.pegasus2avro.dataset.DatasetProperties": { "customProperties": { + "project": "lkml_samples", + "model": "data", "looker.explore.label": "My Explore View", + "looker.explore.name": "my_view", "looker.explore.file": "test_source_file.lkml" }, "externalUrl": "https://looker.company.com/explore/data/my_view", From 8f7f2c17242fb084f8784f015f76808bfda9b593 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Fri, 18 Oct 2024 14:29:03 -0700 Subject: [PATCH 5/9] feat(ingest/fivetran): protect against high sync volume (#11589) --- .../ingestion/source/fivetran/fivetran.py | 21 ++++--- .../source/fivetran/fivetran_log_api.py | 63 ++++++++++--------- .../source/fivetran/fivetran_query.py | 34 +++++++--- .../integration/fivetran/test_fivetran.py | 58 ++++------------- 4 files changed, 87 insertions(+), 89 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py index 704a6f20a5c19..334bb58ea84f8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py @@ -27,7 +27,10 @@ PlatformDetail, ) from datahub.ingestion.source.fivetran.data_classes import Connector, Job -from datahub.ingestion.source.fivetran.fivetran_log_api import FivetranLogAPI +from datahub.ingestion.source.fivetran.fivetran_log_api import ( + MAX_JOBS_PER_CONNECTOR, + FivetranLogAPI, +) from datahub.ingestion.source.state.stale_entity_removal_handler import ( StaleEntityRemovalHandler, ) @@ -72,11 +75,6 @@ def __init__(self, config: FivetranSourceConfig, ctx: PipelineContext): self.audit_log = FivetranLogAPI(self.config.fivetran_log_config) - # Create and register the stateful ingestion use-case handler. - self.stale_entity_removal_handler = StaleEntityRemovalHandler.create( - self, self.config, self.ctx - ) - def _extend_lineage(self, connector: Connector, datajob: DataJob) -> None: input_dataset_urn_list: List[DatasetUrn] = [] output_dataset_urn_list: List[DatasetUrn] = [] @@ -267,6 +265,13 @@ def _get_connector_workunits( ).as_workunit(is_primary_source=False) # Map Fivetran's job/sync history entity with Datahub's data process entity + if len(connector.jobs) >= MAX_JOBS_PER_CONNECTOR: + self.report.warning( + title="Not all sync history was captured", + message=f"The connector had more than {MAX_JOBS_PER_CONNECTOR} sync runs in the past {self.config.history_sync_lookback_period} days. " + f"Only the most recent {MAX_JOBS_PER_CONNECTOR} syncs were ingested.", + context=f"{connector.connector_name} (connector_id: {connector.connector_id})", + ) for job in connector.jobs: dpi = self._generate_dpi_from_job(job, datajob) yield from self._get_dpi_workunits(job, dpi) @@ -279,7 +284,9 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> Source: def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: return [ *super().get_workunit_processors(), - self.stale_entity_removal_handler.workunit_processor, + StaleEntityRemovalHandler.create( + self, self.config, self.ctx + ).workunit_processor, ] def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py index 31c16139066e4..5908efe39e2b4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py @@ -22,6 +22,10 @@ logger: logging.Logger = logging.getLogger(__name__) +# We don't want to generate a massive number of dataProcesses for a single connector. +# This is primarily used as a safeguard to prevent performance issues. +MAX_JOBS_PER_CONNECTOR = 1000 + class FivetranLogAPI: def __init__(self, fivetran_log_config: FivetranLogConfig) -> None: @@ -158,34 +162,32 @@ def _get_table_lineage( return table_lineage_list - def _get_all_connector_sync_logs(self, syncs_interval: int) -> Dict[str, Dict]: - sync_logs = {} - for row in self._query( - self.fivetran_log_query.get_sync_logs_query().format( - db_clause=self.fivetran_log_query.db_clause, - syncs_interval=syncs_interval, - ) - ): - if row[Constant.CONNECTOR_ID] not in sync_logs: - sync_logs[row[Constant.CONNECTOR_ID]] = { - row[Constant.SYNC_ID]: { - row["message_event"]: ( - row[Constant.TIME_STAMP].timestamp(), - row[Constant.MESSAGE_DATA], - ) - } - } - elif row[Constant.SYNC_ID] not in sync_logs[row[Constant.CONNECTOR_ID]]: - sync_logs[row[Constant.CONNECTOR_ID]][row[Constant.SYNC_ID]] = { - row["message_event"]: ( - row[Constant.TIME_STAMP].timestamp(), - row[Constant.MESSAGE_DATA], - ) - } - else: - sync_logs[row[Constant.CONNECTOR_ID]][row[Constant.SYNC_ID]][ - row["message_event"] - ] = (row[Constant.TIME_STAMP].timestamp(), row[Constant.MESSAGE_DATA]) + def _get_all_connector_sync_logs( + self, syncs_interval: int, connector_ids: List[str] + ) -> Dict[str, Dict[str, Dict[str, Tuple[float, Optional[str]]]]]: + sync_logs: Dict[str, Dict[str, Dict[str, Tuple[float, Optional[str]]]]] = {} + + # Format connector_ids as a comma-separated string of quoted IDs + formatted_connector_ids = ", ".join(f"'{id}'" for id in connector_ids) + + query = self.fivetran_log_query.get_sync_logs_query().format( + db_clause=self.fivetran_log_query.db_clause, + syncs_interval=syncs_interval, + max_jobs_per_connector=MAX_JOBS_PER_CONNECTOR, + connector_ids=formatted_connector_ids, + ) + + for row in self._query(query): + connector_id = row[Constant.CONNECTOR_ID] + sync_id = row[Constant.SYNC_ID] + + if connector_id not in sync_logs: + sync_logs[connector_id] = {} + + sync_logs[connector_id][sync_id] = { + "sync_start": (row["start_time"].timestamp(), None), + "sync_end": (row["end_time"].timestamp(), row["end_message_data"]), + } return sync_logs @@ -244,7 +246,10 @@ def _fill_connectors_table_lineage(self, connectors: List[Connector]) -> None: def _fill_connectors_jobs( self, connectors: List[Connector], syncs_interval: int ) -> None: - sync_logs = self._get_all_connector_sync_logs(syncs_interval) + connector_ids = [connector.connector_id for connector in connectors] + sync_logs = self._get_all_connector_sync_logs( + syncs_interval, connector_ids=connector_ids + ) for connector in connectors: connector.jobs = self._get_jobs_list(sync_logs.get(connector.connector_id)) diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py index d965f53ff554b..c4680b4b1037a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py @@ -37,14 +37,32 @@ def get_users_query(self) -> str: def get_sync_logs_query(self) -> str: return """ - SELECT connector_id, - sync_id, - message_event, - message_data, - time_stamp - FROM {db_clause}log - WHERE message_event in ('sync_start', 'sync_end') - and time_stamp > CURRENT_TIMESTAMP - INTERVAL '{syncs_interval} days'""" + WITH ranked_syncs AS ( + SELECT + connector_id, + sync_id, + MAX(CASE WHEN message_event = 'sync_start' THEN time_stamp END) as start_time, + MAX(CASE WHEN message_event = 'sync_end' THEN time_stamp END) as end_time, + MAX(CASE WHEN message_event = 'sync_end' THEN message_data END) as end_message_data, + ROW_NUMBER() OVER (PARTITION BY connector_id ORDER BY MAX(time_stamp) DESC) as rn + FROM {db_clause}log + WHERE message_event in ('sync_start', 'sync_end') + AND time_stamp > CURRENT_TIMESTAMP - INTERVAL '{syncs_interval} days' + AND connector_id IN ({connector_ids}) + GROUP BY connector_id, sync_id + ) + SELECT + connector_id, + sync_id, + start_time, + end_time, + end_message_data + FROM ranked_syncs + WHERE rn <= {max_jobs_per_connector} + AND start_time IS NOT NULL + AND end_time IS NOT NULL + ORDER BY connector_id, end_time DESC + """ def get_table_lineage_query(self) -> str: return f""" diff --git a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py index 0f5d098ee39c4..33ac09e69a3c0 100644 --- a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py +++ b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py @@ -101,64 +101,32 @@ def default_query_results( } ] elif query == fivetran_log_query.get_sync_logs_query().format( - db_clause=fivetran_log_query.db_clause, syncs_interval=7 + db_clause=fivetran_log_query.db_clause, + syncs_interval=7, + max_jobs_per_connector=1000, + connector_ids="'calendar_elected'", ): return [ { "connector_id": "calendar_elected", "sync_id": "4c9a03d6-eded-4422-a46a-163266e58243", - "message_event": "sync_start", - "message_data": None, - "time_stamp": datetime.datetime(2023, 9, 20, 6, 37, 32, 606000), + "start_time": datetime.datetime(2023, 9, 20, 6, 37, 32, 606000), + "end_time": datetime.datetime(2023, 9, 20, 6, 38, 5, 56000), + "end_message_data": '"{\\"status\\":\\"SUCCESSFUL\\"}"', }, { "connector_id": "calendar_elected", "sync_id": "f773d1e9-c791-48f4-894f-8cf9b3dfc834", - "message_event": "sync_start", - "message_data": None, - "time_stamp": datetime.datetime(2023, 10, 3, 14, 35, 30, 345000), + "start_time": datetime.datetime(2023, 10, 3, 14, 35, 30, 345000), + "end_time": datetime.datetime(2023, 10, 3, 14, 35, 31, 512000), + "end_message_data": '"{\\"reason\\":\\"Sync has been cancelled because of a user action in the dashboard.Standard Config updated.\\",\\"status\\":\\"CANCELED\\"}"', }, { "connector_id": "calendar_elected", "sync_id": "63c2fc85-600b-455f-9ba0-f576522465be", - "message_event": "sync_start", - "message_data": None, - "time_stamp": datetime.datetime(2023, 10, 3, 14, 35, 55, 401000), - }, - { - "connector_id": "calendar_elected", - "sync_id": "e773e1e9-c791-46f4-894f-8ch9b3dfc832", - "message_event": "sync_start", - "message_data": None, - "time_stamp": datetime.datetime(2023, 10, 3, 14, 37, 5, 403000), - }, - { - "connector_id": "calendar_elected", - "sync_id": "4c9a03d6-eded-4422-a46a-163266e58243", - "message_event": "sync_end", - "message_data": '"{\\"status\\":\\"SUCCESSFUL\\"}"', - "time_stamp": datetime.datetime(2023, 9, 20, 6, 38, 5, 56000), - }, - { - "connector_id": "calendar_elected", - "sync_id": "f773d1e9-c791-48f4-894f-8cf9b3dfc834", - "message_event": "sync_end", - "message_data": '"{\\"reason\\":\\"Sync has been cancelled because of a user action in the dashboard.Standard Config updated.\\",\\"status\\":\\"CANCELED\\"}"', - "time_stamp": datetime.datetime(2023, 10, 3, 14, 35, 31, 512000), - }, - { - "connector_id": "calendar_elected", - "sync_id": "63c2fc85-600b-455f-9ba0-f576522465be", - "message_event": "sync_end", - "message_data": '"{\\"reason\\":\\"java.lang.RuntimeException: FATAL: too many connections for role \\\\\\"hxwraqld\\\\\\"\\",\\"taskType\\":\\"reconnect\\",\\"status\\":\\"FAILURE_WITH_TASK\\"}"', - "time_stamp": datetime.datetime(2023, 10, 3, 14, 36, 29, 678000), - }, - { - "connector_id": "calendar_elected", - "sync_id": "e773e1e9-c791-46f4-894f-8ch9b3dfc832", - "message_event": "sync_end", - "message_data": None, - "time_stamp": datetime.datetime(2023, 10, 3, 14, 37, 35, 478000), + "start_time": datetime.datetime(2023, 10, 3, 14, 35, 55, 401000), + "end_time": datetime.datetime(2023, 10, 3, 14, 36, 29, 678000), + "end_message_data": '"{\\"reason\\":\\"java.lang.RuntimeException: FATAL: too many connections for role \\\\\\"hxwraqld\\\\\\"\\",\\"taskType\\":\\"reconnect\\",\\"status\\":\\"FAILURE_WITH_TASK\\"}"', }, ] # Unreachable code From 3b1b76244dfc6121e866f9d8803f1491ee6f5cec Mon Sep 17 00:00:00 2001 From: Shirshanka Das Date: Sat, 19 Oct 2024 14:53:28 -0700 Subject: [PATCH 6/9] feat(sdk):platform-resource - complex queries (#11675) --- .../platformresource/platform_resource.py | 193 ++++++------ .../src/datahub/utilities/openapi_utils.py | 69 +++++ .../src/datahub/utilities/search_utils.py | 285 ++++++++++++++++++ .../test_platform_resource.py | 15 + .../tests/unit/utilities/test_search_utils.py | 71 +++++ .../test_platform_resource.py | 78 ++++- 6 files changed, 617 insertions(+), 94 deletions(-) create mode 100644 metadata-ingestion/src/datahub/utilities/openapi_utils.py create mode 100644 metadata-ingestion/src/datahub/utilities/search_utils.py create mode 100644 metadata-ingestion/tests/unit/utilities/test_search_utils.py diff --git a/metadata-ingestion/src/datahub/api/entities/platformresource/platform_resource.py b/metadata-ingestion/src/datahub/api/entities/platformresource/platform_resource.py index 349b0ff11d84f..0f7b10a067053 100644 --- a/metadata-ingestion/src/datahub/api/entities/platformresource/platform_resource.py +++ b/metadata-ingestion/src/datahub/api/entities/platformresource/platform_resource.py @@ -1,5 +1,5 @@ import logging -from typing import Dict, Iterable, List, Optional, Union +from typing import Callable, Dict, Iterable, List, Optional, Tuple, Type, Union, cast from avrogen.dict_wrapper import DictWrapper from pydantic import BaseModel @@ -14,7 +14,14 @@ from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.mcp_builder import DatahubKey from datahub.ingestion.graph.client import DataHubGraph -from datahub.metadata.urns import PlatformResourceUrn +from datahub.metadata.urns import DataPlatformUrn, PlatformResourceUrn, Urn +from datahub.utilities.openapi_utils import OpenAPIGraphClient +from datahub.utilities.search_utils import ( + ElasticDocumentQuery, + ElasticsearchQueryBuilder, + LogicalOperator, + SearchField, +) logger = logging.getLogger(__name__) @@ -69,71 +76,75 @@ def to_resource_info(self) -> models.PlatformResourceInfoClass: ) -class OpenAPIGraphClient: +class DataPlatformInstanceUrn: + """ + A simple implementation of a URN class for DataPlatformInstance. + Since this is not present in the URN registry, we need to implement it here. + """ - ENTITY_KEY_ASPECT_MAP = { - aspect_type.ASPECT_INFO.get("keyForEntity"): name - for name, aspect_type in models.ASPECT_NAME_MAP.items() - if aspect_type.ASPECT_INFO.get("keyForEntity") - } + @staticmethod + def create_from_id(platform_instance_urn: str) -> Urn: + if platform_instance_urn.startswith("urn:li:platformInstance:"): + string_urn = platform_instance_urn + else: + string_urn = f"urn:li:platformInstance:{platform_instance_urn}" + return Urn.from_string(string_urn) - def __init__(self, graph: DataHubGraph): - self.graph = graph - self.openapi_base = graph._gms_server.rstrip("/") + "/openapi/v3" - def scroll_urns_by_filter( - self, - entity_type: str, - extra_or_filters: List[Dict[str, str]], - extra_and_filters: List[Dict[str, str]] = [], - ) -> Iterable[str]: - """ - Scroll through all urns that match the given filters - """ +class UrnSearchField(SearchField): + """ + A search field that supports URN values. + TODO: Move this to search_utils after we make this more generic. + """ - key_aspect = self.ENTITY_KEY_ASPECT_MAP.get(entity_type) - assert key_aspect, f"No key aspect found for entity type {entity_type}" - if extra_or_filters and extra_and_filters: - raise ValueError( - "Only one of extra_or_filters and extra_and_filters should be provided" - ) + def __init__(self, field_name: str, urn_value_extractor: Callable[[str], Urn]): + self.urn_value_extractor = urn_value_extractor + super().__init__(field_name) - count = 1000 - query = ( - " OR ".join( - [ - f"{filter['field']}:\"{filter['value']}\"" - for filter in extra_or_filters - ] - ) - if extra_or_filters - else " AND ".join( - [ - f"{filter['field']}:\"{filter['value']}\"" - for filter in extra_and_filters - ] - ) + def get_search_value(self, value: str) -> str: + return str(self.urn_value_extractor(value)) + + +class PlatformResourceSearchField(SearchField): + def __init__(self, field_name: str): + super().__init__(field_name) + + @classmethod + def from_search_field( + cls, search_field: SearchField + ) -> "PlatformResourceSearchField": + # pretends to be a class method, but just returns the input + return search_field # type: ignore + + +class PlatformResourceSearchFields: + PRIMARY_KEY = PlatformResourceSearchField("primaryKey") + RESOURCE_TYPE = PlatformResourceSearchField("resourceType") + SECONDARY_KEYS = PlatformResourceSearchField("secondaryKeys") + PLATFORM = PlatformResourceSearchField.from_search_field( + UrnSearchField( + field_name="platform.keyword", + urn_value_extractor=DataPlatformUrn.create_from_id, ) - scroll_id = None - while True: - response = self.graph._get_generic( - self.openapi_base + f"/entity/{entity_type.lower()}", - params={ - "systemMetadata": "false", - "includeSoftDelete": "false", - "skipCache": "false", - "aspects": [key_aspect], - "scrollId": scroll_id, - "count": count, - "query": query, - }, - ) - entities = response.get("entities", []) - scroll_id = response.get("scrollId") - for entity in entities: - yield entity["urn"] - if not scroll_id: - break + ) + PLATFORM_INSTANCE = PlatformResourceSearchField.from_search_field( + UrnSearchField( + field_name="platformInstance.keyword", + urn_value_extractor=DataPlatformInstanceUrn.create_from_id, + ) + ) + + +class ElasticPlatformResourceQuery(ElasticDocumentQuery[PlatformResourceSearchField]): + def __init__(self): + super().__init__() + + @classmethod + def create_from( + cls: Type["ElasticPlatformResourceQuery"], + *args: Tuple[Union[str, PlatformResourceSearchField], str], + ) -> "ElasticPlatformResourceQuery": + return cast(ElasticPlatformResourceQuery, super().create_from(*args)) class PlatformResource(BaseModel): @@ -147,6 +158,12 @@ def remove( cls, key: PlatformResourceKey, ) -> "PlatformResource": + """ + Creates a PlatformResource object with the removed status set to True. + Removed PlatformResource objects are used to soft-delete resources from + the graph. + To hard-delete a resource, use the delete method. + """ return cls( id=key.id, removed=True, @@ -240,28 +257,38 @@ def from_datahub( @staticmethod def search_by_key( - graph_client: DataHubGraph, key: str, primary: bool = True + graph_client: DataHubGraph, + key: str, + primary: bool = True, + is_exact: bool = True, ) -> Iterable["PlatformResource"]: - extra_or_filters = [] - extra_or_filters.append( - { - "field": "primaryKey", - "condition": "EQUAL", - "value": key, - } + """ + Searches for PlatformResource entities by primary or secondary key. + + :param graph_client: DataHubGraph client + :param key: The key to search for + :param primary: Whether to search for primary only or expand the search + to secondary keys (default: True) + :param is_exact: Whether to search for an exact match (default: True) + :return: An iterable of PlatformResource objects + """ + + elastic_platform_resource_group = ( + ElasticPlatformResourceQuery.create_from() + .group(LogicalOperator.OR) + .add_field_match( + PlatformResourceSearchFields.PRIMARY_KEY, key, is_exact=is_exact + ) ) if not primary: # we expand the search to secondary keys - extra_or_filters.append( - { - "field": "secondaryKeys", - "condition": "EQUAL", - "value": key, - } + elastic_platform_resource_group.add_field_match( + PlatformResourceSearchFields.SECONDARY_KEYS, key, is_exact=is_exact ) + query = elastic_platform_resource_group.end() openapi_client = OpenAPIGraphClient(graph_client) for urn in openapi_client.scroll_urns_by_filter( entity_type="platformResource", - extra_or_filters=extra_or_filters, + query=query, ): platform_resource = PlatformResource.from_datahub(graph_client, urn) if platform_resource: @@ -273,18 +300,16 @@ def delete(self, graph_client: DataHubGraph, hard: bool = True) -> None: @staticmethod def search_by_filters( graph_client: DataHubGraph, - and_filters: List[Dict[str, str]] = [], - or_filters: List[Dict[str, str]] = [], + query: Union[ + ElasticPlatformResourceQuery, + ElasticDocumentQuery, + ElasticsearchQueryBuilder, + ], ) -> Iterable["PlatformResource"]: - if and_filters and or_filters: - raise ValueError( - "Only one of and_filters and or_filters should be provided" - ) openapi_client = OpenAPIGraphClient(graph_client) for urn in openapi_client.scroll_urns_by_filter( entity_type="platformResource", - extra_or_filters=or_filters if or_filters else [], - extra_and_filters=and_filters if and_filters else [], + query=query, ): platform_resource = PlatformResource.from_datahub(graph_client, urn) if platform_resource: diff --git a/metadata-ingestion/src/datahub/utilities/openapi_utils.py b/metadata-ingestion/src/datahub/utilities/openapi_utils.py new file mode 100644 index 0000000000000..e704ff7f84cbb --- /dev/null +++ b/metadata-ingestion/src/datahub/utilities/openapi_utils.py @@ -0,0 +1,69 @@ +import logging +from typing import Iterable, Union + +import datahub.metadata.schema_classes as models +from datahub.ingestion.graph.client import DataHubGraph +from datahub.utilities.search_utils import ( + ElasticDocumentQuery, + ElasticsearchQueryBuilder, +) + +logger = logging.getLogger(__name__) + + +class OpenAPIGraphClient: + """ + An experimental client for the DataHubGraph that uses the OpenAPI endpoints + to query entities and aspects. + Does not support all features of the DataHubGraph. + API is subject to change. + + DO NOT USE THIS UNLESS YOU KNOW WHAT YOU ARE DOING. + """ + + ENTITY_KEY_ASPECT_MAP = { + aspect_type.ASPECT_INFO.get("keyForEntity"): name + for name, aspect_type in models.ASPECT_NAME_MAP.items() + if aspect_type.ASPECT_INFO.get("keyForEntity") + } + + def __init__(self, graph: DataHubGraph): + self.graph = graph + self.openapi_base = graph._gms_server.rstrip("/") + "/openapi/v3" + + def scroll_urns_by_filter( + self, + entity_type: str, + query: Union[ElasticDocumentQuery, ElasticsearchQueryBuilder], + ) -> Iterable[str]: + """ + Scroll through all urns that match the given filters. + + """ + + key_aspect = self.ENTITY_KEY_ASPECT_MAP.get(entity_type) + assert key_aspect, f"No key aspect found for entity type {entity_type}" + + count = 1000 + string_query = query.build() + scroll_id = None + logger.debug(f"Scrolling with query: {string_query}") + while True: + response = self.graph._get_generic( + self.openapi_base + f"/entity/{entity_type.lower()}", + params={ + "systemMetadata": "false", + "includeSoftDelete": "false", + "skipCache": "false", + "aspects": [key_aspect], + "scrollId": scroll_id, + "count": count, + "query": string_query, + }, + ) + entities = response.get("entities", []) + scroll_id = response.get("scrollId") + for entity in entities: + yield entity["urn"] + if not scroll_id: + break diff --git a/metadata-ingestion/src/datahub/utilities/search_utils.py b/metadata-ingestion/src/datahub/utilities/search_utils.py new file mode 100644 index 0000000000000..0bd88addd8660 --- /dev/null +++ b/metadata-ingestion/src/datahub/utilities/search_utils.py @@ -0,0 +1,285 @@ +import logging +import re +from enum import Enum +from typing import Generic, List, Optional, Tuple, Type, TypeVar, Union + +logger = logging.getLogger(__name__) + + +class LogicalOperator(Enum): + AND = "AND" + OR = "OR" + + +class SearchField: + def __init__(self, field_name: str): + self.field_name = field_name + + def get_search_value(self, value: str) -> str: + return value + + def __str__(self) -> str: + return self.field_name + + def __repr__(self) -> str: + return self.__str__() + + @classmethod + def from_string_field(cls, field_name: str) -> "SearchField": + return cls(field_name) + + +class QueryNode: + def __init__(self, operator: Optional[LogicalOperator] = None): + self.operator = operator + self.children: List[Union[QueryNode, str]] = [] + + def add_child(self, child: Union["QueryNode", str]) -> None: + self.children.append(child) + + def build(self) -> str: + if not self.children: + return "" + + if self.operator is None: + return ( + self.children[0] + if isinstance(self.children[0], str) + else self.children[0].build() + ) + + child_queries = [] + for child in self.children: + if isinstance(child, str): + child_queries.append(child) + else: + child_queries.append(child.build()) + + joined_queries = f" {self.operator.value} ".join(child_queries) + return f"({joined_queries})" if len(child_queries) > 1 else joined_queries + + +class ElasticsearchQueryBuilder: + SPECIAL_CHARACTERS = r'+-=&|> None: + self.root = QueryNode(operator=operator) + + @classmethod + def escape_special_characters(cls, value: str) -> str: + """ + Escape special characters in the search term. + """ + return re.sub(f"([{re.escape(cls.SPECIAL_CHARACTERS)}])", r"\\\1", value) + + def _create_term( + self, field: SearchField, value: str, is_exact: bool = False + ) -> str: + escaped_value = self.escape_special_characters(field.get_search_value(value)) + field_name: str = field.field_name + if is_exact: + return f'{field_name}:"{escaped_value}"' + return f"{field_name}:{escaped_value}" + + def add_field_match( + self, field: SearchField, value: str, is_exact: bool = True + ) -> "ElasticsearchQueryBuilder": + term = self._create_term(field, value, is_exact) + self.root.add_child(term) + return self + + def add_field_not_match( + self, field: SearchField, value: str, is_exact: bool = True + ) -> "ElasticsearchQueryBuilder": + term = f"-{self._create_term(field, value, is_exact)}" + self.root.add_child(term) + return self + + def add_range( + self, + field: str, + min_value: Optional[str] = None, + max_value: Optional[str] = None, + include_min: bool = True, + include_max: bool = True, + ) -> "ElasticsearchQueryBuilder": + min_bracket = "[" if include_min else "{" + max_bracket = "]" if include_max else "}" + min_val = min_value if min_value is not None else "*" + max_val = max_value if max_value is not None else "*" + range_query = f"{field}:{min_bracket}{min_val} TO {max_val}{max_bracket}" + self.root.add_child(range_query) + return self + + def add_wildcard(self, field: str, pattern: str) -> "ElasticsearchQueryBuilder": + wildcard_query = f"{field}:{pattern}" + self.root.add_child(wildcard_query) + return self + + def add_fuzzy( + self, field: str, value: str, fuzziness: int = 2 + ) -> "ElasticsearchQueryBuilder": + fuzzy_query = f"{field}:{value}~{fuzziness}" + self.root.add_child(fuzzy_query) + return self + + def add_boost( + self, field: str, value: str, boost: float + ) -> "ElasticsearchQueryBuilder": + boosted_query = f"{field}:{value}^{boost}" + self.root.add_child(boosted_query) + return self + + def group(self, operator: LogicalOperator) -> "QueryGroup": + return QueryGroup(self, operator) + + def build(self) -> str: + return self.root.build() + + +class QueryGroup: + def __init__(self, parent: ElasticsearchQueryBuilder, operator: LogicalOperator): + self.parent = parent + self.node = QueryNode(operator) + self.parent.root.add_child(self.node) + + def add_field_match( + self, field: Union[str, SearchField], value: str, is_exact: bool = True + ) -> "QueryGroup": + if isinstance(field, str): + field = SearchField.from_string_field(field) + term = self.parent._create_term(field, value, is_exact) + self.node.add_child(term) + return self + + def add_field_not_match( + self, field: Union[str, SearchField], value: str, is_exact: bool = True + ) -> "QueryGroup": + if isinstance(field, str): + field = SearchField.from_string_field(field) + term = f"-{self.parent._create_term(field, value, is_exact)}" + self.node.add_child(term) + return self + + def add_range( + self, + field: str, + min_value: Optional[str] = None, + max_value: Optional[str] = None, + include_min: bool = True, + include_max: bool = True, + ) -> "QueryGroup": + min_bracket = "[" if include_min else "{" + max_bracket = "]" if include_max else "}" + min_val = min_value if min_value is not None else "*" + max_val = max_value if max_value is not None else "*" + range_query = f"{field}:{min_bracket}{min_val} TO {max_val}{max_bracket}" + self.node.add_child(range_query) + return self + + def add_wildcard(self, field: str, pattern: str) -> "QueryGroup": + wildcard_query = f"{field}:{pattern}" + self.node.add_child(wildcard_query) + return self + + def add_fuzzy(self, field: str, value: str, fuzziness: int = 2) -> "QueryGroup": + fuzzy_query = f"{field}:{value}~{fuzziness}" + self.node.add_child(fuzzy_query) + return self + + def add_boost(self, field: str, value: str, boost: float) -> "QueryGroup": + boosted_query = f"{field}:{value}^{boost}" + self.node.add_child(boosted_query) + return self + + def group(self, operator: LogicalOperator) -> "QueryGroup": + new_group = QueryGroup(self.parent, operator) + self.node.add_child(new_group.node) + return new_group + + def end(self) -> ElasticsearchQueryBuilder: + return self.parent + + +SF = TypeVar("SF", bound=SearchField) + + +class ElasticDocumentQuery(Generic[SF]): + def __init__(self) -> None: + self.query_builder = ElasticsearchQueryBuilder() + + @classmethod + def create_from( + cls: Type["ElasticDocumentQuery[SF]"], + *args: Tuple[Union[str, SF], str], + ) -> "ElasticDocumentQuery[SF]": + instance = cls() + for arg in args: + if isinstance(arg, SearchField): + # If the value is empty, we treat it as a wildcard search + logger.info(f"Adding wildcard search for field {arg}") + instance.add_wildcard(arg, "*") + elif isinstance(arg, tuple) and len(arg) == 2: + field, value = arg + assert isinstance(value, str) + if isinstance(field, SearchField): + instance.add_field_match(field, value) + elif isinstance(field, str): + instance.add_field_match( + SearchField.from_string_field(field), value + ) + else: + raise ValueError("Invalid field type {}".format(type(field))) + return instance + + def add_field_match( + self, field: Union[str, SearchField], value: str, is_exact: bool = True + ) -> "ElasticDocumentQuery": + if isinstance(field, str): + field = SearchField.from_string_field(field) + self.query_builder.add_field_match(field, value, is_exact) + return self + + def add_field_not_match( + self, field: SearchField, value: str, is_exact: bool = True + ) -> "ElasticDocumentQuery": + self.query_builder.add_field_not_match(field, value, is_exact) + return self + + def add_range( + self, + field: SearchField, + min_value: Optional[str] = None, + max_value: Optional[str] = None, + include_min: bool = True, + include_max: bool = True, + ) -> "ElasticDocumentQuery": + field_name: str = field.field_name # type: ignore + self.query_builder.add_range( + field_name, min_value, max_value, include_min, include_max + ) + return self + + def add_wildcard(self, field: SearchField, pattern: str) -> "ElasticDocumentQuery": + field_name: str = field.field_name # type: ignore + self.query_builder.add_wildcard(field_name, pattern) + return self + + def add_fuzzy( + self, field: SearchField, value: str, fuzziness: int = 2 + ) -> "ElasticDocumentQuery": + field_name: str = field.field_name # type: ignore + self.query_builder.add_fuzzy(field_name, value, fuzziness) + return self + + def add_boost( + self, field: SearchField, value: str, boost: float + ) -> "ElasticDocumentQuery": + self.query_builder.add_boost(field.field_name, value, boost) + return self + + def group(self, operator: LogicalOperator) -> QueryGroup: + return self.query_builder.group(operator) + + def build(self) -> str: + return self.query_builder.build() diff --git a/metadata-ingestion/tests/unit/api/entities/platformresource/test_platform_resource.py b/metadata-ingestion/tests/unit/api/entities/platformresource/test_platform_resource.py index e6c9a9466d62b..a84e373dbe72c 100644 --- a/metadata-ingestion/tests/unit/api/entities/platformresource/test_platform_resource.py +++ b/metadata-ingestion/tests/unit/api/entities/platformresource/test_platform_resource.py @@ -4,9 +4,12 @@ import datahub.metadata.schema_classes as models from datahub.api.entities.platformresource.platform_resource import ( + ElasticPlatformResourceQuery, PlatformResource, PlatformResourceKey, + PlatformResourceSearchFields, ) +from datahub.utilities.search_utils import LogicalOperator def test_platform_resource_dict(): @@ -179,3 +182,15 @@ class TestModel(BaseModel): ).encode("utf-8") assert platform_resource_info_mcp.aspect.value.schemaType == "JSON" assert platform_resource_info_mcp.aspect.value.schemaRef == TestModel.__name__ + + +def test_platform_resource_filters(): + + query = ( + ElasticPlatformResourceQuery.create_from() + .group(LogicalOperator.AND) + .add_field_match(PlatformResourceSearchFields.PRIMARY_KEY, "test_1") + .add_field_match(PlatformResourceSearchFields.RESOURCE_TYPE, "server") + .end() + ) + assert query.build() == '(primaryKey:"test_1" AND resourceType:"server")' diff --git a/metadata-ingestion/tests/unit/utilities/test_search_utils.py b/metadata-ingestion/tests/unit/utilities/test_search_utils.py new file mode 100644 index 0000000000000..6fa2e46c7f20e --- /dev/null +++ b/metadata-ingestion/tests/unit/utilities/test_search_utils.py @@ -0,0 +1,71 @@ +from datahub.utilities.search_utils import ( + ElasticDocumentQuery, + LogicalOperator, + SearchField, +) + + +def test_simple_and_filters(): + query = ( + ElasticDocumentQuery.create_from() + .group(LogicalOperator.AND) + .add_field_match("field1", "value1") + .add_field_match("field2", "value2") + .end() + ) + + assert query.build() == '(field1:"value1" AND field2:"value2")' + + +def test_simple_or_filters(): + query = ( + ElasticDocumentQuery.create_from() + .group(LogicalOperator.OR) + .add_field_match("field1", "value1") + .add_field_match("field2", "value2") + .end() + ) + + assert query.build() == '(field1:"value1" OR field2:"value2")' + + # Use SearchFilter to create this query + query = ( + ElasticDocumentQuery.create_from() + .group(LogicalOperator.OR) + .add_field_match(SearchField.from_string_field("field1"), "value1") + .add_field_match(SearchField.from_string_field("field2"), "value2") + .end() + ) + assert query.build() == '(field1:"value1" OR field2:"value2")' + + +def test_simple_field_match(): + query: ElasticDocumentQuery = ElasticDocumentQuery.create_from( + ("field1", "value1:1") + ) + assert query.build() == 'field1:"value1\\:1"' + + # Another way to create the same query + query = ElasticDocumentQuery.create_from() + query.add_field_match("field1", "value1:1") + assert query.build() == 'field1:"value1\\:1"' + + +def test_negation(): + query = ( + ElasticDocumentQuery.create_from() + .group(LogicalOperator.AND) + .add_field_match("field1", "value1") + .add_field_not_match("field2", "value2") + .end() + ) + + assert query.build() == '(field1:"value1" AND -field2:"value2")' + + +def test_multi_arg_create_from(): + query: ElasticDocumentQuery = ElasticDocumentQuery.create_from( + ("field1", "value1"), + ("field2", "value2"), + ) + assert query.build() == '(field1:"value1" AND field2:"value2")' diff --git a/smoke-test/tests/platform_resources/test_platform_resource.py b/smoke-test/tests/platform_resources/test_platform_resource.py index 7ebfd4d6ea15b..39d15f2e8dea6 100644 --- a/smoke-test/tests/platform_resources/test_platform_resource.py +++ b/smoke-test/tests/platform_resources/test_platform_resource.py @@ -5,8 +5,10 @@ import pytest from datahub.api.entities.platformresource.platform_resource import ( + ElasticPlatformResourceQuery, PlatformResource, PlatformResourceKey, + PlatformResourceSearchFields, ) from tests.utils import wait_for_healthcheck_util, wait_for_writes_to_sync @@ -42,7 +44,12 @@ def cleanup_resources(graph_client): logger.warning(f"Failed to delete resource: {e}") # Additional cleanup for any resources that might have been missed - for resource in PlatformResource.search_by_key(graph_client, "test_"): + for resource in PlatformResource.search_by_filters( + graph_client, + ElasticPlatformResourceQuery.create_from().add_wildcard( + PlatformResourceSearchFields.PRIMARY_KEY, "test_*" + ), + ): try: resource.delete(graph_client) except Exception as e: @@ -114,7 +121,7 @@ def test_platform_resource_non_existent(graph_client, test_id): assert platform_resource is None -def test_platform_resource_urn_secondary_key(graph_client, test_id): +def test_platform_resource_urn_secondary_key(graph_client, test_id, cleanup_resources): key = PlatformResourceKey( platform=f"test_platform_{test_id}", resource_type=f"test_resource_type_{test_id}", @@ -129,6 +136,7 @@ def test_platform_resource_urn_secondary_key(graph_client, test_id): secondary_keys=[dataset_urn], ) platform_resource.to_datahub(graph_client) + cleanup_resources.append(platform_resource) wait_for_writes_to_sync() read_platform_resources = [ @@ -141,7 +149,9 @@ def test_platform_resource_urn_secondary_key(graph_client, test_id): assert read_platform_resources[0] == platform_resource -def test_platform_resource_listing_by_resource_type(graph_client, test_id): +def test_platform_resource_listing_by_resource_type( + graph_client, test_id, cleanup_resources +): # Generate two resources with the same resource type key1 = PlatformResourceKey( platform=f"test_platform_{test_id}", @@ -171,13 +181,9 @@ def test_platform_resource_listing_by_resource_type(graph_client, test_id): r for r in PlatformResource.search_by_filters( graph_client, - and_filters=[ - { - "field": "resourceType", - "condition": "EQUAL", - "value": key1.resource_type, - } - ], + query=ElasticPlatformResourceQuery.create_from( + (PlatformResourceSearchFields.RESOURCE_TYPE, key1.resource_type) + ), ) ] assert len(search_results) == 2 @@ -186,3 +192,55 @@ def test_platform_resource_listing_by_resource_type(graph_client, test_id): read_platform_resource_2 = next(r for r in search_results if r.id == key2.id) assert read_platform_resource_1 == platform_resource1 assert read_platform_resource_2 == platform_resource2 + + +def test_platform_resource_listing_complex_queries(graph_client, test_id): + # Generate two resources with the same resource type + key1 = PlatformResourceKey( + platform=f"test_platform1_{test_id}", + resource_type=f"test_resource_type_{test_id}", + primary_key=f"test_primary_key_1_{test_id}", + ) + platform_resource1 = PlatformResource.create( + key=key1, + value={"test_key": f"test_value_1_{test_id}"}, + ) + platform_resource1.to_datahub(graph_client) + + key2 = PlatformResourceKey( + platform=f"test_platform2_{test_id}", + resource_type=f"test_resource_type_{test_id}", + primary_key=f"test_primary_key_2_{test_id}", + ) + platform_resource2 = PlatformResource.create( + key=key2, + value={"test_key": f"test_value_2_{test_id}"}, + ) + platform_resource2.to_datahub(graph_client) + + wait_for_writes_to_sync() + from datahub.api.entities.platformresource.platform_resource import ( + ElasticPlatformResourceQuery, + LogicalOperator, + PlatformResourceSearchFields, + ) + + query = ( + ElasticPlatformResourceQuery.create_from() + .group(LogicalOperator.AND) + .add_field_match(PlatformResourceSearchFields.RESOURCE_TYPE, key1.resource_type) + .add_field_not_match(PlatformResourceSearchFields.PLATFORM, key1.platform) + .end() + ) + + search_results = [ + r + for r in PlatformResource.search_by_filters( + graph_client, + query=query, + ) + ] + assert len(search_results) == 1 + + read_platform_resource = search_results[0] + assert read_platform_resource == platform_resource2 From 2f85bc1ecbd025d9ce0c6d3da644b3f83656755c Mon Sep 17 00:00:00 2001 From: deepgarg-visa <149145061+deepgarg-visa@users.noreply.github.com> Date: Sun, 20 Oct 2024 06:58:32 +0530 Subject: [PATCH 7/9] fix(docs): fix businessattributes doc (#11653) --- docs/businessattributes.md | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/businessattributes.md b/docs/businessattributes.md index 1744f48f879e8..3e912e7e60980 100644 --- a/docs/businessattributes.md +++ b/docs/businessattributes.md @@ -28,7 +28,6 @@ Taking the example of "United States- Social Security Number", if an application What you need to create/update and associate business attributes to dataset schema field * **Manage Business Attributes** platform privilege to create/update/delete business attributes. -* **Edit Dataset Column Business Attribute** metadata privilege to associate business attributes to dataset schema field. ## Using Business Attributes As of now Business Attributes can only be created through UI From dd1a06fb558a2177ea02437ea0aa7172c2ccd781 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Sun, 20 Oct 2024 23:59:45 -0700 Subject: [PATCH 8/9] feat(ingest/fivetran): add safeguards on table/column lineage (#11674) --- .../ingestion/source/fivetran/config.py | 19 +-- .../ingestion/source/fivetran/data_classes.py | 2 +- .../ingestion/source/fivetran/fivetran.py | 23 ++- .../source/fivetran/fivetran_log_api.py | 86 +++++------ .../source/fivetran/fivetran_query.py | 143 +++++++++++------- .../integration/fivetran/test_fivetran.py | 6 +- 6 files changed, 156 insertions(+), 123 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py index 02eb096b240f5..2fb5ffd16ea34 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py @@ -1,6 +1,6 @@ +import dataclasses import logging -from dataclasses import dataclass, field as dataclass_field -from typing import Dict, List, Optional +from typing import Dict, Optional import pydantic from pydantic import Field, root_validator @@ -23,6 +23,7 @@ from datahub.ingestion.source.state.stateful_ingestion_base import ( StatefulIngestionConfigBase, ) +from datahub.utilities.lossy_collections import LossyList from datahub.utilities.perf_timer import PerfTimer logger = logging.getLogger(__name__) @@ -114,24 +115,24 @@ def validate_destination_platfrom_and_config(cls, values: Dict) -> Dict: return values -@dataclass +@dataclasses.dataclass class MetadataExtractionPerfReport(Report): - connectors_metadata_extraction_sec: PerfTimer = dataclass_field( + connectors_metadata_extraction_sec: PerfTimer = dataclasses.field( default_factory=PerfTimer ) - connectors_lineage_extraction_sec: PerfTimer = dataclass_field( + connectors_lineage_extraction_sec: PerfTimer = dataclasses.field( default_factory=PerfTimer ) - connectors_jobs_extraction_sec: PerfTimer = dataclass_field( + connectors_jobs_extraction_sec: PerfTimer = dataclasses.field( default_factory=PerfTimer ) -@dataclass +@dataclasses.dataclass class FivetranSourceReport(StaleEntityRemovalSourceReport): connectors_scanned: int = 0 - filtered_connectors: List[str] = dataclass_field(default_factory=list) - metadata_extraction_perf: MetadataExtractionPerfReport = dataclass_field( + filtered_connectors: LossyList[str] = dataclasses.field(default_factory=LossyList) + metadata_extraction_perf: MetadataExtractionPerfReport = dataclasses.field( default_factory=MetadataExtractionPerfReport ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/data_classes.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/data_classes.py index 18de2b01edd3b..046aa9efe3f59 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/data_classes.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/data_classes.py @@ -24,7 +24,7 @@ class Connector: sync_frequency: int destination_id: str user_id: str - table_lineage: List[TableLineage] + lineage: List[TableLineage] jobs: List["Job"] diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py index 334bb58ea84f8..c27ec57c2e99e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py @@ -27,9 +27,10 @@ PlatformDetail, ) from datahub.ingestion.source.fivetran.data_classes import Connector, Job -from datahub.ingestion.source.fivetran.fivetran_log_api import ( +from datahub.ingestion.source.fivetran.fivetran_log_api import FivetranLogAPI +from datahub.ingestion.source.fivetran.fivetran_query import ( MAX_JOBS_PER_CONNECTOR, - FivetranLogAPI, + MAX_TABLE_LINEAGE_PER_CONNECTOR, ) from datahub.ingestion.source.state.stale_entity_removal_handler import ( StaleEntityRemovalHandler, @@ -106,13 +107,21 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> None: f"Fivetran connector source type: {connector.connector_type} is not supported to mapped with Datahub dataset entity." ) - for table_lineage in connector.table_lineage: + if len(connector.lineage) >= MAX_TABLE_LINEAGE_PER_CONNECTOR: + self.report.warning( + title="Table lineage truncated", + message=f"The connector had more than {MAX_TABLE_LINEAGE_PER_CONNECTOR} table lineage entries. " + f"Only the most recent {MAX_TABLE_LINEAGE_PER_CONNECTOR} entries were ingested.", + context=f"{connector.connector_name} (connector_id: {connector.connector_id})", + ) + + for lineage in connector.lineage: input_dataset_urn = DatasetUrn.create_from_ids( platform_id=source_platform, table_name=( - f"{source_database.lower()}.{table_lineage.source_table}" + f"{source_database.lower()}.{lineage.source_table}" if source_database - else table_lineage.source_table + else lineage.source_table ), env=source_platform_detail.env, platform_instance=source_platform_detail.platform_instance, @@ -121,14 +130,14 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> None: output_dataset_urn = DatasetUrn.create_from_ids( platform_id=self.config.fivetran_log_config.destination_platform, - table_name=f"{self.audit_log.fivetran_log_database.lower()}.{table_lineage.destination_table}", + table_name=f"{self.audit_log.fivetran_log_database.lower()}.{lineage.destination_table}", env=destination_platform_detail.env, platform_instance=destination_platform_detail.platform_instance, ) output_dataset_urn_list.append(output_dataset_urn) if self.config.include_column_lineage: - for column_lineage in table_lineage.column_lineage: + for column_lineage in lineage.column_lineage: fine_grained_lineage.append( FineGrainedLineage( upstreamType=FineGrainedLineageUpstreamType.FIELD_SET, diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py index 5908efe39e2b4..b55c8bbbd607f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py @@ -1,6 +1,7 @@ import functools import json import logging +from collections import defaultdict from typing import Any, Dict, List, Optional, Tuple import sqlglot @@ -22,10 +23,6 @@ logger: logging.Logger = logging.getLogger(__name__) -# We don't want to generate a massive number of dataProcesses for a single connector. -# This is primarily used as a safeguard to prevent performance issues. -MAX_JOBS_PER_CONNECTOR = 1000 - class FivetranLogAPI: def __init__(self, fivetran_log_config: FivetranLogConfig) -> None: @@ -91,55 +88,51 @@ def _query(self, query: str) -> List[Dict]: resp = self.engine.execute(query) return [row for row in resp] - def _get_column_lineage_metadata(self) -> Dict[str, List]: + def _get_column_lineage_metadata(self) -> Dict[Tuple[str, str], List]: """ - Return's dict of column lineage metadata with key as '-' + Returns dict of column lineage metadata with key as (, ) """ - all_column_lineage: Dict[str, List] = {} + all_column_lineage = defaultdict(list) column_lineage_result = self._query( self.fivetran_log_query.get_column_lineage_query() ) for column_lineage in column_lineage_result: - key = f"{column_lineage[Constant.SOURCE_TABLE_ID]}-{column_lineage[Constant.DESTINATION_TABLE_ID]}" - if key not in all_column_lineage: - all_column_lineage[key] = [column_lineage] - else: - all_column_lineage[key].append(column_lineage) - return all_column_lineage + key = ( + column_lineage[Constant.SOURCE_TABLE_ID], + column_lineage[Constant.DESTINATION_TABLE_ID], + ) + all_column_lineage[key].append(column_lineage) + return dict(all_column_lineage) - def _get_connectors_table_lineage_metadata(self) -> Dict[str, List]: + def _get_table_lineage_metadata(self) -> Dict[str, List]: """ - Return's dict of table lineage metadata with key as 'CONNECTOR_ID' + Returns dict of table lineage metadata with key as 'CONNECTOR_ID' """ - connectors_table_lineage_metadata: Dict[str, List] = {} + connectors_table_lineage_metadata = defaultdict(list) table_lineage_result = self._query( self.fivetran_log_query.get_table_lineage_query() ) for table_lineage in table_lineage_result: - if ( + connectors_table_lineage_metadata[ table_lineage[Constant.CONNECTOR_ID] - not in connectors_table_lineage_metadata - ): - connectors_table_lineage_metadata[ - table_lineage[Constant.CONNECTOR_ID] - ] = [table_lineage] - else: - connectors_table_lineage_metadata[ - table_lineage[Constant.CONNECTOR_ID] - ].append(table_lineage) - return connectors_table_lineage_metadata + ].append(table_lineage) + return dict(connectors_table_lineage_metadata) - def _get_table_lineage( + def _extract_connector_lineage( self, - column_lineage_metadata: Dict[str, List], table_lineage_result: Optional[List], + column_lineage_metadata: Dict[Tuple[str, str], List], ) -> List[TableLineage]: table_lineage_list: List[TableLineage] = [] if table_lineage_result is None: return table_lineage_list for table_lineage in table_lineage_result: + # Join the column lineage into the table lineage. column_lineage_result = column_lineage_metadata.get( - f"{table_lineage[Constant.SOURCE_TABLE_ID]}-{table_lineage[Constant.DESTINATION_TABLE_ID]}" + ( + table_lineage[Constant.SOURCE_TABLE_ID], + table_lineage[Constant.DESTINATION_TABLE_ID], + ) ) column_lineage_list: List[ColumnLineage] = [] if column_lineage_result: @@ -152,6 +145,7 @@ def _get_table_lineage( ) for column_lineage in column_lineage_result ] + table_lineage_list.append( TableLineage( source_table=f"{table_lineage[Constant.SOURCE_SCHEMA_NAME]}.{table_lineage[Constant.SOURCE_TABLE_NAME]}", @@ -167,14 +161,9 @@ def _get_all_connector_sync_logs( ) -> Dict[str, Dict[str, Dict[str, Tuple[float, Optional[str]]]]]: sync_logs: Dict[str, Dict[str, Dict[str, Tuple[float, Optional[str]]]]] = {} - # Format connector_ids as a comma-separated string of quoted IDs - formatted_connector_ids = ", ".join(f"'{id}'" for id in connector_ids) - - query = self.fivetran_log_query.get_sync_logs_query().format( - db_clause=self.fivetran_log_query.db_clause, + query = self.fivetran_log_query.get_sync_logs_query( syncs_interval=syncs_interval, - max_jobs_per_connector=MAX_JOBS_PER_CONNECTOR, - connector_ids=formatted_connector_ids, + connector_ids=connector_ids, ) for row in self._query(query): @@ -234,13 +223,13 @@ def get_user_email(self, user_id: str) -> Optional[str]: return None return self._get_users().get(user_id) - def _fill_connectors_table_lineage(self, connectors: List[Connector]) -> None: - table_lineage_metadata = self._get_connectors_table_lineage_metadata() + def _fill_connectors_lineage(self, connectors: List[Connector]) -> None: + table_lineage_metadata = self._get_table_lineage_metadata() column_lineage_metadata = self._get_column_lineage_metadata() for connector in connectors: - connector.table_lineage = self._get_table_lineage( - column_lineage_metadata=column_lineage_metadata, + connector.lineage = self._extract_connector_lineage( table_lineage_result=table_lineage_metadata.get(connector.connector_id), + column_lineage_metadata=column_lineage_metadata, ) def _fill_connectors_jobs( @@ -262,6 +251,7 @@ def get_allowed_connectors_list( ) -> List[Connector]: connectors: List[Connector] = [] with report.metadata_extraction_perf.connectors_metadata_extraction_sec: + logger.info("Fetching connector list") connector_list = self._query(self.fivetran_log_query.get_connectors_query()) for connector in connector_list: if not connector_patterns.allowed(connector[Constant.CONNECTOR_NAME]): @@ -279,12 +269,20 @@ def get_allowed_connectors_list( sync_frequency=connector[Constant.SYNC_FREQUENCY], destination_id=connector[Constant.DESTINATION_ID], user_id=connector[Constant.CONNECTING_USER_ID], - table_lineage=[], - jobs=[], + lineage=[], # filled later + jobs=[], # filled later ) ) + + if not connectors: + # Some of our queries don't work well when there's no connectors, since + # we push down connector id filters. + return [] + with report.metadata_extraction_perf.connectors_lineage_extraction_sec: - self._fill_connectors_table_lineage(connectors) + logger.info("Fetching connector lineage") + self._fill_connectors_lineage(connectors) with report.metadata_extraction_perf.connectors_jobs_extraction_sec: + logger.info("Fetching connector job run history") self._fill_connectors_jobs(connectors, syncs_interval) return connectors diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py index c4680b4b1037a..c9e329b706768 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py @@ -1,3 +1,11 @@ +from typing import List + +# Safeguards to prevent fetching massive amounts of data. +MAX_TABLE_LINEAGE_PER_CONNECTOR = 100 +MAX_COLUMN_LINEAGE_PER_CONNECTOR = 3000 +MAX_JOBS_PER_CONNECTOR = 1000 + + class FivetranLogQuery: # Note: All queries are written in Snowflake SQL. # They will be transpiled to the target database's SQL dialect at runtime. @@ -24,69 +32,88 @@ def get_connectors_query(self) -> str: destination_id FROM {self.db_clause}connector WHERE - _fivetran_deleted = FALSE\ + _fivetran_deleted = FALSE """ def get_users_query(self) -> str: - return f""" - SELECT id as user_id, - given_name, - family_name, - email - FROM {self.db_clause}user""" + return f"""\ +SELECT id as user_id, +given_name, +family_name, +email +FROM {self.db_clause}user +""" - def get_sync_logs_query(self) -> str: - return """ - WITH ranked_syncs AS ( - SELECT - connector_id, - sync_id, - MAX(CASE WHEN message_event = 'sync_start' THEN time_stamp END) as start_time, - MAX(CASE WHEN message_event = 'sync_end' THEN time_stamp END) as end_time, - MAX(CASE WHEN message_event = 'sync_end' THEN message_data END) as end_message_data, - ROW_NUMBER() OVER (PARTITION BY connector_id ORDER BY MAX(time_stamp) DESC) as rn - FROM {db_clause}log - WHERE message_event in ('sync_start', 'sync_end') - AND time_stamp > CURRENT_TIMESTAMP - INTERVAL '{syncs_interval} days' - AND connector_id IN ({connector_ids}) - GROUP BY connector_id, sync_id - ) - SELECT - connector_id, - sync_id, - start_time, - end_time, - end_message_data - FROM ranked_syncs - WHERE rn <= {max_jobs_per_connector} - AND start_time IS NOT NULL - AND end_time IS NOT NULL - ORDER BY connector_id, end_time DESC - """ + def get_sync_logs_query( + self, + syncs_interval: int, + connector_ids: List[str], + ) -> str: + # Format connector_ids as a comma-separated string of quoted IDs + formatted_connector_ids = ", ".join(f"'{id}'" for id in connector_ids) + + return f"""\ +WITH ranked_syncs AS ( + SELECT + connector_id, + sync_id, + MAX(CASE WHEN message_event = 'sync_start' THEN time_stamp END) as start_time, + MAX(CASE WHEN message_event = 'sync_end' THEN time_stamp END) as end_time, + MAX(CASE WHEN message_event = 'sync_end' THEN message_data END) as end_message_data, + ROW_NUMBER() OVER (PARTITION BY connector_id ORDER BY MAX(time_stamp) DESC) as rn + FROM {self.db_clause}log + WHERE message_event in ('sync_start', 'sync_end') + AND time_stamp > CURRENT_TIMESTAMP - INTERVAL '{syncs_interval} days' + AND connector_id IN ({formatted_connector_ids}) + GROUP BY connector_id, sync_id +) +SELECT + connector_id, + sync_id, + start_time, + end_time, + end_message_data +FROM ranked_syncs +WHERE rn <= {MAX_JOBS_PER_CONNECTOR} + AND start_time IS NOT NULL + AND end_time IS NOT NULL +ORDER BY connector_id, end_time DESC +""" def get_table_lineage_query(self) -> str: - return f""" - SELECT stm.connector_id as connector_id, - stm.id as source_table_id, - stm.name as source_table_name, - ssm.name as source_schema_name, - dtm.id as destination_table_id, - dtm.name as destination_table_name, - dsm.name as destination_schema_name - FROM {self.db_clause}table_lineage as tl - JOIN {self.db_clause}source_table_metadata as stm on tl.source_table_id = stm.id - JOIN {self.db_clause}destination_table_metadata as dtm on tl.destination_table_id = dtm.id - JOIN {self.db_clause}source_schema_metadata as ssm on stm.schema_id = ssm.id - JOIN {self.db_clause}destination_schema_metadata as dsm on dtm.schema_id = dsm.id""" + return f"""\ +SELECT + stm.connector_id as connector_id, + stm.id as source_table_id, + stm.name as source_table_name, + ssm.name as source_schema_name, + dtm.id as destination_table_id, + dtm.name as destination_table_name, + dsm.name as destination_schema_name +FROM {self.db_clause}table_lineage as tl +JOIN {self.db_clause}source_table_metadata as stm on tl.source_table_id = stm.id +JOIN {self.db_clause}destination_table_metadata as dtm on tl.destination_table_id = dtm.id +JOIN {self.db_clause}source_schema_metadata as ssm on stm.schema_id = ssm.id +JOIN {self.db_clause}destination_schema_metadata as dsm on dtm.schema_id = dsm.id +QUALIFY ROW_NUMBER() OVER (PARTITION BY stm.connector_id ORDER BY tl.created_at DESC) <= {MAX_TABLE_LINEAGE_PER_CONNECTOR} +ORDER BY stm.connector_id, tl.created_at DESC +""" def get_column_lineage_query(self) -> str: - return f""" - SELECT scm.table_id as source_table_id, - dcm.table_id as destination_table_id, - scm.name as source_column_name, - dcm.name as destination_column_name - FROM {self.db_clause}column_lineage as cl - JOIN {self.db_clause}source_column_metadata as scm - on cl.source_column_id = scm.id - JOIN {self.db_clause}destination_column_metadata as dcm - on cl.destination_column_id = dcm.id""" + return f"""\ +SELECT + scm.table_id as source_table_id, + dcm.table_id as destination_table_id, + scm.name as source_column_name, + dcm.name as destination_column_name +FROM {self.db_clause}column_lineage as cl +JOIN {self.db_clause}source_column_metadata as scm + ON cl.source_column_id = scm.id +JOIN {self.db_clause}destination_column_metadata as dcm + ON cl.destination_column_id = dcm.id +-- Only joining source_table_metadata to get the connector_id. +JOIN {self.db_clause}source_table_metadata as stm + ON scm.table_id = stm.id +QUALIFY ROW_NUMBER() OVER (PARTITION BY stm.connector_id ORDER BY cl.created_at DESC) <= {MAX_COLUMN_LINEAGE_PER_CONNECTOR} +ORDER BY stm.connector_id, cl.created_at DESC +""" diff --git a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py index 33ac09e69a3c0..e72162b12e48f 100644 --- a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py +++ b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py @@ -100,11 +100,9 @@ def default_query_results( "email": "abc.xyz@email.com", } ] - elif query == fivetran_log_query.get_sync_logs_query().format( - db_clause=fivetran_log_query.db_clause, + elif query == fivetran_log_query.get_sync_logs_query( syncs_interval=7, - max_jobs_per_connector=1000, - connector_ids="'calendar_elected'", + connector_ids=["calendar_elected"], ): return [ { From 554288bb05354dae16471381daec3549658bdda3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20L=C3=BCdin?= <13187726+Masterchen09@users.noreply.github.com> Date: Mon, 21 Oct 2024 09:00:09 +0200 Subject: [PATCH 9/9] fix(ui): show DataHub logo for DataHub sources in ingestion souces list (#11658) Co-authored-by: Shirshanka Das --- .../src/app/ingest/source/builder/constants.ts | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/datahub-web-react/src/app/ingest/source/builder/constants.ts b/datahub-web-react/src/app/ingest/source/builder/constants.ts index b67ca388c1054..0e0ba8b22e37e 100644 --- a/datahub-web-react/src/app/ingest/source/builder/constants.ts +++ b/datahub-web-react/src/app/ingest/source/builder/constants.ts @@ -35,6 +35,7 @@ import csvLogo from '../../../../images/csv-logo.png'; import qlikLogo from '../../../../images/qliklogo.png'; import sigmaLogo from '../../../../images/sigmalogo.png'; import sacLogo from '../../../../images/saclogo.svg'; +import datahubLogo from '../../../../images/datahublogo.png'; export const ATHENA = 'athena'; export const ATHENA_URN = `urn:li:dataPlatform:${ATHENA}`; @@ -125,6 +126,11 @@ export const SIGMA = 'sigma'; export const SIGMA_URN = `urn:li:dataPlatform:${SIGMA}`; export const SAC = 'sac'; export const SAC_URN = `urn:li:dataPlatform:${SAC}`; +export const DATAHUB = 'datahub'; +export const DATAHUB_GC = 'datahub-gc'; +export const DATAHUB_LINEAGE_FILE = 'datahub-lineage-file'; +export const DATAHUB_BUSINESS_GLOSSARY = 'datahub-business-glossary'; +export const DATAHUB_URN = `urn:li:dataPlatform:${DATAHUB}`; export const PLATFORM_URN_TO_LOGO = { [ATHENA_URN]: athenaLogo, @@ -165,6 +171,7 @@ export const PLATFORM_URN_TO_LOGO = { [QLIK_SENSE_URN]: qlikLogo, [SIGMA_URN]: sigmaLogo, [SAC_URN]: sacLogo, + [DATAHUB_URN]: datahubLogo, }; export const SOURCE_TO_PLATFORM_URN = { @@ -178,5 +185,7 @@ export const SOURCE_TO_PLATFORM_URN = { [SNOWFLAKE_USAGE]: SNOWFLAKE_URN, [STARBURST_TRINO_USAGE]: TRINO_URN, [DBT_CLOUD]: DBT_URN, - [VERTICA]: VERTICA_URN, + [DATAHUB_GC]: DATAHUB_URN, + [DATAHUB_LINEAGE_FILE]: DATAHUB_URN, + [DATAHUB_BUSINESS_GLOSSARY]: DATAHUB_URN, };