Skip to content

Commit

Permalink
[FLINK-33493][connectors/elasticsearch] Fix ElasticsearchWriterITCase…
Browse files Browse the repository at this point in the history
… test
  • Loading branch information
TanYuxin-tyx authored and reswqa committed Nov 10, 2023
1 parent 6636b62 commit 161b615
Show file tree
Hide file tree
Showing 2 changed files with 205 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
import org.apache.flink.connector.elasticsearch.test.DockerImageVersions;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.metrics.testutils.MetricListener;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.junit5.MiniClusterExtension;
Expand Down Expand Up @@ -69,6 +69,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;

import static org.apache.flink.connector.elasticsearch.sink.TestClientBase.DOCUMENT_TYPE;
import static org.apache.flink.connector.elasticsearch.sink.TestClientBase.buildMessage;
Expand Down Expand Up @@ -193,15 +194,16 @@ void testIncrementByteOutMetric() throws Exception {
final String index = "test-inc-byte-out";
final OperatorIOMetricGroup operatorIOMetricGroup =
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
final InternalSinkWriterMetricGroup metricGroup =
InternalSinkWriterMetricGroup.mock(
metricListener.getMetricGroup(), operatorIOMetricGroup);
final int flushAfterNActions = 2;
final BulkProcessorConfig bulkProcessorConfig =
new BulkProcessorConfig(flushAfterNActions, -1, -1, FlushBackoffType.NONE, 0, 0);

try (final ElasticsearchWriter<Tuple2<Integer, String>> writer =
createWriter(index, false, bulkProcessorConfig, metricGroup)) {
createWriter(
index,
false,
bulkProcessorConfig,
getSinkWriterMetricGroup(operatorIOMetricGroup))) {
final Counter numBytesOut = operatorIOMetricGroup.getNumBytesOutCounter();
assertThat(numBytesOut.getCount()).isZero();
writer.write(Tuple2.of(1, buildMessage(1)), null);
Expand Down Expand Up @@ -267,10 +269,7 @@ void testCurrentSendTime() throws Exception {
private ElasticsearchWriter<Tuple2<Integer, String>> createWriter(
String index, boolean flushOnCheckpoint, BulkProcessorConfig bulkProcessorConfig) {
return createWriter(
index,
flushOnCheckpoint,
bulkProcessorConfig,
InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()));
index, flushOnCheckpoint, bulkProcessorConfig, getSinkWriterMetricGroup());
}

private ElasticsearchWriter<Tuple2<Integer, String>> createWriter(
Expand All @@ -289,6 +288,40 @@ private ElasticsearchWriter<Tuple2<Integer, String>> createWriter(
new TestMailbox());
}

private TestingSinkWriterMetricGroup getSinkWriterMetricGroup() {
final OperatorIOMetricGroup operatorIOMetricGroup =
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
return getSinkWriterMetricGroup(operatorIOMetricGroup);
}

private TestingSinkWriterMetricGroup getSinkWriterMetricGroup(
OperatorIOMetricGroup operatorIOMetricGroup) {
MetricGroup parentMetricGroup = metricListener.getMetricGroup();
Counter numRecordsOutErrors = parentMetricGroup.counter(MetricNames.NUM_RECORDS_OUT_ERRORS);
Counter numRecordsSendErrors =
parentMetricGroup.counter(MetricNames.NUM_RECORDS_SEND_ERRORS, numRecordsOutErrors);
Counter numRecordsWritten =
parentMetricGroup.counter(
MetricNames.NUM_RECORDS_SEND,
operatorIOMetricGroup.getNumRecordsOutCounter());
Counter numBytesWritten =
parentMetricGroup.counter(
MetricNames.NUM_BYTES_SEND, operatorIOMetricGroup.getNumBytesOutCounter());
Consumer<Gauge<Long>> currentSendTimeGaugeConsumer =
currentSendTimeGauge ->
parentMetricGroup.gauge(
MetricNames.CURRENT_SEND_TIME, currentSendTimeGauge);
return new TestingSinkWriterMetricGroup.Builder()
.setParentMetricGroup(parentMetricGroup)
.setIoMetricGroupSupplier(() -> operatorIOMetricGroup)
.setNumRecordsOutErrorsCounterSupplier(() -> numRecordsOutErrors)
.setNumRecordsSendErrorsCounterSupplier(() -> numRecordsSendErrors)
.setNumRecordsSendCounterSupplier(() -> numRecordsWritten)
.setNumBytesSendCounterSupplier(() -> numBytesWritten)
.setCurrentSendTimeGaugeConsumer(currentSendTimeGaugeConsumer)
.build();
}

private static class TestBulkProcessorBuilderFactory implements BulkProcessorBuilderFactory {
@Override
public BulkProcessor.Builder apply(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.elasticsearch.sink;

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup;

import java.util.function.Consumer;
import java.util.function.Supplier;

/** Testing implementation for {@link SinkWriterMetricGroup}. */
public class TestingSinkWriterMetricGroup extends ProxyMetricGroup<MetricGroup>
implements SinkWriterMetricGroup {

private final Supplier<Counter> numRecordsOutErrorsCounterSupplier;

private final Supplier<Counter> numRecordsSendErrorsCounterSupplier;

private final Supplier<Counter> numRecordsSendCounterSupplier;

private final Supplier<Counter> numBytesSendCounterSupplier;

private final Consumer<Gauge<Long>> currentSendTimeGaugeConsumer;

private final Supplier<OperatorIOMetricGroup> ioMetricGroupSupplier;

public TestingSinkWriterMetricGroup(
MetricGroup parentMetricGroup,
Supplier<Counter> numRecordsOutErrorsCounterSupplier,
Supplier<Counter> numRecordsSendErrorsCounterSupplier,
Supplier<Counter> numRecordsSendCounterSupplier,
Supplier<Counter> numBytesSendCounterSupplier,
Consumer<Gauge<Long>> currentSendTimeGaugeConsumer,
Supplier<OperatorIOMetricGroup> ioMetricGroupSupplier) {
super(parentMetricGroup);
this.numRecordsOutErrorsCounterSupplier = numRecordsOutErrorsCounterSupplier;
this.numRecordsSendErrorsCounterSupplier = numRecordsSendErrorsCounterSupplier;
this.numRecordsSendCounterSupplier = numRecordsSendCounterSupplier;
this.numBytesSendCounterSupplier = numBytesSendCounterSupplier;
this.currentSendTimeGaugeConsumer = currentSendTimeGaugeConsumer;
this.ioMetricGroupSupplier = ioMetricGroupSupplier;
}

@Override
public Counter getNumRecordsOutErrorsCounter() {
return numRecordsOutErrorsCounterSupplier.get();
}

@Override
public Counter getNumRecordsSendErrorsCounter() {
return numRecordsSendErrorsCounterSupplier.get();
}

@Override
public Counter getNumRecordsSendCounter() {
return numRecordsSendCounterSupplier.get();
}

@Override
public Counter getNumBytesSendCounter() {
return numBytesSendCounterSupplier.get();
}

@Override
public void setCurrentSendTimeGauge(Gauge<Long> gauge) {
currentSendTimeGaugeConsumer.accept(gauge);
}

@Override
public OperatorIOMetricGroup getIOMetricGroup() {
return ioMetricGroupSupplier.get();
}

/** Builder for {@link TestingSinkWriterMetricGroup}. */
public static class Builder {

private MetricGroup parentMetricGroup = null;

private Supplier<Counter> numRecordsOutErrorsCounterSupplier = () -> null;

private Supplier<Counter> numRecordsSendErrorsCounterSupplier = () -> null;

private Supplier<Counter> numRecordsSendCounterSupplier = () -> null;

private Supplier<Counter> numBytesSendCounterSupplier = () -> null;

private Consumer<Gauge<Long>> currentSendTimeGaugeConsumer = counter -> {};

private Supplier<OperatorIOMetricGroup> ioMetricGroupSupplier = () -> null;

public Builder setParentMetricGroup(MetricGroup parentMetricGroup) {
this.parentMetricGroup = parentMetricGroup;
return this;
}

public Builder setNumRecordsOutErrorsCounterSupplier(
Supplier<Counter> numRecordsOutErrorsCounterSupplier) {
this.numRecordsOutErrorsCounterSupplier = numRecordsOutErrorsCounterSupplier;
return this;
}

public Builder setNumRecordsSendErrorsCounterSupplier(
Supplier<Counter> numRecordsSendErrorsCounterSupplier) {
this.numRecordsSendErrorsCounterSupplier = numRecordsSendErrorsCounterSupplier;
return this;
}

public Builder setNumRecordsSendCounterSupplier(
Supplier<Counter> numRecordsSendCounterSupplier) {
this.numRecordsSendCounterSupplier = numRecordsSendCounterSupplier;
return this;
}

public Builder setNumBytesSendCounterSupplier(
Supplier<Counter> numBytesSendCounterSupplier) {
this.numBytesSendCounterSupplier = numBytesSendCounterSupplier;
return this;
}

public Builder setCurrentSendTimeGaugeConsumer(
Consumer<Gauge<Long>> currentSendTimeGaugeConsumer) {
this.currentSendTimeGaugeConsumer = currentSendTimeGaugeConsumer;
return this;
}

public Builder setIoMetricGroupSupplier(
Supplier<OperatorIOMetricGroup> ioMetricGroupSupplier) {
this.ioMetricGroupSupplier = ioMetricGroupSupplier;
return this;
}

public TestingSinkWriterMetricGroup build() {
return new TestingSinkWriterMetricGroup(
parentMetricGroup,
numRecordsOutErrorsCounterSupplier,
numRecordsSendErrorsCounterSupplier,
numRecordsSendCounterSupplier,
numBytesSendCounterSupplier,
currentSendTimeGaugeConsumer,
ioMetricGroupSupplier);
}
}
}

0 comments on commit 161b615

Please sign in to comment.