diff --git a/datahub-frontend/app/auth/AuthModule.java b/datahub-frontend/app/auth/AuthModule.java index d0d17fda263926..7bb25478907011 100644 --- a/datahub-frontend/app/auth/AuthModule.java +++ b/datahub-frontend/app/auth/AuthModule.java @@ -21,6 +21,7 @@ import com.linkedin.util.Configuration; import config.ConfigurationProvider; import controllers.SsoCallbackController; +import io.datahubproject.metadata.context.ValidationContext; import java.nio.charset.StandardCharsets; import java.util.Collections; @@ -187,6 +188,7 @@ protected OperationContext provideOperationContext( .authorizationContext(AuthorizationContext.builder().authorizer(Authorizer.EMPTY).build()) .searchContext(SearchContext.EMPTY) .entityRegistryContext(EntityRegistryContext.builder().build(EmptyEntityRegistry.EMPTY)) + .validationContext(ValidationContext.builder().alternateValidation(false).build()) .build(systemAuthentication); } diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java index f3a4c47c59f0b7..661717c6309cfc 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java @@ -29,6 +29,7 @@ import io.datahubproject.metadata.context.OperationContextConfig; import io.datahubproject.metadata.context.RetrieverContext; import io.datahubproject.metadata.context.ServicesRegistryContext; +import io.datahubproject.metadata.context.ValidationContext; import io.datahubproject.metadata.services.RestrictedService; import java.util.List; import javax.annotation.Nonnull; @@ -161,7 +162,8 @@ protected OperationContext javaSystemOperationContext( @Nonnull final GraphService graphService, @Nonnull final SearchService searchService, @Qualifier("baseElasticSearchComponents") - BaseElasticSearchComponentsFactory.BaseElasticSearchComponents components) { + BaseElasticSearchComponentsFactory.BaseElasticSearchComponents components, + @Nonnull final ConfigurationProvider configurationProvider) { EntityServiceAspectRetriever entityServiceAspectRetriever = EntityServiceAspectRetriever.builder() @@ -186,6 +188,10 @@ protected OperationContext javaSystemOperationContext( .aspectRetriever(entityServiceAspectRetriever) .graphRetriever(systemGraphRetriever) .searchRetriever(searchServiceSearchRetriever) + .build(), + ValidationContext.builder() + .alternateValidation( + configurationProvider.getFeatureFlags().isAlternateMCPValidation()) .build()); entityServiceAspectRetriever.setSystemOperationContext(systemOperationContext); diff --git a/docker/profiles/docker-compose.gms.yml b/docker/profiles/docker-compose.gms.yml index 2683734c2d5e56..147bbd35ff6460 100644 --- a/docker/profiles/docker-compose.gms.yml +++ b/docker/profiles/docker-compose.gms.yml @@ -101,6 +101,7 @@ x-datahub-gms-service: &datahub-gms-service environment: &datahub-gms-env <<: [*primary-datastore-mysql-env, *graph-datastore-search-env, *search-datastore-env, *datahub-quickstart-telemetry-env, *kafka-env] ELASTICSEARCH_QUERY_CUSTOM_CONFIG_FILE: ${ELASTICSEARCH_QUERY_CUSTOM_CONFIG_FILE:-search_config.yaml} + ALTERNATE_MCP_VALIDATION: ${ALTERNATE_MCP_VALIDATION:-true} healthcheck: test: curl -sS --fail http://datahub-gms:${DATAHUB_GMS_PORT:-8080}/health start_period: 90s @@ -182,6 +183,7 @@ x-datahub-mce-consumer-service: &datahub-mce-consumer-service - ${DATAHUB_LOCAL_MCE_ENV:-empty2.env} environment: &datahub-mce-consumer-env <<: [*primary-datastore-mysql-env, *graph-datastore-search-env, *search-datastore-env, *datahub-quickstart-telemetry-env, *kafka-env] + ALTERNATE_MCP_VALIDATION: ${ALTERNATE_MCP_VALIDATION:-true} x-datahub-mce-consumer-service-dev: &datahub-mce-consumer-service-dev <<: *datahub-mce-consumer-service diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/plugins/PluginSpec.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/plugins/PluginSpec.java index f99dd18d3c9c1f..54ccd3877395fc 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/plugins/PluginSpec.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/plugins/PluginSpec.java @@ -12,7 +12,7 @@ @AllArgsConstructor @EqualsAndHashCode public abstract class PluginSpec { - protected static String ENTITY_WILDCARD = "*"; + protected static String WILDCARD = "*"; @Nonnull public abstract AspectPluginConfig getConfig(); @@ -50,7 +50,7 @@ protected boolean isEntityAspectSupported( return (getConfig().getSupportedEntityAspectNames().stream() .anyMatch( supported -> - ENTITY_WILDCARD.equals(supported.getEntityName()) + WILDCARD.equals(supported.getEntityName()) || supported.getEntityName().equals(entityName))) && isAspectSupported(aspectName); } @@ -59,13 +59,16 @@ protected boolean isAspectSupported(@Nonnull String aspectName) { return getConfig().getSupportedEntityAspectNames().stream() .anyMatch( supported -> - ENTITY_WILDCARD.equals(supported.getAspectName()) + WILDCARD.equals(supported.getAspectName()) || supported.getAspectName().equals(aspectName)); } protected boolean isChangeTypeSupported(@Nullable ChangeType changeType) { return (changeType == null && getConfig().getSupportedOperations().isEmpty()) || getConfig().getSupportedOperations().stream() - .anyMatch(supported -> supported.equalsIgnoreCase(String.valueOf(changeType))); + .anyMatch( + supported -> + WILDCARD.equals(supported) + || supported.equalsIgnoreCase(String.valueOf(changeType))); } } diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/EntityAspect.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/EntityAspect.java index cba770d841b940..976db4133c0043 100644 --- a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/EntityAspect.java +++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/EntityAspect.java @@ -9,6 +9,7 @@ import com.linkedin.metadata.aspect.SystemAspect; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.models.EntitySpec; +import com.linkedin.mxe.GenericAspect; import com.linkedin.mxe.SystemMetadata; import java.sql.Timestamp; import javax.annotation.Nonnull; @@ -65,7 +66,7 @@ public static class EntitySystemAspect implements SystemAspect { @Nullable private final RecordTemplate recordTemplate; @Nonnull private final EntitySpec entitySpec; - @Nonnull private final AspectSpec aspectSpec; + @Nullable private final AspectSpec aspectSpec; @Nonnull public String getUrnRaw() { @@ -151,7 +152,7 @@ private EntityAspect.EntitySystemAspect build() { public EntityAspect.EntitySystemAspect build( @Nonnull EntitySpec entitySpec, - @Nonnull AspectSpec aspectSpec, + @Nullable AspectSpec aspectSpec, @Nonnull EntityAspect entityAspect) { this.entityAspect = entityAspect; this.urn = UrnUtils.getUrn(entityAspect.getUrn()); @@ -159,7 +160,11 @@ public EntityAspect.EntitySystemAspect build( if (entityAspect.getMetadata() != null) { this.recordTemplate = RecordUtils.toRecordTemplate( - aspectSpec.getDataTemplateClass(), entityAspect.getMetadata()); + (Class) + (aspectSpec == null + ? GenericAspect.class + : aspectSpec.getDataTemplateClass()), + entityAspect.getMetadata()); } return new EntitySystemAspect(entityAspect, urn, recordTemplate, entitySpec, aspectSpec); diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java index 1fba8426317209..7f56abe64f9a77 100644 --- a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java +++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java @@ -3,6 +3,7 @@ import com.linkedin.common.AuditStamp; import com.linkedin.data.template.RecordTemplate; import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.aspect.AspectRetriever; import com.linkedin.metadata.aspect.RetrieverContext; import com.linkedin.metadata.aspect.SystemAspect; import com.linkedin.metadata.aspect.batch.AspectsBatch; @@ -11,6 +12,7 @@ import com.linkedin.metadata.aspect.batch.MCPItem; import com.linkedin.metadata.aspect.plugins.hooks.MutationHook; import com.linkedin.metadata.aspect.plugins.validation.ValidationExceptionCollection; +import com.linkedin.metadata.models.EntitySpec; import com.linkedin.mxe.MetadataChangeProposal; import com.linkedin.util.Pair; import java.util.Collection; @@ -114,19 +116,7 @@ private Stream proposedItemsToChangeItemStream(List { - if (ChangeType.PATCH.equals(mcpItem.getChangeType())) { - return PatchItemImpl.PatchItemImplBuilder.build( - mcpItem.getMetadataChangeProposal(), - mcpItem.getAuditStamp(), - retrieverContext.getAspectRetriever().getEntityRegistry()); - } - return ChangeItemImpl.ChangeItemImplBuilder.build( - mcpItem.getMetadataChangeProposal(), - mcpItem.getAuditStamp(), - retrieverContext.getAspectRetriever()); - }); + .map(mcpItem -> patchDiscriminator(mcpItem, retrieverContext.getAspectRetriever())); List mutatedItems = applyProposalMutationHooks(proposedItems, retrieverContext).collect(Collectors.toList()); Stream proposedItemsToChangeItems = @@ -134,12 +124,7 @@ private Stream proposedItemsToChangeItemStream(List mcpItem.getMetadataChangeProposal() != null) // Filter on proposed items again to avoid applying builder to Patch Item side effects .filter(mcpItem -> mcpItem instanceof ProposedItem) - .map( - mcpItem -> - ChangeItemImpl.ChangeItemImplBuilder.build( - mcpItem.getMetadataChangeProposal(), - mcpItem.getAuditStamp(), - retrieverContext.getAspectRetriever())); + .map(mcpItem -> patchDiscriminator(mcpItem, retrieverContext.getAspectRetriever())); Stream sideEffectItems = mutatedItems.stream().filter(mcpItem -> !(mcpItem instanceof ProposedItem)); Stream combinedChangeItems = @@ -147,6 +132,17 @@ private Stream proposedItemsToChangeItemStream(List mcps, AuditStamp auditStamp, RetrieverContext retrieverContext) { + return mcps(mcps, auditStamp, retrieverContext, false); + } + + public AspectsBatchImplBuilder mcps( + Collection mcps, + AuditStamp auditStamp, + RetrieverContext retrieverContext, + boolean alternateMCPValidation) { retrieverContext(retrieverContext); items( @@ -171,6 +175,18 @@ public AspectsBatchImplBuilder mcps( .map( mcp -> { try { + if (alternateMCPValidation) { + EntitySpec entitySpec = + retrieverContext + .getAspectRetriever() + .getEntityRegistry() + .getEntitySpec(mcp.getEntityType()); + return ProposedItem.builder() + .metadataChangeProposal(mcp) + .entitySpec(entitySpec) + .auditStamp(auditStamp) + .build(); + } if (mcp.getChangeType().equals(ChangeType.PATCH)) { return PatchItemImpl.PatchItemImplBuilder.build( mcp, diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/PatchItemImpl.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/PatchItemImpl.java index 43a7d00248a224..ec0a8422e3c4a2 100644 --- a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/PatchItemImpl.java +++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/PatchItemImpl.java @@ -203,7 +203,7 @@ public static PatchItemImpl build( .build(entityRegistry); } - private static JsonPatch convertToJsonPatch(MetadataChangeProposal mcp) { + public static JsonPatch convertToJsonPatch(MetadataChangeProposal mcp) { JsonNode json; try { return Json.createPatch( diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ProposedItem.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ProposedItem.java index 132a731d278af8..88187ef159f233 100644 --- a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ProposedItem.java +++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ProposedItem.java @@ -9,8 +9,10 @@ import com.linkedin.metadata.models.EntitySpec; import com.linkedin.metadata.utils.EntityKeyUtils; import com.linkedin.metadata.utils.GenericRecordUtils; +import com.linkedin.metadata.utils.SystemMetadataUtils; import com.linkedin.mxe.MetadataChangeProposal; import com.linkedin.mxe.SystemMetadata; +import java.util.Objects; import javax.annotation.Nonnull; import javax.annotation.Nullable; import lombok.Builder; @@ -83,4 +85,18 @@ public SystemMetadata getSystemMetadata() { public ChangeType getChangeType() { return metadataChangeProposal.getChangeType(); } + + public static class ProposedItemBuilder { + public ProposedItem build() { + // Ensure systemMetadata + return new ProposedItem( + Objects.requireNonNull(this.metadataChangeProposal) + .setSystemMetadata( + SystemMetadataUtils.generateSystemMetadataIfEmpty( + this.metadataChangeProposal.getSystemMetadata())), + this.auditStamp, + this.entitySpec, + this.aspectSpec); + } + } } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/aspect/hooks/IgnoreUnknownMutator.java b/metadata-io/src/main/java/com/linkedin/metadata/aspect/hooks/IgnoreUnknownMutator.java index 8d6bdffceacb93..f5cc421042e368 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/aspect/hooks/IgnoreUnknownMutator.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/aspect/hooks/IgnoreUnknownMutator.java @@ -1,8 +1,14 @@ package com.linkedin.metadata.aspect.hooks; +import static com.linkedin.events.metadata.ChangeType.CREATE; +import static com.linkedin.events.metadata.ChangeType.CREATE_ENTITY; +import static com.linkedin.events.metadata.ChangeType.UPDATE; +import static com.linkedin.events.metadata.ChangeType.UPSERT; + import com.datahub.util.exception.ModelConversionException; import com.linkedin.data.template.RecordTemplate; import com.linkedin.data.transform.filter.request.MaskTree; +import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.aspect.RetrieverContext; import com.linkedin.metadata.aspect.batch.MCPItem; import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig; @@ -14,6 +20,7 @@ import com.linkedin.mxe.GenericAspect; import com.linkedin.restli.internal.server.util.RestUtils; import java.util.Collection; +import java.util.Set; import java.util.stream.Stream; import javax.annotation.Nonnull; import lombok.Getter; @@ -27,6 +34,11 @@ @Getter @Accessors(chain = true) public class IgnoreUnknownMutator extends MutationHook { + private static final Set SUPPORTED_MIME_TYPES = + Set.of("application/json", "application/json-patch+json"); + private static final Set MUTATION_TYPES = + Set.of(CREATE, CREATE_ENTITY, UPSERT, UPDATE); + @Nonnull private AspectPluginConfig config; @Override @@ -42,8 +54,8 @@ protected Stream proposalMutation( item.getAspectSpec().getName()); return false; } - if (!"application/json" - .equals(item.getMetadataChangeProposal().getAspect().getContentType())) { + if (!SUPPORTED_MIME_TYPES.contains( + item.getMetadataChangeProposal().getAspect().getContentType())) { log.warn( "Dropping unknown content type {} for aspect {} on entity {}", item.getMetadataChangeProposal().getAspect().getContentType(), @@ -55,25 +67,27 @@ protected Stream proposalMutation( }) .peek( item -> { - try { - AspectSpec aspectSpec = item.getEntitySpec().getAspectSpec(item.getAspectName()); - GenericAspect aspect = item.getMetadataChangeProposal().getAspect(); - RecordTemplate recordTemplate = - GenericRecordUtils.deserializeAspect( - aspect.getValue(), aspect.getContentType(), aspectSpec); + if (MUTATION_TYPES.contains(item.getChangeType())) { try { - ValidationApiUtils.validateOrThrow(recordTemplate); - } catch (ValidationException | ModelConversionException e) { - log.warn( - "Failed to validate aspect. Coercing aspect {} on entity {}", - item.getAspectName(), - item.getEntitySpec().getName()); - RestUtils.trimRecordTemplate(recordTemplate, new MaskTree(), false); - item.getMetadataChangeProposal() - .setAspect(GenericRecordUtils.serializeAspect(recordTemplate)); + AspectSpec aspectSpec = item.getEntitySpec().getAspectSpec(item.getAspectName()); + GenericAspect aspect = item.getMetadataChangeProposal().getAspect(); + RecordTemplate recordTemplate = + GenericRecordUtils.deserializeAspect( + aspect.getValue(), aspect.getContentType(), aspectSpec); + try { + ValidationApiUtils.validateOrThrow(recordTemplate); + } catch (ValidationException | ModelConversionException e) { + log.warn( + "Failed to validate aspect. Coercing aspect {} on entity {}", + item.getAspectName(), + item.getEntitySpec().getName()); + RestUtils.trimRecordTemplate(recordTemplate, new MaskTree(), false); + item.getMetadataChangeProposal() + .setAspect(GenericRecordUtils.serializeAspect(recordTemplate)); + } + } catch (Exception e) { + throw new RuntimeException(e); } - } catch (Exception e) { - throw new RuntimeException(e); } }); } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java b/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java index 60a991c19ae8bf..8b625b3ae22895 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java @@ -23,7 +23,6 @@ import com.linkedin.metadata.aspect.EnvelopedAspectArray; import com.linkedin.metadata.aspect.VersionedAspect; import com.linkedin.metadata.aspect.batch.AspectsBatch; -import com.linkedin.metadata.aspect.batch.BatchItem; import com.linkedin.metadata.browse.BrowseResult; import com.linkedin.metadata.browse.BrowseResultV2; import com.linkedin.metadata.entity.DeleteEntityService; @@ -56,6 +55,7 @@ import com.linkedin.parseq.retry.backoff.BackoffPolicy; import com.linkedin.parseq.retry.backoff.ExponentialBackoff; import com.linkedin.r2.RemoteInvocationException; +import com.linkedin.util.Pair; import io.datahubproject.metadata.context.OperationContext; import io.opentelemetry.extension.annotations.WithSpan; import java.net.URISyntaxException; @@ -762,38 +762,41 @@ public List batchIngestProposals( AspectsBatch batch = AspectsBatchImpl.builder() - .mcps(metadataChangeProposals, auditStamp, opContext.getRetrieverContext().get()) + .mcps( + metadataChangeProposals, + auditStamp, + opContext.getRetrieverContext().get(), + opContext.getValidationContext().isAlternateValidation()) .build(); - Map> resultMap = + Map, List> resultMap = entityService.ingestProposal(opContext, batch, async).stream() - .collect(Collectors.groupingBy(IngestResult::getRequest)); - - // Update runIds - batch.getItems().stream() - .filter(resultMap::containsKey) - .forEach( - requestItem -> { - List results = resultMap.get(requestItem); - Optional resultUrn = - results.stream().map(IngestResult::getUrn).filter(Objects::nonNull).findFirst(); - resultUrn.ifPresent( - urn -> tryIndexRunId(opContext, urn, requestItem.getSystemMetadata())); - }); + .collect( + Collectors.groupingBy( + result -> + Pair.of( + result.getRequest().getUrn(), result.getRequest().getAspectName()))); // Preserve ordering return batch.getItems().stream() .map( requestItem -> { - if (resultMap.containsKey(requestItem)) { - List results = resultMap.get(requestItem); - return results.stream() - .filter(r -> r.getUrn() != null) - .findFirst() - .map(r -> r.getUrn().toString()) - .orElse(null); - } - return null; + // Urns generated + List urnsForRequest = + resultMap + .getOrDefault( + Pair.of(requestItem.getUrn(), requestItem.getAspectName()), List.of()) + .stream() + .map(IngestResult::getUrn) + .filter(Objects::nonNull) + .distinct() + .collect(Collectors.toList()); + + // Update runIds + urnsForRequest.forEach( + urn -> tryIndexRunId(opContext, urn, requestItem.getSystemMetadata())); + + return urnsForRequest.isEmpty() ? null : urnsForRequest.get(0).toString(); }) .collect(Collectors.toList()); } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java index 00feb547ca3300..9f608be4f3d189 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java @@ -1173,15 +1173,15 @@ public IngestResult ingestProposal( * @return an {@link IngestResult} containing the results */ @Override - public Set ingestProposal( + public List ingestProposal( @Nonnull OperationContext opContext, AspectsBatch aspectsBatch, final boolean async) { Stream timeseriesIngestResults = ingestTimeseriesProposal(opContext, aspectsBatch, async); Stream nonTimeseriesIngestResults = async ? ingestProposalAsync(aspectsBatch) : ingestProposalSync(opContext, aspectsBatch); - return Stream.concat(timeseriesIngestResults, nonTimeseriesIngestResults) - .collect(Collectors.toSet()); + return Stream.concat(nonTimeseriesIngestResults, timeseriesIngestResults) + .collect(Collectors.toList()); } /** @@ -1192,11 +1192,13 @@ public Set ingestProposal( */ private Stream ingestTimeseriesProposal( @Nonnull OperationContext opContext, AspectsBatch aspectsBatch, final boolean async) { + List unsupported = aspectsBatch.getItems().stream() .filter( item -> - item.getAspectSpec().isTimeseries() + item.getAspectSpec() != null + && item.getAspectSpec().isTimeseries() && item.getChangeType() != ChangeType.UPSERT) .collect(Collectors.toList()); if (!unsupported.isEmpty()) { @@ -1212,7 +1214,7 @@ private Stream ingestTimeseriesProposal( // Create default non-timeseries aspects for timeseries aspects List timeseriesKeyAspects = aspectsBatch.getMCPItems().stream() - .filter(item -> item.getAspectSpec().isTimeseries()) + .filter(item -> item.getAspectSpec() != null && item.getAspectSpec().isTimeseries()) .map( item -> ChangeItemImpl.builder() @@ -1238,10 +1240,10 @@ private Stream ingestTimeseriesProposal( } // Emit timeseries MCLs - List, Boolean>>>> timeseriesResults = + List, Boolean>>>> timeseriesResults = aspectsBatch.getItems().stream() - .filter(item -> item.getAspectSpec().isTimeseries()) - .map(item -> (ChangeItemImpl) item) + .filter(item -> item.getAspectSpec() != null && item.getAspectSpec().isTimeseries()) + .map(item -> (MCPItem) item) .map( item -> Pair.of( @@ -1272,7 +1274,7 @@ private Stream ingestTimeseriesProposal( } }); - ChangeItemImpl request = result.getFirst(); + MCPItem request = result.getFirst(); return IngestResult.builder() .urn(request.getUrn()) .request(request) @@ -1292,7 +1294,7 @@ private Stream ingestTimeseriesProposal( private Stream ingestProposalAsync(AspectsBatch aspectsBatch) { List nonTimeseries = aspectsBatch.getMCPItems().stream() - .filter(item -> !item.getAspectSpec().isTimeseries()) + .filter(item -> item.getAspectSpec() == null || !item.getAspectSpec().isTimeseries()) .collect(Collectors.toList()); List> futures = @@ -1328,6 +1330,7 @@ private Stream ingestProposalAsync(AspectsBatch aspectsBatch) { private Stream ingestProposalSync( @Nonnull OperationContext opContext, AspectsBatch aspectsBatch) { + AspectsBatchImpl nonTimeseries = AspectsBatchImpl.builder() .retrieverContext(aspectsBatch.getRetrieverContext()) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/filter/BaseQueryFilterRewriter.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/filter/BaseQueryFilterRewriter.java index d545f60a1ee8fa..367705d369c7ce 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/filter/BaseQueryFilterRewriter.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/filter/BaseQueryFilterRewriter.java @@ -110,12 +110,9 @@ private BoolQueryBuilder handleNestedFilters( mustNotQueryBuilders.forEach(expandedQueryBuilder::mustNot); expandedQueryBuilder.queryName(boolQueryBuilder.queryName()); expandedQueryBuilder.adjustPureNegative(boolQueryBuilder.adjustPureNegative()); + expandedQueryBuilder.minimumShouldMatch(boolQueryBuilder.minimumShouldMatch()); expandedQueryBuilder.boost(boolQueryBuilder.boost()); - if (!expandedQueryBuilder.should().isEmpty()) { - expandedQueryBuilder.minimumShouldMatch(1); - } - return expandedQueryBuilder; } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/client/JavaEntityClientTest.java b/metadata-io/src/test/java/com/linkedin/metadata/client/JavaEntityClientTest.java index 2ca966b104e034..7b1fccafbb9e63 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/client/JavaEntityClientTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/client/JavaEntityClientTest.java @@ -5,9 +5,22 @@ import static org.testng.Assert.assertThrows; import com.codahale.metrics.Counter; +import com.linkedin.common.AuditStamp; +import com.linkedin.common.Status; +import com.linkedin.common.UrnArray; +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; import com.linkedin.data.template.RequiredFieldNotPresentException; +import com.linkedin.domain.Domains; +import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.Constants; +import com.linkedin.metadata.aspect.batch.AspectsBatch; import com.linkedin.metadata.entity.DeleteEntityService; import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.entity.IngestResult; +import com.linkedin.metadata.entity.UpdateAspectResult; +import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl; +import com.linkedin.metadata.entity.ebean.batch.ProposedItem; import com.linkedin.metadata.event.EventProducer; import com.linkedin.metadata.search.EntitySearchService; import com.linkedin.metadata.search.LineageSearchService; @@ -15,8 +28,14 @@ import com.linkedin.metadata.search.client.CachingEntitySearchService; import com.linkedin.metadata.service.RollbackService; import com.linkedin.metadata.timeseries.TimeseriesAspectService; +import com.linkedin.metadata.utils.AuditStampUtils; +import com.linkedin.metadata.utils.GenericRecordUtils; import com.linkedin.metadata.utils.metrics.MetricUtils; +import com.linkedin.mxe.MetadataChangeProposal; +import com.linkedin.r2.RemoteInvocationException; import io.datahubproject.metadata.context.OperationContext; +import io.datahubproject.test.metadata.context.TestOperationContexts; +import java.util.List; import java.util.function.Supplier; import org.mockito.MockedStatic; import org.testng.annotations.AfterMethod; @@ -25,7 +44,7 @@ public class JavaEntityClientTest { - private EntityService _entityService; + private EntityService _entityService; private DeleteEntityService _deleteEntityService; private EntitySearchService _entitySearchService; private CachingEntitySearchService _cachingEntitySearchService; @@ -52,7 +71,7 @@ public void setupTest() { _metricUtils = mockStatic(MetricUtils.class); _counter = mock(Counter.class); when(MetricUtils.counter(any(), any())).thenReturn(_counter); - opContext = mock(OperationContext.class); + opContext = TestOperationContexts.systemContextNoSearchAuthorization(); } @AfterMethod @@ -131,4 +150,97 @@ void testThrowAfterNonRetryableException() { () -> MetricUtils.counter(client.getClass(), "exception_" + e.getClass().getName()), times(1)); } + + @Test + void tesIngestOrderingWithProposedItem() throws RemoteInvocationException { + JavaEntityClient client = getJavaEntityClient(); + Urn testUrn = UrnUtils.getUrn("urn:li:container:orderingTest"); + AuditStamp auditStamp = AuditStampUtils.createDefaultAuditStamp(); + MetadataChangeProposal mcp = + new MetadataChangeProposal() + .setEntityUrn(testUrn) + .setAspectName("status") + .setEntityType("container") + .setChangeType(ChangeType.UPSERT) + .setAspect(GenericRecordUtils.serializeAspect(new Status().setRemoved(true))); + + when(_entityService.ingestProposal( + any(OperationContext.class), any(AspectsBatch.class), eq(false))) + .thenReturn( + List.of( + // Misc - unrelated urn + IngestResult.builder() + .urn(UrnUtils.getUrn("urn:li:container:domains")) + .request( + ChangeItemImpl.builder() + .entitySpec( + opContext + .getEntityRegistry() + .getEntitySpec(Constants.CONTAINER_ENTITY_NAME)) + .aspectSpec( + opContext + .getEntityRegistry() + .getEntitySpec(Constants.CONTAINER_ENTITY_NAME) + .getAspectSpec(Constants.DOMAINS_ASPECT_NAME)) + .changeType(ChangeType.UPSERT) + .urn(UrnUtils.getUrn("urn:li:container:domains")) + .aspectName("domains") + .recordTemplate(new Domains().setDomains(new UrnArray())) + .auditStamp(auditStamp) + .build(opContext.getAspectRetriever())) + .isUpdate(true) + .publishedMCL(true) + .sqlCommitted(true) + .build(), + // Side effect - unrelated urn + IngestResult.builder() + .urn(UrnUtils.getUrn("urn:li:container:sideEffect")) + .request( + ChangeItemImpl.builder() + .entitySpec( + opContext + .getEntityRegistry() + .getEntitySpec(Constants.CONTAINER_ENTITY_NAME)) + .aspectSpec( + opContext + .getEntityRegistry() + .getEntitySpec(Constants.CONTAINER_ENTITY_NAME) + .getAspectSpec(Constants.STATUS_ASPECT_NAME)) + .changeType(ChangeType.UPSERT) + .urn(UrnUtils.getUrn("urn:li:container:sideEffect")) + .aspectName("status") + .recordTemplate(new Status().setRemoved(false)) + .auditStamp(auditStamp) + .build(opContext.getAspectRetriever())) + .isUpdate(true) + .publishedMCL(true) + .sqlCommitted(true) + .build(), + // Expected response + IngestResult.builder() + .urn(testUrn) + .request( + ProposedItem.builder() + .metadataChangeProposal(mcp) + .entitySpec( + opContext + .getEntityRegistry() + .getEntitySpec(Constants.CONTAINER_ENTITY_NAME)) + .aspectSpec( + opContext + .getEntityRegistry() + .getEntitySpec(Constants.CONTAINER_ENTITY_NAME) + .getAspectSpec(Constants.STATUS_ASPECT_NAME)) + .auditStamp(auditStamp) + .build()) + .result(UpdateAspectResult.builder().mcp(mcp).urn(testUrn).build()) + .isUpdate(true) + .publishedMCL(true) + .sqlCommitted(true) + .build())); + + String urnStr = client.ingestProposal(opContext, mcp, false); + + assertEquals(urnStr, "urn:li:container:orderingTest"); + } } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java index e8d3c654f6f639..04c9297b1ed7aa 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java @@ -104,7 +104,8 @@ public void setupTest() { null, opContext -> ((EntityServiceAspectRetriever) opContext.getAspectRetrieverOpt().get()) - .setSystemOperationContext(opContext)); + .setSystemOperationContext(opContext), + null); } /** diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/cassandra/CassandraEntityServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/cassandra/CassandraEntityServiceTest.java index 5535866f87c99b..550f55e6bfd0b9 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/entity/cassandra/CassandraEntityServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/cassandra/CassandraEntityServiceTest.java @@ -99,7 +99,8 @@ private void configureComponents() { null, opContext -> ((EntityServiceAspectRetriever) opContext.getAspectRetrieverOpt().get()) - .setSystemOperationContext(opContext)); + .setSystemOperationContext(opContext), + null); } /** diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/query/filter/BaseQueryFilterRewriterTest.java b/metadata-io/src/test/java/com/linkedin/metadata/search/query/filter/BaseQueryFilterRewriterTest.java new file mode 100644 index 00000000000000..6293d96fa35b84 --- /dev/null +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/query/filter/BaseQueryFilterRewriterTest.java @@ -0,0 +1,75 @@ +package com.linkedin.metadata.search.query.filter; + +import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; + +import com.linkedin.metadata.query.filter.Condition; +import com.linkedin.metadata.search.elasticsearch.query.filter.BaseQueryFilterRewriter; +import com.linkedin.metadata.search.elasticsearch.query.filter.QueryFilterRewriteChain; +import com.linkedin.metadata.search.elasticsearch.query.filter.QueryFilterRewriterContext; +import com.linkedin.metadata.search.elasticsearch.query.filter.QueryFilterRewriterSearchType; +import io.datahubproject.metadata.context.OperationContext; +import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.QueryBuilders; +import org.testng.annotations.Test; + +public abstract class BaseQueryFilterRewriterTest { + + abstract OperationContext getOpContext(); + + abstract T getTestRewriter(); + + abstract String getTargetField(); + + abstract String getTargetFieldValue(); + + abstract Condition getTargetCondition(); + + @Test + public void testPreservedMinimumMatchRewrite() { + BaseQueryFilterRewriter test = getTestRewriter(); + + // Setup nested container + BoolQueryBuilder testQuery = QueryBuilders.boolQuery().minimumShouldMatch(99); + testQuery.filter( + QueryBuilders.boolQuery() + .filter( + QueryBuilders.boolQuery() + .filter(QueryBuilders.termsQuery(getTargetField(), getTargetFieldValue())))); + testQuery.filter(QueryBuilders.existsQuery("someField")); + testQuery.should( + QueryBuilders.boolQuery() + .minimumShouldMatch(100) + .should( + QueryBuilders.boolQuery() + .minimumShouldMatch(101) + .should(QueryBuilders.termsQuery(getTargetField(), getTargetFieldValue())))); + + BoolQueryBuilder expectedRewrite = QueryBuilders.boolQuery().minimumShouldMatch(99); + expectedRewrite.filter( + QueryBuilders.boolQuery() + .filter( + QueryBuilders.boolQuery() + .filter(QueryBuilders.termsQuery(getTargetField(), getTargetFieldValue())))); + expectedRewrite.filter(QueryBuilders.existsQuery("someField")); + expectedRewrite.should( + QueryBuilders.boolQuery() + .minimumShouldMatch(100) + .should( + QueryBuilders.boolQuery() + .minimumShouldMatch(101) + .should(QueryBuilders.termsQuery(getTargetField(), getTargetFieldValue())))); + + assertEquals( + test.rewrite( + getOpContext(), + QueryFilterRewriterContext.builder() + .condition(getTargetCondition()) + .searchType(QueryFilterRewriterSearchType.FULLTEXT_SEARCH) + .queryFilterRewriteChain(mock(QueryFilterRewriteChain.class)) + .build(false), + testQuery), + expectedRewrite, + "Expected preservation of minimumShouldMatch"); + } +} diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/query/filter/ContainerExpansionRewriterTest.java b/metadata-io/src/test/java/com/linkedin/metadata/search/query/filter/ContainerExpansionRewriterTest.java index 5246e4dbe5bf92..fd768424e13c19 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/query/filter/ContainerExpansionRewriterTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/query/filter/ContainerExpansionRewriterTest.java @@ -39,7 +39,8 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -public class ContainerExpansionRewriterTest { +public class ContainerExpansionRewriterTest + extends BaseQueryFilterRewriterTest { private static final String FIELD_NAME = "container.keyword"; private final String grandParentUrn = "urn:li:container:grand"; private final String parentUrn = "urn:li:container:foo"; @@ -74,15 +75,40 @@ public void init() { .searchRetriever(TestOperationContexts.emptySearchRetriever) .build(), null, + null, null); } + @Override + OperationContext getOpContext() { + return opContext; + } + + @Override + ContainerExpansionRewriter getTestRewriter() { + return ContainerExpansionRewriter.builder() + .config(QueryFilterRewriterConfiguration.ExpansionRewriterConfiguration.DEFAULT) + .build(); + } + + @Override + String getTargetField() { + return FIELD_NAME; + } + + @Override + String getTargetFieldValue() { + return childUrn; + } + + @Override + Condition getTargetCondition() { + return Condition.ANCESTORS_INCL; + } + @Test public void testTermsQueryRewrite() { - ContainerExpansionRewriter test = - ContainerExpansionRewriter.builder() - .config(QueryFilterRewriterConfiguration.ExpansionRewriterConfiguration.DEFAULT) - .build(); + ContainerExpansionRewriter test = getTestRewriter(); TermsQueryBuilder notTheFieldQuery = QueryBuilders.termsQuery("notTheField", childUrn); assertEquals( diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/query/filter/DomainExpansionRewriterTest.java b/metadata-io/src/test/java/com/linkedin/metadata/search/query/filter/DomainExpansionRewriterTest.java index edc6449438581f..8741e24b1bca50 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/query/filter/DomainExpansionRewriterTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/query/filter/DomainExpansionRewriterTest.java @@ -39,7 +39,8 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -public class DomainExpansionRewriterTest { +public class DomainExpansionRewriterTest + extends BaseQueryFilterRewriterTest { private static final String FIELD_NAME = "domains.keyword"; private final String grandParentUrn = "urn:li:domain:grand"; private final String parentUrn = "urn:li:domain:foo"; @@ -74,15 +75,40 @@ public void init() { .searchRetriever(TestOperationContexts.emptySearchRetriever) .build(), null, + null, null); } + @Override + OperationContext getOpContext() { + return opContext; + } + + @Override + DomainExpansionRewriter getTestRewriter() { + return DomainExpansionRewriter.builder() + .config(QueryFilterRewriterConfiguration.ExpansionRewriterConfiguration.DEFAULT) + .build(); + } + + @Override + String getTargetField() { + return FIELD_NAME; + } + + @Override + String getTargetFieldValue() { + return parentUrn; + } + + @Override + Condition getTargetCondition() { + return Condition.DESCENDANTS_INCL; + } + @Test public void testTermsQueryRewrite() { - DomainExpansionRewriter test = - DomainExpansionRewriter.builder() - .config(QueryFilterRewriterConfiguration.ExpansionRewriterConfiguration.DEFAULT) - .build(); + DomainExpansionRewriter test = getTestRewriter(); TermsQueryBuilder notTheFieldQuery = QueryBuilders.termsQuery("notTheField", parentUrn); assertEquals( diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/utils/ESAccessControlUtilTest.java b/metadata-io/src/test/java/com/linkedin/metadata/search/utils/ESAccessControlUtilTest.java index b3dcaf174ed38d..12eb468fcc61a2 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/utils/ESAccessControlUtilTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/utils/ESAccessControlUtilTest.java @@ -97,6 +97,7 @@ public class ESAccessControlUtilTest { null, null, null, + null, null); private static final String VIEW_PRIVILEGE = "VIEW_ENTITY_PAGE"; @@ -106,6 +107,18 @@ public class ESAccessControlUtilTest { private static final Urn RESTRICTED_RESULT_URN = UrnUtils.getUrn("urn:li:restricted:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)"); + private static final String PREFIX_MATCH = + "urn:li:dataset:(urn:li:dataPlatform:snowflake,long_tail_companions.adoption.human"; + private static final Urn PREFIX_MATCH_URN = + UrnUtils.getUrn( + "urn:li:dataset:(urn:li:dataPlatform:snowflake,long_tail_companions.adoption.humans,PROD)"); + private static final Urn PREFIX_NO_MATCH_URN = + UrnUtils.getUrn( + "urn:li:dataset:(urn:li:dataPlatform:snowflake,long_tail_companions.adoption.meta_humans,PROD)"); + private static final Urn RESTRICTED_PREFIX_NO_MATCH_URN = + UrnUtils.getUrn( + "urn:li:restricted:(urn:li:dataPlatform:snowflake,long_tail_companions.adoption.meta_humans,PROD)"); + /** Comprehensive list of policy variations */ private static final Map TEST_POLICIES = ImmutableMap.builder() @@ -251,6 +264,30 @@ public class ESAccessControlUtilTest { .setValues( new StringArray( List.of(DOMAIN_A.toString()))))))))) + .put( + "urnPrefixAllUsers", + new DataHubPolicyInfo() + .setDisplayName("") + .setState(PoliciesConfig.ACTIVE_POLICY_STATE) + .setType(PoliciesConfig.METADATA_POLICY_TYPE) + .setActors( + new DataHubActorFilter() + .setAllUsers(true) + .setGroups(new UrnArray()) + .setUsers(new UrnArray())) + .setPrivileges(new StringArray(List.of(VIEW_PRIVILEGE))) + .setResources( + new DataHubResourceFilter() + .setFilter( + new PolicyMatchFilter() + .setCriteria( + new PolicyMatchCriterionArray( + List.of( + new PolicyMatchCriterion() + .setField("URN") + .setCondition(PolicyMatchCondition.STARTS_WITH) + .setValues( + new StringArray(List.of(PREFIX_MATCH))))))))) .build(); /** User A is a technical owner of the result User B has no ownership */ @@ -264,7 +301,7 @@ public void testAllUsersRestrictions() throws RemoteInvocationException, URISynt // USER A OperationContext userAContext = - sessionWithUserAGroupAandC(Set.of(TEST_POLICIES.get("allUsers"))); + sessionWithUserAGroupAandC(List.of(TEST_POLICIES.get("allUsers"))); SearchResult result = mockSearchResult(); ESAccessControlUtil.restrictSearchResult( @@ -272,20 +309,18 @@ public void testAllUsersRestrictions() throws RemoteInvocationException, URISynt assertEquals(result.getEntities().get(0).getEntity(), UNRESTRICTED_RESULT_URN); result = mockSearchResult(); - ESAccessControlUtil.restrictSearchResult( - userAContext.withSearchFlags(flags -> flags.setIncludeRestricted(false)), result); + ESAccessControlUtil.restrictSearchResult(userAContext, result); assertEquals(result.getEntities().get(0).getEntity(), UNRESTRICTED_RESULT_URN); // USER B - OperationContext userBContext = sessionWithUserBNoGroup(Set.of(TEST_POLICIES.get("allUsers"))); + OperationContext userBContext = sessionWithUserBNoGroup(List.of(TEST_POLICIES.get("allUsers"))); result = mockSearchResult(); ESAccessControlUtil.restrictSearchResult( userBContext.withSearchFlags(flags -> flags.setIncludeRestricted(true)), result); assertEquals(result.getEntities().get(0).getEntity(), UNRESTRICTED_RESULT_URN); result = mockSearchResult(); - ESAccessControlUtil.restrictSearchResult( - userBContext.withSearchFlags(flags -> flags.setIncludeRestricted(false)), result); + ESAccessControlUtil.restrictSearchResult(userBContext, result); assertEquals(result.getEntities().get(0).getEntity(), UNRESTRICTED_RESULT_URN); } @@ -293,7 +328,7 @@ public void testAllUsersRestrictions() throws RemoteInvocationException, URISynt public void testSingeUserRestrictions() throws RemoteInvocationException, URISyntaxException { // USER A - OperationContext userAContext = sessionWithUserAGroupAandC(Set.of(TEST_POLICIES.get("userA"))); + OperationContext userAContext = sessionWithUserAGroupAandC(List.of(TEST_POLICIES.get("userA"))); SearchResult result = mockSearchResult(); ESAccessControlUtil.restrictSearchResult( @@ -301,12 +336,11 @@ public void testSingeUserRestrictions() throws RemoteInvocationException, URISyn assertEquals(result.getEntities().get(0).getEntity(), UNRESTRICTED_RESULT_URN); result = mockSearchResult(); - ESAccessControlUtil.restrictSearchResult( - userAContext.withSearchFlags(flags -> flags.setIncludeRestricted(false)), result); + ESAccessControlUtil.restrictSearchResult(userAContext, result); assertEquals(result.getEntities().get(0).getEntity(), UNRESTRICTED_RESULT_URN); // USER B - OperationContext userBContext = sessionWithUserBNoGroup(Set.of(TEST_POLICIES.get("userA"))); + OperationContext userBContext = sessionWithUserBNoGroup(List.of(TEST_POLICIES.get("userA"))); result = mockSearchResult(); ESAccessControlUtil.restrictSearchResult( @@ -322,7 +356,7 @@ public void testAllGroupsRestrictions() throws RemoteInvocationException, URISyn // USER A OperationContext userAContext = - sessionWithUserAGroupAandC(Set.of(TEST_POLICIES.get("allGroups"))); + sessionWithUserAGroupAandC(List.of(TEST_POLICIES.get("allGroups"))); SearchResult result = mockSearchResult(); ESAccessControlUtil.restrictSearchResult( @@ -330,12 +364,12 @@ public void testAllGroupsRestrictions() throws RemoteInvocationException, URISyn assertEquals(result.getEntities().get(0).getEntity(), UNRESTRICTED_RESULT_URN); result = mockSearchResult(); - ESAccessControlUtil.restrictSearchResult( - userAContext.withSearchFlags(flags -> flags.setIncludeRestricted(false)), result); + ESAccessControlUtil.restrictSearchResult(userAContext, result); assertEquals(result.getEntities().get(0).getEntity(), UNRESTRICTED_RESULT_URN); // USER B (No Groups!) - OperationContext userBContext = sessionWithUserBNoGroup(Set.of(TEST_POLICIES.get("allGroups"))); + OperationContext userBContext = + sessionWithUserBNoGroup(List.of(TEST_POLICIES.get("allGroups"))); result = mockSearchResult(); ESAccessControlUtil.restrictSearchResult( @@ -352,7 +386,7 @@ public void testSingleGroupRestrictions() throws RemoteInvocationException, URIS // GROUP B Policy // USER A final OperationContext userAContext = - sessionWithUserAGroupAandC(Set.of(TEST_POLICIES.get("groupB"))); + sessionWithUserAGroupAandC(List.of(TEST_POLICIES.get("groupB"))); SearchResult result = mockSearchResult(); ESAccessControlUtil.restrictSearchResult( @@ -364,7 +398,7 @@ public void testSingleGroupRestrictions() throws RemoteInvocationException, URIS // USER B (No Groups!) final OperationContext userBContext = - sessionWithUserBNoGroup(Set.of(TEST_POLICIES.get("groupB"))); + sessionWithUserBNoGroup(List.of(TEST_POLICIES.get("groupB"))); result = mockSearchResult(); ESAccessControlUtil.restrictSearchResult( @@ -377,7 +411,7 @@ public void testSingleGroupRestrictions() throws RemoteInvocationException, URIS // Group C Policy // USER A final OperationContext userAGroupCContext = - sessionWithUserAGroupAandC(Set.of(TEST_POLICIES.get("groupC"))); + sessionWithUserAGroupAandC(List.of(TEST_POLICIES.get("groupC"))); result = mockSearchResult(); ESAccessControlUtil.restrictSearchResult( @@ -385,13 +419,12 @@ public void testSingleGroupRestrictions() throws RemoteInvocationException, URIS assertEquals(result.getEntities().get(0).getEntity(), UNRESTRICTED_RESULT_URN); result = mockSearchResult(); - ESAccessControlUtil.restrictSearchResult( - userAGroupCContext.withSearchFlags(flags -> flags.setIncludeRestricted(false)), result); + ESAccessControlUtil.restrictSearchResult(userAGroupCContext, result); assertEquals(result.getEntities().get(0).getEntity(), UNRESTRICTED_RESULT_URN); // USER B (No Groups!) final OperationContext userBgroupCContext = - sessionWithUserBNoGroup(Set.of(TEST_POLICIES.get("groupC"))); + sessionWithUserBNoGroup(List.of(TEST_POLICIES.get("groupC"))); result = mockSearchResult(); ESAccessControlUtil.restrictSearchResult( @@ -407,7 +440,7 @@ public void testAnyOwnerRestrictions() throws RemoteInvocationException, URISynt // USER A OperationContext userAContext = - sessionWithUserAGroupAandC(Set.of(TEST_POLICIES.get("anyOwner"))); + sessionWithUserAGroupAandC(List.of(TEST_POLICIES.get("anyOwner"))); SearchResult result = mockSearchResult(); ESAccessControlUtil.restrictSearchResult( @@ -415,12 +448,11 @@ public void testAnyOwnerRestrictions() throws RemoteInvocationException, URISynt assertEquals(result.getEntities().get(0).getEntity(), UNRESTRICTED_RESULT_URN); result = mockSearchResult(); - ESAccessControlUtil.restrictSearchResult( - userAContext.withSearchFlags(flags -> flags.setIncludeRestricted(false)), result); + ESAccessControlUtil.restrictSearchResult(userAContext, result); assertEquals(result.getEntities().get(0).getEntity(), UNRESTRICTED_RESULT_URN); // USER B (not an owner) - OperationContext userBContext = sessionWithUserBNoGroup(Set.of(TEST_POLICIES.get("anyOwner"))); + OperationContext userBContext = sessionWithUserBNoGroup(List.of(TEST_POLICIES.get("anyOwner"))); result = mockSearchResult(); ESAccessControlUtil.restrictSearchResult( @@ -436,7 +468,7 @@ public void testBusinessOwnerRestrictions() throws RemoteInvocationException, UR // USER A final OperationContext userAContext = - sessionWithUserAGroupAandC(Set.of(TEST_POLICIES.get("businessOwner"))); + sessionWithUserAGroupAandC(List.of(TEST_POLICIES.get("businessOwner"))); SearchResult result = mockSearchResult(); ESAccessControlUtil.restrictSearchResult( @@ -448,7 +480,7 @@ public void testBusinessOwnerRestrictions() throws RemoteInvocationException, UR // USER B final OperationContext userBContext = - sessionWithUserBNoGroup(Set.of(TEST_POLICIES.get("businessOwner"))); + sessionWithUserBNoGroup(List.of(TEST_POLICIES.get("businessOwner"))); result = mockSearchResult(); ESAccessControlUtil.restrictSearchResult( @@ -464,7 +496,7 @@ public void testDomainRestrictions() throws RemoteInvocationException, URISyntax // USER A OperationContext userAContext = - sessionWithUserAGroupAandC(Set.of(TEST_POLICIES.get("domainA"))); + sessionWithUserAGroupAandC(List.of(TEST_POLICIES.get("domainA"))); SearchResult result = mockSearchResult(); ESAccessControlUtil.restrictSearchResult( @@ -477,7 +509,7 @@ public void testDomainRestrictions() throws RemoteInvocationException, URISyntax assertEquals(result.getEntities().get(0).getEntity(), UNRESTRICTED_RESULT_URN); // USER B - OperationContext userBContext = sessionWithUserBNoGroup(Set.of(TEST_POLICIES.get("domainA"))); + OperationContext userBContext = sessionWithUserBNoGroup(List.of(TEST_POLICIES.get("domainA"))); result = mockSearchResult(); ESAccessControlUtil.restrictSearchResult( userBContext.withSearchFlags(flags -> flags.setIncludeRestricted(true)), result); @@ -489,6 +521,28 @@ public void testDomainRestrictions() throws RemoteInvocationException, URISyntax assertEquals(result.getEntities().get(0).getEntity(), UNRESTRICTED_RESULT_URN); } + @Test + public void testPrefixRestrictions() throws RemoteInvocationException, URISyntaxException { + + // USER A + OperationContext userAContext = + sessionWithUserAGroupAandC(List.of(TEST_POLICIES.get("urnPrefixAllUsers"))); + + SearchResult result = mockPrefixSearchResult(); + ESAccessControlUtil.restrictSearchResult( + userAContext.withSearchFlags(flags -> flags.setIncludeRestricted(true)), result); + assertEquals(result.getEntities().size(), 2); + assertEquals(result.getEntities().get(0).getEntity(), PREFIX_MATCH_URN); + assertEquals(result.getEntities().get(1).getEntity(), RESTRICTED_PREFIX_NO_MATCH_URN); + + result = mockPrefixSearchResult(); + ESAccessControlUtil.restrictSearchResult( + userAContext.withSearchFlags(flags -> flags.setIncludeRestricted(false)), result); + assertEquals(result.getEntities().size(), 2); + assertEquals(result.getEntities().get(0).getEntity(), PREFIX_MATCH_URN); + assertEquals(result.getEntities().get(1).getEntity(), PREFIX_NO_MATCH_URN); + } + private static RestrictedService mockRestrictedService() { RestrictedService mockRestrictedService = mock(RestrictedService.class); when(mockRestrictedService.encryptRestrictedUrn(any())) @@ -500,6 +554,23 @@ private static RestrictedService mockRestrictedService() { return mockRestrictedService; } + private static SearchResult mockPrefixSearchResult() { + SearchResult result = new SearchResult(); + result.setFrom(0); + result.setPageSize(10); + result.setNumEntities(1); + result.setEntities( + new SearchEntityArray( + new SearchEntity() + .setEntity(PREFIX_MATCH_URN) + .setMatchedFields(new MatchedFieldArray()), + new SearchEntity() + .setEntity(PREFIX_NO_MATCH_URN) + .setMatchedFields(new MatchedFieldArray()))); + result.setMetadata(mock(SearchResultMetadata.class)); + return result; + } + private static SearchResult mockSearchResult() { SearchResult result = new SearchResult(); result.setFrom(0); @@ -514,18 +585,18 @@ private static SearchResult mockSearchResult() { return result; } - private static OperationContext sessionWithUserAGroupAandC(Set policies) + private static OperationContext sessionWithUserAGroupAandC(List policies) throws RemoteInvocationException, URISyntaxException { return sessionWithUserGroups(USER_A_AUTH, policies, List.of(TEST_GROUP_A, TEST_GROUP_C)); } - private static OperationContext sessionWithUserBNoGroup(Set policies) + private static OperationContext sessionWithUserBNoGroup(List policies) throws RemoteInvocationException, URISyntaxException { return sessionWithUserGroups(USER_B_AUTH, policies, List.of()); } private static OperationContext sessionWithUserGroups( - Authentication auth, Set policies, List groups) + Authentication auth, List policies, List groups) throws RemoteInvocationException, URISyntaxException { Urn actorUrn = UrnUtils.getUrn(auth.getActor().toUrnStr()); Authorizer dataHubAuthorizer = @@ -538,7 +609,7 @@ public static class TestDataHubAuthorizer extends DataHubAuthorizer { public TestDataHubAuthorizer( @Nonnull OperationContext opContext, - @Nonnull Set policies, + @Nonnull List policies, @Nonnull Map> userGroups, @Nonnull Map>> resourceOwnerTypes) throws RemoteInvocationException, URISyntaxException { @@ -569,6 +640,7 @@ public TestDataHubAuthorizer( Collectors.groupingBy( Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toList()))); policyCache.putAll(byPrivilegeName); + policyCache.put(ALL, policies); } finally { readWriteLock.writeLock().unlock(); } diff --git a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLSpringCommonTestConfiguration.java b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLSpringCommonTestConfiguration.java index b34bb5bd0e0a81..c5f08fa8dcc8b6 100644 --- a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLSpringCommonTestConfiguration.java +++ b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLSpringCommonTestConfiguration.java @@ -21,6 +21,7 @@ import io.datahubproject.metadata.context.OperationContextConfig; import io.datahubproject.metadata.context.RetrieverContext; import io.datahubproject.metadata.context.ServicesRegistryContext; +import io.datahubproject.metadata.context.ValidationContext; import io.datahubproject.test.metadata.context.TestOperationContexts; import org.apache.avro.generic.GenericRecord; import org.springframework.beans.factory.annotation.Qualifier; @@ -88,7 +89,8 @@ public OperationContext operationContext( entityRegistry, mock(ServicesRegistryContext.class), indexConvention, - mock(RetrieverContext.class)); + mock(RetrieverContext.class), + mock(ValidationContext.class)); } @MockBean SpringStandardPluginConfiguration springStandardPluginConfiguration; diff --git a/metadata-operation-context/src/main/java/io/datahubproject/metadata/context/OperationContext.java b/metadata-operation-context/src/main/java/io/datahubproject/metadata/context/OperationContext.java index 61bf40f54817ee..9a058c526647c2 100644 --- a/metadata-operation-context/src/main/java/io/datahubproject/metadata/context/OperationContext.java +++ b/metadata-operation-context/src/main/java/io/datahubproject/metadata/context/OperationContext.java @@ -71,6 +71,7 @@ public static OperationContext asSession( .build()) .authorizationContext(AuthorizationContext.builder().authorizer(authorizer).build()) .requestContext(requestContext) + .validationContext(systemOperationContext.getValidationContext()) .build(sessionAuthentication); } @@ -122,7 +123,8 @@ public static OperationContext asSystem( @Nonnull EntityRegistry entityRegistry, @Nullable ServicesRegistryContext servicesRegistryContext, @Nullable IndexConvention indexConvention, - @Nullable RetrieverContext retrieverContext) { + @Nullable RetrieverContext retrieverContext, + @Nonnull ValidationContext validationContext) { return asSystem( config, systemAuthentication, @@ -130,6 +132,7 @@ public static OperationContext asSystem( servicesRegistryContext, indexConvention, retrieverContext, + validationContext, ObjectMapperContext.DEFAULT); } @@ -140,6 +143,7 @@ public static OperationContext asSystem( @Nullable ServicesRegistryContext servicesRegistryContext, @Nullable IndexConvention indexConvention, @Nullable RetrieverContext retrieverContext, + @Nonnull ValidationContext validationContext, @Nonnull ObjectMapperContext objectMapperContext) { ActorContext systemActorContext = @@ -161,6 +165,7 @@ public static OperationContext asSystem( .authorizationContext(AuthorizationContext.builder().authorizer(Authorizer.EMPTY).build()) .retrieverContext(retrieverContext) .objectMapperContext(objectMapperContext) + .validationContext(validationContext) .build(systemAuthentication); } @@ -174,6 +179,7 @@ public static OperationContext asSystem( @Nullable private final RequestContext requestContext; @Nullable private final RetrieverContext retrieverContext; @Nonnull private final ObjectMapperContext objectMapperContext; + @Nonnull private final ValidationContext validationContext; public OperationContext withSearchFlags( @Nonnull Function flagDefaults) { @@ -460,9 +466,8 @@ public OperationContext build(@Nonnull ActorContext sessionActor) { this.servicesRegistryContext, this.requestContext, this.retrieverContext, - this.objectMapperContext != null - ? this.objectMapperContext - : ObjectMapperContext.DEFAULT); + this.objectMapperContext != null ? this.objectMapperContext : ObjectMapperContext.DEFAULT, + this.validationContext); } private OperationContext build() { diff --git a/metadata-operation-context/src/main/java/io/datahubproject/metadata/context/ValidationContext.java b/metadata-operation-context/src/main/java/io/datahubproject/metadata/context/ValidationContext.java new file mode 100644 index 00000000000000..76560f087f22d8 --- /dev/null +++ b/metadata-operation-context/src/main/java/io/datahubproject/metadata/context/ValidationContext.java @@ -0,0 +1,18 @@ +package io.datahubproject.metadata.context; + +import java.util.Optional; +import lombok.Builder; +import lombok.Getter; + +/** Context holder for environment variables relevant to operations */ +@Builder +@Getter +public class ValidationContext implements ContextInterface { + // Uses alternate validation flow for MCP ingestion + private final boolean alternateValidation; + + @Override + public Optional getCacheKeyComponent() { + return Optional.of(alternateValidation ? 1 : 0); + } +} diff --git a/metadata-operation-context/src/main/java/io/datahubproject/test/metadata/context/TestOperationContexts.java b/metadata-operation-context/src/main/java/io/datahubproject/test/metadata/context/TestOperationContexts.java index cdcbb540eeda43..42de6b7398c616 100644 --- a/metadata-operation-context/src/main/java/io/datahubproject/test/metadata/context/TestOperationContexts.java +++ b/metadata-operation-context/src/main/java/io/datahubproject/test/metadata/context/TestOperationContexts.java @@ -31,6 +31,7 @@ import io.datahubproject.metadata.context.RequestContext; import io.datahubproject.metadata.context.RetrieverContext; import io.datahubproject.metadata.context.ServicesRegistryContext; +import io.datahubproject.metadata.context.ValidationContext; import java.util.List; import java.util.Map; import java.util.Optional; @@ -58,6 +59,8 @@ public class TestOperationContexts { .build(); private static EntityRegistry defaultEntityRegistryInstance; + private static ValidationContext defaultValidationContext = + ValidationContext.builder().alternateValidation(false).build(); public static EntityRegistry defaultEntityRegistry() { if (defaultEntityRegistryInstance == null) { @@ -114,6 +117,11 @@ public static OperationContext systemContextNoSearchAuthorization() { return systemContextNoSearchAuthorization(null, null, null); } + public static OperationContext systemContextNoValidate() { + return systemContextNoSearchAuthorization( + null, null, null, () -> ValidationContext.builder().alternateValidation(true).build()); + } + public static OperationContext systemContextNoSearchAuthorization( @Nullable EntityRegistry entityRegistry, @Nullable IndexConvention indexConvention) { return systemContextNoSearchAuthorization(() -> entityRegistry, null, () -> indexConvention); @@ -160,9 +168,27 @@ public static OperationContext systemContextNoSearchAuthorization( entityRegistrySupplier, retrieverContextSupplier, indexConventionSupplier, + null, null); } + public static OperationContext systemContextNoSearchAuthorization( + @Nullable Supplier entityRegistrySupplier, + @Nullable Supplier retrieverContextSupplier, + @Nullable Supplier indexConventionSupplier, + @Nullable Supplier environmentContextSupplier) { + + return systemContext( + null, + null, + null, + entityRegistrySupplier, + retrieverContextSupplier, + indexConventionSupplier, + null, + environmentContextSupplier); + } + public static OperationContext systemContext( @Nullable Supplier configSupplier, @Nullable Supplier systemAuthSupplier, @@ -170,7 +196,8 @@ public static OperationContext systemContext( @Nullable Supplier entityRegistrySupplier, @Nullable Supplier retrieverContextSupplier, @Nullable Supplier indexConventionSupplier, - @Nullable Consumer postConstruct) { + @Nullable Consumer postConstruct, + @Nullable Supplier environmentContextSupplier) { OperationContextConfig config = Optional.ofNullable(configSupplier).map(Supplier::get).orElse(DEFAULT_OPCONTEXT_CONFIG); @@ -196,6 +223,11 @@ public static OperationContext systemContext( ServicesRegistryContext servicesRegistryContext = Optional.ofNullable(servicesRegistrySupplier).orElse(() -> null).get(); + ValidationContext validationContext = + Optional.ofNullable(environmentContextSupplier) + .map(Supplier::get) + .orElse(defaultValidationContext); + OperationContext operationContext = OperationContext.asSystem( config, @@ -203,7 +235,8 @@ public static OperationContext systemContext( entityRegistry, servicesRegistryContext, indexConvention, - retrieverContext); + retrieverContext, + validationContext); if (postConstruct != null) { postConstruct.accept(operationContext); diff --git a/metadata-operation-context/src/test/java/io/datahubproject/metadata/context/OperationContextTest.java b/metadata-operation-context/src/test/java/io/datahubproject/metadata/context/OperationContextTest.java index 6a5e9e04f7dda5..3e092e20127ee5 100644 --- a/metadata-operation-context/src/test/java/io/datahubproject/metadata/context/OperationContextTest.java +++ b/metadata-operation-context/src/test/java/io/datahubproject/metadata/context/OperationContextTest.java @@ -25,7 +25,8 @@ public void testSystemPrivilegeEscalation() { mock(EntityRegistry.class), mock(ServicesRegistryContext.class), null, - mock(RetrieverContext.class)); + mock(RetrieverContext.class), + mock(ValidationContext.class)); OperationContext opContext = systemOpContext.asSession(RequestContext.TEST, Authorizer.EMPTY, userAuth); diff --git a/metadata-service/auth-impl/src/test/java/com/datahub/authorization/DataHubAuthorizerTest.java b/metadata-service/auth-impl/src/test/java/com/datahub/authorization/DataHubAuthorizerTest.java index 985cfe48f6bcf6..4437682bfeb0a1 100644 --- a/metadata-service/auth-impl/src/test/java/com/datahub/authorization/DataHubAuthorizerTest.java +++ b/metadata-service/auth-impl/src/test/java/com/datahub/authorization/DataHubAuthorizerTest.java @@ -54,6 +54,7 @@ import io.datahubproject.metadata.context.OperationContextConfig; import io.datahubproject.metadata.context.RetrieverContext; import io.datahubproject.metadata.context.ServicesRegistryContext; +import io.datahubproject.metadata.context.ValidationContext; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -318,7 +319,8 @@ public void setupTest() throws Exception { mock(EntityRegistry.class), mock(ServicesRegistryContext.class), mock(IndexConvention.class), - mock(RetrieverContext.class)); + mock(RetrieverContext.class), + mock(ValidationContext.class)); _dataHubAuthorizer = new DataHubAuthorizer( @@ -598,7 +600,6 @@ private DataHubPolicyInfo createDataHubPolicyInfoFor( dataHubPolicyInfo.setDisplayName("My Test Display"); dataHubPolicyInfo.setDescription("My test display!"); dataHubPolicyInfo.setEditable(true); - dataHubPolicyInfo.setActors(actorFilter); final DataHubResourceFilter resourceFilter = new DataHubResourceFilter(); diff --git a/metadata-service/configuration/src/main/java/com/linkedin/datahub/graphql/featureflags/FeatureFlags.java b/metadata-service/configuration/src/main/java/com/linkedin/datahub/graphql/featureflags/FeatureFlags.java index 167515a13c4da2..0c62bdc1963261 100644 --- a/metadata-service/configuration/src/main/java/com/linkedin/datahub/graphql/featureflags/FeatureFlags.java +++ b/metadata-service/configuration/src/main/java/com/linkedin/datahub/graphql/featureflags/FeatureFlags.java @@ -23,4 +23,5 @@ public class FeatureFlags { private boolean dataContractsEnabled = false; private boolean editableDatasetNameEnabled = false; private boolean showSeparateSiblings = false; + private boolean alternateMCPValidation = false; } diff --git a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/MCPValidationConfig.java b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/MCPValidationConfig.java new file mode 100644 index 00000000000000..622dbb010a5766 --- /dev/null +++ b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/MCPValidationConfig.java @@ -0,0 +1,9 @@ +package com.linkedin.metadata.config; + +import com.linkedin.metadata.config.structuredProperties.extensions.ModelExtensionValidationConfiguration; +import lombok.Data; + +@Data +public class MCPValidationConfig { + private ModelExtensionValidationConfiguration extensions; +} diff --git a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/MetadataChangeProposalConfig.java b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/MetadataChangeProposalConfig.java index 4e8c18912c40ea..86b4a1b8562b76 100644 --- a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/MetadataChangeProposalConfig.java +++ b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/MetadataChangeProposalConfig.java @@ -8,6 +8,7 @@ public class MetadataChangeProposalConfig { ThrottlesConfig throttle; + MCPValidationConfig validation; SideEffectsConfig sideEffects; @Data diff --git a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/structuredProperties/extensions/ModelExtensionValidationConfiguration.java b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/structuredProperties/extensions/ModelExtensionValidationConfiguration.java new file mode 100644 index 00000000000000..71db3091540380 --- /dev/null +++ b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/structuredProperties/extensions/ModelExtensionValidationConfiguration.java @@ -0,0 +1,10 @@ +package com.linkedin.metadata.config.structuredProperties.extensions; + +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +@Data +@Slf4j +public class ModelExtensionValidationConfiguration { + private boolean enabled; +} diff --git a/metadata-service/configuration/src/main/resources/application.yaml b/metadata-service/configuration/src/main/resources/application.yaml index ef3ae76d81fae3..50170410cd635f 100644 --- a/metadata-service/configuration/src/main/resources/application.yaml +++ b/metadata-service/configuration/src/main/resources/application.yaml @@ -435,6 +435,7 @@ featureFlags: schemaFieldEntityFetchEnabled: ${SCHEMA_FIELD_ENTITY_FETCH_ENABLED:true} # Enables fetching for schema field entities from the database when we hydrate them on schema fields businessAttributeEntityEnabled: ${BUSINESS_ATTRIBUTE_ENTITY_ENABLED:false} # Enables business attribute entity which can be associated with field of dataset dataContractsEnabled: ${DATA_CONTRACTS_ENABLED:true} # Enables the Data Contracts feature (Tab) in the UI + alternateMCPValidation: ${ALTERNATE_MCP_VALIDATION:false} # Enables alternate MCP validation flow showSeparateSiblings: ${SHOW_SEPARATE_SIBLINGS:false} # If turned on, all siblings will be separated with no way to get to a "combined" sibling view editableDatasetNameEnabled: ${EDITABLE_DATASET_NAME_ENABLED:false} # Enables the ability to edit the dataset name in the UI @@ -539,6 +540,8 @@ businessAttribute: metadataChangeProposal: validation: ignoreUnknown: ${MCP_VALIDATION_IGNORE_UNKNOWN:true} + extensions: + enabled: ${MCP_VALIDATION_EXTENSIONS_ENABLED:false} sideEffects: schemaField: enabled: ${MCP_SIDE_EFFECTS_SCHEMA_FIELD_ENABLED:false} diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/CacheConfig.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/CacheConfig.java index 383716a80cc60a..545f8e087838d6 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/CacheConfig.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/CacheConfig.java @@ -48,7 +48,7 @@ private Caffeine caffeineCacheBuilder() { return Caffeine.newBuilder() .initialCapacity(100) .maximumSize(cacheMaxSize) - .expireAfterAccess(cacheTtlSeconds, TimeUnit.SECONDS) + .expireAfterWrite(cacheTtlSeconds, TimeUnit.SECONDS) .recordStats(); } @@ -86,10 +86,7 @@ public CacheManager hazelcastCacheManager( @Bean @ConditionalOnProperty(name = "searchService.cacheImplementation", havingValue = "hazelcast") public MapConfig defaultMapConfig() { - // TODO: This setting is equivalent to expireAfterAccess, refreshes timer after a get, put, - // containsKey etc. - // is this behavior what we actually desire? Should we change it now? - MapConfig mapConfig = new MapConfig().setMaxIdleSeconds(cacheTtlSeconds); + MapConfig mapConfig = new MapConfig().setTimeToLiveSeconds(cacheTtlSeconds); EvictionConfig evictionConfig = new EvictionConfig() diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/context/SystemOperationContextFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/context/SystemOperationContextFactory.java index b8c4796dd43122..f5235dc3682fce 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/context/SystemOperationContextFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/context/SystemOperationContextFactory.java @@ -16,6 +16,7 @@ import io.datahubproject.metadata.context.OperationContextConfig; import io.datahubproject.metadata.context.RetrieverContext; import io.datahubproject.metadata.context.ServicesRegistryContext; +import io.datahubproject.metadata.context.ValidationContext; import io.datahubproject.metadata.services.RestrictedService; import javax.annotation.Nonnull; import org.springframework.beans.factory.annotation.Qualifier; @@ -43,7 +44,8 @@ protected OperationContext javaSystemOperationContext( @Nonnull final GraphService graphService, @Nonnull final SearchService searchService, @Qualifier("baseElasticSearchComponents") - BaseElasticSearchComponentsFactory.BaseElasticSearchComponents components) { + BaseElasticSearchComponentsFactory.BaseElasticSearchComponents components, + @Nonnull final ConfigurationProvider configurationProvider) { EntityServiceAspectRetriever entityServiceAspectRetriever = EntityServiceAspectRetriever.builder() @@ -68,6 +70,10 @@ protected OperationContext javaSystemOperationContext( .aspectRetriever(entityServiceAspectRetriever) .graphRetriever(systemGraphRetriever) .searchRetriever(searchServiceSearchRetriever) + .build(), + ValidationContext.builder() + .alternateValidation( + configurationProvider.getFeatureFlags().isAlternateMCPValidation()) .build()); entityServiceAspectRetriever.setSystemOperationContext(systemOperationContext); @@ -95,7 +101,8 @@ protected OperationContext restliSystemOperationContext( @Nonnull final GraphService graphService, @Nonnull final SearchService searchService, @Qualifier("baseElasticSearchComponents") - BaseElasticSearchComponentsFactory.BaseElasticSearchComponents components) { + BaseElasticSearchComponentsFactory.BaseElasticSearchComponents components, + @Nonnull final ConfigurationProvider configurationProvider) { EntityClientAspectRetriever entityServiceAspectRetriever = EntityClientAspectRetriever.builder().entityClient(systemEntityClient).build(); @@ -117,6 +124,10 @@ protected OperationContext restliSystemOperationContext( .aspectRetriever(entityServiceAspectRetriever) .graphRetriever(systemGraphRetriever) .searchRetriever(searchServiceSearchRetriever) + .build(), + ValidationContext.builder() + .alternateValidation( + configurationProvider.getFeatureFlags().isAlternateMCPValidation()) .build()); entityServiceAspectRetriever.setSystemOperationContext(systemOperationContext); diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/plugins/SpringStandardPluginConfiguration.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/plugins/SpringStandardPluginConfiguration.java index 943b1c7184a60d..b2db0857a6a5c8 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/plugins/SpringStandardPluginConfiguration.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/plugins/SpringStandardPluginConfiguration.java @@ -45,7 +45,7 @@ public MutationHook ignoreUnknownMutator() { AspectPluginConfig.builder() .className(IgnoreUnknownMutator.class.getName()) .enabled(ignoreUnknownEnabled && !extensionsEnabled) - .supportedOperations(List.of("CREATE", "CREATE_ENTITY", "UPSERT")) + .supportedOperations(List.of("*")) .supportedEntityAspectNames( List.of( AspectPluginConfig.EntityAspectName.builder() diff --git a/metadata-service/openapi-servlet/models/src/main/java/io/datahubproject/openapi/v2/models/GenericEntityV2.java b/metadata-service/openapi-servlet/models/src/main/java/io/datahubproject/openapi/v2/models/GenericEntityV2.java index 83c23f3552409b..e281a39f764e53 100644 --- a/metadata-service/openapi-servlet/models/src/main/java/io/datahubproject/openapi/v2/models/GenericEntityV2.java +++ b/metadata-service/openapi-servlet/models/src/main/java/io/datahubproject/openapi/v2/models/GenericEntityV2.java @@ -3,6 +3,7 @@ import com.datahub.util.RecordUtils; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.linkedin.data.template.RecordTemplate; import com.linkedin.mxe.SystemMetadata; @@ -37,29 +38,42 @@ public static class GenericEntityV2Builder { public GenericEntityV2 build( ObjectMapper objectMapper, Map> aspects) { + return build(objectMapper, aspects, false); + } + + public GenericEntityV2 build( + ObjectMapper objectMapper, + Map> aspects, + boolean isAsyncAlternateValidation) { Map jsonObjectMap = aspects.entrySet().stream() .map( e -> { try { - Map valueMap = + Map valueMap = Map.of( "value", objectMapper.readTree( RecordUtils.toJsonString(e.getValue().getFirst()) .getBytes(StandardCharsets.UTF_8))); + Object aspectValue = + isAsyncAlternateValidation + ? valueMap.get("value").get("value") + : valueMap.get("value"); + if (e.getValue().getSecond() != null) { return Map.entry( e.getKey(), new GenericAspectV2( Map.of( - "systemMetadata", e.getValue().getSecond(), - "value", valueMap.get("value")))); + "systemMetadata", + e.getValue().getSecond(), + "value", + aspectValue))); } else { return Map.entry( - e.getKey(), - new GenericAspectV2(Map.of("value", valueMap.get("value")))); + e.getKey(), new GenericAspectV2(Map.of("value", aspectValue))); } } catch (IOException ex) { throw new RuntimeException(ex); diff --git a/metadata-service/openapi-servlet/models/src/main/java/io/datahubproject/openapi/v3/models/GenericEntityV3.java b/metadata-service/openapi-servlet/models/src/main/java/io/datahubproject/openapi/v3/models/GenericEntityV3.java index 54d6ac2c1736f4..dbbfc117a8ab30 100644 --- a/metadata-service/openapi-servlet/models/src/main/java/io/datahubproject/openapi/v3/models/GenericEntityV3.java +++ b/metadata-service/openapi-servlet/models/src/main/java/io/datahubproject/openapi/v3/models/GenericEntityV3.java @@ -29,6 +29,10 @@ public GenericEntityV3(Map m) { super(m); } + public String getUrn() { + return (String) get("urn"); + } + @Override public Map getAspects() { return this.entrySet().stream() @@ -40,17 +44,31 @@ public static class GenericEntityV3Builder { public GenericEntityV3 build( ObjectMapper objectMapper, @Nonnull Urn urn, Map aspects) { + return build(objectMapper, urn, aspects, false); + } + + public GenericEntityV3 build( + ObjectMapper objectMapper, + @Nonnull Urn urn, + Map aspects, + boolean isAsyncAlternateValidation) { Map jsonObjectMap = aspects.entrySet().stream() .map( entry -> { try { String aspectName = entry.getKey(); - Map aspectValue = + Map aspectValueMap = objectMapper.readValue( RecordUtils.toJsonString(entry.getValue().getAspect()) .getBytes(StandardCharsets.UTF_8), new TypeReference<>() {}); + + Map aspectValue = + isAsyncAlternateValidation + ? (Map) aspectValueMap.get("value") + : aspectValueMap; + Map systemMetadata = entry.getValue().getSystemMetadata() != null ? objectMapper.convertValue( diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/controller/GenericEntitiesController.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/controller/GenericEntitiesController.java index 7427f293c848f5..ee23cced7f468b 100644 --- a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/controller/GenericEntitiesController.java +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/controller/GenericEntitiesController.java @@ -163,7 +163,9 @@ protected abstract List buildEntityVersionedAspectList( throws URISyntaxException; protected abstract List buildEntityList( - Set ingestResults, boolean withSystemMetadata); + Collection ingestResults, + boolean withSystemMetadata, + boolean isAsyncAlternateValidation); protected abstract E buildGenericEntity( @Nonnull String aspectName, @@ -510,12 +512,14 @@ public ResponseEntity> createEntity( } AspectsBatch batch = toMCPBatch(opContext, jsonEntityList, authentication.getActor()); - Set results = entityService.ingestProposal(opContext, batch, async); + List results = entityService.ingestProposal(opContext, batch, async); if (!async) { - return ResponseEntity.ok(buildEntityList(results, withSystemMetadata)); + return ResponseEntity.ok(buildEntityList(results, withSystemMetadata, false)); } else { - return ResponseEntity.accepted().body(buildEntityList(results, withSystemMetadata)); + boolean isAsyncAlternateValidation = opContext.getValidationContext().isAlternateValidation(); + return ResponseEntity.accepted() + .body(buildEntityList(results, withSystemMetadata, isAsyncAlternateValidation)); } } @@ -602,7 +606,7 @@ public ResponseEntity createAspect( jsonAspect, authentication.getActor()); - Set results = + List results = entityService.ingestProposal( opContext, AspectsBatchImpl.builder() diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/EntityController.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/EntityController.java index 7bec052a9fd5d2..d55f2fd1c2a04e 100644 --- a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/EntityController.java +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/EntityController.java @@ -23,11 +23,14 @@ import com.linkedin.metadata.entity.UpdateAspectResult; import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl; import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl; +import com.linkedin.metadata.entity.ebean.batch.ProposedItem; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.search.SearchEntity; import com.linkedin.metadata.search.SearchEntityArray; import com.linkedin.metadata.utils.AuditStampUtils; import com.linkedin.metadata.utils.GenericRecordUtils; +import com.linkedin.metadata.utils.SystemMetadataUtils; +import com.linkedin.mxe.MetadataChangeProposal; import com.linkedin.mxe.SystemMetadata; import com.linkedin.util.Pair; import io.datahubproject.metadata.context.OperationContext; @@ -161,7 +164,25 @@ protected AspectsBatch toMCPBatch( AspectSpec aspectSpec = lookupAspectSpec(entityUrn, aspect.getKey()).get(); - if (aspectSpec != null) { + if (opContext.getValidationContext().isAlternateValidation()) { + ProposedItem.ProposedItemBuilder builder = + ProposedItem.builder() + .metadataChangeProposal( + new MetadataChangeProposal() + .setEntityUrn(entityUrn) + .setAspectName(aspect.getKey()) + .setEntityType(entityUrn.getEntityType()) + .setChangeType(ChangeType.UPSERT) + .setAspect(GenericRecordUtils.serializeAspect(aspect.getValue())) + .setSystemMetadata(SystemMetadataUtils.createDefaultSystemMetadata())) + .auditStamp(AuditStampUtils.createAuditStamp(actor.toUrnStr())) + .entitySpec( + opContext + .getAspectRetriever() + .getEntityRegistry() + .getEntitySpec(entityUrn.getEntityType())); + items.add(builder.build()); + } else if (aspectSpec != null) { ChangeItemImpl.ChangeItemImplBuilder builder = ChangeItemImpl.builder() .urn(entityUrn) @@ -181,7 +202,7 @@ protected AspectsBatch toMCPBatch( objectMapper.writeValueAsString(aspect.getValue().get("systemMetadata")))); } - items.add(builder.build(opContext.getRetrieverContext().get().getAspectRetriever())); + items.add(builder.build(opContext.getAspectRetrieverOpt().get())); } } } @@ -263,7 +284,9 @@ private List toRecordTemplates( @Override protected List buildEntityList( - Set ingestResults, boolean withSystemMetadata) { + Collection ingestResults, + boolean withSystemMetadata, + boolean isAsyncAlternateValidation) { List responseList = new LinkedList<>(); Map> entityMap = @@ -282,7 +305,7 @@ protected List buildEntityList( responseList.add( GenericEntityV2.builder() .urn(urnAspects.getKey().toString()) - .build(objectMapper, aspectsMap)); + .build(objectMapper, aspectsMap, isAsyncAlternateValidation)); } return responseList; } diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/controller/EntityController.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/controller/EntityController.java index 55cf310be3438d..64046b23db7beb 100644 --- a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/controller/EntityController.java +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/controller/EntityController.java @@ -13,6 +13,8 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.linkedin.common.urn.Urn; import com.linkedin.data.ByteString; +import com.linkedin.data.template.SetMode; +import com.linkedin.data.template.StringMap; import com.linkedin.entity.EnvelopedAspect; import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.aspect.AspectRetriever; @@ -24,6 +26,7 @@ import com.linkedin.metadata.entity.UpdateAspectResult; import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl; import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl; +import com.linkedin.metadata.entity.ebean.batch.ProposedItem; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.models.EntitySpec; import com.linkedin.metadata.query.filter.SortCriterion; @@ -34,6 +37,7 @@ import com.linkedin.metadata.utils.AuditStampUtils; import com.linkedin.metadata.utils.GenericRecordUtils; import com.linkedin.metadata.utils.SearchUtil; +import com.linkedin.mxe.MetadataChangeProposal; import com.linkedin.mxe.SystemMetadata; import io.datahubproject.metadata.context.OperationContext; import io.datahubproject.metadata.context.RequestContext; @@ -281,7 +285,9 @@ private Map toAspectItemMap( @Override protected List buildEntityList( - Set ingestResults, boolean withSystemMetadata) { + Collection ingestResults, + boolean withSystemMetadata, + boolean isAsyncAlternateValidation) { List responseList = new LinkedList<>(); Map> entityMap = @@ -304,7 +310,8 @@ protected List buildEntityList( .build())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); responseList.add( - GenericEntityV3.builder().build(objectMapper, urnAspects.getKey(), aspectsMap)); + GenericEntityV3.builder() + .build(objectMapper, urnAspects.getKey(), aspectsMap, isAsyncAlternateValidation)); } return responseList; } @@ -442,23 +449,42 @@ protected AspectsBatch toMCPBatch( } AspectSpec aspectSpec = lookupAspectSpec(entityUrn, aspect.getKey()).orElse(null); + SystemMetadata systemMetadata = null; + if (aspect.getValue().has("systemMetadata")) { + systemMetadata = + EntityApiUtils.parseSystemMetadata( + objectMapper.writeValueAsString(aspect.getValue().get("systemMetadata"))); + ((ObjectNode) aspect.getValue()).remove("systemMetadata"); + } + Map headers = null; + if (aspect.getValue().has("headers")) { + headers = + objectMapper.convertValue( + aspect.getValue().get("headers"), new TypeReference<>() {}); + } - if (aspectSpec != null) { - - SystemMetadata systemMetadata = null; - if (aspect.getValue().has("systemMetadata")) { - systemMetadata = - EntityApiUtils.parseSystemMetadata( - objectMapper.writeValueAsString(aspect.getValue().get("systemMetadata"))); - ((ObjectNode) aspect.getValue()).remove("systemMetadata"); - } - Map headers = null; - if (aspect.getValue().has("headers")) { - headers = - objectMapper.convertValue( - aspect.getValue().get("headers"), new TypeReference<>() {}); - } - + if (opContext.getValidationContext().isAlternateValidation()) { + ProposedItem.ProposedItemBuilder builder = + ProposedItem.builder() + .metadataChangeProposal( + new MetadataChangeProposal() + .setEntityUrn(entityUrn) + .setAspectName(aspect.getKey()) + .setEntityType(entityUrn.getEntityType()) + .setChangeType(ChangeType.UPSERT) + .setAspect(GenericRecordUtils.serializeAspect(aspect.getValue())) + .setHeaders( + headers != null ? new StringMap(headers) : null, + SetMode.IGNORE_NULL) + .setSystemMetadata(systemMetadata, SetMode.IGNORE_NULL)) + .auditStamp(AuditStampUtils.createAuditStamp(actor.toUrnStr())) + .entitySpec( + opContext + .getAspectRetriever() + .getEntityRegistry() + .getEntitySpec(entityUrn.getEntityType())); + items.add(builder.build()); + } else if (aspectSpec != null) { ChangeItemImpl.ChangeItemImplBuilder builder = ChangeItemImpl.builder() .urn(entityUrn) diff --git a/metadata-service/restli-client-api/src/main/java/com/linkedin/entity/client/EntityClient.java b/metadata-service/restli-client-api/src/main/java/com/linkedin/entity/client/EntityClient.java index 5f086e79a387a8..cf6e571cb8cbeb 100644 --- a/metadata-service/restli-client-api/src/main/java/com/linkedin/entity/client/EntityClient.java +++ b/metadata-service/restli-client-api/src/main/java/com/linkedin/entity/client/EntityClient.java @@ -36,6 +36,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import javax.annotation.Nonnull; @@ -523,12 +524,16 @@ default String ingestProposal( * * @return the urn string ingested */ + @Nullable default String ingestProposal( @Nonnull OperationContext opContext, @Nonnull final MetadataChangeProposal metadataChangeProposal, final boolean async) throws RemoteInvocationException { - return batchIngestProposals(opContext, List.of(metadataChangeProposal), async).get(0); + return batchIngestProposals(opContext, List.of(metadataChangeProposal), async).stream() + .filter(Objects::nonNull) + .findFirst() + .orElse(null); } @Deprecated diff --git a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java index 12422044bc8ef9..634e8d32cc7222 100644 --- a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java +++ b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java @@ -33,6 +33,7 @@ import com.linkedin.metadata.resources.restli.RestliUtils; import com.linkedin.metadata.search.EntitySearchService; import com.linkedin.metadata.timeseries.TimeseriesAspectService; +import com.linkedin.mxe.GenericAspect; import com.linkedin.mxe.MetadataChangeProposal; import com.linkedin.mxe.SystemMetadata; import com.linkedin.parseq.Task; @@ -51,6 +52,7 @@ import io.datahubproject.metadata.context.RequestContext; import io.opentelemetry.extension.annotations.WithSpan; import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; import java.time.Clock; import java.util.Arrays; import java.util.List; @@ -61,6 +63,7 @@ import javax.inject.Inject; import javax.inject.Named; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; /** Single unified resource for fetching, updating, searching, & browsing DataHub entities */ @Slf4j @@ -85,6 +88,8 @@ public class AspectResource extends CollectionResourceTaskTemplate _entityService; @@ -238,16 +243,20 @@ public Task ingestProposal( @ActionParam(PARAM_PROPOSAL) @Nonnull MetadataChangeProposal metadataChangeProposal, @ActionParam(PARAM_ASYNC) @Optional(UNSET) String async) throws URISyntaxException { - log.info("INGEST PROPOSAL proposal: {}", metadataChangeProposal); - - final boolean asyncBool; - if (UNSET.equals(async)) { - asyncBool = Boolean.parseBoolean(System.getenv(ASYNC_INGEST_DEFAULT_NAME)); - } else { - asyncBool = Boolean.parseBoolean(async); - } - return ingestProposals(List.of(metadataChangeProposal), asyncBool); + String urn = metadataChangeProposal.getEntityUrn() != null ? metadataChangeProposal.getEntityUrn().toString() : + java.util.Optional.ofNullable(metadataChangeProposal.getEntityKeyAspect()).orElse(new GenericAspect()) + .getValue().asString(StandardCharsets.UTF_8); + String proposedValue = java.util.Optional.ofNullable(metadataChangeProposal.getAspect()).orElse(new GenericAspect()) + .getValue().asString(StandardCharsets.UTF_8); + + final boolean asyncBool; + if (UNSET.equals(async)) { + asyncBool = Boolean.parseBoolean(System.getenv(ASYNC_INGEST_DEFAULT_NAME)); + } else { + asyncBool = Boolean.parseBoolean(async); + } + return ingestProposals(List.of(metadataChangeProposal), asyncBool); } @Action(name = ACTION_INGEST_PROPOSAL_BATCH) @@ -303,10 +312,21 @@ private Task ingestProposals( log.debug("Proposals: {}", metadataChangeProposals); try { final AspectsBatch batch = AspectsBatchImpl.builder() - .mcps(metadataChangeProposals, auditStamp, opContext.getRetrieverContext().get()) + .mcps(metadataChangeProposals, auditStamp, opContext.getRetrieverContext().get(), + opContext.getValidationContext().isAlternateValidation()) .build(); - Set results = + batch.getMCPItems().forEach(item -> + log.info( + "INGEST PROPOSAL content: urn: {}, async: {}, value: {}", + item.getUrn(), + asyncBool, + StringUtils.abbreviate(java.util.Optional.ofNullable(item.getMetadataChangeProposal()) + .map(MetadataChangeProposal::getAspect) + .orElse(new GenericAspect()) + .getValue().asString(StandardCharsets.UTF_8), MAX_LOG_WIDTH))); + + List results = _entityService.ingestProposal(opContext, batch, asyncBool); for (IngestResult result : results) { diff --git a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/EntityResource.java b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/EntityResource.java index 9755a76848adfc..30aa3ffa578c17 100644 --- a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/EntityResource.java +++ b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/EntityResource.java @@ -927,7 +927,7 @@ public Task deleteEntity( opContext.getEntityRegistry(), urn.getEntityType()); if (aspectName != null && !timeseriesAspectNames.contains(aspectName)) { throw new UnsupportedOperationException( - String.format("Not supported for non-timeseries aspect '{}'.", aspectName)); + String.format("Not supported for non-timeseries aspect %s.", aspectName)); } List timeseriesAspectsToDelete = (aspectName == null) ? timeseriesAspectNames : ImmutableList.of(aspectName); diff --git a/metadata-service/restli-servlet-impl/src/test/java/com/linkedin/metadata/resources/entity/AspectResourceTest.java b/metadata-service/restli-servlet-impl/src/test/java/com/linkedin/metadata/resources/entity/AspectResourceTest.java index 9872f45648d7b4..82db3d88b9e12d 100644 --- a/metadata-service/restli-servlet-impl/src/test/java/com/linkedin/metadata/resources/entity/AspectResourceTest.java +++ b/metadata-service/restli-servlet-impl/src/test/java/com/linkedin/metadata/resources/entity/AspectResourceTest.java @@ -27,6 +27,7 @@ import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.service.UpdateIndicesService; import com.linkedin.metadata.utils.GenericRecordUtils; +import com.linkedin.mxe.GenericAspect; import com.linkedin.mxe.MetadataChangeLog; import com.linkedin.mxe.MetadataChangeProposal; import java.net.URISyntaxException; @@ -136,4 +137,31 @@ public void testAsyncDefaultAspects() throws URISyntaxException { .produceMetadataChangeLog(eq(urn), any(AspectSpec.class), any(MetadataChangeLog.class)); verifyNoMoreInteractions(producer); } + + @Test + public void testNoValidateAsync() throws URISyntaxException { + OperationContext noValidateOpContext = TestOperationContexts.systemContextNoValidate(); + aspectResource.setSystemOperationContext(noValidateOpContext); + reset(producer, aspectDao); + MetadataChangeProposal mcp = new MetadataChangeProposal(); + mcp.setEntityType(DATASET_ENTITY_NAME); + Urn urn = new DatasetUrn(new DataPlatformUrn("platform"), "name", FabricType.PROD); + mcp.setEntityUrn(urn); + GenericAspect properties = GenericRecordUtils.serializeAspect(new DatasetProperties().setName("name")); + mcp.setAspect(GenericRecordUtils.serializeAspect(properties)); + mcp.setAspectName("notAnAspect"); + mcp.setChangeType(ChangeType.UPSERT); + mcp.setSystemMetadata(new SystemMetadata()); + + Authentication mockAuthentication = mock(Authentication.class); + AuthenticationContext.setAuthentication(mockAuthentication); + Actor actor = new Actor(ActorType.USER, "user"); + when(mockAuthentication.getActor()).thenReturn(actor); + aspectResource.ingestProposal(mcp, "true"); + verify(producer, times(1)).produceMetadataChangeProposal(urn, mcp); + verifyNoMoreInteractions(producer); + verifyNoMoreInteractions(aspectDao); + reset(producer, aspectDao); + aspectResource.setSystemOperationContext(opContext); + } } diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/entity/EntityService.java b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/EntityService.java index 66f7ff50a36245..beb8bd3d090a5f 100644 --- a/metadata-service/services/src/main/java/com/linkedin/metadata/entity/EntityService.java +++ b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/EntityService.java @@ -487,7 +487,7 @@ RollbackRunResult rollbackWithConditions( Map conditions, boolean hardDelete); - Set ingestProposal( + List ingestProposal( @Nonnull OperationContext opContext, AspectsBatch aspectsBatch, final boolean async); /** diff --git a/metadata-utils/src/main/java/com/linkedin/metadata/utils/GenericRecordUtils.java b/metadata-utils/src/main/java/com/linkedin/metadata/utils/GenericRecordUtils.java index 6638481c1d2794..fafca9b1139731 100644 --- a/metadata-utils/src/main/java/com/linkedin/metadata/utils/GenericRecordUtils.java +++ b/metadata-utils/src/main/java/com/linkedin/metadata/utils/GenericRecordUtils.java @@ -1,6 +1,7 @@ package com.linkedin.metadata.utils; import com.datahub.util.RecordUtils; +import com.fasterxml.jackson.databind.JsonNode; import com.linkedin.common.urn.Urn; import com.linkedin.data.ByteString; import com.linkedin.data.template.RecordTemplate; @@ -70,6 +71,14 @@ public static GenericAspect serializeAspect(@Nonnull String str) { return genericAspect; } + @Nonnull + public static GenericAspect serializeAspect(@Nonnull JsonNode json) { + GenericAspect genericAspect = new GenericAspect(); + genericAspect.setValue(ByteString.unsafeWrap(json.toString().getBytes(StandardCharsets.UTF_8))); + genericAspect.setContentType(GenericRecordUtils.JSON); + return genericAspect; + } + @Nonnull public static GenericPayload serializePayload(@Nonnull RecordTemplate payload) { GenericPayload genericPayload = new GenericPayload(); diff --git a/smoke-test/conftest.py b/smoke-test/conftest.py index 69a58cd766182e..6d148db9886a48 100644 --- a/smoke-test/conftest.py +++ b/smoke-test/conftest.py @@ -15,16 +15,19 @@ os.environ["DATAHUB_TELEMETRY_ENABLED"] = "false" +def build_auth_session(): + wait_for_healthcheck_util(requests) + return TestSessionWrapper(get_frontend_session()) + + @pytest.fixture(scope="session") def auth_session(): - wait_for_healthcheck_util(requests) - auth_session = TestSessionWrapper(get_frontend_session()) + auth_session = build_auth_session() yield auth_session auth_session.destroy() -@pytest.fixture(scope="session") -def graph_client(auth_session) -> DataHubGraph: +def build_graph_client(auth_session): print(auth_session.cookies) graph: DataHubGraph = DataHubGraph( config=DatahubClientConfig( @@ -34,6 +37,11 @@ def graph_client(auth_session) -> DataHubGraph: return graph +@pytest.fixture(scope="session") +def graph_client(auth_session) -> DataHubGraph: + return build_graph_client(auth_session) + + def pytest_sessionfinish(session, exitstatus): """whole test run finishes.""" send_message(exitstatus) diff --git a/smoke-test/cypress-dev.sh b/smoke-test/cypress-dev.sh index 3db81b11c67fa1..bb9b287cb654e2 100755 --- a/smoke-test/cypress-dev.sh +++ b/smoke-test/cypress-dev.sh @@ -16,7 +16,16 @@ set -x # set environment variables for the test source ./set-test-env-vars.sh -python -c 'from tests.cypress.integration_test import ingest_data; ingest_data()' +LOAD_DATA=$(cat < None: ) else: # we want to sleep for an additional period of time for Elastic writes buffer to clear - time.sleep(_ELASTIC_BUFFER_WRITES_TIME_IN_SEC) + time.sleep(ELASTICSEARCH_REFRESH_INTERVAL_SECONDS) diff --git a/smoke-test/tests/lineage/test_lineage.py b/smoke-test/tests/lineage/test_lineage.py index 8757741d1cb230..4a43fd591ae2e1 100644 --- a/smoke-test/tests/lineage/test_lineage.py +++ b/smoke-test/tests/lineage/test_lineage.py @@ -796,7 +796,7 @@ def test_expectation(self, graph: DataHubGraph) -> bool: expectation.impacted_entities[impacted_entity] ), f"Expected impacted entity paths to be {expectation.impacted_entities[impacted_entity]}, found {impacted_entity_paths}" except Exception: - breakpoint() + # breakpoint() raise # for i in range(len(impacted_entity_paths)): # assert impacted_entity_paths[i].path == expectation.impacted_entities[impacted_entity][i].path, f"Expected impacted entity paths to be {expectation.impacted_entities[impacted_entity][i].path}, found {impacted_entity_paths[i].path}" diff --git a/smoke-test/tests/platform_resources/test_platform_resource.py b/smoke-test/tests/platform_resources/test_platform_resource.py index 39d15f2e8dea6d..23a147bda695b3 100644 --- a/smoke-test/tests/platform_resources/test_platform_resource.py +++ b/smoke-test/tests/platform_resources/test_platform_resource.py @@ -11,17 +11,11 @@ PlatformResourceSearchFields, ) -from tests.utils import wait_for_healthcheck_util, wait_for_writes_to_sync +from tests.utils import wait_for_writes_to_sync logger = logging.getLogger(__name__) -@pytest.fixture(scope="module") -def wait_for_healthchecks(auth_session): - wait_for_healthcheck_util(auth_session) - yield - - def generate_random_id(length=8): return "".join(random.choices(string.ascii_lowercase + string.digits, k=length)) diff --git a/smoke-test/tests/restli/__init__.py b/smoke-test/tests/restli/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/smoke-test/tests/restli/restli_test.py b/smoke-test/tests/restli/restli_test.py new file mode 100644 index 00000000000000..7e9cf6aa76235c --- /dev/null +++ b/smoke-test/tests/restli/restli_test.py @@ -0,0 +1,98 @@ +import dataclasses +import json +import time +from typing import List + +import pytest +from datahub.emitter.aspect import JSON_CONTENT_TYPE +from datahub.emitter.mce_builder import make_dashboard_urn +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.emitter.serialization_helper import pre_json_transform +from datahub.metadata.schema_classes import ( + AuditStampClass, + ChangeAuditStampsClass, + DashboardInfoClass, + GenericAspectClass, + MetadataChangeProposalClass, +) +from datahub.utilities.urns.urn import guess_entity_type + +from tests.utils import delete_urns + +generated_urns: List[str] = [] + + +@dataclasses.dataclass +class MetadataChangeProposalInvalidWrapper(MetadataChangeProposalWrapper): + @staticmethod + def _make_generic_aspect(dict) -> GenericAspectClass: + serialized = json.dumps(pre_json_transform(dict)) + return GenericAspectClass( + value=serialized.encode(), + contentType=JSON_CONTENT_TYPE, + ) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def __post_init__(self) -> None: + if self.entityUrn: + self.entityType = guess_entity_type(self.entityUrn) + + def make_mcp(self) -> MetadataChangeProposalClass: + serializedAspect = None + if self.aspect is not None: + serializedAspect = ( + MetadataChangeProposalInvalidWrapper._make_generic_aspect(self.aspect) + ) + + mcp = self._make_mcp_without_aspects() + mcp.aspect = serializedAspect + return mcp + + +@pytest.fixture(scope="module") +def ingest_cleanup_data(auth_session, graph_client, request): + yield + delete_urns(graph_client, generated_urns) + + +def test_gms_ignore_unknown_dashboard_info(graph_client): + dashboard_urn = make_dashboard_urn(platform="looker", name="test-ignore-unknown") + generated_urns.extend([dashboard_urn]) + + audit_stamp = pre_json_transform( + ChangeAuditStampsClass( + created=AuditStampClass( + time=int(time.time() * 1000), + actor="urn:li:corpuser:datahub", + ) + ).to_obj() + ) + + invalid_dashboard_info = { + "title": "Ignore Unknown Title", + "description": "Ignore Unknown Description", + "lastModified": audit_stamp, + "notAValidField": "invalid field value", + } + mcpw = MetadataChangeProposalInvalidWrapper( + entityUrn=dashboard_urn, + aspectName="dashboardInfo", + aspect=invalid_dashboard_info, + ) + + mcp = mcpw.make_mcp() + assert "notAValidField" in str(mcp) + assert "invalid field value" in str(mcp) + + graph_client.emit_mcp(mcpw, async_flag=False) + + dashboard_info = graph_client.get_aspect( + entity_urn=dashboard_urn, + aspect_type=DashboardInfoClass, + ) + + assert dashboard_info + assert dashboard_info.title == invalid_dashboard_info["title"] + assert dashboard_info.description == invalid_dashboard_info["description"] diff --git a/smoke-test/tests/tests/data.json b/smoke-test/tests/tests/data.json index b91c9d9f0ec3b3..7dccc70457e719 100644 --- a/smoke-test/tests/tests/data.json +++ b/smoke-test/tests/tests/data.json @@ -148,7 +148,7 @@ "changeType": "UPSERT", "aspectName": "testInfo", "aspect": { - "value": "{\"name\": \"Sample Test 1\", \"category\": \"Examples\", \"description\": \"An example of Metadata Test\", \"definition\": { \"type\": \"JSON\", \"json\": \"{\\\"on\\\":{\\\"dataset\\\":[{\\\"query\\\":\\\"dataPlatformInstance.platform\\\",\\\"condition\\\":\\\"EQUALS\\\",\\\"values\\\":[\\\"urn:li:dataPlatform:bigQuery\\\"]}]},\\\"rules\\\":{\\\"or\\\":[{\\\"query\\\":\\\"glossaryTerms.terms.glossaryTermInfo.parentNode\\\",\\\"condition\\\":\\\"EQUALS\\\",\\\"values\\\":[\\\"urn:li:glossaryNode:Category\\\"]},{\\\"query\\\":\\\"container.container.glossaryTerms.terms.glossaryTermInfo.parentNode\\\",\\\"condition\\\":\\\"EQUALS\\\",\\\"values\\\":[\\\"urn:li:glossaryNode:Category\\\"]}]}}\"} }", + "value": "{\"name\": \"Sample Test 1\", \"category\": \"Examples\", \"description\": \"An example of Metadata Test\", \"definition\": { \"type\": \"JSON\", \"json\": \"{\\\"on\\\":{\\\"types\\\":[\\\"dataset\\\"],\\\"dataset\\\":[{\\\"query\\\":\\\"dataPlatformInstance.platform\\\",\\\"condition\\\":\\\"EQUALS\\\",\\\"values\\\":[\\\"urn:li:dataPlatform:bigQuery\\\"]}]},\\\"rules\\\":{\\\"or\\\":[{\\\"query\\\":\\\"glossaryTerms.terms.glossaryTermInfo.parentNode\\\",\\\"condition\\\":\\\"EQUALS\\\",\\\"values\\\":[\\\"urn:li:glossaryNode:Category\\\"]},{\\\"query\\\":\\\"container.container.glossaryTerms.terms.glossaryTermInfo.parentNode\\\",\\\"condition\\\":\\\"EQUALS\\\",\\\"values\\\":[\\\"urn:li:glossaryNode:Category\\\"]}]}}\"} }", "contentType": "application/json" }, "systemMetadata": null @@ -161,7 +161,7 @@ "changeType": "UPSERT", "aspectName": "testInfo", "aspect": { - "value": "{\"name\": \"Sample Test 2\", \"category\": \"Examples\", \"description\": \"An example of another Metadata Test\", \"definition\": { \"type\": \"JSON\", \"json\": \"{\\\"on\\\":{\\\"dataset\\\":[{\\\"query\\\":\\\"dataPlatformInstance.platform\\\",\\\"condition\\\":\\\"EQUALS\\\",\\\"values\\\":[\\\"urn:li:dataPlatform:bigQuery\\\"]}]},\\\"rules\\\":{\\\"or\\\":[{\\\"query\\\":\\\"glossaryTerms.terms.glossaryTermInfo.parentNode\\\",\\\"condition\\\":\\\"EQUALS\\\",\\\"values\\\":[\\\"urn:li:glossaryNode:Category\\\"]},{\\\"query\\\":\\\"container.container.glossaryTerms.terms.glossaryTermInfo.parentNode\\\",\\\"condition\\\":\\\"EQUALS\\\",\\\"values\\\":[\\\"urn:li:glossaryNode:Category\\\"]}]}}\"} }", + "value": "{\"name\": \"Sample Test 2\", \"category\": \"Examples\", \"description\": \"An example of another Metadata Test\", \"definition\": { \"type\": \"JSON\", \"json\": \"{\\\"on\\\":{\\\"types\\\":[\\\"dataset\\\"],\\\"dataset\\\":[{\\\"query\\\":\\\"dataPlatformInstance.platform\\\",\\\"condition\\\":\\\"EQUALS\\\",\\\"values\\\":[\\\"urn:li:dataPlatform:bigQuery\\\"]}]},\\\"rules\\\":{\\\"or\\\":[{\\\"query\\\":\\\"glossaryTerms.terms.glossaryTermInfo.parentNode\\\",\\\"condition\\\":\\\"EQUALS\\\",\\\"values\\\":[\\\"urn:li:glossaryNode:Category\\\"]},{\\\"query\\\":\\\"container.container.glossaryTerms.terms.glossaryTermInfo.parentNode\\\",\\\"condition\\\":\\\"EQUALS\\\",\\\"values\\\":[\\\"urn:li:glossaryNode:Category\\\"]}]}}\"} }", "contentType": "application/json" }, "systemMetadata": null diff --git a/smoke-test/tests/tests/tests_test.py b/smoke-test/tests/tests/tests_test.py index bc9ebe46c5279f..8e6af2f3f5bf4b 100644 --- a/smoke-test/tests/tests/tests_test.py +++ b/smoke-test/tests/tests/tests_test.py @@ -1,10 +1,20 @@ +import time +from typing import List + import pytest import tenacity -from tests.utils import delete_urns_from_file, get_sleep_info, ingest_file_via_rest +from tests.utils import ( + delete_urns, + delete_urns_from_file, + get_sleep_info, + ingest_file_via_rest, +) sleep_sec, sleep_times = get_sleep_info() +TEST_URNS: List[str] = [] + @pytest.fixture(scope="module", autouse=True) def ingest_cleanup_data(auth_session, graph_client, request): @@ -12,17 +22,20 @@ def ingest_cleanup_data(auth_session, graph_client, request): ingest_file_via_rest(auth_session, "tests/tests/data.json") yield print("removing test data") + delete_urns(graph_client, TEST_URNS) delete_urns_from_file(graph_client, "tests/tests/data.json") -test_id = "test id" test_name = "test name" test_category = "test category" test_description = "test description" test_description = "test description" -def create_test(auth_session): +def create_test(auth_session, test_id="test id"): + test_id = f"{test_id}_{int(time.time())}" + TEST_URNS.extend([f"urn:li:test:{test_id}"]) + # Create new Test create_test_json = { "query": """mutation createTest($input: CreateTestInput!) {\n @@ -53,21 +66,50 @@ def create_test(auth_session): return res_data["data"]["createTest"] -def delete_test(auth_session, test_urn): - delete_test_json = { - "query": """mutation deleteTest($urn: String!) {\n - deleteTest(urn: $urn) +@pytest.mark.dependency() +def test_get_test_results(auth_session): + urn = ( + "urn:li:dataset:(urn:li:dataPlatform:kafka,test-tests-sample,PROD)" # Test urn + ) + json = { + "query": """query getDataset($urn: String!) {\n + dataset(urn: $urn) {\n + urn\n + testResults {\n + failing {\n + test {\n + urn\n + }\n + type + }\n + passing {\n + test {\n + urn\n + }\n + type + }\n + }\n + }\n }""", - "variables": {"urn": test_urn}, + "variables": {"urn": urn}, } - response = auth_session.post( - f"{auth_session.frontend_url()}/api/v2/graphql", json=delete_test_json + f"{auth_session.frontend_url()}/api/v2/graphql", json=json ) response.raise_for_status() + res_data = response.json() + assert res_data + assert res_data["data"] + assert res_data["data"]["dataset"] + assert res_data["data"]["dataset"]["urn"] == urn + assert res_data["data"]["dataset"]["testResults"] == { + "failing": [{"test": {"urn": "urn:li:test:test-1"}, "type": "FAILURE"}], + "passing": [{"test": {"urn": "urn:li:test:test-2"}, "type": "SUCCESS"}], + } -@pytest.mark.dependency() + +@pytest.mark.dependency(depends=["test_get_test_results"]) def test_create_test(auth_session): test_urn = create_test(auth_session) @@ -105,17 +147,14 @@ def test_create_test(auth_session): } assert "errors" not in res_data - # Delete test - delete_test(auth_session, test_urn) - - # Ensure the test no longer exists + # Ensure that soft-deleted tests response = auth_session.post( f"{auth_session.frontend_url()}/api/v2/graphql", json=get_test_json ) response.raise_for_status() res_data = response.json() - assert res_data["data"]["test"] is None + assert res_data["data"]["test"] is not None assert "errors" not in res_data @@ -125,7 +164,6 @@ def test_update_test(auth_session): test_name = "new name" test_category = "new category" test_description = "new description" - test_description = "new description" # Update Test update_test_json = { @@ -188,8 +226,6 @@ def test_update_test(auth_session): } assert "errors" not in res_data - delete_test(auth_session, test_urn) - @tenacity.retry( stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec) @@ -225,45 +261,3 @@ def test_list_tests_retries(auth_session): @pytest.mark.dependency(depends=["test_update_test"]) def test_list_tests(auth_session): test_list_tests_retries(auth_session) - - -def test_get_test_results(auth_session): - urn = ( - "urn:li:dataset:(urn:li:dataPlatform:kafka,test-tests-sample,PROD)" # Test urn - ) - json = { - "query": """query getDataset($urn: String!) {\n - dataset(urn: $urn) {\n - urn\n - testResults {\n - failing {\n - test {\n - urn\n - }\n - type - }\n - passing {\n - test {\n - urn\n - }\n - type - }\n - }\n - }\n - }""", - "variables": {"urn": urn}, - } - response = auth_session.post( - f"{auth_session.frontend_url()}/api/v2/graphql", json=json - ) - response.raise_for_status() - res_data = response.json() - - assert res_data - assert res_data["data"] - assert res_data["data"]["dataset"] - assert res_data["data"]["dataset"]["urn"] == urn - assert res_data["data"]["dataset"]["testResults"] == { - "failing": [{"test": {"urn": "urn:li:test:test-1"}, "type": "FAILURE"}], - "passing": [{"test": {"urn": "urn:li:test:test-2"}, "type": "SUCCESS"}], - } diff --git a/smoke-test/tests/tokens/revokable_access_token_test.py b/smoke-test/tests/tokens/revokable_access_token_test.py index 0ae18c32662c5c..af29437c051e19 100644 --- a/smoke-test/tests/tokens/revokable_access_token_test.py +++ b/smoke-test/tests/tokens/revokable_access_token_test.py @@ -17,11 +17,6 @@ (admin_user, admin_pass) = get_admin_credentials() -@pytest.fixture(autouse=True) -def setup(auth_session): - wait_for_writes_to_sync() - - @pytest.fixture() def auth_exclude_filter(): return {