Skip to content

Commit

Permalink
feat(validation): Ingest and schema validator
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker committed Oct 15, 2024
1 parent a9cb610 commit 6f0313e
Show file tree
Hide file tree
Showing 10 changed files with 415 additions and 137 deletions.
2 changes: 2 additions & 0 deletions docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
- #11484 - Metadata service authentication enabled by default
- #11484 - Rest API authorization enabled by default
- #10472 - `SANDBOX` added as a FabricType. Once metadata with this fabric type has been added, rollbacks are not possible without manual cleanup in the databases.
- #11619 - schema field/column paths can no longer be empty strings
- #11619 - schema field/column paths can no longer be duplicated within the schema

### Potential Downtime

Expand Down
2 changes: 2 additions & 0 deletions docs/managed-datahub/release-notes/v_0_3_6.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ If you are using an older CLI/SDK version, then please upgrade it. This applies
- Automatic downgrade to a previous release is not supported. Please reach out to support if required.
- Metadata tests that are created or edited no longer run immediately on a small batch (fewer than 10,000 assets); they instead rely on the scheduled run. Scheduled runs and metadata tests triggered by asset changes are not impacted. This bug is fixed in 0.3.6.1.
- Rest API Authorization enabled by default
- #11619 - schema field/column paths can no longer be empty strings
- #11619 - schema field/column paths can no longer be duplicated within the schema

- Bug Fixes
- No longer push notifications for soft-deleted entities
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,28 @@

import static com.linkedin.metadata.Constants.*;

import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.RetrieverContext;
import com.linkedin.metadata.aspect.batch.BatchItem;
import com.linkedin.metadata.aspect.batch.ChangeMCP;
import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig;
import com.linkedin.metadata.aspect.plugins.validation.AspectPayloadValidator;
import com.linkedin.metadata.aspect.plugins.validation.AspectValidationException;
import com.linkedin.metadata.aspect.plugins.validation.ValidationExceptionCollection;
import com.linkedin.schema.EditableSchemaFieldInfo;
import com.linkedin.schema.EditableSchemaMetadata;
import com.linkedin.schema.SchemaField;
import com.linkedin.schema.SchemaMetadata;
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;

/**
* Validates the Schema Field Path specification, specifically that all field IDs must be unique
* across all fields within a schema.
* 1. Validates the Schema Field Path specification, specifically that all field IDs must be unique
* across all fields within a schema. 2. Validates that the field path id is not empty.
*
* @see <a href="https://datahubproject.io/docs/advanced/field-path-spec-v2/#requirements">Field
* Path V2 docs</a>
Expand All @@ -34,82 +34,83 @@
public class FieldPathValidator extends AspectPayloadValidator {
@Nonnull private AspectPluginConfig config;

/**
* Prevent any MCP for SchemaMetadata where field ids are duplicated (except for MCPs with {@link
* ChangeType#DELETE} and {@link ChangeType#PATCH}, the latter gets handled pre-commit to the DB).
*/
/** Prevent any MCP for SchemaMetadata where field ids are duplicated. */
@Override
protected Stream<AspectValidationException> validateProposedAspects(
@Nonnull Collection<? extends BatchItem> mcpItems,
@Nonnull RetrieverContext retrieverContext) {
return mcpItems.stream()
.filter(
i ->
!ChangeType.DELETE.equals(i.getChangeType())
&& !ChangeType.PATCH.equals(i.getChangeType()))
.filter(
i ->
i.getAspectName().equals(SCHEMA_METADATA_ASPECT_NAME)
|| i.getAspectName().equals(EDITABLE_SCHEMA_METADATA_ASPECT_NAME))
.map(
i -> {
if (i.getAspectName().equals(SCHEMA_METADATA_ASPECT_NAME)) {
return processSchemaMetadataAspect(i);
} else {
return processEditableSchemaMetadataAspect(i);
}
})
.filter(Objects::nonNull);

ValidationExceptionCollection exceptions = ValidationExceptionCollection.newCollection();

mcpItems.forEach(
i -> {
if (i.getAspectName().equals(SCHEMA_METADATA_ASPECT_NAME)) {
processSchemaMetadataAspect(i, exceptions);
} else {
processEditableSchemaMetadataAspect(i, exceptions);
}
});

return exceptions.streamAllExceptions();
}

