From c567247364e0282badd3ecab82f0a0528a49aad2 Mon Sep 17 00:00:00 2001 From: Jialin Liu Date: Thu, 16 Jan 2025 11:58:43 -0800 Subject: [PATCH] Adjust --- .../davinci/config/VeniceServerConfig.java | 15 +++ .../AdaptiveThrottlerSignalService.java | 15 +-- .../kafka/consumer/IngestionThrottler.java | 95 +++++++------------ .../VeniceAdaptiveIngestionThrottler.java | 37 +++++++- .../AdaptiveThrottlerSingalServiceTest.java | 9 +- .../VeniceAdaptiveIngestionThrottlerTest.java | 14 ++- .../java/com/linkedin/venice/ConfigKeys.java | 4 + .../linkedin/venice/server/VeniceServer.java | 2 +- 8 files changed, 116 insertions(+), 75 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java index 7a44e92c22..769b58c19a 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java @@ -57,6 +57,8 @@ import static com.linkedin.venice.ConfigKeys.SERVER_AA_WC_WORKLOAD_PARALLEL_PROCESSING_ENABLED; import static com.linkedin.venice.ConfigKeys.SERVER_AA_WC_WORKLOAD_PARALLEL_PROCESSING_THREAD_POOL_SIZE; import static com.linkedin.venice.ConfigKeys.SERVER_ADAPTIVE_THROTTLER_ENABLED; +import static com.linkedin.venice.ConfigKeys.SERVER_ADAPTIVE_THROTTLER_SIGNAL_IDLE_THRESHOLD; +import static com.linkedin.venice.ConfigKeys.SERVER_ADAPTIVE_THROTTLER_SINGLE_GET_LATENCY_THRESHOLD; import static com.linkedin.venice.ConfigKeys.SERVER_BATCH_REPORT_END_OF_INCREMENTAL_PUSH_STATUS_ENABLED; import static com.linkedin.venice.ConfigKeys.SERVER_BLOCKING_QUEUE_TYPE; import static com.linkedin.venice.ConfigKeys.SERVER_CHANNEL_OPTION_WRITE_BUFFER_WATERMARK_HIGH_BYTES; @@ -499,6 +501,8 @@ public class VeniceServerConfig extends VeniceClusterConfig { private final boolean resetErrorReplicaEnabled; private final boolean adaptiveThrottlerEnabled; + private final int adaptiveThrottlerSignalIdleThreshold; + private final double adaptiveThrottlerSingleGetLatencyThreshold; private final int fastAvroFieldLimitPerMethod; @@ -664,6 +668,9 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map throttlerList = new ArrayList<>(); @@ -33,8 +35,10 @@ public class AdaptiveThrottlerSignalService extends AbstractVeniceService { private boolean nonCurrentFollowerMaxHeartbeatLagSignal = false; public AdaptiveThrottlerSignalService( + VeniceServerConfig veniceServerConfig, MetricsRepository metricsRepository, HeartbeatMonitoringService heartbeatMonitoringService) { + this.singleGetLatencyP99Threshold = veniceServerConfig.getAdaptiveThrottlerSingleGetLatencyThreshold(); this.metricsRepository = metricsRepository; this.heartbeatMonitoringService = heartbeatMonitoringService; } @@ -52,11 +56,10 @@ public void refreshSignalAndThrottler() { } void updateReadLatencySignal() { - Metric hostSingleGetLatencyP95Metric = metricsRepository.getMetric("total--success_request_latency.95thPercentile"); - if (hostSingleGetLatencyP95Metric != null) { - double hostSingleGetLatencyP95 = - metricsRepository.getMetric("total--success_request_latency.95thPercentile").value(); - singleGetLatencySignal = hostSingleGetLatencyP95 > READ_LATENCY_P95_LIMIT; + Metric hostSingleGetLatencyP99Metric = metricsRepository.getMetric(SINGLE_GET_LATENCY_P99_METRIC_NAME); + if (hostSingleGetLatencyP99Metric != null) { + double hostSingleGetLatencyP99 = metricsRepository.getMetric(SINGLE_GET_LATENCY_P99_METRIC_NAME).value(); + singleGetLatencySignal = hostSingleGetLatencyP99 > singleGetLatencyP99Threshold; } LOGGER.info("Update read latency signal. singleGetLatency: {}", singleGetLatencySignal); } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/IngestionThrottler.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/IngestionThrottler.java index 87893b017e..223c700477 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/IngestionThrottler.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/IngestionThrottler.java @@ -63,12 +63,26 @@ public IngestionThrottler( TimeUnit checkTimeUnit, AdaptiveThrottlerSignalService adaptiveThrottlerSignalService) { this.adaptiveThrottlerSignalService = adaptiveThrottlerSignalService; - EventThrottler regularRecordThrottler = new EventThrottler( - serverConfig.getKafkaFetchQuotaRecordPerSecond(), - serverConfig.getKafkaFetchQuotaTimeWindow(), - "kafka_consumption_records_count", - false, - EventThrottler.BLOCK_STRATEGY); + EventThrottler regularRecordThrottler; + if (serverConfig.isAdaptiveThrottlerEnabled()) { + VeniceAdaptiveIngestionThrottler adaptiveIngestionThrottler = new VeniceAdaptiveIngestionThrottler( + serverConfig.getAdaptiveThrottlerSignalIdleThreshold(), + serverConfig.getKafkaFetchQuotaRecordPerSecond(), + serverConfig.getKafkaFetchQuotaTimeWindow(), + "kafka_consumption_records_count"); + adaptiveIngestionThrottler.registerLimiterSignal(adaptiveThrottlerSignalService::isSingleGetLatencySignalActive); + adaptiveIngestionThrottler + .registerLimiterSignal(adaptiveThrottlerSignalService::isCurrentLeaderMaxHeartbeatLagSignalActive); + adaptiveThrottlerSignalService.registerThrottler(adaptiveIngestionThrottler); + regularRecordThrottler = adaptiveIngestionThrottler; + } else { + regularRecordThrottler = new EventThrottler( + serverConfig.getKafkaFetchQuotaRecordPerSecond(), + serverConfig.getKafkaFetchQuotaTimeWindow(), + "kafka_consumption_records_count", + false, + EventThrottler.BLOCK_STRATEGY); + } EventThrottler regularBandwidthThrottler = new EventThrottler( serverConfig.getKafkaFetchQuotaBytesPerSecond(), serverConfig.getKafkaFetchQuotaTimeWindow(), @@ -108,59 +122,22 @@ public IngestionThrottler( "current_version_non_aa_wc_leader_records_count", false, EventThrottler.BLOCK_STRATEGY)); - if (serverConfig.isAdaptiveThrottlerEnabled()) { - VeniceAdaptiveIngestionThrottler nonCurrentVersionAaWcLeaderAdaptiveIngestionThrottler = - new VeniceAdaptiveIngestionThrottler( - serverConfig.getNonCurrentVersionAAWCLeaderQuotaRecordsPerSecond(), - serverConfig.getKafkaFetchQuotaTimeWindow(), - "non_current_version_aa_wc_leader_records_count"); - nonCurrentVersionAaWcLeaderAdaptiveIngestionThrottler - .registerLimiterSignal(adaptiveThrottlerSignalService::isSingleGetLatencySignalActive); - nonCurrentVersionAaWcLeaderAdaptiveIngestionThrottler - .registerLimiterSignal(adaptiveThrottlerSignalService::isCurrentLeaderMaxHeartbeatLagSignalActive); - nonCurrentVersionAaWcLeaderAdaptiveIngestionThrottler - .registerLimiterSignal(adaptiveThrottlerSignalService::isCurrentFollowerMaxHeartbeatLagSignalActive); - nonCurrentVersionAaWcLeaderAdaptiveIngestionThrottler - .registerBoosterSignal(adaptiveThrottlerSignalService::isNonCurrentLeaderMaxHeartbeatLagSignalActive); - adaptiveThrottlerSignalService.registerThrottler(nonCurrentVersionAaWcLeaderAdaptiveIngestionThrottler); - this.poolTypeRecordThrottlerMap.put( - ConsumerPoolType.NON_CURRENT_VERSION_AA_WC_LEADER_POOL, - nonCurrentVersionAaWcLeaderAdaptiveIngestionThrottler); - VeniceAdaptiveIngestionThrottler nonCurrentVersionNonAaWcLeaderAdaptiveIngestionThrottler = - new VeniceAdaptiveIngestionThrottler( - serverConfig.getNonCurrentVersionNonAAWCLeaderQuotaRecordsPerSecond(), - serverConfig.getKafkaFetchQuotaTimeWindow(), - "non_current_version_non_aa_wc_leader_records_count"); - nonCurrentVersionNonAaWcLeaderAdaptiveIngestionThrottler - .registerLimiterSignal(adaptiveThrottlerSignalService::isSingleGetLatencySignalActive); - nonCurrentVersionNonAaWcLeaderAdaptiveIngestionThrottler - .registerLimiterSignal(adaptiveThrottlerSignalService::isCurrentLeaderMaxHeartbeatLagSignalActive); - nonCurrentVersionNonAaWcLeaderAdaptiveIngestionThrottler - .registerLimiterSignal(adaptiveThrottlerSignalService::isCurrentFollowerMaxHeartbeatLagSignalActive); - nonCurrentVersionNonAaWcLeaderAdaptiveIngestionThrottler - .registerBoosterSignal(adaptiveThrottlerSignalService::isNonCurrentFollowerMaxHeartbeatLagSignalActive); - adaptiveThrottlerSignalService.registerThrottler(nonCurrentVersionNonAaWcLeaderAdaptiveIngestionThrottler); - this.poolTypeRecordThrottlerMap.put( - ConsumerPoolType.NON_CURRENT_VERSION_NON_AA_WC_LEADER_POOL, - nonCurrentVersionNonAaWcLeaderAdaptiveIngestionThrottler); - } else { - this.poolTypeRecordThrottlerMap.put( - ConsumerPoolType.NON_CURRENT_VERSION_AA_WC_LEADER_POOL, - new EventThrottler( - serverConfig.getNonCurrentVersionAAWCLeaderQuotaRecordsPerSecond(), - serverConfig.getKafkaFetchQuotaTimeWindow(), - "non_current_version_aa_wc_leader_records_count", - false, - EventThrottler.BLOCK_STRATEGY)); - this.poolTypeRecordThrottlerMap.put( - ConsumerPoolType.NON_CURRENT_VERSION_NON_AA_WC_LEADER_POOL, - new EventThrottler( - serverConfig.getNonCurrentVersionNonAAWCLeaderQuotaRecordsPerSecond(), - serverConfig.getKafkaFetchQuotaTimeWindow(), - "non_current_version_non_aa_wc_leader_records_count", - false, - EventThrottler.BLOCK_STRATEGY)); - } + this.poolTypeRecordThrottlerMap.put( + ConsumerPoolType.NON_CURRENT_VERSION_AA_WC_LEADER_POOL, + new EventThrottler( + serverConfig.getNonCurrentVersionAAWCLeaderQuotaRecordsPerSecond(), + serverConfig.getKafkaFetchQuotaTimeWindow(), + "non_current_version_aa_wc_leader_records_count", + false, + EventThrottler.BLOCK_STRATEGY)); + this.poolTypeRecordThrottlerMap.put( + ConsumerPoolType.NON_CURRENT_VERSION_NON_AA_WC_LEADER_POOL, + new EventThrottler( + serverConfig.getNonCurrentVersionNonAAWCLeaderQuotaRecordsPerSecond(), + serverConfig.getKafkaFetchQuotaTimeWindow(), + "non_current_version_non_aa_wc_leader_records_count", + false, + EventThrottler.BLOCK_STRATEGY)); if (isDaVinciClient && serverConfig.isDaVinciCurrentVersionBootstrappingSpeedupEnabled()) { EventThrottler speedupRecordThrottler = new EventThrottler( diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/VeniceAdaptiveIngestionThrottler.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/VeniceAdaptiveIngestionThrottler.java index 17a05a2bbd..8df0a7aa52 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/VeniceAdaptiveIngestionThrottler.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/VeniceAdaptiveIngestionThrottler.java @@ -5,21 +5,30 @@ import java.util.ArrayList; import java.util.List; import java.util.function.BooleanSupplier; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public class VeniceAdaptiveIngestionThrottler extends EventThrottler { + private static final Logger LOGGER = LogManager.getLogger(VeniceAdaptiveIngestionThrottler.class); private final List limiterSuppliers = new ArrayList<>(); private final List boosterSuppliers = new ArrayList<>(); private final static int MAX_THROTTLERS = 7; private final List eventThrottlers = new ArrayList<>(MAX_THROTTLERS); + private final int signalIdleThreshold; + private int signalIdleCount = 0; + private int currentThrottlerIndex = MAX_THROTTLERS / 2; - private int currentThrottlerIndex = 3; - - public VeniceAdaptiveIngestionThrottler(int quotaPerSecond, long timeWindow, String throttlerName) { + public VeniceAdaptiveIngestionThrottler( + int signalIdleThreshold, + long quotaPerSecond, + long timeWindow, + String throttlerName) { + this.signalIdleThreshold = signalIdleThreshold; double factor = 0.4; DecimalFormat decimalFormat = new DecimalFormat("0.0"); - for (int i = 0; i < 7; i++) { + for (int i = 0; i < MAX_THROTTLERS; i++) { EventThrottler eventThrottler = new EventThrottler( (long) (quotaPerSecond * factor), timeWindow, @@ -45,13 +54,17 @@ public void registerBoosterSignal(BooleanSupplier supplier) { } public void checkSignalAndAdjustThrottler() { + boolean isSignalIdle = true; boolean hasLimitedRate = false; for (BooleanSupplier supplier: limiterSuppliers) { if (supplier.getAsBoolean()) { hasLimitedRate = true; + isSignalIdle = false; + signalIdleCount = 0; if (currentThrottlerIndex > 0) { currentThrottlerIndex--; } + LOGGER.info("Found active limiter signal, adjusting throttler index to {}", currentThrottlerIndex); } } // If any limiter signal is true do not booster the throttler @@ -61,15 +74,29 @@ public void checkSignalAndAdjustThrottler() { for (BooleanSupplier supplier: boosterSuppliers) { if (supplier.getAsBoolean()) { + isSignalIdle = false; + signalIdleCount = 0; + if (currentThrottlerIndex < MAX_THROTTLERS - 1) { + currentThrottlerIndex++; + } + LOGGER.info("Found active booster signal, adjusting throttler index to {}", currentThrottlerIndex); + } + } + if (isSignalIdle) { + signalIdleCount += 1; + LOGGER.info("No active signal found, increasing idle count to {}/{}", signalIdleCount, signalIdleThreshold); + if (signalIdleCount == signalIdleThreshold) { if (currentThrottlerIndex < MAX_THROTTLERS - 1) { currentThrottlerIndex++; } + LOGGER.info("Reach max signal idle count, adjusting throttler index to {}", currentThrottlerIndex); + signalIdleCount = 0; } } } // TEST - public int getCurrentThrottlerIndex() { + int getCurrentThrottlerIndex() { return currentThrottlerIndex; } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/AdaptiveThrottlerSingalServiceTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/AdaptiveThrottlerSingalServiceTest.java index b131ebcd9b..1e78726943 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/AdaptiveThrottlerSingalServiceTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/AdaptiveThrottlerSingalServiceTest.java @@ -4,6 +4,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.when; +import com.linkedin.davinci.config.VeniceServerConfig; import com.linkedin.davinci.stats.ingestion.heartbeat.AggregatedHeartbeatLagEntry; import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService; import io.tehuti.Metric; @@ -19,8 +20,10 @@ public class AdaptiveThrottlerSingalServiceTest { public void testUpdateSignal() { MetricsRepository metricsRepository = mock(MetricsRepository.class); HeartbeatMonitoringService heartbeatMonitoringService = mock(HeartbeatMonitoringService.class); + VeniceServerConfig veniceServerConfig = mock(VeniceServerConfig.class); + when(veniceServerConfig.getAdaptiveThrottlerSingleGetLatencyThreshold()).thenReturn(10d); AdaptiveThrottlerSignalService adaptiveThrottlerSignalService = - new AdaptiveThrottlerSignalService(metricsRepository, heartbeatMonitoringService); + new AdaptiveThrottlerSignalService(veniceServerConfig, metricsRepository, heartbeatMonitoringService); // Single Get Signal Assert.assertFalse(adaptiveThrottlerSignalService.isSingleGetLatencySignalActive()); @@ -55,8 +58,10 @@ public void testUpdateSignal() { public void testRegisterThrottler() { MetricsRepository metricsRepository = mock(MetricsRepository.class); HeartbeatMonitoringService heartbeatMonitoringService = mock(HeartbeatMonitoringService.class); + VeniceServerConfig veniceServerConfig = mock(VeniceServerConfig.class); + when(veniceServerConfig.getAdaptiveThrottlerSingleGetLatencyThreshold()).thenReturn(10d); AdaptiveThrottlerSignalService adaptiveThrottlerSignalService = - new AdaptiveThrottlerSignalService(metricsRepository, heartbeatMonitoringService); + new AdaptiveThrottlerSignalService(veniceServerConfig, metricsRepository, heartbeatMonitoringService); VeniceAdaptiveIngestionThrottler adaptiveIngestionThrottler = mock(VeniceAdaptiveIngestionThrottler.class); adaptiveThrottlerSignalService.registerThrottler(adaptiveIngestionThrottler); Assert.assertEquals(adaptiveThrottlerSignalService.getThrottlerList().size(), 1); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/VeniceAdaptiveIngestionThrottlerTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/VeniceAdaptiveIngestionThrottlerTest.java index b485b009a6..c9bc36cf03 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/VeniceAdaptiveIngestionThrottlerTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/VeniceAdaptiveIngestionThrottlerTest.java @@ -7,7 +7,8 @@ public class VeniceAdaptiveIngestionThrottlerTest { @Test public void testAdaptiveIngestionThrottler() { - VeniceAdaptiveIngestionThrottler adaptiveIngestionThrottler = new VeniceAdaptiveIngestionThrottler(100, 10, "test"); + VeniceAdaptiveIngestionThrottler adaptiveIngestionThrottler = + new VeniceAdaptiveIngestionThrottler(10, 100, 10, "test"); adaptiveIngestionThrottler.registerLimiterSignal(() -> true); adaptiveIngestionThrottler.checkSignalAndAdjustThrottler(); Assert.assertEquals(adaptiveIngestionThrottler.getCurrentThrottlerIndex(), 2); @@ -17,9 +18,18 @@ public void testAdaptiveIngestionThrottler() { Assert.assertEquals(adaptiveIngestionThrottler.getCurrentThrottlerIndex(), 0); adaptiveIngestionThrottler.checkSignalAndAdjustThrottler(); Assert.assertEquals(adaptiveIngestionThrottler.getCurrentThrottlerIndex(), 0); - adaptiveIngestionThrottler = new VeniceAdaptiveIngestionThrottler(100, 10, "test"); + adaptiveIngestionThrottler = new VeniceAdaptiveIngestionThrottler(10, 100, 10, "test"); adaptiveIngestionThrottler.registerBoosterSignal(() -> true); adaptiveIngestionThrottler.checkSignalAndAdjustThrottler(); Assert.assertEquals(adaptiveIngestionThrottler.getCurrentThrottlerIndex(), 4); + + adaptiveIngestionThrottler = new VeniceAdaptiveIngestionThrottler(3, 100, 10, "test"); + + adaptiveIngestionThrottler.checkSignalAndAdjustThrottler(); + Assert.assertEquals(adaptiveIngestionThrottler.getCurrentThrottlerIndex(), 3); + adaptiveIngestionThrottler.checkSignalAndAdjustThrottler(); + Assert.assertEquals(adaptiveIngestionThrottler.getCurrentThrottlerIndex(), 3); + adaptiveIngestionThrottler.checkSignalAndAdjustThrottler(); + Assert.assertEquals(adaptiveIngestionThrottler.getCurrentThrottlerIndex(), 4); } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java index 06da465ac0..eab209a0e7 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java @@ -782,6 +782,10 @@ private ConfigKeys() { public static final String SERVER_RESET_ERROR_REPLICA_ENABLED = "server.reset.error.replica.enabled"; public static final String SERVER_ADAPTIVE_THROTTLER_ENABLED = "server.adaptive.throttler.enabled"; + public static final String SERVER_ADAPTIVE_THROTTLER_SIGNAL_IDLE_THRESHOLD = + "server.adaptive.throttler.signal.idle.threshold"; + public static final String SERVER_ADAPTIVE_THROTTLER_SINGLE_GET_LATENCY_THRESHOLD = + "server.adaptive.throttler.single.get.latency.threshold"; /** * A list of fully-qualified class names of all stats classes that needs to be initialized in isolated ingestion process, diff --git a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java index 8a1408c098..4d30c318df 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java @@ -379,7 +379,7 @@ private List createServices() { this.adaptiveThrottlerSignalService = null; if (serverConfig.isAdaptiveThrottlerEnabled()) { adaptiveThrottlerSignalService = - new AdaptiveThrottlerSignalService(metricsRepository, heartbeatMonitoringService); + new AdaptiveThrottlerSignalService(serverConfig, metricsRepository, heartbeatMonitoringService); services.add(adaptiveThrottlerSignalService); } // create and add KafkaSimpleConsumerService