Skip to content

Commit

Permalink
[server] Add global bandwidth throttler into adaptive throttler
Browse files Browse the repository at this point in the history
  • Loading branch information
sixpluszero committed Jan 17, 2025
1 parent a8173c8 commit 121e0b0
Showing 1 changed file with 22 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ public IngestionThrottler(
AdaptiveThrottlerSignalService adaptiveThrottlerSignalService) {
VeniceAdaptiveIngestionThrottler globalRecordAdaptiveIngestionThrottler;
EventThrottler globalRecordThrottler;
EventThrottler globalBandwidthThrottler;
VeniceAdaptiveIngestionThrottler globalBandwidthAdaptiveIngestionThrottler;

if (serverConfig.isAdaptiveThrottlerEnabled()) {
globalRecordThrottler = null;
globalRecordAdaptiveIngestionThrottler = new VeniceAdaptiveIngestionThrottler(
Expand All @@ -73,6 +76,15 @@ public IngestionThrottler(
globalRecordAdaptiveIngestionThrottler
.registerLimiterSignal(adaptiveThrottlerSignalService::isSingleGetLatencySignalActive);
adaptiveThrottlerSignalService.registerThrottler(globalRecordAdaptiveIngestionThrottler);
globalBandwidthThrottler = null;
globalBandwidthAdaptiveIngestionThrottler = new VeniceAdaptiveIngestionThrottler(
serverConfig.getAdaptiveThrottlerSignalIdleThreshold(),
serverConfig.getKafkaFetchQuotaBytesPerSecond(),
serverConfig.getKafkaFetchQuotaTimeWindow(),
"kafka_consumption_bandwidth");
globalBandwidthAdaptiveIngestionThrottler
.registerLimiterSignal(adaptiveThrottlerSignalService::isSingleGetLatencySignalActive);
adaptiveThrottlerSignalService.registerThrottler(globalBandwidthAdaptiveIngestionThrottler);
} else {
globalRecordAdaptiveIngestionThrottler = null;
globalRecordThrottler = new EventThrottler(
Expand All @@ -81,13 +93,14 @@ public IngestionThrottler(
"kafka_consumption_records_count",
false,
EventThrottler.BLOCK_STRATEGY);
globalBandwidthAdaptiveIngestionThrottler = null;
globalBandwidthThrottler = new EventThrottler(
serverConfig.getKafkaFetchQuotaBytesPerSecond(),
serverConfig.getKafkaFetchQuotaTimeWindow(),
"kafka_consumption_bandwidth",
false,
EventThrottler.BLOCK_STRATEGY);
}
EventThrottler globalBandwidthThrottler = new EventThrottler(
serverConfig.getKafkaFetchQuotaBytesPerSecond(),
serverConfig.getKafkaFetchQuotaTimeWindow(),
"kafka_consumption_bandwidth",
false,
EventThrottler.BLOCK_STRATEGY);
this.poolTypeRecordThrottlerMap = new VeniceConcurrentHashMap<>();
this.poolTypeRecordThrottlerMap.put(
ConsumerPoolType.AA_WC_LEADER_POOL,
Expand Down Expand Up @@ -177,7 +190,9 @@ public IngestionThrottler(
this.finalRecordThrottler = serverConfig.isAdaptiveThrottlerEnabled()
? globalRecordAdaptiveIngestionThrottler
: globalRecordThrottler;
this.finalBandwidthThrottler = globalBandwidthThrottler;
this.finalBandwidthThrottler = serverConfig.isAdaptiveThrottlerEnabled()
? globalBandwidthAdaptiveIngestionThrottler
: globalBandwidthThrottler;
this.isUsingSpeedupThrottler = false;
}

Expand Down

0 comments on commit 121e0b0

Please sign in to comment.