Skip to content

Commit

Permalink
feat(ingest/snowflake): allow option for incremental properties
Browse files Browse the repository at this point in the history
  • Loading branch information
mayurinehate committed Dec 10, 2024
1 parent 0a2ac70 commit 46ca64b
Show file tree
Hide file tree
Showing 9 changed files with 513 additions and 170 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import logging
from typing import Iterable, Optional

from pydantic.fields import Field

from datahub.configuration.common import ConfigModel
from datahub.emitter.mce_builder import set_aspect
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.source_helpers import create_dataset_props_patch_builder
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.schema_classes import (
DatasetPropertiesClass,
MetadataChangeEventClass,
SystemMetadataClass,
)

logger = logging.getLogger(__name__)


def convert_dataset_properties_to_patch(
urn: str,
aspect: DatasetPropertiesClass,
system_metadata: Optional[SystemMetadataClass],
) -> MetadataWorkUnit:
patch_builder = create_dataset_props_patch_builder(urn, aspect, system_metadata)
mcp = next(iter(patch_builder.build()))
return MetadataWorkUnit(id=MetadataWorkUnit.generate_workunit_id(mcp), mcp_raw=mcp)


def auto_incremental_properties(
incremental_properties: bool,
stream: Iterable[MetadataWorkUnit],
) -> Iterable[MetadataWorkUnit]:
if not incremental_properties:
yield from stream
return # early exit

for wu in stream:
urn = wu.get_urn()

if isinstance(wu.metadata, MetadataChangeEventClass):
properties_aspect = wu.get_aspect_of_type(DatasetPropertiesClass)
set_aspect(wu.metadata, None, DatasetPropertiesClass)
if len(wu.metadata.proposedSnapshot.aspects) > 0:
yield wu

if properties_aspect:
yield convert_dataset_properties_to_patch(
urn, properties_aspect, wu.metadata.systemMetadata
)
elif isinstance(wu.metadata, MetadataChangeProposalWrapper) and isinstance(
wu.metadata.aspect, DatasetPropertiesClass
):
properties_aspect = wu.metadata.aspect
if properties_aspect:
yield convert_dataset_properties_to_patch(
urn, properties_aspect, wu.metadata.systemMetadata
)
else:
yield wu


# TODO: Use this in SQLCommonConfig. Currently only used in snowflake
class IncrementalPropertiesConfigMixin(ConfigModel):
incremental_properties: bool = Field(
default=False,
description="When enabled, emits dataset properties as incremental to existing dataset properties "
"in DataHub. When disabled, re-states dataset properties on each run.",
)
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
SchemaFieldClass,
SchemaMetadataClass,
StatusClass,
SystemMetadataClass,
TimeWindowSizeClass,
)
from datahub.metadata.urns import DatasetUrn, GlossaryTermUrn, TagUrn, Urn
Expand Down Expand Up @@ -65,9 +66,10 @@ def auto_workunit(
def create_dataset_props_patch_builder(
dataset_urn: str,
dataset_properties: DatasetPropertiesClass,
system_metadata: Optional[SystemMetadataClass] = None,
) -> DatasetPatchBuilder:
"""Creates a patch builder with a table's or view's attributes and dataset properties"""
patch_builder = DatasetPatchBuilder(dataset_urn)
patch_builder = DatasetPatchBuilder(dataset_urn, system_metadata)
patch_builder.set_display_name(dataset_properties.name)
patch_builder.set_description(dataset_properties.description)
patch_builder.set_created(dataset_properties.created)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ class RedshiftConfig(
description="Whether to extract column level lineage. This config works with rest-sink only.",
)

# TODO - use DatasetPropertiesConfigMixin instead
patch_custom_properties: bool = Field(
default=True,
description="Whether to patch custom properties on existing datasets rather than replace.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,8 @@ def gen_dataset_workunits(
customProperties=custom_properties,
)
if self.config.patch_custom_properties:
# TODO: use auto_incremental_properties workunit processor instead
# Deprecate use of patch_custom_properties
patch_builder = create_dataset_props_patch_builder(
dataset_urn, dataset_properties
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
from datahub.configuration.time_window_config import BaseTimeWindowConfig
from datahub.configuration.validate_field_removal import pydantic_removed_field
from datahub.configuration.validate_field_rename import pydantic_renamed_field
from datahub.ingestion.api.incremental_properties_helper import (
IncrementalPropertiesConfigMixin,
)
from datahub.ingestion.glossary.classification_mixin import (
ClassificationSourceConfigMixin,
)
Expand Down Expand Up @@ -188,6 +191,7 @@ class SnowflakeV2Config(
StatefulUsageConfigMixin,
StatefulProfilingConfigMixin,
ClassificationSourceConfigMixin,
IncrementalPropertiesConfigMixin,
):
include_usage_stats: bool = Field(
default=True,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
support_status,
)
from datahub.ingestion.api.incremental_lineage_helper import auto_incremental_lineage
from datahub.ingestion.api.incremental_properties_helper import (
auto_incremental_properties,
)
from datahub.ingestion.api.source import (
CapabilityReport,
MetadataWorkUnitProcessor,
Expand Down Expand Up @@ -446,6 +449,9 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
functools.partial(
auto_incremental_lineage, self.config.incremental_lineage
),
functools.partial(
auto_incremental_properties, self.config.incremental_properties
),
StaleEntityRemovalHandler.create(
self, self.config, self.ctx
).workunit_processor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,8 @@ def process_table(self, table: Table, schema: Schema) -> Iterable[MetadataWorkUn
)

if table_props:
# TODO: use auto_incremental_properties workunit processor instead
# Consider enabling incremental_properties by default
patch_builder = create_dataset_props_patch_builder(dataset_urn, table_props)
for patch_mcp in patch_builder.build():
yield MetadataWorkUnit(
Expand Down
Loading

0 comments on commit 46ca64b

Please sign in to comment.