From 080f2a2100d2f42d9913a7bc85b6efb7d8e5f5b3 Mon Sep 17 00:00:00 2001 From: david-leifker <114954101+david-leifker@users.noreply.github.com> Date: Fri, 9 Aug 2024 11:40:03 -0500 Subject: [PATCH 1/3] feat(mcl-processor): Update mcl processor hooks (#11134) --- docs/how/kafka-config.md | 21 +++ .../kafka/MaeConsumerApplication.java | 7 +- .../metadata/kafka/MCLKafkaListener.java | 103 +++++++++++++ .../kafka/MCLKafkaListenerRegistrar.java | 120 +++++++++++++++ .../kafka/MetadataChangeLogProcessor.java | 140 ------------------ .../kafka/hook/MetadataChangeLogHook.java | 8 + .../kafka/hook/UpdateIndicesHook.java | 17 ++- .../event/EntityChangeEventGeneratorHook.java | 34 +++-- .../kafka/hook/form/FormAssignmentHook.java | 26 +++- .../hook/incident/IncidentsSummaryHook.java | 45 ++++-- .../ingestion/IngestionSchedulerHook.java | 30 ++-- .../hook/siblings/SiblingAssociationHook.java | 20 ++- .../kafka/hook/spring/MCLGMSSpringTest.java | 16 +- .../kafka/hook/spring/MCLMAESpringTest.java | 16 +- .../MCLSpringCommonTestConfiguration.java | 9 +- .../datahub/event/PlatformEventProcessor.java | 9 +- .../src/main/resources/application.yaml | 10 ++ .../kafka/KafkaEventConsumerFactory.java | 2 +- .../linkedin/gms/CommonApplicationConfig.java | 5 +- 19 files changed, 421 insertions(+), 217 deletions(-) create mode 100644 metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MCLKafkaListener.java create mode 100644 metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MCLKafkaListenerRegistrar.java delete mode 100644 metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeLogProcessor.java diff --git a/docs/how/kafka-config.md b/docs/how/kafka-config.md index 2f20e8b548f835..06c7418f167136 100644 --- a/docs/how/kafka-config.md +++ b/docs/how/kafka-config.md @@ -116,6 +116,27 @@ We've included an environment variable to customize the consumer group id, if yo - `KAFKA_CONSUMER_GROUP_ID`: The name of the kafka consumer's group id. +#### datahub-mae-consumer MCL Hooks + +By default, all MetadataChangeLog processing hooks execute as part of the same kafka consumer group based on the +previously mentioned `KAFKA_CONSUMER_GROUP_ID`. + +The various MCL Hooks could alsp be separated into separate groups which allows for controlling parallelization and +prioritization of the hooks. + +For example, the `UpdateIndicesHook` and `SiblingsHook` processing can be delayed by other hooks. Separating these +hooks into their own group can reduce latency from these other hooks. The `application.yaml` configuration +includes options for assigning a suffix to the consumer group, see `consumerGroupSuffix`. + +| Environment Variable | Default | Description | +|------------------------------------------------|---------|---------------------------------------------------------------------------------------------| +| SIBLINGS_HOOK_CONSUMER_GROUP_SUFFIX | '' | Siblings processing hook. Considered one of the primary hooks in the `datahub-mae-consumer` | +| UPDATE_INDICES_CONSUMER_GROUP_SUFFIX | '' | Primary processing hook. | +| INGESTION_SCHEDULER_HOOK_CONSUMER_GROUP_SUFFIX | '' | Scheduled ingestion hook. | +| INCIDENTS_HOOK_CONSUMER_GROUP_SUFFIX | '' | Incidents hook. | +| ECE_CONSUMER_GROUP_SUFFIX | '' | Entity Change Event hook which publishes to the Platform Events topic. | +| FORMS_HOOK_CONSUMER_GROUP_SUFFIX | '' | Forms processing. | + ## Applying Configurations ### Docker diff --git a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/MaeConsumerApplication.java b/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/MaeConsumerApplication.java index f6533a6ac1d8a9..617bc8e0b73030 100644 --- a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/MaeConsumerApplication.java +++ b/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/MaeConsumerApplication.java @@ -18,8 +18,6 @@ "com.linkedin.metadata.service", "com.datahub.event", "com.linkedin.gms.factory.kafka", - "com.linkedin.gms.factory.kafka.common", - "com.linkedin.gms.factory.kafka.schemaregistry", "com.linkedin.metadata.boot.kafka", "com.linkedin.metadata.kafka", "com.linkedin.metadata.dao.producer", @@ -34,7 +32,10 @@ "com.linkedin.gms.factory.context", "com.linkedin.gms.factory.timeseries", "com.linkedin.gms.factory.assertion", - "com.linkedin.gms.factory.plugins" + "com.linkedin.gms.factory.plugins", + "com.linkedin.gms.factory.change", + "com.datahub.event.hook", + "com.linkedin.gms.factory.notifications" }, excludeFilters = { @ComponentScan.Filter( diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MCLKafkaListener.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MCLKafkaListener.java new file mode 100644 index 00000000000000..70b452722abc76 --- /dev/null +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MCLKafkaListener.java @@ -0,0 +1,103 @@ +package com.linkedin.metadata.kafka; + +import com.codahale.metrics.Histogram; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; +import com.linkedin.metadata.EventUtils; +import com.linkedin.metadata.kafka.hook.MetadataChangeLogHook; +import com.linkedin.metadata.utils.metrics.MetricUtils; +import com.linkedin.mxe.MetadataChangeLog; +import io.datahubproject.metadata.context.OperationContext; +import java.util.List; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import org.apache.avro.generic.GenericRecord; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +@Slf4j +public class MCLKafkaListener { + private static final Histogram kafkaLagStats = + MetricUtils.get() + .histogram( + MetricRegistry.name( + "com.linkedin.metadata.kafka.MetadataChangeLogProcessor", "kafkaLag")); + + private final String consumerGroupId; + private final List hooks; + + public MCLKafkaListener( + OperationContext systemOperationContext, + String consumerGroup, + List hooks) { + this.consumerGroupId = consumerGroup; + this.hooks = hooks; + this.hooks.forEach(hook -> hook.init(systemOperationContext)); + + log.info( + "Enabled MCL Hooks - Group: {} Hooks: {}", + consumerGroup, + hooks.stream().map(hook -> hook.getClass().getSimpleName()).collect(Collectors.toList())); + } + + public void consume(final ConsumerRecord consumerRecord) { + try (Timer.Context i = MetricUtils.timer(this.getClass(), "consume").time()) { + kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp()); + final GenericRecord record = consumerRecord.value(); + log.debug( + "Got MCL event consumer: {} key: {}, topic: {}, partition: {}, offset: {}, value size: {}, timestamp: {}", + consumerGroupId, + consumerRecord.key(), + consumerRecord.topic(), + consumerRecord.partition(), + consumerRecord.offset(), + consumerRecord.serializedValueSize(), + consumerRecord.timestamp()); + MetricUtils.counter(this.getClass(), consumerGroupId + "_received_mcl_count").inc(); + + MetadataChangeLog event; + try { + event = EventUtils.avroToPegasusMCL(record); + } catch (Exception e) { + MetricUtils.counter( + this.getClass(), consumerGroupId + "_avro_to_pegasus_conversion_failure") + .inc(); + log.error("Error deserializing message due to: ", e); + log.error("Message: {}", record.toString()); + return; + } + + log.info( + "Invoking MCL hooks for consumer: {} urn: {}, aspect name: {}, entity type: {}, change type: {}", + consumerGroupId, + event.getEntityUrn(), + event.hasAspectName() ? event.getAspectName() : null, + event.hasEntityType() ? event.getEntityType() : null, + event.hasChangeType() ? event.getChangeType() : null); + + // Here - plug in additional "custom processor hooks" + for (MetadataChangeLogHook hook : this.hooks) { + log.info( + "Invoking MCL hook {} for urn: {}", + hook.getClass().getSimpleName(), + event.getEntityUrn()); + try (Timer.Context ignored = + MetricUtils.timer(this.getClass(), hook.getClass().getSimpleName() + "_latency") + .time()) { + hook.invoke(event); + } catch (Exception e) { + // Just skip this hook and continue. - Note that this represents "at most once"// + // processing. + MetricUtils.counter(this.getClass(), hook.getClass().getSimpleName() + "_failure").inc(); + log.error( + "Failed to execute MCL hook with name {}", hook.getClass().getCanonicalName(), e); + } + } + // TODO: Manually commit kafka offsets after full processing. + MetricUtils.counter(this.getClass(), consumerGroupId + "_consumed_mcl_count").inc(); + log.info( + "Successfully completed MCL hooks for consumer: {} urn: {}", + consumerGroupId, + event.getEntityUrn()); + } + } +} diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MCLKafkaListenerRegistrar.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MCLKafkaListenerRegistrar.java new file mode 100644 index 00000000000000..fb2880f617d301 --- /dev/null +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MCLKafkaListenerRegistrar.java @@ -0,0 +1,120 @@ +package com.linkedin.metadata.kafka; + +import com.linkedin.metadata.kafka.config.MetadataChangeLogProcessorCondition; +import com.linkedin.metadata.kafka.hook.MetadataChangeLogHook; +import com.linkedin.mxe.Topics; +import io.datahubproject.metadata.context.OperationContext; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import javax.annotation.Nonnull; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.avro.generic.GenericRecord; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Conditional; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerEndpoint; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.config.MethodKafkaListenerEndpoint; +import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory; +import org.springframework.stereotype.Component; + +@Slf4j +@EnableKafka +@Component +@Conditional(MetadataChangeLogProcessorCondition.class) +public class MCLKafkaListenerRegistrar implements InitializingBean { + + @Autowired + @Qualifier("systemOperationContext") + private OperationContext systemOperationContext; + + @Autowired private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; + + @Autowired + @Qualifier("kafkaEventConsumer") + private KafkaListenerContainerFactory kafkaListenerContainerFactory; + + @Value("${METADATA_CHANGE_LOG_KAFKA_CONSUMER_GROUP_ID:generic-mae-consumer-job-client}") + private String consumerGroupBase; + + @Value("${METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME:" + Topics.METADATA_CHANGE_LOG_VERSIONED + "}") + private String mclVersionedTopicName; + + @Value( + "${METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME:" + Topics.METADATA_CHANGE_LOG_TIMESERIES + "}") + private String mclTimeseriesTopicName; + + @Autowired private List metadataChangeLogHooks; + + @Override + public void afterPropertiesSet() { + Map> hookGroups = + getMetadataChangeLogHooks().stream() + .collect(Collectors.groupingBy(MetadataChangeLogHook::getConsumerGroupSuffix)); + + log.info( + "MetadataChangeLogProcessor Consumer Groups: {}", + hookGroups.keySet().stream().map(this::buildConsumerGroupName).collect(Collectors.toSet())); + + hookGroups.forEach( + (key, hooks) -> { + KafkaListenerEndpoint kafkaListenerEndpoint = + createListenerEndpoint( + buildConsumerGroupName(key), + List.of(mclVersionedTopicName, mclTimeseriesTopicName), + hooks); + registerMCLKafkaListener(kafkaListenerEndpoint, true); + }); + } + + public List getMetadataChangeLogHooks() { + return metadataChangeLogHooks.stream() + .filter(MetadataChangeLogHook::isEnabled) + .sorted(Comparator.comparing(MetadataChangeLogHook::executionOrder)) + .toList(); + } + + @SneakyThrows + public void registerMCLKafkaListener( + KafkaListenerEndpoint kafkaListenerEndpoint, boolean startImmediately) { + kafkaListenerEndpointRegistry.registerListenerContainer( + kafkaListenerEndpoint, kafkaListenerContainerFactory, startImmediately); + } + + private KafkaListenerEndpoint createListenerEndpoint( + String consumerGroupId, List topics, List hooks) { + MethodKafkaListenerEndpoint kafkaListenerEndpoint = + new MethodKafkaListenerEndpoint<>(); + kafkaListenerEndpoint.setId(consumerGroupId); + kafkaListenerEndpoint.setGroupId(consumerGroupId); + kafkaListenerEndpoint.setAutoStartup(true); + kafkaListenerEndpoint.setTopics(topics.toArray(new String[topics.size()])); + kafkaListenerEndpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory()); + kafkaListenerEndpoint.setBean( + new MCLKafkaListener(systemOperationContext, consumerGroupId, hooks)); + try { + kafkaListenerEndpoint.setMethod( + MCLKafkaListener.class.getMethod("consume", ConsumerRecord.class)); + } catch (NoSuchMethodException e) { + throw new RuntimeException(e); + } + + return kafkaListenerEndpoint; + } + + private String buildConsumerGroupName(@Nonnull String suffix) { + if (suffix.isEmpty()) { + return consumerGroupBase; + } else { + return String.join("-", consumerGroupBase, suffix); + } + } +} diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeLogProcessor.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeLogProcessor.java deleted file mode 100644 index 6112ad798d73dc..00000000000000 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeLogProcessor.java +++ /dev/null @@ -1,140 +0,0 @@ -package com.linkedin.metadata.kafka; - -import com.codahale.metrics.Histogram; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.Timer; -import com.linkedin.gms.factory.kafka.KafkaEventConsumerFactory; -import com.linkedin.metadata.EventUtils; -import com.linkedin.metadata.kafka.config.MetadataChangeLogProcessorCondition; -import com.linkedin.metadata.kafka.hook.MetadataChangeLogHook; -import com.linkedin.metadata.kafka.hook.UpdateIndicesHook; -import com.linkedin.metadata.kafka.hook.event.EntityChangeEventGeneratorHook; -import com.linkedin.metadata.kafka.hook.form.FormAssignmentHook; -import com.linkedin.metadata.kafka.hook.incident.IncidentsSummaryHook; -import com.linkedin.metadata.kafka.hook.ingestion.IngestionSchedulerHook; -import com.linkedin.metadata.kafka.hook.siblings.SiblingAssociationHook; -import com.linkedin.metadata.utils.metrics.MetricUtils; -import com.linkedin.mxe.MetadataChangeLog; -import com.linkedin.mxe.Topics; -import io.datahubproject.metadata.context.OperationContext; -import java.util.Comparator; -import java.util.List; -import java.util.stream.Collectors; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; -import org.apache.avro.generic.GenericRecord; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.context.annotation.Conditional; -import org.springframework.context.annotation.Import; -import org.springframework.kafka.annotation.EnableKafka; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.stereotype.Component; - -@Slf4j -@Component -@Conditional(MetadataChangeLogProcessorCondition.class) -@Import({ - UpdateIndicesHook.class, - IngestionSchedulerHook.class, - EntityChangeEventGeneratorHook.class, - KafkaEventConsumerFactory.class, - SiblingAssociationHook.class, - FormAssignmentHook.class, - IncidentsSummaryHook.class, -}) -@EnableKafka -public class MetadataChangeLogProcessor { - - @Getter private final List hooks; - private final Histogram kafkaLagStats = - MetricUtils.get().histogram(MetricRegistry.name(this.getClass(), "kafkaLag")); - - @Autowired - public MetadataChangeLogProcessor( - @Qualifier("systemOperationContext") OperationContext systemOperationContext, - List metadataChangeLogHooks) { - this.hooks = - metadataChangeLogHooks.stream() - .filter(MetadataChangeLogHook::isEnabled) - .sorted(Comparator.comparing(MetadataChangeLogHook::executionOrder)) - .collect(Collectors.toList()); - log.info( - "Enabled hooks: {}", - this.hooks.stream() - .map(hook -> hook.getClass().getSimpleName()) - .collect(Collectors.toList())); - this.hooks.forEach(hook -> hook.init(systemOperationContext)); - } - - @KafkaListener( - id = "${METADATA_CHANGE_LOG_KAFKA_CONSUMER_GROUP_ID:generic-mae-consumer-job-client}", - topics = { - "${METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME:" + Topics.METADATA_CHANGE_LOG_VERSIONED + "}", - "${METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME:" + Topics.METADATA_CHANGE_LOG_TIMESERIES + "}" - }, - containerFactory = "kafkaEventConsumer") - public void consume(final ConsumerRecord consumerRecord) { - try (Timer.Context i = MetricUtils.timer(this.getClass(), "consume").time()) { - kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp()); - final GenericRecord record = consumerRecord.value(); - log.info( - "Got MCL event key: {}, topic: {}, partition: {}, offset: {}, value size: {}, timestamp: {}", - consumerRecord.key(), - consumerRecord.topic(), - consumerRecord.partition(), - consumerRecord.offset(), - consumerRecord.serializedValueSize(), - consumerRecord.timestamp()); - MetricUtils.counter(this.getClass(), "received_mcl_count").inc(); - - MetadataChangeLog event; - try { - event = EventUtils.avroToPegasusMCL(record); - log.debug( - "Successfully converted Avro MCL to Pegasus MCL. urn: {}, key: {}", - event.getEntityUrn(), - event.getEntityKeyAspect()); - } catch (Exception e) { - MetricUtils.counter(this.getClass(), "avro_to_pegasus_conversion_failure").inc(); - log.error("Error deserializing message due to: ", e); - log.error("Message: {}", record.toString()); - return; - } - - log.info( - "Invoking MCL hooks for urn: {}, aspect name: {}, entity type: {}, change type: {}", - event.getEntityUrn(), - event.hasAspectName() ? event.getAspectName() : null, - event.hasEntityType() ? event.getEntityType() : null, - event.hasChangeType() ? event.getChangeType() : null); - - // Here - plug in additional "custom processor hooks" - for (MetadataChangeLogHook hook : this.hooks) { - if (!hook.isEnabled()) { - log.info(String.format("Skipping disabled hook %s", hook.getClass())); - continue; - } - log.info( - "Invoking MCL hook {} for urn: {}", - hook.getClass().getSimpleName(), - event.getEntityUrn()); - try (Timer.Context ignored = - MetricUtils.timer(this.getClass(), hook.getClass().getSimpleName() + "_latency") - .time()) { - hook.invoke(event); - } catch (Exception e) { - // Just skip this hook and continue. - Note that this represents "at most once"// - // processing. - MetricUtils.counter(this.getClass(), hook.getClass().getSimpleName() + "_failure").inc(); - log.error( - "Failed to execute MCL hook with name {}", hook.getClass().getCanonicalName(), e); - } - } - // TODO: Manually commit kafka offsets after full processing. - MetricUtils.counter(this.getClass(), "consumed_mcl_count").inc(); - log.info("Successfully completed MCL hooks for urn: {}", event.getEntityUrn()); - } - } -} diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/MetadataChangeLogHook.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/MetadataChangeLogHook.java index 145d1ded724cc0..06a184c9f89f9c 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/MetadataChangeLogHook.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/MetadataChangeLogHook.java @@ -18,6 +18,14 @@ default MetadataChangeLogHook init(@Nonnull OperationContext systemOperationCont return this; } + /** + * Suffix for the consumer group + * + * @return suffix + */ + @Nonnull + String getConsumerGroupSuffix(); + /** * Return whether the hook is enabled or not. If not enabled, the below invoke method is not * triggered diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java index a0e304b26ea60f..bd804b0f4424ca 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java @@ -2,6 +2,7 @@ import static com.linkedin.metadata.Constants.*; +import com.google.common.annotations.VisibleForTesting; import com.linkedin.gms.factory.common.GraphServiceFactory; import com.linkedin.gms.factory.common.SystemMetadataServiceFactory; import com.linkedin.gms.factory.entityregistry.EntityRegistryFactory; @@ -12,7 +13,9 @@ import com.linkedin.mxe.MetadataChangeLog; import io.datahubproject.metadata.context.OperationContext; import javax.annotation.Nonnull; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Import; import org.springframework.stereotype.Component; @@ -34,15 +37,27 @@ public class UpdateIndicesHook implements MetadataChangeLogHook { private final boolean isEnabled; private final boolean reprocessUIEvents; private OperationContext systemOperationContext; + @Getter private final String consumerGroupSuffix; + @Autowired public UpdateIndicesHook( UpdateIndicesService updateIndicesService, @Nonnull @Value("${updateIndices.enabled:true}") Boolean isEnabled, @Nonnull @Value("${featureFlags.preProcessHooks.reprocessEnabled:false}") - Boolean reprocessUIEvents) { + Boolean reprocessUIEvents, + @Nonnull @Value("${updateIndices.consumerGroupSuffix}") String consumerGroupSuffix) { this.updateIndicesService = updateIndicesService; this.isEnabled = isEnabled; this.reprocessUIEvents = reprocessUIEvents; + this.consumerGroupSuffix = consumerGroupSuffix; + } + + @VisibleForTesting + public UpdateIndicesHook( + UpdateIndicesService updateIndicesService, + @Nonnull Boolean isEnabled, + @Nonnull Boolean reprocessUIEvents) { + this(updateIndicesService, isEnabled, reprocessUIEvents, ""); } @Override diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/event/EntityChangeEventGeneratorHook.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/event/EntityChangeEventGeneratorHook.java index 8dc98d77233ceb..59d068a46d8c6f 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/event/EntityChangeEventGeneratorHook.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/event/EntityChangeEventGeneratorHook.java @@ -1,5 +1,6 @@ package com.linkedin.metadata.kafka.hook.event; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; import com.linkedin.common.AuditStamp; import com.linkedin.common.urn.Urn; @@ -29,6 +30,7 @@ import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; @@ -78,10 +80,11 @@ public class EntityChangeEventGeneratorHook implements MetadataChangeLogHook { private static final Set SUPPORTED_OPERATIONS = ImmutableSet.of("CREATE", "UPSERT", "DELETE"); - private final EntityChangeEventGeneratorRegistry _entityChangeEventGeneratorRegistry; + private final EntityChangeEventGeneratorRegistry entityChangeEventGeneratorRegistry; private final OperationContext systemOperationContext; - private final SystemEntityClient _entityClient; - private final Boolean _isEnabled; + private final SystemEntityClient entityClient; + private final Boolean isEnabled; + @Getter private final String consumerGroupSuffix; @Autowired public EntityChangeEventGeneratorHook( @@ -89,17 +92,28 @@ public EntityChangeEventGeneratorHook( @Nonnull @Qualifier("entityChangeEventGeneratorRegistry") final EntityChangeEventGeneratorRegistry entityChangeEventGeneratorRegistry, @Nonnull final SystemEntityClient entityClient, - @Nonnull @Value("${entityChangeEvents.enabled:true}") Boolean isEnabled) { + @Nonnull @Value("${entityChangeEvents.enabled:true}") Boolean isEnabled, + @Nonnull @Value("${entityChangeEvents.consumerGroupSuffix}") String consumerGroupSuffix) { this.systemOperationContext = systemOperationContext; - _entityChangeEventGeneratorRegistry = + this.entityChangeEventGeneratorRegistry = Objects.requireNonNull(entityChangeEventGeneratorRegistry); - _entityClient = Objects.requireNonNull(entityClient); - _isEnabled = isEnabled; + this.entityClient = Objects.requireNonNull(entityClient); + this.isEnabled = isEnabled; + this.consumerGroupSuffix = consumerGroupSuffix; + } + + @VisibleForTesting + public EntityChangeEventGeneratorHook( + @Nonnull OperationContext systemOperationContext, + @Nonnull final EntityChangeEventGeneratorRegistry entityChangeEventGeneratorRegistry, + @Nonnull final SystemEntityClient entityClient, + @Nonnull Boolean isEnabled) { + this(systemOperationContext, entityChangeEventGeneratorRegistry, entityClient, isEnabled, ""); } @Override public boolean isEnabled() { - return _isEnabled; + return isEnabled; } @Override @@ -166,7 +180,7 @@ private List generateChangeEvents( @Nonnull final Aspect to, @Nonnull AuditStamp auditStamp) { final List> entityChangeEventGenerators = - _entityChangeEventGeneratorRegistry.getEntityChangeEventGenerators(aspectName).stream() + entityChangeEventGeneratorRegistry.getEntityChangeEventGenerators(aspectName).stream() // Note: Assumes that correct types have been registered for the aspect. .map(changeEventGenerator -> (EntityChangeEventGenerator) changeEventGenerator) .collect(Collectors.toList()); @@ -186,7 +200,7 @@ private boolean isEligibleForProcessing(final MetadataChangeLog log) { private void emitPlatformEvent( @Nonnull final PlatformEvent event, @Nonnull final String partitioningKey) throws Exception { - _entityClient.producePlatformEvent( + entityClient.producePlatformEvent( systemOperationContext, Constants.CHANGE_EVENT_PLATFORM_EVENT_NAME, partitioningKey, event); } diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/form/FormAssignmentHook.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/form/FormAssignmentHook.java index 8d093fe0b8a12d..063fa6de92c838 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/form/FormAssignmentHook.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/form/FormAssignmentHook.java @@ -2,6 +2,7 @@ import static com.linkedin.metadata.Constants.*; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; import com.linkedin.events.metadata.ChangeType; import com.linkedin.form.DynamicFormAssignment; @@ -15,6 +16,7 @@ import java.util.Objects; import java.util.Set; import javax.annotation.Nonnull; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -53,17 +55,25 @@ public class FormAssignmentHook implements MetadataChangeLogHook { ImmutableSet.of( ChangeType.UPSERT, ChangeType.CREATE, ChangeType.CREATE_ENTITY, ChangeType.RESTATE); - private final FormService _formService; - private final boolean _isEnabled; + private final FormService formService; + private final boolean isEnabled; private OperationContext systemOperationContext; + @Getter private final String consumerGroupSuffix; @Autowired public FormAssignmentHook( @Nonnull final FormService formService, - @Nonnull @Value("${forms.hook.enabled:true}") Boolean isEnabled) { - _formService = Objects.requireNonNull(formService, "formService is required"); - _isEnabled = isEnabled; + @Nonnull @Value("${forms.hook.enabled:true}") Boolean isEnabled, + @Nonnull @Value("${forms.hook.consumerGroupSuffix}") String consumerGroupSuffix) { + this.formService = Objects.requireNonNull(formService, "formService is required"); + this.isEnabled = isEnabled; + this.consumerGroupSuffix = consumerGroupSuffix; + } + + @VisibleForTesting + public FormAssignmentHook(@Nonnull final FormService formService, @Nonnull Boolean isEnabled) { + this(formService, isEnabled, ""); } @Override @@ -74,12 +84,12 @@ public FormAssignmentHook init(@Nonnull OperationContext systemOperationContext) @Override public boolean isEnabled() { - return _isEnabled; + return isEnabled; } @Override public void invoke(@Nonnull final MetadataChangeLog event) { - if (_isEnabled && isEligibleForProcessing(event)) { + if (isEnabled && isEligibleForProcessing(event)) { if (isFormDynamicFilterUpdated(event)) { handleFormFilterUpdated(event); } @@ -96,7 +106,7 @@ private void handleFormFilterUpdated(@Nonnull final MetadataChangeLog event) { DynamicFormAssignment.class); // 2. Register a automation to assign it. - _formService.upsertFormAssignmentRunner( + formService.upsertFormAssignmentRunner( systemOperationContext, event.getEntityUrn(), formFilters); } diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/incident/IncidentsSummaryHook.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/incident/IncidentsSummaryHook.java index 7c03a11a81f7ac..5483fed9116e17 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/incident/IncidentsSummaryHook.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/incident/IncidentsSummaryHook.java @@ -2,6 +2,7 @@ import static com.linkedin.metadata.Constants.*; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; import com.linkedin.common.IncidentSummaryDetails; import com.linkedin.common.IncidentSummaryDetailsArray; @@ -27,6 +28,7 @@ import java.util.Objects; import java.util.Set; import javax.annotation.Nonnull; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -57,20 +59,31 @@ public class IncidentsSummaryHook implements MetadataChangeLogHook { ImmutableSet.of(INCIDENT_INFO_ASPECT_NAME, STATUS_ASPECT_NAME); private OperationContext systemOperationContext; - private final IncidentService _incidentService; - private final boolean _isEnabled; + private final IncidentService incidentService; + private final boolean isEnabled; + @Getter private final String consumerGroupSuffix; /** Max number of incidents to allow in incident summary, limited to prevent HTTP errors */ - private final int _maxIncidentHistory; + private final int maxIncidentHistory; @Autowired public IncidentsSummaryHook( @Nonnull final IncidentService incidentService, - @Nonnull @Value("${incidents.hook.enabled:true}") Boolean isEnabled, - @Nonnull @Value("${incidents.hook.maxIncidentHistory:100}") Integer maxIncidentHistory) { - _incidentService = Objects.requireNonNull(incidentService, "incidentService is required"); - _isEnabled = isEnabled; - _maxIncidentHistory = maxIncidentHistory; + @Nonnull @Value("${incidents.hook.enabled}") Boolean isEnabled, + @Nonnull @Value("${incidents.hook.maxIncidentHistory}") Integer maxIncidentHistory, + @Nonnull @Value("${incidents.hook.consumerGroupSuffix}") String consumerGroupSuffix) { + this.incidentService = Objects.requireNonNull(incidentService, "incidentService is required"); + this.isEnabled = isEnabled; + this.maxIncidentHistory = maxIncidentHistory; + this.consumerGroupSuffix = consumerGroupSuffix; + } + + @VisibleForTesting + public IncidentsSummaryHook( + @Nonnull final IncidentService incidentService, + @Nonnull Boolean isEnabled, + @Nonnull Integer maxIncidentHistory) { + this(incidentService, isEnabled, maxIncidentHistory, ""); } @Override @@ -81,12 +94,12 @@ public IncidentsSummaryHook init(@Nonnull OperationContext systemOperationContex @Override public boolean isEnabled() { - return _isEnabled; + return isEnabled; } @Override public void invoke(@Nonnull final MetadataChangeLog event) { - if (_isEnabled && isEligibleForProcessing(event)) { + if (isEnabled && isEligibleForProcessing(event)) { log.debug("Urn {} received by Incident Summary Hook.", event.getEntityUrn()); final Urn urn = HookUtils.getUrnFromEvent(event, systemOperationContext.getEntityRegistry()); // Handle the deletion case. @@ -104,7 +117,7 @@ public void invoke(@Nonnull final MetadataChangeLog event) { private void handleIncidentSoftDeleted(@Nonnull final Urn incidentUrn) { // 1. Fetch incident info. IncidentInfo incidentInfo = - _incidentService.getIncidentInfo(systemOperationContext, incidentUrn); + incidentService.getIncidentInfo(systemOperationContext, incidentUrn); // 2. Retrieve associated urns. if (incidentInfo != null) { @@ -127,7 +140,7 @@ private void handleIncidentSoftDeleted(@Nonnull final Urn incidentUrn) { private void handleIncidentUpdated(@Nonnull final Urn incidentUrn) { // 1. Fetch incident info + status IncidentInfo incidentInfo = - _incidentService.getIncidentInfo(systemOperationContext, incidentUrn); + incidentService.getIncidentInfo(systemOperationContext, incidentUrn); // 2. Retrieve associated urns. if (incidentInfo != null) { @@ -179,14 +192,14 @@ private void addIncidentToSummary( IncidentsSummaryUtils.removeIncidentFromResolvedSummary(incidentUrn, summary); // Then, add to active. - IncidentsSummaryUtils.addIncidentToActiveSummary(details, summary, _maxIncidentHistory); + IncidentsSummaryUtils.addIncidentToActiveSummary(details, summary, maxIncidentHistory); } else if (IncidentState.RESOLVED.equals(status.getState())) { // First, ensure this isn't in any summaries anymore. IncidentsSummaryUtils.removeIncidentFromActiveSummary(incidentUrn, summary); // Then, add to resolved. - IncidentsSummaryUtils.addIncidentToResolvedSummary(details, summary, _maxIncidentHistory); + IncidentsSummaryUtils.addIncidentToResolvedSummary(details, summary, maxIncidentHistory); } // 3. Emit the change back! @@ -196,7 +209,7 @@ private void addIncidentToSummary( @Nonnull private IncidentsSummary getIncidentsSummary(@Nonnull final Urn entityUrn) { IncidentsSummary maybeIncidentsSummary = - _incidentService.getIncidentsSummary(systemOperationContext, entityUrn); + incidentService.getIncidentsSummary(systemOperationContext, entityUrn); return maybeIncidentsSummary == null ? new IncidentsSummary() .setResolvedIncidentDetails(new IncidentSummaryDetailsArray()) @@ -260,7 +273,7 @@ private boolean isIncidentUpdate(@Nonnull final MetadataChangeLog event) { private void updateIncidentSummary( @Nonnull final Urn entityUrn, @Nonnull final IncidentsSummary newSummary) { try { - _incidentService.updateIncidentsSummary(systemOperationContext, entityUrn, newSummary); + incidentService.updateIncidentsSummary(systemOperationContext, entityUrn, newSummary); } catch (Exception e) { log.error( String.format( diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/ingestion/IngestionSchedulerHook.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/ingestion/IngestionSchedulerHook.java index c13f0f75708f74..5569fade7e6eb1 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/ingestion/IngestionSchedulerHook.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/ingestion/IngestionSchedulerHook.java @@ -15,6 +15,7 @@ import com.linkedin.mxe.MetadataChangeLog; import io.datahubproject.metadata.context.OperationContext; import javax.annotation.Nonnull; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -29,27 +30,36 @@ @Component @Import({EntityRegistryFactory.class, IngestionSchedulerFactory.class}) public class IngestionSchedulerHook implements MetadataChangeLogHook { - private final IngestionScheduler _scheduler; - private final boolean _isEnabled; + private final IngestionScheduler scheduler; + private final boolean isEnabled; private OperationContext systemOperationContext; + @Getter private final String consumerGroupSuffix; @Autowired public IngestionSchedulerHook( @Nonnull final IngestionScheduler scheduler, - @Nonnull @Value("${ingestionScheduler.enabled:true}") Boolean isEnabled) { - _scheduler = scheduler; - _isEnabled = isEnabled; + @Nonnull @Value("${ingestionScheduler.enabled:true}") Boolean isEnabled, + @Nonnull @Value("${ingestionScheduler.consumerGroupSuffix}") String consumerGroupSuffix) { + this.scheduler = scheduler; + this.isEnabled = isEnabled; + this.consumerGroupSuffix = consumerGroupSuffix; + } + + @VisibleForTesting + public IngestionSchedulerHook( + @Nonnull final IngestionScheduler scheduler, @Nonnull Boolean isEnabled) { + this(scheduler, isEnabled, ""); } @Override public boolean isEnabled() { - return _isEnabled; + return isEnabled; } @Override public IngestionSchedulerHook init(@Nonnull OperationContext systemOperationContext) { this.systemOperationContext = systemOperationContext; - _scheduler.init(); + scheduler.init(); return this; } @@ -66,11 +76,11 @@ public void invoke(@Nonnull MetadataChangeLog event) { final Urn urn = getUrnFromEvent(event); if (ChangeType.DELETE.equals(event.getChangeType())) { - _scheduler.unscheduleNextIngestionSourceExecution(urn); + scheduler.unscheduleNextIngestionSourceExecution(urn); } else { // Update the scheduler to reflect the latest changes. final DataHubIngestionSourceInfo info = getInfoFromEvent(event); - _scheduler.scheduleNextIngestionSourceExecution(urn, info); + scheduler.scheduleNextIngestionSourceExecution(urn, info); } } } @@ -138,6 +148,6 @@ private DataHubIngestionSourceInfo getInfoFromEvent(final MetadataChangeLog even @VisibleForTesting IngestionScheduler scheduler() { - return _scheduler; + return scheduler; } } diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/siblings/SiblingAssociationHook.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/siblings/SiblingAssociationHook.java index f068679da7757e..bbe0feed7de115 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/siblings/SiblingAssociationHook.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/siblings/SiblingAssociationHook.java @@ -41,6 +41,7 @@ import java.util.List; import java.util.stream.Collectors; import javax.annotation.Nonnull; +import lombok.Getter; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -70,17 +71,28 @@ public class SiblingAssociationHook implements MetadataChangeLogHook { private final SystemEntityClient systemEntityClient; private final EntitySearchService entitySearchService; - private final boolean _isEnabled; + private final boolean isEnabled; private OperationContext systemOperationContext; + @Getter private final String consumerGroupSuffix; @Autowired public SiblingAssociationHook( @Nonnull final SystemEntityClient systemEntityClient, @Nonnull final EntitySearchService searchService, - @Nonnull @Value("${siblings.enabled:true}") Boolean isEnabled) { + @Nonnull @Value("${siblings.enabled:true}") Boolean isEnabled, + @Nonnull @Value("${siblings.consumerGroupSuffix}") String consumerGroupSuffix) { this.systemEntityClient = systemEntityClient; entitySearchService = searchService; - _isEnabled = isEnabled; + this.isEnabled = isEnabled; + this.consumerGroupSuffix = consumerGroupSuffix; + } + + @VisibleForTesting + public SiblingAssociationHook( + @Nonnull final SystemEntityClient systemEntityClient, + @Nonnull final EntitySearchService searchService, + @Nonnull Boolean isEnabled) { + this(systemEntityClient, searchService, isEnabled, ""); } @Value("${siblings.enabled:false}") @@ -99,7 +111,7 @@ public SiblingAssociationHook init(@Nonnull OperationContext systemOperationCont @Override public boolean isEnabled() { - return _isEnabled; + return isEnabled; } @Override diff --git a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLGMSSpringTest.java b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLGMSSpringTest.java index c2a8de161eafe1..10f149e6062957 100644 --- a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLGMSSpringTest.java +++ b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLGMSSpringTest.java @@ -3,7 +3,7 @@ import static org.testng.AssertJUnit.*; import com.linkedin.gms.factory.config.ConfigurationProvider; -import com.linkedin.metadata.kafka.MetadataChangeLogProcessor; +import com.linkedin.metadata.kafka.MCLKafkaListenerRegistrar; import com.linkedin.metadata.kafka.hook.UpdateIndicesHook; import com.linkedin.metadata.kafka.hook.event.EntityChangeEventGeneratorHook; import com.linkedin.metadata.kafka.hook.incident.IncidentsSummaryHook; @@ -35,23 +35,23 @@ public class MCLGMSSpringTest extends AbstractTestNGSpringContextTests { @Test public void testHooks() { - MetadataChangeLogProcessor metadataChangeLogProcessor = - applicationContext.getBean(MetadataChangeLogProcessor.class); + MCLKafkaListenerRegistrar registrar = + applicationContext.getBean(MCLKafkaListenerRegistrar.class); assertTrue( - metadataChangeLogProcessor.getHooks().stream() + registrar.getMetadataChangeLogHooks().stream() .noneMatch(hook -> hook instanceof IngestionSchedulerHook)); assertTrue( - metadataChangeLogProcessor.getHooks().stream() + registrar.getMetadataChangeLogHooks().stream() .anyMatch(hook -> hook instanceof UpdateIndicesHook)); assertTrue( - metadataChangeLogProcessor.getHooks().stream() + registrar.getMetadataChangeLogHooks().stream() .anyMatch(hook -> hook instanceof SiblingAssociationHook)); assertTrue( - metadataChangeLogProcessor.getHooks().stream() + registrar.getMetadataChangeLogHooks().stream() .anyMatch(hook -> hook instanceof EntityChangeEventGeneratorHook)); assertEquals( 1, - metadataChangeLogProcessor.getHooks().stream() + registrar.getMetadataChangeLogHooks().stream() .filter(hook -> hook instanceof IncidentsSummaryHook) .count()); } diff --git a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLMAESpringTest.java b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLMAESpringTest.java index 23de7707cc571d..2049e974999b18 100644 --- a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLMAESpringTest.java +++ b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLMAESpringTest.java @@ -4,7 +4,7 @@ import static org.testng.AssertJUnit.assertTrue; import com.linkedin.gms.factory.config.ConfigurationProvider; -import com.linkedin.metadata.kafka.MetadataChangeLogProcessor; +import com.linkedin.metadata.kafka.MCLKafkaListenerRegistrar; import com.linkedin.metadata.kafka.hook.UpdateIndicesHook; import com.linkedin.metadata.kafka.hook.event.EntityChangeEventGeneratorHook; import com.linkedin.metadata.kafka.hook.incident.IncidentsSummaryHook; @@ -33,23 +33,23 @@ public class MCLMAESpringTest extends AbstractTestNGSpringContextTests { @Test public void testHooks() { - MetadataChangeLogProcessor metadataChangeLogProcessor = - applicationContext.getBean(MetadataChangeLogProcessor.class); + MCLKafkaListenerRegistrar registrar = + applicationContext.getBean(MCLKafkaListenerRegistrar.class); assertTrue( - metadataChangeLogProcessor.getHooks().stream() + registrar.getMetadataChangeLogHooks().stream() .noneMatch(hook -> hook instanceof IngestionSchedulerHook)); assertTrue( - metadataChangeLogProcessor.getHooks().stream() + registrar.getMetadataChangeLogHooks().stream() .anyMatch(hook -> hook instanceof UpdateIndicesHook)); assertTrue( - metadataChangeLogProcessor.getHooks().stream() + registrar.getMetadataChangeLogHooks().stream() .anyMatch(hook -> hook instanceof SiblingAssociationHook)); assertTrue( - metadataChangeLogProcessor.getHooks().stream() + registrar.getMetadataChangeLogHooks().stream() .anyMatch(hook -> hook instanceof EntityChangeEventGeneratorHook)); assertEquals( 1, - metadataChangeLogProcessor.getHooks().stream() + registrar.getMetadataChangeLogHooks().stream() .filter(hook -> hook instanceof IncidentsSummaryHook) .count()); } diff --git a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLSpringCommonTestConfiguration.java b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLSpringCommonTestConfiguration.java index f6f71a12a6951f..68768051eccad0 100644 --- a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLSpringCommonTestConfiguration.java +++ b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLSpringCommonTestConfiguration.java @@ -34,10 +34,13 @@ @ComponentScan( basePackages = { "com.linkedin.metadata.kafka", - "com.linkedin.gms.factory.kafka.common", - "com.linkedin.gms.factory.kafka.schemaregistry", + "com.linkedin.gms.factory.kafka", "com.linkedin.gms.factory.entity.update.indices", - "com.linkedin.gms.factory.timeline.eventgenerator" + "com.linkedin.gms.factory.timeline.eventgenerator", + "com.linkedin.metadata.dao.producer", + "com.linkedin.gms.factory.change", + "com.datahub.event.hook", + "com.linkedin.gms.factory.notifications" }) public class MCLSpringCommonTestConfiguration { diff --git a/metadata-jobs/pe-consumer/src/main/java/com/datahub/event/PlatformEventProcessor.java b/metadata-jobs/pe-consumer/src/main/java/com/datahub/event/PlatformEventProcessor.java index c4116b314254ca..358a2ac0c2ee33 100644 --- a/metadata-jobs/pe-consumer/src/main/java/com/datahub/event/PlatformEventProcessor.java +++ b/metadata-jobs/pe-consumer/src/main/java/com/datahub/event/PlatformEventProcessor.java @@ -3,9 +3,7 @@ import com.codahale.metrics.Histogram; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; -import com.datahub.event.hook.BusinessAttributeUpdateHook; import com.datahub.event.hook.PlatformEventHook; -import com.linkedin.gms.factory.kafka.KafkaEventConsumerFactory; import com.linkedin.metadata.EventUtils; import com.linkedin.metadata.utils.metrics.MetricUtils; import com.linkedin.mxe.PlatformEvent; @@ -21,7 +19,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Conditional; -import org.springframework.context.annotation.Import; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @@ -29,7 +26,6 @@ @Slf4j @Component @Conditional(PlatformEventProcessorCondition.class) -@Import({BusinessAttributeUpdateHook.class, KafkaEventConsumerFactory.class}) @EnableKafka public class PlatformEventProcessor { @@ -49,6 +45,11 @@ public PlatformEventProcessor( platformEventHooks.stream() .filter(PlatformEventHook::isEnabled) .collect(Collectors.toList()); + log.info( + "Enabled platform hooks: {}", + this.hooks.stream() + .map(hook -> hook.getClass().getSimpleName()) + .collect(Collectors.toList())); this.hooks.forEach(PlatformEventHook::init); } diff --git a/metadata-service/configuration/src/main/resources/application.yaml b/metadata-service/configuration/src/main/resources/application.yaml index 2514060ff2d617..5b3673ddca52c6 100644 --- a/metadata-service/configuration/src/main/resources/application.yaml +++ b/metadata-service/configuration/src/main/resources/application.yaml @@ -296,10 +296,18 @@ metadataTests: siblings: enabled: ${ENABLE_SIBLING_HOOK:true} # enable to turn on automatic sibling associations for dbt + consumerGroupSuffix: ${SIBLINGS_HOOK_CONSUMER_GROUP_SUFFIX:} updateIndices: enabled: ${ENABLE_UPDATE_INDICES_HOOK:true} + consumerGroupSuffix: ${UPDATE_INDICES_CONSUMER_GROUP_SUFFIX:} ingestionScheduler: enabled: ${ENABLE_INGESTION_SCHEDULER_HOOK:true} # enable to execute ingestion scheduling + consumerGroupSuffix: ${INGESTION_SCHEDULER_HOOK_CONSUMER_GROUP_SUFFIX:} +incidents: + hook: + enabled: ${ENABLE_INCIDENTS_HOOK:true} + maxIncidentHistory: ${MAX_INCIDENT_HISTORY:100} + consumerGroupSuffix: ${INCIDENTS_HOOK_CONSUMER_GROUP_SUFFIX:} bootstrap: upgradeDefaultBrowsePaths: @@ -376,6 +384,7 @@ featureFlags: entityChangeEvents: enabled: ${ENABLE_ENTITY_CHANGE_EVENTS_HOOK:true} + consumerGroupSuffix: ${ECE_CONSUMER_GROUP_SUFFIX:} views: enabled: ${VIEWS_ENABLED:true} @@ -460,6 +469,7 @@ springdoc.api-docs.groups.enabled: true forms: hook: enabled: { $FORMS_HOOK_ENABLED:true } + consumerGroupSuffix: ${FORMS_HOOK_CONSUMER_GROUP_SUFFIX:} businessAttribute: fetchRelatedEntitiesCount: ${BUSINESS_ATTRIBUTE_RELATED_ENTITIES_COUNT:20000} diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java index 9501b03482d045..aecb4f0afb12cc 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java @@ -96,7 +96,7 @@ private static Map buildCustomizedProperties( } @Bean(name = "kafkaEventConsumer") - protected KafkaListenerContainerFactory createInstance( + protected KafkaListenerContainerFactory kafkaEventConsumer( @Qualifier("kafkaConsumerFactory") DefaultKafkaConsumerFactory kafkaConsumerFactory, @Qualifier("configurationProvider") ConfigurationProvider configurationProvider) { diff --git a/metadata-service/war/src/main/java/com/linkedin/gms/CommonApplicationConfig.java b/metadata-service/war/src/main/java/com/linkedin/gms/CommonApplicationConfig.java index bc623c3cc983c2..e47a2b4e278e4e 100644 --- a/metadata-service/war/src/main/java/com/linkedin/gms/CommonApplicationConfig.java +++ b/metadata-service/war/src/main/java/com/linkedin/gms/CommonApplicationConfig.java @@ -37,7 +37,10 @@ "com.linkedin.gms.factory.search", "com.linkedin.gms.factory.secret", "com.linkedin.gms.factory.timeseries", - "com.linkedin.gms.factory.plugins" + "com.linkedin.gms.factory.plugins", + "com.linkedin.gms.factory.change", + "com.datahub.event.hook", + "com.linkedin.gms.factory.notifications" }) @PropertySource(value = "classpath:/application.yaml", factory = YamlPropertySourceFactory.class) @Configuration From 573c1cb8407c2a5d152e5abb6b7d9f012eea75cb Mon Sep 17 00:00:00 2001 From: David Leifker Date: Fri, 9 Aug 2024 12:38:44 -0500 Subject: [PATCH 2/3] fix(openapi): fix openapi v2 endpoints & v3 documentation update --- docs/api/tutorials/structured-properties.md | 204 ++++++++++-------- .../controller/GenericEntitiesController.java | 23 +- .../v2/controller/EntityController.java | 25 +++ .../v3/controller/EntityController.java | 27 +++ 4 files changed, 169 insertions(+), 110 deletions(-) diff --git a/docs/api/tutorials/structured-properties.md b/docs/api/tutorials/structured-properties.md index 6f6c6541554d97..00e992f2bd0bbf 100644 --- a/docs/api/tutorials/structured-properties.md +++ b/docs/api/tutorials/structured-properties.md @@ -158,29 +158,37 @@ curl -X 'POST' -v \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -d '{ + "value": { "qualifiedName": "io.acryl.privacy.retentionTime", - "valueType": "urn:li:dataType:datahub.number", - "description": "Retention Time is used to figure out how long to retain records in a dataset", - "displayName": "Retention Time", - "cardinality": "MULTIPLE", - "entityTypes": [ - "urn:li:entityType:datahub.dataset", - "urn:li:entityType:datahub.dataFlow" - ], - "allowedValues": [ - { - "value": {"double": 30}, - "description": "30 days, usually reserved for datasets that are ephemeral and contain pii" - }, - { - "value": {"double": 60}, - "description": "Use this for datasets that drive monthly reporting but contain pii" - }, - { - "value": {"double": 365}, - "description": "Use this for non-sensitive data that can be retained for longer" - } - ] + "valueType": "urn:li:dataType:datahub.number", + "description": "Retention Time is used to figure out how long to retain records in a dataset", + "displayName": "Retention Time", + "cardinality": "MULTIPLE", + "entityTypes": [ + "urn:li:entityType:datahub.dataset", + "urn:li:entityType:datahub.dataFlow" + ], + "allowedValues": [ + { + "value": { + "double": 30 + }, + "description": "30 days, usually reserved for datasets that are ephemeral and contain pii" + }, + { + "value": { + "double": 60 + }, + "description": "Use this for datasets that drive monthly reporting but contain pii" + }, + { + "value": { + "double": 365 + }, + "description": "Use this for non-sensitive data that can be retained for longer" + } + ] + } }' | jq ``` @@ -474,14 +482,16 @@ curl -X 'POST' -v \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -d '{ - "properties": [ - { - "propertyUrn": "urn:li:structuredProperty:io.acryl.privacy.retentionTime", - "values": [ - {"double": 60.0} - ] - } - ] + "value": { + "properties": [ + { + "propertyUrn": "urn:li:structuredProperty:io.acryl.privacy.retentionTime", + "values": [ + {"double": 60.0} + ] + } + ] + } }' | jq ``` Example Response: @@ -627,23 +637,25 @@ curl -X 'POST' -v \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -d '{ - "qualifiedName": "io.acryl.privacy.retentionTime02", - "displayName": "Retention Time 02", - "valueType": "urn:li:dataType:datahub.string", - "allowedValues": [ - { - "value": {"string": "foo2"}, - "description": "test foo2 value" - }, - { - "value": {"string": "bar2"}, - "description": "test bar2 value" - } - ], - "cardinality": "SINGLE", - "entityTypes": [ - "urn:li:entityType:datahub.dataset" - ] + "value": { + "qualifiedName": "io.acryl.privacy.retentionTime02", + "displayName": "Retention Time 02", + "valueType": "urn:li:dataType:datahub.string", + "allowedValues": [ + { + "value": {"string": "foo2"}, + "description": "test foo2 value" + }, + { + "value": {"string": "bar2"}, + "description": "test bar2 value" + } + ], + "cardinality": "SINGLE", + "entityTypes": [ + "urn:li:entityType:datahub.dataset" + ] + } }' | jq ``` @@ -686,24 +698,26 @@ Specically, this will set `io.acryl.privacy.retentionTime` as `60.0` and `io.acr ```shell curl -X 'POST' -v \ - 'http://localhost:8080/openapi/v3/entity/dataset/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Ahive%2CSampleHiveDataset%2CPROD%29/structuredProperties' \ + 'http://localhost:8080/openapi/v3/entity/dataset/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Ahive%2CSampleHiveDataset%2CPROD%29/structuredProperties?createIfNotExists=false' \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -d '{ - "properties": [ - { - "propertyUrn": "urn:li:structuredProperty:io.acryl.privacy.retentionTime", - "values": [ - {"double": 60.0} - ] - }, - { - "propertyUrn": "urn:li:structuredProperty:io.acryl.privacy.retentionTime02", - "values": [ - {"string": "bar2"} - ] - } - ] + "value": { + "properties": [ + { + "propertyUrn": "urn:li:structuredProperty:io.acryl.privacy.retentionTime", + "values": [ + {"double": 60.0} + ] + }, + { + "propertyUrn": "urn:li:structuredProperty:io.acryl.privacy.retentionTime02", + "values": [ + {"string": "bar2"} + ] + } + ] + } }' | jq ``` @@ -1111,7 +1125,9 @@ curl -X 'POST' \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -d '{ -"removed": true + "value": { + "removed": true + } }' | jq ``` @@ -1132,11 +1148,13 @@ If you want to **remove the soft delete**, you can do so by either hard deleting ```shell curl -X 'POST' \ - 'http://localhost:8080/openapi/v3/entity/structuredProperty/urn%3Ali%3AstructuredProperty%3Aio.acryl.privacy.retentionTime/status?systemMetadata=false' \ + 'http://localhost:8080/openapi/v3/entity/structuredProperty/urn%3Ali%3AstructuredProperty%3Aio.acryl.privacy.retentionTime/status?systemMetadata=false&createIfNotExists=false' \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -d '{ -"removed": false + "value": { + "removed": true + } }' | jq ``` @@ -1271,34 +1289,42 @@ Change the cardinality to `SINGLE` and add a `version`. ```shell curl -X 'POST' -v \ - 'http://localhost:8080/openapi/v3/entity/structuredProperty/urn%3Ali%3AstructuredProperty%3Aio.acryl.privacy.retentionTime/propertyDefinition' \ + 'http://localhost:8080/openapi/v3/entity/structuredProperty/urn%3Ali%3AstructuredProperty%3Aio.acryl.privacy.retentionTime/propertyDefinition?createIfNotExists=false' \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -d '{ + "value": { "qualifiedName": "io.acryl.privacy.retentionTime", - "valueType": "urn:li:dataType:datahub.number", - "description": "Retention Time is used to figure out how long to retain records in a dataset", - "displayName": "Retention Time", - "cardinality": "SINGLE", - "version": "20240614080000", - "entityTypes": [ - "urn:li:entityType:datahub.dataset", - "urn:li:entityType:datahub.dataFlow" - ], - "allowedValues": [ - { - "value": {"double": 30}, - "description": "30 days, usually reserved for datasets that are ephemeral and contain pii" - }, - { - "value": {"double": 60}, - "description": "Use this for datasets that drive monthly reporting but contain pii" - }, - { - "value": {"double": 365}, - "description": "Use this for non-sensitive data that can be retained for longer" - } - ] + "valueType": "urn:li:dataType:datahub.number", + "description": "Retention Time is used to figure out how long to retain records in a dataset", + "displayName": "Retention Time", + "cardinality": "SINGLE", + "version": "20240614080000", + "entityTypes": [ + "urn:li:entityType:datahub.dataset", + "urn:li:entityType:datahub.dataFlow" + ], + "allowedValues": [ + { + "value": { + "double": 30 + }, + "description": "30 days, usually reserved for datasets that are ephemeral and contain pii" + }, + { + "value": { + "double": 60 + }, + "description": "Use this for datasets that drive monthly reporting but contain pii" + }, + { + "value": { + "double": 365 + }, + "description": "Use this for non-sensitive data that can be retained for longer" + } + ] + } }' | jq ``` diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/controller/GenericEntitiesController.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/controller/GenericEntitiesController.java index de5d2ae1118d4a..f415a4f47c9dc2 100644 --- a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/controller/GenericEntitiesController.java +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/controller/GenericEntitiesController.java @@ -13,14 +13,11 @@ import com.datahub.authorization.AuthorizerChain; import com.datahub.util.RecordUtils; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableSet; import com.linkedin.common.urn.Urn; -import com.linkedin.data.ByteString; import com.linkedin.data.template.RecordTemplate; import com.linkedin.entity.EnvelopedAspect; -import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.aspect.AspectRetriever; import com.linkedin.metadata.aspect.batch.AspectsBatch; import com.linkedin.metadata.aspect.batch.ChangeMCP; @@ -41,7 +38,6 @@ import com.linkedin.metadata.search.SearchEntityArray; import com.linkedin.metadata.search.SearchService; import com.linkedin.metadata.utils.AuditStampUtils; -import com.linkedin.metadata.utils.GenericRecordUtils; import com.linkedin.metadata.utils.SearchUtil; import com.linkedin.mxe.SystemMetadata; import com.linkedin.util.Pair; @@ -57,7 +53,6 @@ import jakarta.servlet.http.HttpServletRequest; import java.lang.reflect.InvocationTargetException; import java.net.URISyntaxException; -import java.nio.charset.StandardCharsets; import java.util.*; import java.util.stream.Collectors; import javax.annotation.Nonnull; @@ -726,28 +721,14 @@ protected RecordTemplate toRecordTemplate( aspectSpec.getDataTemplateClass(), envelopedAspect.getValue().data()); } - protected ChangeMCP toUpsertItem( + protected abstract ChangeMCP toUpsertItem( @Nonnull AspectRetriever aspectRetriever, Urn entityUrn, AspectSpec aspectSpec, Boolean createIfNotExists, String jsonAspect, Actor actor) - throws JsonProcessingException { - JsonNode jsonNode = objectMapper.readTree(jsonAspect); - String aspectJson = jsonNode.get("value").toString(); - return ChangeItemImpl.builder() - .urn(entityUrn) - .aspectName(aspectSpec.getName()) - .changeType(Boolean.TRUE.equals(createIfNotExists) ? ChangeType.CREATE : ChangeType.UPSERT) - .auditStamp(AuditStampUtils.createAuditStamp(actor.toUrnStr())) - .recordTemplate( - GenericRecordUtils.deserializeAspect( - ByteString.copyString(aspectJson, StandardCharsets.UTF_8), - GenericRecordUtils.JSON, - aspectSpec)) - .build(aspectRetriever); - } + throws URISyntaxException, JsonProcessingException; protected ChangeMCP toUpsertItem( @Nonnull AspectRetriever aspectRetriever, diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/EntityController.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/EntityController.java index 54a7724cadd345..1207eb331b795e 100644 --- a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/EntityController.java +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/EntityController.java @@ -13,8 +13,11 @@ import com.linkedin.data.ByteString; import com.linkedin.data.template.RecordTemplate; import com.linkedin.entity.EnvelopedAspect; +import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.aspect.AspectRetriever; import com.linkedin.metadata.aspect.batch.AspectsBatch; import com.linkedin.metadata.aspect.batch.BatchItem; +import com.linkedin.metadata.aspect.batch.ChangeMCP; import com.linkedin.metadata.entity.EntityApiUtils; import com.linkedin.metadata.entity.IngestResult; import com.linkedin.metadata.entity.UpdateAspectResult; @@ -260,4 +263,26 @@ protected List buildEntityList( } return responseList; } + + @Override + protected ChangeMCP toUpsertItem( + @Nonnull AspectRetriever aspectRetriever, + Urn entityUrn, + AspectSpec aspectSpec, + Boolean createIfNotExists, + String jsonAspect, + Actor actor) + throws URISyntaxException { + return ChangeItemImpl.builder() + .urn(entityUrn) + .aspectName(aspectSpec.getName()) + .changeType(Boolean.TRUE.equals(createIfNotExists) ? ChangeType.CREATE : ChangeType.UPSERT) + .auditStamp(AuditStampUtils.createAuditStamp(actor.toUrnStr())) + .recordTemplate( + GenericRecordUtils.deserializeAspect( + ByteString.copyString(jsonAspect, StandardCharsets.UTF_8), + GenericRecordUtils.JSON, + aspectSpec)) + .build(aspectRetriever); + } } diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/controller/EntityController.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/controller/EntityController.java index a0478c9af16092..fbc9bf2956cfd3 100644 --- a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/controller/EntityController.java +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/controller/EntityController.java @@ -14,8 +14,11 @@ import com.linkedin.common.urn.Urn; import com.linkedin.data.ByteString; import com.linkedin.entity.EnvelopedAspect; +import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.aspect.AspectRetriever; import com.linkedin.metadata.aspect.batch.AspectsBatch; import com.linkedin.metadata.aspect.batch.BatchItem; +import com.linkedin.metadata.aspect.batch.ChangeMCP; import com.linkedin.metadata.entity.EntityApiUtils; import com.linkedin.metadata.entity.IngestResult; import com.linkedin.metadata.entity.UpdateAspectResult; @@ -348,4 +351,28 @@ protected AspectsBatch toMCPBatch( .retrieverContext(opContext.getRetrieverContext().get()) .build(); } + + @Override + protected ChangeMCP toUpsertItem( + @Nonnull AspectRetriever aspectRetriever, + Urn entityUrn, + AspectSpec aspectSpec, + Boolean createIfNotExists, + String jsonAspect, + Actor actor) + throws JsonProcessingException { + JsonNode jsonNode = objectMapper.readTree(jsonAspect); + String aspectJson = jsonNode.get("value").toString(); + return ChangeItemImpl.builder() + .urn(entityUrn) + .aspectName(aspectSpec.getName()) + .changeType(Boolean.TRUE.equals(createIfNotExists) ? ChangeType.CREATE : ChangeType.UPSERT) + .auditStamp(AuditStampUtils.createAuditStamp(actor.toUrnStr())) + .recordTemplate( + GenericRecordUtils.deserializeAspect( + ByteString.copyString(aspectJson, StandardCharsets.UTF_8), + GenericRecordUtils.JSON, + aspectSpec)) + .build(aspectRetriever); + } } From 469654ced75c3340276028068a6ca201eadc0cdf Mon Sep 17 00:00:00 2001 From: David Leifker Date: Fri, 9 Aug 2024 12:40:34 -0500 Subject: [PATCH 3/3] Revert "fix(openapi): fix openapi v2 endpoints & v3 documentation update" This reverts commit 573c1cb8407c2a5d152e5abb6b7d9f012eea75cb. --- docs/api/tutorials/structured-properties.md | 204 ++++++++---------- .../controller/GenericEntitiesController.java | 23 +- .../v2/controller/EntityController.java | 25 --- .../v3/controller/EntityController.java | 27 --- 4 files changed, 110 insertions(+), 169 deletions(-) diff --git a/docs/api/tutorials/structured-properties.md b/docs/api/tutorials/structured-properties.md index 00e992f2bd0bbf..6f6c6541554d97 100644 --- a/docs/api/tutorials/structured-properties.md +++ b/docs/api/tutorials/structured-properties.md @@ -158,37 +158,29 @@ curl -X 'POST' -v \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -d '{ - "value": { "qualifiedName": "io.acryl.privacy.retentionTime", - "valueType": "urn:li:dataType:datahub.number", - "description": "Retention Time is used to figure out how long to retain records in a dataset", - "displayName": "Retention Time", - "cardinality": "MULTIPLE", - "entityTypes": [ - "urn:li:entityType:datahub.dataset", - "urn:li:entityType:datahub.dataFlow" - ], - "allowedValues": [ - { - "value": { - "double": 30 - }, - "description": "30 days, usually reserved for datasets that are ephemeral and contain pii" - }, - { - "value": { - "double": 60 - }, - "description": "Use this for datasets that drive monthly reporting but contain pii" - }, - { - "value": { - "double": 365 - }, - "description": "Use this for non-sensitive data that can be retained for longer" - } - ] - } + "valueType": "urn:li:dataType:datahub.number", + "description": "Retention Time is used to figure out how long to retain records in a dataset", + "displayName": "Retention Time", + "cardinality": "MULTIPLE", + "entityTypes": [ + "urn:li:entityType:datahub.dataset", + "urn:li:entityType:datahub.dataFlow" + ], + "allowedValues": [ + { + "value": {"double": 30}, + "description": "30 days, usually reserved for datasets that are ephemeral and contain pii" + }, + { + "value": {"double": 60}, + "description": "Use this for datasets that drive monthly reporting but contain pii" + }, + { + "value": {"double": 365}, + "description": "Use this for non-sensitive data that can be retained for longer" + } + ] }' | jq ``` @@ -482,16 +474,14 @@ curl -X 'POST' -v \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -d '{ - "value": { - "properties": [ - { - "propertyUrn": "urn:li:structuredProperty:io.acryl.privacy.retentionTime", - "values": [ - {"double": 60.0} - ] - } - ] - } + "properties": [ + { + "propertyUrn": "urn:li:structuredProperty:io.acryl.privacy.retentionTime", + "values": [ + {"double": 60.0} + ] + } + ] }' | jq ``` Example Response: @@ -637,25 +627,23 @@ curl -X 'POST' -v \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -d '{ - "value": { - "qualifiedName": "io.acryl.privacy.retentionTime02", - "displayName": "Retention Time 02", - "valueType": "urn:li:dataType:datahub.string", - "allowedValues": [ - { - "value": {"string": "foo2"}, - "description": "test foo2 value" - }, - { - "value": {"string": "bar2"}, - "description": "test bar2 value" - } - ], - "cardinality": "SINGLE", - "entityTypes": [ - "urn:li:entityType:datahub.dataset" - ] - } + "qualifiedName": "io.acryl.privacy.retentionTime02", + "displayName": "Retention Time 02", + "valueType": "urn:li:dataType:datahub.string", + "allowedValues": [ + { + "value": {"string": "foo2"}, + "description": "test foo2 value" + }, + { + "value": {"string": "bar2"}, + "description": "test bar2 value" + } + ], + "cardinality": "SINGLE", + "entityTypes": [ + "urn:li:entityType:datahub.dataset" + ] }' | jq ``` @@ -698,26 +686,24 @@ Specically, this will set `io.acryl.privacy.retentionTime` as `60.0` and `io.acr ```shell curl -X 'POST' -v \ - 'http://localhost:8080/openapi/v3/entity/dataset/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Ahive%2CSampleHiveDataset%2CPROD%29/structuredProperties?createIfNotExists=false' \ + 'http://localhost:8080/openapi/v3/entity/dataset/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Ahive%2CSampleHiveDataset%2CPROD%29/structuredProperties' \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -d '{ - "value": { - "properties": [ - { - "propertyUrn": "urn:li:structuredProperty:io.acryl.privacy.retentionTime", - "values": [ - {"double": 60.0} - ] - }, - { - "propertyUrn": "urn:li:structuredProperty:io.acryl.privacy.retentionTime02", - "values": [ - {"string": "bar2"} - ] - } - ] - } + "properties": [ + { + "propertyUrn": "urn:li:structuredProperty:io.acryl.privacy.retentionTime", + "values": [ + {"double": 60.0} + ] + }, + { + "propertyUrn": "urn:li:structuredProperty:io.acryl.privacy.retentionTime02", + "values": [ + {"string": "bar2"} + ] + } + ] }' | jq ``` @@ -1125,9 +1111,7 @@ curl -X 'POST' \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -d '{ - "value": { - "removed": true - } +"removed": true }' | jq ``` @@ -1148,13 +1132,11 @@ If you want to **remove the soft delete**, you can do so by either hard deleting ```shell curl -X 'POST' \ - 'http://localhost:8080/openapi/v3/entity/structuredProperty/urn%3Ali%3AstructuredProperty%3Aio.acryl.privacy.retentionTime/status?systemMetadata=false&createIfNotExists=false' \ + 'http://localhost:8080/openapi/v3/entity/structuredProperty/urn%3Ali%3AstructuredProperty%3Aio.acryl.privacy.retentionTime/status?systemMetadata=false' \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -d '{ - "value": { - "removed": true - } +"removed": false }' | jq ``` @@ -1289,42 +1271,34 @@ Change the cardinality to `SINGLE` and add a `version`. ```shell curl -X 'POST' -v \ - 'http://localhost:8080/openapi/v3/entity/structuredProperty/urn%3Ali%3AstructuredProperty%3Aio.acryl.privacy.retentionTime/propertyDefinition?createIfNotExists=false' \ + 'http://localhost:8080/openapi/v3/entity/structuredProperty/urn%3Ali%3AstructuredProperty%3Aio.acryl.privacy.retentionTime/propertyDefinition' \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -d '{ - "value": { "qualifiedName": "io.acryl.privacy.retentionTime", - "valueType": "urn:li:dataType:datahub.number", - "description": "Retention Time is used to figure out how long to retain records in a dataset", - "displayName": "Retention Time", - "cardinality": "SINGLE", - "version": "20240614080000", - "entityTypes": [ - "urn:li:entityType:datahub.dataset", - "urn:li:entityType:datahub.dataFlow" - ], - "allowedValues": [ - { - "value": { - "double": 30 - }, - "description": "30 days, usually reserved for datasets that are ephemeral and contain pii" - }, - { - "value": { - "double": 60 - }, - "description": "Use this for datasets that drive monthly reporting but contain pii" - }, - { - "value": { - "double": 365 - }, - "description": "Use this for non-sensitive data that can be retained for longer" - } - ] - } + "valueType": "urn:li:dataType:datahub.number", + "description": "Retention Time is used to figure out how long to retain records in a dataset", + "displayName": "Retention Time", + "cardinality": "SINGLE", + "version": "20240614080000", + "entityTypes": [ + "urn:li:entityType:datahub.dataset", + "urn:li:entityType:datahub.dataFlow" + ], + "allowedValues": [ + { + "value": {"double": 30}, + "description": "30 days, usually reserved for datasets that are ephemeral and contain pii" + }, + { + "value": {"double": 60}, + "description": "Use this for datasets that drive monthly reporting but contain pii" + }, + { + "value": {"double": 365}, + "description": "Use this for non-sensitive data that can be retained for longer" + } + ] }' | jq ``` diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/controller/GenericEntitiesController.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/controller/GenericEntitiesController.java index f415a4f47c9dc2..de5d2ae1118d4a 100644 --- a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/controller/GenericEntitiesController.java +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/controller/GenericEntitiesController.java @@ -13,11 +13,14 @@ import com.datahub.authorization.AuthorizerChain; import com.datahub.util.RecordUtils; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableSet; import com.linkedin.common.urn.Urn; +import com.linkedin.data.ByteString; import com.linkedin.data.template.RecordTemplate; import com.linkedin.entity.EnvelopedAspect; +import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.aspect.AspectRetriever; import com.linkedin.metadata.aspect.batch.AspectsBatch; import com.linkedin.metadata.aspect.batch.ChangeMCP; @@ -38,6 +41,7 @@ import com.linkedin.metadata.search.SearchEntityArray; import com.linkedin.metadata.search.SearchService; import com.linkedin.metadata.utils.AuditStampUtils; +import com.linkedin.metadata.utils.GenericRecordUtils; import com.linkedin.metadata.utils.SearchUtil; import com.linkedin.mxe.SystemMetadata; import com.linkedin.util.Pair; @@ -53,6 +57,7 @@ import jakarta.servlet.http.HttpServletRequest; import java.lang.reflect.InvocationTargetException; import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; import java.util.*; import java.util.stream.Collectors; import javax.annotation.Nonnull; @@ -721,14 +726,28 @@ protected RecordTemplate toRecordTemplate( aspectSpec.getDataTemplateClass(), envelopedAspect.getValue().data()); } - protected abstract ChangeMCP toUpsertItem( + protected ChangeMCP toUpsertItem( @Nonnull AspectRetriever aspectRetriever, Urn entityUrn, AspectSpec aspectSpec, Boolean createIfNotExists, String jsonAspect, Actor actor) - throws URISyntaxException, JsonProcessingException; + throws JsonProcessingException { + JsonNode jsonNode = objectMapper.readTree(jsonAspect); + String aspectJson = jsonNode.get("value").toString(); + return ChangeItemImpl.builder() + .urn(entityUrn) + .aspectName(aspectSpec.getName()) + .changeType(Boolean.TRUE.equals(createIfNotExists) ? ChangeType.CREATE : ChangeType.UPSERT) + .auditStamp(AuditStampUtils.createAuditStamp(actor.toUrnStr())) + .recordTemplate( + GenericRecordUtils.deserializeAspect( + ByteString.copyString(aspectJson, StandardCharsets.UTF_8), + GenericRecordUtils.JSON, + aspectSpec)) + .build(aspectRetriever); + } protected ChangeMCP toUpsertItem( @Nonnull AspectRetriever aspectRetriever, diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/EntityController.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/EntityController.java index 1207eb331b795e..54a7724cadd345 100644 --- a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/EntityController.java +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/EntityController.java @@ -13,11 +13,8 @@ import com.linkedin.data.ByteString; import com.linkedin.data.template.RecordTemplate; import com.linkedin.entity.EnvelopedAspect; -import com.linkedin.events.metadata.ChangeType; -import com.linkedin.metadata.aspect.AspectRetriever; import com.linkedin.metadata.aspect.batch.AspectsBatch; import com.linkedin.metadata.aspect.batch.BatchItem; -import com.linkedin.metadata.aspect.batch.ChangeMCP; import com.linkedin.metadata.entity.EntityApiUtils; import com.linkedin.metadata.entity.IngestResult; import com.linkedin.metadata.entity.UpdateAspectResult; @@ -263,26 +260,4 @@ protected List buildEntityList( } return responseList; } - - @Override - protected ChangeMCP toUpsertItem( - @Nonnull AspectRetriever aspectRetriever, - Urn entityUrn, - AspectSpec aspectSpec, - Boolean createIfNotExists, - String jsonAspect, - Actor actor) - throws URISyntaxException { - return ChangeItemImpl.builder() - .urn(entityUrn) - .aspectName(aspectSpec.getName()) - .changeType(Boolean.TRUE.equals(createIfNotExists) ? ChangeType.CREATE : ChangeType.UPSERT) - .auditStamp(AuditStampUtils.createAuditStamp(actor.toUrnStr())) - .recordTemplate( - GenericRecordUtils.deserializeAspect( - ByteString.copyString(jsonAspect, StandardCharsets.UTF_8), - GenericRecordUtils.JSON, - aspectSpec)) - .build(aspectRetriever); - } } diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/controller/EntityController.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/controller/EntityController.java index fbc9bf2956cfd3..a0478c9af16092 100644 --- a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/controller/EntityController.java +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/controller/EntityController.java @@ -14,11 +14,8 @@ import com.linkedin.common.urn.Urn; import com.linkedin.data.ByteString; import com.linkedin.entity.EnvelopedAspect; -import com.linkedin.events.metadata.ChangeType; -import com.linkedin.metadata.aspect.AspectRetriever; import com.linkedin.metadata.aspect.batch.AspectsBatch; import com.linkedin.metadata.aspect.batch.BatchItem; -import com.linkedin.metadata.aspect.batch.ChangeMCP; import com.linkedin.metadata.entity.EntityApiUtils; import com.linkedin.metadata.entity.IngestResult; import com.linkedin.metadata.entity.UpdateAspectResult; @@ -351,28 +348,4 @@ protected AspectsBatch toMCPBatch( .retrieverContext(opContext.getRetrieverContext().get()) .build(); } - - @Override - protected ChangeMCP toUpsertItem( - @Nonnull AspectRetriever aspectRetriever, - Urn entityUrn, - AspectSpec aspectSpec, - Boolean createIfNotExists, - String jsonAspect, - Actor actor) - throws JsonProcessingException { - JsonNode jsonNode = objectMapper.readTree(jsonAspect); - String aspectJson = jsonNode.get("value").toString(); - return ChangeItemImpl.builder() - .urn(entityUrn) - .aspectName(aspectSpec.getName()) - .changeType(Boolean.TRUE.equals(createIfNotExists) ? ChangeType.CREATE : ChangeType.UPSERT) - .auditStamp(AuditStampUtils.createAuditStamp(actor.toUrnStr())) - .recordTemplate( - GenericRecordUtils.deserializeAspect( - ByteString.copyString(aspectJson, StandardCharsets.UTF_8), - GenericRecordUtils.JSON, - aspectSpec)) - .build(aspectRetriever); - } }