Skip to content

Commit

Permalink
Adjust
Browse files Browse the repository at this point in the history
  • Loading branch information
sixpluszero committed Jan 16, 2025
1 parent ba472a6 commit c567247
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
import static com.linkedin.venice.ConfigKeys.SERVER_AA_WC_WORKLOAD_PARALLEL_PROCESSING_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_AA_WC_WORKLOAD_PARALLEL_PROCESSING_THREAD_POOL_SIZE;
import static com.linkedin.venice.ConfigKeys.SERVER_ADAPTIVE_THROTTLER_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_ADAPTIVE_THROTTLER_SIGNAL_IDLE_THRESHOLD;
import static com.linkedin.venice.ConfigKeys.SERVER_ADAPTIVE_THROTTLER_SINGLE_GET_LATENCY_THRESHOLD;
import static com.linkedin.venice.ConfigKeys.SERVER_BATCH_REPORT_END_OF_INCREMENTAL_PUSH_STATUS_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_BLOCKING_QUEUE_TYPE;
import static com.linkedin.venice.ConfigKeys.SERVER_CHANNEL_OPTION_WRITE_BUFFER_WATERMARK_HIGH_BYTES;
Expand Down Expand Up @@ -499,6 +501,8 @@ public class VeniceServerConfig extends VeniceClusterConfig {
private final boolean resetErrorReplicaEnabled;

private final boolean adaptiveThrottlerEnabled;
private final int adaptiveThrottlerSignalIdleThreshold;
private final double adaptiveThrottlerSingleGetLatencyThreshold;

private final int fastAvroFieldLimitPerMethod;

Expand Down Expand Up @@ -664,6 +668,9 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
serverProperties.getBoolean(SERVER_DB_READ_ONLY_FOR_BATCH_ONLY_STORE_ENABLED, true);
resetErrorReplicaEnabled = serverProperties.getBoolean(SERVER_RESET_ERROR_REPLICA_ENABLED, false);
adaptiveThrottlerEnabled = serverProperties.getBoolean(SERVER_ADAPTIVE_THROTTLER_ENABLED, false);
adaptiveThrottlerSignalIdleThreshold = serverProperties.getInt(SERVER_ADAPTIVE_THROTTLER_SIGNAL_IDLE_THRESHOLD, 10);
adaptiveThrottlerSingleGetLatencyThreshold =
serverProperties.getDouble(SERVER_ADAPTIVE_THROTTLER_SINGLE_GET_LATENCY_THRESHOLD, 10d);

databaseSyncBytesIntervalForTransactionalMode =
serverProperties.getSizeInBytes(SERVER_DATABASE_SYNC_BYTES_INTERNAL_FOR_TRANSACTIONAL_MODE, 32 * 1024 * 1024);
Expand Down Expand Up @@ -1502,6 +1509,14 @@ public boolean isAdaptiveThrottlerEnabled() {
return adaptiveThrottlerEnabled;
}

/**
 * @return the adaptive throttler signal idle threshold, as loaded from
 *         {@code SERVER_ADAPTIVE_THROTTLER_SIGNAL_IDLE_THRESHOLD} (default 10).
 */
public int getAdaptiveThrottlerSignalIdleThreshold() {
  return this.adaptiveThrottlerSignalIdleThreshold;
}

/**
 * @return the adaptive throttler single-get latency threshold, as loaded from
 *         {@code SERVER_ADAPTIVE_THROTTLER_SINGLE_GET_LATENCY_THRESHOLD} (default 10.0).
 */
public double getAdaptiveThrottlerSingleGetLatencyThreshold() {
  return this.adaptiveThrottlerSingleGetLatencyThreshold;
}

/**
 * @return the fast-avro field limit per method held by this config.
 */
public int getFastAvroFieldLimitPerMethod() {
  return this.fastAvroFieldLimitPerMethod;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.linkedin.alpini.base.concurrency.Executors;
import com.linkedin.alpini.base.concurrency.ScheduledExecutorService;
import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.davinci.stats.ingestion.heartbeat.AggregatedHeartbeatLagEntry;
import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService;
import com.linkedin.venice.service.AbstractVeniceService;
Expand All @@ -20,8 +21,9 @@
*/
public class AdaptiveThrottlerSignalService extends AbstractVeniceService {
private static final Logger LOGGER = LogManager.getLogger(AdaptiveThrottlerSignalService.class);
public static final double READ_LATENCY_P95_LIMIT = 2.0f;
public static final long HEARTBEAT_LAG_LIMIT = TimeUnit.MINUTES.toMillis(10);
private static final String SINGLE_GET_LATENCY_P99_METRIC_NAME = "total--success_request_latency.99thPercentile";
private final double singleGetLatencyP99Threshold;
private final MetricsRepository metricsRepository;
private final HeartbeatMonitoringService heartbeatMonitoringService;
private final List<VeniceAdaptiveIngestionThrottler> throttlerList = new ArrayList<>();
Expand All @@ -33,8 +35,10 @@ public class AdaptiveThrottlerSignalService extends AbstractVeniceService {
private boolean nonCurrentFollowerMaxHeartbeatLagSignal = false;

/**
 * Builds the signal service from its collaborators.
 *
 * @param veniceServerConfig server config; only the single-get latency threshold is read from it here
 * @param metricsRepository metrics repository, stored for later metric lookups
 * @param heartbeatMonitoringService heartbeat monitoring service, stored in a field
 */
public AdaptiveThrottlerSignalService(
    VeniceServerConfig veniceServerConfig,
    MetricsRepository metricsRepository,
    HeartbeatMonitoringService heartbeatMonitoringService) {
  this.metricsRepository = metricsRepository;
  this.heartbeatMonitoringService = heartbeatMonitoringService;
  // Snapshot the threshold once; the signal check compares the P99 latency metric against it.
  this.singleGetLatencyP99Threshold = veniceServerConfig.getAdaptiveThrottlerSingleGetLatencyThreshold();
}
Expand All @@ -52,11 +56,10 @@ public void refreshSignalAndThrottler() {
}

void updateReadLatencySignal() {
Metric hostSingleGetLatencyP95Metric = metricsRepository.getMetric("total--success_request_latency.95thPercentile");
if (hostSingleGetLatencyP95Metric != null) {
double hostSingleGetLatencyP95 =
metricsRepository.getMetric("total--success_request_latency.95thPercentile").value();
singleGetLatencySignal = hostSingleGetLatencyP95 > READ_LATENCY_P95_LIMIT;
Metric hostSingleGetLatencyP99Metric = metricsRepository.getMetric(SINGLE_GET_LATENCY_P99_METRIC_NAME);
if (hostSingleGetLatencyP99Metric != null) {
double hostSingleGetLatencyP99 = metricsRepository.getMetric(SINGLE_GET_LATENCY_P99_METRIC_NAME).value();
singleGetLatencySignal = hostSingleGetLatencyP99 > singleGetLatencyP99Threshold;
}
LOGGER.info("Update read latency signal. singleGetLatency: {}", singleGetLatencySignal);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,26 @@ public IngestionThrottler(
TimeUnit checkTimeUnit,
AdaptiveThrottlerSignalService adaptiveThrottlerSignalService) {
this.adaptiveThrottlerSignalService = adaptiveThrottlerSignalService;
EventThrottler regularRecordThrottler = new EventThrottler(
serverConfig.getKafkaFetchQuotaRecordPerSecond(),
serverConfig.getKafkaFetchQuotaTimeWindow(),
"kafka_consumption_records_count",
false,
EventThrottler.BLOCK_STRATEGY);
EventThrottler regularRecordThrottler;
if (serverConfig.isAdaptiveThrottlerEnabled()) {
VeniceAdaptiveIngestionThrottler adaptiveIngestionThrottler = new VeniceAdaptiveIngestionThrottler(
serverConfig.getAdaptiveThrottlerSignalIdleThreshold(),
serverConfig.getKafkaFetchQuotaRecordPerSecond(),
serverConfig.getKafkaFetchQuotaTimeWindow(),
"kafka_consumption_records_count");
adaptiveIngestionThrottler.registerLimiterSignal(adaptiveThrottlerSignalService::isSingleGetLatencySignalActive);
adaptiveIngestionThrottler
.registerLimiterSignal(adaptiveThrottlerSignalService::isCurrentLeaderMaxHeartbeatLagSignalActive);
adaptiveThrottlerSignalService.registerThrottler(adaptiveIngestionThrottler);
regularRecordThrottler = adaptiveIngestionThrottler;
} else {
regularRecordThrottler = new EventThrottler(
serverConfig.getKafkaFetchQuotaRecordPerSecond(),
serverConfig.getKafkaFetchQuotaTimeWindow(),
"kafka_consumption_records_count",
false,
EventThrottler.BLOCK_STRATEGY);
}
EventThrottler regularBandwidthThrottler = new EventThrottler(
serverConfig.getKafkaFetchQuotaBytesPerSecond(),
serverConfig.getKafkaFetchQuotaTimeWindow(),
Expand Down Expand Up @@ -108,59 +122,22 @@ public IngestionThrottler(
"current_version_non_aa_wc_leader_records_count",
false,
EventThrottler.BLOCK_STRATEGY));
if (serverConfig.isAdaptiveThrottlerEnabled()) {
VeniceAdaptiveIngestionThrottler nonCurrentVersionAaWcLeaderAdaptiveIngestionThrottler =
new VeniceAdaptiveIngestionThrottler(
serverConfig.getNonCurrentVersionAAWCLeaderQuotaRecordsPerSecond(),
serverConfig.getKafkaFetchQuotaTimeWindow(),
"non_current_version_aa_wc_leader_records_count");
nonCurrentVersionAaWcLeaderAdaptiveIngestionThrottler
.registerLimiterSignal(adaptiveThrottlerSignalService::isSingleGetLatencySignalActive);
nonCurrentVersionAaWcLeaderAdaptiveIngestionThrottler
.registerLimiterSignal(adaptiveThrottlerSignalService::isCurrentLeaderMaxHeartbeatLagSignalActive);
nonCurrentVersionAaWcLeaderAdaptiveIngestionThrottler
.registerLimiterSignal(adaptiveThrottlerSignalService::isCurrentFollowerMaxHeartbeatLagSignalActive);
nonCurrentVersionAaWcLeaderAdaptiveIngestionThrottler
.registerBoosterSignal(adaptiveThrottlerSignalService::isNonCurrentLeaderMaxHeartbeatLagSignalActive);
adaptiveThrottlerSignalService.registerThrottler(nonCurrentVersionAaWcLeaderAdaptiveIngestionThrottler);
this.poolTypeRecordThrottlerMap.put(
ConsumerPoolType.NON_CURRENT_VERSION_AA_WC_LEADER_POOL,
nonCurrentVersionAaWcLeaderAdaptiveIngestionThrottler);
VeniceAdaptiveIngestionThrottler nonCurrentVersionNonAaWcLeaderAdaptiveIngestionThrottler =
new VeniceAdaptiveIngestionThrottler(
serverConfig.getNonCurrentVersionNonAAWCLeaderQuotaRecordsPerSecond(),
serverConfig.getKafkaFetchQuotaTimeWindow(),
"non_current_version_non_aa_wc_leader_records_count");
nonCurrentVersionNonAaWcLeaderAdaptiveIngestionThrottler
.registerLimiterSignal(adaptiveThrottlerSignalService::isSingleGetLatencySignalActive);
nonCurrentVersionNonAaWcLeaderAdaptiveIngestionThrottler
.registerLimiterSignal(adaptiveThrottlerSignalService::isCurrentLeaderMaxHeartbeatLagSignalActive);
nonCurrentVersionNonAaWcLeaderAdaptiveIngestionThrottler
.registerLimiterSignal(adaptiveThrottlerSignalService::isCurrentFollowerMaxHeartbeatLagSignalActive);
nonCurrentVersionNonAaWcLeaderAdaptiveIngestionThrottler
.registerBoosterSignal(adaptiveThrottlerSignalService::isNonCurrentFollowerMaxHeartbeatLagSignalActive);
adaptiveThrottlerSignalService.registerThrottler(nonCurrentVersionNonAaWcLeaderAdaptiveIngestionThrottler);
this.poolTypeRecordThrottlerMap.put(
ConsumerPoolType.NON_CURRENT_VERSION_NON_AA_WC_LEADER_POOL,
nonCurrentVersionNonAaWcLeaderAdaptiveIngestionThrottler);
} else {
this.poolTypeRecordThrottlerMap.put(
ConsumerPoolType.NON_CURRENT_VERSION_AA_WC_LEADER_POOL,
new EventThrottler(
serverConfig.getNonCurrentVersionAAWCLeaderQuotaRecordsPerSecond(),
serverConfig.getKafkaFetchQuotaTimeWindow(),
"non_current_version_aa_wc_leader_records_count",
false,
EventThrottler.BLOCK_STRATEGY));
this.poolTypeRecordThrottlerMap.put(
ConsumerPoolType.NON_CURRENT_VERSION_NON_AA_WC_LEADER_POOL,
new EventThrottler(
serverConfig.getNonCurrentVersionNonAAWCLeaderQuotaRecordsPerSecond(),
serverConfig.getKafkaFetchQuotaTimeWindow(),
"non_current_version_non_aa_wc_leader_records_count",
false,
EventThrottler.BLOCK_STRATEGY));
}
this.poolTypeRecordThrottlerMap.put(
ConsumerPoolType.NON_CURRENT_VERSION_AA_WC_LEADER_POOL,
new EventThrottler(
serverConfig.getNonCurrentVersionAAWCLeaderQuotaRecordsPerSecond(),
serverConfig.getKafkaFetchQuotaTimeWindow(),
"non_current_version_aa_wc_leader_records_count",
false,
EventThrottler.BLOCK_STRATEGY));
this.poolTypeRecordThrottlerMap.put(
ConsumerPoolType.NON_CURRENT_VERSION_NON_AA_WC_LEADER_POOL,
new EventThrottler(
serverConfig.getNonCurrentVersionNonAAWCLeaderQuotaRecordsPerSecond(),
serverConfig.getKafkaFetchQuotaTimeWindow(),
"non_current_version_non_aa_wc_leader_records_count",
false,
EventThrottler.BLOCK_STRATEGY));

if (isDaVinciClient && serverConfig.isDaVinciCurrentVersionBootstrappingSpeedupEnabled()) {
EventThrottler speedupRecordThrottler = new EventThrottler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,30 @@
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;


public class VeniceAdaptiveIngestionThrottler extends EventThrottler {
private static final Logger LOGGER = LogManager.getLogger(VeniceAdaptiveIngestionThrottler.class);
private final List<BooleanSupplier> limiterSuppliers = new ArrayList<>();
private final List<BooleanSupplier> boosterSuppliers = new ArrayList<>();

private final static int MAX_THROTTLERS = 7;
private final List<EventThrottler> eventThrottlers = new ArrayList<>(MAX_THROTTLERS);
private final int signalIdleThreshold;
private int signalIdleCount = 0;
private int currentThrottlerIndex = MAX_THROTTLERS / 2;

private int currentThrottlerIndex = 3;

public VeniceAdaptiveIngestionThrottler(int quotaPerSecond, long timeWindow, String throttlerName) {
public VeniceAdaptiveIngestionThrottler(
int signalIdleThreshold,
long quotaPerSecond,
long timeWindow,
String throttlerName) {
this.signalIdleThreshold = signalIdleThreshold;
double factor = 0.4;
DecimalFormat decimalFormat = new DecimalFormat("0.0");
for (int i = 0; i < 7; i++) {
for (int i = 0; i < MAX_THROTTLERS; i++) {
EventThrottler eventThrottler = new EventThrottler(
(long) (quotaPerSecond * factor),
timeWindow,
Expand All @@ -45,13 +54,17 @@ public void registerBoosterSignal(BooleanSupplier supplier) {
}

public void checkSignalAndAdjustThrottler() {
boolean isSignalIdle = true;
boolean hasLimitedRate = false;
for (BooleanSupplier supplier: limiterSuppliers) {
if (supplier.getAsBoolean()) {
hasLimitedRate = true;
isSignalIdle = false;
signalIdleCount = 0;
if (currentThrottlerIndex > 0) {
currentThrottlerIndex--;
}
LOGGER.info("Found active limiter signal, adjusting throttler index to {}", currentThrottlerIndex);
}
}
// If any limiter signal is true do not booster the throttler
Expand All @@ -61,15 +74,29 @@ public void checkSignalAndAdjustThrottler() {

for (BooleanSupplier supplier: boosterSuppliers) {
if (supplier.getAsBoolean()) {
isSignalIdle = false;
signalIdleCount = 0;
if (currentThrottlerIndex < MAX_THROTTLERS - 1) {
currentThrottlerIndex++;
}
LOGGER.info("Found active booster signal, adjusting throttler index to {}", currentThrottlerIndex);
}
}
if (isSignalIdle) {
signalIdleCount += 1;
LOGGER.info("No active signal found, increasing idle count to {}/{}", signalIdleCount, signalIdleThreshold);
if (signalIdleCount == signalIdleThreshold) {
if (currentThrottlerIndex < MAX_THROTTLERS - 1) {
currentThrottlerIndex++;
}
LOGGER.info("Reach max signal idle count, adjusting throttler index to {}", currentThrottlerIndex);
signalIdleCount = 0;
}
}
}

// TEST
public int getCurrentThrottlerIndex() {
int getCurrentThrottlerIndex() {
return currentThrottlerIndex;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;

import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.davinci.stats.ingestion.heartbeat.AggregatedHeartbeatLagEntry;
import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService;
import io.tehuti.Metric;
Expand All @@ -19,8 +20,10 @@ public class AdaptiveThrottlerSingalServiceTest {
public void testUpdateSignal() {
MetricsRepository metricsRepository = mock(MetricsRepository.class);
HeartbeatMonitoringService heartbeatMonitoringService = mock(HeartbeatMonitoringService.class);
VeniceServerConfig veniceServerConfig = mock(VeniceServerConfig.class);
when(veniceServerConfig.getAdaptiveThrottlerSingleGetLatencyThreshold()).thenReturn(10d);
AdaptiveThrottlerSignalService adaptiveThrottlerSignalService =
new AdaptiveThrottlerSignalService(metricsRepository, heartbeatMonitoringService);
new AdaptiveThrottlerSignalService(veniceServerConfig, metricsRepository, heartbeatMonitoringService);

// Single Get Signal
Assert.assertFalse(adaptiveThrottlerSignalService.isSingleGetLatencySignalActive());
Expand Down Expand Up @@ -55,8 +58,10 @@ public void testUpdateSignal() {
public void testRegisterThrottler() {
MetricsRepository metricsRepository = mock(MetricsRepository.class);
HeartbeatMonitoringService heartbeatMonitoringService = mock(HeartbeatMonitoringService.class);
VeniceServerConfig veniceServerConfig = mock(VeniceServerConfig.class);
when(veniceServerConfig.getAdaptiveThrottlerSingleGetLatencyThreshold()).thenReturn(10d);
AdaptiveThrottlerSignalService adaptiveThrottlerSignalService =
new AdaptiveThrottlerSignalService(metricsRepository, heartbeatMonitoringService);
new AdaptiveThrottlerSignalService(veniceServerConfig, metricsRepository, heartbeatMonitoringService);
VeniceAdaptiveIngestionThrottler adaptiveIngestionThrottler = mock(VeniceAdaptiveIngestionThrottler.class);
adaptiveThrottlerSignalService.registerThrottler(adaptiveIngestionThrottler);
Assert.assertEquals(adaptiveThrottlerSignalService.getThrottlerList().size(), 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
public class VeniceAdaptiveIngestionThrottlerTest {
@Test
public void testAdaptiveIngestionThrottler() {
VeniceAdaptiveIngestionThrottler adaptiveIngestionThrottler = new VeniceAdaptiveIngestionThrottler(100, 10, "test");
VeniceAdaptiveIngestionThrottler adaptiveIngestionThrottler =
new VeniceAdaptiveIngestionThrottler(10, 100, 10, "test");
adaptiveIngestionThrottler.registerLimiterSignal(() -> true);
adaptiveIngestionThrottler.checkSignalAndAdjustThrottler();
Assert.assertEquals(adaptiveIngestionThrottler.getCurrentThrottlerIndex(), 2);
Expand All @@ -17,9 +18,18 @@ public void testAdaptiveIngestionThrottler() {
Assert.assertEquals(adaptiveIngestionThrottler.getCurrentThrottlerIndex(), 0);
adaptiveIngestionThrottler.checkSignalAndAdjustThrottler();
Assert.assertEquals(adaptiveIngestionThrottler.getCurrentThrottlerIndex(), 0);
adaptiveIngestionThrottler = new VeniceAdaptiveIngestionThrottler(100, 10, "test");
adaptiveIngestionThrottler = new VeniceAdaptiveIngestionThrottler(10, 100, 10, "test");
adaptiveIngestionThrottler.registerBoosterSignal(() -> true);
adaptiveIngestionThrottler.checkSignalAndAdjustThrottler();
Assert.assertEquals(adaptiveIngestionThrottler.getCurrentThrottlerIndex(), 4);

adaptiveIngestionThrottler = new VeniceAdaptiveIngestionThrottler(3, 100, 10, "test");

adaptiveIngestionThrottler.checkSignalAndAdjustThrottler();
Assert.assertEquals(adaptiveIngestionThrottler.getCurrentThrottlerIndex(), 3);
adaptiveIngestionThrottler.checkSignalAndAdjustThrottler();
Assert.assertEquals(adaptiveIngestionThrottler.getCurrentThrottlerIndex(), 3);
adaptiveIngestionThrottler.checkSignalAndAdjustThrottler();
Assert.assertEquals(adaptiveIngestionThrottler.getCurrentThrottlerIndex(), 4);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,10 @@ private ConfigKeys() {
public static final String SERVER_RESET_ERROR_REPLICA_ENABLED = "server.reset.error.replica.enabled";

public static final String SERVER_ADAPTIVE_THROTTLER_ENABLED = "server.adaptive.throttler.enabled";
/**
 * Number of consecutive signal checks with no active limiter or booster signal after which the adaptive
 * ingestion throttler moves up one throttler index (unless it is already at the highest index) and restarts
 * the idle count. The server config falls back to 10 when this key is unset.
 */
public static final String SERVER_ADAPTIVE_THROTTLER_SIGNAL_IDLE_THRESHOLD =
"server.adaptive.throttler.signal.idle.threshold";
/**
 * Threshold compared against the host-level single-get P99 latency metric; the read latency signal of the
 * adaptive throttler is active while the metric is strictly above this value. The server config falls back
 * to 10 when this key is unset.
 * NOTE(review): the unit is whatever the latency metric reports (presumably milliseconds) - confirm.
 */
public static final String SERVER_ADAPTIVE_THROTTLER_SINGLE_GET_LATENCY_THRESHOLD =
"server.adaptive.throttler.single.get.latency.threshold";

/**
* A list of fully-qualified class names of all stats classes that needs to be initialized in isolated ingestion process,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ private List<AbstractVeniceService> createServices() {
this.adaptiveThrottlerSignalService = null;
if (serverConfig.isAdaptiveThrottlerEnabled()) {
adaptiveThrottlerSignalService =
new AdaptiveThrottlerSignalService(metricsRepository, heartbeatMonitoringService);
new AdaptiveThrottlerSignalService(serverConfig, metricsRepository, heartbeatMonitoringService);
services.add(adaptiveThrottlerSignalService);
}
// create and add KafkaSimpleConsumerService
Expand Down

0 comments on commit c567247

Please sign in to comment.