From 121e0b0b5febcbd998009ed67184c4466fee26df Mon Sep 17 00:00:00 2001 From: Jialin Liu Date: Thu, 16 Jan 2025 16:36:45 -0800 Subject: [PATCH] [server] Add global bandwidth throttler into adaptive throttler --- .../kafka/consumer/IngestionThrottler.java | 29 ++++++++++++++----- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/IngestionThrottler.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/IngestionThrottler.java index c648d0eeff..2774b984b9 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/IngestionThrottler.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/IngestionThrottler.java @@ -63,6 +63,9 @@ public IngestionThrottler( AdaptiveThrottlerSignalService adaptiveThrottlerSignalService) { VeniceAdaptiveIngestionThrottler globalRecordAdaptiveIngestionThrottler; EventThrottler globalRecordThrottler; + EventThrottler globalBandwidthThrottler; + VeniceAdaptiveIngestionThrottler globalBandwidthAdaptiveIngestionThrottler; + if (serverConfig.isAdaptiveThrottlerEnabled()) { globalRecordThrottler = null; globalRecordAdaptiveIngestionThrottler = new VeniceAdaptiveIngestionThrottler( @@ -73,6 +76,15 @@ public IngestionThrottler( globalRecordAdaptiveIngestionThrottler .registerLimiterSignal(adaptiveThrottlerSignalService::isSingleGetLatencySignalActive); adaptiveThrottlerSignalService.registerThrottler(globalRecordAdaptiveIngestionThrottler); + globalBandwidthThrottler = null; + globalBandwidthAdaptiveIngestionThrottler = new VeniceAdaptiveIngestionThrottler( + serverConfig.getAdaptiveThrottlerSignalIdleThreshold(), + serverConfig.getKafkaFetchQuotaBytesPerSecond(), + serverConfig.getKafkaFetchQuotaTimeWindow(), + "kafka_consumption_bandwidth"); + globalBandwidthAdaptiveIngestionThrottler + .registerLimiterSignal(adaptiveThrottlerSignalService::isSingleGetLatencySignalActive); + adaptiveThrottlerSignalService.registerThrottler(globalBandwidthAdaptiveIngestionThrottler); } else { globalRecordAdaptiveIngestionThrottler = null; globalRecordThrottler = new EventThrottler( @@ -81,13 +93,14 @@ public IngestionThrottler( "kafka_consumption_records_count", false, EventThrottler.BLOCK_STRATEGY); + globalBandwidthAdaptiveIngestionThrottler = null; + globalBandwidthThrottler = new EventThrottler( + serverConfig.getKafkaFetchQuotaBytesPerSecond(), + serverConfig.getKafkaFetchQuotaTimeWindow(), + "kafka_consumption_bandwidth", + false, + EventThrottler.BLOCK_STRATEGY); } - EventThrottler globalBandwidthThrottler = new EventThrottler( - serverConfig.getKafkaFetchQuotaBytesPerSecond(), - serverConfig.getKafkaFetchQuotaTimeWindow(), - "kafka_consumption_bandwidth", - false, - EventThrottler.BLOCK_STRATEGY); this.poolTypeRecordThrottlerMap = new VeniceConcurrentHashMap<>(); this.poolTypeRecordThrottlerMap.put( ConsumerPoolType.AA_WC_LEADER_POOL, @@ -177,7 +190,9 @@ public IngestionThrottler( this.finalRecordThrottler = serverConfig.isAdaptiveThrottlerEnabled() ? globalRecordAdaptiveIngestionThrottler : globalRecordThrottler; - this.finalBandwidthThrottler = globalBandwidthThrottler; + this.finalBandwidthThrottler = serverConfig.isAdaptiveThrottlerEnabled() + ? globalBandwidthAdaptiveIngestionThrottler + : globalBandwidthThrottler; this.isUsingSpeedupThrottler = false; }