Skip to content

Commit

Permalink
[Spark/Rust] fix unicode column name at native io (#420)
Browse files Browse the repository at this point in the history
Signed-off-by: zenghua <[email protected]>
Co-authored-by: zenghua <[email protected]>
  • Loading branch information
Ceng23333 and zenghua authored Jan 17, 2024
1 parent eed595f commit b8e8b21
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -300,10 +300,16 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
assert(!inputDF.schema.fields.head.dataType.isInstanceOf[StructType])
val df = inputDF.toDF("temp")
Seq(
(
df.withColumnRenamed("temp", "列名"),
"列名", // zero nesting
(x: Any) => x
),
(
df.withColumnRenamed("temp", "a"),
"a", // zero nesting
(x: Any) => x),
(x: Any) => x
),
// (
// df.withColumn("a", struct(df("temp") as "b")).drop("temp"),
// "a.b", // one level nesting
Expand Down Expand Up @@ -671,6 +677,48 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
}
}

// TODO: results of unicode string filter are not correct

// test("filter pushdown - unicode string") {
// def toUnicodeChar(i: Int) = {
// new StringBuffer().append("\\u" + (8544 - 1 + i).toHexString).toString
// }
//
// val data = (1 to 4).map(i => Tuple1(Option(toUnicodeChar(i))))
// println(data)
// withNestedParquetDataFrame(data) { case (inputDF, colName, resultFun) =>
// implicit val df: DataFrame = inputDF
//
// val stringAttr = df(colName).expr
// assert(df(colName).expr.dataType === StringType)
//
// checkFilterPredicate(stringAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
// checkFilterPredicate(stringAttr.isNotNull, classOf[NotEq[_]],
// (1 to 4).map(i => Row.apply(resultFun(toUnicodeChar(i)))))
//
// checkFilterPredicate(stringAttr === "\u2160", classOf[Eq[_]], resultFun("\u2160"))
// checkFilterPredicate(stringAttr <=> "\u2160", classOf[Eq[_]], resultFun("\u2160"))
// checkFilterPredicate(stringAttr =!= "\u2160", classOf[NotEq[_]],
// (2 to 4).map(i => Row.apply(resultFun(toUnicodeChar(i)))))
//
// checkFilterPredicate(stringAttr < "\u2161", classOf[Lt[_]], resultFun("\u2160"))
// checkFilterPredicate(stringAttr > "\u2162", classOf[Gt[_]], resultFun("\u2163"))
// checkFilterPredicate(stringAttr <= "\u2160", classOf[LtEq[_]], resultFun("\u2160"))
// checkFilterPredicate(stringAttr >= "\u2163", classOf[GtEq[_]], resultFun("\u2163"))
//
// checkFilterPredicate(Literal("\u2160") === stringAttr, classOf[Eq[_]], resultFun("\u2160"))
// checkFilterPredicate(Literal("\u2160") <=> stringAttr, classOf[Eq[_]], resultFun("\u2160"))
// checkFilterPredicate(Literal("\u2161") > stringAttr, classOf[Lt[_]], resultFun("\u2160"))
// checkFilterPredicate(Literal("\u2162") < stringAttr, classOf[Gt[_]], resultFun("\u2163"))
// checkFilterPredicate(Literal("\u2160") >= stringAttr, classOf[LtEq[_]], resultFun("\u2160"))
// checkFilterPredicate(Literal("\u2163") <= stringAttr, classOf[GtEq[_]], resultFun("\u2163"))
//
// checkFilterPredicate(!(stringAttr < "\u2163"), classOf[GtEq[_]], resultFun("\u2163"))
// checkFilterPredicate(stringAttr < "\u2161" || stringAttr > "\u2162", classOf[Operators.Or],
// Seq(Row(resultFun("\u2160")), Row(resultFun("\u2163"))))
// }
// }

