[Rust] add substrait for flink and be compatible with other engines
Signed-off-by: mag1c1an1 <[email protected]>

add flink expression to substrait conversion

Signed-off-by: mag1c1an1 <[email protected]>

add more functions

Signed-off-by: mag1c1an1 <[email protected]>

add more tests

Signed-off-by: mag1c1an1 <[email protected]>

add base schema for NamedScan, substrait type to arrow type conversion

Signed-off-by: mag1c1an1 <[email protected]>

compatibility

Signed-off-by: mag1c1an1 <[email protected]>

switch to java8

Signed-off-by: mag1c1an1 <[email protected]>

before applying cargo fix

Signed-off-by: mag1c1an1 <[email protected]>

cargo clippy && cargo fmt

Signed-off-by: mag1c1an1 <[email protected]>

fix ci

Signed-off-by: mag1c1an1 <[email protected]>

rebase

Signed-off-by: mag1c1an1 <[email protected]>

refactor

Signed-off-by: mag1c1an1 <[email protected]>
mag1c1an1 committed Mar 22, 2024
1 parent c199b07 commit 9aed0a0
Showing 41 changed files with 2,007 additions and 499 deletions.
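
In outline, this change translates pushed-down Flink ResolvedExpressions into Substrait expressions one by one, AND-combines them into a single filter Plan, and threads the serialized plan from the table source through LakeSoulSource and LakeSoulSplitReader down to the native NativeIOReader; anything the translator cannot handle stays behind for Flink to evaluate. A minimal sketch of a single-expression conversion using the helpers this commit introduces (resolvedExpr and tableSchemaJson are assumed inputs, not names from the diff):

    // Minimal sketch, assuming a ResolvedExpression and the table's Arrow schema JSON.
    Schema arrowSchema = Schema.fromJSON(tableSchemaJson);
    Expression substraitExpr = SubstraitFlinkUtil.doTransform(resolvedExpr, arrowSchema);
    if (substraitExpr == null) {
        // The visitor signals "cannot translate" with null; such filters stay in Flink.
    }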
2 changes: 2 additions & 0 deletions lakesoul-flink/pom.xml
@@ -443,6 +443,8 @@ SPDX-License-Identifier: Apache-2.0
<include>com.google.code.gson:gson</include>
<include>dev.failsafe:failsafe</include>
<include>com.google.protobuf:protobuf-java</include>
<!--substrait-->
<include>io.substrait:core</include>
</includes>
<excludes>
<exclude>org.apache.logging.log4j:*</exclude>
LakeSoulTableSource.java
Expand Up @@ -234,6 +234,7 @@ public DynamicTableSource copy() {
this.optionParams);
lsts.projectedFields = this.projectedFields;
lsts.remainingPartitions = this.remainingPartitions;
lsts._filterPredicate = this._filterPredicate;
lsts.filter = this.filter;
return lsts;
}
Expand Down
LakeSoulOneSplitRecordsReader.java
@@ -6,6 +6,7 @@

import com.dmetasoul.lakesoul.LakeSoulArrowReader;
import com.dmetasoul.lakesoul.lakesoul.io.NativeIOReader;
import io.substrait.proto.Plan;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Schema;
@@ -73,7 +74,8 @@ public class LakeSoulOneSplitRecordsReader implements RecordsWithSplitIds<RowDat
// arrow batch -> row, with requested schema
private ArrowReader curArrowReaderRequestedSchema;

private final FilterPredicate filter;
private final FilterPredicate _filterPredicate;
private final Plan filter;

public LakeSoulOneSplitRecordsReader(Configuration conf,
LakeSoulSplit split,
@@ -82,7 +84,8 @@ public LakeSoulOneSplitRecordsReader(Configuration conf,
List<String> pkColumns,
boolean isStreaming,
String cdcColumn,
FilterPredicate filter)
FilterPredicate _filterPredicate,
Plan filter)
throws Exception {
this.split = split;
this.skipRecords = split.getSkipRecord();
@@ -94,6 +97,7 @@ public LakeSoulOneSplitRecordsReader(Configuration conf,
this.isStreaming = isStreaming;
this.cdcColumn = cdcColumn;
this.finishedSplit = Collections.singleton(splitId);
this._filterPredicate = _filterPredicate;
this.filter = filter;
initializeReader();
recoverFromSkipRecord();
@@ -129,7 +133,11 @@ private void initializeReader() throws IOException {
}

if (filter != null) {
reader.addFilter(filter.toString());
reader.addFilterProto(this.filter);
}

if (_filterPredicate != null) {
reader.addFilter(_filterPredicate.toString());
}

LOG.info("Initializing reader for split {}, pk={}, partitions={}," +
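
Both pushdown channels now coexist in the reader: the Substrait plan goes to the native reader as a protobuf via addFilterProto, while the legacy Parquet predicate is still passed as a string via addFilter. A condensed sketch of that handoff, assuming a NativeIOReader named reader as in the surrounding code (either field may be null):

    // Condensed from initializeReader() above; each channel is applied independently.
    if (this.filter != null) {
        reader.addFilterProto(this.filter);                 // new: engine-neutral Substrait plan
    }
    if (this._filterPredicate != null) {
        reader.addFilter(this._filterPredicate.toString()); // legacy: stringified Parquet predicate
    }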
LakeSoulSource.java
@@ -9,6 +9,7 @@
import com.dmetasoul.lakesoul.meta.DataOperation;
import com.dmetasoul.lakesoul.meta.LakeSoulOptions;
import com.dmetasoul.lakesoul.meta.entity.TableInfo;
import io.substrait.proto.Plan;
import org.apache.flink.api.connector.source.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
@@ -43,7 +44,9 @@ public class LakeSoulSource implements Source<RowData, LakeSoulSplit, LakeSoulPe
List<Map<String, String>> remainingPartitions;

@Nullable
FilterPredicate filter;
FilterPredicate _filterPredicate;
@Nullable
Plan filter;

public LakeSoulSource(TableId tableId,
RowType rowType,
@@ -52,15 +55,18 @@ public LakeSoulSource(TableId tableId,
List<String> pkColumns,
Map<String, String> optionParams,
@Nullable List<Map<String, String>> remainingPartitions,
@Nullable FilterPredicate filter) {
@Nullable FilterPredicate _filterPredicate,
@Nullable Plan filter) {
this.tableId = tableId;
this.rowType = rowType;
this.rowTypeWithPk = rowTypeWithPk;
this.isStreaming = isStreaming;
this.pkColumns = pkColumns;
this.optionParams = optionParams;
this.remainingPartitions = remainingPartitions;
this._filterPredicate = _filterPredicate;
this.filter = filter;
}

@Override
@@ -83,6 +89,7 @@ public SourceReader<RowData, LakeSoulSplit> createReader(SourceReaderContext rea
this.pkColumns,
this.isStreaming,
this.optionParams.getOrDefault(LakeSoulSinkOptions.CDC_CHANGE_COLUMN, ""),
this._filterPredicate,
this.filter),
new LakeSoulRecordEmitter(),
readerContext.getConfiguration(),
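
Each hop of the source chain now carries the filter pair. A hypothetical construction site (the actual caller is not shown in this diff; argument names mirror the constructor above, and either filter may be null):

    // Hypothetical call site for the extended constructor.
    LakeSoulSource source = new LakeSoulSource(
            tableId, rowType, rowTypeWithPk, isStreaming,
            pkColumns, optionParams, remainingPartitions,
            _filterPredicate, // legacy Parquet predicate (nullable)
            filter);          // Substrait plan proto (nullable)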
LakeSoulSplitReader.java
@@ -4,6 +4,7 @@

package org.apache.flink.lakesoul.source;

import io.substrait.proto.Plan;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
@@ -39,7 +40,8 @@ public class LakeSoulSplitReader implements SplitReader<RowData, LakeSoulSplit>

String cdcColumn;

FilterPredicate filter;
FilterPredicate _filterPredicate;
Plan filter;

private LakeSoulOneSplitRecordsReader lastSplitReader;

@@ -49,14 +51,16 @@ public LakeSoulSplitReader(Configuration conf,
List<String> pkColumns,
boolean isStreaming,
String cdcColumn,
FilterPredicate filter) {
FilterPredicate _filterPredicate,
Plan filter) {
this.conf = conf;
this.splits = new ArrayDeque<>();
this.rowType = rowType;
this.rowTypeWithPk = rowTypeWithPk;
this.pkColumns = pkColumns;
this.isStreaming = isStreaming;
this.cdcColumn = cdcColumn;
this._filterPredicate = _filterPredicate;
this.filter = filter;
}

@@ -72,7 +76,9 @@ public RecordsWithSplitIds<RowData> fetch() throws IOException {
this.pkColumns,
this.isStreaming,
this.cdcColumn,
this.filter);
this._filterPredicate,
this.filter
);
return lastSplitReader;
} catch (Exception e) {
throw new IOException(e);
SubstraitFlinkUtil.java (new file)
@@ -0,0 +1,68 @@
package org.apache.flink.lakesoul.substrait;

import io.substrait.expression.Expression;
import io.substrait.expression.ExpressionCreator;
import io.substrait.extension.SimpleExtension;
import io.substrait.plan.Plan;
import io.substrait.type.TypeCreator;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ResolvedExpression;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

import static com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil.*;

public class SubstraitFlinkUtil {

    /**
     * Translates Flink filter expressions into a single Substrait plan.
     * Expressions the visitor cannot translate are collected into
     * {@code remaining} so Flink keeps evaluating them; the rest are
     * AND-chained into one predicate and wrapped in a filter plan.
     */
    public static Tuple2<SupportsFilterPushDown.Result, io.substrait.proto.Plan> flinkExprToSubStraitPlan(
            List<ResolvedExpression> exprs,
            List<ResolvedExpression> remaining,
            String tableName,
            String tableSchema
    ) throws IOException {
        List<ResolvedExpression> accepted = new ArrayList<>();
        Schema arrowSchema = Schema.fromJSON(tableSchema);
        Expression last = null;
        for (ResolvedExpression expr : exprs) {
            Expression e = doTransform(expr, arrowSchema);
            if (e == null) {
                // Untranslatable expression: leave it for Flink to evaluate.
                remaining.add(expr);
            } else {
                accepted.add(expr);
                if (last != null) {
                    // Combine accepted expressions with the boolean "and" function.
                    SimpleExtension.FunctionAnchor fa = SimpleExtension.FunctionAnchor.of(BooleanNamespace, "and:bool");
                    last = ExpressionCreator.scalarFunction(Se.getScalarFunction(fa), TypeCreator.NULLABLE.BOOLEAN, last, e);
                } else {
                    last = e;
                }
            }
        }
        Plan filter = exprToFilter(last, tableName, arrowSchema);
        return Tuple2.of(SupportsFilterPushDown.Result.of(accepted, remaining), planToProto(filter));
    }

    /** Converts one resolved Flink expression via the schema-aware visitor; returns null if unsupported. */
    public static Expression doTransform(ResolvedExpression flinkExpression, Schema arrowSchema) {
        SubstraitVisitor substraitVisitor = new SubstraitVisitor(arrowSchema);
        return flinkExpression.accept(substraitVisitor);
    }

    /** Returns true if the expression (or any of its children) references a partition column. */
    public static boolean filterContainsPartitionColumn(ResolvedExpression expression, Set<String> partitionCols) {
        if (expression instanceof FieldReferenceExpression) {
            return partitionCols.contains(((FieldReferenceExpression) expression).getName());
        } else if (expression instanceof CallExpression) {
            for (ResolvedExpression child : expression.getResolvedChildren()) {
                if (filterContainsPartitionColumn(child, partitionCols)) {
                    return true;
                }
            }
        }
        return false;
    }
}
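
A natural consumer of this utility is the table source's filter-pushdown hook. A sketch of how applyFilters could wire it up, assuming LakeSoulTableSource implements SupportsFilterPushDown and keeps tableName and arrowSchemaJson fields (those names are illustrative, not from the diff):

    // Hypothetical applyFilters(...) on the table source.
    @Override
    public SupportsFilterPushDown.Result applyFilters(List<ResolvedExpression> filters) {
        List<ResolvedExpression> remaining = new ArrayList<>();
        try {
            Tuple2<SupportsFilterPushDown.Result, io.substrait.proto.Plan> pushed =
                    SubstraitFlinkUtil.flinkExprToSubStraitPlan(filters, remaining, tableName, arrowSchemaJson);
            this.filter = pushed.f1;  // later handed to LakeSoulSource and the native reader
            return pushed.f0;         // tells the planner which filters were consumed
        } catch (IOException e) {
            return SupportsFilterPushDown.Result.of(new ArrayList<>(), filters); // push nothing down on failure
        }
    }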