From 947175942c5287d13557478af6aee866cf384650 Mon Sep 17 00:00:00 2001 From: RyanHolstien Date: Fri, 9 Aug 2024 13:25:23 -0500 Subject: [PATCH] fix(mutator): mutator hook fixes --- .../models/registry/ConfigEntityRegistry.java | 2 +- .../registry/SnapshotEntityRegistry.java | 23 ++++++++ .../metadata/aspect/plugins/PluginsTest.java | 36 +++++++++--- .../java/com/datahub/util/RecordUtils.java | 12 ++++ .../entity/ebean/batch/AspectsBatchImpl.java | 55 ++++++++++++++++--- .../entityregistry/EntityRegistryFactory.java | 17 +++++- 6 files changed, 125 insertions(+), 20 deletions(-) diff --git a/entity-registry/src/main/java/com/linkedin/metadata/models/registry/ConfigEntityRegistry.java b/entity-registry/src/main/java/com/linkedin/metadata/models/registry/ConfigEntityRegistry.java index 4238c333615ecf..8dd642f63dd975 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/models/registry/ConfigEntityRegistry.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/models/registry/ConfigEntityRegistry.java @@ -52,7 +52,7 @@ public class ConfigEntityRegistry implements EntityRegistry { private final DataSchemaFactory dataSchemaFactory; @Getter private final PluginFactory pluginFactory; - @Nullable + @Getter @Nullable private BiFunction, PluginFactory> pluginFactoryProvider; private final Map entityNameToSpec; diff --git a/entity-registry/src/main/java/com/linkedin/metadata/models/registry/SnapshotEntityRegistry.java b/entity-registry/src/main/java/com/linkedin/metadata/models/registry/SnapshotEntityRegistry.java index c60f89c510cd7f..12dc7e7c179a5a 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/models/registry/SnapshotEntityRegistry.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/models/registry/SnapshotEntityRegistry.java @@ -22,6 +22,8 @@ import com.linkedin.metadata.aspect.patch.template.dataset.UpstreamLineageTemplate; import com.linkedin.metadata.aspect.patch.template.form.FormInfoTemplate; import com.linkedin.metadata.aspect.patch.template.structuredproperty.StructuredPropertyDefinitionTemplate; +import com.linkedin.metadata.aspect.plugins.PluginFactory; +import com.linkedin.metadata.aspect.plugins.config.PluginConfiguration; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.models.DefaultEntitySpec; import com.linkedin.metadata.models.EntitySpec; @@ -32,8 +34,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.BiFunction; import java.util.stream.Collectors; import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import lombok.Getter; + /** * Implementation of {@link EntityRegistry} that builds {@link DefaultEntitySpec} objects from the a @@ -46,6 +52,11 @@ public class SnapshotEntityRegistry implements EntityRegistry { private final AspectTemplateEngine _aspectTemplateEngine; private final Map _aspectNameToSpec; + + @Getter + @Nullable + private BiFunction, PluginFactory> pluginFactoryProvider; + private static final SnapshotEntityRegistry INSTANCE = new SnapshotEntityRegistry(); public SnapshotEntityRegistry() { @@ -56,6 +67,18 @@ public SnapshotEntityRegistry() { entitySpecs = new ArrayList<>(entityNameToSpec.values()); _aspectNameToSpec = populateAspectMap(entitySpecs); _aspectTemplateEngine = populateTemplateEngine(_aspectNameToSpec); + pluginFactoryProvider = null; + } + + public SnapshotEntityRegistry( + BiFunction, PluginFactory> pluginFactoryProvider) { + entityNameToSpec = new EntitySpecBuilder().buildEntitySpecs(new Snapshot().schema()) + .stream() + .collect(Collectors.toMap(spec -> spec.getName().toLowerCase(), spec -> spec)); + entitySpecs = new ArrayList<>(entityNameToSpec.values()); + _aspectNameToSpec = populateAspectMap(entitySpecs); + _aspectTemplateEngine = populateTemplateEngine(_aspectNameToSpec); + this.pluginFactoryProvider = pluginFactoryProvider; } public SnapshotEntityRegistry(UnionTemplate snapshot) { diff --git a/entity-registry/src/test/java/com/linkedin/metadata/aspect/plugins/PluginsTest.java b/entity-registry/src/test/java/com/linkedin/metadata/aspect/plugins/PluginsTest.java index cecf21849f3aaa..b98df05d721ddb 100644 --- a/entity-registry/src/test/java/com/linkedin/metadata/aspect/plugins/PluginsTest.java +++ b/entity-registry/src/test/java/com/linkedin/metadata/aspect/plugins/PluginsTest.java @@ -6,6 +6,7 @@ import com.datahub.test.TestEntityProfile; import com.linkedin.data.schema.annotation.PathSpecBasedSchemaAnnotationVisitor; import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig; import com.linkedin.metadata.models.EntitySpec; import com.linkedin.metadata.models.EventSpec; import com.linkedin.metadata.models.registry.ConfigEntityRegistry; @@ -262,23 +263,42 @@ public void testUnloadedMerge() throws EntityRegistryException { mergedEntityRegistry.apply(configEntityRegistry2); assertEquals( - mergedEntityRegistry.getAllAspectPayloadValidators().stream() - .filter(p -> p.getConfig().getSupportedOperations().contains("DELETE")) + mergedEntityRegistry + .getPluginFactory() + .getPluginConfiguration() + .getAspectPayloadValidators() + .stream() + .filter(AspectPluginConfig::isEnabled) + .filter(p -> p.getSupportedOperations().contains("DELETE")) .count(), 1); + assertEquals( - mergedEntityRegistry.getAllMutationHooks().stream() - .filter(p -> p.getConfig().getSupportedOperations().contains("DELETE")) + mergedEntityRegistry.getPluginFactory().getPluginConfiguration().getMutationHooks().stream() + .filter(AspectPluginConfig::isEnabled) + .filter(p -> p.getSupportedOperations().contains("DELETE")) .count(), 1); + assertEquals( - mergedEntityRegistry.getAllMCLSideEffects().stream() - .filter(p -> p.getConfig().getSupportedOperations().contains("DELETE")) + mergedEntityRegistry + .getPluginFactory() + .getPluginConfiguration() + .getMclSideEffects() + .stream() + .filter(AspectPluginConfig::isEnabled) + .filter(p -> p.getSupportedOperations().contains("DELETE")) .count(), 1); + assertEquals( - mergedEntityRegistry.getAllMCPSideEffects().stream() - .filter(p -> p.getConfig().getSupportedOperations().contains("DELETE")) + mergedEntityRegistry + .getPluginFactory() + .getPluginConfiguration() + .getMcpSideEffects() + .stream() + .filter(AspectPluginConfig::isEnabled) + .filter(p -> p.getSupportedOperations().contains("DELETE")) .count(), 1); } diff --git a/li-utils/src/main/java/com/datahub/util/RecordUtils.java b/li-utils/src/main/java/com/datahub/util/RecordUtils.java index 8183ecc21ee27b..2955943919e3b1 100644 --- a/li-utils/src/main/java/com/datahub/util/RecordUtils.java +++ b/li-utils/src/main/java/com/datahub/util/RecordUtils.java @@ -99,6 +99,18 @@ public static T toRecordTemplate( return toRecordTemplate(type, dataMap); } + @Nonnull + public static DataMap toDataMap(@Nonnull String jsonString) { + DataMap dataMap; + try { + dataMap = DATA_TEMPLATE_CODEC.stringToMap(jsonString); + } catch (IOException e) { + throw new ModelConversionException("Failed to deserialize DataMap: " + jsonString); + } + + return dataMap; + } + /** * Creates a {@link RecordTemplate} object from a {@link DataMap}. * diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java index 7a1af12272ac57..d57038bf1c007c 100644 --- a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java +++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java @@ -9,6 +9,7 @@ import com.linkedin.metadata.aspect.batch.BatchItem; import com.linkedin.metadata.aspect.batch.ChangeMCP; import com.linkedin.metadata.aspect.batch.MCPItem; +import com.linkedin.metadata.aspect.plugins.hooks.MutationHook; import com.linkedin.metadata.aspect.plugins.validation.ValidationExceptionCollection; import com.linkedin.mxe.MetadataChangeProposal; import com.linkedin.util.Pair; @@ -98,15 +99,51 @@ public Pair>, List> toUpsertBatchItems( return Pair.of(newUrnAspectNames, upsertBatchItems); } - private Stream proposedItemsToChangeItemStream(List proposedItems) { - return applyProposalMutationHooks(proposedItems, retrieverContext) - .filter(mcpItem -> mcpItem.getMetadataChangeProposal() != null) - .map( - mcpItem -> - ChangeItemImpl.ChangeItemImplBuilder.build( - mcpItem.getMetadataChangeProposal(), - mcpItem.getAuditStamp(), - retrieverContext.getAspectRetriever())); + private Stream proposedItemsToChangeItemStream(List proposedItems) { + List mutationHooks = + retrieverContext.getAspectRetriever().getEntityRegistry().getAllMutationHooks(); + Stream unmutatedItems = + proposedItems.stream() + .filter( + proposedItem -> + mutationHooks.stream() + .noneMatch( + mutationHook -> + mutationHook.shouldApply( + proposedItem.getChangeType(), + proposedItem.getUrn(), + proposedItem.getAspectName()))) + .map( + mcpItem -> { + if (ChangeType.PATCH.equals(mcpItem.getChangeType())) { + return PatchItemImpl.PatchItemImplBuilder.build( + mcpItem.getMetadataChangeProposal(), + mcpItem.getAuditStamp(), + retrieverContext.getAspectRetriever().getEntityRegistry()); + } + return ChangeItemImpl.ChangeItemImplBuilder.build( + mcpItem.getMetadataChangeProposal(), + mcpItem.getAuditStamp(), + retrieverContext.getAspectRetriever()); + }); + List mutatedItems = + applyProposalMutationHooks(proposedItems, retrieverContext).collect(Collectors.toList()); + Stream proposedItemsToChangeItems = + mutatedItems.stream() + .filter(mcpItem -> mcpItem.getMetadataChangeProposal() != null) + // Filter on proposed items again to avoid applying builder to Patch Item side effects + .filter(mcpItem -> mcpItem instanceof ProposedItem) + .map( + mcpItem -> + ChangeItemImpl.ChangeItemImplBuilder.build( + mcpItem.getMetadataChangeProposal(), + mcpItem.getAuditStamp(), + retrieverContext.getAspectRetriever())); + Stream sideEffectItems = + mutatedItems.stream().filter(mcpItem -> !(mcpItem instanceof ProposedItem)); + Stream combinedChangeItems = + Stream.concat(proposedItemsToChangeItems, unmutatedItems); + return Stream.concat(combinedChangeItems, sideEffectItems); } public static class AspectsBatchImplBuilder { diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityregistry/EntityRegistryFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityregistry/EntityRegistryFactory.java index 2c65eeafe063bc..fb6b1b8bc802a8 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityregistry/EntityRegistryFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityregistry/EntityRegistryFactory.java @@ -1,15 +1,22 @@ package com.linkedin.gms.factory.entityregistry; +import com.datahub.plugins.metadata.aspect.SpringPluginFactory; +import com.linkedin.gms.factory.plugins.SpringStandardPluginConfiguration; +import com.linkedin.metadata.aspect.plugins.PluginFactory; +import com.linkedin.metadata.aspect.plugins.config.PluginConfiguration; import com.linkedin.metadata.models.registry.ConfigEntityRegistry; import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.models.registry.EntityRegistryException; import com.linkedin.metadata.models.registry.MergedEntityRegistry; import com.linkedin.metadata.models.registry.PluginEntityRegistryLoader; import com.linkedin.metadata.models.registry.SnapshotEntityRegistry; +import java.util.List; +import java.util.function.BiFunction; import javax.annotation.Nonnull; import lombok.SneakyThrows; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; @@ -27,13 +34,19 @@ public class EntityRegistryFactory { @Qualifier("pluginEntityRegistry") private PluginEntityRegistryLoader pluginEntityRegistryLoader; + @Autowired private ApplicationContext applicationContext; + @SneakyThrows @Bean("entityRegistry") @Primary @Nonnull - protected EntityRegistry getInstance() throws EntityRegistryException { + protected EntityRegistry getInstance(SpringStandardPluginConfiguration springStandardPluginConfiguration) + throws EntityRegistryException { + BiFunction, PluginFactory> pluginFactoryProvider = + (config, loaders) -> new SpringPluginFactory(applicationContext, config, loaders); MergedEntityRegistry baseEntityRegistry = - new MergedEntityRegistry(SnapshotEntityRegistry.getInstance()).apply(configEntityRegistry); + new MergedEntityRegistry(new SnapshotEntityRegistry(pluginFactoryProvider)) + .apply(configEntityRegistry); pluginEntityRegistryLoader.withBaseRegistry(baseEntityRegistry).start(true); return baseEntityRegistry; }