Skip to content

Commit

Permalink
DBZ-6998: Fix broken tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dongwook-chan authored and nancyxu123 committed Oct 21, 2024
1 parent 464b825 commit a690077
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
*/
package io.debezium.connector.spanner.metrics.jmx;

import java.time.Duration;

import javax.management.MXBean;

import io.debezium.pipeline.metrics.StreamingChangeEventSourceMetricsMXBean;
Expand Down Expand Up @@ -67,7 +69,7 @@ public interface SpannerMetricsMXBean extends StreamingChangeEventSourceMetricsM
* The delay which Spanner connector waits for
* the next Change Stream Event
*/
Long getDelayChangeStreamEventsLastMilliSeconds();
Duration getDelayChangeStreamEventsLastMilliSeconds();

Double getDelayChangeStreamEventsP50MilliSeconds();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
*/
package io.debezium.connector.spanner.metrics.jmx;

import java.time.Duration;

import org.apache.kafka.connect.data.Struct;

import com.google.cloud.Timestamp;
Expand Down Expand Up @@ -96,7 +98,7 @@ public int getStuckHeartbeatIntervals() {
}

@Override
public Long getDelayChangeStreamEventsLastMilliSeconds() {
public Duration getDelayChangeStreamEventsLastMilliSeconds() {
return spannerMeter.getDelayChangeStreamEvents().getLastValue();
}

Expand Down Expand Up @@ -134,17 +136,17 @@ public String getTaskUid() {

@Override
public Long getLatencyLowWatermarkLagMinMilliSeconds() {
return spannerMeter.getLowWatermarkLagLatency().getMinValue();
return spannerMeter.getLowWatermarkLagLatency().getMinValue().toMillis();
}

@Override
public Long getLatencyLowWatermarkLagMaxMilliSeconds() {
return spannerMeter.getLowWatermarkLagLatency().getMaxValue();
return spannerMeter.getLowWatermarkLagLatency().getMaxValue().toMillis();
}

@Override
public Double getLatencyLowWatermarkLagAvgMilliSeconds() {
return spannerMeter.getLowWatermarkLagLatency().getAverageValue();
return Double.valueOf(spannerMeter.getLowWatermarkLagLatency().getAverageValue().toMillis());
}

@Override
Expand All @@ -165,17 +167,17 @@ public Double getLatencyLowWatermarkLagP99MilliSeconds() {
// Total latency
@Override
public Long getLatencyTotalMinMilliSeconds() {
return spannerMeter.getTotalLatency().getMinValue();
return spannerMeter.getTotalLatency().getMinValue().toMillis();
}

@Override
public Long getLatencyTotalMaxMilliSeconds() {
return spannerMeter.getTotalLatency().getMaxValue();
return spannerMeter.getTotalLatency().getMaxValue().toMillis();
}

@Override
public Double getLatencyTotalAvgMilliSeconds() {
return spannerMeter.getTotalLatency().getAverageValue();
return Double.valueOf(spannerMeter.getTotalLatency().getAverageValue().toMillis());
}

@Override
Expand All @@ -196,17 +198,17 @@ public Double getLatencyTotalP99MilliSeconds() {
// Spanner latency
@Override
public Long getLatencySpannerMinMilliSeconds() {
return spannerMeter.getSpannerLatency().getMinValue();
return spannerMeter.getSpannerLatency().getMinValue().toMillis();
}

@Override
public Long getLatencySpannerMaxMilliSeconds() {
return spannerMeter.getSpannerLatency().getMaxValue();
return spannerMeter.getSpannerLatency().getMaxValue().toMillis();
}

@Override
public Double getLatencySpannerAvgMilliSeconds() {
return spannerMeter.getSpannerLatency().getAverageValue();
return Double.valueOf(spannerMeter.getSpannerLatency().getAverageValue().toMillis());
}

@Override
Expand All @@ -227,17 +229,17 @@ public Double getLatencySpannerP99MilliSeconds() {
// ReadToEmit latency
@Override
public Long getLatencyReadToEmitMinMilliSeconds() {
return spannerMeter.getConnectorLatency().getMinValue();
return spannerMeter.getConnectorLatency().getMinValue().toMillis();
}

@Override
public Long getLatencyReadToEmitMaxMilliSeconds() {
return spannerMeter.getConnectorLatency().getMaxValue();
return spannerMeter.getConnectorLatency().getMaxValue().toMillis();
}

@Override
public Double getLatencyReadToEmitAvgMilliSeconds() {
return spannerMeter.getConnectorLatency().getAverageValue();
return Double.valueOf(spannerMeter.getConnectorLatency().getAverageValue().toMillis());
}

@Override
Expand All @@ -258,17 +260,17 @@ public Double getLatencyReadToEmitP99MilliSeconds() {
// CommitToEmit latency
@Override
public Long getLatencyCommitToEmitMinMilliSeconds() {
return spannerMeter.getCommitToEmitLatency().getMinValue();
return spannerMeter.getCommitToEmitLatency().getMinValue().toMillis();
}

@Override
public Long getLatencyCommitToEmitMaxMilliSeconds() {
return spannerMeter.getCommitToEmitLatency().getMaxValue();
return spannerMeter.getCommitToEmitLatency().getMaxValue().toMillis();
}

@Override
public Double getLatencyCommitToEmitAvgMilliSeconds() {
return spannerMeter.getCommitToEmitLatency().getAverageValue();
return Double.valueOf(spannerMeter.getCommitToEmitLatency().getAverageValue().toMillis());
}

@Override
Expand All @@ -289,17 +291,17 @@ public Double getLatencyCommitToEmitP99MilliSeconds() {
// CommitToPublish Latency
@Override
public Long getLatencyCommitToPublishMinMilliSeconds() {
return spannerMeter.getCommitToPublishLatency().getMinValue();
return spannerMeter.getCommitToPublishLatency().getMinValue().toMillis();
}

@Override
public Long getLatencyCommitToPublishMaxMilliSeconds() {
return spannerMeter.getCommitToPublishLatency().getMaxValue();
return spannerMeter.getCommitToPublishLatency().getMaxValue().toMillis();
}

@Override
public Double getLatencyCommitToPublishAvgMilliSeconds() {
return spannerMeter.getCommitToPublishLatency().getAverageValue();
return Double.valueOf(spannerMeter.getCommitToPublishLatency().getAverageValue().toMillis());
}

@Override
Expand All @@ -320,17 +322,17 @@ public Double getLatencyCommitToPublishP99MilliSeconds() {
// EmitToPublish Latency
@Override
public Long getLatencyEmitToPublishMinMilliSeconds() {
return spannerMeter.getEmitToPublishLatency().getMinValue();
return spannerMeter.getEmitToPublishLatency().getMinValue().toMillis();
}

@Override
public Long getLatencyEmitToPublishMaxMilliSeconds() {
return spannerMeter.getEmitToPublishLatency().getMaxValue();
return spannerMeter.getEmitToPublishLatency().getMaxValue().toMillis();
}

@Override
public Double getLatencyEmitToPublishAvgMilliSeconds() {
return spannerMeter.getEmitToPublishLatency().getAverageValue();
return Double.valueOf(spannerMeter.getEmitToPublishLatency().getAverageValue().toMillis());
}

@Override
Expand All @@ -351,22 +353,22 @@ public Double getLatencyEmitToPublishP99MilliSeconds() {
// debug OwnConnector Latency
@Override
public Long getDebugLatencyOwnConnectorMinMilliSeconds() {
return spannerMeter.getOwnConnectorLatency().getMinValue();
return spannerMeter.getOwnConnectorLatency().getMinValue().toMillis();
}

@Override
public Long getDebugLatencyOwnConnectorMaxMilliSeconds() {
return spannerMeter.getOwnConnectorLatency().getMaxValue();
return spannerMeter.getOwnConnectorLatency().getMaxValue().toMillis();
}

@Override
public Double getDebugLatencyOwnConnectorAvgMilliSeconds() {
return spannerMeter.getOwnConnectorLatency().getAverageValue();
return Double.valueOf(spannerMeter.getOwnConnectorLatency().getAverageValue().toMillis());
}

@Override
public Long getDebugLatencyOwnConnectorLastMilliSeconds() {
return spannerMeter.getOwnConnectorLatency().getLastValue();
return spannerMeter.getOwnConnectorLatency().getLastValue().toMillis();
}

@Override
Expand All @@ -388,17 +390,17 @@ public Double getDebugLatencyOwnConnectorP99MilliSeconds() {

@Override
public Long getPartitionOffsetLagMinMilliSeconds() {
return spannerMeter.getPartitionOffsetLagStatistics().getMinValue();
return spannerMeter.getPartitionOffsetLagStatistics().getMinValue().toMillis();
}

@Override
public Long getPartitionOffsetLagMaxMilliSeconds() {
return spannerMeter.getPartitionOffsetLagStatistics().getMaxValue();
return spannerMeter.getPartitionOffsetLagStatistics().getMaxValue().toMillis();
}

@Override
public Double getPartitionOffsetLagAvgMilliSeconds() {
return spannerMeter.getPartitionOffsetLagStatistics().getAverageValue();
return Double.valueOf(spannerMeter.getPartitionOffsetLagStatistics().getAverageValue().toMillis());
}

@Override
Expand All @@ -418,23 +420,23 @@ public Double getPartitionOffsetLagP99MilliSeconds() {

@Override
public Long getPartitionOffsetLagLastMilliSeconds() {
return spannerMeter.getPartitionOffsetLagStatistics().getLastValue();
return spannerMeter.getPartitionOffsetLagStatistics().getLastValue().toMillis();
}

// offset receiving time statistics
@Override
public Long getOffsetReceivingTimeMinMilliSeconds() {
return spannerMeter.getOffsetReceivingTimeStatistics().getMinValue();
return spannerMeter.getOffsetReceivingTimeStatistics().getMinValue().toMillis();
}

@Override
public Long getOffsetReceivingTimeMaxMilliSeconds() {
return spannerMeter.getOffsetReceivingTimeStatistics().getMaxValue();
return spannerMeter.getOffsetReceivingTimeStatistics().getMaxValue().toMillis();
}

@Override
public Double getOffsetReceivingTimeAvgMilliSeconds() {
return spannerMeter.getOffsetReceivingTimeStatistics().getAverageValue();
return Double.valueOf(spannerMeter.getOffsetReceivingTimeStatistics().getAverageValue().toMillis());
}

@Override
Expand All @@ -454,7 +456,7 @@ public Double getOffsetReceivingTimeP99MilliSeconds() {

@Override
public Long getOffsetReceivingTimeLastMilliSeconds() {
return spannerMeter.getOffsetReceivingTimeStatistics().getLastValue();
return spannerMeter.getOffsetReceivingTimeStatistics().getLastValue().toMillis();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@
package io.debezium.connector.spanner.metrics.latency;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;


public abstract class Metric {

private final AtomicReference<Duration> minimum = new AtomicReference<>(Duration.ZERO);
Expand Down Expand Up @@ -66,8 +65,7 @@ void set(Duration lastDuration) {
average.get()
.multipliedBy(count.get() - 1)
.plus(lastDuration)
.dividedBy(count.get())
);
.dividedBy(count.get()));

last.set(lastDuration);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import java.time.Duration;
import java.util.function.Consumer;


/**
* This class provides functionality to calculate statistics:
* min, max, avg values, percentiles.
Expand Down

0 comments on commit a690077

Please sign in to comment.