Skip to content

Commit

Permalink
[FLINK-35424] Elasticsearch connector 8 supports SSL context
Browse files Browse the repository at this point in the history
  • Loading branch information
liuml07 authored May 26, 2024
1 parent 0b6d9f8 commit 50327f8
Show file tree
Hide file tree
Showing 6 changed files with 271 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,21 @@
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.util.function.SerializableSupplier;

import co.elastic.clients.elasticsearch.core.bulk.BulkOperationVariant;
import co.elastic.clients.transport.TransportUtils;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.conn.ssl.TrustAllStrategy;
import org.apache.http.ssl.SSLContexts;

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;

import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -58,9 +68,6 @@ public class Elasticsearch8AsyncSinkBuilder<InputT>
/** The headers to be sent with the requests made to Elasticsearch cluster. */
private List<Header> headers;

/** The Certificate Fingerprint will be used to verify the HTTPS connection. */
private String certificateFingerprint;

/** The username to authenticate the connection with the Elasticsearch cluster. */
private String username;

Expand All @@ -73,6 +80,10 @@ public class Elasticsearch8AsyncSinkBuilder<InputT>
*/
private ElementConverter<InputT, BulkOperationVariant> elementConverter;

private SerializableSupplier<SSLContext> sslContextSupplier;

private SerializableSupplier<HostnameVerifier> sslHostnameVerifier;

/**
* setHosts set the hosts where the Elasticsearch cluster is reachable.
*
Expand Down Expand Up @@ -100,16 +111,62 @@ public Elasticsearch8AsyncSinkBuilder<InputT> setHeaders(Header... headers) {
}

/**
* setCertificateFingerprint set the certificate fingerprint to be used to verify the HTTPS
* connection.
* Allows to bypass the certificates chain validation and connect to insecure network endpoints
* (for example, servers which use self-signed certificates).
*
* @return this builder
*/
public Elasticsearch8AsyncSinkBuilder<InputT> allowInsecure() {
this.sslContextSupplier =
() -> {
try {
return SSLContexts.custom()
.loadTrustMaterial(TrustAllStrategy.INSTANCE)
.build();
} catch (final NoSuchAlgorithmException
| KeyStoreException
| KeyManagementException ex) {
throw new IllegalStateException("Unable to create custom SSL context", ex);
}
};
return this;
}

/**
* Set the certificate fingerprint to be used to verify the HTTPS connection.
*
* @param certificateFingerprint the certificate fingerprint
* @return {@code Elasticsearch8AsyncSinkBuilder}
*/
public Elasticsearch8AsyncSinkBuilder<InputT> setCertificateFingerprint(
String certificateFingerprint) {
checkNotNull(certificateFingerprint, "certificateFingerprint must not be null");
this.certificateFingerprint = certificateFingerprint;
this.sslContextSupplier =
() -> TransportUtils.sslContextFromCaFingerprint(certificateFingerprint);
return this;
}

/**
* Sets the supplier for getting an {@link SSLContext} instance.
*
* @param sslContextSupplier the serializable SSLContext supplier function
* @return this builder
*/
public Elasticsearch8AsyncSinkBuilder<InputT> setSslContextSupplier(
SerializableSupplier<SSLContext> sslContextSupplier) {
this.sslContextSupplier = checkNotNull(sslContextSupplier);
return this;
}

/**
* Sets the supplier for getting an SSL {@link HostnameVerifier} instance.
*
* @param sslHostnameVerifierSupplier the serializable hostname verifier supplier function
* @return this builder
*/
public Elasticsearch8AsyncSinkBuilder<InputT> setSslHostnameVerifier(
SerializableSupplier<HostnameVerifier> sslHostnameVerifierSupplier) {
this.sslHostnameVerifier = sslHostnameVerifierSupplier;
return this;
}

