diff --git a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkBuilder.java b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkBuilder.java index b3bd969b..31e81200 100644 --- a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkBuilder.java +++ b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkBuilder.java @@ -24,11 +24,21 @@ import org.apache.flink.api.connector.sink2.SinkWriter; import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder; import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.util.function.SerializableSupplier; import co.elastic.clients.elasticsearch.core.bulk.BulkOperationVariant; +import co.elastic.clients.transport.TransportUtils; import org.apache.http.Header; import org.apache.http.HttpHost; +import org.apache.http.conn.ssl.TrustAllStrategy; +import org.apache.http.ssl.SSLContexts; +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.SSLContext; + +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.List; import java.util.Optional; @@ -58,9 +68,6 @@ public class Elasticsearch8AsyncSinkBuilder /** The headers to be sent with the requests made to Elasticsearch cluster. */ private List
headers; - /** The Certificate Fingerprint will be used to verify the HTTPS connection. */ - private String certificateFingerprint; - /** The username to authenticate the connection with the Elasticsearch cluster. */ private String username; @@ -73,6 +80,10 @@ public class Elasticsearch8AsyncSinkBuilder */ private ElementConverter elementConverter; + private SerializableSupplier sslContextSupplier; + + private SerializableSupplier sslHostnameVerifier; + /** * setHosts set the hosts where the Elasticsearch cluster is reachable. * @@ -100,8 +111,29 @@ public Elasticsearch8AsyncSinkBuilder setHeaders(Header... headers) { } /** - * setCertificateFingerprint set the certificate fingerprint to be used to verify the HTTPS - * connection. + * Allows to bypass the certificates chain validation and connect to insecure network endpoints + * (for example, servers which use self-signed certificates). + * + * @return this builder + */ + public Elasticsearch8AsyncSinkBuilder allowInsecure() { + this.sslContextSupplier = + () -> { + try { + return SSLContexts.custom() + .loadTrustMaterial(TrustAllStrategy.INSTANCE) + .build(); + } catch (final NoSuchAlgorithmException + | KeyStoreException + | KeyManagementException ex) { + throw new IllegalStateException("Unable to create custom SSL context", ex); + } + }; + return this; + } + + /** + * Set the certificate fingerprint to be used to verify the HTTPS connection. * * @param certificateFingerprint the certificate fingerprint * @return {@code Elasticsearch8AsyncSinkBuilder} @@ -109,7 +141,32 @@ public Elasticsearch8AsyncSinkBuilder setHeaders(Header... headers) { public Elasticsearch8AsyncSinkBuilder setCertificateFingerprint( String certificateFingerprint) { checkNotNull(certificateFingerprint, "certificateFingerprint must not be null"); - this.certificateFingerprint = certificateFingerprint; + this.sslContextSupplier = + () -> TransportUtils.sslContextFromCaFingerprint(certificateFingerprint); + return this; + } + + /** + * Sets the supplier for getting an {@link SSLContext} instance. + * + * @param sslContextSupplier the serializable SSLContext supplier function + * @return this builder + */ + public Elasticsearch8AsyncSinkBuilder setSslContextSupplier( + SerializableSupplier sslContextSupplier) { + this.sslContextSupplier = checkNotNull(sslContextSupplier); + return this; + } + + /** + * Sets the supplier for getting an SSL {@link HostnameVerifier} instance. + * + * @param sslHostnameVerifierSupplier the serializable hostname verifier supplier function + * @return this builder + */ + public Elasticsearch8AsyncSinkBuilder setSslHostnameVerifier( + SerializableSupplier sslHostnameVerifierSupplier) { + this.sslHostnameVerifier = sslHostnameVerifierSupplier; return this; } @@ -181,7 +238,8 @@ private OperationConverter buildOperationConverter( private NetworkConfig buildNetworkConfig() { checkArgument(!hosts.isEmpty(), "Hosts cannot be empty."); - return new NetworkConfig(hosts, username, password, headers, certificateFingerprint); + return new NetworkConfig( + hosts, username, password, headers, sslContextSupplier, sslHostnameVerifier); } /** A wrapper that evolves the Operation, since a BulkOperationVariant is not Serializable. */ diff --git a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkConfig.java b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkConfig.java index fc41fd53..9a1cd9c1 100644 --- a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkConfig.java +++ b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkConfig.java @@ -21,9 +21,10 @@ package org.apache.flink.connector.elasticsearch.sink; +import org.apache.flink.util.function.SerializableSupplier; + import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient; import co.elastic.clients.json.jackson.JacksonJsonpMapper; -import co.elastic.clients.transport.TransportUtils; import co.elastic.clients.transport.rest_client.RestClientTransport; import org.apache.http.Header; import org.apache.http.HttpHost; @@ -34,6 +35,10 @@ import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; +import javax.annotation.Nullable; +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.SSLContext; + import java.io.Serializable; import java.util.List; @@ -49,20 +54,24 @@ public class NetworkConfig implements Serializable { private final String password; - private final String certificateFingerprint; + @Nullable private final SerializableSupplier sslContextSupplier; + + @Nullable private final SerializableSupplier sslHostnameVerifier; public NetworkConfig( List hosts, String username, String password, List
headers, - String certificateFingerprint) { - checkState(hosts.size() > 0, "Hosts must not be null"); + SerializableSupplier sslContextSupplier, + SerializableSupplier sslHostnameVerifier) { + checkState(!hosts.isEmpty(), "Hosts must not be empty"); this.hosts = hosts; this.username = username; this.password = password; this.headers = headers; - this.certificateFingerprint = certificateFingerprint; + this.sslContextSupplier = sslContextSupplier; + this.sslHostnameVerifier = sslHostnameVerifier; } public ElasticsearchAsyncClient createEsClient() { @@ -80,10 +89,13 @@ private RestClient getRestClient() { getCredentials()); } - if (certificateFingerprint != null) { - httpClientBuilder.setSSLContext( - TransportUtils.sslContextFromCaFingerprint( - certificateFingerprint)); + if (sslContextSupplier != null) { + httpClientBuilder.setSSLContext(sslContextSupplier.get()); + } + + if (sslHostnameVerifier != null) { + httpClientBuilder.setSSLHostnameVerifier( + sslHostnameVerifier.get()); } return httpClientBuilder; diff --git a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkITCase.java b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkITCase.java index f9f84007..22defb14 100644 --- a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkITCase.java +++ b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkITCase.java @@ -90,7 +90,7 @@ public void testWriteToElasticsearch() throws Exception { env.execute(); } - assertIdsAreWritten(index, new String[] {"first_v1_index", "second_v1_index"}); + assertIdsAreWritten(client, index, new String[] {"first_v1_index", "second_v1_index"}); } @Test @@ -149,24 +149,4 @@ public void notifyCheckpointComplete(long l) throws Exception { } } } - - /** DummyData is a POJO to helping during integration tests. */ - public static class DummyData { - private final String id; - - private final String name; - - public DummyData(String id, String name) { - this.id = id; - this.name = name; - } - - public String getId() { - return id; - } - - public String getName() { - return name; - } - } } diff --git a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkSecureITCase.java b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkSecureITCase.java new file mode 100644 index 00000000..ba546166 --- /dev/null +++ b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkSecureITCase.java @@ -0,0 +1,149 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.flink.connector.elasticsearch.sink; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import co.elastic.clients.elasticsearch.core.bulk.IndexOperation; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.elasticsearch.client.RestClient; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; +import org.testcontainers.elasticsearch.ElasticsearchContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.io.IOException; + +import static org.apache.flink.connector.elasticsearch.sink.ElasticsearchSinkBaseITCase.DummyData; +import static org.apache.flink.connector.elasticsearch.sink.ElasticsearchSinkBaseITCase.ELASTICSEARCH_IMAGE; +import static org.apache.flink.connector.elasticsearch.sink.ElasticsearchSinkBaseITCase.assertIdsAreWritten; + +/** Integration tests for {@link Elasticsearch8AsyncSink} against a secure Elasticsearch cluster. */ +@Testcontainers +class Elasticsearch8AsyncSinkSecureITCase { + private static final Logger LOG = + LoggerFactory.getLogger(Elasticsearch8AsyncSinkSecureITCase.class); + private static final String ES_CLUSTER_USERNAME = "elastic"; + private static final String ES_CLUSTER_PASSWORD = "s3cret"; + + @Container + private static final ElasticsearchContainer ES_CONTAINER = createSecureElasticsearchContainer(); + + private RestClient client; + + @BeforeEach + void setUp() { + this.client = getRestClient(); + } + + @AfterEach + void shutdown() throws IOException { + if (client != null) { + client.close(); + } + } + + @Test + void testWriteToSecureElasticsearch8() throws Exception { + final String index = "test-write-to-secure-elasticsearch8"; + + try (StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1)) { + + env.setRestartStrategy(RestartStrategies.noRestart()); + + final Elasticsearch8AsyncSink sink = + Elasticsearch8AsyncSinkBuilder.builder() + .setMaxBatchSize(5) + .setHosts( + new HttpHost( + ES_CONTAINER.getHost(), + ES_CONTAINER.getFirstMappedPort(), + "https")) + .setElementConverter( + (element, ctx) -> + new IndexOperation.Builder<>() + .index(index) + .id(element.getId()) + .document(element) + .build()) + .setUsername(ES_CLUSTER_USERNAME) + .setPassword(ES_CLUSTER_PASSWORD) + .setSslContextSupplier(() -> ES_CONTAINER.createSslContextFromCa()) + .build(); + + env.fromElements("first", "second", "third", "fourth", "fifth") + .map( + (MapFunction) + value -> new DummyData(value + "_v1_index", value)) + .sinkTo(sink); + + env.execute(); + } + + assertIdsAreWritten(client, index, new String[] {"first_v1_index", "second_v1_index"}); + } + + static ElasticsearchContainer createSecureElasticsearchContainer() { + ElasticsearchContainer container = + new ElasticsearchContainer(ELASTICSEARCH_IMAGE) + .withPassword(ES_CLUSTER_PASSWORD) /* set password */ + .withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g") + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + // Set log message based wait strategy as the default wait strategy is not aware of TLS + container + .withEnv("logger.org.elasticsearch", "INFO") + .setWaitStrategy( + new LogMessageWaitStrategy().withRegEx(".*\"message\":\"started.*")); + + return container; + } + + private RestClient getRestClient() { + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials( + AuthScope.ANY, + new UsernamePasswordCredentials(ES_CLUSTER_USERNAME, ES_CLUSTER_PASSWORD)); + return RestClient.builder( + new HttpHost( + ES_CONTAINER.getHost(), ES_CONTAINER.getFirstMappedPort(), "https")) + .setHttpClientConfigCallback( + httpClientBuilder -> + httpClientBuilder + .setDefaultCredentialsProvider(credentialsProvider) + .setSSLContext(ES_CONTAINER.createSslContextFromCa())) + .build(); + } +} diff --git a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriterITCase.java b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriterITCase.java index f30872e9..ba41ad88 100644 --- a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriterITCase.java +++ b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriterITCase.java @@ -22,7 +22,6 @@ package org.apache.flink.connector.elasticsearch.sink; import org.apache.flink.connector.base.sink.writer.TestSinkInitContext; -import org.apache.flink.connector.elasticsearch.sink.Elasticsearch8AsyncSinkITCase.DummyData; import org.apache.flink.metrics.Gauge; import co.elastic.clients.elasticsearch.core.bulk.IndexOperation; @@ -79,12 +78,12 @@ public void testBulkOnFlush() throws IOException, InterruptedException { writer.write(new DummyData("test-2", "test-2"), null); writer.flush(false); - assertIdsAreWritten(index, new String[] {"test-1", "test-2"}); + assertIdsAreWritten(client, index, new String[] {"test-1", "test-2"}); writer.write(new DummyData("3", "test-3"), null); writer.flush(true); - assertIdsAreWritten(index, new String[] {"test-3"}); + assertIdsAreWritten(client, index, new String[] {"test-3"}); } } @@ -99,18 +98,18 @@ public void testBulkOnBufferTimeFlush() throws Exception { writer.write(new DummyData("test-1", "test-1"), null); writer.flush(true); - assertIdsAreWritten(index, new String[] {"test-1"}); + assertIdsAreWritten(client, index, new String[] {"test-1"}); writer.write(new DummyData("test-2", "test-2"), null); writer.write(new DummyData("test-3", "test-3"), null); - assertIdsAreNotWritten(index, new String[] {"test-2", "test-3"}); + assertIdsAreNotWritten(client, index, new String[] {"test-2", "test-3"}); context.getTestProcessingTimeService().advance(6000L); await(); } - assertIdsAreWritten(index, new String[] {"test-2", "test-3"}); + assertIdsAreWritten(client, index, new String[] {"test-2", "test-3"}); } @Test @@ -131,7 +130,7 @@ public void testBytesSentMetric() throws Exception { } assertThat(context.getNumBytesOutCounter().getCount()).isGreaterThan(0); - assertIdsAreWritten(index, new String[] {"test-1", "test-2", "test-3"}); + assertIdsAreWritten(client, index, new String[] {"test-1", "test-2", "test-3"}); } @Test @@ -152,7 +151,7 @@ public void testRecordsSentMetric() throws Exception { } assertThat(context.getNumRecordsOutCounter().getCount()).isEqualTo(3); - assertIdsAreWritten(index, new String[] {"test-1", "test-2", "test-3"}); + assertIdsAreWritten(client, index, new String[] {"test-1", "test-2", "test-3"}); } @Test @@ -175,7 +174,7 @@ public void testSendTimeMetric() throws Exception { assertThat(currentSendTime.get().getValue()).isGreaterThan(0L); } - assertIdsAreWritten(index, new String[] {"test-1", "test-2", "test-3"}); + assertIdsAreWritten(client, index, new String[] {"test-1", "test-2", "test-3"}); } @Test @@ -207,8 +206,8 @@ public void testHandlePartiallyFailedBulk() throws Exception { await(); assertThat(context.metricGroup().getNumRecordsOutErrorsCounter().getCount()).isEqualTo(1); - assertIdsAreWritten(index, new String[] {"test-2"}); - assertIdsAreNotWritten(index, new String[] {"test-1"}); + assertIdsAreWritten(client, index, new String[] {"test-2"}); + assertIdsAreNotWritten(client, index, new String[] {"test-1"}); } private Elasticsearch8AsyncSinkBuilder.OperationConverter @@ -245,7 +244,7 @@ private Elasticsearch8AsyncWriter createWriter( 5 * 1024 * 1024, 5000, 1024 * 1024, - new NetworkConfig(esHost, null, null, null, null)) { + new NetworkConfig(esHost, null, null, null, null, null)) { @Override public StatefulSinkWriter createWriter(InitContext context) { return new Elasticsearch8AsyncWriter( diff --git a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBaseITCase.java b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBaseITCase.java index 70e76015..7977cfe9 100644 --- a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBaseITCase.java +++ b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBaseITCase.java @@ -79,7 +79,8 @@ public RestClient getRestClient() { .build(); } - public void assertIdsAreWritten(String index, String[] ids) throws IOException { + public static void assertIdsAreWritten(RestClient client, String index, String[] ids) + throws IOException { client.performRequest(new Request("GET", "_refresh")); Response response = client.performRequest(new Request("GET", index + "/_search/")); String responseEntity = EntityUtils.toString(response.getEntity()); @@ -92,7 +93,8 @@ public void assertIdsAreWritten(String index, String[] ids) throws IOException { } } - public void assertIdsAreNotWritten(String index, String[] ids) throws IOException { + public static void assertIdsAreNotWritten(RestClient client, String index, String[] ids) + throws IOException { client.performRequest(new Request("GET", "_refresh")); Response response = client.performRequest(new Request("GET", index + "/_search/")); String responseEntity = EntityUtils.toString(response.getEntity()); @@ -103,4 +105,24 @@ public void assertIdsAreNotWritten(String index, String[] ids) throws IOExceptio assertThat(responseEntity).doesNotContain(id); } } + + /** DummyData is a POJO to helping during integration tests. */ + public static class DummyData { + private final String id; + + private final String name; + + public DummyData(String id, String name) { + this.id = id; + this.name = name; + } + + public String getId() { + return id; + } + + public String getName() { + return name; + } + } }