diff --git a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java index bd020fb7..e8002cce 100644 --- a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java +++ b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java @@ -24,11 +24,11 @@ import org.apache.flink.connector.elasticsearch.test.DockerImageVersions; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.OperatorIOMetricGroup; import org.apache.flink.metrics.groups.SinkWriterMetricGroup; import org.apache.flink.metrics.testutils.MetricListener; import org.apache.flink.runtime.metrics.MetricNames; -import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.test.junit5.MiniClusterExtension; @@ -69,6 +69,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.function.Consumer; import static org.apache.flink.connector.elasticsearch.sink.TestClientBase.DOCUMENT_TYPE; import static org.apache.flink.connector.elasticsearch.sink.TestClientBase.buildMessage; @@ -193,15 +194,16 @@ void testIncrementByteOutMetric() throws Exception { final String index = "test-inc-byte-out"; final OperatorIOMetricGroup operatorIOMetricGroup = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup(); - final InternalSinkWriterMetricGroup metricGroup = - InternalSinkWriterMetricGroup.mock( - metricListener.getMetricGroup(), operatorIOMetricGroup); final int flushAfterNActions = 2; final BulkProcessorConfig bulkProcessorConfig = new BulkProcessorConfig(flushAfterNActions, -1, -1, FlushBackoffType.NONE, 0, 0); try (final ElasticsearchWriter> writer = - createWriter(index, false, bulkProcessorConfig, metricGroup)) { + createWriter( + index, + false, + bulkProcessorConfig, + getSinkWriterMetricGroup(operatorIOMetricGroup))) { final Counter numBytesOut = operatorIOMetricGroup.getNumBytesOutCounter(); assertThat(numBytesOut.getCount()).isZero(); writer.write(Tuple2.of(1, buildMessage(1)), null); @@ -267,10 +269,7 @@ void testCurrentSendTime() throws Exception { private ElasticsearchWriter> createWriter( String index, boolean flushOnCheckpoint, BulkProcessorConfig bulkProcessorConfig) { return createWriter( - index, - flushOnCheckpoint, - bulkProcessorConfig, - InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup())); + index, flushOnCheckpoint, bulkProcessorConfig, getSinkWriterMetricGroup()); } private ElasticsearchWriter> createWriter( @@ -289,6 +288,40 @@ private ElasticsearchWriter> createWriter( new TestMailbox()); } + private TestingSinkWriterMetricGroup getSinkWriterMetricGroup() { + final OperatorIOMetricGroup operatorIOMetricGroup = + UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup(); + return getSinkWriterMetricGroup(operatorIOMetricGroup); + } + + private TestingSinkWriterMetricGroup getSinkWriterMetricGroup( + OperatorIOMetricGroup operatorIOMetricGroup) { + MetricGroup parentMetricGroup = metricListener.getMetricGroup(); + Counter numRecordsOutErrors = parentMetricGroup.counter(MetricNames.NUM_RECORDS_OUT_ERRORS); + Counter numRecordsSendErrors = + parentMetricGroup.counter(MetricNames.NUM_RECORDS_SEND_ERRORS, numRecordsOutErrors); + Counter numRecordsWritten = + parentMetricGroup.counter( + MetricNames.NUM_RECORDS_SEND, + operatorIOMetricGroup.getNumRecordsOutCounter()); + Counter numBytesWritten = + parentMetricGroup.counter( + MetricNames.NUM_BYTES_SEND, operatorIOMetricGroup.getNumBytesOutCounter()); + Consumer> currentSendTimeGaugeConsumer = + currentSendTimeGauge -> + parentMetricGroup.gauge( + MetricNames.CURRENT_SEND_TIME, currentSendTimeGauge); + return new TestingSinkWriterMetricGroup.Builder() + .setParentMetricGroup(parentMetricGroup) + .setIoMetricGroupSupplier(() -> operatorIOMetricGroup) + .setNumRecordsOutErrorsCounterSupplier(() -> numRecordsOutErrors) + .setNumRecordsSendErrorsCounterSupplier(() -> numRecordsSendErrors) + .setNumRecordsSendCounterSupplier(() -> numRecordsWritten) + .setNumBytesSendCounterSupplier(() -> numBytesWritten) + .setCurrentSendTimeGaugeConsumer(currentSendTimeGaugeConsumer) + .build(); + } + private static class TestBulkProcessorBuilderFactory implements BulkProcessorBuilderFactory { @Override public BulkProcessor.Builder apply( diff --git a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/TestingSinkWriterMetricGroup.java b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/TestingSinkWriterMetricGroup.java new file mode 100644 index 00000000..b122d66e --- /dev/null +++ b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/TestingSinkWriterMetricGroup.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.elasticsearch.sink; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.OperatorIOMetricGroup; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; +import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup; + +import java.util.function.Consumer; +import java.util.function.Supplier; + +/** Testing implementation for {@link SinkWriterMetricGroup}. */ +public class TestingSinkWriterMetricGroup extends ProxyMetricGroup + implements SinkWriterMetricGroup { + + private final Supplier numRecordsOutErrorsCounterSupplier; + + private final Supplier numRecordsSendErrorsCounterSupplier; + + private final Supplier numRecordsSendCounterSupplier; + + private final Supplier numBytesSendCounterSupplier; + + private final Consumer> currentSendTimeGaugeConsumer; + + private final Supplier ioMetricGroupSupplier; + + public TestingSinkWriterMetricGroup( + MetricGroup parentMetricGroup, + Supplier numRecordsOutErrorsCounterSupplier, + Supplier numRecordsSendErrorsCounterSupplier, + Supplier numRecordsSendCounterSupplier, + Supplier numBytesSendCounterSupplier, + Consumer> currentSendTimeGaugeConsumer, + Supplier ioMetricGroupSupplier) { + super(parentMetricGroup); + this.numRecordsOutErrorsCounterSupplier = numRecordsOutErrorsCounterSupplier; + this.numRecordsSendErrorsCounterSupplier = numRecordsSendErrorsCounterSupplier; + this.numRecordsSendCounterSupplier = numRecordsSendCounterSupplier; + this.numBytesSendCounterSupplier = numBytesSendCounterSupplier; + this.currentSendTimeGaugeConsumer = currentSendTimeGaugeConsumer; + this.ioMetricGroupSupplier = ioMetricGroupSupplier; + } + + @Override + public Counter getNumRecordsOutErrorsCounter() { + return numRecordsOutErrorsCounterSupplier.get(); + } + + @Override + public Counter getNumRecordsSendErrorsCounter() { + return numRecordsSendErrorsCounterSupplier.get(); + } + + @Override + public Counter getNumRecordsSendCounter() { + return numRecordsSendCounterSupplier.get(); + } + + @Override + public Counter getNumBytesSendCounter() { + return numBytesSendCounterSupplier.get(); + } + + @Override + public void setCurrentSendTimeGauge(Gauge gauge) { + currentSendTimeGaugeConsumer.accept(gauge); + } + + @Override + public OperatorIOMetricGroup getIOMetricGroup() { + return ioMetricGroupSupplier.get(); + } + + /** Builder for {@link TestingSinkWriterMetricGroup}. */ + public static class Builder { + + private MetricGroup parentMetricGroup = null; + + private Supplier numRecordsOutErrorsCounterSupplier = () -> null; + + private Supplier numRecordsSendErrorsCounterSupplier = () -> null; + + private Supplier numRecordsSendCounterSupplier = () -> null; + + private Supplier numBytesSendCounterSupplier = () -> null; + + private Consumer> currentSendTimeGaugeConsumer = counter -> {}; + + private Supplier ioMetricGroupSupplier = () -> null; + + public Builder setParentMetricGroup(MetricGroup parentMetricGroup) { + this.parentMetricGroup = parentMetricGroup; + return this; + } + + public Builder setNumRecordsOutErrorsCounterSupplier( + Supplier numRecordsOutErrorsCounterSupplier) { + this.numRecordsOutErrorsCounterSupplier = numRecordsOutErrorsCounterSupplier; + return this; + } + + public Builder setNumRecordsSendErrorsCounterSupplier( + Supplier numRecordsSendErrorsCounterSupplier) { + this.numRecordsSendErrorsCounterSupplier = numRecordsSendErrorsCounterSupplier; + return this; + } + + public Builder setNumRecordsSendCounterSupplier( + Supplier numRecordsSendCounterSupplier) { + this.numRecordsSendCounterSupplier = numRecordsSendCounterSupplier; + return this; + } + + public Builder setNumBytesSendCounterSupplier( + Supplier numBytesSendCounterSupplier) { + this.numBytesSendCounterSupplier = numBytesSendCounterSupplier; + return this; + } + + public Builder setCurrentSendTimeGaugeConsumer( + Consumer> currentSendTimeGaugeConsumer) { + this.currentSendTimeGaugeConsumer = currentSendTimeGaugeConsumer; + return this; + } + + public Builder setIoMetricGroupSupplier( + Supplier ioMetricGroupSupplier) { + this.ioMetricGroupSupplier = ioMetricGroupSupplier; + return this; + } + + public TestingSinkWriterMetricGroup build() { + return new TestingSinkWriterMetricGroup( + parentMetricGroup, + numRecordsOutErrorsCounterSupplier, + numRecordsSendErrorsCounterSupplier, + numRecordsSendCounterSupplier, + numBytesSendCounterSupplier, + currentSendTimeGaugeConsumer, + ioMetricGroupSupplier); + } + } +}