Skip to content

Commit

Permalink
[FLINK-34113][elasticsearch] Make flink-connector-elasticsearch to be…
Browse files Browse the repository at this point in the history
… compatible with updated SinkV2 interfaces

This closes #88.
  • Loading branch information
Jiabao-Sun authored and leonardBang committed Jan 31, 2024
1 parent 153b8fc commit 7bda67e
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 97 deletions.
8 changes: 8 additions & 0 deletions flink-connector-elasticsearch-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,14 @@ under the License.
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<!-- Elasticsearch table descriptor testing -->
<dependency>
<groupId>org.apache.flink</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,12 @@

package org.apache.flink.connector.elasticsearch.sink;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.Sink.InitContext;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
import org.apache.flink.connector.elasticsearch.sink.BulkResponseInspector.BulkResponseInspectorFactory;
import org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.DefaultBulkResponseInspector;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.SimpleUserCodeClassLoader;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.util.UserCodeClassLoader;
import org.apache.flink.util.function.ThrowingRunnable;

import org.apache.http.HttpHost;
import org.junit.jupiter.api.DynamicTest;
Expand All @@ -45,7 +31,6 @@
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.ExtendWith;

import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

Expand Down Expand Up @@ -125,7 +110,7 @@ void testOverrideFailureHandler() {
final ElasticsearchSink<Object> sink =
createMinimalBuilder().setFailureHandler(failureHandler).build();

final InitContext sinkInitContext = new MockInitContext();
final TestSinkInitContext sinkInitContext = new TestSinkInitContext();
final BulkResponseInspector bulkResponseInspector =
sink.getBulkResponseInspectorFactory().apply(sinkInitContext::metricGroup);
assertThat(bulkResponseInspector)
Expand All @@ -150,7 +135,7 @@ void testOverrideBulkResponseInspectorFactory() {
.setBulkResponseInspectorFactory(bulkResponseInspectorFactory)
.build();

final InitContext sinkInitContext = new MockInitContext();
final TestSinkInitContext sinkInitContext = new TestSinkInitContext();

assertThatCode(() -> sink.createWriter(sinkInitContext)).doesNotThrowAnyException();
assertThat(called).isTrue();
Expand All @@ -159,83 +144,4 @@ void testOverrideBulkResponseInspectorFactory() {
abstract B createEmptyBuilder();

abstract B createMinimalBuilder();

private static class DummyMailboxExecutor implements MailboxExecutor {
private DummyMailboxExecutor() {}

public void execute(
ThrowingRunnable<? extends Exception> command,
String descriptionFormat,
Object... descriptionArgs) {}

public void yield() throws InterruptedException, FlinkRuntimeException {}

public boolean tryYield() throws FlinkRuntimeException {
return false;
}
}

private static class MockInitContext
implements Sink.InitContext, SerializationSchema.InitializationContext {

public UserCodeClassLoader getUserCodeClassLoader() {
return SimpleUserCodeClassLoader.create(
ElasticsearchSinkBuilderBaseTest.class.getClassLoader());
}

public MailboxExecutor getMailboxExecutor() {
return new ElasticsearchSinkBuilderBaseTest.DummyMailboxExecutor();
}

public ProcessingTimeService getProcessingTimeService() {
return new TestProcessingTimeService();
}

public int getSubtaskId() {
return 0;
}

public int getNumberOfParallelSubtasks() {
return 0;
}

public int getAttemptNumber() {
return 0;
}

public SinkWriterMetricGroup metricGroup() {
return InternalSinkWriterMetricGroup.wrap(
new TestingSinkWriterMetricGroup.Builder()
.setIoMetricGroupSupplier(
UnregisteredMetricsGroup::createOperatorIOMetricGroup)
.setParentMetricGroup(
UnregisteredMetricsGroup.createOperatorMetricGroup())
.build());
}

public MetricGroup getMetricGroup() {
return this.metricGroup();
}

public OptionalLong getRestoredCheckpointId() {
return OptionalLong.empty();
}

public SerializationSchema.InitializationContext
asSerializationSchemaInitializationContext() {
return this;
}

public boolean isObjectReuseEnabled() {
return false;
}

public <IN> TypeSerializer<IN> createInputSerializer() {
throw new UnsupportedOperationException();
}

public JobID getJobId() {
throw new UnsupportedOperationException();
}
}
}
8 changes: 8 additions & 0 deletions flink-connector-elasticsearch6/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,14 @@ under the License.
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch-base</artifactId>
Expand Down
8 changes: 8 additions & 0 deletions flink-connector-elasticsearch7/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,14 @@ under the License.
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch-base</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ This project bundles the following dependencies under the Apache Software Licens

- com.carrotsearch:hppc:0.8.1
- com.fasterxml.jackson.core:jackson-core:2.13.4
- com.fasterxml.jackson.core:jackson-databind:2.13.4.2
- com.fasterxml.jackson.core:jackson-annotations:2.13.4
- com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.13.4
- com.fasterxml.jackson.dataformat:jackson-dataformat-smile:2.13.4
- com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.13.4
Expand Down

0 comments on commit 7bda67e

Please sign in to comment.