From 4601aacfadcc49c13571533ca97ee3da442b7026 Mon Sep 17 00:00:00 2001 From: mikeburke24 <42984275+mikeburke24@users.noreply.github.com> Date: Tue, 19 Nov 2024 04:29:37 -0600 Subject: [PATCH 01/10] fix(ingest): upgrade msal (#11883) --- metadata-ingestion/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 3152d0290ec227..2469af74b03343 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -245,7 +245,7 @@ # Instead, we put the fix in our PyHive fork, so no thrift pin is needed. } -microsoft_common = {"msal>=1.22.0"} +microsoft_common = {"msal>=1.24.0"} iceberg_common = { # Iceberg Python SDK From a1c783ba3aab69360c700732619d70a08966a0ef Mon Sep 17 00:00:00 2001 From: david-leifker <114954101+david-leifker@users.noreply.github.com> Date: Tue, 19 Nov 2024 10:58:18 -0600 Subject: [PATCH 02/10] refactor(kafka): reconfigure consumers to allow different config (#11869) --- docker/profiles/docker-compose.gms.yml | 1 - .../kafka/MCLKafkaListenerRegistrar.java | 4 +- .../kafka/MetadataChangeEventsProcessor.java | 4 +- .../MetadataChangeProposalsProcessor.java | 3 +- .../datahub/event/PlatformEventProcessor.java | 4 +- .../config/kafka/ConsumerConfiguration.java | 9 ++ .../config/kafka/KafkaConfiguration.java | 4 + .../src/main/resources/application.yaml | 7 ++ .../kafka/KafkaEventConsumerFactory.java | 88 +++++++++++++++++-- .../test/SchemaRegistryControllerTest.java | 7 +- 10 files changed, 116 insertions(+), 15 deletions(-) diff --git a/docker/profiles/docker-compose.gms.yml b/docker/profiles/docker-compose.gms.yml index 147bbd35ff6460..824c8024b05d63 100644 --- a/docker/profiles/docker-compose.gms.yml +++ b/docker/profiles/docker-compose.gms.yml @@ -40,7 +40,6 @@ x-kafka-env: &kafka-env # KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081 SCHEMA_REGISTRY_TYPE: INTERNAL KAFKA_SCHEMAREGISTRY_URL: http://datahub-gms:8080/schema-registry/api/ - SPRING_KAFKA_CONSUMER_AUTO_OFFSET_RESET: ${SPRING_KAFKA_CONSUMER_AUTO_OFFSET_RESET:-earliest} x-datahub-quickstart-telemetry-env: &datahub-quickstart-telemetry-env DATAHUB_SERVER_TYPE: ${DATAHUB_SERVER_TYPE:-quickstart} diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MCLKafkaListenerRegistrar.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MCLKafkaListenerRegistrar.java index fb2880f617d301..c909b0034a9125 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MCLKafkaListenerRegistrar.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MCLKafkaListenerRegistrar.java @@ -1,5 +1,7 @@ package com.linkedin.metadata.kafka; +import static com.linkedin.metadata.config.kafka.KafkaConfiguration.MCL_EVENT_CONSUMER_NAME; + import com.linkedin.metadata.kafka.config.MetadataChangeLogProcessorCondition; import com.linkedin.metadata.kafka.hook.MetadataChangeLogHook; import com.linkedin.mxe.Topics; @@ -39,7 +41,7 @@ public class MCLKafkaListenerRegistrar implements InitializingBean { @Autowired private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; @Autowired - @Qualifier("kafkaEventConsumer") + @Qualifier(MCL_EVENT_CONSUMER_NAME) private KafkaListenerContainerFactory kafkaListenerContainerFactory; @Value("${METADATA_CHANGE_LOG_KAFKA_CONSUMER_GROUP_ID:generic-mae-consumer-job-client}") diff --git a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeEventsProcessor.java 
b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeEventsProcessor.java index 1b3d19915b439e..5d2f6452e69197 100644 --- a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeEventsProcessor.java +++ b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeEventsProcessor.java @@ -1,5 +1,7 @@ package com.linkedin.metadata.kafka; +import static com.linkedin.metadata.config.kafka.KafkaConfiguration.DEFAULT_EVENT_CONSUMER_NAME; + import com.codahale.metrics.Histogram; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; @@ -60,7 +62,7 @@ public class MetadataChangeEventsProcessor { "${METADATA_CHANGE_EVENT_NAME:${KAFKA_MCE_TOPIC_NAME:" + Topics.METADATA_CHANGE_EVENT + "}}", - containerFactory = "kafkaEventConsumer") + containerFactory = DEFAULT_EVENT_CONSUMER_NAME) @Deprecated public void consume(final ConsumerRecord consumerRecord) { try (Timer.Context i = MetricUtils.timer(this.getClass(), "consume").time()) { diff --git a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java index 22c2b4b9c04503..ef87afdef46cb7 100644 --- a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java +++ b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java @@ -4,6 +4,7 @@ import static com.linkedin.metadata.Constants.MDC_CHANGE_TYPE; import static com.linkedin.metadata.Constants.MDC_ENTITY_TYPE; import static com.linkedin.metadata.Constants.MDC_ENTITY_URN; +import static com.linkedin.metadata.config.kafka.KafkaConfiguration.MCP_EVENT_CONSUMER_NAME; import com.codahale.metrics.Histogram; import com.codahale.metrics.MetricRegistry; @@ -116,7 +117,7 @@ public void registerConsumerThrottle() { @KafkaListener( id = CONSUMER_GROUP_ID_VALUE, topics = "${METADATA_CHANGE_PROPOSAL_TOPIC_NAME:" + Topics.METADATA_CHANGE_PROPOSAL + "}", - containerFactory = "kafkaEventConsumer") + containerFactory = MCP_EVENT_CONSUMER_NAME) public void consume(final ConsumerRecord consumerRecord) { try (Timer.Context ignored = MetricUtils.timer(this.getClass(), "consume").time()) { kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp()); diff --git a/metadata-jobs/pe-consumer/src/main/java/com/datahub/event/PlatformEventProcessor.java b/metadata-jobs/pe-consumer/src/main/java/com/datahub/event/PlatformEventProcessor.java index 358a2ac0c2ee33..5d11697bed93d2 100644 --- a/metadata-jobs/pe-consumer/src/main/java/com/datahub/event/PlatformEventProcessor.java +++ b/metadata-jobs/pe-consumer/src/main/java/com/datahub/event/PlatformEventProcessor.java @@ -1,5 +1,7 @@ package com.datahub.event; +import static com.linkedin.metadata.config.kafka.KafkaConfiguration.PE_EVENT_CONSUMER_NAME; + import com.codahale.metrics.Histogram; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; @@ -56,7 +58,7 @@ public PlatformEventProcessor( @KafkaListener( id = "${PLATFORM_EVENT_KAFKA_CONSUMER_GROUP_ID:generic-platform-event-job-client}", topics = {"${PLATFORM_EVENT_TOPIC_NAME:" + Topics.PLATFORM_EVENT + "}"}, - containerFactory = "kafkaEventConsumer") + containerFactory = PE_EVENT_CONSUMER_NAME) public void consume(final ConsumerRecord consumerRecord) { try (Timer.Context i = MetricUtils.timer(this.getClass(), "consume").time()) { diff 
--git a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/ConsumerConfiguration.java b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/ConsumerConfiguration.java index 60f3e1b4fef76f..9b476483a2baf8 100644 --- a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/ConsumerConfiguration.java +++ b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/ConsumerConfiguration.java @@ -8,4 +8,13 @@ public class ConsumerConfiguration { private int maxPartitionFetchBytes; private boolean stopOnDeserializationError; private boolean healthCheckEnabled; + + private ConsumerOptions mcp; + private ConsumerOptions mcl; + private ConsumerOptions pe; + + @Data + public static class ConsumerOptions { + private String autoOffsetReset; + } } diff --git a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/KafkaConfiguration.java b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/KafkaConfiguration.java index b03aedc1a7b5eb..ae0d3a3bb4647a 100644 --- a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/KafkaConfiguration.java +++ b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/KafkaConfiguration.java @@ -20,6 +20,10 @@ public class KafkaConfiguration { "spring.deserializer.key.delegate.class"; public static final String VALUE_DESERIALIZER_DELEGATE_CLASS = "spring.deserializer.value.delegate.class"; + public static final String MCP_EVENT_CONSUMER_NAME = "mcpEventConsumer"; + public static final String MCL_EVENT_CONSUMER_NAME = "mclEventConsumer"; + public static final String PE_EVENT_CONSUMER_NAME = "platformEventConsumer"; + public static final String DEFAULT_EVENT_CONSUMER_NAME = "kafkaEventConsumer"; private String bootstrapServers; diff --git a/metadata-service/configuration/src/main/resources/application.yaml b/metadata-service/configuration/src/main/resources/application.yaml index 8010ae187b6c8b..4945b36a251c26 100644 --- a/metadata-service/configuration/src/main/resources/application.yaml +++ b/metadata-service/configuration/src/main/resources/application.yaml @@ -289,6 +289,13 @@ kafka: maxPartitionFetchBytes: ${KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES:5242880} # the max bytes consumed per partition stopOnDeserializationError: ${KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR:true} # Stops kafka listener container on deserialization error, allows user to fix problems before moving past problematic offset. 
If false will log and move forward past the offset healthCheckEnabled: ${KAFKA_CONSUMER_HEALTH_CHECK_ENABLED:true} # Sets the health indicator to down when a message listener container has stopped due to a deserialization failure, will force consumer apps to restart through k8s and docker-compose health mechanisms + mcp: + autoOffsetReset: ${KAFKA_CONSUMER_MCP_AUTO_OFFSET_RESET:earliest} + mcl: + autoOffsetReset: ${KAFKA_CONSUMER_MCL_AUTO_OFFSET_RESET:earliest} + pe: + autoOffsetReset: ${KAFKA_CONSUMER_PE_AUTO_OFFSET_RESET:latest} + schemaRegistry: type: ${SCHEMA_REGISTRY_TYPE:KAFKA} # INTERNAL or KAFKA or AWS_GLUE url: ${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081} diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java index 750af8ec488df3..a1ee4df360b7ec 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java @@ -1,10 +1,18 @@ package com.linkedin.gms.factory.kafka; +import static com.linkedin.metadata.config.kafka.KafkaConfiguration.DEFAULT_EVENT_CONSUMER_NAME; +import static com.linkedin.metadata.config.kafka.KafkaConfiguration.MCL_EVENT_CONSUMER_NAME; +import static com.linkedin.metadata.config.kafka.KafkaConfiguration.MCP_EVENT_CONSUMER_NAME; +import static com.linkedin.metadata.config.kafka.KafkaConfiguration.PE_EVENT_CONSUMER_NAME; + import com.linkedin.gms.factory.config.ConfigurationProvider; +import com.linkedin.metadata.config.kafka.ConsumerConfiguration; import com.linkedin.metadata.config.kafka.KafkaConfiguration; import java.time.Duration; import java.util.Arrays; +import java.util.HashMap; import java.util.Map; +import javax.annotation.Nullable; import lombok.extern.slf4j.Slf4j; import org.apache.avro.generic.GenericRecord; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -23,7 +31,6 @@ @Slf4j @Configuration public class KafkaEventConsumerFactory { - private int kafkaEventConsumerConcurrency; @Bean(name = "kafkaConsumerFactory") @@ -87,15 +94,82 @@ private static Map buildCustomizedProperties( return customizedProperties; } - @Bean(name = "kafkaEventConsumer") + @Bean(name = PE_EVENT_CONSUMER_NAME) + protected KafkaListenerContainerFactory platformEventConsumer( + @Qualifier("kafkaConsumerFactory") + DefaultKafkaConsumerFactory kafkaConsumerFactory, + @Qualifier("configurationProvider") ConfigurationProvider configurationProvider) { + + return buildDefaultKafkaListenerContainerFactory( + PE_EVENT_CONSUMER_NAME, + kafkaConsumerFactory, + configurationProvider.getKafka().getConsumer().isStopOnDeserializationError(), + configurationProvider.getKafka().getConsumer().getPe()); + } + + @Bean(name = MCP_EVENT_CONSUMER_NAME) + protected KafkaListenerContainerFactory mcpEventConsumer( + @Qualifier("kafkaConsumerFactory") + DefaultKafkaConsumerFactory kafkaConsumerFactory, + @Qualifier("configurationProvider") ConfigurationProvider configurationProvider) { + + return buildDefaultKafkaListenerContainerFactory( + MCP_EVENT_CONSUMER_NAME, + kafkaConsumerFactory, + configurationProvider.getKafka().getConsumer().isStopOnDeserializationError(), + configurationProvider.getKafka().getConsumer().getMcp()); + } + + @Bean(name = MCL_EVENT_CONSUMER_NAME) + protected KafkaListenerContainerFactory mclEventConsumer( + @Qualifier("kafkaConsumerFactory") + 
DefaultKafkaConsumerFactory kafkaConsumerFactory, + @Qualifier("configurationProvider") ConfigurationProvider configurationProvider) { + + return buildDefaultKafkaListenerContainerFactory( + MCL_EVENT_CONSUMER_NAME, + kafkaConsumerFactory, + configurationProvider.getKafka().getConsumer().isStopOnDeserializationError(), + configurationProvider.getKafka().getConsumer().getMcl()); + } + + @Bean(name = DEFAULT_EVENT_CONSUMER_NAME) protected KafkaListenerContainerFactory kafkaEventConsumer( @Qualifier("kafkaConsumerFactory") DefaultKafkaConsumerFactory kafkaConsumerFactory, @Qualifier("configurationProvider") ConfigurationProvider configurationProvider) { + return buildDefaultKafkaListenerContainerFactory( + DEFAULT_EVENT_CONSUMER_NAME, + kafkaConsumerFactory, + configurationProvider.getKafka().getConsumer().isStopOnDeserializationError(), + null); + } + + private KafkaListenerContainerFactory buildDefaultKafkaListenerContainerFactory( + String consumerFactoryName, + DefaultKafkaConsumerFactory kafkaConsumerFactory, + boolean isStopOnDeserializationError, + @Nullable ConsumerConfiguration.ConsumerOptions consumerOptions) { + + final DefaultKafkaConsumerFactory factoryWithOverrides; + if (consumerOptions != null) { + // Copy the base config + Map props = new HashMap<>(kafkaConsumerFactory.getConfigurationProperties()); + // Override just the auto.offset.reset + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerOptions.getAutoOffsetReset()); + factoryWithOverrides = + new DefaultKafkaConsumerFactory<>( + props, + kafkaConsumerFactory.getKeyDeserializer(), + kafkaConsumerFactory.getValueDeserializer()); + } else { + factoryWithOverrides = kafkaConsumerFactory; + } + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(kafkaConsumerFactory); + factory.setConsumerFactory(factoryWithOverrides); factory.setContainerCustomizer(new ThreadPoolContainerCustomizer()); factory.setConcurrency(kafkaEventConsumerConcurrency); @@ -103,7 +177,7 @@ protected KafkaListenerContainerFactory kafkaEventConsumer( use DefaultErrorHandler (does back-off retry and then logs) rather than stopping the container. Stopping the container prevents lost messages until the error can be examined, disabling this will allow progress, but may lose data */ - if (configurationProvider.getKafka().getConsumer().isStopOnDeserializationError()) { + if (isStopOnDeserializationError) { CommonDelegatingErrorHandler delegatingErrorHandler = new CommonDelegatingErrorHandler(new DefaultErrorHandler()); delegatingErrorHandler.addDelegate( @@ -111,9 +185,9 @@ use DefaultErrorHandler (does back-off retry and then logs) rather than stopping factory.setCommonErrorHandler(delegatingErrorHandler); } log.info( - String.format( - "Event-based KafkaListenerContainerFactory built successfully. Consumer concurrency = %s", - kafkaEventConsumerConcurrency)); + "Event-based {} KafkaListenerContainerFactory built successfully. 
Consumer concurrency = {}", + consumerFactoryName, + kafkaEventConsumerConcurrency); return factory; } diff --git a/metadata-service/schema-registry-servlet/src/test/java/io/datahubproject/openapi/test/SchemaRegistryControllerTest.java b/metadata-service/schema-registry-servlet/src/test/java/io/datahubproject/openapi/test/SchemaRegistryControllerTest.java index 664766f204e460..e8deed00672da7 100644 --- a/metadata-service/schema-registry-servlet/src/test/java/io/datahubproject/openapi/test/SchemaRegistryControllerTest.java +++ b/metadata-service/schema-registry-servlet/src/test/java/io/datahubproject/openapi/test/SchemaRegistryControllerTest.java @@ -1,6 +1,7 @@ package io.datahubproject.openapi.test; import static com.linkedin.metadata.Constants.*; +import static com.linkedin.metadata.config.kafka.KafkaConfiguration.DEFAULT_EVENT_CONSUMER_NAME; import static org.testng.Assert.*; import com.linkedin.common.urn.Urn; @@ -199,7 +200,7 @@ public void testPEConsumption() @KafkaListener( id = "test-mcp-consumer", topics = Topics.METADATA_CHANGE_PROPOSAL, - containerFactory = "kafkaEventConsumer", + containerFactory = DEFAULT_EVENT_CONSUMER_NAME, properties = {"auto.offset.reset:earliest"}) public void receiveMCP(ConsumerRecord consumerRecord) { @@ -216,7 +217,7 @@ public void receiveMCP(ConsumerRecord consumerRecord) { @KafkaListener( id = "test-mcl-consumer", topics = Topics.METADATA_CHANGE_LOG_VERSIONED, - containerFactory = "kafkaEventConsumer", + containerFactory = DEFAULT_EVENT_CONSUMER_NAME, properties = {"auto.offset.reset:earliest"}) public void receiveMCL(ConsumerRecord consumerRecord) { @@ -232,7 +233,7 @@ public void receiveMCL(ConsumerRecord consumerRecord) { @KafkaListener( id = "test-pe-consumer", topics = Topics.PLATFORM_EVENT, - containerFactory = "kafkaEventConsumer", + containerFactory = DEFAULT_EVENT_CONSUMER_NAME, properties = {"auto.offset.reset:earliest"}) public void receivePE(ConsumerRecord consumerRecord) { From 77394becd3173b4be112ff0a7f254312a7abe2fc Mon Sep 17 00:00:00 2001 From: david-leifker <114954101+david-leifker@users.noreply.github.com> Date: Tue, 19 Nov 2024 10:58:42 -0600 Subject: [PATCH 03/10] Update v_0_3_7.md (#11895) --- docs/managed-datahub/release-notes/v_0_3_7.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/managed-datahub/release-notes/v_0_3_7.md b/docs/managed-datahub/release-notes/v_0_3_7.md index 59b7a23b5e5836..19cb04e9f56039 100644 --- a/docs/managed-datahub/release-notes/v_0_3_7.md +++ b/docs/managed-datahub/release-notes/v_0_3_7.md @@ -11,6 +11,11 @@ Recommended CLI/SDK If you are using an older CLI/SDK version, then please upgrade it. This applies for all CLI/SDK usages, if you are using it through your terminal, GitHub Actions, Airflow, in Python SDK somewhere, Java SDK, etc. This is a strong recommendation to upgrade, as we keep on pushing fixes in the CLI, and it helps us support you better. +## Known Issues + +### v0.3.7.3 + * Search page fails to render when filters are applied with a query which returns zero results. 
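Stepping back to the Kafka consumer refactor in patch 02 above: every per-consumer factory follows the same override pattern of copying the shared consumer factory's properties and changing only auto.offset.reset. The following is a simplified, self-contained sketch of that pattern, not code from the patch itself; only the use of ConsumerConfig.AUTO_OFFSET_RESET_CONFIG and the property-copy approach are taken from the diff, the rest of the wiring is assumed for illustration.

import java.util.HashMap;
import java.util.Map;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

public class OffsetResetOverrideSketch {

  // Hypothetical helper mirroring the override pattern in patch 02.
  public static ConcurrentKafkaListenerContainerFactory<String, GenericRecord> withOffsetReset(
      DefaultKafkaConsumerFactory<String, GenericRecord> baseFactory, String autoOffsetReset) {

    // Copy the base configuration so other consumers keep their own settings.
    Map<String, Object> props = new HashMap<>(baseFactory.getConfigurationProperties());
    // Override only auto.offset.reset for this consumer (e.g. "earliest" or "latest").
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);

    DefaultKafkaConsumerFactory<String, GenericRecord> overridden =
        new DefaultKafkaConsumerFactory<>(
            props, baseFactory.getKeyDeserializer(), baseFactory.getValueDeserializer());

    ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(overridden);
    return factory;
  }
}

In the patch itself this pattern is applied once per consumer (mcpEventConsumer, mclEventConsumer, platformEventConsumer), each reading its autoOffsetReset from the new kafka.consumer.{mcp,mcl,pe} configuration block in application.yaml.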
+ ## Release Changelog --- From d97885749e75098f81d9d34ac78b0e32ee6f2561 Mon Sep 17 00:00:00 2001 From: Gabe Lyons Date: Tue, 19 Nov 2024 10:23:25 -0800 Subject: [PATCH 04/10] docs(structured props): fix a typo in structured property docs (#11887) --- docs/api/tutorials/structured-properties.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/api/tutorials/structured-properties.md b/docs/api/tutorials/structured-properties.md index 9b18aa922290b4..b606ce9a8e2455 100644 --- a/docs/api/tutorials/structured-properties.md +++ b/docs/api/tutorials/structured-properties.md @@ -66,7 +66,7 @@ mutation createStructuredProperty { qualifiedName:"retentionTime", displayName: "Retention Time", description: "Retention Time is used to figure out how long to retain records in a dataset", - valueType: "urn:li:dataType:number", + valueType: "urn:li:dataType:datahub.number", allowedValues: [ {numberValue: 30, description: "30 days, usually reserved for datasets that are ephemeral and contain pii"}, {numberValue: 90, description:"description: Use this for datasets that drive monthly reporting but contain pii"}, From 2d155ccaa9eb2966295d9c248fdb61a23354f305 Mon Sep 17 00:00:00 2001 From: david-leifker <114954101+david-leifker@users.noreply.github.com> Date: Tue, 19 Nov 2024 14:33:47 -0600 Subject: [PATCH 05/10] feat(mcl-upgrade): implement resume & urn pagination (#11889) --- .../upgrade/system/AbstractMCLStep.java | 131 ++++++++++----- ...ateSchemaFieldsFromSchemaMetadataStep.java | 2 +- .../DatahubUpgradeNonBlockingTest.java | 149 +++++++++++++++++- ...pgradeCliApplicationTestConfiguration.java | 12 +- 4 files changed, 248 insertions(+), 46 deletions(-) diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/AbstractMCLStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/AbstractMCLStep.java index 6c70aee88675c5..cd7947ce3c11aa 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/AbstractMCLStep.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/AbstractMCLStep.java @@ -1,13 +1,12 @@ package com.linkedin.datahub.upgrade.system; -import static com.linkedin.metadata.Constants.DATA_HUB_UPGRADE_RESULT_ASPECT_NAME; - import com.linkedin.common.urn.Urn; import com.linkedin.datahub.upgrade.UpgradeContext; import com.linkedin.datahub.upgrade.UpgradeStep; import com.linkedin.datahub.upgrade.UpgradeStepResult; import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult; import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.aspect.SystemAspect; import com.linkedin.metadata.boot.BootstrapStep; import com.linkedin.metadata.entity.AspectDao; import com.linkedin.metadata.entity.EntityService; @@ -16,10 +15,13 @@ import com.linkedin.metadata.entity.ebean.PartitionedStream; import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs; import com.linkedin.metadata.utils.AuditStampUtils; +import com.linkedin.upgrade.DataHubUpgradeResult; import com.linkedin.upgrade.DataHubUpgradeState; import com.linkedin.util.Pair; import io.datahubproject.metadata.context.OperationContext; import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.function.Function; @@ -33,6 +35,8 @@ */ @Slf4j public abstract class AbstractMCLStep implements UpgradeStep { + public static final String LAST_URN_KEY = "lastUrn"; + private final OperationContext opContext; private final 
EntityService entityService; private final AspectDao aspectDao; @@ -70,10 +74,30 @@ protected Urn getUpgradeIdUrn() { @Override public Function executable() { return (context) -> { + // Resume state + Optional prevResult = + context.upgrade().getUpgradeResult(opContext, getUpgradeIdUrn(), entityService); + String resumeUrn = + prevResult + .filter( + result -> + DataHubUpgradeState.IN_PROGRESS.equals(result.getState()) + && result.getResult() != null + && result.getResult().containsKey(LAST_URN_KEY)) + .map(result -> result.getResult().get(LAST_URN_KEY)) + .orElse(null); + if (resumeUrn != null) { + log.info("{}: Resuming from URN: {}", getUpgradeIdUrn(), resumeUrn); + } // re-using for configuring the sql scan RestoreIndicesArgs args = - new RestoreIndicesArgs().aspectName(getAspectName()).batchSize(batchSize).limit(limit); + new RestoreIndicesArgs() + .aspectName(getAspectName()) + .batchSize(batchSize) + .lastUrn(resumeUrn) + .urnBasedPagination(resumeUrn != null) + .limit(limit); if (getUrnLike() != null) { args = args.urnLike(getUrnLike()); @@ -86,40 +110,62 @@ public Function executable() { batch -> { log.info("Processing batch({}) of size {}.", getAspectName(), batchSize); - List, Boolean>> futures; - + List, SystemAspect>> futures; futures = EntityUtils.toSystemAspectFromEbeanAspects( opContext.getRetrieverContext().get(), batch.collect(Collectors.toList())) .stream() .map( - systemAspect -> - entityService.alwaysProduceMCLAsync( - opContext, - systemAspect.getUrn(), - systemAspect.getUrn().getEntityType(), - getAspectName(), - systemAspect.getAspectSpec(), - null, - systemAspect.getRecordTemplate(), - null, - systemAspect - .getSystemMetadata() - .setRunId(id()) - .setLastObserved(System.currentTimeMillis()), - AuditStampUtils.createDefaultAuditStamp(), - ChangeType.UPSERT)) - .collect(Collectors.toList()); - - futures.forEach( - f -> { - try { - f.getFirst().get(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } - }); + systemAspect -> { + Pair, Boolean> future = + entityService.alwaysProduceMCLAsync( + opContext, + systemAspect.getUrn(), + systemAspect.getUrn().getEntityType(), + getAspectName(), + systemAspect.getAspectSpec(), + null, + systemAspect.getRecordTemplate(), + null, + systemAspect + .getSystemMetadata() + .setRunId(id()) + .setLastObserved(System.currentTimeMillis()), + AuditStampUtils.createDefaultAuditStamp(), + ChangeType.UPSERT); + return Pair., SystemAspect>of( + future.getFirst(), systemAspect); + }) + .toList(); + + SystemAspect lastAspect = + futures.stream() + .map( + f -> { + try { + f.getFirst().get(); + return f.getSecond(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }) + .reduce((a, b) -> b) + .orElse(null); + + // record progress + if (lastAspect != null) { + log.info( + "{}: Saving state. Last urn:{}", getUpgradeIdUrn(), lastAspect.getUrn()); + context + .upgrade() + .setUpgradeResult( + opContext, + getUpgradeIdUrn(), + entityService, + DataHubUpgradeState.IN_PROGRESS, + Map.of(LAST_URN_KEY, lastAspect.getUrn().toString())); + } if (batchDelayMs > 0) { log.info("Sleeping for {} ms", batchDelayMs); @@ -142,12 +188,23 @@ public Function executable() { @Override /** Returns whether the upgrade should be skipped. */ public boolean skip(UpgradeContext context) { - boolean previouslyRun = - entityService.exists( - opContext, getUpgradeIdUrn(), DATA_HUB_UPGRADE_RESULT_ASPECT_NAME, true); - if (previouslyRun) { - log.info("{} was already run. 
Skipping.", id()); + Optional prevResult = + context.upgrade().getUpgradeResult(opContext, getUpgradeIdUrn(), entityService); + + boolean previousRunFinal = + prevResult + .filter( + result -> + DataHubUpgradeState.SUCCEEDED.equals(result.getState()) + || DataHubUpgradeState.ABORTED.equals(result.getState())) + .isPresent(); + + if (previousRunFinal) { + log.info( + "{} was already run. State: {} Skipping.", + id(), + prevResult.map(DataHubUpgradeResult::getState)); } - return previouslyRun; + return previousRunFinal; } } diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/schemafield/GenerateSchemaFieldsFromSchemaMetadataStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/schemafield/GenerateSchemaFieldsFromSchemaMetadataStep.java index eece83f4ab713e..55bc8edbf6a768 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/schemafield/GenerateSchemaFieldsFromSchemaMetadataStep.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/schemafield/GenerateSchemaFieldsFromSchemaMetadataStep.java @@ -1,5 +1,6 @@ package com.linkedin.datahub.upgrade.system.schemafield; +import static com.linkedin.datahub.upgrade.system.AbstractMCLStep.LAST_URN_KEY; import static com.linkedin.metadata.Constants.APP_SOURCE; import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME; import static com.linkedin.metadata.Constants.SCHEMA_METADATA_ASPECT_NAME; @@ -61,7 +62,6 @@ */ @Slf4j public class GenerateSchemaFieldsFromSchemaMetadataStep implements UpgradeStep { - private static final String LAST_URN_KEY = "lastUrn"; private static final List REQUIRED_ASPECTS = List.of(SCHEMA_METADATA_ASPECT_NAME, STATUS_ASPECT_NAME); diff --git a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNonBlockingTest.java b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNonBlockingTest.java index f340e688ad7f77..21bc6b725cba2b 100644 --- a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNonBlockingTest.java +++ b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNonBlockingTest.java @@ -1,14 +1,18 @@ package com.linkedin.datahub.upgrade; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import static org.testng.AssertJUnit.assertNotNull; +import com.linkedin.data.template.StringMap; import com.linkedin.datahub.upgrade.impl.DefaultUpgradeManager; import com.linkedin.datahub.upgrade.system.SystemUpdateNonBlocking; import com.linkedin.datahub.upgrade.system.bootstrapmcps.BootstrapMCPStep; @@ -20,17 +24,30 @@ import com.linkedin.metadata.entity.AspectDao; import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.entity.EntityServiceImpl; +import com.linkedin.metadata.entity.ebean.EbeanAspectV2; +import com.linkedin.metadata.entity.ebean.PartitionedStream; import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs; import com.linkedin.mxe.Topics; +import com.linkedin.upgrade.DataHubUpgradeResult; +import com.linkedin.upgrade.DataHubUpgradeState; +import com.linkedin.util.Pair; import 
io.datahubproject.metadata.context.OperationContext; import io.datahubproject.test.metadata.context.TestOperationContexts; +import java.sql.Timestamp; +import java.util.Arrays; import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.inject.Named; +import org.mockito.ArgumentCaptor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.testng.AbstractTestNGSpringContextTests; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @ActiveProfiles("test") @@ -63,7 +80,12 @@ public class DatahubUpgradeNonBlockingTest extends AbstractTestNGSpringContextTe @Autowired private EntityServiceImpl entityService; - @Autowired private OperationContext opContext; + private OperationContext opContext; + + @BeforeClass + public void init() { + opContext = TestOperationContexts.systemContextNoValidate(); + } @Test public void testSystemUpdateNonBlockingInit() { @@ -81,10 +103,13 @@ public void testSystemUpdateNonBlockingInit() { } @Test - public void testReindexDataJobViaNodesCLLPaging() { + public void testReindexDataJobViaNodesCLLPagingArgs() { EntityService mockService = mock(EntityService.class); AspectDao mockAspectDao = mock(AspectDao.class); + PartitionedStream mockStream = mock(PartitionedStream.class); + when(mockStream.partition(anyInt())).thenReturn(Stream.empty()); + when(mockAspectDao.streamAspectBatches(any(RestoreIndicesArgs.class))).thenReturn(mockStream); ReindexDataJobViaNodesCLL cllUpgrade = new ReindexDataJobViaNodesCLL(opContext, mockService, mockAspectDao, true, 10, 0, 0); @@ -102,9 +127,79 @@ public void testReindexDataJobViaNodesCLLPaging() { .batchSize(10) .limit(0) .aspectName("dataJobInputOutput") + .urnBasedPagination(false) + .lastUrn(null) .urnLike("urn:li:dataJob:%"))); } + @Test + public void testReindexDataJobViaNodesCLLResumePaging() throws Exception { + // Mock services + EntityService mockService = mock(EntityService.class); + AspectDao mockAspectDao = mock(AspectDao.class); + + // Create test data + EbeanAspectV2 aspect1 = createMockEbeanAspect("urn:li:dataJob:job1", "dataJobInputOutput"); + EbeanAspectV2 aspect2 = createMockEbeanAspect("urn:li:dataJob:job2", "dataJobInputOutput"); + EbeanAspectV2 aspect3 = createMockEbeanAspect("urn:li:dataJob:job3", "dataJobInputOutput"); + List initialBatch = Arrays.asList(aspect1, aspect2); + List resumeBatch = Arrays.asList(aspect3); + + // Mock the stream for first batch + PartitionedStream initialStream = mock(PartitionedStream.class); + when(initialStream.partition(anyInt())).thenReturn(Stream.of(initialBatch.stream())); + + // Mock the stream for second batch + PartitionedStream resumeStream = mock(PartitionedStream.class); + when(resumeStream.partition(anyInt())).thenReturn(Stream.of(resumeBatch.stream())); + + // Setup the AspectDao using Answer to handle null safely + when(mockAspectDao.streamAspectBatches(any(RestoreIndicesArgs.class))) + .thenAnswer( + invocation -> { + RestoreIndicesArgs args = invocation.getArgument(0); + if (args.lastUrn() == null) { + return initialStream; + } else if ("urn:li:dataJob:job2".equals(args.lastUrn())) { + return resumeStream; + } + return mock(PartitionedStream.class); + }); + + // Mock successful MCL production + when(mockService.alwaysProduceMCLAsync( + 
any(), any(), any(), any(), any(), any(), any(), any(), any(), any(), any())) + .thenReturn(Pair.of(CompletableFuture.completedFuture(null), true)); + + // Create the upgrade + ReindexDataJobViaNodesCLL cllUpgrade = + new ReindexDataJobViaNodesCLL(opContext, mockService, mockAspectDao, true, 2, 0, 0); + + // Initial Run + cllUpgrade.steps().get(0).executable().apply(createMockInitialUpgrade()); + + // Resumed + cllUpgrade.steps().get(0).executable().apply(createMockResumeUpgrade()); + + // Use ArgumentCaptor to verify the calls + ArgumentCaptor argsCaptor = + ArgumentCaptor.forClass(RestoreIndicesArgs.class); + verify(mockAspectDao, times(2)).streamAspectBatches(argsCaptor.capture()); + + List capturedArgs = argsCaptor.getAllValues(); + + // Verify both the initial and resume calls were made with correct arguments + assertEquals(capturedArgs.get(0).lastUrn(), null); + assertEquals(capturedArgs.get(0).urnBasedPagination(), false); + assertEquals(capturedArgs.get(1).lastUrn(), "urn:li:dataJob:job2"); + assertEquals(capturedArgs.get(1).urnBasedPagination(), true); + + // Verify MCL production was called for each aspect + verify(mockService, times(3)) + .alwaysProduceMCLAsync( + any(), any(), any(), any(), any(), any(), any(), any(), any(), any(), any()); + } + @Test public void testNonBlockingBootstrapMCP() { List mcpTemplate = @@ -123,4 +218,54 @@ public void testNonBlockingBootstrapMCP() { .map(update -> update.getMcpTemplate().getName()) .collect(Collectors.toSet()))); } + + private UpgradeContext createMockInitialUpgrade() { + // Mock the Upgrade instance + Upgrade mockUpgrade = mock(Upgrade.class); + + // Configure the mock upgrade to return no previous result + when(mockUpgrade.getUpgradeResult(any(), any(), any())).thenReturn(Optional.empty()); + + UpgradeContext mockInitialContext = mock(UpgradeContext.class); + when(mockInitialContext.opContext()).thenReturn(opContext); + when(mockInitialContext.upgrade()).thenReturn(mockUpgrade); + when(mockInitialContext.report()).thenReturn(mock(UpgradeReport.class)); + + return mockInitialContext; + } + + private UpgradeContext createMockResumeUpgrade() { + // Mock the Upgrade instance + Upgrade mockUpgrade = mock(Upgrade.class); + DataHubUpgradeResult mockPrevResult = mock(DataHubUpgradeResult.class); + + // Configure the mock previous result + when(mockPrevResult.getState()).thenReturn(DataHubUpgradeState.IN_PROGRESS); + when(mockPrevResult.getResult()) + .thenReturn(new StringMap(Map.of("lastUrn", "urn:li:dataJob:job2"))); + + // Configure the mock upgrade to return our previous result + when(mockUpgrade.getUpgradeResult(any(), any(), any())).thenReturn(Optional.of(mockPrevResult)); + + UpgradeContext mockResumeContext = mock(UpgradeContext.class); + when(mockResumeContext.opContext()).thenReturn(opContext); + when(mockResumeContext.upgrade()).thenReturn(mockUpgrade); + when(mockResumeContext.report()).thenReturn(mock(UpgradeReport.class)); + + return mockResumeContext; + } + + private static EbeanAspectV2 createMockEbeanAspect(String urn, String aspectName) { + Timestamp now = new Timestamp(System.currentTimeMillis()); + return new EbeanAspectV2( + urn, + aspectName, + 0L, + "{}", // metadata + now, // createdOn + "urn:li:corpuser:testUser", // createdBy + null, // createdFor + null // systemMetadata + ); + } } diff --git a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTestConfiguration.java b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTestConfiguration.java index 
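Condensing the resume logic that patch 05 adds to AbstractMCLStep: a previous IN_PROGRESS upgrade result stores the last processed URN under LAST_URN_KEY, and the next run switches the aspect scan to URN-based pagination starting from that URN. The sketch below only restates what the diff above introduces, using the argument and enum names shown there; the surrounding step wiring is omitted.

import java.util.Optional;

import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
import com.linkedin.upgrade.DataHubUpgradeResult;
import com.linkedin.upgrade.DataHubUpgradeState;

class ResumeSketch {
  static final String LAST_URN_KEY = "lastUrn";

  // Build scan args, resuming from the last recorded URN if a prior run was left IN_PROGRESS.
  static RestoreIndicesArgs resumeArgs(
      Optional<DataHubUpgradeResult> prevResult, String aspectName, int batchSize, int limit) {

    String resumeUrn =
        prevResult
            .filter(r -> DataHubUpgradeState.IN_PROGRESS.equals(r.getState()))
            .filter(r -> r.getResult() != null && r.getResult().containsKey(LAST_URN_KEY))
            .map(r -> r.getResult().get(LAST_URN_KEY))
            .orElse(null);

    return new RestoreIndicesArgs()
        .aspectName(aspectName)
        .batchSize(batchSize)
        .lastUrn(resumeUrn)
        .urnBasedPagination(resumeUrn != null) // first run falls back to a full scan
        .limit(limit);
  }
}

Progress is then written back after each batch via setUpgradeResult with state IN_PROGRESS and the last processed URN, which is exactly what a resumed run picks up; a run is only skipped once a prior result reached SUCCEEDED or ABORTED.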
81d883d8ce36b7..5b7b8756f11fb1 100644 --- a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTestConfiguration.java +++ b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTestConfiguration.java @@ -19,17 +19,17 @@ @Import(value = {SystemAuthenticationFactory.class}) public class UpgradeCliApplicationTestConfiguration { - @MockBean private UpgradeCli upgradeCli; + @MockBean public UpgradeCli upgradeCli; - @MockBean private Database ebeanServer; + @MockBean public Database ebeanServer; - @MockBean private SearchService searchService; + @MockBean public SearchService searchService; - @MockBean private GraphService graphService; + @MockBean public GraphService graphService; - @MockBean private EntityRegistry entityRegistry; + @MockBean public EntityRegistry entityRegistry; - @MockBean ConfigEntityRegistry configEntityRegistry; + @MockBean public ConfigEntityRegistry configEntityRegistry; @MockBean public EntityIndexBuilders entityIndexBuilders; From 44affd7f8211cb902112156660666b05b5f4dbe6 Mon Sep 17 00:00:00 2001 From: Chris Collins Date: Tue, 19 Nov 2024 17:24:17 -0500 Subject: [PATCH 06/10] fix(ui) Fix merging siblings schema with mix of v1 & v2 fields (#11837) --- .../src/app/entity/shared/siblingUtils.ts | 33 ++++++++++++++----- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/datahub-web-react/src/app/entity/shared/siblingUtils.ts b/datahub-web-react/src/app/entity/shared/siblingUtils.ts index 2f50dc99df191b..aa9e4bcb5e46e1 100644 --- a/datahub-web-react/src/app/entity/shared/siblingUtils.ts +++ b/datahub-web-react/src/app/entity/shared/siblingUtils.ts @@ -5,6 +5,7 @@ import * as QueryString from 'query-string'; import { Dataset, Entity, Maybe, SiblingProperties } from '../../../types.generated'; import { GenericEntityProperties } from './types'; import { useIsShowSeparateSiblingsEnabled } from '../../useAppConfig'; +import { downgradeV2FieldPath } from '../dataset/profile/schema/utils/utils'; export function stripSiblingsFromEntity(entity: any) { return { @@ -55,16 +56,30 @@ const combineMerge = (target, source, options) => { return destination; }; -function convertObjectKeysToLowercase(object: Record) { - return Object.fromEntries(Object.entries(object).map(([key, value]) => [key.toLowerCase(), value])); +// this function is responsible for normalizing object keys to make sure merging on key matches keys appropriately +function normalizeObjectKeys(object: Record, isSchemaField = false) { + return Object.fromEntries( + Object.entries(object).map(([key, value]) => { + let normalizedKey = key.toLowerCase(); + if (isSchemaField) { + normalizedKey = downgradeV2FieldPath(normalizedKey) || normalizedKey; + } + return [normalizedKey, value]; + }), + ); } // use when you want to merge an array of objects by key in the object as opposed to by index of array -const mergeArrayOfObjectsByKey = (destinationArray: any[], sourceArray: any[], key: string) => { - const destination = convertObjectKeysToLowercase(keyBy(destinationArray, key)); - const source = convertObjectKeysToLowercase(keyBy(sourceArray, key)); +const mergeArrayOfObjectsByKey = (destinationArray: any[], sourceArray: any[], key: string, isSchemaField = false) => { + const destination = normalizeObjectKeys(keyBy(destinationArray, key), isSchemaField); + const source = normalizeObjectKeys(keyBy(sourceArray, key), isSchemaField); - return values(merge(destination, source)); + return values( + merge(destination, source, { + arrayMerge: combineMerge, + customMerge, + 
}), + ); }; const mergeTags = (destinationArray, sourceArray, _options) => { @@ -88,7 +103,7 @@ const mergeOwners = (destinationArray, sourceArray, _options) => { }; const mergeFields = (destinationArray, sourceArray, _options) => { - return mergeArrayOfObjectsByKey(destinationArray, sourceArray, 'fieldPath'); + return mergeArrayOfObjectsByKey(destinationArray, sourceArray, 'fieldPath', true); }; function getArrayMergeFunction(key) { @@ -112,7 +127,7 @@ function getArrayMergeFunction(key) { } } -const customMerge = (isPrimary, key) => { +function customMerge(isPrimary, key) { if (key === 'upstream' || key === 'downstream') { return (_secondary, primary) => primary; } @@ -145,7 +160,7 @@ const customMerge = (isPrimary, key) => { customMerge: customMerge.bind({}, isPrimary), }); }; -}; +} export const getEntitySiblingData = (baseEntity: T): Maybe => { if (!baseEntity) { From 85c8e605be045deb59f7548380b550d12e70c900 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 19 Nov 2024 15:06:16 -0800 Subject: [PATCH 07/10] fix(ingest): consider sql parsing fallback as failure (#11896) --- metadata-ingestion/src/datahub/cli/check_cli.py | 4 +++- .../src/datahub/sql_parsing/sqlglot_lineage.py | 9 +++++++++ .../goldens/test_sqlite_attach_database.json | 12 ++++++++++++ .../tests/unit/sql_parsing/test_sqlglot_lineage.py | 11 +++++++++++ 4 files changed, 35 insertions(+), 1 deletion(-) create mode 100644 metadata-ingestion/tests/unit/sql_parsing/goldens/test_sqlite_attach_database.json diff --git a/metadata-ingestion/src/datahub/cli/check_cli.py b/metadata-ingestion/src/datahub/cli/check_cli.py index 39ed1b2bfea087..fbe07b64f0e154 100644 --- a/metadata-ingestion/src/datahub/cli/check_cli.py +++ b/metadata-ingestion/src/datahub/cli/check_cli.py @@ -268,7 +268,9 @@ def sql_lineage( ) logger.debug("Sql parsing debug info: %s", lineage.debug_info) - if lineage.debug_info.error: + if lineage.debug_info.table_error: + raise lineage.debug_info.table_error + elif lineage.debug_info.error: logger.debug("Sql parsing error details", exc_info=lineage.debug_info.error) click.echo(lineage.json(indent=4)) diff --git a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py index b635f8cb47b6d2..506bd1d8c6be40 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py @@ -904,6 +904,15 @@ def _sqlglot_lineage_inner( logger.debug("Parsing lineage from sql statement: %s", sql) statement = parse_statement(sql, dialect=dialect) + if isinstance(statement, sqlglot.exp.Command): + # For unsupported syntax, sqlglot will usually fallback to parsing as a Command. + # This is effectively a parsing error, and we won't get any lineage from it. 
+ # See https://github.com/tobymao/sqlglot/commit/3a13fdf4e597a2f0a3f9fc126a129183fe98262f + # and https://github.com/tobymao/sqlglot/pull/2874 + raise UnsupportedStatementTypeError( + f"Got unsupported syntax for statement: {sql}" + ) + original_statement, statement = statement, statement.copy() # logger.debug( # "Formatted sql statement: %s", diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_sqlite_attach_database.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_sqlite_attach_database.json new file mode 100644 index 00000000000000..bcf31f6be803a2 --- /dev/null +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_sqlite_attach_database.json @@ -0,0 +1,12 @@ +{ + "query_type": "UNKNOWN", + "query_type_props": {}, + "query_fingerprint": null, + "in_tables": [], + "out_tables": [], + "column_lineage": null, + "debug_info": { + "confidence": 0.0, + "generalized_statement": null + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py b/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py index 90cc863d6bd231..170341230205f3 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py +++ b/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py @@ -1268,3 +1268,14 @@ def test_bigquery_subquery_column_inference() -> None: dialect="bigquery", expected_file=RESOURCE_DIR / "test_bigquery_subquery_column_inference.json", ) + + +def test_sqlite_attach_database() -> None: + assert_sql_result( + """\ +ATTACH DATABASE ':memory:' AS aux1 +""", + dialect="sqlite", + expected_file=RESOURCE_DIR / "test_sqlite_attach_database.json", + allow_table_error=True, + ) From 1f396e87c1c48ad7b8a9996dc94c227ffd53e876 Mon Sep 17 00:00:00 2001 From: Tamas Nemeth Date: Wed, 20 Nov 2024 00:13:24 +0100 Subject: [PATCH 08/10] feat(spark): OpenLineage 1.24.2 upgrade (#11830) --- build.gradle | 2 +- entity-registry/build.gradle | 2 ++ .../java/acryl-spark-lineage/README.md | 1 + .../java/acryl-spark-lineage/build.gradle | 35 ++++++++++--------- .../datahub/spark/DatahubSparkListener.java | 8 +++-- .../datahub/spark/conf/SparkConfigParser.java | 2 ++ .../spark/agent/util/PlanUtils.java | 8 ++--- .../spark/agent/util/RddPathUtils.java | 18 ++++++---- .../java/datahub-client/build.gradle | 28 ++++++++++----- .../rest/DatahubHttpRequestRetryStrategy.java | 1 - .../java/datahub/client/rest/RestEmitter.java | 33 +++++++++++------ .../client/rest/RestEmitterConfig.java | 2 ++ .../java/openlineage-converter/build.gradle | 2 +- 13 files changed, 89 insertions(+), 53 deletions(-) diff --git a/build.gradle b/build.gradle index 6e6dadb7ebfa34..9ee756d41e11ef 100644 --- a/build.gradle +++ b/build.gradle @@ -56,7 +56,7 @@ buildscript { ext.hazelcastVersion = '5.3.6' ext.ebeanVersion = '15.5.2' ext.googleJavaFormatVersion = '1.18.1' - ext.openLineageVersion = '1.19.0' + ext.openLineageVersion = '1.24.2' ext.logbackClassicJava8 = '1.2.12' ext.docker_registry = 'acryldata' diff --git a/entity-registry/build.gradle b/entity-registry/build.gradle index 2dedea1f16d99c..22e5b601d39db2 100644 --- a/entity-registry/build.gradle +++ b/entity-registry/build.gradle @@ -25,6 +25,8 @@ dependencies { because("previous versions are vulnerable to CVE-2022-25857") } } + api project(path: ':li-utils') + api project(path: ':li-utils', configuration: "dataTemplate") dataModel project(':li-utils') annotationProcessor externalDependency.lombok diff --git a/metadata-integration/java/acryl-spark-lineage/README.md 
b/metadata-integration/java/acryl-spark-lineage/README.md index bd0a58b635b483..267e979b0fa073 100644 --- a/metadata-integration/java/acryl-spark-lineage/README.md +++ b/metadata-integration/java/acryl-spark-lineage/README.md @@ -165,6 +165,7 @@ information like tokens. | spark.datahub.rest.server | | http://localhost:8080 | Datahub server url eg: | | spark.datahub.rest.token | | | Authentication token. | | spark.datahub.rest.disable_ssl_verification | | false | Disable SSL certificate validation. Caution: Only use this if you know what you are doing! | +| spark.datahub.rest.disable_chunked_encoding | | false | Disable Chunked Transfer Encoding. In some environment chunked encoding causes issues. With this config option it can be disabled. || | spark.datahub.rest.max_retries | | 0 | Number of times a request retried if failed | | spark.datahub.rest.retry_interval | | 10 | Number of seconds to wait between retries | | spark.datahub.file.filename | | | The file where metadata will be written if file emitter is set | diff --git a/metadata-integration/java/acryl-spark-lineage/build.gradle b/metadata-integration/java/acryl-spark-lineage/build.gradle index 6620c34021ac4a..3f83e5657bbf4d 100644 --- a/metadata-integration/java/acryl-spark-lineage/build.gradle +++ b/metadata-integration/java/acryl-spark-lineage/build.gradle @@ -1,7 +1,7 @@ plugins { id("com.palantir.git-version") apply false } -apply plugin: 'java' +apply plugin: 'java-library' apply plugin: 'com.github.johnrengelman.shadow' apply plugin: 'signing' apply plugin: 'io.codearte.nexus-staging' @@ -51,8 +51,8 @@ dependencies { implementation project(':metadata-integration:java:openlineage-converter') - implementation project(path: ':metadata-integration:java:datahub-client', configuration: 'shadow') - implementation project(path: ':metadata-integration:java:openlineage-converter', configuration: 'shadow') + implementation project(path: ':metadata-integration:java:datahub-client') + implementation project(path: ':metadata-integration:java:openlineage-converter') //implementation "io.acryl:datahub-client:0.10.2" implementation "io.openlineage:openlineage-spark_2.12:$openLineageVersion" @@ -91,6 +91,8 @@ shadowJar { zip64 = true archiveClassifier = '' mergeServiceFiles() + project.configurations.implementation.canBeResolved = true + configurations = [project.configurations.implementation] def exclude_modules = project .configurations @@ -106,6 +108,8 @@ shadowJar { exclude(dependency { exclude_modules.contains(it.name) }) + exclude(dependency("org.slf4j::")) + exclude("org/apache/commons/logging/**") } // preventing java multi-release JAR leakage @@ -123,39 +127,36 @@ shadowJar { relocate 'com.sun.activation', 'io.acryl.shaded.com.sun.activation' relocate 'com.sun.codemodel', 'io.acryl.shaded.com.sun.codemodel' relocate 'com.sun.mail', 'io.acryl.shaded.com.sun.mail' - relocate 'com.fasterxml.jackson', 'datahub.spark2.shaded.jackson' - relocate 'org.slf4j', 'datahub.spark2.shaded.org.slf4j' // relocate 'org.apache.hc', 'io.acryl.shaded.http' - relocate 'org.apache.commons.codec', 'datahub.spark2.shaded.o.a.c.codec' - relocate 'org.apache.commons.compress', 'datahub.spark2.shaded.o.a.c.compress' - relocate 'org.apache.commons.lang3', 'datahub.spark2.shaded.o.a.c.lang3' + relocate 'org.apache.commons.codec', 'io.acryl.shaded.org.apache.commons.codec' + relocate 'org.apache.commons.compress', 'io.acryl.shaded.org.apache.commons.compress' + relocate 'org.apache.commons.lang3', 'io.acryl.shaded.org.apache.commons.lang3' relocate 'mozilla', 
'datahub.spark2.shaded.mozilla' - relocate 'com.typesafe', 'datahub.spark2.shaded.typesafe' - relocate 'io.opentracing', 'datahub.spark2.shaded.io.opentracing' - relocate 'io.netty', 'datahub.spark2.shaded.io.netty' - relocate 'ch.randelshofer', 'datahub.spark2.shaded.ch.randelshofer' - relocate 'ch.qos', 'datahub.spark2.shaded.ch.qos' + relocate 'com.typesafe', 'io.acryl.shaded.com.typesafe' + relocate 'io.opentracing', 'io.acryl.shaded.io.opentracing' + relocate 'io.netty', 'io.acryl.shaded.io.netty' + relocate 'ch.randelshofer', 'io.acryl.shaded.ch.randelshofer' + relocate 'ch.qos', 'io.acryl.shaded.ch.qos' relocate 'org.springframework', 'io.acryl.shaded.org.springframework' relocate 'com.fasterxml.jackson', 'io.acryl.shaded.jackson' relocate 'org.yaml', 'io.acryl.shaded.org.yaml' // Required for shading snakeyaml relocate 'net.jcip.annotations', 'io.acryl.shaded.annotations' relocate 'javassist', 'io.acryl.shaded.javassist' relocate 'edu.umd.cs.findbugs', 'io.acryl.shaded.findbugs' - relocate 'org.antlr', 'io.acryl.shaded.org.antlr' - relocate 'antlr', 'io.acryl.shaded.antlr' + //relocate 'org.antlr', 'io.acryl.shaded.org.antlr' + //relocate 'antlr', 'io.acryl.shaded.antlr' relocate 'com.google.common', 'io.acryl.shaded.com.google.common' - relocate 'org.apache.commons', 'io.acryl.shaded.org.apache.commons' relocate 'org.reflections', 'io.acryl.shaded.org.reflections' relocate 'st4hidden', 'io.acryl.shaded.st4hidden' relocate 'org.stringtemplate', 'io.acryl.shaded.org.stringtemplate' relocate 'org.abego.treelayout', 'io.acryl.shaded.treelayout' - relocate 'org.slf4j', 'io.acryl.shaded.slf4j' relocate 'javax.annotation', 'io.acryl.shaded.javax.annotation' relocate 'com.github.benmanes.caffeine', 'io.acryl.shaded.com.github.benmanes.caffeine' relocate 'org.checkerframework', 'io.acryl.shaded.org.checkerframework' relocate 'com.google.errorprone', 'io.acryl.shaded.com.google.errorprone' relocate 'com.sun.jna', 'io.acryl.shaded.com.sun.jna' + } checkShadowJar { diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java index ee0938edb50454..b594f6bae954fa 100644 --- a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java @@ -120,7 +120,9 @@ public Optional initializeEmitter(Config sparkConf) { boolean disableSslVerification = sparkConf.hasPath(SparkConfigParser.DISABLE_SSL_VERIFICATION_KEY) && sparkConf.getBoolean(SparkConfigParser.DISABLE_SSL_VERIFICATION_KEY); - + boolean disableChunkedEncoding = + sparkConf.hasPath(SparkConfigParser.REST_DISABLE_CHUNKED_ENCODING) + && sparkConf.getBoolean(SparkConfigParser.REST_DISABLE_CHUNKED_ENCODING); int retry_interval_in_sec = sparkConf.hasPath(SparkConfigParser.RETRY_INTERVAL_IN_SEC) ? 
sparkConf.getInt(SparkConfigParser.RETRY_INTERVAL_IN_SEC) @@ -150,6 +152,7 @@ public Optional initializeEmitter(Config sparkConf) { .disableSslVerification(disableSslVerification) .maxRetries(max_retries) .retryIntervalSec(retry_interval_in_sec) + .disableChunkedEncoding(disableChunkedEncoding) .build(); return Optional.of(new RestDatahubEmitterConfig(restEmitterConf)); case "kafka": @@ -374,7 +377,8 @@ private static void initializeMetrics(OpenLineageConfig openLineageConfig) { String disabledFacets; if (openLineageConfig.getFacetsConfig() != null && openLineageConfig.getFacetsConfig().getDisabledFacets() != null) { - disabledFacets = String.join(";", openLineageConfig.getFacetsConfig().getDisabledFacets()); + disabledFacets = + String.join(";", openLineageConfig.getFacetsConfig().getEffectiveDisabledFacets()); } else { disabledFacets = ""; } diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java index 45ec5365d09b36..3860285083c4bb 100644 --- a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java @@ -30,6 +30,8 @@ public class SparkConfigParser { public static final String GMS_AUTH_TOKEN = "rest.token"; public static final String FILE_EMITTER_FILE_NAME = "file.filename"; public static final String DISABLE_SSL_VERIFICATION_KEY = "rest.disable_ssl_verification"; + public static final String REST_DISABLE_CHUNKED_ENCODING = "rest.disable_chunked_encoding"; + public static final String MAX_RETRIES = "rest.max_retries"; public static final String RETRY_INTERVAL_IN_SEC = "rest.retry_interval_in_sec"; public static final String KAFKA_MCP_TOPIC = "kafka.mcp_topic"; diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/PlanUtils.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/PlanUtils.java index d46d741d155b8b..5f87df2a65d6c2 100644 --- a/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/PlanUtils.java +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/PlanUtils.java @@ -5,14 +5,13 @@ package io.openlineage.spark.agent.util; -import static io.openlineage.spark.agent.lifecycle.ExecutionContext.CAMEL_TO_SNAKE_CASE; - import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import datahub.spark.conf.SparkLineageConf; import io.datahubproject.openlineage.dataset.HdfsPathDataset; import io.openlineage.client.OpenLineage; import io.openlineage.spark.agent.Versions; +import io.openlineage.spark.api.naming.NameNormalizer; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; @@ -21,7 +20,6 @@ import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.Locale; import java.util.Objects; import java.util.Optional; import java.util.UUID; @@ -186,7 +184,7 @@ public static OpenLineage.ParentRunFacet parentRunFacet( .run(new OpenLineage.ParentRunFacetRunBuilder().runId(parentRunId).build()) .job( new OpenLineage.ParentRunFacetJobBuilder() - .name(parentJob.replaceAll(CAMEL_TO_SNAKE_CASE, "_$1").toLowerCase(Locale.ROOT)) + .name(NameNormalizer.normalize(parentJob)) .namespace(parentJobNamespace) .build()) .build(); @@ 
diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/PlanUtils.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/PlanUtils.java
index d46d741d155b8b..5f87df2a65d6c2 100644
--- a/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/PlanUtils.java
+++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/PlanUtils.java
@@ -5,14 +5,13 @@
 package io.openlineage.spark.agent.util;

-import static io.openlineage.spark.agent.lifecycle.ExecutionContext.CAMEL_TO_SNAKE_CASE;
-
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import datahub.spark.conf.SparkLineageConf;
 import io.datahubproject.openlineage.dataset.HdfsPathDataset;
 import io.openlineage.client.OpenLineage;
 import io.openlineage.spark.agent.Versions;
+import io.openlineage.spark.api.naming.NameNormalizer;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -21,7 +20,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.Locale;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.UUID;
@@ -186,7 +184,7 @@ public static OpenLineage.ParentRunFacet parentRunFacet(
         .run(new OpenLineage.ParentRunFacetRunBuilder().runId(parentRunId).build())
         .job(
             new OpenLineage.ParentRunFacetJobBuilder()
-                .name(parentJob.replaceAll(CAMEL_TO_SNAKE_CASE, "_$1").toLowerCase(Locale.ROOT))
+                .name(NameNormalizer.normalize(parentJob))
                 .namespace(parentJobNamespace)
                 .build())
         .build();
@@ -287,8 +285,6 @@ public static boolean safeIsDefinedAt(PartialFunction pfn, Object x) {
    * @param pfn
    * @param x
    * @return
-   * @param
-   * @param
    */
  public static List safeApply(PartialFunction> pfn, D x) {
    try {
diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/RddPathUtils.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/RddPathUtils.java
index 62005bf15f8505..6ef7403362a909 100644
--- a/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/RddPathUtils.java
+++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/RddPathUtils.java
@@ -7,6 +7,7 @@
 import java.util.Arrays;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Stream;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.reflect.FieldUtils;
@@ -18,6 +19,7 @@ import org.apache.spark.rdd.MapPartitionsRDD;
 import org.apache.spark.rdd.ParallelCollectionRDD;
 import org.apache.spark.rdd.RDD;
+import org.apache.spark.sql.execution.datasources.FilePartition;
 import org.apache.spark.sql.execution.datasources.FileScanRDD;
 import scala.Tuple2;
 import scala.collection.immutable.Seq;
@@ -90,7 +92,7 @@ public boolean isDefinedAt(Object rdd) {
     @SuppressWarnings("PMD.AvoidLiteralsInIfCondition")
     public Stream extract(FileScanRDD rdd) {
       return ScalaConversionUtils.fromSeq(rdd.filePartitions()).stream()
-          .flatMap(fp -> Arrays.stream(fp.files()))
+          .flatMap((FilePartition fp) -> Arrays.stream(fp.files()))
          .map(
              f -> {
                if ("3.4".compareTo(package$.MODULE$.SPARK_VERSION()) <= 0) {
@@ -115,11 +117,15 @@ public boolean isDefinedAt(Object rdd) {
     @Override
     public Stream extract(ParallelCollectionRDD rdd) {
+      int SEQ_LIMIT = 1000;
+      AtomicBoolean loggingDone = new AtomicBoolean(false);
       try {
         Object data = FieldUtils.readField(rdd, "data", true);
         log.debug("ParallelCollectionRDD data: {}", data);
-        if (data instanceof Seq) {
-          return ScalaConversionUtils.fromSeq((Seq) data).stream()
+        if ((data instanceof Seq) && ((Seq) data).head() instanceof Tuple2) {
+          // exit if the first element is invalid
+          Seq data_slice = (Seq) ((Seq) data).slice(0, SEQ_LIMIT);
+          return ScalaConversionUtils.fromSeq(data_slice).stream()
              .map(
                  el -> {
                    Path path = null;
@@ -127,9 +133,9 @@ public Stream extract(ParallelCollectionRDD rdd) {
                       // we're able to extract path
                       path = parentOf(((Tuple2) el)._1.toString());
                       log.debug("Found input {}", path);
-                    } else {
-                      // Change to debug to silence error
-                      log.debug("unable to extract Path from {}", el.getClass().getCanonicalName());
+                    } else if (!loggingDone.get()) {
+                      log.warn("unable to extract Path from {}", el.getClass().getCanonicalName());
+                      loggingDone.set(true);
                     }
                     return path;
                   })
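The ParallelCollectionRDD change above does two things: it inspects at most SEQ_LIMIT elements of a potentially very large in-memory Seq, and it reports elements it cannot handle with a single warning instead of one log line per element. A standalone sketch of the same pattern in plain Java, with a List of objects standing in for the Scala Seq and a trivial path check standing in for the Tuple2 handling:

    import java.util.List;
    import java.util.Objects;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.stream.Stream;

    public class BoundedPathExtraction {
      static final int SEQ_LIMIT = 1000; // same idea as Seq.slice(0, SEQ_LIMIT) above

      // Look at no more than SEQ_LIMIT elements and warn only once about unusable ones.
      static Stream<String> extractPaths(List<Object> data) {
        AtomicBoolean loggingDone = new AtomicBoolean(false);
        return data.stream()
            .limit(SEQ_LIMIT)
            .map(
                el -> {
                  if (el instanceof String && ((String) el).startsWith("/")) {
                    return (String) el; // usable path-like element
                  } else if (!loggingDone.getAndSet(true)) {
                    System.err.println("unable to extract path from " + el.getClass().getName());
                  }
                  return null;
                })
            .filter(Objects::nonNull);
      }
    }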
diff --git a/metadata-integration/java/datahub-client/build.gradle b/metadata-integration/java/datahub-client/build.gradle
index d9087347e1b5c6..1bdc848d0385b1 100644
--- a/metadata-integration/java/datahub-client/build.gradle
+++ b/metadata-integration/java/datahub-client/build.gradle
@@ -1,6 +1,6 @@
 plugins {
   id("com.palantir.git-version") apply false
-  id 'java'
+  id 'java-library'
   id 'com.github.johnrengelman.shadow'
   id 'jacoco'
   id 'signing'
@@ -12,11 +12,13 @@ apply from: "../versioning.gradle"

 import org.apache.tools.ant.filters.ReplaceTokens

-jar.enabled = false // Since we only want to build shadow jars, disabling the regular jar creation
+jar {
+  archiveClassifier = "lib"
+}

 dependencies {
-  implementation project(':entity-registry')
-  implementation project(':metadata-integration:java:datahub-event')
+  api project(':entity-registry')
+  api project(':metadata-integration:java:datahub-event')
   implementation(externalDependency.kafkaAvroSerializer) {
     exclude group: "org.apache.avro"
   }
@@ -33,7 +35,7 @@ dependencies {
   implementation externalDependency.jacksonDataBind
   runtimeOnly externalDependency.jna

-  implementation externalDependency.slf4jApi
+  api externalDependency.slf4jApi
   compileOnly externalDependency.lombok
   annotationProcessor externalDependency.lombok
   // VisibleForTesting
@@ -78,6 +80,11 @@ shadowJar {
   // https://github.com/johnrengelman/shadow/issues/729
   exclude('module-info.class', 'META-INF/versions/**', '**/LICENSE', '**/LICENSE*.txt', '**/NOTICE', '**/NOTICE.txt', 'licenses/**', 'log4j2.*', 'log4j.*')
+  dependencies {
+    exclude(dependency("org.slf4j::"))
+    exclude(dependency("antlr::"))
+    exclude("org/apache/commons/logging/**")
+  }
   mergeServiceFiles()
   // we relocate namespaces manually, because we want to know exactly which libs we are exposing and why
   // we can move to automatic relocation using ConfigureShadowRelocation after we get to a good place on these first
@@ -88,15 +95,20 @@ shadowJar {
   relocate 'javassist', 'datahub.shaded.javassist'
   relocate 'edu.umd.cs.findbugs', 'datahub.shaded.findbugs'
   relocate 'org.antlr', 'datahub.shaded.org.antlr'
-  relocate 'antlr', 'datahub.shaded.antlr'
+  //relocate 'antlr', 'datahub.shaded.antlr'
   relocate 'com.google.common', 'datahub.shaded.com.google.common'
-  relocate 'org.apache.commons', 'datahub.shaded.org.apache.commons'
+  relocate 'org.apache.commons.codec', 'datahub.shaded.org.apache.commons.codec'
+  relocate 'org.apache.commons.compress', 'datahub.shaded.org.apache.commons.compress'
+  relocate 'org.apache.commons.lang3', 'datahub.shaded.org.apache.commons.lang3'
+  relocate 'org.apache.commons.lang', 'datahub.shaded.org.apache.commons.lang'
+  relocate 'org.apache.commons.cli', 'datahub.shaded.org.apache.commons.cli'
+  relocate 'org.apache.commons.text', 'datahub.shaded.org.apache.commons.text'
+  relocate 'org.apache.commons.io', 'datahub.shaded.org.apache.commons.io'
   relocate 'org.apache.maven', 'datahub.shaded.org.apache.maven'
   relocate 'org.reflections', 'datahub.shaded.org.reflections'
   relocate 'st4hidden', 'datahub.shaded.st4hidden'
   relocate 'org.stringtemplate', 'datahub.shaded.org.stringtemplate'
   relocate 'org.abego.treelayout', 'datahub.shaded.treelayout'
-  relocate 'org.slf4j', 'datahub.shaded.slf4j'
   relocate 'javax.annotation', 'datahub.shaded.javax.annotation'
   relocate 'com.github.benmanes.caffeine', 'datahub.shaded.com.github.benmanes.caffeine'
   relocate 'org.checkerframework', 'datahub.shaded.org.checkerframework'
diff --git a/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/DatahubHttpRequestRetryStrategy.java b/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/DatahubHttpRequestRetryStrategy.java
index 71a4b93baf48f4..50c0277c98b03b 100644
--- a/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/DatahubHttpRequestRetryStrategy.java
+++ b/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/DatahubHttpRequestRetryStrategy.java
@@ -48,7 +48,6 @@ public boolean retryRequest(

   @Override
   public boolean retryRequest(HttpResponse response, int execCount, HttpContext context) {
-    log.warn("Retrying request due to error: {}", response);
     return super.retryRequest(response, execCount, context);
   }
 }
diff --git a/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitter.java b/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitter.java
index e1017372be124b..d70c5baf10879d 100644
--- a/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitter.java
+++ b/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitter.java
@@ -1,6 +1,7 @@
 package datahub.client.rest;

 import static com.linkedin.metadata.Constants.*;
+import static org.apache.hc.core5.http.HttpHeaders.*;

 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.core.StreamReadConstraints;
@@ -18,6 +19,7 @@
 import datahub.event.UpsertAspectRequest;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.security.KeyManagementException;
 import java.security.KeyStoreException;
 import java.security.NoSuchAlgorithmException;
@@ -26,6 +28,7 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import javax.annotation.concurrent.ThreadSafe;
@@ -97,17 +100,20 @@ public RestEmitter(RestEmitterConfig config) {
     this.config = config;
     HttpAsyncClientBuilder httpClientBuilder = this.config.getAsyncHttpClientBuilder();
     httpClientBuilder.setRetryStrategy(new DatahubHttpRequestRetryStrategy());
-
-    // Override httpClient settings with RestEmitter configs if present
-    if (config.getTimeoutSec() != null) {
-      httpClientBuilder.setDefaultRequestConfig(
-          RequestConfig.custom()
-              .setConnectionRequestTimeout(
-                  config.getTimeoutSec() * 1000, java.util.concurrent.TimeUnit.MILLISECONDS)
-              .setResponseTimeout(
-                  config.getTimeoutSec() * 1000, java.util.concurrent.TimeUnit.MILLISECONDS)
-              .build());
+    if ((config.getTimeoutSec() != null) || (config.isDisableChunkedEncoding())) {
+      RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
+      // Override httpClient settings with RestEmitter configs if present
+      if (config.getTimeoutSec() != null) {
+        requestConfigBuilder
+            .setConnectionRequestTimeout(config.getTimeoutSec() * 1000, TimeUnit.MILLISECONDS)
+            .setResponseTimeout(config.getTimeoutSec() * 1000, TimeUnit.MILLISECONDS);
+      }
+      if (config.isDisableChunkedEncoding()) {
+        requestConfigBuilder.setContentCompressionEnabled(false);
+      }
+      httpClientBuilder.setDefaultRequestConfig(requestConfigBuilder.build());
     }
+
     PoolingAsyncClientConnectionManagerBuilder poolingAsyncClientConnectionManagerBuilder =
         PoolingAsyncClientConnectionManagerBuilder.create();
@@ -223,8 +229,13 @@ private Future postGeneric(
     if (this.config.getToken() != null) {
       simpleRequestBuilder.setHeader("Authorization", "Bearer " + this.config.getToken());
     }
+    if (this.config.isDisableChunkedEncoding()) {
+      byte[] payloadBytes = payloadJson.getBytes(StandardCharsets.UTF_8);
+      simpleRequestBuilder.setBody(payloadBytes, ContentType.APPLICATION_JSON);
+    } else {
+      simpleRequestBuilder.setBody(payloadJson, ContentType.APPLICATION_JSON);
+    }
-    simpleRequestBuilder.setBody(payloadJson, ContentType.APPLICATION_JSON);

     AtomicReference responseAtomicReference = new AtomicReference<>();
     CountDownLatch responseLatch = new CountDownLatch(1);
     FutureCallback httpCallback =
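The behavior above is driven by the flag that the RestEmitterConfig change which follows exposes through its Lombok-generated builder, so client code can opt out of chunked transfer when, for example, a proxy in front of DataHub rejects requests without a Content-Length header. A short usage sketch with the existing RestEmitter.create(...) factory; the endpoint and token are placeholders:

    import datahub.client.rest.RestEmitter;

    public class EmitterExample {
      public static void main(String[] args) throws Exception {
        RestEmitter emitter =
            RestEmitter.create(
                b ->
                    b.server("http://localhost:8080") // placeholder GMS endpoint
                        .token(System.getenv("DATAHUB_TOKEN")) // placeholder token
                        .timeoutSec(30)
                        // With chunked encoding disabled, postGeneric() sends the JSON payload
                        // as a fixed-length byte[] instead of a chunked character stream.
                        .disableChunkedEncoding(true));

        System.out.println("GMS reachable: " + emitter.testConnection());
        emitter.close();
      }
    }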
diff --git a/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitterConfig.java b/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitterConfig.java
index e28ad4ed660f0b..55c11aab0ebf3c 100644
--- a/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitterConfig.java
+++ b/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitterConfig.java
@@ -30,6 +30,8 @@ public class RestEmitterConfig {
   Integer timeoutSec;
   @Builder.Default boolean disableSslVerification = false;

+  @Builder.Default boolean disableChunkedEncoding = false;
+
   @Builder.Default int maxRetries = 0;
   @Builder.Default int retryIntervalSec = 10;
diff --git a/metadata-integration/java/openlineage-converter/build.gradle b/metadata-integration/java/openlineage-converter/build.gradle
index 2e04881ab5ccda..d149104f089b36 100644
--- a/metadata-integration/java/openlineage-converter/build.gradle
+++ b/metadata-integration/java/openlineage-converter/build.gradle
@@ -1,4 +1,4 @@
-apply plugin: 'java'
+apply plugin: 'java-library'
 apply plugin: 'com.github.johnrengelman.shadow'
 apply plugin: 'signing'
 apply plugin: 'maven-publish'

From 8638bf974a00cb18c837616ed69b794b90de720f Mon Sep 17 00:00:00 2001
From: david-leifker <114954101+david-leifker@users.noreply.github.com>
Date: Tue, 19 Nov 2024 17:51:58 -0600
Subject: [PATCH 09/10] chore(cleanup): remove unused UrnUtils function (#11897)

---
 .../com/linkedin/common/urn/UrnUtils.java     | 22 -------------------
 1 file changed, 22 deletions(-)

diff --git a/li-utils/src/main/javaPegasus/com/linkedin/common/urn/UrnUtils.java b/li-utils/src/main/javaPegasus/com/linkedin/common/urn/UrnUtils.java
index 89f0cd8fbc9791..0a2400badfc627 100644
--- a/li-utils/src/main/javaPegasus/com/linkedin/common/urn/UrnUtils.java
+++ b/li-utils/src/main/javaPegasus/com/linkedin/common/urn/UrnUtils.java
@@ -27,28 +27,6 @@ public static DatasetUrn toDatasetUrn(
         new DataPlatformUrn(platformName), datasetName, FabricType.valueOf(origin.toUpperCase()));
   }

-  /**
-   * Convert fabric String to FabricType
-   *
-   * @param fabric PROD, CORP, EI, DEV, LIT, PRIME
-   * @return FabricType
-   */
-  @Nonnull
-  public static FabricType toFabricType(@Nonnull String fabric) {
-    switch (fabric.toUpperCase()) {
-      case "PROD":
-        return FabricType.PROD;
-      case "CORP":
-        return FabricType.CORP;
-      case "EI":
-        return FabricType.EI;
-      case "DEV":
-        return FabricType.DEV;
-      default:
-        throw new IllegalArgumentException("Unsupported Fabric Type: " + fabric);
-    }
-  }
-
   public static Urn getUrn(String urnStr) {
     try {
       return Urn.createFromString(urnStr);
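The deleted helper hand-mapped only four of the fabric names; the surviving toDatasetUrn path above already uses FabricType.valueOf directly. A one-line sketch of the equivalent call for any code that still needs the conversion (the com.linkedin.common package name is assumed from the generated Pegasus models):

    import com.linkedin.common.FabricType;

    public class FabricTypeExample {
      public static void main(String[] args) {
        // Covers every fabric constant and throws IllegalArgumentException for unknown
        // names, much like the removed switch did in its default branch.
        FabricType fabric = FabricType.valueOf("prod".toUpperCase());
        System.out.println(fabric); // PROD
      }
    }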
From 524ef8c6d0a07961576a2e69b8c3d7e4313550a7 Mon Sep 17 00:00:00 2001
From: Harshal Sheth
Date: Wed, 20 Nov 2024 00:29:05 -0800
Subject: [PATCH 10/10] perf(ingest/redshift): limit copy lineage (#11662)

Co-authored-by: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com>
---
 .../ingestion/source/redshift/query.py        | 47 ++++++++++++-------
 1 file changed, 29 insertions(+), 18 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py
index b18b526ef30fce..71a20890d35e88 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py
@@ -9,6 +9,8 @@
 # We use 290 instead instead of the standard 320, because escape characters can add to the length.
 _QUERY_SEQUENCE_LIMIT = 290

+_MAX_COPY_ENTRIES_PER_TABLE = 20
+

 class RedshiftCommonQuery:
     CREATE_TEMP_TABLE_CLAUSE = "create temp table"
@@ -293,28 +295,37 @@ def alter_table_rename_query(
     def list_copy_commands_sql(
         db_name: str, start_time: datetime, end_time: datetime
     ) -> str:
-        return """
-            select
-                distinct
-                    "schema" as target_schema,
-                    "table" as target_table,
-                    c.file_name as filename
-            from
-                SYS_QUERY_DETAIL as si
-            join SYS_LOAD_DETAIL as c on
-                si.query_id = c.query_id
-            join SVV_TABLE_INFO sti on
-                sti.table_id = si.table_id
-            where
-                database = '{db_name}'
-                and si.start_time >= '{start_time}'
-                and si.start_time < '{end_time}'
-            order by target_schema, target_table, si.start_time asc
-        """.format(
+        return """\
+SELECT DISTINCT
+    target_schema,
+    target_table,
+    filename
+FROM (
+    SELECT
+        sti."schema" AS target_schema,
+        sti."table" AS target_table,
+        c.file_name AS filename,
+        ROW_NUMBER() OVER (
+            PARTITION BY sti."schema", sti."table"
+            ORDER BY si.start_time DESC
+        ) AS rn
+    FROM
+        SYS_QUERY_DETAIL AS si
+        JOIN SYS_LOAD_DETAIL AS c ON si.query_id = c.query_id
+        JOIN SVV_TABLE_INFO sti ON sti.table_id = si.table_id
+    WHERE
+        sti.database = '{db_name}'
+        AND si.start_time >= '{start_time}'
+        AND si.start_time < '{end_time}'
+) subquery
+WHERE rn <= {_MAX_COPY_ENTRIES_PER_TABLE}
+ORDER BY target_schema, target_table, filename
+""".format(
             # We need the original database name for filtering
            db_name=db_name,
            start_time=start_time.strftime(redshift_datetime_format),
            end_time=end_time.strftime(redshift_datetime_format),
+            _MAX_COPY_ENTRIES_PER_TABLE=_MAX_COPY_ENTRIES_PER_TABLE,
         )

     @staticmethod
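The rewritten SQL keeps only the newest _MAX_COPY_ENTRIES_PER_TABLE COPY commands per (schema, table) pair by ranking rows with ROW_NUMBER() in a subquery and filtering on that rank. For readers less used to window functions, the same per-group cap expressed over hypothetical in-memory records looks like this (a sketch, not code from the ingestion source):

    import java.time.Instant;
    import java.util.Comparator;
    import java.util.List;
    import java.util.stream.Collectors;

    public class CopyLineageLimiter {
      // Hypothetical stand-in for one row of the SYS_QUERY_DETAIL / SYS_LOAD_DETAIL join.
      record CopyEntry(String schema, String table, String filename, Instant startTime) {}

      // Keep the newest maxPerTable entries per (schema, table), mirroring
      // ROW_NUMBER() OVER (PARTITION BY schema, table ORDER BY start_time DESC) <= maxPerTable.
      static List<CopyEntry> limitPerTable(List<CopyEntry> entries, int maxPerTable) {
        return entries.stream()
            .collect(Collectors.groupingBy(e -> e.schema() + "." + e.table()))
            .values()
            .stream()
            .flatMap(
                group ->
                    group.stream()
                        .sorted(Comparator.comparing(CopyEntry::startTime).reversed())
                        .limit(maxPerTable))
            .collect(Collectors.toList());
      }
    }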