From a4591dd755684c010d8e9b22718586e9a76fede9 Mon Sep 17 00:00:00 2001 From: Chris Cranford Date: Tue, 29 Oct 2024 00:50:30 -0400 Subject: [PATCH 1/8] DBZ-8362 Use quay.io images --- .../io/debezium/connector/spanner/util/docker-compose.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/java/io/debezium/connector/spanner/util/docker-compose.yml b/src/test/java/io/debezium/connector/spanner/util/docker-compose.yml index 5984f96..ae8287b 100644 --- a/src/test/java/io/debezium/connector/spanner/util/docker-compose.yml +++ b/src/test/java/io/debezium/connector/spanner/util/docker-compose.yml @@ -1,7 +1,7 @@ version: "2" services: zookeeper: - image: debezium/zookeeper:latest + image: quay.io/debezium/zookeeper:latest hostname: zookeeper ports: - "2181:2181" @@ -10,7 +10,7 @@ services: ZOOKEEPER_TICK_TIME: 2000 broker: - image: debezium/kafka:latest + image: quay.io/debezium/kafka:latest hostname: broker depends_on: - zookeeper From 1a4aed005c139bc1042c2082ef93635c3b0d1f35 Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Wed, 30 Oct 2024 17:59:40 +0100 Subject: [PATCH 2/8] DBZ-8362 Enable logging --- pom.xml | 5 +++++ src/test/resources/logback-test.xml | 30 +++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+) create mode 100644 src/test/resources/logback-test.xml diff --git a/pom.xml b/pom.xml index 7e9d38f..c85c3d4 100644 --- a/pom.xml +++ b/pom.xml @@ -366,6 +366,11 @@ debezium-embedded test + + ch.qos.logback + logback-classic + test + io.debezium debezium-core diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml new file mode 100644 index 0000000..a9ccf97 --- /dev/null +++ b/src/test/resources/logback-test.xml @@ -0,0 +1,30 @@ + + + + + %d{ISO8601} %-5p %X{dbz.connectorType}|%X{dbz.connectorName}|%X{dbz.connectorContext} %m [%c]%n + + + + + + + + + + + + + + + + + + + From be4e4ecce21ed4124441186ce9e8fd174f1f6ac7 Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Wed, 30 Oct 2024 18:01:29 +0100 Subject: [PATCH 3/8] DBZ-6998 Revert "DBZ-6998: Update code for style copliance" This reverts commit e425b99bcb1d84822e760364453b7a38836e5d4b. --- .../debezium/connector/spanner/metrics/latency/Metric.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/debezium/connector/spanner/metrics/latency/Metric.java b/src/main/java/io/debezium/connector/spanner/metrics/latency/Metric.java index ea8b8fe..f8f9a2d 100644 --- a/src/main/java/io/debezium/connector/spanner/metrics/latency/Metric.java +++ b/src/main/java/io/debezium/connector/spanner/metrics/latency/Metric.java @@ -62,7 +62,10 @@ void set(Duration lastDuration) { total.accumulateAndGet(lastDuration, Duration::plus); count.incrementAndGet(); - average.set(total.get().dividedBy(count.get())); + average.set( + total.get() + .dividedBy(count.get()) + ); last.set(lastDuration); From 79807119fdfd962f13ec72439dc35f6b30437948 Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Wed, 30 Oct 2024 18:01:44 +0100 Subject: [PATCH 4/8] DBZ-6998 Revert "DBZ-6998: Fix StatisticsTest" This reverts commit 1d03ec2252875bb12b6ae777505cfe308cb11272. --- .../spanner/metrics/latency/Metric.java | 16 ++++++++-------- .../spanner/metrics/latency/StatisticsTest.java | 13 ++++--------- 2 files changed, 12 insertions(+), 17 deletions(-) diff --git a/src/main/java/io/debezium/connector/spanner/metrics/latency/Metric.java b/src/main/java/io/debezium/connector/spanner/metrics/latency/Metric.java index f8f9a2d..19d9560 100644 --- a/src/main/java/io/debezium/connector/spanner/metrics/latency/Metric.java +++ b/src/main/java/io/debezium/connector/spanner/metrics/latency/Metric.java @@ -12,7 +12,7 @@ public abstract class Metric { - private final AtomicReference minimum = new AtomicReference<>(Duration.ofSeconds(Long.MAX_VALUE)); + private final AtomicReference minimum = new AtomicReference<>(Duration.ZERO); private final AtomicReference maximum = new AtomicReference<>(Duration.ZERO); private final AtomicReference last = new AtomicReference<>(Duration.ZERO); private final AtomicReference total = new AtomicReference<>(Duration.ZERO); @@ -28,7 +28,7 @@ public Metric(Duration percentageMetricsClearInterval, Consumer error * Resets the duration metric */ public void reset() { - minimum.set(Duration.ofSeconds(Long.MAX_VALUE)); + minimum.set(Duration.ZERO); maximum.set(Duration.ZERO); last.set(Duration.ZERO); total.set(Duration.ZERO); @@ -60,20 +60,20 @@ void set(Duration lastDuration) { } total.accumulateAndGet(lastDuration, Duration::plus); - count.incrementAndGet(); average.set( - total.get() - .dividedBy(count.get()) - ); + average.get() + .multipliedBy(count.get() - 1) + .plus(lastDuration) + .dividedBy(count.get())); last.set(lastDuration); - quantileMeter.addValue((double) lastDuration.toSeconds()); + quantileMeter.addValue((double) lastDuration.toMillis()); } public synchronized void update(long value) { - set(Duration.ofSeconds(value)); + set(Duration.ofMillis(value)); } public Duration getMinValue() { diff --git a/src/test/java/io/debezium/connector/spanner/metrics/latency/StatisticsTest.java b/src/test/java/io/debezium/connector/spanner/metrics/latency/StatisticsTest.java index 51ad961..deea460 100644 --- a/src/test/java/io/debezium/connector/spanner/metrics/latency/StatisticsTest.java +++ b/src/test/java/io/debezium/connector/spanner/metrics/latency/StatisticsTest.java @@ -11,14 +11,9 @@ import org.junit.jupiter.api.Test; class StatisticsTest { - Double durationToFloat(Duration duration) { - return duration.getSeconds() + (double) duration.getNano() / 1_000_000_000; - } @Test void updateAndReset() { - double epsilon = 0.000000001; - Statistics statistics = new Statistics(Duration.ofSeconds(10), null); statistics.update(148); statistics.update(197); @@ -28,13 +23,13 @@ void updateAndReset() { statistics.update(10); statistics.update(298); - Assertions.assertEquals(170.71428571428572, durationToFloat(statistics.getAverageValue()), epsilon); + Assertions.assertEquals(170.71428571428572, statistics.getAverageValue()); - Assertions.assertEquals(298, durationToFloat(statistics.getLastValue())); + Assertions.assertEquals(298, statistics.getLastValue()); - Assertions.assertEquals(397, durationToFloat(statistics.getMaxValue())); + Assertions.assertEquals(397, statistics.getMaxValue()); - Assertions.assertEquals(10, durationToFloat(statistics.getMinValue())); + Assertions.assertEquals(10, statistics.getMinValue()); statistics.reset(); From e52a051b0e5d701b86eea9198e372713ead5f1d5 Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Wed, 30 Oct 2024 18:01:49 +0100 Subject: [PATCH 5/8] DBZ-6998 Revert "DBZ-6998: Fix broken tests" This reverts commit a69007715bb067a7f47549cdb13148c3c8e515ac. --- .../metrics/jmx/SpannerMetricsMXBean.java | 4 +- ...nnerStreamingChangeEventSourceMetrics.java | 70 +++++++++---------- .../spanner/metrics/latency/Metric.java | 6 +- .../spanner/metrics/latency/Statistics.java | 1 + 4 files changed, 40 insertions(+), 41 deletions(-) diff --git a/src/main/java/io/debezium/connector/spanner/metrics/jmx/SpannerMetricsMXBean.java b/src/main/java/io/debezium/connector/spanner/metrics/jmx/SpannerMetricsMXBean.java index 0c9e4a6..6a5c957 100644 --- a/src/main/java/io/debezium/connector/spanner/metrics/jmx/SpannerMetricsMXBean.java +++ b/src/main/java/io/debezium/connector/spanner/metrics/jmx/SpannerMetricsMXBean.java @@ -5,8 +5,6 @@ */ package io.debezium.connector.spanner.metrics.jmx; -import java.time.Duration; - import javax.management.MXBean; import io.debezium.pipeline.metrics.StreamingChangeEventSourceMetricsMXBean; @@ -69,7 +67,7 @@ public interface SpannerMetricsMXBean extends StreamingChangeEventSourceMetricsM * The delay which Spanner connector waits for * the next Change Stream Event */ - Duration getDelayChangeStreamEventsLastMilliSeconds(); + Long getDelayChangeStreamEventsLastMilliSeconds(); Double getDelayChangeStreamEventsP50MilliSeconds(); diff --git a/src/main/java/io/debezium/connector/spanner/metrics/jmx/SpannerStreamingChangeEventSourceMetrics.java b/src/main/java/io/debezium/connector/spanner/metrics/jmx/SpannerStreamingChangeEventSourceMetrics.java index 51998e7..4b65b23 100644 --- a/src/main/java/io/debezium/connector/spanner/metrics/jmx/SpannerStreamingChangeEventSourceMetrics.java +++ b/src/main/java/io/debezium/connector/spanner/metrics/jmx/SpannerStreamingChangeEventSourceMetrics.java @@ -5,8 +5,6 @@ */ package io.debezium.connector.spanner.metrics.jmx; -import java.time.Duration; - import org.apache.kafka.connect.data.Struct; import com.google.cloud.Timestamp; @@ -98,7 +96,7 @@ public int getStuckHeartbeatIntervals() { } @Override - public Duration getDelayChangeStreamEventsLastMilliSeconds() { + public Long getDelayChangeStreamEventsLastMilliSeconds() { return spannerMeter.getDelayChangeStreamEvents().getLastValue(); } @@ -136,17 +134,17 @@ public String getTaskUid() { @Override public Long getLatencyLowWatermarkLagMinMilliSeconds() { - return spannerMeter.getLowWatermarkLagLatency().getMinValue().toMillis(); + return spannerMeter.getLowWatermarkLagLatency().getMinValue(); } @Override public Long getLatencyLowWatermarkLagMaxMilliSeconds() { - return spannerMeter.getLowWatermarkLagLatency().getMaxValue().toMillis(); + return spannerMeter.getLowWatermarkLagLatency().getMaxValue(); } @Override public Double getLatencyLowWatermarkLagAvgMilliSeconds() { - return Double.valueOf(spannerMeter.getLowWatermarkLagLatency().getAverageValue().toMillis()); + return spannerMeter.getLowWatermarkLagLatency().getAverageValue(); } @Override @@ -167,17 +165,17 @@ public Double getLatencyLowWatermarkLagP99MilliSeconds() { // Total latency @Override public Long getLatencyTotalMinMilliSeconds() { - return spannerMeter.getTotalLatency().getMinValue().toMillis(); + return spannerMeter.getTotalLatency().getMinValue(); } @Override public Long getLatencyTotalMaxMilliSeconds() { - return spannerMeter.getTotalLatency().getMaxValue().toMillis(); + return spannerMeter.getTotalLatency().getMaxValue(); } @Override public Double getLatencyTotalAvgMilliSeconds() { - return Double.valueOf(spannerMeter.getTotalLatency().getAverageValue().toMillis()); + return spannerMeter.getTotalLatency().getAverageValue(); } @Override @@ -198,17 +196,17 @@ public Double getLatencyTotalP99MilliSeconds() { // Spanner latency @Override public Long getLatencySpannerMinMilliSeconds() { - return spannerMeter.getSpannerLatency().getMinValue().toMillis(); + return spannerMeter.getSpannerLatency().getMinValue(); } @Override public Long getLatencySpannerMaxMilliSeconds() { - return spannerMeter.getSpannerLatency().getMaxValue().toMillis(); + return spannerMeter.getSpannerLatency().getMaxValue(); } @Override public Double getLatencySpannerAvgMilliSeconds() { - return Double.valueOf(spannerMeter.getSpannerLatency().getAverageValue().toMillis()); + return spannerMeter.getSpannerLatency().getAverageValue(); } @Override @@ -229,17 +227,17 @@ public Double getLatencySpannerP99MilliSeconds() { // ReadToEmit latency @Override public Long getLatencyReadToEmitMinMilliSeconds() { - return spannerMeter.getConnectorLatency().getMinValue().toMillis(); + return spannerMeter.getConnectorLatency().getMinValue(); } @Override public Long getLatencyReadToEmitMaxMilliSeconds() { - return spannerMeter.getConnectorLatency().getMaxValue().toMillis(); + return spannerMeter.getConnectorLatency().getMaxValue(); } @Override public Double getLatencyReadToEmitAvgMilliSeconds() { - return Double.valueOf(spannerMeter.getConnectorLatency().getAverageValue().toMillis()); + return spannerMeter.getConnectorLatency().getAverageValue(); } @Override @@ -260,17 +258,17 @@ public Double getLatencyReadToEmitP99MilliSeconds() { // CommitToEmit latency @Override public Long getLatencyCommitToEmitMinMilliSeconds() { - return spannerMeter.getCommitToEmitLatency().getMinValue().toMillis(); + return spannerMeter.getCommitToEmitLatency().getMinValue(); } @Override public Long getLatencyCommitToEmitMaxMilliSeconds() { - return spannerMeter.getCommitToEmitLatency().getMaxValue().toMillis(); + return spannerMeter.getCommitToEmitLatency().getMaxValue(); } @Override public Double getLatencyCommitToEmitAvgMilliSeconds() { - return Double.valueOf(spannerMeter.getCommitToEmitLatency().getAverageValue().toMillis()); + return spannerMeter.getCommitToEmitLatency().getAverageValue(); } @Override @@ -291,17 +289,17 @@ public Double getLatencyCommitToEmitP99MilliSeconds() { // CommitToPublish Latency @Override public Long getLatencyCommitToPublishMinMilliSeconds() { - return spannerMeter.getCommitToPublishLatency().getMinValue().toMillis(); + return spannerMeter.getCommitToPublishLatency().getMinValue(); } @Override public Long getLatencyCommitToPublishMaxMilliSeconds() { - return spannerMeter.getCommitToPublishLatency().getMaxValue().toMillis(); + return spannerMeter.getCommitToPublishLatency().getMaxValue(); } @Override public Double getLatencyCommitToPublishAvgMilliSeconds() { - return Double.valueOf(spannerMeter.getCommitToPublishLatency().getAverageValue().toMillis()); + return spannerMeter.getCommitToPublishLatency().getAverageValue(); } @Override @@ -322,17 +320,17 @@ public Double getLatencyCommitToPublishP99MilliSeconds() { // EmitToPublish Latency @Override public Long getLatencyEmitToPublishMinMilliSeconds() { - return spannerMeter.getEmitToPublishLatency().getMinValue().toMillis(); + return spannerMeter.getEmitToPublishLatency().getMinValue(); } @Override public Long getLatencyEmitToPublishMaxMilliSeconds() { - return spannerMeter.getEmitToPublishLatency().getMaxValue().toMillis(); + return spannerMeter.getEmitToPublishLatency().getMaxValue(); } @Override public Double getLatencyEmitToPublishAvgMilliSeconds() { - return Double.valueOf(spannerMeter.getEmitToPublishLatency().getAverageValue().toMillis()); + return spannerMeter.getEmitToPublishLatency().getAverageValue(); } @Override @@ -353,22 +351,22 @@ public Double getLatencyEmitToPublishP99MilliSeconds() { // debug OwnConnector Latency @Override public Long getDebugLatencyOwnConnectorMinMilliSeconds() { - return spannerMeter.getOwnConnectorLatency().getMinValue().toMillis(); + return spannerMeter.getOwnConnectorLatency().getMinValue(); } @Override public Long getDebugLatencyOwnConnectorMaxMilliSeconds() { - return spannerMeter.getOwnConnectorLatency().getMaxValue().toMillis(); + return spannerMeter.getOwnConnectorLatency().getMaxValue(); } @Override public Double getDebugLatencyOwnConnectorAvgMilliSeconds() { - return Double.valueOf(spannerMeter.getOwnConnectorLatency().getAverageValue().toMillis()); + return spannerMeter.getOwnConnectorLatency().getAverageValue(); } @Override public Long getDebugLatencyOwnConnectorLastMilliSeconds() { - return spannerMeter.getOwnConnectorLatency().getLastValue().toMillis(); + return spannerMeter.getOwnConnectorLatency().getLastValue(); } @Override @@ -390,17 +388,17 @@ public Double getDebugLatencyOwnConnectorP99MilliSeconds() { @Override public Long getPartitionOffsetLagMinMilliSeconds() { - return spannerMeter.getPartitionOffsetLagStatistics().getMinValue().toMillis(); + return spannerMeter.getPartitionOffsetLagStatistics().getMinValue(); } @Override public Long getPartitionOffsetLagMaxMilliSeconds() { - return spannerMeter.getPartitionOffsetLagStatistics().getMaxValue().toMillis(); + return spannerMeter.getPartitionOffsetLagStatistics().getMaxValue(); } @Override public Double getPartitionOffsetLagAvgMilliSeconds() { - return Double.valueOf(spannerMeter.getPartitionOffsetLagStatistics().getAverageValue().toMillis()); + return spannerMeter.getPartitionOffsetLagStatistics().getAverageValue(); } @Override @@ -420,23 +418,23 @@ public Double getPartitionOffsetLagP99MilliSeconds() { @Override public Long getPartitionOffsetLagLastMilliSeconds() { - return spannerMeter.getPartitionOffsetLagStatistics().getLastValue().toMillis(); + return spannerMeter.getPartitionOffsetLagStatistics().getLastValue(); } // offset receiving time statistics @Override public Long getOffsetReceivingTimeMinMilliSeconds() { - return spannerMeter.getOffsetReceivingTimeStatistics().getMinValue().toMillis(); + return spannerMeter.getOffsetReceivingTimeStatistics().getMinValue(); } @Override public Long getOffsetReceivingTimeMaxMilliSeconds() { - return spannerMeter.getOffsetReceivingTimeStatistics().getMaxValue().toMillis(); + return spannerMeter.getOffsetReceivingTimeStatistics().getMaxValue(); } @Override public Double getOffsetReceivingTimeAvgMilliSeconds() { - return Double.valueOf(spannerMeter.getOffsetReceivingTimeStatistics().getAverageValue().toMillis()); + return spannerMeter.getOffsetReceivingTimeStatistics().getAverageValue(); } @Override @@ -456,7 +454,7 @@ public Double getOffsetReceivingTimeP99MilliSeconds() { @Override public Long getOffsetReceivingTimeLastMilliSeconds() { - return spannerMeter.getOffsetReceivingTimeStatistics().getLastValue().toMillis(); + return spannerMeter.getOffsetReceivingTimeStatistics().getLastValue(); } @Override diff --git a/src/main/java/io/debezium/connector/spanner/metrics/latency/Metric.java b/src/main/java/io/debezium/connector/spanner/metrics/latency/Metric.java index 19d9560..8535dde 100644 --- a/src/main/java/io/debezium/connector/spanner/metrics/latency/Metric.java +++ b/src/main/java/io/debezium/connector/spanner/metrics/latency/Metric.java @@ -6,10 +6,11 @@ package io.debezium.connector.spanner.metrics.latency; import java.time.Duration; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; + public abstract class Metric { private final AtomicReference minimum = new AtomicReference<>(Duration.ZERO); @@ -65,7 +66,8 @@ void set(Duration lastDuration) { average.get() .multipliedBy(count.get() - 1) .plus(lastDuration) - .dividedBy(count.get())); + .dividedBy(count.get()) + ); last.set(lastDuration); diff --git a/src/main/java/io/debezium/connector/spanner/metrics/latency/Statistics.java b/src/main/java/io/debezium/connector/spanner/metrics/latency/Statistics.java index 4030ae5..ede5b43 100644 --- a/src/main/java/io/debezium/connector/spanner/metrics/latency/Statistics.java +++ b/src/main/java/io/debezium/connector/spanner/metrics/latency/Statistics.java @@ -8,6 +8,7 @@ import java.time.Duration; import java.util.function.Consumer; + /** * This class provides functionality to calculate statistics: * min, max, avg values, percentiles. From e842e80904b38da5ce2b25f21f02c3c49047b497 Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Wed, 30 Oct 2024 18:01:54 +0100 Subject: [PATCH 6/8] DBZ-6998 Revert "DBZ-6998: Replace references to Statistics with Metric" This reverts commit 464b82506c00c29ac3c2da2c9cc7017fb0b8da8a. --- .../spanner/metrics/latency/Metric.java | 24 +++--- .../spanner/metrics/latency/Statistics.java | 81 ++++++++++++++++++- 2 files changed, 91 insertions(+), 14 deletions(-) diff --git a/src/main/java/io/debezium/connector/spanner/metrics/latency/Metric.java b/src/main/java/io/debezium/connector/spanner/metrics/latency/Metric.java index 8535dde..5b03188 100644 --- a/src/main/java/io/debezium/connector/spanner/metrics/latency/Metric.java +++ b/src/main/java/io/debezium/connector/spanner/metrics/latency/Metric.java @@ -21,14 +21,14 @@ public abstract class Metric { private final QuantileMeter quantileMeter; private final AtomicLong count = new AtomicLong(); - public Metric(Duration percentageMetricsClearInterval, Consumer errorConsumer) { + protected Metric(Duration percentageMetricsClearInterval, Consumer errorConsumer) { this.quantileMeter = new QuantileMeter(percentageMetricsClearInterval, errorConsumer); } /** * Resets the duration metric */ - public void reset() { + void reset() { minimum.set(Duration.ZERO); maximum.set(Duration.ZERO); last.set(Duration.ZERO); @@ -38,11 +38,11 @@ public void reset() { count.set(0); } - public void start() { + protected void start() { quantileMeter.start(); } - public void shutdown() { + protected void shutdown() { quantileMeter.shutdown(); } @@ -74,35 +74,35 @@ void set(Duration lastDuration) { quantileMeter.addValue((double) lastDuration.toMillis()); } - public synchronized void update(long value) { + protected synchronized void update(long value) { set(Duration.ofMillis(value)); } - public Duration getMinValue() { + protected Duration getMinValue() { return count.get() == 0 ? null : minimum.get(); } - public Duration getMaxValue() { + protected Duration getMaxValue() { return count.get() == 0 ? null : maximum.get(); } - public Duration getAverageValue() { + protected Duration getAverageValue() { return count.get() == 0 ? null : average.get(); } - public Duration getLastValue() { + protected Duration getLastValue() { return count.get() == 0 ? null : last.get(); } - public Double getValueAtP50() { + protected Double getValueAtP50() { return quantileMeter.getValueAtQuantile(0.5); } - public Double getValueAtP95() { + protected Double getValueAtP95() { return quantileMeter.getValueAtQuantile(0.95); } - public Double getValueAtP99() { + protected Double getValueAtP99() { return quantileMeter.getValueAtQuantile(0.99); } } diff --git a/src/main/java/io/debezium/connector/spanner/metrics/latency/Statistics.java b/src/main/java/io/debezium/connector/spanner/metrics/latency/Statistics.java index ede5b43..849d5e0 100644 --- a/src/main/java/io/debezium/connector/spanner/metrics/latency/Statistics.java +++ b/src/main/java/io/debezium/connector/spanner/metrics/latency/Statistics.java @@ -6,15 +6,92 @@ package io.debezium.connector.spanner.metrics.latency; import java.time.Duration; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; +import com.google.common.util.concurrent.AtomicDouble; /** * This class provides functionality to calculate statistics: * min, max, avg values, percentiles. */ -public class Statistics extends Metric { +public class Statistics { + private final AtomicLong minValue = new AtomicLong(Long.MAX_VALUE); + private final AtomicLong maxValue = new AtomicLong(0); + private final AtomicDouble averageValue = new AtomicDouble(0); + private final AtomicLong lastValue = new AtomicLong(Long.MAX_VALUE); + private final AtomicLong count = new AtomicLong(0); + private final QuantileMeter quantileMeter; + public Statistics(Duration percentageMetricsClearInterval, Consumer errorConsumer) { - super(percentageMetricsClearInterval, errorConsumer); + this.quantileMeter = new QuantileMeter(percentageMetricsClearInterval, errorConsumer); + } + + public synchronized void reset() { + minValue.set(Long.MAX_VALUE); + maxValue.set(0); + averageValue.set(0); + lastValue.set(Long.MAX_VALUE); + count.set(0); + quantileMeter.reset(); + } + + public void start() { + quantileMeter.start(); + } + + public void shutdown() { + quantileMeter.shutdown(); + } + + void set(Duration lastDuration) { + long value = lastDuration.toMillis(); + if (minValue.get() > value) { + minValue.set(value); + } + + if (maxValue.get() < value) { + maxValue.set(value); + } + + count.incrementAndGet(); + + averageValue.set((averageValue.get() * (count.get() - 1) + value) / count.get()); + + lastValue.set(value); + + quantileMeter.addValue((double) value); + } + + public synchronized void update(long value) { + set(Duration.ofMillis(value)); + } + + public Long getMinValue() { + return count.get() == 0 ? null : minValue.get(); + } + + public Long getMaxValue() { + return count.get() == 0 ? null : maxValue.get(); + } + + public Double getAverageValue() { + return count.get() == 0 ? null : averageValue.get(); + } + + public Long getLastValue() { + return count.get() == 0 ? null : lastValue.get(); + } + + public Double getValueAtP50() { + return quantileMeter.getValueAtQuantile(0.5); + } + + public Double getValueAtP95() { + return quantileMeter.getValueAtQuantile(0.95); + } + + public Double getValueAtP99() { + return quantileMeter.getValueAtQuantile(0.99); } } From 067d9f72ed46fe55e0c65dbde5404a5acb7d51c5 Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Wed, 30 Oct 2024 18:02:00 +0100 Subject: [PATCH 7/8] DBZ-6998 Revert "DBZ-6998: Create Metric class for abstraction" This reverts commit f9eeea8bbe22510ecfd04085202870daf2343670. --- .../spanner/metrics/latency/Metric.java | 108 ------------------ 1 file changed, 108 deletions(-) delete mode 100644 src/main/java/io/debezium/connector/spanner/metrics/latency/Metric.java diff --git a/src/main/java/io/debezium/connector/spanner/metrics/latency/Metric.java b/src/main/java/io/debezium/connector/spanner/metrics/latency/Metric.java deleted file mode 100644 index 5b03188..0000000 --- a/src/main/java/io/debezium/connector/spanner/metrics/latency/Metric.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.connector.spanner.metrics.latency; - -import java.time.Duration; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; - - -public abstract class Metric { - - private final AtomicReference minimum = new AtomicReference<>(Duration.ZERO); - private final AtomicReference maximum = new AtomicReference<>(Duration.ZERO); - private final AtomicReference last = new AtomicReference<>(Duration.ZERO); - private final AtomicReference total = new AtomicReference<>(Duration.ZERO); - private final AtomicReference average = new AtomicReference<>(Duration.ZERO); - private final QuantileMeter quantileMeter; - private final AtomicLong count = new AtomicLong(); - - protected Metric(Duration percentageMetricsClearInterval, Consumer errorConsumer) { - this.quantileMeter = new QuantileMeter(percentageMetricsClearInterval, errorConsumer); - } - - /** - * Resets the duration metric - */ - void reset() { - minimum.set(Duration.ZERO); - maximum.set(Duration.ZERO); - last.set(Duration.ZERO); - total.set(Duration.ZERO); - average.set(Duration.ZERO); - quantileMeter.reset(); - count.set(0); - } - - protected void start() { - quantileMeter.start(); - } - - protected void shutdown() { - quantileMeter.shutdown(); - } - - /** - * Sets the last duration-based value for the histogram. - * - * @param lastDuration last duration - */ - void set(Duration lastDuration) { - if (lastDuration.compareTo(minimum.get()) < 0) { - minimum.set(lastDuration); - } - - if (lastDuration.compareTo(maximum.get()) > 0) { - maximum.set(lastDuration); - } - - total.accumulateAndGet(lastDuration, Duration::plus); - - average.set( - average.get() - .multipliedBy(count.get() - 1) - .plus(lastDuration) - .dividedBy(count.get()) - ); - - last.set(lastDuration); - - quantileMeter.addValue((double) lastDuration.toMillis()); - } - - protected synchronized void update(long value) { - set(Duration.ofMillis(value)); - } - - protected Duration getMinValue() { - return count.get() == 0 ? null : minimum.get(); - } - - protected Duration getMaxValue() { - return count.get() == 0 ? null : maximum.get(); - } - - protected Duration getAverageValue() { - return count.get() == 0 ? null : average.get(); - } - - protected Duration getLastValue() { - return count.get() == 0 ? null : last.get(); - } - - protected Double getValueAtP50() { - return quantileMeter.getValueAtQuantile(0.5); - } - - protected Double getValueAtP95() { - return quantileMeter.getValueAtQuantile(0.95); - } - - protected Double getValueAtP99() { - return quantileMeter.getValueAtQuantile(0.99); - } -} From 46d0f61e559910f0185e8c2b95824c3e9d1ffe21 Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Wed, 30 Oct 2024 18:02:06 +0100 Subject: [PATCH 8/8] DBZ-6998 Revert "DBZ-6998: Align min, max of Histogram/Statistics" This reverts commit 83ab22c0f94739910f5adb03bd07a19f45d923be. --- .../connector/spanner/metrics/latency/Statistics.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/main/java/io/debezium/connector/spanner/metrics/latency/Statistics.java b/src/main/java/io/debezium/connector/spanner/metrics/latency/Statistics.java index 849d5e0..1a078ce 100644 --- a/src/main/java/io/debezium/connector/spanner/metrics/latency/Statistics.java +++ b/src/main/java/io/debezium/connector/spanner/metrics/latency/Statistics.java @@ -44,8 +44,8 @@ public void shutdown() { quantileMeter.shutdown(); } - void set(Duration lastDuration) { - long value = lastDuration.toMillis(); + public synchronized void update(long value) { + if (minValue.get() > value) { minValue.set(value); } @@ -63,10 +63,6 @@ void set(Duration lastDuration) { quantileMeter.addValue((double) value); } - public synchronized void update(long value) { - set(Duration.ofMillis(value)); - } - public Long getMinValue() { return count.get() == 0 ? null : minValue.get(); }