Expand Down Expand Up @@ -181,7 +238,8 @@ private OperationConverter<InputT> buildOperationConverter(

private NetworkConfig buildNetworkConfig() {
checkArgument(!hosts.isEmpty(), "Hosts cannot be empty.");
return new NetworkConfig(hosts, username, password, headers, certificateFingerprint);
return new NetworkConfig(
hosts, username, password, headers, sslContextSupplier, sslHostnameVerifier);
}

/** A wrapper that evolves the Operation, since a BulkOperationVariant is not Serializable. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@

package org.apache.flink.connector.elasticsearch.sink;

import org.apache.flink.util.function.SerializableSupplier;

import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.TransportUtils;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.Header;
import org.apache.http.HttpHost;
Expand All @@ -34,6 +35,10 @@
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;

import javax.annotation.Nullable;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;

import java.io.Serializable;
import java.util.List;

Expand All @@ -49,20 +54,24 @@ public class NetworkConfig implements Serializable {

private final String password;

private final String certificateFingerprint;
@Nullable private final SerializableSupplier<SSLContext> sslContextSupplier;

@Nullable private final SerializableSupplier<HostnameVerifier> sslHostnameVerifier;

public NetworkConfig(
List<HttpHost> hosts,
String username,
String password,
List<Header> headers,
String certificateFingerprint) {
checkState(hosts.size() > 0, "Hosts must not be null");
SerializableSupplier<SSLContext> sslContextSupplier,
SerializableSupplier<HostnameVerifier> sslHostnameVerifier) {
checkState(!hosts.isEmpty(), "Hosts must not be empty");
this.hosts = hosts;
this.username = username;
this.password = password;
this.headers = headers;
this.certificateFingerprint = certificateFingerprint;
this.sslContextSupplier = sslContextSupplier;
this.sslHostnameVerifier = sslHostnameVerifier;
}

public ElasticsearchAsyncClient createEsClient() {
Expand All @@ -80,10 +89,13 @@ private RestClient getRestClient() {
getCredentials());
}

if (certificateFingerprint != null) {
httpClientBuilder.setSSLContext(
TransportUtils.sslContextFromCaFingerprint(
certificateFingerprint));
if (sslContextSupplier != null) {
httpClientBuilder.setSSLContext(sslContextSupplier.get());
}

if (sslHostnameVerifier != null) {
httpClientBuilder.setSSLHostnameVerifier(
sslHostnameVerifier.get());
}

return httpClientBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void testWriteToElasticsearch() throws Exception {
env.execute();
}

assertIdsAreWritten(index, new String[] {"first_v1_index", "second_v1_index"});
assertIdsAreWritten(client, index, new String[] {"first_v1_index", "second_v1_index"});
}

@Test
Expand Down Expand Up @@ -149,24 +149,4 @@ public void notifyCheckpointComplete(long l) throws Exception {
}
}
}

/** DummyData is a POJO to helping during integration tests. */
public static class DummyData {
private final String id;

private final String name;

public DummyData(String id, String name) {
this.id = id;
this.name = name;
}

public String getId() {
return id;
}

public String getName() {
return name;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.apache.flink.connector.elasticsearch.sink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import co.elastic.clients.elasticsearch.core.bulk.IndexOperation;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import java.io.IOException;

import static org.apache.flink.connector.elasticsearch.sink.ElasticsearchSinkBaseITCase.DummyData;
import static org.apache.flink.connector.elasticsearch.sink.ElasticsearchSinkBaseITCase.ELASTICSEARCH_IMAGE;
import static org.apache.flink.connector.elasticsearch.sink.ElasticsearchSinkBaseITCase.assertIdsAreWritten;

/** Integration tests for {@link Elasticsearch8AsyncSink} against a secure Elasticsearch cluster. */
@Testcontainers
class Elasticsearch8AsyncSinkSecureITCase {
private static final Logger LOG =
LoggerFactory.getLogger(Elasticsearch8AsyncSinkSecureITCase.class);
private static final String ES_CLUSTER_USERNAME = "elastic";
private static final String ES_CLUSTER_PASSWORD = "s3cret";

@Container
private static final ElasticsearchContainer ES_CONTAINER = createSecureElasticsearchContainer();

private RestClient client;

@BeforeEach
void setUp() {
this.client = getRestClient();
}

@AfterEach
void shutdown() throws IOException {
if (client != null) {
client.close();
}
}

@Test
void testWriteToSecureElasticsearch8() throws Exception {
final String index = "test-write-to-secure-elasticsearch8";

try (StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1)) {

env.setRestartStrategy(RestartStrategies.noRestart());

final Elasticsearch8AsyncSink<DummyData> sink =
Elasticsearch8AsyncSinkBuilder.<DummyData>builder()
.setMaxBatchSize(5)
.setHosts(
new HttpHost(
ES_CONTAINER.getHost(),
ES_CONTAINER.getFirstMappedPort(),
"https"))
.setElementConverter(
(element, ctx) ->
new IndexOperation.Builder<>()
.index(index)
.id(element.getId())
.document(element)
.build())
.setUsername(ES_CLUSTER_USERNAME)
.setPassword(ES_CLUSTER_PASSWORD)
.setSslContextSupplier(() -> ES_CONTAINER.createSslContextFromCa())
.build();

env.fromElements("first", "second", "third", "fourth", "fifth")
.map(
(MapFunction<String, DummyData>)
value -> new DummyData(value + "_v1_index", value))
.sinkTo(sink);

env.execute();
}

assertIdsAreWritten(client, index, new String[] {"first_v1_index", "second_v1_index"});
}

static ElasticsearchContainer createSecureElasticsearchContainer() {
ElasticsearchContainer container =
new ElasticsearchContainer(ELASTICSEARCH_IMAGE)
.withPassword(ES_CLUSTER_PASSWORD) /* set password */
.withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g")
.withLogConsumer(new Slf4jLogConsumer(LOG));

// Set log message based wait strategy as the default wait strategy is not aware of TLS
container
.withEnv("logger.org.elasticsearch", "INFO")
.setWaitStrategy(
new LogMessageWaitStrategy().withRegEx(".*\"message\":\"started.*"));

return container;
}

private RestClient getRestClient() {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(
AuthScope.ANY,
new UsernamePasswordCredentials(ES_CLUSTER_USERNAME, ES_CLUSTER_PASSWORD));
return RestClient.builder(
new HttpHost(
ES_CONTAINER.getHost(), ES_CONTAINER.getFirstMappedPort(), "https"))
.setHttpClientConfigCallback(
httpClientBuilder ->
httpClientBuilder
.setDefaultCredentialsProvider(credentialsProvider)
.setSSLContext(ES_CONTAINER.createSslContextFromCa()))
.build();
}
}
Loading

0 comments on commit 50327f8

Please sign in to comment.