From 5d3e464c21f384d5eb25a0291a77067a9605a9fc Mon Sep 17 00:00:00 2001 From: david-leifker <114954101+david-leifker@users.noreply.github.com> Date: Tue, 15 Oct 2024 12:35:41 -0500 Subject: [PATCH] feat(validations): Ingest and metadata schema validators (#11619) Co-authored-by: Pedro Silva --- docs/how/updating-datahub.md | 2 + .../aspect/validation/FieldPathValidator.java | 116 +++++++++ .../validators/FieldPathValidatorTest.java | 233 ++++++++++++++++++ .../java/com/linkedin/metadata/Constants.java | 7 + .../datahub/ingestion/run/pipeline_config.py | 17 +- .../datahub/testing/compare_metadata_json.py | 4 + .../ExecutionRequestResultValidator.java | 70 ++++++ .../ExecutionRequestResultValidatorTest.java | 166 +++++++++++++ .../src/main/resources/entity-registry.yml | 6 + .../SpringStandardPluginConfiguration.java | 46 ++++ 10 files changed, 663 insertions(+), 4 deletions(-) create mode 100644 entity-registry/src/main/java/com/linkedin/metadata/aspect/validation/FieldPathValidator.java create mode 100644 entity-registry/src/test/java/com/linkedin/metadata/aspect/validators/FieldPathValidatorTest.java create mode 100644 metadata-io/src/main/java/com/linkedin/metadata/aspect/validation/ExecutionRequestResultValidator.java create mode 100644 metadata-io/src/test/java/com/linkedin/metadata/aspect/validation/ExecutionRequestResultValidatorTest.java diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index 00e020bd2a3875..dbcc7da8467035 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -24,6 +24,8 @@ This file documents any backwards-incompatible changes in DataHub and assists pe - #11484 - Metadata service authentication enabled by default - #11484 - Rest API authorization enabled by default - #10472 - `SANDBOX` added as a FabricType. No rollbacks allowed once metadata with this fabric type is added without manual cleanups in databases. +- #11619 - schema field/column paths can no longer be empty strings +- #11619 - schema field/column paths can no longer be duplicated within the schema ### Potential Downtime diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/validation/FieldPathValidator.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/validation/FieldPathValidator.java new file mode 100644 index 00000000000000..7c279254e1bc33 --- /dev/null +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/validation/FieldPathValidator.java @@ -0,0 +1,116 @@ +package com.linkedin.metadata.aspect.validation; + +import static com.linkedin.metadata.Constants.*; + +import com.linkedin.metadata.aspect.RetrieverContext; +import com.linkedin.metadata.aspect.batch.BatchItem; +import com.linkedin.metadata.aspect.batch.ChangeMCP; +import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig; +import com.linkedin.metadata.aspect.plugins.validation.AspectPayloadValidator; +import com.linkedin.metadata.aspect.plugins.validation.AspectValidationException; +import com.linkedin.metadata.aspect.plugins.validation.ValidationExceptionCollection; +import com.linkedin.schema.EditableSchemaFieldInfo; +import com.linkedin.schema.EditableSchemaMetadata; +import com.linkedin.schema.SchemaField; +import com.linkedin.schema.SchemaMetadata; +import java.util.Collection; +import java.util.Optional; +import java.util.stream.Stream; +import javax.annotation.Nonnull; +import lombok.Getter; +import lombok.Setter; +import lombok.experimental.Accessors; + +/** + * 1. Validates the Schema Field Path specification, specifically that all field IDs must be unique + * across all fields within a schema. 2. Validates that the field path id is not empty. + * + * @see Field + * Path V2 docs + */ +@Setter +@Getter +@Accessors(chain = true) +public class FieldPathValidator extends AspectPayloadValidator { + @Nonnull private AspectPluginConfig config; + + /** Prevent any MCP for SchemaMetadata where field ids are duplicated. */ + @Override + protected Stream validateProposedAspects( + @Nonnull Collection mcpItems, + @Nonnull RetrieverContext retrieverContext) { + + ValidationExceptionCollection exceptions = ValidationExceptionCollection.newCollection(); + + mcpItems.forEach( + i -> { + if (i.getAspectName().equals(SCHEMA_METADATA_ASPECT_NAME)) { + processSchemaMetadataAspect(i, exceptions); + } else { + processEditableSchemaMetadataAspect(i, exceptions); + } + }); + + return exceptions.streamAllExceptions(); + } + + @Override + protected Stream validatePreCommitAspects( + @Nonnull Collection changeMCPs, @Nonnull RetrieverContext retrieverContext) { + return Stream.of(); + } + + private static void processEditableSchemaMetadataAspect( + BatchItem i, ValidationExceptionCollection exceptions) { + final EditableSchemaMetadata schemaMetadata = i.getAspect(EditableSchemaMetadata.class); + final long uniquePaths = + validateAndCount( + i, + schemaMetadata.getEditableSchemaFieldInfo().stream() + .map(EditableSchemaFieldInfo::getFieldPath), + exceptions); + + if (uniquePaths != schemaMetadata.getEditableSchemaFieldInfo().size()) { + exceptions.addException( + i, + String.format( + "Cannot perform %s action on proposal. EditableSchemaMetadata aspect has duplicated field paths", + i.getChangeType())); + } + } + + private static void processSchemaMetadataAspect( + BatchItem i, ValidationExceptionCollection exceptions) { + final SchemaMetadata schemaMetadata = i.getAspect(SchemaMetadata.class); + final long uniquePaths = + validateAndCount( + i, schemaMetadata.getFields().stream().map(SchemaField::getFieldPath), exceptions); + + if (uniquePaths != schemaMetadata.getFields().size()) { + exceptions.addException( + i, + String.format( + "Cannot perform %s action on proposal. SchemaMetadata aspect has duplicated field paths", + i.getChangeType())); + } + } + + private static long validateAndCount( + BatchItem i, Stream fieldPaths, ValidationExceptionCollection exceptions) { + return fieldPaths + .distinct() + // inspect the stream of fieldPath validation errors since we're already iterating + .peek( + fieldPath -> + validateFieldPath(fieldPath) + .ifPresent(message -> exceptions.addException(i, message))) + .count(); + } + + private static Optional validateFieldPath(String fieldPath) { + if (fieldPath == null || fieldPath.isEmpty()) { + return Optional.of("SchemaMetadata aspect has empty field path."); + } + return Optional.empty(); + } +} diff --git a/entity-registry/src/test/java/com/linkedin/metadata/aspect/validators/FieldPathValidatorTest.java b/entity-registry/src/test/java/com/linkedin/metadata/aspect/validators/FieldPathValidatorTest.java new file mode 100644 index 00000000000000..bd5912764edce3 --- /dev/null +++ b/entity-registry/src/test/java/com/linkedin/metadata/aspect/validators/FieldPathValidatorTest.java @@ -0,0 +1,233 @@ +package com.linkedin.metadata.aspect.validators; + +import static com.linkedin.metadata.Constants.*; +import static org.mockito.Mockito.*; +import static org.testng.Assert.*; + +import com.linkedin.common.urn.DatasetUrn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.aspect.AspectRetriever; +import com.linkedin.metadata.aspect.GraphRetriever; +import com.linkedin.metadata.aspect.RetrieverContext; +import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig; +import com.linkedin.metadata.aspect.plugins.validation.AspectValidationException; +import com.linkedin.metadata.aspect.validation.CreateIfNotExistsValidator; +import com.linkedin.metadata.aspect.validation.FieldPathValidator; +import com.linkedin.metadata.models.registry.EntityRegistry; +import com.linkedin.schema.EditableSchemaFieldInfo; +import com.linkedin.schema.EditableSchemaFieldInfoArray; +import com.linkedin.schema.EditableSchemaMetadata; +import com.linkedin.schema.SchemaField; +import com.linkedin.schema.SchemaFieldArray; +import com.linkedin.schema.SchemaFieldDataType; +import com.linkedin.schema.SchemaMetadata; +import com.linkedin.schema.StringType; +import com.linkedin.test.metadata.aspect.TestEntityRegistry; +import com.linkedin.test.metadata.aspect.batch.TestMCP; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +public class FieldPathValidatorTest { + + private static final AspectPluginConfig validatorConfig = + AspectPluginConfig.builder() + .supportedOperations( + Arrays.stream(ChangeType.values()) + .map(Objects::toString) + .collect(Collectors.toList())) + .className(CreateIfNotExistsValidator.class.getName()) + .supportedEntityAspectNames(List.of(AspectPluginConfig.EntityAspectName.ALL)) + .enabled(true) + .build(); + private EntityRegistry entityRegistry; + private RetrieverContext mockRetrieverContext; + private static final DatasetUrn TEST_DATASET_URN; + private final FieldPathValidator test = new FieldPathValidator().setConfig(validatorConfig); + + static { + try { + TEST_DATASET_URN = + DatasetUrn.createFromUrn( + UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,test,PROD)")); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + + @BeforeTest + public void init() { + entityRegistry = new TestEntityRegistry(); + AspectRetriever mockAspectRetriever = mock(AspectRetriever.class); + when(mockAspectRetriever.getEntityRegistry()).thenReturn(entityRegistry); + GraphRetriever mockGraphRetriever = mock(GraphRetriever.class); + mockRetrieverContext = mock(RetrieverContext.class); + when(mockRetrieverContext.getAspectRetriever()).thenReturn(mockAspectRetriever); + when(mockRetrieverContext.getGraphRetriever()).thenReturn(mockGraphRetriever); + } + + @Test + public void testValidateNonDuplicatedSchemaFieldPath() { + final SchemaMetadata schema = getMockSchemaMetadataAspect(false); + assertEquals( + test.validateProposed( + Set.of( + TestMCP.builder() + .changeType(ChangeType.UPSERT) + .urn(TEST_DATASET_URN) + .entitySpec(entityRegistry.getEntitySpec(TEST_DATASET_URN.getEntityType())) + .aspectSpec( + entityRegistry + .getEntitySpec(TEST_DATASET_URN.getEntityType()) + .getAspectSpec(SCHEMA_METADATA_ASPECT_NAME)) + .recordTemplate(schema) + .build()), + mockRetrieverContext) + .count(), + 0); + } + + @Test + public void testValidateDuplicatedSchemaFieldPath() { + final SchemaMetadata schema = getMockSchemaMetadataAspect(true); + + assertEquals( + test.validateProposed( + Set.of( + TestMCP.builder() + .changeType(ChangeType.UPSERT) + .urn(TEST_DATASET_URN) + .entitySpec(entityRegistry.getEntitySpec(TEST_DATASET_URN.getEntityType())) + .aspectSpec( + entityRegistry + .getEntitySpec(TEST_DATASET_URN.getEntityType()) + .getAspectSpec(SCHEMA_METADATA_ASPECT_NAME)) + .recordTemplate(schema) + .build()), + mockRetrieverContext) + .count(), + 1); + } + + @Test + public void testValidateNonDuplicatedEditableSchemaFieldPath() { + final EditableSchemaMetadata schema = getMockEditableSchemaMetadataAspect(false); + assertEquals( + test.validateProposed( + Set.of( + TestMCP.builder() + .changeType(ChangeType.UPSERT) + .urn(TEST_DATASET_URN) + .entitySpec(entityRegistry.getEntitySpec(TEST_DATASET_URN.getEntityType())) + .aspectSpec( + entityRegistry + .getEntitySpec(TEST_DATASET_URN.getEntityType()) + .getAspectSpec(EDITABLE_SCHEMA_METADATA_ASPECT_NAME)) + .recordTemplate(schema) + .build()), + mockRetrieverContext) + .count(), + 0); + } + + @Test + public void testValidateDuplicatedEditableSchemaFieldPath() { + final EditableSchemaMetadata schema = getMockEditableSchemaMetadataAspect(true); + + assertEquals( + test.validateProposed( + Set.of( + TestMCP.builder() + .changeType(ChangeType.UPSERT) + .urn(TEST_DATASET_URN) + .entitySpec(entityRegistry.getEntitySpec(TEST_DATASET_URN.getEntityType())) + .aspectSpec( + entityRegistry + .getEntitySpec(TEST_DATASET_URN.getEntityType()) + .getAspectSpec(EDITABLE_SCHEMA_METADATA_ASPECT_NAME)) + .recordTemplate(schema) + .build()), + mockRetrieverContext) + .count(), + 1); + } + + @Test + public void testEmptySchemaFieldPath() { + final SchemaMetadata schema = getMockSchemaMetadataAspect(false, ""); + TestMCP testItem = + TestMCP.builder() + .changeType(ChangeType.UPSERT) + .urn(TEST_DATASET_URN) + .entitySpec(entityRegistry.getEntitySpec(TEST_DATASET_URN.getEntityType())) + .aspectSpec( + entityRegistry + .getEntitySpec(TEST_DATASET_URN.getEntityType()) + .getAspectSpec(SCHEMA_METADATA_ASPECT_NAME)) + .recordTemplate(schema) + .build(); + + Set exceptions = + test.validateProposed(Set.of(testItem), mockRetrieverContext).collect(Collectors.toSet()); + + assertEquals( + exceptions, + Set.of( + AspectValidationException.forItem( + testItem, "SchemaMetadata aspect has empty field path."))); + } + + private static SchemaMetadata getMockSchemaMetadataAspect(boolean duplicateFields) { + return getMockSchemaMetadataAspect(duplicateFields, null); + } + + private static SchemaMetadata getMockSchemaMetadataAspect( + boolean duplicateFields, @Nullable String fieldPath) { + List fields = new ArrayList<>(); + fields.add( + new SchemaField() + .setType( + new SchemaFieldDataType() + .setType(SchemaFieldDataType.Type.create(new StringType()))) + .setNullable(false) + .setNativeDataType("string") + .setFieldPath(fieldPath == null ? "test" : fieldPath)); + + if (duplicateFields) { + fields.add( + new SchemaField() + .setType( + new SchemaFieldDataType() + .setType(SchemaFieldDataType.Type.create(new StringType()))) + .setNullable(false) + .setNativeDataType("string") + .setFieldPath(fieldPath == null ? "test" : fieldPath)); + } + + return new SchemaMetadata() + .setPlatform(TEST_DATASET_URN.getPlatformEntity()) + .setFields(new SchemaFieldArray(fields)); + } + + private static EditableSchemaMetadata getMockEditableSchemaMetadataAspect( + boolean duplicateFields) { + + List fields = new ArrayList<>(); + fields.add(new EditableSchemaFieldInfo().setFieldPath("test")); + + if (duplicateFields) { + fields.add(new EditableSchemaFieldInfo().setFieldPath("test")); + } + + return new EditableSchemaMetadata() + .setEditableSchemaFieldInfo(new EditableSchemaFieldInfoArray(fields)); + } +} diff --git a/li-utils/src/main/java/com/linkedin/metadata/Constants.java b/li-utils/src/main/java/com/linkedin/metadata/Constants.java index e085a5876a42b0..8961677b568788 100644 --- a/li-utils/src/main/java/com/linkedin/metadata/Constants.java +++ b/li-utils/src/main/java/com/linkedin/metadata/Constants.java @@ -319,6 +319,13 @@ public class Constants { public static final String EXECUTION_REQUEST_INPUT_ASPECT_NAME = "dataHubExecutionRequestInput"; public static final String EXECUTION_REQUEST_SIGNAL_ASPECT_NAME = "dataHubExecutionRequestSignal"; public static final String EXECUTION_REQUEST_RESULT_ASPECT_NAME = "dataHubExecutionRequestResult"; + public static final String EXECUTION_REQUEST_STATUS_RUNNING = "RUNNING"; + public static final String EXECUTION_REQUEST_STATUS_FAILURE = "FAILURE"; + public static final String EXECUTION_REQUEST_STATUS_SUCCESS = "SUCCESS"; + public static final String EXECUTION_REQUEST_STATUS_TIMEOUT = "TIMEOUT"; + public static final String EXECUTION_REQUEST_STATUS_CANCELLED = "CANCELLED"; + public static final String EXECUTION_REQUEST_STATUS_ABORTED = "ABORTED"; + public static final String EXECUTION_REQUEST_STATUS_DUPLICATE = "DUPLICATE"; // DataHub Access Token public static final String ACCESS_TOKEN_KEY_ASPECT_NAME = "dataHubAccessTokenKey"; diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py index 98629ba030695a..2b2f992249f1e8 100644 --- a/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py +++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py @@ -1,6 +1,7 @@ import datetime import logging -import uuid +import random +import string from typing import Any, Dict, List, Optional from pydantic import Field, validator @@ -71,6 +72,15 @@ class FlagsConfig(ConfigModel): ) +def _generate_run_id(source_type: Optional[str] = None) -> str: + current_time = datetime.datetime.now().strftime("%Y_%m_%d-%H_%M_%S") + random_suffix = "".join(random.choices(string.ascii_lowercase + string.digits, k=6)) + + if source_type is None: + source_type = "ingestion" + return f"{source_type}-{current_time}-{random_suffix}" + + class PipelineConfig(ConfigModel): source: SourceConfig sink: Optional[DynamicTypedConfig] = None @@ -91,12 +101,11 @@ def run_id_should_be_semantic( cls, v: Optional[str], values: Dict[str, Any], **kwargs: Any ) -> str: if v == DEFAULT_RUN_ID: + source_type = None if "source" in values and hasattr(values["source"], "type"): source_type = values["source"].type - current_time = datetime.datetime.now().strftime("%Y_%m_%d-%H_%M_%S") - return f"{source_type}-{current_time}" - return str(uuid.uuid1()) # default run_id if we cannot infer a source type + return _generate_run_id(source_type) else: assert v is not None return v diff --git a/metadata-ingestion/src/datahub/testing/compare_metadata_json.py b/metadata-ingestion/src/datahub/testing/compare_metadata_json.py index 61b222f8d2dd50..155773f9898b47 100644 --- a/metadata-ingestion/src/datahub/testing/compare_metadata_json.py +++ b/metadata-ingestion/src/datahub/testing/compare_metadata_json.py @@ -27,6 +27,8 @@ r"root\[\d+\]\['aspect'\]\['json'\]\['lastUpdatedTimestamp'\]", r"root\[\d+\]\['aspect'\]\['json'\]\['created'\]", r"root\[\d+\]\['aspect'\]\['json'\]\['lastModified'\]", + r"root\[\d+\].*?\['systemMetadata'\]\['runId'\]", + r"root\[\d+\].*?\['systemMetadata'\]\['lastRunId'\]", ] @@ -82,6 +84,8 @@ def assert_metadata_files_equal( json_path = f"root[{i}]['aspect']['json'][{j}]['value']" ignore_paths = (*ignore_paths, re.escape(json_path)) + ignore_paths = (*ignore_paths, *default_exclude_paths) + diff = diff_metadata_json(output, golden, ignore_paths, ignore_order=ignore_order) if diff and update_golden: if isinstance(diff, MCPDiff) and diff.is_delta_valid: diff --git a/metadata-io/src/main/java/com/linkedin/metadata/aspect/validation/ExecutionRequestResultValidator.java b/metadata-io/src/main/java/com/linkedin/metadata/aspect/validation/ExecutionRequestResultValidator.java new file mode 100644 index 00000000000000..b77d3b48d5bd58 --- /dev/null +++ b/metadata-io/src/main/java/com/linkedin/metadata/aspect/validation/ExecutionRequestResultValidator.java @@ -0,0 +1,70 @@ +package com.linkedin.metadata.aspect.validation; + +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_ABORTED; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_CANCELLED; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_DUPLICATE; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_SUCCESS; + +import com.linkedin.execution.ExecutionRequestResult; +import com.linkedin.metadata.aspect.RetrieverContext; +import com.linkedin.metadata.aspect.batch.BatchItem; +import com.linkedin.metadata.aspect.batch.ChangeMCP; +import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig; +import com.linkedin.metadata.aspect.plugins.validation.AspectPayloadValidator; +import com.linkedin.metadata.aspect.plugins.validation.AspectValidationException; +import java.util.Collection; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Stream; +import javax.annotation.Nonnull; +import lombok.Getter; +import lombok.Setter; +import lombok.experimental.Accessors; +import lombok.extern.slf4j.Slf4j; + +/** A Validator for StructuredProperties Aspect that is attached to entities like Datasets, etc. */ +@Setter +@Getter +@Slf4j +@Accessors(chain = true) +public class ExecutionRequestResultValidator extends AspectPayloadValidator { + private static final Set IMMUTABLE_STATUS = + Set.of( + EXECUTION_REQUEST_STATUS_ABORTED, + EXECUTION_REQUEST_STATUS_CANCELLED, + EXECUTION_REQUEST_STATUS_SUCCESS, + EXECUTION_REQUEST_STATUS_DUPLICATE); + + @Nonnull private AspectPluginConfig config; + + @Override + protected Stream validateProposedAspects( + @Nonnull Collection mcpItems, + @Nonnull RetrieverContext retrieverContext) { + return Stream.of(); + } + + @Override + protected Stream validatePreCommitAspects( + @Nonnull Collection changeMCPs, @Nonnull RetrieverContext retrieverContext) { + return changeMCPs.stream() + .filter(item -> item.getPreviousRecordTemplate() != null) + .map( + item -> { + ExecutionRequestResult existingResult = + item.getPreviousAspect(ExecutionRequestResult.class); + + if (IMMUTABLE_STATUS.contains(existingResult.getStatus())) { + ExecutionRequestResult currentResult = item.getAspect(ExecutionRequestResult.class); + return AspectValidationException.forItem( + item, + String.format( + "Invalid update to immutable state for aspect dataHubExecutionRequestResult. Execution urn: %s previous status: %s. Denied status update: %s", + item.getUrn(), existingResult.getStatus(), currentResult.getStatus())); + } + + return null; + }) + .filter(Objects::nonNull); + } +} diff --git a/metadata-io/src/test/java/com/linkedin/metadata/aspect/validation/ExecutionRequestResultValidatorTest.java b/metadata-io/src/test/java/com/linkedin/metadata/aspect/validation/ExecutionRequestResultValidatorTest.java new file mode 100644 index 00000000000000..f46772ca7b350d --- /dev/null +++ b/metadata-io/src/test/java/com/linkedin/metadata/aspect/validation/ExecutionRequestResultValidatorTest.java @@ -0,0 +1,166 @@ +package com.linkedin.metadata.aspect.validation; + +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_ENTITY_NAME; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_RESULT_ASPECT_NAME; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_ABORTED; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_CANCELLED; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_DUPLICATE; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_FAILURE; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_RUNNING; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_SUCCESS; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_TIMEOUT; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.events.metadata.ChangeType; +import com.linkedin.execution.ExecutionRequestResult; +import com.linkedin.metadata.aspect.RetrieverContext; +import com.linkedin.metadata.aspect.SystemAspect; +import com.linkedin.metadata.aspect.batch.ChangeMCP; +import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig; +import com.linkedin.metadata.aspect.plugins.validation.AspectValidationException; +import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl; +import com.linkedin.metadata.utils.AuditStampUtils; +import io.datahubproject.metadata.context.OperationContext; +import io.datahubproject.test.metadata.context.TestOperationContexts; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.testng.annotations.Test; + +public class ExecutionRequestResultValidatorTest { + private static final OperationContext TEST_CONTEXT = + TestOperationContexts.systemContextNoSearchAuthorization(); + private static final AspectPluginConfig TEST_PLUGIN_CONFIG = + AspectPluginConfig.builder() + .className(ExecutionRequestResultValidator.class.getName()) + .enabled(true) + .supportedOperations(List.of("UPSERT")) + .supportedEntityAspectNames( + List.of( + AspectPluginConfig.EntityAspectName.builder() + .entityName(EXECUTION_REQUEST_ENTITY_NAME) + .aspectName(EXECUTION_REQUEST_RESULT_ASPECT_NAME) + .build())) + .build(); + private static final Urn TEST_URN = UrnUtils.getUrn("urn:li:dataHubExecutionRequest:xyz"); + + @Test + public void testAllowed() { + ExecutionRequestResultValidator test = new ExecutionRequestResultValidator(); + test.setConfig(TEST_PLUGIN_CONFIG); + + Set allowedUpdateStates = + Set.of( + EXECUTION_REQUEST_STATUS_RUNNING, + EXECUTION_REQUEST_STATUS_FAILURE, + EXECUTION_REQUEST_STATUS_TIMEOUT); + Set destinationStates = new HashSet<>(allowedUpdateStates); + destinationStates.addAll( + Set.of( + EXECUTION_REQUEST_STATUS_ABORTED, + EXECUTION_REQUEST_STATUS_CANCELLED, + EXECUTION_REQUEST_STATUS_SUCCESS, + EXECUTION_REQUEST_STATUS_DUPLICATE)); + + List testItems = + new ArrayList<>( + // Tests with previous state + allowedUpdateStates.stream() + .flatMap( + prevState -> + destinationStates.stream() + .map( + destState -> { + SystemAspect prevData = mock(SystemAspect.class); + when(prevData.getRecordTemplate()) + .thenReturn( + new ExecutionRequestResult().setStatus(prevState)); + return ChangeItemImpl.builder() + .changeType(ChangeType.UPSERT) + .urn(TEST_URN) + .aspectName(EXECUTION_REQUEST_RESULT_ASPECT_NAME) + .recordTemplate( + new ExecutionRequestResult().setStatus(destState)) + .previousSystemAspect(prevData) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build(TEST_CONTEXT.getAspectRetriever()); + })) + .toList()); + // Tests with no previous + testItems.addAll( + destinationStates.stream() + .map( + destState -> + ChangeItemImpl.builder() + .changeType(ChangeType.UPSERT) + .urn(TEST_URN) + .aspectName(EXECUTION_REQUEST_RESULT_ASPECT_NAME) + .recordTemplate(new ExecutionRequestResult().setStatus(destState)) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build(TEST_CONTEXT.getAspectRetriever())) + .toList()); + + List result = + test.validatePreCommitAspects(testItems, mock(RetrieverContext.class)).toList(); + + assertTrue(result.isEmpty(), "Did not expect any validation errors."); + } + + @Test + public void testDenied() { + ExecutionRequestResultValidator test = new ExecutionRequestResultValidator(); + test.setConfig(TEST_PLUGIN_CONFIG); + + Set deniedUpdateStates = + Set.of( + EXECUTION_REQUEST_STATUS_ABORTED, + EXECUTION_REQUEST_STATUS_CANCELLED, + EXECUTION_REQUEST_STATUS_SUCCESS, + EXECUTION_REQUEST_STATUS_DUPLICATE); + Set destinationStates = new HashSet<>(deniedUpdateStates); + destinationStates.addAll( + Set.of( + EXECUTION_REQUEST_STATUS_RUNNING, + EXECUTION_REQUEST_STATUS_FAILURE, + EXECUTION_REQUEST_STATUS_TIMEOUT)); + + List testItems = + new ArrayList<>( + // Tests with previous state + deniedUpdateStates.stream() + .flatMap( + prevState -> + destinationStates.stream() + .map( + destState -> { + SystemAspect prevData = mock(SystemAspect.class); + when(prevData.getRecordTemplate()) + .thenReturn( + new ExecutionRequestResult().setStatus(prevState)); + return ChangeItemImpl.builder() + .changeType(ChangeType.UPSERT) + .urn(TEST_URN) + .aspectName(EXECUTION_REQUEST_RESULT_ASPECT_NAME) + .recordTemplate( + new ExecutionRequestResult().setStatus(destState)) + .previousSystemAspect(prevData) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build(TEST_CONTEXT.getAspectRetriever()); + })) + .toList()); + + List result = + test.validatePreCommitAspects(testItems, mock(RetrieverContext.class)).toList(); + + assertEquals( + result.size(), + deniedUpdateStates.size() * destinationStates.size(), + "Expected ALL items to be denied."); + } +} diff --git a/metadata-models/src/main/resources/entity-registry.yml b/metadata-models/src/main/resources/entity-registry.yml index 9b692b51dc2b5f..ec9c3fee1c404c 100644 --- a/metadata-models/src/main/resources/entity-registry.yml +++ b/metadata-models/src/main/resources/entity-registry.yml @@ -673,6 +673,12 @@ plugins: supportedEntityAspectNames: - entityName: '*' aspectName: '*' + - className: 'com.linkedin.metadata.aspect.plugins.validation.AspectPayloadValidator' + enabled: true + spring: + enabled: true + packageScan: + - com.linkedin.gms.factory.plugins mcpSideEffects: - className: 'com.linkedin.metadata.structuredproperties.hooks.PropertyDefinitionDeleteSideEffect' packageScan: diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/plugins/SpringStandardPluginConfiguration.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/plugins/SpringStandardPluginConfiguration.java index 4a2095685abe1f..943b1c7184a60d 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/plugins/SpringStandardPluginConfiguration.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/plugins/SpringStandardPluginConfiguration.java @@ -1,5 +1,8 @@ package com.linkedin.gms.factory.plugins; +import static com.linkedin.metadata.Constants.EDITABLE_SCHEMA_METADATA_ASPECT_NAME; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_ENTITY_NAME; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_RESULT_ASPECT_NAME; import static com.linkedin.metadata.Constants.SCHEMA_METADATA_ASPECT_NAME; import com.linkedin.metadata.Constants; @@ -7,6 +10,9 @@ import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig; import com.linkedin.metadata.aspect.plugins.hooks.MCPSideEffect; import com.linkedin.metadata.aspect.plugins.hooks.MutationHook; +import com.linkedin.metadata.aspect.plugins.validation.AspectPayloadValidator; +import com.linkedin.metadata.aspect.validation.ExecutionRequestResultValidator; +import com.linkedin.metadata.aspect.validation.FieldPathValidator; import com.linkedin.metadata.dataproducts.sideeffects.DataProductUnsetSideEffect; import com.linkedin.metadata.schemafields.sideeffects.SchemaFieldSideEffect; import com.linkedin.metadata.timeline.eventgenerator.EntityChangeEventGeneratorRegistry; @@ -21,6 +27,7 @@ @Configuration @Slf4j public class SpringStandardPluginConfiguration { + private static final String ALL = "*"; @Value("${metadataChangeProposal.validation.ignoreUnknown}") private boolean ignoreUnknownEnabled; @@ -104,4 +111,43 @@ public MCPSideEffect dataProductUnsetSideEffect() { log.info("Initialized {}", SchemaFieldSideEffect.class.getName()); return new DataProductUnsetSideEffect().setConfig(config); } + + @Bean + public AspectPayloadValidator fieldPathValidator() { + return new FieldPathValidator() + .setConfig( + AspectPluginConfig.builder() + .className(FieldPathValidator.class.getName()) + .enabled(true) + .supportedOperations( + List.of("CREATE", "CREATE_ENTITY", "UPSERT", "UPDATE", "RESTATE")) + .supportedEntityAspectNames( + List.of( + AspectPluginConfig.EntityAspectName.builder() + .entityName(ALL) + .aspectName(SCHEMA_METADATA_ASPECT_NAME) + .build(), + AspectPluginConfig.EntityAspectName.builder() + .entityName(ALL) + .aspectName(EDITABLE_SCHEMA_METADATA_ASPECT_NAME) + .build())) + .build()); + } + + @Bean + public AspectPayloadValidator dataHubExecutionRequestResultValidator() { + return new ExecutionRequestResultValidator() + .setConfig( + AspectPluginConfig.builder() + .className(ExecutionRequestResultValidator.class.getName()) + .enabled(true) + .supportedOperations(List.of("UPSERT", "UPDATE")) + .supportedEntityAspectNames( + List.of( + AspectPluginConfig.EntityAspectName.builder() + .entityName(EXECUTION_REQUEST_ENTITY_NAME) + .aspectName(EXECUTION_REQUEST_RESULT_ASPECT_NAME) + .build())) + .build()); + } }