From 6f0313ed44540f19d9e234026c9dbebc12c40b1e Mon Sep 17 00:00:00 2001 From: David Leifker Date: Mon, 14 Oct 2024 17:38:45 -0500 Subject: [PATCH] feat(validation): Ingest and schema validator --- docs/how/updating-datahub.md | 2 + docs/managed-datahub/release-notes/v_0_3_6.md | 2 + .../aspect/validation/FieldPathValidator.java | 105 +++++------ .../validators/FieldPathValidatorTest.java | 133 +++++++------- .../java/com/linkedin/metadata/Constants.java | 7 + .../datahub/ingestion/run/pipeline_config.py | 2 +- .../ExecutionRequestResultValidator.java | 70 ++++++++ .../ExecutionRequestResultValidatorTest.java | 166 ++++++++++++++++++ .../src/main/resources/entity-registry.yml | 19 +- .../SpringStandardPluginConfiguration.java | 46 +++++ 10 files changed, 415 insertions(+), 137 deletions(-) create mode 100644 metadata-io/src/main/java/com/linkedin/metadata/aspect/validation/ExecutionRequestResultValidator.java create mode 100644 metadata-io/src/test/java/com/linkedin/metadata/aspect/validation/ExecutionRequestResultValidatorTest.java diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index 00e020bd2a3875..dbcc7da8467035 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -24,6 +24,8 @@ This file documents any backwards-incompatible changes in DataHub and assists pe - #11484 - Metadata service authentication enabled by default - #11484 - Rest API authorization enabled by default - #10472 - `SANDBOX` added as a FabricType. No rollbacks allowed once metadata with this fabric type is added without manual cleanups in databases. 
+- #11619 - schema field/column paths can no longer be empty strings +- #11619 - schema field/column paths can no longer be duplicated within the schema ### Potential Downtime diff --git a/docs/managed-datahub/release-notes/v_0_3_6.md b/docs/managed-datahub/release-notes/v_0_3_6.md index cf1c9ff4e8c6a4..f328140ab9ac05 100644 --- a/docs/managed-datahub/release-notes/v_0_3_6.md +++ b/docs/managed-datahub/release-notes/v_0_3_6.md @@ -21,6 +21,8 @@ If you are using an older CLI/SDK version, then please upgrade it. This applies - Automatic downgrade to a previous release is not supported. Please reach out to support if required. - Metadata tests that are created/edited will not automatically run on a small batch (less than 10,000 assets) immediately anymore, but instead will rely on the scheduled run. Scheduled runs and metadata tests running on asset changes are unimpacted. This bug is fixed in 0.3.6.1. - Rest API Authorization enabled by default + - #11619 - schema field/column paths can no longer be empty strings + - #11619 - schema field/column paths can no longer be duplicated within the schema - Bug Fixes - No longer push notifications for soft-deleted entities diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/validation/FieldPathValidator.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/validation/FieldPathValidator.java index 47603504dd8a06..7c279254e1bc33 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/validation/FieldPathValidator.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/validation/FieldPathValidator.java @@ -2,19 +2,19 @@ import static com.linkedin.metadata.Constants.*; -import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.aspect.RetrieverContext; import com.linkedin.metadata.aspect.batch.BatchItem; import com.linkedin.metadata.aspect.batch.ChangeMCP; import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig; import 
com.linkedin.metadata.aspect.plugins.validation.AspectPayloadValidator; import com.linkedin.metadata.aspect.plugins.validation.AspectValidationException; +import com.linkedin.metadata.aspect.plugins.validation.ValidationExceptionCollection; import com.linkedin.schema.EditableSchemaFieldInfo; import com.linkedin.schema.EditableSchemaMetadata; import com.linkedin.schema.SchemaField; import com.linkedin.schema.SchemaMetadata; import java.util.Collection; -import java.util.Objects; +import java.util.Optional; import java.util.stream.Stream; import javax.annotation.Nonnull; import lombok.Getter; @@ -22,8 +22,8 @@ import lombok.experimental.Accessors; /** - * Validates the Schema Field Path specification, specifically that all field IDs must be unique - * across all fields within a schema. + * 1. Validates the Schema Field Path specification, specifically that all field IDs must be unique + * across all fields within a schema. 2. Validates that the field path id is not empty. * * @see Field * Path V2 docs @@ -34,82 +34,83 @@ public class FieldPathValidator extends AspectPayloadValidator { @Nonnull private AspectPluginConfig config; - /** - * Prevent any MCP for SchemaMetadata where field ids are duplicated (except for MCPs with {@link - * ChangeType#DELETE} and {@link ChangeType#PATCH}, the latter gets handled pre-commit to the DB). - */ + /** Prevent any MCP for SchemaMetadata where field ids are duplicated. 
*/ @Override protected Stream validateProposedAspects( @Nonnull Collection mcpItems, @Nonnull RetrieverContext retrieverContext) { - return mcpItems.stream() - .filter( - i -> - !ChangeType.DELETE.equals(i.getChangeType()) - && !ChangeType.PATCH.equals(i.getChangeType())) - .filter( - i -> - i.getAspectName().equals(SCHEMA_METADATA_ASPECT_NAME) - || i.getAspectName().equals(EDITABLE_SCHEMA_METADATA_ASPECT_NAME)) - .map( - i -> { - if (i.getAspectName().equals(SCHEMA_METADATA_ASPECT_NAME)) { - return processSchemaMetadataAspect(i); - } else { - return processEditableSchemaMetadataAspect(i); - } - }) - .filter(Objects::nonNull); + + ValidationExceptionCollection exceptions = ValidationExceptionCollection.newCollection(); + + mcpItems.forEach( + i -> { + if (i.getAspectName().equals(SCHEMA_METADATA_ASPECT_NAME)) { + processSchemaMetadataAspect(i, exceptions); + } else { + processEditableSchemaMetadataAspect(i, exceptions); + } + }); + + return exceptions.streamAllExceptions(); } @Override protected Stream validatePreCommitAspects( @Nonnull Collection changeMCPs, @Nonnull RetrieverContext retrieverContext) { - return changeMCPs.stream() - .filter(i -> ChangeType.PATCH.equals(i.getChangeType())) - .filter( - i -> - i.getAspectName().equals(SCHEMA_METADATA_ASPECT_NAME) - || i.getAspectName().equals(EDITABLE_SCHEMA_METADATA_ASPECT_NAME)) - .map( - i -> { - if (i.getAspectName().equals(SCHEMA_METADATA_ASPECT_NAME)) { - return processSchemaMetadataAspect(i); - } else { - return processEditableSchemaMetadataAspect(i); - } - }) - .filter(Objects::nonNull); + return Stream.of(); } - private static AspectValidationException processEditableSchemaMetadataAspect(BatchItem i) { + private static void processEditableSchemaMetadataAspect( + BatchItem i, ValidationExceptionCollection exceptions) { final EditableSchemaMetadata schemaMetadata = i.getAspect(EditableSchemaMetadata.class); final long uniquePaths = - schemaMetadata.getEditableSchemaFieldInfo().stream() - 
.map(EditableSchemaFieldInfo::getFieldPath) - .distinct() - .count(); + validateAndCount( + i, + schemaMetadata.getEditableSchemaFieldInfo().stream() + .map(EditableSchemaFieldInfo::getFieldPath), + exceptions); + if (uniquePaths != schemaMetadata.getEditableSchemaFieldInfo().size()) { - return AspectValidationException.forItem( + exceptions.addException( i, String.format( "Cannot perform %s action on proposal. EditableSchemaMetadata aspect has duplicated field paths", i.getChangeType())); } - return null; } - private static AspectValidationException processSchemaMetadataAspect(BatchItem i) { + private static void processSchemaMetadataAspect( + BatchItem i, ValidationExceptionCollection exceptions) { final SchemaMetadata schemaMetadata = i.getAspect(SchemaMetadata.class); final long uniquePaths = - schemaMetadata.getFields().stream().map(SchemaField::getFieldPath).distinct().count(); + validateAndCount( + i, schemaMetadata.getFields().stream().map(SchemaField::getFieldPath), exceptions); + if (uniquePaths != schemaMetadata.getFields().size()) { - return AspectValidationException.forItem( + exceptions.addException( i, String.format( "Cannot perform %s action on proposal. 
SchemaMetadata aspect has duplicated field paths", i.getChangeType())); } - return null; + } + + private static long validateAndCount( + BatchItem i, Stream fieldPaths, ValidationExceptionCollection exceptions) { + return fieldPaths + .distinct() + // validate each distinct fieldPath while counting (relies on count() traversing the stream) + .peek( + fieldPath -> + validateFieldPath(fieldPath) + .ifPresent(message -> exceptions.addException(i, message))) + .count(); + } + + private static Optional validateFieldPath(String fieldPath) { + if (fieldPath == null || fieldPath.isEmpty()) { + return Optional.of("SchemaMetadata aspect has empty field path."); + } + return Optional.empty(); } } diff --git a/entity-registry/src/test/java/com/linkedin/metadata/aspect/validators/FieldPathValidatorTest.java b/entity-registry/src/test/java/com/linkedin/metadata/aspect/validators/FieldPathValidatorTest.java index 1b2b40b2daddc8..bd5912764edce3 100644 --- a/entity-registry/src/test/java/com/linkedin/metadata/aspect/validators/FieldPathValidatorTest.java +++ b/entity-registry/src/test/java/com/linkedin/metadata/aspect/validators/FieldPathValidatorTest.java @@ -4,16 +4,14 @@ import static org.mockito.Mockito.*; import static org.testng.Assert.*; -import com.google.common.collect.ImmutableList; -import com.linkedin.common.UrnArray; import com.linkedin.common.urn.DatasetUrn; import com.linkedin.common.urn.UrnUtils; -import com.linkedin.domain.Domains; import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.aspect.AspectRetriever; import com.linkedin.metadata.aspect.GraphRetriever; import com.linkedin.metadata.aspect.RetrieverContext; import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig; +import com.linkedin.metadata.aspect.plugins.validation.AspectValidationException; import com.linkedin.metadata.aspect.validation.CreateIfNotExistsValidator; import com.linkedin.metadata.aspect.validation.FieldPathValidator; import 
com.linkedin.metadata.models.registry.EntityRegistry; @@ -34,6 +32,7 @@ import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; +import javax.annotation.Nullable; import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; @@ -51,15 +50,21 @@ public class FieldPathValidatorTest { .build(); private EntityRegistry entityRegistry; private RetrieverContext mockRetrieverContext; - private DatasetUrn testDatasetUrn; + private static final DatasetUrn TEST_DATASET_URN; private final FieldPathValidator test = new FieldPathValidator().setConfig(validatorConfig); - @BeforeTest - public void init() throws URISyntaxException { - testDatasetUrn = - DatasetUrn.createFromUrn( - UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,test,PROD)")); + static { + try { + TEST_DATASET_URN = + DatasetUrn.createFromUrn( + UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,test,PROD)")); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + @BeforeTest + public void init() { entityRegistry = new TestEntityRegistry(); AspectRetriever mockAspectRetriever = mock(AspectRetriever.class); when(mockAspectRetriever.getEntityRegistry()).thenReturn(entityRegistry); @@ -69,29 +74,6 @@ public void init() throws URISyntaxException { when(mockRetrieverContext.getGraphRetriever()).thenReturn(mockGraphRetriever); } - @Test - public void testValidateIncorrectAspect() { - final Domains domains = - new Domains() - .setDomains(new UrnArray(ImmutableList.of(UrnUtils.getUrn("urn:li:domain:123")))); - assertEquals( - test.validateProposed( - Set.of( - TestMCP.builder() - .changeType(ChangeType.UPSERT) - .urn(testDatasetUrn) - .entitySpec(entityRegistry.getEntitySpec(testDatasetUrn.getEntityType())) - .aspectSpec( - entityRegistry - .getEntitySpec(testDatasetUrn.getEntityType()) - .getAspectSpec(DOMAINS_ASPECT_NAME)) - .recordTemplate(domains) - .build()), - mockRetrieverContext) - .count(), - 0); - } - @Test public void 
testValidateNonDuplicatedSchemaFieldPath() { final SchemaMetadata schema = getMockSchemaMetadataAspect(false); @@ -100,11 +82,11 @@ public void testValidateNonDuplicatedSchemaFieldPath() { Set.of( TestMCP.builder() .changeType(ChangeType.UPSERT) - .urn(testDatasetUrn) - .entitySpec(entityRegistry.getEntitySpec(testDatasetUrn.getEntityType())) + .urn(TEST_DATASET_URN) + .entitySpec(entityRegistry.getEntitySpec(TEST_DATASET_URN.getEntityType())) .aspectSpec( entityRegistry - .getEntitySpec(testDatasetUrn.getEntityType()) + .getEntitySpec(TEST_DATASET_URN.getEntityType()) .getAspectSpec(SCHEMA_METADATA_ASPECT_NAME)) .recordTemplate(schema) .build()), @@ -122,11 +104,11 @@ public void testValidateDuplicatedSchemaFieldPath() { Set.of( TestMCP.builder() .changeType(ChangeType.UPSERT) - .urn(testDatasetUrn) - .entitySpec(entityRegistry.getEntitySpec(testDatasetUrn.getEntityType())) + .urn(TEST_DATASET_URN) + .entitySpec(entityRegistry.getEntitySpec(TEST_DATASET_URN.getEntityType())) .aspectSpec( entityRegistry - .getEntitySpec(testDatasetUrn.getEntityType()) + .getEntitySpec(TEST_DATASET_URN.getEntityType()) .getAspectSpec(SCHEMA_METADATA_ASPECT_NAME)) .recordTemplate(schema) .build()), @@ -135,28 +117,6 @@ public void testValidateDuplicatedSchemaFieldPath() { 1); } - @Test - public void testValidateDeleteDuplicatedSchemaFieldPath() { - final SchemaMetadata schema = getMockSchemaMetadataAspect(true); - - assertEquals( - test.validateProposed( - Set.of( - TestMCP.builder() - .changeType(ChangeType.DELETE) - .urn(testDatasetUrn) - .entitySpec(entityRegistry.getEntitySpec(testDatasetUrn.getEntityType())) - .aspectSpec( - entityRegistry - .getEntitySpec(testDatasetUrn.getEntityType()) - .getAspectSpec(SCHEMA_METADATA_ASPECT_NAME)) - .recordTemplate(schema) - .build()), - mockRetrieverContext) - .count(), - 0); - } - @Test public void testValidateNonDuplicatedEditableSchemaFieldPath() { final EditableSchemaMetadata schema = getMockEditableSchemaMetadataAspect(false); @@ 
-165,11 +125,11 @@ public void testValidateNonDuplicatedEditableSchemaFieldPath() { Set.of( TestMCP.builder() .changeType(ChangeType.UPSERT) - .urn(testDatasetUrn) - .entitySpec(entityRegistry.getEntitySpec(testDatasetUrn.getEntityType())) + .urn(TEST_DATASET_URN) + .entitySpec(entityRegistry.getEntitySpec(TEST_DATASET_URN.getEntityType())) .aspectSpec( entityRegistry - .getEntitySpec(testDatasetUrn.getEntityType()) + .getEntitySpec(TEST_DATASET_URN.getEntityType()) .getAspectSpec(EDITABLE_SCHEMA_METADATA_ASPECT_NAME)) .recordTemplate(schema) .build()), @@ -187,11 +147,11 @@ public void testValidateDuplicatedEditableSchemaFieldPath() { Set.of( TestMCP.builder() .changeType(ChangeType.UPSERT) - .urn(testDatasetUrn) - .entitySpec(entityRegistry.getEntitySpec(testDatasetUrn.getEntityType())) + .urn(TEST_DATASET_URN) + .entitySpec(entityRegistry.getEntitySpec(TEST_DATASET_URN.getEntityType())) .aspectSpec( entityRegistry - .getEntitySpec(testDatasetUrn.getEntityType()) + .getEntitySpec(TEST_DATASET_URN.getEntityType()) .getAspectSpec(EDITABLE_SCHEMA_METADATA_ASPECT_NAME)) .recordTemplate(schema) .build()), @@ -200,7 +160,37 @@ public void testValidateDuplicatedEditableSchemaFieldPath() { 1); } - private SchemaMetadata getMockSchemaMetadataAspect(boolean duplicateFields) { + @Test + public void testEmptySchemaFieldPath() { + final SchemaMetadata schema = getMockSchemaMetadataAspect(false, ""); + TestMCP testItem = + TestMCP.builder() + .changeType(ChangeType.UPSERT) + .urn(TEST_DATASET_URN) + .entitySpec(entityRegistry.getEntitySpec(TEST_DATASET_URN.getEntityType())) + .aspectSpec( + entityRegistry + .getEntitySpec(TEST_DATASET_URN.getEntityType()) + .getAspectSpec(SCHEMA_METADATA_ASPECT_NAME)) + .recordTemplate(schema) + .build(); + + Set exceptions = + test.validateProposed(Set.of(testItem), mockRetrieverContext).collect(Collectors.toSet()); + + assertEquals( + exceptions, + Set.of( + AspectValidationException.forItem( + testItem, "SchemaMetadata aspect has empty 
field path."))); + } + + private static SchemaMetadata getMockSchemaMetadataAspect(boolean duplicateFields) { + return getMockSchemaMetadataAspect(duplicateFields, null); + } + + private static SchemaMetadata getMockSchemaMetadataAspect( + boolean duplicateFields, @Nullable String fieldPath) { List fields = new ArrayList<>(); fields.add( new SchemaField() @@ -209,7 +199,7 @@ private SchemaMetadata getMockSchemaMetadataAspect(boolean duplicateFields) { .setType(SchemaFieldDataType.Type.create(new StringType()))) .setNullable(false) .setNativeDataType("string") - .setFieldPath("test")); + .setFieldPath(fieldPath == null ? "test" : fieldPath)); if (duplicateFields) { fields.add( @@ -219,15 +209,16 @@ private SchemaMetadata getMockSchemaMetadataAspect(boolean duplicateFields) { .setType(SchemaFieldDataType.Type.create(new StringType()))) .setNullable(false) .setNativeDataType("string") - .setFieldPath("test")); + .setFieldPath(fieldPath == null ? "test" : fieldPath)); } return new SchemaMetadata() - .setPlatform(testDatasetUrn.getPlatformEntity()) + .setPlatform(TEST_DATASET_URN.getPlatformEntity()) .setFields(new SchemaFieldArray(fields)); } - private EditableSchemaMetadata getMockEditableSchemaMetadataAspect(boolean duplicateFields) { + private static EditableSchemaMetadata getMockEditableSchemaMetadataAspect( + boolean duplicateFields) { List fields = new ArrayList<>(); fields.add(new EditableSchemaFieldInfo().setFieldPath("test")); diff --git a/li-utils/src/main/java/com/linkedin/metadata/Constants.java b/li-utils/src/main/java/com/linkedin/metadata/Constants.java index e085a5876a42b0..8961677b568788 100644 --- a/li-utils/src/main/java/com/linkedin/metadata/Constants.java +++ b/li-utils/src/main/java/com/linkedin/metadata/Constants.java @@ -319,6 +319,13 @@ public class Constants { public static final String EXECUTION_REQUEST_INPUT_ASPECT_NAME = "dataHubExecutionRequestInput"; public static final String EXECUTION_REQUEST_SIGNAL_ASPECT_NAME = 
"dataHubExecutionRequestSignal"; public static final String EXECUTION_REQUEST_RESULT_ASPECT_NAME = "dataHubExecutionRequestResult"; + public static final String EXECUTION_REQUEST_STATUS_RUNNING = "RUNNING"; + public static final String EXECUTION_REQUEST_STATUS_FAILURE = "FAILURE"; + public static final String EXECUTION_REQUEST_STATUS_SUCCESS = "SUCCESS"; + public static final String EXECUTION_REQUEST_STATUS_TIMEOUT = "TIMEOUT"; + public static final String EXECUTION_REQUEST_STATUS_CANCELLED = "CANCELLED"; + public static final String EXECUTION_REQUEST_STATUS_ABORTED = "ABORTED"; + public static final String EXECUTION_REQUEST_STATUS_DUPLICATE = "DUPLICATE"; // DataHub Access Token public static final String ACCESS_TOKEN_KEY_ASPECT_NAME = "dataHubAccessTokenKey"; diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py index 98629ba030695a..696f26e8ad806b 100644 --- a/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py +++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py @@ -94,7 +94,7 @@ def run_id_should_be_semantic( if "source" in values and hasattr(values["source"], "type"): source_type = values["source"].type current_time = datetime.datetime.now().strftime("%Y_%m_%d-%H_%M_%S") - return f"{source_type}-{current_time}" + return f"{source_type}-{current_time}-{uuid.uuid4().hex[:7]}" # uuid to allow multiple started at the same second return str(uuid.uuid1()) # default run_id if we cannot infer a source type else: diff --git a/metadata-io/src/main/java/com/linkedin/metadata/aspect/validation/ExecutionRequestResultValidator.java b/metadata-io/src/main/java/com/linkedin/metadata/aspect/validation/ExecutionRequestResultValidator.java new file mode 100644 index 00000000000000..b77d3b48d5bd58 --- /dev/null +++ b/metadata-io/src/main/java/com/linkedin/metadata/aspect/validation/ExecutionRequestResultValidator.java @@ -0,0 +1,70 @@ +package 
com.linkedin.metadata.aspect.validation; + +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_ABORTED; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_CANCELLED; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_DUPLICATE; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_SUCCESS; + +import com.linkedin.execution.ExecutionRequestResult; +import com.linkedin.metadata.aspect.RetrieverContext; +import com.linkedin.metadata.aspect.batch.BatchItem; +import com.linkedin.metadata.aspect.batch.ChangeMCP; +import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig; +import com.linkedin.metadata.aspect.plugins.validation.AspectPayloadValidator; +import com.linkedin.metadata.aspect.plugins.validation.AspectValidationException; +import java.util.Collection; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Stream; +import javax.annotation.Nonnull; +import lombok.Getter; +import lombok.Setter; +import lombok.experimental.Accessors; +import lombok.extern.slf4j.Slf4j; + +/** Validator that rejects updates to an ExecutionRequestResult already in an immutable status. 
*/ +@Setter +@Getter +@Slf4j +@Accessors(chain = true) +public class ExecutionRequestResultValidator extends AspectPayloadValidator { + private static final Set IMMUTABLE_STATUS = + Set.of( + EXECUTION_REQUEST_STATUS_ABORTED, + EXECUTION_REQUEST_STATUS_CANCELLED, + EXECUTION_REQUEST_STATUS_SUCCESS, + EXECUTION_REQUEST_STATUS_DUPLICATE); + + @Nonnull private AspectPluginConfig config; + + @Override + protected Stream validateProposedAspects( + @Nonnull Collection mcpItems, + @Nonnull RetrieverContext retrieverContext) { + return Stream.of(); + } + + @Override + protected Stream validatePreCommitAspects( + @Nonnull Collection changeMCPs, @Nonnull RetrieverContext retrieverContext) { + return changeMCPs.stream() + .filter(item -> item.getPreviousRecordTemplate() != null) + .map( + item -> { + ExecutionRequestResult existingResult = + item.getPreviousAspect(ExecutionRequestResult.class); + + if (IMMUTABLE_STATUS.contains(existingResult.getStatus())) { + ExecutionRequestResult currentResult = item.getAspect(ExecutionRequestResult.class); + return AspectValidationException.forItem( + item, + String.format( + "Invalid update to immutable state for aspect dataHubExecutionRequestResult. Execution urn: %s previous status: %s. 
Denied status update: %s", + item.getUrn(), existingResult.getStatus(), currentResult.getStatus())); + } + + return null; + }) + .filter(Objects::nonNull); + } +} diff --git a/metadata-io/src/test/java/com/linkedin/metadata/aspect/validation/ExecutionRequestResultValidatorTest.java b/metadata-io/src/test/java/com/linkedin/metadata/aspect/validation/ExecutionRequestResultValidatorTest.java new file mode 100644 index 00000000000000..f46772ca7b350d --- /dev/null +++ b/metadata-io/src/test/java/com/linkedin/metadata/aspect/validation/ExecutionRequestResultValidatorTest.java @@ -0,0 +1,166 @@ +package com.linkedin.metadata.aspect.validation; + +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_ENTITY_NAME; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_RESULT_ASPECT_NAME; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_ABORTED; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_CANCELLED; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_DUPLICATE; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_FAILURE; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_RUNNING; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_SUCCESS; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_TIMEOUT; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.events.metadata.ChangeType; +import com.linkedin.execution.ExecutionRequestResult; +import com.linkedin.metadata.aspect.RetrieverContext; +import com.linkedin.metadata.aspect.SystemAspect; +import com.linkedin.metadata.aspect.batch.ChangeMCP; +import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig; +import 
com.linkedin.metadata.aspect.plugins.validation.AspectValidationException; +import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl; +import com.linkedin.metadata.utils.AuditStampUtils; +import io.datahubproject.metadata.context.OperationContext; +import io.datahubproject.test.metadata.context.TestOperationContexts; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.testng.annotations.Test; + +public class ExecutionRequestResultValidatorTest { + private static final OperationContext TEST_CONTEXT = + TestOperationContexts.systemContextNoSearchAuthorization(); + private static final AspectPluginConfig TEST_PLUGIN_CONFIG = + AspectPluginConfig.builder() + .className(ExecutionRequestResultValidator.class.getName()) + .enabled(true) + .supportedOperations(List.of("UPSERT")) + .supportedEntityAspectNames( + List.of( + AspectPluginConfig.EntityAspectName.builder() + .entityName(EXECUTION_REQUEST_ENTITY_NAME) + .aspectName(EXECUTION_REQUEST_RESULT_ASPECT_NAME) + .build())) + .build(); + private static final Urn TEST_URN = UrnUtils.getUrn("urn:li:dataHubExecutionRequest:xyz"); + + @Test + public void testAllowed() { + ExecutionRequestResultValidator test = new ExecutionRequestResultValidator(); + test.setConfig(TEST_PLUGIN_CONFIG); + + Set allowedUpdateStates = + Set.of( + EXECUTION_REQUEST_STATUS_RUNNING, + EXECUTION_REQUEST_STATUS_FAILURE, + EXECUTION_REQUEST_STATUS_TIMEOUT); + Set destinationStates = new HashSet<>(allowedUpdateStates); + destinationStates.addAll( + Set.of( + EXECUTION_REQUEST_STATUS_ABORTED, + EXECUTION_REQUEST_STATUS_CANCELLED, + EXECUTION_REQUEST_STATUS_SUCCESS, + EXECUTION_REQUEST_STATUS_DUPLICATE)); + + List testItems = + new ArrayList<>( + // Tests with previous state + allowedUpdateStates.stream() + .flatMap( + prevState -> + destinationStates.stream() + .map( + destState -> { + SystemAspect prevData = mock(SystemAspect.class); + when(prevData.getRecordTemplate()) + .thenReturn( 
+ new ExecutionRequestResult().setStatus(prevState)); + return ChangeItemImpl.builder() + .changeType(ChangeType.UPSERT) + .urn(TEST_URN) + .aspectName(EXECUTION_REQUEST_RESULT_ASPECT_NAME) + .recordTemplate( + new ExecutionRequestResult().setStatus(destState)) + .previousSystemAspect(prevData) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build(TEST_CONTEXT.getAspectRetriever()); + })) + .toList()); + // Tests with no previous + testItems.addAll( + destinationStates.stream() + .map( + destState -> + ChangeItemImpl.builder() + .changeType(ChangeType.UPSERT) + .urn(TEST_URN) + .aspectName(EXECUTION_REQUEST_RESULT_ASPECT_NAME) + .recordTemplate(new ExecutionRequestResult().setStatus(destState)) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build(TEST_CONTEXT.getAspectRetriever())) + .toList()); + + List result = + test.validatePreCommitAspects(testItems, mock(RetrieverContext.class)).toList(); + + assertTrue(result.isEmpty(), "Did not expect any validation errors."); + } + + @Test + public void testDenied() { + ExecutionRequestResultValidator test = new ExecutionRequestResultValidator(); + test.setConfig(TEST_PLUGIN_CONFIG); + + Set deniedUpdateStates = + Set.of( + EXECUTION_REQUEST_STATUS_ABORTED, + EXECUTION_REQUEST_STATUS_CANCELLED, + EXECUTION_REQUEST_STATUS_SUCCESS, + EXECUTION_REQUEST_STATUS_DUPLICATE); + Set destinationStates = new HashSet<>(deniedUpdateStates); + destinationStates.addAll( + Set.of( + EXECUTION_REQUEST_STATUS_RUNNING, + EXECUTION_REQUEST_STATUS_FAILURE, + EXECUTION_REQUEST_STATUS_TIMEOUT)); + + List testItems = + new ArrayList<>( + // Tests with previous state + deniedUpdateStates.stream() + .flatMap( + prevState -> + destinationStates.stream() + .map( + destState -> { + SystemAspect prevData = mock(SystemAspect.class); + when(prevData.getRecordTemplate()) + .thenReturn( + new ExecutionRequestResult().setStatus(prevState)); + return ChangeItemImpl.builder() + .changeType(ChangeType.UPSERT) + .urn(TEST_URN) + 
.aspectName(EXECUTION_REQUEST_RESULT_ASPECT_NAME) + .recordTemplate( + new ExecutionRequestResult().setStatus(destState)) + .previousSystemAspect(prevData) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build(TEST_CONTEXT.getAspectRetriever()); + })) + .toList()); + + List result = + test.validatePreCommitAspects(testItems, mock(RetrieverContext.class)).toList(); + + assertEquals( + result.size(), + deniedUpdateStates.size() * destinationStates.size(), + "Expected ALL items to be denied."); + } +} diff --git a/metadata-models/src/main/resources/entity-registry.yml b/metadata-models/src/main/resources/entity-registry.yml index eee8ef9ebaf07f..ec9c3fee1c404c 100644 --- a/metadata-models/src/main/resources/entity-registry.yml +++ b/metadata-models/src/main/resources/entity-registry.yml @@ -661,19 +661,6 @@ plugins: supportedEntityAspectNames: - entityName: '*' aspectName: '*' - - className: 'com.linkedin.metadata.aspect.validation.FieldPathValidator' - enabled: true - supportedOperations: - - CREATE - - CREATE_ENTITY - - UPSERT - - UPDATE - - RESTATE - supportedEntityAspectNames: - - entityName: '*' - aspectName: 'schemaMetadata' - - entityName: '*' - aspectName: 'editableSchemaMetadata' - className: 'com.linkedin.metadata.aspect.validation.ConditionalWriteValidator' enabled: true supportedOperations: @@ -686,6 +673,12 @@ plugins: supportedEntityAspectNames: - entityName: '*' aspectName: '*' + - className: 'com.linkedin.metadata.aspect.plugins.validation.AspectPayloadValidator' + enabled: true + spring: + enabled: true + packageScan: + - com.linkedin.gms.factory.plugins mcpSideEffects: - className: 'com.linkedin.metadata.structuredproperties.hooks.PropertyDefinitionDeleteSideEffect' packageScan: diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/plugins/SpringStandardPluginConfiguration.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/plugins/SpringStandardPluginConfiguration.java index 
4a2095685abe1f..943b1c7184a60d 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/plugins/SpringStandardPluginConfiguration.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/plugins/SpringStandardPluginConfiguration.java @@ -1,5 +1,8 @@ package com.linkedin.gms.factory.plugins; +import static com.linkedin.metadata.Constants.EDITABLE_SCHEMA_METADATA_ASPECT_NAME; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_ENTITY_NAME; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_RESULT_ASPECT_NAME; import static com.linkedin.metadata.Constants.SCHEMA_METADATA_ASPECT_NAME; import com.linkedin.metadata.Constants; @@ -7,6 +10,9 @@ import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig; import com.linkedin.metadata.aspect.plugins.hooks.MCPSideEffect; import com.linkedin.metadata.aspect.plugins.hooks.MutationHook; +import com.linkedin.metadata.aspect.plugins.validation.AspectPayloadValidator; +import com.linkedin.metadata.aspect.validation.ExecutionRequestResultValidator; +import com.linkedin.metadata.aspect.validation.FieldPathValidator; import com.linkedin.metadata.dataproducts.sideeffects.DataProductUnsetSideEffect; import com.linkedin.metadata.schemafields.sideeffects.SchemaFieldSideEffect; import com.linkedin.metadata.timeline.eventgenerator.EntityChangeEventGeneratorRegistry; @@ -21,6 +27,7 @@ @Configuration @Slf4j public class SpringStandardPluginConfiguration { + private static final String ALL = "*"; @Value("${metadataChangeProposal.validation.ignoreUnknown}") private boolean ignoreUnknownEnabled; @@ -104,4 +111,43 @@ public MCPSideEffect dataProductUnsetSideEffect() { log.info("Initialized {}", SchemaFieldSideEffect.class.getName()); return new DataProductUnsetSideEffect().setConfig(config); } + + @Bean + public AspectPayloadValidator fieldPathValidator() { + return new FieldPathValidator() + .setConfig( + AspectPluginConfig.builder() + 
.className(FieldPathValidator.class.getName()) + .enabled(true) + .supportedOperations( + List.of("CREATE", "CREATE_ENTITY", "UPSERT", "UPDATE", "RESTATE")) + .supportedEntityAspectNames( + List.of( + AspectPluginConfig.EntityAspectName.builder() + .entityName(ALL) + .aspectName(SCHEMA_METADATA_ASPECT_NAME) + .build(), + AspectPluginConfig.EntityAspectName.builder() + .entityName(ALL) + .aspectName(EDITABLE_SCHEMA_METADATA_ASPECT_NAME) + .build())) + .build()); + } + + @Bean + public AspectPayloadValidator dataHubExecutionRequestResultValidator() { + return new ExecutionRequestResultValidator() + .setConfig( + AspectPluginConfig.builder() + .className(ExecutionRequestResultValidator.class.getName()) + .enabled(true) + .supportedOperations(List.of("UPSERT", "UPDATE")) + .supportedEntityAspectNames( + List.of( + AspectPluginConfig.EntityAspectName.builder() + .entityName(EXECUTION_REQUEST_ENTITY_NAME) + .aspectName(EXECUTION_REQUEST_RESULT_ASPECT_NAME) + .build())) + .build()); + } }