test("filter pushdown - binary") {
implicit class IntToBinary(int: Int) {
def b: Array[Byte] = int.toString.getBytes(StandardCharsets.UTF_8)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,17 @@ public void setExternalAllocator(BufferAllocator allocator) {
}

public void addFile(String file) {
Pointer ptr = LibLakeSoulIO.buildStringPointer(libLakeSoulIO, file);
ioConfigBuilder = libLakeSoulIO.lakesoul_config_builder_add_single_file(ioConfigBuilder, ptr);
ioConfigBuilder = libLakeSoulIO.lakesoul_config_builder_add_single_file(ioConfigBuilder, file);
}

public void addColumn(String column) {
assert ioConfigBuilder != null;
Pointer columnPtr = LibLakeSoulIO.buildStringPointer(libLakeSoulIO, column);
ioConfigBuilder = libLakeSoulIO.lakesoul_config_builder_add_single_column(ioConfigBuilder, columnPtr);
ioConfigBuilder = libLakeSoulIO.lakesoul_config_builder_add_single_column(ioConfigBuilder, column);
}

public void setPrimaryKeys(Iterable<String> primaryKeys) {
for (String pk : primaryKeys) {
Pointer ptr = LibLakeSoulIO.buildStringPointer(libLakeSoulIO, pk);
ioConfigBuilder = libLakeSoulIO.lakesoul_config_builder_add_single_primary_key(ioConfigBuilder, ptr);
ioConfigBuilder = libLakeSoulIO.lakesoul_config_builder_add_single_primary_key(ioConfigBuilder, pk);
}
}

Expand Down Expand Up @@ -120,9 +117,7 @@ public void setObjectStoreOptions(String accessKey, String accessSecret,
public void setObjectStoreOption(String key, String value) {
assert ioConfigBuilder != null;
if (key != null && value != null) {
Pointer ptrKey = LibLakeSoulIO.buildStringPointer(libLakeSoulIO, key);
Pointer ptrValue = LibLakeSoulIO.buildStringPointer(libLakeSoulIO, value);
ioConfigBuilder = libLakeSoulIO.lakesoul_config_builder_set_object_store_option(ioConfigBuilder, ptrKey, ptrValue);
ioConfigBuilder = libLakeSoulIO.lakesoul_config_builder_set_object_store_option(ioConfigBuilder, key, value);
}
}

Expand Down Expand Up @@ -166,7 +161,7 @@ public void removerReferenceKey() {

@Override
public void invoke(Boolean status, String err) {
if (err!=null) {
if (err != null) {
System.err.println("[ERROR][com.dmetasoul.lakesoul.io.lakesoul.NativeIOBase.BooleanCallback.invoke]" + err);
}
callback.accept(status, err);
Expand Down Expand Up @@ -198,7 +193,7 @@ public void removerReferenceKey() {

@Override
public void invoke(Integer status, String err) {
if (err!=null) {
if (err != null) {
System.err.println("[ERROR][com.dmetasoul.lakesoul.io.lakesoul.NativeIOBase.IntegerCallback.invoke]" + err);
}
callback.accept(status, err);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,12 @@ public void addFile(String file) {

public void addFilter(String filter) {
assert ioConfigBuilder != null;
Pointer ptr = LibLakeSoulIO.buildStringPointer(libLakeSoulIO, filter);
ioConfigBuilder = libLakeSoulIO.lakesoul_config_builder_add_filter(ioConfigBuilder, ptr);
ioConfigBuilder = libLakeSoulIO.lakesoul_config_builder_add_filter(ioConfigBuilder, filter);
}

public void addMergeOps(Map<String, String> mergeOps) {
for (Map.Entry<String, String> entry:mergeOps.entrySet()) {
Pointer fieldPtr = LibLakeSoulIO.buildStringPointer(libLakeSoulIO, entry.getKey());
Pointer mergeOpPtr = LibLakeSoulIO.buildStringPointer(libLakeSoulIO, entry.getValue());
ioConfigBuilder = libLakeSoulIO.lakesoul_config_builder_add_merge_op(ioConfigBuilder, fieldPtr, mergeOpPtr);
for (Map.Entry<String, String> entry : mergeOps.entrySet()) {
ioConfigBuilder = libLakeSoulIO.lakesoul_config_builder_add_merge_op(ioConfigBuilder, entry.getKey(), entry.getValue());
}
}

Expand Down Expand Up @@ -103,7 +100,7 @@ private void startReader(BiConsumer<Boolean, String> callback) {
if (status) {
this.readerSchema = getReaderSchema();
}
if (err!=null) {
if (err != null) {
System.err.println("[ERROR][com.dmetasoul.lakesoul.io.lakesoul.NativeIOReader.startReader]err=" + err);
}
callback.accept(status, err);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ public NativeIOWriter(Schema schema) {

public void setAuxSortColumns(Iterable<String> auxSortColumns) {
for (String col : auxSortColumns) {
Pointer ptr = LibLakeSoulIO.buildStringPointer(libLakeSoulIO, col);
ioConfigBuilder = libLakeSoulIO.lakesoul_config_builder_add_single_aux_sort_column(ioConfigBuilder, ptr);
ioConfigBuilder = libLakeSoulIO.lakesoul_config_builder_add_single_aux_sort_column(ioConfigBuilder, col);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,6 @@

public interface LibLakeSoulIO {

static Pointer buildStringPointer(LibLakeSoulIO lib, String s) {
Pointer str = Memory.allocate(Runtime.getRuntime(lib), s.length());
str.put(0, s.getBytes(),0,s.length());

return str;
}

Pointer new_tokio_runtime_builder();

Pointer tokio_runtime_builder_set_thread_num(Pointer builder, int thread_num);
Expand All @@ -29,21 +22,21 @@ static Pointer buildStringPointer(LibLakeSoulIO lib, String s) {

Pointer new_lakesoul_io_config_builder();

Pointer lakesoul_config_builder_add_single_file(Pointer builder, Pointer file);
Pointer lakesoul_config_builder_add_single_file(Pointer builder, String file);

Pointer lakesoul_config_builder_add_single_primary_key(Pointer builder, Pointer pk);
Pointer lakesoul_config_builder_add_single_primary_key(Pointer builder, String pk);

Pointer lakesoul_config_builder_add_single_column(Pointer builder, Pointer column);
Pointer lakesoul_config_builder_add_single_column(Pointer builder, String column);

Pointer lakesoul_config_builder_add_single_aux_sort_column(Pointer builder, Pointer column);
Pointer lakesoul_config_builder_add_single_aux_sort_column(Pointer builder, String column);

Pointer lakesoul_config_builder_add_filter(Pointer builder, Pointer filter);
Pointer lakesoul_config_builder_add_filter(Pointer builder, String filter);

Pointer lakesoul_config_builder_add_merge_op(Pointer builder, Pointer field, Pointer mergeOp);
Pointer lakesoul_config_builder_add_merge_op(Pointer builder, String field, String mergeOp);

Pointer lakesoul_config_builder_set_schema(Pointer builder, @LongLong long schemaAddr);

Pointer lakesoul_config_builder_set_object_store_option(Pointer builder, Pointer key, Pointer value);
Pointer lakesoul_config_builder_set_object_store_option(Pointer builder, String key, String value);

Pointer lakesoul_config_builder_set_thread_num(Pointer builder, int thread_num);

Expand Down
8 changes: 5 additions & 3 deletions rust/lakesoul-io/src/filter/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,19 +80,21 @@ impl Parser {
panic!("Invalid filter string");
}
let filter = &filter[1..filter.len() - 1];
let mut k: i8 = 0;
let mut k: usize = 0;
let mut left_offset: usize = 0;
for (i, ch) in filter.chars().enumerate() {
let mut offset_counter: usize = 0;
for ch in filter.chars() {
match ch {
'(' => k += 1,
')' => k -= 1,
',' => {
if k == 0 && left_offset == 0 {
left_offset = i
left_offset = offset_counter
}
}
_ => {}
}
offset_counter += ch.len_utf8()
}
if k != 0 {
panic!("Invalid filter string");
Expand Down

0 comments on commit b8e8b21

Please sign in to comment.