@Override
protected Stream<AspectValidationException> validatePreCommitAspects(
@Nonnull Collection<ChangeMCP> changeMCPs, @Nonnull RetrieverContext retrieverContext) {
return changeMCPs.stream()
.filter(i -> ChangeType.PATCH.equals(i.getChangeType()))
.filter(
i ->
i.getAspectName().equals(SCHEMA_METADATA_ASPECT_NAME)
|| i.getAspectName().equals(EDITABLE_SCHEMA_METADATA_ASPECT_NAME))
.map(
i -> {
if (i.getAspectName().equals(SCHEMA_METADATA_ASPECT_NAME)) {
return processSchemaMetadataAspect(i);
} else {
return processEditableSchemaMetadataAspect(i);
}
})
.filter(Objects::nonNull);
return Stream.of();
}

private static AspectValidationException processEditableSchemaMetadataAspect(BatchItem i) {
private static void processEditableSchemaMetadataAspect(
BatchItem i, ValidationExceptionCollection exceptions) {
final EditableSchemaMetadata schemaMetadata = i.getAspect(EditableSchemaMetadata.class);
final long uniquePaths =
schemaMetadata.getEditableSchemaFieldInfo().stream()
.map(EditableSchemaFieldInfo::getFieldPath)
.distinct()
.count();
validateAndCount(
i,
schemaMetadata.getEditableSchemaFieldInfo().stream()
.map(EditableSchemaFieldInfo::getFieldPath),
exceptions);

if (uniquePaths != schemaMetadata.getEditableSchemaFieldInfo().size()) {
return AspectValidationException.forItem(
exceptions.addException(
i,
String.format(
"Cannot perform %s action on proposal. EditableSchemaMetadata aspect has duplicated field paths",
i.getChangeType()));
}
return null;
}

private static AspectValidationException processSchemaMetadataAspect(BatchItem i) {
private static void processSchemaMetadataAspect(
BatchItem i, ValidationExceptionCollection exceptions) {
final SchemaMetadata schemaMetadata = i.getAspect(SchemaMetadata.class);
final long uniquePaths =
schemaMetadata.getFields().stream().map(SchemaField::getFieldPath).distinct().count();
validateAndCount(
i, schemaMetadata.getFields().stream().map(SchemaField::getFieldPath), exceptions);

if (uniquePaths != schemaMetadata.getFields().size()) {
return AspectValidationException.forItem(
exceptions.addException(
i,
String.format(
"Cannot perform %s action on proposal. SchemaMetadata aspect has duplicated field paths",
i.getChangeType()));
}
return null;
}

/**
 * Validates every distinct field path and reports how many distinct paths there are.
 *
 * <p>Each distinct path is passed to {@link #validateFieldPath(String)}; any resulting message is
 * recorded against {@code i} in {@code exceptions}. Duplicate paths are validated only once.
 *
 * <p>The distinct paths are materialized before validation instead of validating inside {@code
 * Stream.peek}: {@code peek} is intended for debugging, and {@code Stream.count()} is permitted
 * to skip intermediate operations when it can compute the count from the source, so the
 * validation side effect must not depend on the pipeline being traversed.
 *
 * @param i the batch item the field paths belong to; used only to attribute exceptions
 * @param fieldPaths the field paths to check; consumed by this call, may contain {@code null}
 * @param exceptions collector that receives one exception per invalid distinct path
 * @return the number of distinct field paths in {@code fieldPaths}
 */
private static long validateAndCount(
    BatchItem i, Stream<String> fieldPaths, ValidationExceptionCollection exceptions) {
  // toArray forces full traversal; distinct() keeps first occurrences in encounter order.
  final String[] distinctPaths = fieldPaths.distinct().toArray(String[]::new);

  for (String fieldPath : distinctPaths) {
    validateFieldPath(fieldPath).ifPresent(message -> exceptions.addException(i, message));
  }

  return distinctPaths.length;
}

/**
 * Checks a single field path for emptiness.
 *
 * @param fieldPath the field path to check; may be {@code null}
 * @return an error message when {@code fieldPath} is {@code null} or the empty string, otherwise
 *     an empty {@link Optional}
 */
private static Optional<String> validateFieldPath(String fieldPath) {
  final boolean missing = fieldPath == null || fieldPath.isEmpty();
  return missing
      ? Optional.of("SchemaMetadata aspect has empty field path.")
      : Optional.empty();
}
}
Loading

0 comments on commit 6f0313e

Please sign in to comment.