diff --git a/metadata-ingestion/src/datahub/ingestion/api/incremental_properties_helper.py b/metadata-ingestion/src/datahub/ingestion/api/incremental_properties_helper.py new file mode 100644 index 00000000000000..151b0c72a6c2de --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/api/incremental_properties_helper.py @@ -0,0 +1,69 @@ +import logging +from typing import Iterable, Optional + +from pydantic.fields import Field + +from datahub.configuration.common import ConfigModel +from datahub.emitter.mce_builder import set_aspect +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.api.source_helpers import create_dataset_props_patch_builder +from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.metadata.schema_classes import ( + DatasetPropertiesClass, + MetadataChangeEventClass, + SystemMetadataClass, +) + +logger = logging.getLogger(__name__) + + +def convert_dataset_properties_to_patch( + urn: str, + aspect: DatasetPropertiesClass, + system_metadata: Optional[SystemMetadataClass], +) -> MetadataWorkUnit: + patch_builder = create_dataset_props_patch_builder(urn, aspect, system_metadata) + mcp = next(iter(patch_builder.build())) + return MetadataWorkUnit(id=MetadataWorkUnit.generate_workunit_id(mcp), mcp_raw=mcp) + + +def auto_incremental_properties( + incremental_properties: bool, + stream: Iterable[MetadataWorkUnit], +) -> Iterable[MetadataWorkUnit]: + if not incremental_properties: + yield from stream + return # early exit + + for wu in stream: + urn = wu.get_urn() + + if isinstance(wu.metadata, MetadataChangeEventClass): + properties_aspect = wu.get_aspect_of_type(DatasetPropertiesClass) + set_aspect(wu.metadata, None, DatasetPropertiesClass) + if len(wu.metadata.proposedSnapshot.aspects) > 0: + yield wu + + if properties_aspect: + yield convert_dataset_properties_to_patch( + urn, properties_aspect, wu.metadata.systemMetadata + ) + elif isinstance(wu.metadata, MetadataChangeProposalWrapper) and isinstance( + wu.metadata.aspect, DatasetPropertiesClass + ): + properties_aspect = wu.metadata.aspect + if properties_aspect: + yield convert_dataset_properties_to_patch( + urn, properties_aspect, wu.metadata.systemMetadata + ) + else: + yield wu + + +# TODO: Use this in SQLCommonConfig. Currently only used in snowflake +class IncrementalPropertiesConfigMixin(ConfigModel): + incremental_properties: bool = Field( + default=False, + description="When enabled, emits dataset properties as incremental to existing dataset properties " + "in DataHub. When disabled, re-states dataset properties on each run.", + ) diff --git a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py index 8511f8529ac125..0c86e1cf47203f 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py @@ -32,6 +32,7 @@ SchemaFieldClass, SchemaMetadataClass, StatusClass, + SystemMetadataClass, TimeWindowSizeClass, ) from datahub.metadata.urns import DatasetUrn, GlossaryTermUrn, TagUrn, Urn @@ -65,9 +66,10 @@ def auto_workunit( def create_dataset_props_patch_builder( dataset_urn: str, dataset_properties: DatasetPropertiesClass, + system_metadata: Optional[SystemMetadataClass] = None, ) -> DatasetPatchBuilder: """Creates a patch builder with a table's or view's attributes and dataset properties""" - patch_builder = DatasetPatchBuilder(dataset_urn) + patch_builder = DatasetPatchBuilder(dataset_urn, system_metadata) patch_builder.set_display_name(dataset_properties.name) patch_builder.set_description(dataset_properties.description) patch_builder.set_created(dataset_properties.created) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py index 2ff73323a14e35..cad48eaf1c2375 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py @@ -159,6 +159,7 @@ class RedshiftConfig( description="Whether to extract column level lineage. This config works with rest-sink only.", ) + # TODO - use DatasetPropertiesConfigMixin instead patch_custom_properties: bool = Field( default=True, description="Whether to patch custom properties on existing datasets rather than replace.", diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py index 06cbb7fbae27cc..49f7941563c1a7 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py @@ -831,6 +831,8 @@ def gen_dataset_workunits( customProperties=custom_properties, ) if self.config.patch_custom_properties: + # TODO: use auto_incremental_properties workunit processor instead + # Deprecate use of patch_custom_properties patch_builder = create_dataset_props_patch_builder( dataset_urn, dataset_properties ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py index c30a26fbbd02cc..1d1cc3c2af4f08 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py @@ -16,6 +16,9 @@ from datahub.configuration.time_window_config import BaseTimeWindowConfig from datahub.configuration.validate_field_removal import pydantic_removed_field from datahub.configuration.validate_field_rename import pydantic_renamed_field +from datahub.ingestion.api.incremental_properties_helper import ( + IncrementalPropertiesConfigMixin, +) from datahub.ingestion.glossary.classification_mixin import ( ClassificationSourceConfigMixin, ) @@ -188,6 +191,7 @@ class SnowflakeV2Config( StatefulUsageConfigMixin, StatefulProfilingConfigMixin, ClassificationSourceConfigMixin, + IncrementalPropertiesConfigMixin, ): include_usage_stats: bool = Field( default=True, diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py index 538841018067e2..c3a7912c40e8ee 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py @@ -17,6 +17,9 @@ support_status, ) from datahub.ingestion.api.incremental_lineage_helper import auto_incremental_lineage +from datahub.ingestion.api.incremental_properties_helper import ( + auto_incremental_properties, +) from datahub.ingestion.api.source import ( CapabilityReport, MetadataWorkUnitProcessor, @@ -446,6 +449,9 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: functools.partial( auto_incremental_lineage, self.config.incremental_lineage ), + functools.partial( + auto_incremental_properties, self.config.incremental_properties + ), StaleEntityRemovalHandler.create( self, self.config, self.ctx ).workunit_processor, diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py index f758746193cd83..9d9a746580f939 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py @@ -556,6 +556,8 @@ def process_table(self, table: Table, schema: Schema) -> Iterable[MetadataWorkUn ) if table_props: + # TODO: use auto_incremental_properties workunit processor instead + # Consider enabling incremental_properties by default patch_builder = create_dataset_props_patch_builder(dataset_urn, table_props) for patch_mcp in patch_builder.build(): yield MetadataWorkUnit( diff --git a/metadata-ingestion/tests/integration/snowflake/snowflake_privatelink_golden.json b/metadata-ingestion/tests/integration/snowflake/snowflake_privatelink_golden.json index d232ae710e8916..3040c6c4e9196f 100644 --- a/metadata-ingestion/tests/integration/snowflake/snowflake_privatelink_golden.json +++ b/metadata-ingestion/tests/integration/snowflake/snowflake_privatelink_golden.json @@ -138,27 +138,49 @@ { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_3,PROD)", - "changeType": "UPSERT", + "changeType": "PATCH", "aspectName": "datasetProperties", "aspect": { - "json": { - "customProperties": {"CLUSTERING_KEY": "LINEAR(COL_1)"}, - "externalUrl": "https://app.abc12345.ap-south-1.privatelink.snowflakecomputing.com/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/table/TABLE_3/", - "name": "TABLE_3", - "qualifiedName": "TEST_DB.TEST_SCHEMA.TABLE_3", - "description": "Comment for Table", - "created": { - "time": 1623110400000 + "json": [ + { + "op": "add", + "path": "/name", + "value": "TABLE_3" }, - "lastModified": { - "time": 1623110400000 + { + "op": "add", + "path": "/description", + "value": "Comment for Table" }, - "tags": [] - } + { + "op": "add", + "path": "/created", + "value": { + "time": 1623110400000 + } + }, + { + "op": "add", + "path": "/lastModified", + "value": { + "time": 1623110400000 + } + }, + { + "op": "add", + "path": "/qualifiedName", + "value": "TEST_DB.TEST_SCHEMA.TABLE_3" + }, + { + "op": "add", + "path": "/customProperties/CLUSTERING_KEY", + "value": "LINEAR(COL_1)" + } + ] }, "systemMetadata": { "lastObserved": 1654621200000, - "runId": "snowflake-2022_06_07-17_00_00", + "runId": "snowflake-2022_06_07-17_00_00-ad3hnf", "lastRunId": "no-run-id-provided" } }, @@ -567,27 +589,44 @@ { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.view_1,PROD)", - "changeType": "UPSERT", + "changeType": "PATCH", "aspectName": "datasetProperties", "aspect": { - "json": { - "customProperties": {}, - "externalUrl": "https://app.abc12345.ap-south-1.privatelink.snowflakecomputing.com/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/view/VIEW_1/", - "name": "VIEW_1", - "qualifiedName": "TEST_DB.TEST_SCHEMA.VIEW_1", - "description": "Comment for View", - "created": { - "time": 1623110400000 + "json": [ + { + "op": "add", + "path": "/name", + "value": "VIEW_1" }, - "lastModified": { - "time": 1623110400000 + { + "op": "add", + "path": "/description", + "value": "Comment for View" }, - "tags": [] - } + { + "op": "add", + "path": "/created", + "value": { + "time": 1623110400000 + } + }, + { + "op": "add", + "path": "/lastModified", + "value": { + "time": 1623110400000 + } + }, + { + "op": "add", + "path": "/qualifiedName", + "value": "TEST_DB.TEST_SCHEMA.VIEW_1" + } + ] }, "systemMetadata": { "lastObserved": 1654621200000, - "runId": "snowflake-2022_06_07-17_00_00", + "runId": "snowflake-2022_06_07-17_00_00-ad3hnf", "lastRunId": "no-run-id-provided" } }, @@ -808,27 +847,49 @@ { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_1,PROD)", - "changeType": "UPSERT", + "changeType": "PATCH", "aspectName": "datasetProperties", "aspect": { - "json": { - "customProperties": {"CLUSTERING_KEY": "LINEAR(COL_1)"}, - "externalUrl": "https://app.abc12345.ap-south-1.privatelink.snowflakecomputing.com/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/table/TABLE_1/", - "name": "TABLE_1", - "qualifiedName": "TEST_DB.TEST_SCHEMA.TABLE_1", - "description": "Comment for Table", - "created": { - "time": 1623110400000 + "json": [ + { + "op": "add", + "path": "/name", + "value": "TABLE_1" }, - "lastModified": { - "time": 1623110400000 + { + "op": "add", + "path": "/description", + "value": "Comment for Table" }, - "tags": [] - } + { + "op": "add", + "path": "/created", + "value": { + "time": 1623110400000 + } + }, + { + "op": "add", + "path": "/lastModified", + "value": { + "time": 1623110400000 + } + }, + { + "op": "add", + "path": "/qualifiedName", + "value": "TEST_DB.TEST_SCHEMA.TABLE_1" + }, + { + "op": "add", + "path": "/customProperties/CLUSTERING_KEY", + "value": "LINEAR(COL_1)" + } + ] }, "systemMetadata": { "lastObserved": 1654621200000, - "runId": "snowflake-2022_06_07-17_00_00", + "runId": "snowflake-2022_06_07-17_00_00-ad3hnf", "lastRunId": "no-run-id-provided" } }, @@ -1473,27 +1534,49 @@ { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_10,PROD)", - "changeType": "UPSERT", + "changeType": "PATCH", "aspectName": "datasetProperties", "aspect": { - "json": { - "customProperties": {"CLUSTERING_KEY": "LINEAR(COL_1)"}, - "externalUrl": "https://app.abc12345.ap-south-1.privatelink.snowflakecomputing.com/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/table/TABLE_10/", - "name": "TABLE_10", - "qualifiedName": "TEST_DB.TEST_SCHEMA.TABLE_10", - "description": "Comment for Table", - "created": { - "time": 1623110400000 + "json": [ + { + "op": "add", + "path": "/name", + "value": "TABLE_10" }, - "lastModified": { - "time": 1623110400000 + { + "op": "add", + "path": "/description", + "value": "Comment for Table" }, - "tags": [] - } + { + "op": "add", + "path": "/created", + "value": { + "time": 1623110400000 + } + }, + { + "op": "add", + "path": "/lastModified", + "value": { + "time": 1623110400000 + } + }, + { + "op": "add", + "path": "/qualifiedName", + "value": "TEST_DB.TEST_SCHEMA.TABLE_10" + }, + { + "op": "add", + "path": "/customProperties/CLUSTERING_KEY", + "value": "LINEAR(COL_1)" + } + ] }, "systemMetadata": { "lastObserved": 1654621200000, - "runId": "snowflake-2022_06_07-17_00_00", + "runId": "snowflake-2022_06_07-17_00_00-ad3hnf", "lastRunId": "no-run-id-provided" } }, @@ -1712,54 +1795,98 @@ { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_5,PROD)", - "changeType": "UPSERT", + "changeType": "PATCH", "aspectName": "datasetProperties", "aspect": { - "json": { - "customProperties": {"CLUSTERING_KEY": "LINEAR(COL_1)"}, - "externalUrl": "https://app.abc12345.ap-south-1.privatelink.snowflakecomputing.com/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/table/TABLE_5/", - "name": "TABLE_5", - "qualifiedName": "TEST_DB.TEST_SCHEMA.TABLE_5", - "description": "Comment for Table", - "created": { - "time": 1623110400000 + "json": [ + { + "op": "add", + "path": "/name", + "value": "TABLE_5" }, - "lastModified": { - "time": 1623110400000 + { + "op": "add", + "path": "/description", + "value": "Comment for Table" }, - "tags": [] - } + { + "op": "add", + "path": "/created", + "value": { + "time": 1623110400000 + } + }, + { + "op": "add", + "path": "/lastModified", + "value": { + "time": 1623110400000 + } + }, + { + "op": "add", + "path": "/qualifiedName", + "value": "TEST_DB.TEST_SCHEMA.TABLE_5" + }, + { + "op": "add", + "path": "/customProperties/CLUSTERING_KEY", + "value": "LINEAR(COL_1)" + } + ] }, "systemMetadata": { "lastObserved": 1654621200000, - "runId": "snowflake-2022_06_07-17_00_00", + "runId": "snowflake-2022_06_07-17_00_00-ad3hnf", "lastRunId": "no-run-id-provided" } }, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_2,PROD)", - "changeType": "UPSERT", + "changeType": "PATCH", "aspectName": "datasetProperties", "aspect": { - "json": { - "customProperties": {"CLUSTERING_KEY": "LINEAR(COL_1)"}, - "externalUrl": "https://app.abc12345.ap-south-1.privatelink.snowflakecomputing.com/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/table/TABLE_2/", - "name": "TABLE_2", - "qualifiedName": "TEST_DB.TEST_SCHEMA.TABLE_2", - "description": "Comment for Table", - "created": { - "time": 1623110400000 + "json": [ + { + "op": "add", + "path": "/name", + "value": "TABLE_2" }, - "lastModified": { - "time": 1623110400000 + { + "op": "add", + "path": "/description", + "value": "Comment for Table" }, - "tags": [] - } + { + "op": "add", + "path": "/created", + "value": { + "time": 1623110400000 + } + }, + { + "op": "add", + "path": "/lastModified", + "value": { + "time": 1623110400000 + } + }, + { + "op": "add", + "path": "/qualifiedName", + "value": "TEST_DB.TEST_SCHEMA.TABLE_2" + }, + { + "op": "add", + "path": "/customProperties/CLUSTERING_KEY", + "value": "LINEAR(COL_1)" + } + ] }, "systemMetadata": { "lastObserved": 1654621200000, - "runId": "snowflake-2022_06_07-17_00_00", + "runId": "snowflake-2022_06_07-17_00_00-ad3hnf", "lastRunId": "no-run-id-provided" } }, @@ -2301,27 +2428,49 @@ { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_6,PROD)", - "changeType": "UPSERT", + "changeType": "PATCH", "aspectName": "datasetProperties", "aspect": { - "json": { - "customProperties": {"CLUSTERING_KEY": "LINEAR(COL_1)"}, - "externalUrl": "https://app.abc12345.ap-south-1.privatelink.snowflakecomputing.com/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/table/TABLE_6/", - "name": "TABLE_6", - "qualifiedName": "TEST_DB.TEST_SCHEMA.TABLE_6", - "description": "Comment for Table", - "created": { - "time": 1623110400000 + "json": [ + { + "op": "add", + "path": "/name", + "value": "TABLE_6" }, - "lastModified": { - "time": 1623110400000 + { + "op": "add", + "path": "/description", + "value": "Comment for Table" }, - "tags": [] - } + { + "op": "add", + "path": "/created", + "value": { + "time": 1623110400000 + } + }, + { + "op": "add", + "path": "/lastModified", + "value": { + "time": 1623110400000 + } + }, + { + "op": "add", + "path": "/qualifiedName", + "value": "TEST_DB.TEST_SCHEMA.TABLE_6" + }, + { + "op": "add", + "path": "/customProperties/CLUSTERING_KEY", + "value": "LINEAR(COL_1)" + } + ] }, "systemMetadata": { "lastObserved": 1654621200000, - "runId": "snowflake-2022_06_07-17_00_00", + "runId": "snowflake-2022_06_07-17_00_00-ad3hnf", "lastRunId": "no-run-id-provided" } }, @@ -2621,27 +2770,49 @@ { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_7,PROD)", - "changeType": "UPSERT", + "changeType": "PATCH", "aspectName": "datasetProperties", "aspect": { - "json": { - "customProperties": {"CLUSTERING_KEY": "LINEAR(COL_1)"}, - "externalUrl": "https://app.abc12345.ap-south-1.privatelink.snowflakecomputing.com/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/table/TABLE_7/", - "name": "TABLE_7", - "qualifiedName": "TEST_DB.TEST_SCHEMA.TABLE_7", - "description": "Comment for Table", - "created": { - "time": 1623110400000 + "json": [ + { + "op": "add", + "path": "/name", + "value": "TABLE_7" }, - "lastModified": { - "time": 1623110400000 + { + "op": "add", + "path": "/description", + "value": "Comment for Table" }, - "tags": [] - } + { + "op": "add", + "path": "/created", + "value": { + "time": 1623110400000 + } + }, + { + "op": "add", + "path": "/lastModified", + "value": { + "time": 1623110400000 + } + }, + { + "op": "add", + "path": "/qualifiedName", + "value": "TEST_DB.TEST_SCHEMA.TABLE_7" + }, + { + "op": "add", + "path": "/customProperties/CLUSTERING_KEY", + "value": "LINEAR(COL_1)" + } + ] }, "systemMetadata": { "lastObserved": 1654621200000, - "runId": "snowflake-2022_06_07-17_00_00", + "runId": "snowflake-2022_06_07-17_00_00-ad3hnf", "lastRunId": "no-run-id-provided" } }, @@ -2664,27 +2835,49 @@ { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_4,PROD)", - "changeType": "UPSERT", + "changeType": "PATCH", "aspectName": "datasetProperties", "aspect": { - "json": { - "customProperties": {"CLUSTERING_KEY": "LINEAR(COL_1)"}, - "externalUrl": "https://app.abc12345.ap-south-1.privatelink.snowflakecomputing.com/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/table/TABLE_4/", - "name": "TABLE_4", - "qualifiedName": "TEST_DB.TEST_SCHEMA.TABLE_4", - "description": "Comment for Table", - "created": { - "time": 1623110400000 + "json": [ + { + "op": "add", + "path": "/name", + "value": "TABLE_4" }, - "lastModified": { - "time": 1623110400000 + { + "op": "add", + "path": "/description", + "value": "Comment for Table" }, - "tags": [] - } + { + "op": "add", + "path": "/created", + "value": { + "time": 1623110400000 + } + }, + { + "op": "add", + "path": "/lastModified", + "value": { + "time": 1623110400000 + } + }, + { + "op": "add", + "path": "/qualifiedName", + "value": "TEST_DB.TEST_SCHEMA.TABLE_4" + }, + { + "op": "add", + "path": "/customProperties/CLUSTERING_KEY", + "value": "LINEAR(COL_1)" + } + ] }, "systemMetadata": { "lastObserved": 1654621200000, - "runId": "snowflake-2022_06_07-17_00_00", + "runId": "snowflake-2022_06_07-17_00_00-ad3hnf", "lastRunId": "no-run-id-provided" } }, @@ -3162,27 +3355,49 @@ { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_8,PROD)", - "changeType": "UPSERT", + "changeType": "PATCH", "aspectName": "datasetProperties", "aspect": { - "json": { - "customProperties": {"CLUSTERING_KEY": "LINEAR(COL_1)"}, - "externalUrl": "https://app.abc12345.ap-south-1.privatelink.snowflakecomputing.com/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/table/TABLE_8/", - "name": "TABLE_8", - "qualifiedName": "TEST_DB.TEST_SCHEMA.TABLE_8", - "description": "Comment for Table", - "created": { - "time": 1623110400000 + "json": [ + { + "op": "add", + "path": "/name", + "value": "TABLE_8" }, - "lastModified": { - "time": 1623110400000 + { + "op": "add", + "path": "/description", + "value": "Comment for Table" }, - "tags": [] - } + { + "op": "add", + "path": "/created", + "value": { + "time": 1623110400000 + } + }, + { + "op": "add", + "path": "/lastModified", + "value": { + "time": 1623110400000 + } + }, + { + "op": "add", + "path": "/qualifiedName", + "value": "TEST_DB.TEST_SCHEMA.TABLE_8" + }, + { + "op": "add", + "path": "/customProperties/CLUSTERING_KEY", + "value": "LINEAR(COL_1)" + } + ] }, "systemMetadata": { "lastObserved": 1654621200000, - "runId": "snowflake-2022_06_07-17_00_00", + "runId": "snowflake-2022_06_07-17_00_00-ad3hnf", "lastRunId": "no-run-id-provided" } }, @@ -3302,27 +3517,49 @@ { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.table_9,PROD)", - "changeType": "UPSERT", + "changeType": "PATCH", "aspectName": "datasetProperties", "aspect": { - "json": { - "customProperties": {"CLUSTERING_KEY": "LINEAR(COL_1)"}, - "externalUrl": "https://app.abc12345.ap-south-1.privatelink.snowflakecomputing.com/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/table/TABLE_9/", - "name": "TABLE_9", - "qualifiedName": "TEST_DB.TEST_SCHEMA.TABLE_9", - "description": "Comment for Table", - "created": { - "time": 1623110400000 + "json": [ + { + "op": "add", + "path": "/name", + "value": "TABLE_9" }, - "lastModified": { - "time": 1623110400000 + { + "op": "add", + "path": "/description", + "value": "Comment for Table" }, - "tags": [] - } + { + "op": "add", + "path": "/created", + "value": { + "time": 1623110400000 + } + }, + { + "op": "add", + "path": "/lastModified", + "value": { + "time": 1623110400000 + } + }, + { + "op": "add", + "path": "/qualifiedName", + "value": "TEST_DB.TEST_SCHEMA.TABLE_9" + }, + { + "op": "add", + "path": "/customProperties/CLUSTERING_KEY", + "value": "LINEAR(COL_1)" + } + ] }, "systemMetadata": { "lastObserved": 1654621200000, - "runId": "snowflake-2022_06_07-17_00_00", + "runId": "snowflake-2022_06_07-17_00_00-ad3hnf", "lastRunId": "no-run-id-provided" } }, @@ -3607,27 +3844,44 @@ { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,instance1.test_db.test_schema.view_2,PROD)", - "changeType": "UPSERT", + "changeType": "PATCH", "aspectName": "datasetProperties", "aspect": { - "json": { - "customProperties": {}, - "externalUrl": "https://app.abc12345.ap-south-1.privatelink.snowflakecomputing.com/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/view/VIEW_2/", - "name": "VIEW_2", - "qualifiedName": "TEST_DB.TEST_SCHEMA.VIEW_2", - "description": "Comment for View", - "created": { - "time": 1623110400000 + "json": [ + { + "op": "add", + "path": "/name", + "value": "VIEW_2" }, - "lastModified": { - "time": 1623110400000 + { + "op": "add", + "path": "/description", + "value": "Comment for View" }, - "tags": [] - } + { + "op": "add", + "path": "/created", + "value": { + "time": 1623110400000 + } + }, + { + "op": "add", + "path": "/lastModified", + "value": { + "time": 1623110400000 + } + }, + { + "op": "add", + "path": "/qualifiedName", + "value": "TEST_DB.TEST_SCHEMA.VIEW_2" + } + ] }, "systemMetadata": { "lastObserved": 1654621200000, - "runId": "snowflake-2022_06_07-17_00_00", + "runId": "snowflake-2022_06_07-17_00_00-ad3hnf", "lastRunId": "no-run-id-provided" } }, diff --git a/metadata-ingestion/tests/integration/snowflake/test_snowflake.py b/metadata-ingestion/tests/integration/snowflake/test_snowflake.py index ca694b02cff010..1d7470d24f7689 100644 --- a/metadata-ingestion/tests/integration/snowflake/test_snowflake.py +++ b/metadata-ingestion/tests/integration/snowflake/test_snowflake.py @@ -187,7 +187,9 @@ def test_snowflake_basic(pytestconfig, tmp_path, mock_time, mock_datahub_graph): @freeze_time(FROZEN_TIME) -def test_snowflake_private_link(pytestconfig, tmp_path, mock_time, mock_datahub_graph): +def test_snowflake_private_link_and_incremental_mcps( + pytestconfig, tmp_path, mock_time, mock_datahub_graph +): test_resources_dir = pytestconfig.rootpath / "tests/integration/snowflake" # Run the metadata ingestion pipeline. @@ -218,6 +220,7 @@ def test_snowflake_private_link(pytestconfig, tmp_path, mock_time, mock_datahub_ include_usage_stats=False, format_sql_queries=True, incremental_lineage=False, + incremental_properties=True, include_operational_stats=False, platform_instance="instance1", start_time=datetime(2022, 6, 6, 0, 0, 0, 0).replace(