Skip to content

Commit

Permalink
fix ci
Browse files Browse the repository at this point in the history
Signed-off-by: mag1c1an1 <[email protected]>
  • Loading branch information
mag1c1an1 committed Mar 21, 2024
1 parent 215c5ee commit fdfad6b
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 237 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ public DynamicTableSource copy() {
lsts.projectedFields = this.projectedFields;
lsts.remainingPartitions = this.remainingPartitions;
lsts.filter = this.filter;
lsts.filterPlan = this.filterPlan;
lsts.filter = this.filter;
return lsts;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,8 @@ public class LakeSoulOneSplitRecordsReader implements RecordsWithSplitIds<RowDat
// arrow batch -> row, with requested schema
private ArrowReader curArrowReaderRequestedSchema;

private final FilterPredicate filter;

private final Plan filterPlan;
private final FilterPredicate filterPredicate;
private final Plan filter;

public LakeSoulOneSplitRecordsReader(Configuration conf,
LakeSoulSplit split,
Expand All @@ -85,8 +84,8 @@ public LakeSoulOneSplitRecordsReader(Configuration conf,
List<String> pkColumns,
boolean isStreaming,
String cdcColumn,
FilterPredicate filter,
Plan filterPlan)
FilterPredicate filterPredicate,
Plan filter)
throws Exception {
this.split = split;
this.skipRecords = split.getSkipRecord();
Expand All @@ -98,8 +97,8 @@ public LakeSoulOneSplitRecordsReader(Configuration conf,
this.isStreaming = isStreaming;
this.cdcColumn = cdcColumn;
this.finishedSplit = Collections.singleton(splitId);
this.filterPredicate = filterPredicate;
this.filter = filter;
this.filterPlan = filterPlan;
initializeReader();
recoverFromSkipRecord();
}
Expand Down Expand Up @@ -134,10 +133,11 @@ private void initializeReader() throws IOException {
}

if (filter != null) {
reader.addFilter(filter.toString());
reader.addFilterProto(this.filter);
}
if (filterPlan != null) {
reader.addFilterProto(this.filterPlan);

if (filterPredicate !=null) {
reader.addFilter(filterPredicate.toString());
}

LOG.info("Initializing reader for split {}, pk={}, partitions={}," +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,10 @@ public class LakeSoulSource implements Source<RowData, LakeSoulSplit, LakeSoulPe
@Nullable
List<Map<String, String>> remainingPartitions;

// TODO remove this
@Nullable
FilterPredicate filter;

FilterPredicate filterStr;
@Nullable
Plan filterPlan;
Plan filter;

public LakeSoulSource(TableId tableId,
RowType rowType,
Expand All @@ -57,17 +55,18 @@ public LakeSoulSource(TableId tableId,
List<String> pkColumns,
Map<String, String> optionParams,
@Nullable List<Map<String, String>> remainingPartitions,
@Nullable FilterPredicate filter,
@Nullable Plan filterPlan) {
@Nullable FilterPredicate filterStr,
@Nullable Plan filter) {
this.tableId = tableId;
this.rowType = rowType;
this.rowTypeWithPk = rowTypeWithPk;
this.isStreaming = isStreaming;
this.pkColumns = pkColumns;
this.optionParams = optionParams;
this.remainingPartitions = remainingPartitions;
this.filterStr = filterStr;
this.filter = filter;
this.filterPlan = filterPlan;

}

@Override
Expand All @@ -90,8 +89,8 @@ public SourceReader<RowData, LakeSoulSplit> createReader(SourceReaderContext rea
this.pkColumns,
this.isStreaming,
this.optionParams.getOrDefault(LakeSoulSinkOptions.CDC_CHANGE_COLUMN, ""),
this.filter,
this.filterPlan),
this.filterStr,
this.filter),
new LakeSoulRecordEmitter(),
readerContext.getConfiguration(),
readerContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,8 @@ public class LakeSoulSplitReader implements SplitReader<RowData, LakeSoulSplit>

String cdcColumn;

// TODO remove this
FilterPredicate filter;

Plan filterPlan;
FilterPredicate filterStr;
Plan filter;

private LakeSoulOneSplitRecordsReader lastSplitReader;

Expand All @@ -53,17 +51,17 @@ public LakeSoulSplitReader(Configuration conf,
List<String> pkColumns,
boolean isStreaming,
String cdcColumn,
FilterPredicate filter,
Plan filterPlan) {
FilterPredicate filterStr,
Plan filter) {
this.conf = conf;
this.splits = new ArrayDeque<>();
this.rowType = rowType;
this.rowTypeWithPk = rowTypeWithPk;
this.pkColumns = pkColumns;
this.isStreaming = isStreaming;
this.cdcColumn = cdcColumn;
this.filterStr = filterStr;
this.filter = filter;
this.filterPlan = filterPlan;
}

@Override
Expand All @@ -78,8 +76,8 @@ public RecordsWithSplitIds<RowData> fetch() throws IOException {
this.pkColumns,
this.isStreaming,
this.cdcColumn,
this.filter,
this.filterPlan
this.filterStr,
this.filter
);
return lastSplitReader;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ public static Tuple2<SupportsFilterPushDown.Result, io.substrait.proto.Plan> toP
String tableSchema
) {
List<ResolvedExpression> accepted = new ArrayList<>();
Schema arrowSchema = toArrowSchema(tableSchema);
Expression last = null;
for (ResolvedExpression expr : exprs) {
Expression e = doTransform(expr);
Expression e = doTransform(expr,arrowSchema);
if (e == null) {
remaining.add(expr);
} else {
Expand All @@ -70,44 +71,51 @@ public static Tuple2<SupportsFilterPushDown.Result, io.substrait.proto.Plan> toP
}
}
}
Plan filter = toFilter(last, tableName, tableSchema);
Plan filter = toFilter(last, tableName, arrowSchema);
return Tuple2.of(SupportsFilterPushDown.Result.of(accepted, remaining), planToProto(filter));
}

static Plan toFilter(Expression e, String tableName, String tableSchema) {
static Schema toArrowSchema(String tableSchema) {
try {
Schema arrow_schema = Schema.fromJSON(tableSchema);
List<String> tableNames = Stream.of(tableName).collect(Collectors.toList());
List<String> columnNames = new ArrayList<>();
List<Field> fields = arrow_schema.getFields();
List<Type> columnTypes = new ArrayList<>();
for (Field field : fields) {
Type type = fromArrowType(field.getType(), field.isNullable());
if (type == null) {
return null;
}
columnTypes.add(type);
String name = field.getName();
columnNames.add(name);
Schema arrowSchema = Schema.fromJSON(tableSchema);
return arrowSchema;
} catch (IOException e) {
// FIXME fix this elegantly
throw new RuntimeException(e);
}
}

static Plan toFilter(Expression e, String tableName, Schema arrowSchema) {
if (e == null) {
return null;
}
List<String> tableNames = Stream.of(tableName).collect(Collectors.toList());
List<String> columnNames = new ArrayList<>();
List<Field> fields = arrowSchema.getFields();
List<Type> columnTypes = new ArrayList<>();
for (Field field : fields) {
Type type = fromArrowType(field.getType(), field.isNullable());
if (type == null) {
return null;
}
NamedScan namedScan = Builder.namedScan(tableNames, columnNames, columnTypes);
namedScan =
NamedScan.builder()
.from(namedScan)
.filter(e)
.build();
columnTypes.add(type);
String name = field.getName();
columnNames.add(name);
}
NamedScan namedScan = Builder.namedScan(tableNames, columnNames, columnTypes);
namedScan =
NamedScan.builder()
.from(namedScan)
.filter(e)
.build();


Plan.Root root = Builder.root(namedScan);
return Builder.plan(root);
} catch (IOException ex) {
// FIXME fix this elegantly
throw new RuntimeException(ex);
}
Plan.Root root = Builder.root(namedScan);
return Builder.plan(root);
}

public static Expression doTransform(ResolvedExpression flinkExpression) {
SubstraitVisitor substraitVisitor = new SubstraitVisitor();
/**
 * Translates a Flink resolved expression by dispatching it to a
 * {@code SubstraitVisitor} constructed over the supplied Arrow schema.
 *
 * @param flinkExpression the Flink expression to visit
 * @param arrow_schema    the Arrow schema handed to the visitor's constructor
 * @return whatever the visitor produces for the expression; the {@code toPlan}
 *         caller in this listing treats a {@code null} result as an expression
 *         that stays in the "remaining" (not pushed down) list
 */
public static Expression doTransform(ResolvedExpression flinkExpression, Schema arrow_schema) {
    // A fresh visitor is created per call, exactly as before.
    return flinkExpression.accept(new SubstraitVisitor(arrow_schema));
}

Expand Down
Loading

0 comments on commit fdfad6b

Please sign in to comment.