Skip to content

Commit

Permalink
[CALCITE-1967] Elasticsearch 5 adapter (Christian Beikov)
Browse files Browse the repository at this point in the history
Move code that is independent of Elasticsearch into a
org.apache.calcite.adapter.elasticsearch package under core. It
provides abstract classes that the elasticsearch2 and elasticsearch5
modules make concrete.

Close apache#528
  • Loading branch information
beikov authored and julianhyde committed Aug 29, 2017
1 parent 2156d82 commit e152592
Show file tree
Hide file tree
Showing 36 changed files with 1,243 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@
package org.apache.calcite.adapter.elasticsearch;

import org.apache.calcite.adapter.java.AbstractQueryableTable;
import org.apache.calcite.linq4j.AbstractEnumerable;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.linq4j.QueryProvider;
import org.apache.calcite.linq4j.Queryable;
import org.apache.calcite.linq4j.function.Function1;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelNode;
Expand All @@ -33,30 +31,22 @@
import org.apache.calcite.schema.impl.AbstractTableQueryable;
import org.apache.calcite.sql.type.SqlTypeName;

import org.apache.calcite.util.Util;

import org.elasticsearch.client.Client;
import org.elasticsearch.search.SearchHit;

import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
* Table based on an Elasticsearch type.
*/
public class ElasticsearchTable extends AbstractQueryableTable implements TranslatableTable {
private final Client client;
private final String indexName;
private final String typeName;
public abstract class AbstractElasticsearchTable extends AbstractQueryableTable
implements TranslatableTable {
protected final String indexName;
protected final String typeName;

/**
* Creates an ElasticsearchTable.
*/
public ElasticsearchTable(Client client, String indexName,
String typeName) {
public AbstractElasticsearchTable(String indexName, String typeName) {
super(Object[].class);
this.client = client;
this.indexName = indexName;
this.typeName = typeName;
}
Expand Down Expand Up @@ -96,32 +86,18 @@ public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable)
* @param fields List of fields to project; or null to return map
* @return Enumerator of results
*/
private Enumerable<Object> find(String index, List<String> ops,
List<Map.Entry<String, Class>> fields) {
final String dbName = index;

final String queryString = "{" + Util.toString(ops, "", ", ", "") + "}";

final Function1<SearchHit, Object> getter = ElasticsearchEnumerator.getter(fields);

return new AbstractEnumerable<Object>() {
public Enumerator<Object> enumerator() {
final Iterator<SearchHit> cursor = client.prepareSearch(dbName).setTypes(typeName)
.setSource(queryString).execute().actionGet().getHits().iterator();
return new ElasticsearchEnumerator(cursor, getter);
}
};
}
protected abstract Enumerable<Object> find(String index, List<String> ops,
List<Map.Entry<String, Class>> fields);

/**
* Implementation of {@link org.apache.calcite.linq4j.Queryable} based on
* a {@link org.apache.calcite.adapter.elasticsearch.ElasticsearchTable}.
* Implementation of {@link Queryable} based on
* a {@link AbstractElasticsearchTable}.
*
* @param <T> element type
*/
public static class ElasticsearchQueryable<T> extends AbstractTableQueryable<T> {
public ElasticsearchQueryable(QueryProvider queryProvider, SchemaPlus schema,
ElasticsearchTable table, String tableName) {
AbstractElasticsearchTable table, String tableName) {
super(queryProvider, schema, table, tableName);
}

Expand All @@ -130,16 +106,16 @@ public Enumerator<T> enumerator() {
}

private String getIndex() {
return schema.unwrap(ElasticsearchSchema.class).index;
return schema.unwrap(ElasticsearchSchema.class).getIndex();
}

private ElasticsearchTable getTable() {
return (ElasticsearchTable) table;
private AbstractElasticsearchTable getTable() {
return (AbstractElasticsearchTable) table;
}

/** Called via code-generation.
*
* @see org.apache.calcite.adapter.elasticsearch.ElasticsearchMethod#ELASTICSEARCH_QUERYABLE_FIND
* @see ElasticsearchMethod#ELASTICSEARCH_QUERYABLE_FIND
*/
@SuppressWarnings("UnusedDeclaration")
public Enumerable<Object> find(List<String> ops,
Expand All @@ -149,4 +125,4 @@ public Enumerable<Object> find(List<String> ops,
}
}

// End ElasticsearchTable.java
// End AbstractElasticsearchTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
Expand Down Expand Up @@ -65,10 +66,19 @@ public ElasticsearchFilter(RelOptCluster cluster, RelTraitSet traitSet, RelNode

@Override public void implement(Implementor implementor) {
implementor.visitChild(0, getInput());
final List<String> fieldNames =
ElasticsearchRules.elasticsearchFieldNames(getRowType());
final Translator translator = new Translator(fieldNames);
final String match = translator.translateMatch(condition);
List<String> fieldNames;
if (input instanceof Project) {
final List<RexNode> projects = ((Project) input).getProjects();
fieldNames = new ArrayList<>(projects.size());
for (RexNode project : projects) {
String name = project.accept(MapProjectionFieldVisitor.INSTANCE);
fieldNames.add(name);
}
} else {
fieldNames = ElasticsearchRules.elasticsearchFieldNames(getRowType());
}
Translator translator = new Translator(fieldNames);
String match = translator.translateMatch(condition);
implementor.add(match);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
* Builtin methods in the Elasticsearch adapter.
*/
enum ElasticsearchMethod {
ELASTICSEARCH_QUERYABLE_FIND(ElasticsearchTable.ElasticsearchQueryable.class, "find",
List.class, List.class);
ELASTICSEARCH_QUERYABLE_FIND(AbstractElasticsearchTable.ElasticsearchQueryable.class,
"find", List.class, List.class);

public final Method method;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,11 @@ public ElasticsearchProject(RelOptCluster cluster, RelTraitSet traitSet, RelNode
@Override public void implement(Implementor implementor) {
implementor.visitChild(0, getInput());

final List<String> inFields =
ElasticsearchRules.elasticsearchFieldNames(getInput().getRowType());
final ElasticsearchRules.RexToElasticsearchTranslator translator =
new ElasticsearchRules.RexToElasticsearchTranslator(
(JavaTypeFactory) getCluster().getTypeFactory(),
ElasticsearchRules.elasticsearchFieldNames(getInput().getRowType()));
(JavaTypeFactory) getCluster().getTypeFactory(), inFields);

final List<String> findItems = new ArrayList<>();
final List<String> scriptFieldItems = new ArrayList<>();
Expand All @@ -70,21 +71,23 @@ public ElasticsearchProject(RelOptCluster cluster, RelTraitSet traitSet, RelNode
if (expr.equals("\"" + name + "\"")) {
findItems.add(ElasticsearchRules.quote(name));
} else if (expr.matches("\"literal\":.+")) {
scriptFieldItems.add(ElasticsearchRules.quote(name) + ":{\"script\": "
scriptFieldItems.add(ElasticsearchRules.quote(name)
+ ":{\"script\": "
+ expr.split(":")[1] + "}");
} else {
scriptFieldItems.add(ElasticsearchRules.quote(name) + ":{\"script\":\"_source."
scriptFieldItems.add(ElasticsearchRules.quote(name)
+ ":{\"script\":\"params._source."
+ expr.replaceAll("\"", "") + "\"}");
}
}
final String findString = Util.toString(findItems, "", ", ", "");
final String scriptFieldString = "\"script_fields\": {"
+ Util.toString(scriptFieldItems, "", ", ", "") + "}";
final String fieldString = "\"fields\" : [" + findString + "]"
final String fieldString = "\"_source\" : [" + findString + "]"
+ ", " + scriptFieldString;

for (String opfield : implementor.list) {
if (opfield.startsWith("\"fields\"")) {
if (opfield.startsWith("\"_source\"")) {
implementor.list.remove(opfield);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class Implementor {
final List<String> list = new ArrayList<>();

RelOptTable table;
ElasticsearchTable elasticsearchTable;
AbstractElasticsearchTable elasticsearchTable;

public void add(String findOp) {
list.add(findOp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,12 @@ abstract static class ElasticsearchConverterRule extends ConverterRule {
* {@link ElasticsearchSort}.
*/
private static class ElasticsearchSortRule extends ElasticsearchConverterRule {
private static final ElasticsearchSortRule INSTANCE = new ElasticsearchSortRule();
private static final ElasticsearchSortRule INSTANCE =
new ElasticsearchSortRule();

private ElasticsearchSortRule() {
super(Sort.class, Convention.NONE, ElasticsearchRel.CONVENTION, "ElasticsearchSortRule");
super(Sort.class, Convention.NONE, ElasticsearchRel.CONVENTION,
"ElasticsearchSortRule");
}

@Override public RelNode convert(RelNode relNode) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.adapter.elasticsearch;

import org.apache.calcite.schema.Schema;

/**
* Gives access to some basic information of the Elasticsearch schema.
*/
public interface ElasticsearchSchema extends Schema {
/**
* The name of the Elasticsearch index.
*
* @return The index name
*/
String getIndex();
}

// End ElasticsearchSchema.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataTypeField;
Expand Down Expand Up @@ -58,11 +59,21 @@ public ElasticsearchSort(RelOptCluster cluster, RelTraitSet traitSet, RelNode ch
implementor.visitChild(0, getInput());
if (!collation.getFieldCollations().isEmpty()) {
final List<String> keys = new ArrayList<>();
final List<RelDataTypeField> fields = getRowType().getFieldList();
if (input instanceof Project) {
final List<RexNode> projects = ((Project) input).getProjects();

for (RelFieldCollation fieldCollation: collation.getFieldCollations()) {
final String name = fields.get(fieldCollation.getFieldIndex()).getName();
keys.add(ElasticsearchRules.quote(name) + ": " + direction(fieldCollation));
for (RelFieldCollation fieldCollation : collation.getFieldCollations()) {
RexNode project = projects.get(fieldCollation.getFieldIndex());
String name = project.accept(MapProjectionFieldVisitor.INSTANCE);
keys.add(ElasticsearchRules.quote(name) + ": " + direction(fieldCollation));
}
} else {
final List<RelDataTypeField> fields = getRowType().getFieldList();

for (RelFieldCollation fieldCollation : collation.getFieldCollations()) {
final String name = fields.get(fieldCollation.getFieldIndex()).getName();
keys.add(ElasticsearchRules.quote(name) + ": " + direction(fieldCollation));
}
}

implementor.add("\"sort\": [ " + Util.toString(keys, "{", "}, {", "}") + "]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;

import com.google.common.base.Preconditions;

import java.util.List;

/**
Expand All @@ -36,7 +38,7 @@
* using the "find" method.</p>
*/
public class ElasticsearchTableScan extends TableScan implements ElasticsearchRel {
private final ElasticsearchTable elasticsearchTable;
private final AbstractElasticsearchTable elasticsearchTable;
private final RelDataType projectRowType;

/**
Expand All @@ -48,13 +50,13 @@ public class ElasticsearchTableScan extends TableScan implements ElasticsearchRe
* @param elasticsearchTable Elasticsearch table
* @param projectRowType Fields and types to project; null to project raw row
*/
protected ElasticsearchTableScan(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table,
ElasticsearchTable elasticsearchTable, RelDataType projectRowType) {
protected ElasticsearchTableScan(RelOptCluster cluster, RelTraitSet traitSet,
RelOptTable table, AbstractElasticsearchTable elasticsearchTable,
RelDataType projectRowType) {
super(cluster, traitSet, table);
this.elasticsearchTable = elasticsearchTable;
this.elasticsearchTable = Preconditions.checkNotNull(elasticsearchTable);
this.projectRowType = projectRowType;

assert elasticsearchTable != null;
assert getConvention() == ElasticsearchRel.CONVENTION;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ protected ElasticsearchToEnumerableConverter(RelOptCluster cluster, RelTraitSet
Pair.class));
final Expression table = list.append("table",
elasticsearchImplementor.table
.getExpression(ElasticsearchTable.ElasticsearchQueryable.class));
.getExpression(AbstractElasticsearchTable.ElasticsearchQueryable.class));
List<String> opList = elasticsearchImplementor.list;
final Expression ops = list.append("ops", constantArrayList(opList, String.class));
Expression enumerable = list.append("enumerable",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.adapter.elasticsearch;

import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexVisitorImpl;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;

/**
* Visitor that extracts the actual field name from an item expression.
*/
public class MapProjectionFieldVisitor extends RexVisitorImpl<String> {
public static final MapProjectionFieldVisitor INSTANCE = new MapProjectionFieldVisitor();

private MapProjectionFieldVisitor() {
super(true);
}

@Override public String visitCall(RexCall call) {
if (call.op == SqlStdOperatorTable.ITEM) {
return ((RexLiteral) call.getOperands().get(1)).getValueAs(String.class);
}
return super.visitCall(call);
}
}

// End MapProjectionFieldVisitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/

/**
* Query provider based on an Elasticsearch DB.
* Base classes for a query provider based on an Elasticsearch DB.
*/
@PackageMarker
package org.apache.calcite.adapter.elasticsearch;
Expand Down
2 changes: 1 addition & 1 deletion elasticsearch/pom.xml → elasticsearch2/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ limitations under the License.
<version>1.14.0-SNAPSHOT</version>
</parent>

<artifactId>calcite-elasticsearch</artifactId>
<artifactId>calcite-elasticsearch2</artifactId>
<packaging>jar</packaging>
<version>1.14.0-SNAPSHOT</version>
<name>Calcite Elasticsearch</name>
Expand Down
Loading

0 comments on commit e152592

Please sign in to comment.