Skip to content

Commit

Permalink
[FLINK-29042][Connectors/ElasticSearch] Support lookup join for es connector (apache#39)
Browse files Browse the repository at this point in the history
  • Loading branch information
liyubin117 authored Sep 11, 2023
1 parent ce44a13 commit fef5c96
Show file tree
Hide file tree
Showing 24 changed files with 976 additions and 60 deletions.
8 changes: 8 additions & 0 deletions flink-connector-elasticsearch-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,14 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<optional>true</optional>
</dependency>

<!-- ArchUnit test dependencies -->

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
package org.apache.flink.streaming.connectors.elasticsearch;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.tuple.Tuple2;

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.search.SearchRequest;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -61,6 +63,21 @@ public interface ElasticsearchApiCallBridge<C extends AutoCloseable> extends Ser
*/
BulkProcessor.Builder createBulkProcessorBuilder(C client, BulkProcessor.Listener listener);

/**
 * Executes a search using the Search API.
 *
 * @param client the Elasticsearch client.
 * @param searchRequest A request to execute search against one or more indices (or all).
 * @return a {@code Tuple2} whose second field ({@code f1}) holds the raw source of each
 *     matching hit as a string; the first field is implementation-specific
 *     (NOTE(review): presumably a scroll id — confirm against the concrete bridge).
 * @throws IOException if the search request could not be executed.
 */
Tuple2<String, String[]> search(C client, SearchRequest searchRequest) throws IOException;

/**
 * Closes this client and releases any system resources associated with it.
 *
 * @param client the Elasticsearch client.
 * @throws IOException if an I/O error occurs while closing the client.
 */
void close(C client) throws IOException;

/**
* Extracts the cause of failure of a bulk item action.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.elasticsearch.table;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.LookupFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.flink.util.Preconditions.checkNotNull;

/** A lookup function implementing {@link LookupTableSource} in Elasticsearch connector. */
@Internal
public class ElasticsearchRowDataLookupFunction<C extends AutoCloseable> extends LookupFunction {

    private static final Logger LOG =
            LoggerFactory.getLogger(ElasticsearchRowDataLookupFunction.class);
    private static final long serialVersionUID = 1L;

    // Deserializes the raw source of a search hit into an internal row.
    private final DeserializationSchema<RowData> deserializationSchema;

    private final String index;
    private final String type;

    private final String[] producedNames;
    private final String[] lookupKeys;
    private final int maxRetryTimes;
    // converters to convert data from internal to external in order to generate keys for the cache.
    private final DataFormatConverters.DataFormatConverter[] converters;
    private SearchRequest searchRequest;
    private SearchSourceBuilder searchSourceBuilder;

    private final ElasticsearchApiCallBridge<C> callBridge;

    private transient C client;

    /**
     * Creates a lookup function backed by an Elasticsearch index.
     *
     * @param deserializationSchema schema deserializing a search hit into a {@link RowData}
     * @param maxRetryTimes number of retries after a failed search before giving up
     * @param index the Elasticsearch index to query
     * @param type the document type, or {@code null} when types are not used
     * @param producedNames names of the produced (fetched) fields
     * @param producedTypes data types of the produced fields, parallel to {@code producedNames}
     * @param lookupKeys names of the join key fields; each must appear in {@code producedNames}
     * @param callBridge version-specific bridge used to create the client and run searches
     */
    public ElasticsearchRowDataLookupFunction(
            DeserializationSchema<RowData> deserializationSchema,
            int maxRetryTimes,
            String index,
            String type,
            String[] producedNames,
            DataType[] producedTypes,
            String[] lookupKeys,
            ElasticsearchApiCallBridge<C> callBridge) {

        checkNotNull(deserializationSchema, "No DeserializationSchema supplied.");
        // maxRetryTimes is a primitive int and can never be null; validate its range instead.
        Preconditions.checkArgument(maxRetryTimes >= 0, "maxRetryTimes must not be negative.");
        checkNotNull(producedNames, "No fieldNames supplied.");
        checkNotNull(producedTypes, "No fieldTypes supplied.");
        checkNotNull(lookupKeys, "No keyNames supplied.");
        checkNotNull(callBridge, "No ElasticsearchApiCallBridge supplied.");

        this.deserializationSchema = deserializationSchema;
        this.maxRetryTimes = maxRetryTimes;
        this.index = index;
        this.type = type;
        this.producedNames = producedNames;
        this.lookupKeys = lookupKeys;
        this.converters = new DataFormatConverters.DataFormatConverter[lookupKeys.length];
        Map<String, Integer> nameToIndex =
                IntStream.range(0, producedNames.length)
                        .boxed()
                        .collect(Collectors.toMap(i -> producedNames[i], i -> i));
        for (int i = 0; i < lookupKeys.length; i++) {
            Integer position = nameToIndex.get(lookupKeys[i]);
            Preconditions.checkArgument(
                    position != null, "Lookup keys %s not selected", Arrays.toString(lookupKeys));
            converters[i] = DataFormatConverters.getConverterForDataType(producedTypes[position]);
        }

        this.callBridge = callBridge;
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        this.client = callBridge.createClient();

        // Set searchRequest in open method in case of amount of calling in eval method when every
        // record comes.
        this.searchRequest = new SearchRequest(index);
        if (type == null) {
            searchRequest.types(Strings.EMPTY_ARRAY);
        } else {
            searchRequest.types(type);
        }
        searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.fetchSource(producedNames, null);
        deserializationSchema.open(null);
    }

    /**
     * Looks up the documents matching the given join key row.
     *
     * @param keyRow a row containing the lookup key values, in {@code lookupKeys} order
     * @return the matching rows; empty when the search matched nothing
     */
    @Override
    public Collection<RowData> lookup(RowData keyRow) {
        BoolQueryBuilder lookupCondition = new BoolQueryBuilder();
        for (int i = 0; i < lookupKeys.length; i++) {
            lookupCondition.must(
                    new TermQueryBuilder(lookupKeys[i], converters[i].toExternal(keyRow, i)));
        }
        searchSourceBuilder.query(lookupCondition);
        searchRequest.source(searchSourceBuilder);

        for (int retry = 0; retry <= maxRetryTimes; retry++) {
            try {
                Tuple2<String, String[]> searchResponse = callBridge.search(client, searchRequest);
                String[] hits = searchResponse.f1;
                ArrayList<RowData> rows = new ArrayList<>(hits.length);
                for (String hit : hits) {
                    RowData row = parseSearchResult(hit);
                    // Skip hits that failed to deserialize instead of emitting null rows.
                    if (row != null) {
                        rows.add(row);
                    }
                }
                // A successful search is final even when it matched nothing: an empty result
                // is a legitimate lookup miss and must not trigger a retry.
                return rows;
            } catch (IOException e) {
                LOG.error("Elasticsearch search error, retry times = {}", retry, e);
                if (retry >= maxRetryTimes) {
                    throw new FlinkRuntimeException("Execution of Elasticsearch search failed.", e);
                }
                try {
                    Thread.sleep(1000L * retry);
                } catch (InterruptedException e1) {
                    // Restore the interrupt flag so the surrounding task sees the interruption.
                    Thread.currentThread().interrupt();
                    LOG.warn(
                            "Interrupted while waiting to retry failed elasticsearch search, aborting");
                    throw new FlinkRuntimeException(e1);
                }
            }
        }
        return Collections.emptyList();
    }

    @Override
    public void close() throws Exception {
        // Release the client created in open(); otherwise connections leak on task shutdown.
        if (client != null) {
            callBridge.close(client);
            client = null;
        }
    }

    private RowData parseSearchResult(String result) {
        try {
            // Hit sources are text; decode with an explicit charset, not the platform default.
            return deserializationSchema.deserialize(result.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            // Keep the full exception so the stack trace is not lost; caller drops the null row.
            LOG.error("Deserialize search hit failed.", e);
            return null;
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.streaming.connectors.elasticsearch;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.MultiShotLatch;
Expand All @@ -35,6 +36,7 @@
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.junit.Test;
Expand All @@ -43,6 +45,7 @@

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -603,6 +606,15 @@ public BulkProcessor.Builder createBulkProcessorBuilder(
return null;
}

@Override
public Tuple2<String, String[]> search(Client client, SearchRequest searchRequest)
throws IOException {
// Test stub: the search/lookup path is not exercised by these sink tests.
return null;
}

@Override
// Test stub: no real client is created, so there is nothing to release.
public void close(Client client) throws IOException {}

@Nullable
@Override
public Throwable extractFailureCauseFromBulkItemResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ under the License.
<artifactId>junit-jupiter</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>

</project>
Loading

0 comments on commit fef5c96

Please sign in to comment.