-
Notifications
You must be signed in to change notification settings - Fork 515
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Rust] add substrait for flink and be compatible for other engines #454
Conversation
fdfad6b
to
f06e75f
Compare
@@ -235,6 +235,7 @@ public DynamicTableSource copy() { | |||
lsts.projectedFields = this.projectedFields; | |||
lsts.remainingPartitions = this.remainingPartitions; | |||
lsts.filter = this.filter; | |||
lsts.filter = this.filter; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
duplicate code
@@ -52,15 +55,18 @@ public LakeSoulSource(TableId tableId, | |||
List<String> pkColumns, | |||
Map<String, String> optionParams, | |||
@Nullable List<Map<String, String>> remainingPartitions, | |||
@Nullable FilterPredicate filter) { | |||
@Nullable FilterPredicate filterStr, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
improper name 'filterStr'
@@ -129,7 +133,11 @@ private void initializeReader() throws IOException { | |||
} | |||
|
|||
if (filter != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will two kinds of filter cause conflict?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, filterSter
is null forever in above code. It is used to debug the difference of result datafusion::expr of two kinds filters.
return Tuple2.of(SupportsFilterPushDown.Result.of(accepted, remaining), planToProto(filter)); | ||
} | ||
|
||
static Schema toArrowSchema(String tableSchema) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No diff from Schema.fromJSON
import java.util.stream.Collectors; | ||
import java.util.stream.Stream; | ||
|
||
public class SubstraitUtil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move to lakesoul-io-java maybe better.
* | ||
* @param plan Filter{} | ||
*/ | ||
public void addFilterProto(Plan plan) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
refer com.dmetasoul.lakesoul.meta.jnr.NativeMetadataJavaClient#executeInsert
@@ -255,20 +255,36 @@ pub async fn prune_filter_and_execute( | |||
df: DataFrame, | |||
request_schema: SchemaRef, | |||
filter_str: Vec<String>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use Vec as input directly
21bd868
to
b7104da
Compare
rebase from main, please review the dependency |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please provide some flink test cases.
@@ -73,7 +74,8 @@ public class LakeSoulOneSplitRecordsReader implements RecordsWithSplitIds<RowDat | |||
// arrow batch -> row, with requested schema | |||
private ArrowReader curArrowReaderRequestedSchema; | |||
|
|||
private final FilterPredicate filter; | |||
private final FilterPredicate _filterPredicate; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why use underline as var name
return Tuple2.of(SupportsFilterPushDown.Result.of(accepted, remaining), planToProto(filter)); | ||
} | ||
|
||
public static Expression doTransform(ResolvedExpression flinkExpression, Schema arrow_schema) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
arrow_schema should use CamelCase name in Java
} | ||
return ExpressionCreator.binary(nullable, b); | ||
} | ||
case TINYINT: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use integer type with exactly bit-width
} | ||
return ExpressionCreator.fp64(nullable, d); | ||
} | ||
case DATE: { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any unit test for date/timestamp case?
2873000
to
1e49766
Compare
Signed-off-by: mag1c1an1 <[email protected]> add flink expression to substrait Signed-off-by: mag1c1an1 <[email protected]> add more functions Signed-off-by: mag1c1an1 <[email protected]> add more tests Signed-off-by: mag1c1an1 <[email protected]> add base schema for namedscan, substriat type to arrow type Signed-off-by: mag1c1an1 <[email protected]> compatibility Signed-off-by: mag1c1an1 <[email protected]> switch to java8 Signed-off-by: mag1c1an1 <[email protected]> before apply cargo fix Signed-off-by: mag1c1an1 <[email protected]> cargo clippy && cargo fmt Signed-off-by: mag1c1an1 <[email protected]> fix ci Signed-off-by: mag1c1an1 <[email protected]> rebase Signed-off-by: mag1c1an1 <[email protected]> refactor Signed-off-by: mag1c1an1 <[email protected]>
Signed-off-by: mag1c1an1 <[email protected]>
Signed-off-by: mag1c1an1 <[email protected]>
Signed-off-by: mag1c1an1 <[email protected]>
Signed-off-by: mag1c1an1 <[email protected]>
Signed-off-by: mag1c1an1 <[email protected]>
import java.util.stream.Stream; | ||
|
||
public class SubstraitUtil { | ||
public static final SimpleExtension.ExtensionCollection Se; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replace 'Se' with recognizable name.
|
||
public class SubstraitUtil { | ||
public static final SimpleExtension.ExtensionCollection Se; | ||
public static final SubstraitBuilder Builder; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename static constants with UPPERCASE_UNDERSCORE format
@@ -466,6 +466,8 @@ SPDX-License-Identifier: Apache-2.0 | |||
<include>com.google.code.gson:gson</include> | |||
<include>dev.failsafe:failsafe</include> | |||
<include>com.google.protobuf:protobuf-java</include> | |||
<!--substrait--> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Try removing org.apache.parquet:parquet-column from pom
createLakeSoulSourceTableWithDateType(createTableEnv); | ||
// not supported | ||
// String testSql = "select * from type_info where modifyTime=TO_TIMESTAMP_LTZ(1612176000,0)"; | ||
String testSql = "select * from type_info " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mark not supported datatype with comment.
|
||
public class SubstraitTest extends AbstractTestBase { | ||
|
||
private final String BATCH_TYPE = "batch"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add tests which filter on hash column/range column
Signed-off-by: mag1c1an1 <[email protected]>
add flink expression to substrait
add more functions
add more tests
add base schema for namedscan, substriat type to arrow type
compatibility
switch to java8