Skip to content

Commit

Permalink
feat(validations): Ingest and metadata schema validators (#11619)
Browse files Browse the repository at this point in the history
Co-authored-by: Pedro Silva <[email protected]>
  • Loading branch information
david-leifker and pedro93 authored Oct 15, 2024
1 parent fdae71d commit 5d3e464
Show file tree
Hide file tree
Showing 10 changed files with 663 additions and 4 deletions.
2 changes: 2 additions & 0 deletions docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
- #11484 - Metadata service authentication enabled by default
- #11484 - Rest API authorization enabled by default
- #10472 - `SANDBOX` added as a FabricType. No rollbacks allowed once metadata with this fabric type is added without manual cleanups in databases.
- #11619 - schema field/column paths can no longer be empty strings
- #11619 - schema field/column paths can no longer be duplicated within the schema

### Potential Downtime

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package com.linkedin.metadata.aspect.validation;

import static com.linkedin.metadata.Constants.*;

import com.linkedin.metadata.aspect.RetrieverContext;
import com.linkedin.metadata.aspect.batch.BatchItem;
import com.linkedin.metadata.aspect.batch.ChangeMCP;
import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig;
import com.linkedin.metadata.aspect.plugins.validation.AspectPayloadValidator;
import com.linkedin.metadata.aspect.plugins.validation.AspectValidationException;
import com.linkedin.metadata.aspect.plugins.validation.ValidationExceptionCollection;
import com.linkedin.schema.EditableSchemaFieldInfo;
import com.linkedin.schema.EditableSchemaMetadata;
import com.linkedin.schema.SchemaField;
import com.linkedin.schema.SchemaMetadata;
import java.util.Collection;
import java.util.Optional;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;

/**
 * Validates the field paths carried by schema aspects in a batch of proposals.
 *
 * <ol>
 *   <li>Every field path within a single aspect must be unique.
 *   <li>No field path may be {@code null} or an empty string.
 * </ol>
 *
 * <p>Items whose aspect name equals {@code SCHEMA_METADATA_ASPECT_NAME} are read as {@link
 * SchemaMetadata}; every other item is read as {@link EditableSchemaMetadata}. NOTE(review): this
 * presumes the plugin configuration only routes those two aspects to this validator - confirm
 * against the plugin registration.
 *
 * @see <a href="https://datahubproject.io/docs/advanced/field-path-spec-v2/#requirements">Field
 *     Path V2 docs</a>
 */
@Setter
@Getter
@Accessors(chain = true)
public class FieldPathValidator extends AspectPayloadValidator {
  @Nonnull private AspectPluginConfig config;

  /**
   * Checks every proposed item for empty and duplicated field paths.
   *
   * @param mcpItems the proposed items to inspect
   * @param retrieverContext unused by this validator
   * @return all validation exceptions collected across the batch
   */
  @Override
  protected Stream<AspectValidationException> validateProposedAspects(
      @Nonnull Collection<? extends BatchItem> mcpItems,
      @Nonnull RetrieverContext retrieverContext) {

    final ValidationExceptionCollection exceptions = ValidationExceptionCollection.newCollection();

    for (BatchItem item : mcpItems) {
      if (item.getAspectName().equals(SCHEMA_METADATA_ASPECT_NAME)) {
        processSchemaMetadataAspect(item, exceptions);
      } else {
        processEditableSchemaMetadataAspect(item, exceptions);
      }
    }

    return exceptions.streamAllExceptions();
  }

  /** No pre-commit validation is performed; always returns an empty stream. */
  @Override
  protected Stream<AspectValidationException> validatePreCommitAspects(
      @Nonnull Collection<ChangeMCP> changeMCPs, @Nonnull RetrieverContext retrieverContext) {
    return Stream.empty();
  }

  /**
   * Validates the field paths of an {@link EditableSchemaMetadata} item and records a
   * duplicated-path exception when the number of distinct paths differs from the number of entries.
   */
  private static void processEditableSchemaMetadataAspect(
      BatchItem i, ValidationExceptionCollection exceptions) {
    final EditableSchemaMetadata schemaMetadata = i.getAspect(EditableSchemaMetadata.class);
    final long uniquePaths =
        validateAndCount(
            i,
            schemaMetadata.getEditableSchemaFieldInfo().stream()
                .map(EditableSchemaFieldInfo::getFieldPath),
            exceptions);

    if (uniquePaths != schemaMetadata.getEditableSchemaFieldInfo().size()) {
      exceptions.addException(
          i,
          String.format(
              "Cannot perform %s action on proposal. EditableSchemaMetadata aspect has duplicated field paths",
              i.getChangeType()));
    }
  }

  /**
   * Validates the field paths of a {@link SchemaMetadata} item and records a duplicated-path
   * exception when the number of distinct paths differs from the number of fields.
   */
  private static void processSchemaMetadataAspect(
      BatchItem i, ValidationExceptionCollection exceptions) {
    final SchemaMetadata schemaMetadata = i.getAspect(SchemaMetadata.class);
    final long uniquePaths =
        validateAndCount(
            i, schemaMetadata.getFields().stream().map(SchemaField::getFieldPath), exceptions);

    if (uniquePaths != schemaMetadata.getFields().size()) {
      exceptions.addException(
          i,
          String.format(
              "Cannot perform %s action on proposal. SchemaMetadata aspect has duplicated field paths",
              i.getChangeType()));
    }
  }

  /**
   * Validates each distinct field path and returns how many distinct paths there are.
   *
   * <p>The distinct paths are materialised first and then checked in a plain loop. Reporting errors
   * from {@code Stream.peek} ahead of {@code Stream.count} is avoided on purpose: the Stream API
   * documents {@code peek} as a debugging aid and allows {@code count} to skip the pipeline when it
   * can compute the size directly, which would silently drop the empty-path check.
   *
   * @param i the item the field paths belong to; validation errors are attached to it
   * @param fieldPaths the field paths of the aspect, possibly containing duplicates
   * @param exceptions collector receiving one exception per invalid distinct path
   * @return the number of distinct field paths
   */
  private static long validateAndCount(
      BatchItem i, Stream<String> fieldPaths, ValidationExceptionCollection exceptions) {
    final String[] distinctPaths = fieldPaths.distinct().toArray(String[]::new);
    for (String fieldPath : distinctPaths) {
      validateFieldPath(fieldPath).ifPresent(message -> exceptions.addException(i, message));
    }
    return distinctPaths.length;
  }

  /**
   * Checks a single field path.
   *
   * @param fieldPath the path to check, may be {@code null}
   * @return an error message when the path is {@code null} or empty, otherwise an empty Optional
   */
  private static Optional<String> validateFieldPath(String fieldPath) {
    if (fieldPath == null || fieldPath.isEmpty()) {
      return Optional.of("SchemaMetadata aspect has empty field path.");
    }
    return Optional.empty();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
package com.linkedin.metadata.aspect.validators;

import static com.linkedin.metadata.Constants.*;
import static org.mockito.Mockito.*;
import static org.testng.Assert.*;

import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.AspectRetriever;
import com.linkedin.metadata.aspect.GraphRetriever;
import com.linkedin.metadata.aspect.RetrieverContext;
import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig;
import com.linkedin.metadata.aspect.plugins.validation.AspectValidationException;
import com.linkedin.metadata.aspect.validation.CreateIfNotExistsValidator;
import com.linkedin.metadata.aspect.validation.FieldPathValidator;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.schema.EditableSchemaFieldInfo;
import com.linkedin.schema.EditableSchemaFieldInfoArray;
import com.linkedin.schema.EditableSchemaMetadata;
import com.linkedin.schema.SchemaField;
import com.linkedin.schema.SchemaFieldArray;
import com.linkedin.schema.SchemaFieldDataType;
import com.linkedin.schema.SchemaMetadata;
import com.linkedin.schema.StringType;
import com.linkedin.test.metadata.aspect.TestEntityRegistry;
import com.linkedin.test.metadata.aspect.batch.TestMCP;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;

public class FieldPathValidatorTest {

private static final AspectPluginConfig validatorConfig =
AspectPluginConfig.builder()
.supportedOperations(
Arrays.stream(ChangeType.values())
.map(Objects::toString)
.collect(Collectors.toList()))
.className(CreateIfNotExistsValidator.class.getName())
.supportedEntityAspectNames(List.of(AspectPluginConfig.EntityAspectName.ALL))
.enabled(true)
.build();
private EntityRegistry entityRegistry;
private RetrieverContext mockRetrieverContext;
private static final DatasetUrn TEST_DATASET_URN;
private final FieldPathValidator test = new FieldPathValidator().setConfig(validatorConfig);

static {
try {
TEST_DATASET_URN =
DatasetUrn.createFromUrn(
UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,test,PROD)"));
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}

@BeforeTest
public void init() {
entityRegistry = new TestEntityRegistry();
AspectRetriever mockAspectRetriever = mock(AspectRetriever.class);
when(mockAspectRetriever.getEntityRegistry()).thenReturn(entityRegistry);
GraphRetriever mockGraphRetriever = mock(GraphRetriever.class);
mockRetrieverContext = mock(RetrieverContext.class);
when(mockRetrieverContext.getAspectRetriever()).thenReturn(mockAspectRetriever);
when(mockRetrieverContext.getGraphRetriever()).thenReturn(mockGraphRetriever);
}

@Test
public void testValidateNonDuplicatedSchemaFieldPath() {
final SchemaMetadata schema = getMockSchemaMetadataAspect(false);
assertEquals(
test.validateProposed(
Set.of(
TestMCP.builder()
.changeType(ChangeType.UPSERT)
.urn(TEST_DATASET_URN)
.entitySpec(entityRegistry.getEntitySpec(TEST_DATASET_URN.getEntityType()))
.aspectSpec(
entityRegistry
.getEntitySpec(TEST_DATASET_URN.getEntityType())
.getAspectSpec(SCHEMA_METADATA_ASPECT_NAME))
.recordTemplate(schema)
.build()),
mockRetrieverContext)
.count(),
0);
}

@Test
public void testValidateDuplicatedSchemaFieldPath() {
final SchemaMetadata schema = getMockSchemaMetadataAspect(true);

assertEquals(
test.validateProposed(
Set.of(
TestMCP.builder()
.changeType(ChangeType.UPSERT)
.urn(TEST_DATASET_URN)
.entitySpec(entityRegistry.getEntitySpec(TEST_DATASET_URN.getEntityType()))
.aspectSpec(
entityRegistry
.getEntitySpec(TEST_DATASET_URN.getEntityType())
.getAspectSpec(SCHEMA_METADATA_ASPECT_NAME))
.recordTemplate(schema)
.build()),
mockRetrieverContext)
.count(),
1);
}

@Test
public void testValidateNonDuplicatedEditableSchemaFieldPath() {
final EditableSchemaMetadata schema = getMockEditableSchemaMetadataAspect(false);
assertEquals(
test.validateProposed(
Set.of(
TestMCP.builder()
.changeType(ChangeType.UPSERT)
.urn(TEST_DATASET_URN)
.entitySpec(entityRegistry.getEntitySpec(TEST_DATASET_URN.getEntityType()))
.aspectSpec(
entityRegistry
.getEntitySpec(TEST_DATASET_URN.getEntityType())
.getAspectSpec(EDITABLE_SCHEMA_METADATA_ASPECT_NAME))
.recordTemplate(schema)
.build()),
mockRetrieverContext)
.count(),
0);
}

@Test
public void testValidateDuplicatedEditableSchemaFieldPath() {
final EditableSchemaMetadata schema = getMockEditableSchemaMetadataAspect(true);

assertEquals(
test.validateProposed(
Set.of(
TestMCP.builder()
.changeType(ChangeType.UPSERT)
.urn(TEST_DATASET_URN)
.entitySpec(entityRegistry.getEntitySpec(TEST_DATASET_URN.getEntityType()))
.aspectSpec(
entityRegistry
.getEntitySpec(TEST_DATASET_URN.getEntityType())
.getAspectSpec(EDITABLE_SCHEMA_METADATA_ASPECT_NAME))
.recordTemplate(schema)
.build()),
mockRetrieverContext)
.count(),
1);
}

@Test
public void testEmptySchemaFieldPath() {
final SchemaMetadata schema = getMockSchemaMetadataAspect(false, "");
TestMCP testItem =
TestMCP.builder()
.changeType(ChangeType.UPSERT)
.urn(TEST_DATASET_URN)
.entitySpec(entityRegistry.getEntitySpec(TEST_DATASET_URN.getEntityType()))
.aspectSpec(
entityRegistry
.getEntitySpec(TEST_DATASET_URN.getEntityType())
.getAspectSpec(SCHEMA_METADATA_ASPECT_NAME))
.recordTemplate(schema)
.build();

Set<AspectValidationException> exceptions =
test.validateProposed(Set.of(testItem), mockRetrieverContext).collect(Collectors.toSet());

assertEquals(
exceptions,
Set.of(
AspectValidationException.forItem(
testItem, "SchemaMetadata aspect has empty field path.")));
}

private static SchemaMetadata getMockSchemaMetadataAspect(boolean duplicateFields) {
return getMockSchemaMetadataAspect(duplicateFields, null);
}

private static SchemaMetadata getMockSchemaMetadataAspect(
boolean duplicateFields, @Nullable String fieldPath) {
List<SchemaField> fields = new ArrayList<>();
fields.add(
new SchemaField()
.setType(
new SchemaFieldDataType()
.setType(SchemaFieldDataType.Type.create(new StringType())))
.setNullable(false)
.setNativeDataType("string")
.setFieldPath(fieldPath == null ? "test" : fieldPath));

if (duplicateFields) {
fields.add(
new SchemaField()
.setType(
new SchemaFieldDataType()
.setType(SchemaFieldDataType.Type.create(new StringType())))
.setNullable(false)
.setNativeDataType("string")
.setFieldPath(fieldPath == null ? "test" : fieldPath));
}

return new SchemaMetadata()
.setPlatform(TEST_DATASET_URN.getPlatformEntity())
.setFields(new SchemaFieldArray(fields));
}

private static EditableSchemaMetadata getMockEditableSchemaMetadataAspect(
boolean duplicateFields) {

List<EditableSchemaFieldInfo> fields = new ArrayList<>();
fields.add(new EditableSchemaFieldInfo().setFieldPath("test"));

if (duplicateFields) {
fields.add(new EditableSchemaFieldInfo().setFieldPath("test"));
}

return new EditableSchemaMetadata()
.setEditableSchemaFieldInfo(new EditableSchemaFieldInfoArray(fields));
}
}
7 changes: 7 additions & 0 deletions li-utils/src/main/java/com/linkedin/metadata/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,13 @@ public class Constants {
public static final String EXECUTION_REQUEST_INPUT_ASPECT_NAME = "dataHubExecutionRequestInput";
public static final String EXECUTION_REQUEST_SIGNAL_ASPECT_NAME = "dataHubExecutionRequestSignal";
public static final String EXECUTION_REQUEST_RESULT_ASPECT_NAME = "dataHubExecutionRequestResult";
// Execution request status values.
// NOTE(review): where these strings are produced and consumed is not visible here - confirm
// against usages before changing any value.
public static final String EXECUTION_REQUEST_STATUS_RUNNING = "RUNNING";
public static final String EXECUTION_REQUEST_STATUS_FAILURE = "FAILURE";
public static final String EXECUTION_REQUEST_STATUS_SUCCESS = "SUCCESS";
public static final String EXECUTION_REQUEST_STATUS_TIMEOUT = "TIMEOUT";
public static final String EXECUTION_REQUEST_STATUS_CANCELLED = "CANCELLED";
public static final String EXECUTION_REQUEST_STATUS_ABORTED = "ABORTED";
public static final String EXECUTION_REQUEST_STATUS_DUPLICATE = "DUPLICATE";

// DataHub Access Token
public static final String ACCESS_TOKEN_KEY_ASPECT_NAME = "dataHubAccessTokenKey";
Expand Down
Loading

0 comments on commit 5d3e464

Please sign in to comment.