Skip to content

Commit

Permalink
feat(ingest/transformer/domain): Add support for on conflict do nothi…
Browse files Browse the repository at this point in the history
…ng to dataset domain transformers (#11649)
  • Loading branch information
asikowitz authored Oct 18, 2024
1 parent ba7a43f commit 72d1236
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 27 deletions.
28 changes: 15 additions & 13 deletions metadata-ingestion/docs/transformer/dataset_transformer.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,13 @@ transformers:
```
## Simple Add Dataset ownership
### Config Details
| Field | Required | Type | Default | Description |
|--------------------|----------|--------------|-------------|---------------------------------------------------------------------|
| `owner_urns` | ✅ | list[string] | | List of owner urns. |
| `ownership_type` | | string | "DATAOWNER" | ownership type of the owners (either as enum or ownership type urn) |
| `replace_existing` | | boolean | `false` | Whether to remove ownership from entity sent by ingestion source. |
| `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. |
| Field | Required | Type | Default | Description |
|--------------------|----------|--------------|-------------|------------------------------------------------------------------------------------------------------------|
| `owner_urns` | ✅ | list[string] | | List of owner urns. |
| `ownership_type` | | string | "DATAOWNER" | ownership type of the owners (either as enum or ownership type urn) |
| `replace_existing` | | boolean | `false` | Whether to remove ownership from entity sent by ingestion source. |
| `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. |
| `on_conflict` | | enum | `DO_UPDATE` | Whether to make changes if domains already exist. If set to DO_NOTHING, `semantics` setting is irrelevant. |

For transformer behaviour on `replace_existing` and `semantics`, please refer to the section [Relationship Between replace_existing And semantics](#relationship-between-replace_existing-and-semantics).

Expand Down Expand Up @@ -191,13 +192,14 @@ transformers:

## Pattern Add Dataset ownership
### Config Details
| Field | Required | Type | Default | Description |
|--------------------|----------|----------------------|-------------|-----------------------------------------------------------------------------------------|
| `owner_pattern` | ✅ | map[regx, list[urn]] | | entity urn with regular expression and list of owners urn apply to matching entity urn. |
| `ownership_type` | | string | "DATAOWNER" | ownership type of the owners (either as enum or ownership type urn) |
| `replace_existing` | | boolean | `false` | Whether to remove owners from entity sent by ingestion source. |
| `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. |
| `is_container` | | bool | `false` | Whether to also consider a container or not. If true, then ownership will be attached to both the dataset and its container. |
| Field | Required | Type | Default | Description |
|--------------------|----------|----------------------|-------------|------------------------------------------------------------------------------------------------------------------------------|
| `owner_pattern` | ✅ | map[regx, list[urn]] | | entity urn with regular expression and list of owners urn apply to matching entity urn. |
| `ownership_type` | | string | "DATAOWNER" | ownership type of the owners (either as enum or ownership type urn) |
| `replace_existing` | | boolean | `false` | Whether to remove owners from entity sent by ingestion source. |
| `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. |
| `is_container` | | bool | `false` | Whether to also consider a container or not. If true, then ownership will be attached to both the dataset and its container. |
| `on_conflict` | | enum | `DO_UPDATE` | Whether to make changes if domains already exist. If set to DO_NOTHING, `semantics` setting is irrelevant. |

Let’s suppose we’d like to append a series of users who we know to own a different dataset from a data source but aren't detected during normal ingestion. To do so, we can use the `pattern_add_dataset_ownership` module that’s included in the ingestion framework. This will match the pattern to the `urn` of the dataset and assign the respective owners.

Expand Down
1 change: 1 addition & 0 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ def get_tags(self, entity_urn: str) -> Optional[GlobalTagsClass]:
def get_glossary_terms(self, entity_urn: str) -> Optional[GlossaryTermsClass]:
return self.get_aspect(entity_urn=entity_urn, aspect_type=GlossaryTermsClass)

@functools.lru_cache(maxsize=1)
def get_domain(self, entity_urn: str) -> Optional[DomainsClass]:
    """Return the ``DomainsClass`` aspect of ``entity_urn`` as reported by ``get_aspect``.

    The ``lru_cache(maxsize=1)`` decorator memoizes only the most recent
    ``(self, entity_urn)`` call, so an immediately repeated lookup for the
    same urn is answered from the cache instead of calling ``get_aspect`` again.

    NOTE(review): a cache on an instance method keys on ``self`` and keeps
    that instance referenced for as long as the entry lives, and a cached
    answer is not refreshed if the aspect changes in between -- confirm
    both are acceptable for callers.
    """
    domains_aspect = self.get_aspect(
        entity_urn=entity_urn, aspect_type=DomainsClass
    )
    return domains_aspect

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import logging
from enum import auto
from typing import Callable, Dict, List, Optional, Sequence, Union, cast

from datahub.configuration._config_enum import ConfigEnum
from datahub.configuration.common import (
ConfigurationError,
KeyValuePattern,
Expand All @@ -23,6 +25,13 @@
logger = logging.getLogger(__name__)


class TransformerOnConflict(ConfigEnum):
    """Describes the behavior of the transformer when writing an aspect that already exists."""

    # The aspect already exists: apply the new aspect anyway.
    DO_UPDATE = auto()

    # The aspect already exists: leave it alone and do not apply the new one.
    DO_NOTHING = auto()


class AddDatasetDomainSemanticsConfig(TransformerSemanticsConfigModel):
get_domains_to_add: Union[
Callable[[str], DomainsClass],
Expand All @@ -32,10 +41,12 @@ class AddDatasetDomainSemanticsConfig(TransformerSemanticsConfigModel):
_resolve_domain_fn = pydantic_resolve_key("get_domains_to_add")

is_container: bool = False
on_conflict: TransformerOnConflict = TransformerOnConflict.DO_UPDATE


class SimpleDatasetDomainSemanticsConfig(TransformerSemanticsConfigModel):
    # Required: the domains that SimpleAddDatasetDomain resolves via
    # AddDatasetDomain.get_domain_class and adds to every dataset.
    domains: List[str]

    # Conflict policy; with DO_NOTHING the transformer emits no domain aspect
    # when the server already reports domains for the entity.
    on_conflict: TransformerOnConflict = TransformerOnConflict.DO_UPDATE


class PatternDatasetDomainSemanticsConfig(TransformerSemanticsConfigModel):
Expand Down Expand Up @@ -80,12 +91,13 @@ def get_domain_class(

@staticmethod
def _merge_with_server_domains(
graph: DataHubGraph, urn: str, mce_domain: Optional[DomainsClass]
graph: Optional[DataHubGraph], urn: str, mce_domain: Optional[DomainsClass]
) -> Optional[DomainsClass]:
if not mce_domain or not mce_domain.domains:
# nothing to add, no need to consult server
return None

assert graph
server_domain = graph.get_domain(entity_urn=urn)
if server_domain:
# compute patch
Expand Down Expand Up @@ -155,7 +167,7 @@ def transform_aspect(
self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect]
) -> Optional[Aspect]:
in_domain_aspect: DomainsClass = cast(DomainsClass, aspect)
domain_aspect = DomainsClass(domains=[])
domain_aspect: DomainsClass = DomainsClass(domains=[])
# Check if we have received existing aspect
if in_domain_aspect is not None and self.config.replace_existing is False:
domain_aspect.domains.extend(in_domain_aspect.domains)
Expand All @@ -164,16 +176,18 @@ def transform_aspect(

domain_aspect.domains.extend(domain_to_add.domains)

if self.config.semantics == TransformerSemantics.PATCH:
assert self.ctx.graph
patch_domain_aspect: Optional[
DomainsClass
] = AddDatasetDomain._merge_with_server_domains(
self.ctx.graph, entity_urn, domain_aspect
)
return cast(Optional[Aspect], patch_domain_aspect)

return cast(Optional[Aspect], domain_aspect)
final_aspect: Optional[DomainsClass] = domain_aspect
if domain_aspect.domains:
if self.config.on_conflict == TransformerOnConflict.DO_NOTHING:
assert self.ctx.graph
server_domain = self.ctx.graph.get_domain(entity_urn)
if server_domain and server_domain.domains:
return None
if self.config.semantics == TransformerSemantics.PATCH:
final_aspect = AddDatasetDomain._merge_with_server_domains(
self.ctx.graph, entity_urn, domain_aspect
)
return cast(Optional[Aspect], final_aspect)


class SimpleAddDatasetDomain(AddDatasetDomain):
Expand All @@ -186,8 +200,7 @@ def __init__(
domains = AddDatasetDomain.get_domain_class(ctx.graph, config.domains)
generic_config = AddDatasetDomainSemanticsConfig(
get_domains_to_add=lambda _: domains,
semantics=config.semantics,
replace_existing=config.replace_existing,
**config.dict(exclude={"domains"}),
)
super().__init__(generic_config, ctx)

Expand Down
76 changes: 76 additions & 0 deletions metadata-ingestion/tests/unit/test_transform_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
from datahub.ingestion.transformer.dataset_domain import (
PatternAddDatasetDomain,
SimpleAddDatasetDomain,
TransformerOnConflict,
)
from datahub.ingestion.transformer.dataset_domain_based_on_tags import (
DatasetTagDomainMapper,
Expand Down Expand Up @@ -2498,6 +2499,81 @@ def fake_get_domain(entity_urn: str) -> models.DomainsClass:
assert server_domain in transformed_aspect.domains


def test_simple_add_dataset_domain_on_conflict_do_nothing(
    pytestconfig, tmp_path, mock_time, mock_datahub_graph_instance
):
    """With DO_NOTHING and domains already on the server, no domain aspect is emitted."""
    config_domain = builder.make_domain_urn("acryl.io")
    source_domain = builder.make_domain_urn("datahubproject.io")
    existing_server_domain = builder.make_domain_urn("test.io")

    ctx = PipelineContext(run_id="transformer_pipe_line")
    ctx.graph = mock_datahub_graph_instance

    # Stand-in for the server lookup: every entity already has one domain.
    def fake_get_domain(entity_urn: str) -> models.DomainsClass:
        return models.DomainsClass(domains=[existing_server_domain])

    ctx.graph.get_domain = fake_get_domain  # type: ignore

    transformer_config = {
        "replace_existing": False,
        "semantics": TransformerSemantics.PATCH,
        "domains": [config_domain],
        "on_conflict": TransformerOnConflict.DO_NOTHING,
    }
    output = run_dataset_transformer_pipeline(
        transformer_type=SimpleAddDatasetDomain,
        aspect=models.DomainsClass(domains=[source_domain]),
        config=transformer_config,
        pipeline_context=ctx,
    )

    # The only thing left in the stream is the end-of-stream marker.
    assert len(output) == 1
    only_item = output[0]
    assert only_item is not None
    assert only_item.record is not None
    assert isinstance(only_item.record, EndOfStream)


def test_simple_add_dataset_domain_on_conflict_do_nothing_no_conflict(
    pytestconfig, tmp_path, mock_time, mock_datahub_graph_instance
):
    """With DO_NOTHING but no domains on the server, the domains are still applied."""
    config_domain = builder.make_domain_urn("acryl.io")
    source_domain = builder.make_domain_urn("datahubproject.io")
    unrelated_domain = builder.make_domain_urn("test.io")

    ctx = PipelineContext(run_id="transformer_pipe_line")
    ctx.graph = mock_datahub_graph_instance

    # Stand-in for the server lookup: the entity has an empty domains aspect.
    def fake_get_domain(entity_urn: str) -> models.DomainsClass:
        return models.DomainsClass(domains=[])

    ctx.graph.get_domain = fake_get_domain  # type: ignore

    transformer_config = {
        "replace_existing": False,
        "semantics": TransformerSemantics.PATCH,
        "domains": [config_domain],
        "on_conflict": TransformerOnConflict.DO_NOTHING,
    }
    output = run_dataset_transformer_pipeline(
        transformer_type=SimpleAddDatasetDomain,
        aspect=models.DomainsClass(domains=[source_domain]),
        config=transformer_config,
        pipeline_context=ctx,
    )

    # Two items are expected; the first carries the transformed aspect.
    assert len(output) == 2
    first_item = output[0]
    assert first_item is not None
    assert first_item.record is not None
    assert isinstance(first_item.record, MetadataChangeProposalWrapper)
    assert first_item.record.aspect is not None
    assert isinstance(first_item.record.aspect, models.DomainsClass)

    transformed_aspect = cast(models.DomainsClass, first_item.record.aspect)
    assert len(transformed_aspect.domains) == 2
    for expected_domain in (source_domain, config_domain):
        assert expected_domain in transformed_aspect.domains
    assert unrelated_domain not in transformed_aspect.domains


def test_pattern_add_dataset_domain_aspect_name(mock_datahub_graph_instance):
pipeline_context: PipelineContext = PipelineContext(
run_id="test_simple_add_dataset_domain"
Expand Down

0 comments on commit 72d1236

Please sign in to comment.