Skip to content

Commit

Permalink
Add kafka producer metric for total record count (#1852)
Browse files Browse the repository at this point in the history
  • Loading branch information
moscicky authored Apr 30, 2024
1 parent eacfaf5 commit 6dd3829
Show file tree
Hide file tree
Showing 9 changed files with 448 additions and 181 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ public double getBufferAvailableBytes() {
+ meterRegistry.get(ACK_LEADER_BUFFER_AVAILABLE_BYTES).gauge().value();
}

public <T> void registerAckLeaderRecordSendCounter(T stateObj, ToDoubleFunction<T> f, String sender, String datacenter) {
registerCounter(ACK_LEADER_RECORD_SEND_TOTAL, tags(sender, datacenter), stateObj, f);
}

public <T> void registerAckAllRecordSendCounter(T stateObj, ToDoubleFunction<T> f, String sender, String datacenter) {
registerCounter(ACK_ALL_RECORD_SEND_TOTAL, tags(sender, datacenter), stateObj, f);
}

public <T> void registerProducerInflightRequestGauge(T stateObj, ToDoubleFunction<T> f) {
meterRegistry.gauge(INFLIGHT_REQUESTS, stateObj, f);
hermesMetrics.registerProducerInflightRequest(() -> (int) f.applyAsDouble(stateObj));
Expand All @@ -98,6 +106,10 @@ private <T> void registerTimeGauge(T stateObj,
meterRegistry.more().timeGauge(prometheusName, tags, stateObj, timeUnit, f);
}

private <T> void registerCounter(String name, Tags tags, T stateObj, ToDoubleFunction<T> f) {
meterRegistry.more().counter(name, tags, stateObj, f);
}

private static final String KAFKA_PRODUCER = "kafka-producer.";
private static final String ACK_LEADER = "ack-leader.";
private static final String ACK_ALL = "ack-all.";
Expand All @@ -108,11 +120,14 @@ private <T> void registerTimeGauge(T stateObj,
private static final String ACK_ALL_RECORD_QUEUE_TIME_MAX = KAFKA_PRODUCER + ACK_ALL + "record-queue-time-max";
private static final String ACK_ALL_COMPRESSION_RATE = KAFKA_PRODUCER + ACK_ALL + "compression-rate-avg";
private static final String ACK_ALL_FAILED_BATCHES_TOTAL = KAFKA_PRODUCER + ACK_ALL + "failed-batches-total";
private static final String ACK_ALL_RECORD_SEND_TOTAL = KAFKA_PRODUCER + ACK_ALL + "record-send";

private static final String ACK_LEADER_FAILED_BATCHES_TOTAL = KAFKA_PRODUCER + ACK_LEADER + "failed-batches-total";
private static final String ACK_LEADER_BUFFER_TOTAL_BYTES = KAFKA_PRODUCER + ACK_LEADER + "buffer-total-bytes";
private static final String ACK_LEADER_METADATA_AGE = KAFKA_PRODUCER + ACK_LEADER + "metadata-age";
private static final String ACK_LEADER_RECORD_QUEUE_TIME_MAX = KAFKA_PRODUCER + ACK_LEADER + "record-queue-time-max";
private static final String ACK_LEADER_BUFFER_AVAILABLE_BYTES = KAFKA_PRODUCER + ACK_LEADER + "buffer-available-bytes";
private static final String ACK_LEADER_COMPRESSION_RATE = KAFKA_PRODUCER + ACK_LEADER + "compression-rate-avg";
private static final String ACK_LEADER_RECORD_SEND_TOTAL = KAFKA_PRODUCER + ACK_LEADER + "record-send";

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,159 +6,18 @@
import java.time.Duration;

@ConfigurationProperties(prefix = "frontend.kafka.fail-fast-producer")
public class FailFastKafkaProducerProperties implements KafkaProducerParameters {
public class FailFastKafkaProducerProperties {

private KafkaProducerParameters local = new FailFastLocalKafkaProducerProperties();

private KafkaProducerParameters remote = new FailFastRemoteKafkaProducerProperties();

private Duration speculativeSendDelay = Duration.ofMillis(250);

private FallbackSchedulerProperties fallbackScheduler = new FallbackSchedulerProperties();

private ChaosSchedulerProperties chaosScheduler = new ChaosSchedulerProperties();

private Duration maxBlock = Duration.ofMillis(500);

private Duration metadataMaxAge = Duration.ofMinutes(5);

private String compressionCodec = "none";

private int retries = Integer.MAX_VALUE;

private Duration retryBackoff = Duration.ofMillis(50);

private Duration requestTimeout = Duration.ofMillis(500);

private Duration deliveryTimeout = Duration.ofMillis(500);

private int batchSize = 16 * 1024;

private int tcpSendBuffer = 128 * 1024;

private int maxRequestSize = 1024 * 1024;

private Duration linger = Duration.ofMillis(0);

private Duration metricsSampleWindow = Duration.ofSeconds(30);

private int maxInflightRequestsPerConnection = 5;

private boolean reportNodeMetricsEnabled = false;

@Override
public Duration getMaxBlock() {
return maxBlock;
}

public void setMaxBlock(Duration maxBlock) {
this.maxBlock = maxBlock;
}

@Override
public Duration getMetadataMaxAge() {
return metadataMaxAge;
}

public void setMetadataMaxAge(Duration metadataMaxAge) {
this.metadataMaxAge = metadataMaxAge;
}

@Override
public String getCompressionCodec() {
return compressionCodec;
}

public void setCompressionCodec(String compressionCodec) {
this.compressionCodec = compressionCodec;
}

@Override
public int getRetries() {
return retries;
}

public void setRetries(int retries) {
this.retries = retries;
}

@Override
public Duration getRetryBackoff() {
return retryBackoff;
}

public void setRetryBackoff(Duration retryBackoff) {
this.retryBackoff = retryBackoff;
}

@Override
public Duration getRequestTimeout() {
return requestTimeout;
}

public void setRequestTimeout(Duration requestTimeout) {
this.requestTimeout = requestTimeout;
}

@Override
public int getBatchSize() {
return batchSize;
}

public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}

@Override
public int getTcpSendBuffer() {
return tcpSendBuffer;
}

public void setTcpSendBuffer(int tcpSendBuffer) {
this.tcpSendBuffer = tcpSendBuffer;
}

@Override
public int getMaxRequestSize() {
return maxRequestSize;
}

public void setMaxRequestSize(int maxRequestSize) {
this.maxRequestSize = maxRequestSize;
}

@Override
public Duration getLinger() {
return linger;
}

public void setLinger(Duration linger) {
this.linger = linger;
}

@Override
public Duration getMetricsSampleWindow() {
return metricsSampleWindow;
}

public void setMetricsSampleWindow(Duration metricsSampleWindow) {
this.metricsSampleWindow = metricsSampleWindow;
}

@Override
public int getMaxInflightRequestsPerConnection() {
return maxInflightRequestsPerConnection;
}

public void setMaxInflightRequestsPerConnection(int maxInflightRequestsPerConnection) {
this.maxInflightRequestsPerConnection = maxInflightRequestsPerConnection;
}

@Override
public boolean isReportNodeMetricsEnabled() {
return reportNodeMetricsEnabled;
}

public void setReportNodeMetricsEnabled(boolean reportNodeMetricsEnabled) {
this.reportNodeMetricsEnabled = reportNodeMetricsEnabled;
}

public Duration getSpeculativeSendDelay() {
return speculativeSendDelay;
}
Expand All @@ -167,15 +26,6 @@ public void setSpeculativeSendDelay(Duration speculativeSendDelay) {
this.speculativeSendDelay = speculativeSendDelay;
}

@Override
public Duration getDeliveryTimeout() {
return deliveryTimeout;
}

public void setDeliveryTimeout(Duration deliveryTimeout) {
this.deliveryTimeout = deliveryTimeout;
}

public FallbackSchedulerProperties getFallbackScheduler() {
return fallbackScheduler;
}
Expand All @@ -184,6 +34,22 @@ public void setFallbackScheduler(FallbackSchedulerProperties fallbackScheduler)
this.fallbackScheduler = fallbackScheduler;
}

public KafkaProducerParameters getLocal() {
return local;
}

public void setLocal(KafkaProducerParameters local) {
this.local = local;
}

public KafkaProducerParameters getRemote() {
return remote;
}

public void setRemote(KafkaProducerParameters remote) {
this.remote = remote;
}

public ChaosSchedulerProperties getChaosScheduler() {
return chaosScheduler;
}
Expand Down
Loading

0 comments on commit 6dd3829

Please sign in to comment.