[Rust] add substrait for flink and be compatible with other engines
Signed-off-by: mag1c1an1 <[email protected]>

add conversion of flink expressions to substrait

Signed-off-by: mag1c1an1 <[email protected]>

add more functions

Signed-off-by: mag1c1an1 <[email protected]>

add more tests

Signed-off-by: mag1c1an1 <[email protected]>

add base schema for NamedScan, substrait type to arrow type

Signed-off-by: mag1c1an1 <[email protected]>

compatibility with other engines

Signed-off-by: mag1c1an1 <[email protected]>

switch to java8

Signed-off-by: mag1c1an1 <[email protected]>

before applying cargo fix

Signed-off-by: mag1c1an1 <[email protected]>

cargo clippy && cargo fmt

Signed-off-by: mag1c1an1 <[email protected]>
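
These commits push Flink filter expressions down to the native reader as Substrait plans. The conversion code lives in files not loaded in this excerpt; purely as an illustration, a minimal filter plan assembled with the io.substrait.proto builders (table name and condition are hypothetical placeholders) could look like:

import io.substrait.proto.Expression;
import io.substrait.proto.FilterRel;
import io.substrait.proto.Plan;
import io.substrait.proto.PlanRel;
import io.substrait.proto.ReadRel;
import io.substrait.proto.Rel;
import io.substrait.proto.RelRoot;

final class SubstraitPlanSketch {
    // Builds scan -> filter -> root, the shape a pushed-down predicate plan takes.
    static Plan trivialFilterPlan() {
        // Scan of a named table; a full plan would also set base_schema here
        // (cf. "add base schema for NamedScan" above). Table name is a placeholder.
        Rel scan = Rel.newBuilder()
                .setRead(ReadRel.newBuilder()
                        .setNamedTable(ReadRel.NamedTable.newBuilder().addNames("example_table")))
                .build();
        // Constant TRUE stands in for a real translated Flink predicate.
        Rel filtered = Rel.newBuilder()
                .setFilter(FilterRel.newBuilder()
                        .setInput(scan)
                        .setCondition(Expression.newBuilder()
                                .setLiteral(Expression.Literal.newBuilder().setBoolean(true))))
                .build();
        return Plan.newBuilder()
                .addRelations(PlanRel.newBuilder().setRoot(RelRoot.newBuilder().setInput(filtered)))
                .build();
    }
}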
mag1c1an1 committed Mar 21, 2024
1 parent 6ef0076 commit 215c5ee
Showing 41 changed files with 2,070 additions and 477 deletions.
14 changes: 14 additions & 0 deletions lakesoul-flink/pom.xml
@@ -457,6 +457,18 @@ SPDX-License-Identifier: Apache-2.0
<version>3.3.2</version>
<scope>${local.scope}</scope>
</dependency>
<dependency>
<groupId>io.substrait</groupId>
<artifactId>core</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
</exclusion>
</exclusions>
<version>0.28.0</version>
<scope>compile</scope>
</dependency>
</dependencies>

<build>
@@ -592,6 +604,8 @@ SPDX-License-Identifier: Apache-2.0
<include>com.google.code.gson:gson</include>
<include>dev.failsafe:failsafe</include>
<include>com.google.protobuf:protobuf-java</include>
<!-- substrait -->
<include>io.substrait:core</include>
</includes>
<excludes>
<exclude>org.apache.logging.log4j:*</exclude>
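
The new io.substrait:core dependency excludes the slf4j-jdk14 binding, presumably to avoid pulling a second SLF4J backend onto the Flink classpath, and is added to the shade plugin's includes above. A quick, hypothetical sanity check (not part of this commit) that the shaded jar really carries the Substrait classes:

final class ShadeSanityCheck {
    public static void main(String[] args) throws ClassNotFoundException {
        // Fails fast if io.substrait:core was not bundled into the shaded jar,
        // e.g. because a shade <include> coordinate is misspelled or missing.
        Class.forName("io.substrait.proto.Plan");
        System.out.println("substrait proto classes are present");
    }
}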
@@ -235,6 +235,7 @@ public DynamicTableSource copy() {
lsts.projectedFields = this.projectedFields;
lsts.remainingPartitions = this.remainingPartitions;
lsts.filter = this.filter;
lsts.filterPlan = this.filterPlan;
return lsts;
}
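
A note on the copy() change: io.substrait.proto.Plan is an immutable protobuf message, so sharing the reference between the original table source and its copy is safe. A minimal illustration (hypothetical helper, not from this commit):

import io.substrait.proto.Plan;

final class PlanCopySketch {
    // Protobuf messages cannot be mutated in place, so the shallow copy
    // `lsts.filterPlan = this.filterPlan` above never aliases mutable state;
    // a changed plan is derived through toBuilder() instead.
    static Plan derive(Plan shared) {
        return shared.toBuilder().build(); // new instance; `shared` stays untouched
    }
}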

@@ -6,6 +6,7 @@

import com.dmetasoul.lakesoul.LakeSoulArrowReader;
import com.dmetasoul.lakesoul.lakesoul.io.NativeIOReader;
import io.substrait.proto.Plan;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Schema;
@@ -75,14 +76,17 @@ public class LakeSoulOneSplitRecordsReader implements RecordsWithSplitIds<RowData>

private final FilterPredicate filter;

private final Plan filterPlan;

public LakeSoulOneSplitRecordsReader(Configuration conf,
LakeSoulSplit split,
RowType schema,
RowType schemaWithPk,
List<String> pkColumns,
boolean isStreaming,
String cdcColumn,
FilterPredicate filter)
FilterPredicate filter,
Plan filterPlan)
throws Exception {
this.split = split;
this.skipRecords = split.getSkipRecord();
@@ -95,6 +99,7 @@ public LakeSoulOneSplitRecordsReader(Configuration conf,
this.cdcColumn = cdcColumn;
this.finishedSplit = Collections.singleton(splitId);
this.filter = filter;
this.filterPlan = filterPlan;
initializeReader();
recoverFromSkipRecord();
}
@@ -131,6 +136,9 @@ private void initializeReader() throws IOException {
if (filter != null) {
reader.addFilter(filter.toString());
}
if (filterPlan != null) {
reader.addFilterProto(this.filterPlan);
}

LOG.info("Initializing reader for split {}, pk={}, partitions={}," +
" non partition cols={}, cdc column={}, filter={}",
@@ -9,6 +9,7 @@
import com.dmetasoul.lakesoul.meta.DataOperation;
import com.dmetasoul.lakesoul.meta.LakeSoulOptions;
import com.dmetasoul.lakesoul.meta.entity.TableInfo;
import io.substrait.proto.Plan;
import org.apache.flink.api.connector.source.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
@@ -42,17 +43,22 @@ public class LakeSoulSource implements Source<RowData, LakeSoulSplit, LakeSoulPe
@Nullable
List<Map<String, String>> remainingPartitions;

// TODO remove this
@Nullable
FilterPredicate filter;

@Nullable
Plan filterPlan;

public LakeSoulSource(TableId tableId,
RowType rowType,
RowType rowTypeWithPk,
boolean isStreaming,
List<String> pkColumns,
Map<String, String> optionParams,
@Nullable List<Map<String, String>> remainingPartitions,
@Nullable FilterPredicate filter) {
@Nullable FilterPredicate filter,
@Nullable Plan filterPlan) {
this.tableId = tableId;
this.rowType = rowType;
this.rowTypeWithPk = rowTypeWithPk;
@@ -61,6 +67,7 @@ public LakeSoulSource(TableId tableId,
this.optionParams = optionParams;
this.remainingPartitions = remainingPartitions;
this.filter = filter;
this.filterPlan = filterPlan;
}

@Override
@@ -83,7 +90,8 @@ public SourceReader<RowData, LakeSoulSplit> createReader(SourceReaderContext readerContext
this.pkColumns,
this.isStreaming,
this.optionParams.getOrDefault(LakeSoulSinkOptions.CDC_CHANGE_COLUMN, ""),
this.filter),
this.filter,
this.filterPlan),
new LakeSoulRecordEmitter(),
readerContext.getConfiguration(),
readerContext);
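
In the spirit of the "add more tests" commit above, a hypothetical round-trip check (not from this diff) verifies that a filter plan survives serialization unchanged on its way from source to reader:

import static org.junit.jupiter.api.Assertions.assertEquals;

import io.substrait.proto.Plan;
import org.junit.jupiter.api.Test;

class FilterPlanRoundTripTest {
    @Test
    void planSurvivesProtoRoundTrip() throws Exception {
        Plan plan = Plan.newBuilder().build(); // trivial empty plan; see sketch above
        assertEquals(plan, Plan.parseFrom(plan.toByteArray()));
    }
}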
@@ -4,6 +4,7 @@

package org.apache.flink.lakesoul.source;

import io.substrait.proto.Plan;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
@@ -39,8 +40,11 @@ public class LakeSoulSplitReader implements SplitReader<RowData, LakeSoulSplit>

String cdcColumn;

// TODO remove this
FilterPredicate filter;

Plan filterPlan;

private LakeSoulOneSplitRecordsReader lastSplitReader;

public LakeSoulSplitReader(Configuration conf,
@@ -49,7 +53,8 @@ public LakeSoulSplitReader(Configuration conf,
List<String> pkColumns,
boolean isStreaming,
String cdcColumn,
FilterPredicate filter) {
FilterPredicate filter,
Plan filterPlan) {
this.conf = conf;
this.splits = new ArrayDeque<>();
this.rowType = rowType;
@@ -58,6 +63,7 @@ public LakeSoulSplitReader(Configuration conf,
this.isStreaming = isStreaming;
this.cdcColumn = cdcColumn;
this.filter = filter;
this.filterPlan = filterPlan;
}

@Override
@@ -72,7 +78,9 @@ public RecordsWithSplitIds<RowData> fetch() throws IOException {
this.pkColumns,
this.isStreaming,
this.cdcColumn,
this.filter);
this.filter,
this.filterPlan
);
return lastSplitReader;
} catch (Exception e) {
throw new IOException(e);