From 0d377b8306618500e3547d0490563ee68e1dcba7 Mon Sep 17 00:00:00 2001 From: mag1c1an1 <103239869+mag1c1an1@users.noreply.github.com> Date: Tue, 5 Mar 2024 10:34:45 +0800 Subject: [PATCH] [Rust] Create split in rust code (#448) bump up prost copy from doris add c bindings add tests write interface in java interface 0.1.0 cargo clippy && cargo fmt use try_logger clean code change splitdesc def Signed-off-by: mag1c1an1 --- cpp/CMakeLists.txt | 2 + .../meta/jnr/LibLakeSoulMetaData.java | 15 + .../meta/jnr/NativeMetadataJavaClient.java | 70 +++ .../lakesoul/meta/jnr/SplitDesc.java | 62 ++ .../source/LakeSoulPendingSplits.java | 2 +- .../spark/sql/lakesoul/SplitDescSuite.scala | 216 +++++++ rust/Cargo.lock | 151 +++-- rust/Cargo.toml | 3 +- rust/lakesoul-datafusion/.gitignore | 2 + .../lakesoul-datafusion/src/benchmarks/mod.rs | 2 +- .../src/benchmarks/tpch/mod.rs | 63 +- .../src/benchmarks/tpch/run.rs | 2 - .../datasource/file_format/metadata_format.rs | 10 +- rust/lakesoul-datafusion/src/lib.rs | 2 +- .../src/test/catalog_tests.rs | 14 +- .../src/test/integration_tests.rs | 48 +- rust/lakesoul-io-c/src/lib.rs | 20 +- .../src/datasource/parquet_source.rs | 580 ++++++++++++++++++ rust/lakesoul-metadata-c/.gitignore | 2 + rust/lakesoul-metadata-c/Cargo.toml | 5 +- rust/lakesoul-metadata-c/src/lib.rs | 232 +++++-- .../test_bindings/CMakeLists.txt | 38 ++ .../test_bindings/binding.h | 21 + .../test_bindings/binding_test.cc | 20 + rust/lakesoul-metadata/Cargo.toml | 3 + rust/lakesoul-metadata/src/error.rs | 3 + rust/lakesoul-metadata/src/lib.rs | 239 ++++---- rust/lakesoul-metadata/src/metadata_client.rs | 100 ++- rust/lakesoul-metadata/src/transfusion.rs | 318 ++++++++++ rust/proto/Cargo.toml | 4 +- 30 files changed, 1941 insertions(+), 308 deletions(-) create mode 100644 lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/SplitDesc.java create mode 100644 lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/SplitDescSuite.scala create mode 100644 rust/lakesoul-datafusion/.gitignore create mode 100644 rust/lakesoul-io/src/datasource/parquet_source.rs create mode 100644 rust/lakesoul-metadata-c/.gitignore create mode 100644 rust/lakesoul-metadata-c/test_bindings/CMakeLists.txt create mode 100644 rust/lakesoul-metadata-c/test_bindings/binding.h create mode 100644 rust/lakesoul-metadata-c/test_bindings/binding_test.cc create mode 100644 rust/lakesoul-metadata/src/transfusion.rs diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 061e903e9..e811840fb 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -5,6 +5,8 @@ cmake_minimum_required(VERSION 3.13 FATAL_ERROR) project(LakeSoulCPP VERSION 1.0.0.0 LANGUAGES CXX) +set(CMAKE_EXPORT_COMPILE_COMMANDS ON) + # Find Python interpreter find_package(Python REQUIRED COMPONENTS Interpreter) message(STATUS "Found Python ${Python_VERSION} at ${Python_EXECUTABLE}") diff --git a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/LibLakeSoulMetaData.java b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/LibLakeSoulMetaData.java index d1a6094aa..be85d019c 100644 --- a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/LibLakeSoulMetaData.java +++ b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/LibLakeSoulMetaData.java @@ -35,6 +35,21 @@ public interface LibLakeSoulMetaData { void clean_meta_for_test(IntegerCallback integerCallback, Pointer runtime, Pointer client); + Pointer create_split_desc_array(IntegerCallback integerCallback, Pointer client, Pointer prepared, Pointer runtime, 
String tableName, String namespace); + + void free_split_desc_array(Pointer json); + + /** + * caller should ensure that ptr is valid + * + * @param c_string ptr + */ + void free_c_string(Pointer c_string); + + String debug(BooleanCallback booleanCallback); + + void rust_logger_init(); + void hello_world(Callback bytesCallback); void namespace(byte[] bytes, Integer len); diff --git a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/NativeMetadataJavaClient.java b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/NativeMetadataJavaClient.java index 65fc77aea..96ff9aeb1 100644 --- a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/NativeMetadataJavaClient.java +++ b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/NativeMetadataJavaClient.java @@ -3,6 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 package com.dmetasoul.lakesoul.meta.jnr; +import com.alibaba.fastjson.JSON; import com.dmetasoul.lakesoul.meta.DBUtil; import com.dmetasoul.lakesoul.meta.DataBaseProperty; import com.dmetasoul.lakesoul.meta.entity.JniWrapper; @@ -187,6 +188,7 @@ public void close() { } private void initialize() { + libLakeSoulMetaData.rust_logger_init(); DataBaseProperty dataBaseProperty = NativeMetadataJavaClient.dataBaseProperty; if (dataBaseProperty == null) { dataBaseProperty = DBUtil.getDBInfo(); @@ -552,4 +554,72 @@ public static void closeAll() { instance = null; } } + + + /** + * if ffi function failed with -100 + * should recreate pg client and prepared map + * @param tableName name + * @param namespace the np of TableInfo + * @return split(partition) desc array in json format by table_name, namespace , filter(WIP) + */ + public List createSplitDescArray(String tableName, String namespace) { + final CompletableFuture future = new CompletableFuture<>(); + Pointer ptr = getLibLakeSoulMetaData() + .create_split_desc_array( + new ReferencedIntegerCallback((result, msg) -> { + if (msg != null) { + System.err.println(msg); + } + future.complete(result); + }, getIntegerCallbackObjectReferenceManager()), + tokioPostgresClient, + preparedStatement, + tokioRuntime, + tableName, + namespace); + try { + Integer ans = future.get(timeout, TimeUnit.MILLISECONDS); + if (ans == 0) { + // This copies a zero (nul) terminated by array from native memory. 
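+                    // The native call hands back a pointer to a NUL-terminated UTF-8 JSON string allocated
+                    // on the Rust side; parse it into SplitDesc objects first, then return the pointer via
+                    // free_split_desc_array so the Rust allocator can reclaim the memory.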
+ String json = ptr.getString(0); + List list = JSON.parseArray(json, SplitDesc.class); + getLibLakeSoulMetaData().free_split_desc_array(ptr); + return list; + } + // errors + if (ans == -100) { + // recreate pg client and prepared + String config = String.format( + "host=%s port=%s dbname=%s user=%s password=%s", + dataBaseProperty.getHost(), + dataBaseProperty.getPort(), + dataBaseProperty.getDbName(), + dataBaseProperty.getUsername(), + dataBaseProperty.getPassword()); + final CompletableFuture future_ = new CompletableFuture<>(); + tokioPostgresClient = libLakeSoulMetaData.create_tokio_postgres_client( + new ReferencedBooleanCallback((bool, msg) -> { + if (msg.isEmpty()) { + future_.complete(bool); + } else { + System.err.println(msg); + future_.completeExceptionally(new IOException(msg)); + } + }, getbooleanCallbackObjectReferenceManager()), + config, + tokioRuntime + ); + preparedStatement = libLakeSoulMetaData.create_prepared_statement(); + future.get(timeout, TimeUnit.MILLISECONDS); + } + // other errors + throw new RuntimeException("create split desc array failed"); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } catch (TimeoutException e) { + LOG.error("create split desc array timeout"); + throw new RuntimeException(e); + } + } } diff --git a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/SplitDesc.java b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/SplitDesc.java new file mode 100644 index 000000000..33d1de49b --- /dev/null +++ b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/SplitDesc.java @@ -0,0 +1,62 @@ +package com.dmetasoul.lakesoul.meta.jnr; + +import com.alibaba.fastjson.annotation.JSONField; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import org.apache.hadoop.util.hash.Hash; + +import java.util.HashMap; +import java.util.List; + +public class SplitDesc { + @JSONField(name = "file_paths") + private List filePaths; + @JSONField(name = "primary_keys") + private List primaryKeys; + @JSONField(name = "partition_desc") + private HashMap partitionDesc; + @JSONField(name = "table_schema") + private String tableSchema; + + public List getFilePaths() { + return filePaths; + } + + public void setFilePaths(List filePaths) { + this.filePaths = filePaths; + } + + public List getPrimaryKeys() { + return primaryKeys; + } + + public void setPrimaryKeys(List primaryKeys) { + this.primaryKeys = primaryKeys; + } + + public HashMap getPartitionDesc() { + return partitionDesc; + } + + public void setPartitionDesc(HashMap partitionDesc) { + this.partitionDesc= partitionDesc; + } + + public String getTableSchema() { + return tableSchema; + } + + public void setTableSchema(String tableSchema) { + this.tableSchema = tableSchema; + } + + @Override + public String toString() { + return new ToStringBuilder(this, ToStringStyle.MULTI_LINE_STYLE) + .append("file_paths", filePaths) + .append("primary_keys", primaryKeys) + .append("partition_desc", partitionDesc) + .append("table_schema", tableSchema) + .toString(); + } +} diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulPendingSplits.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulPendingSplits.java index 8249229b6..e4a77f746 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulPendingSplits.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulPendingSplits.java @@ -14,7 
+14,7 @@ public class LakeSoulPendingSplits { private final List splits; /** - * Already discovered lastest version's timestamp + * Already discovered latest version's timestamp * For streaming only */ private final long lastReadTimestamp; diff --git a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/SplitDescSuite.scala b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/SplitDescSuite.scala new file mode 100644 index 000000000..c60121d1e --- /dev/null +++ b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/SplitDescSuite.scala @@ -0,0 +1,216 @@ +package org.apache.spark.sql.lakesoul + +import com.dmetasoul.lakesoul.meta.jnr.NativeMetadataJavaClient +import org.apache.spark.sql._ +import org.apache.spark.sql.lakesoul.RandomStringGenerator.generateRandomString +import org.apache.spark.sql.lakesoul.test.LakeSoulTestUtils +import org.apache.spark.sql.test.SharedSparkSession +import org.junit.runner.RunWith +import org.scalatestplus.junit.JUnitRunner + +import scala.collection.JavaConverters.collectionAsScalaIterableConverter +import scala.util.Random + +@RunWith(classOf[JUnitRunner]) +class SplitDescSuite extends QueryTest + with SharedSparkSession + with LakeSoulTestUtils { + + import testImplicits._ + + val names: Seq[String] = Seq.empty + val base_path = "/tmp/spark_test" + val name_length = 10 + +// override protected def afterAll(): Unit = { +// for (tName <- names) { +// val tablePath = s"$base_path/$tName" +// // LakeSoulTable.forPath(tablePath).dropTable() +// } +// } + + private def create_dataframe(): DataFrame = { + val df = Seq( + ("2021-01-01", 1, 1, "apple"), + ("2021-01-01", 2, 2, "banana"), + ("2021-01-02", 3, 3, "pear"), + ("2021-01-02", 4, 4, "lemon"), + ("2021-01-03", 5, 5, "watermelon"), + ("2021-01-03", 6, 6, "grape"), + ("2021-01-04", 7, 7, "cherry"), + ("2021-01-04", 8, 8, "pineapple"), + ).toDF("date", "id", "num", "name") + df + } + + test("no range, no hash") { + val tName = generateRandomString(name_length); + withTable(tName) { + val tablePath = s"$base_path/$tName" + val df = create_dataframe() + df.write + .mode("append") + .format("lakesoul") + .option("shortTableName", tName) + .save(tablePath) + val descs = NativeMetadataJavaClient + .getInstance() + .createSplitDescArray(tName, "default") + .asScala + .toArray + descs.foreach(println) + assert(descs.length == 1) + val desc = descs(0); + assert(!desc.getFilePaths.isEmpty) + assert(desc.getPrimaryKeys.isEmpty) + assert(desc.getPartitionDesc.isEmpty) + } + } + + test("one range, no hash") { + val tName = generateRandomString(name_length); + withTable(tName) { + val tablePath = s"$base_path/$tName" + val df = create_dataframe() + df.write + .mode("append") + .format("lakesoul") + .option("shortTableName", tName) + .option("rangePartitions", "date") + .save(tablePath) + val descs = NativeMetadataJavaClient + .getInstance() + .createSplitDescArray(tName, "default") + .asScala + .toSeq + descs.foreach(println) + assert(descs.length == 4) + } + } + + test("multiple range, no hash") { + val tName = generateRandomString(name_length); + withTable(tName) { + val tablePath = s"$base_path/$tName" + val df = create_dataframe() + df.write + .mode("append") + .format("lakesoul") + .option("shortTableName", tName) + .option("rangePartitions", "date,name") + .save(tablePath) + val descs = NativeMetadataJavaClient + .getInstance() + .createSplitDescArray(tName, "default") + .asScala + .toSeq + descs.foreach(println) + assert(descs.length == 8) + } + } + + test("no range, one hash") { + val tName = 
generateRandomString(name_length); + withTable(tName) { + val tablePath = s"$base_path/$tName" + val df = create_dataframe() + df.write + .mode("append") + .format("lakesoul") + .option("shortTableName", tName) + .option("hashPartitions", "id") + .option("hashBucketNum", "4") + .save(tablePath) + val descs = NativeMetadataJavaClient + .getInstance() + .createSplitDescArray(tName, "default") + .asScala + .toSeq + descs.foreach(println) + assert(descs.length == 3) + } + } + + test("one range, one hash") { + val tName = generateRandomString(name_length); + withTable(tName) { + val tablePath = s"$base_path/$tName" + val df = create_dataframe() + df.write + .mode("append") + .format("lakesoul") + .option("shortTableName", tName) + .option("rangePartitions","date") + .option("hashPartitions", "id") + .option("hashBucketNum", "4") + .save(tablePath) + val descs = NativeMetadataJavaClient + .getInstance() + .createSplitDescArray(tName, "default") + .asScala + .toSeq + descs.foreach(println) + assert(descs.length == 7) + } + } + + test("multiple range, one hash") { + val tName = generateRandomString(name_length); + withTable(tName) { + val tablePath = s"$base_path/$tName" + val df = create_dataframe() + df.write + .mode("append") + .format("lakesoul") + .option("shortTableName", tName) + .option("rangePartitions","date,name") + .option("hashPartitions", "id") + .option("hashBucketNum", "4") + .save(tablePath) + val descs = NativeMetadataJavaClient + .getInstance() + .createSplitDescArray(tName, "default") + .asScala + .toSeq + descs.foreach(println) + assert(descs.length == 8) + } + } + + test("multiple range, multiple hash") { + val tName = generateRandomString(name_length); + withTable(tName) { + val tablePath = s"$base_path/$tName" + val df = create_dataframe() + df.write + .mode("append") + .format("lakesoul") + .option("shortTableName", tName) + .option("rangePartitions","date,name") + .option("hashPartitions", "id,num") + .option("hashBucketNum", "4") + .save(tablePath) + val descs = NativeMetadataJavaClient + .getInstance() + .createSplitDescArray(tName, "default") + .asScala + .toSeq + descs.foreach(println) + assert(descs.length == 8) + } + } +} + +object RandomStringGenerator { + val random = new Random() + + def generateRandomString(length: Int): String = { + val chars = ('a' to 'z') ++ ('A' to 'Z') ++ ('0' to '9') + val sb = new StringBuilder + for (_ <- 1 to length) { + val randomIndex = random.nextInt(chars.length) + sb.append(chars(randomIndex)) + } + sb.toString() + } +} diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 15754e7ae..bd68e7de7 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -76,6 +76,54 @@ dependencies = [ "libc", ] +[[package]] +name = "anstream" +version = "0.6.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d96bd03f33fe50a863e394ee9718a706f988b9079b20c3784fb726e7678b62fb" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8901269c6307e8d93993578286ac0edf7f195079ffff5ebdeea6a59ffb7e36bc" + +[[package]] +name = "anstyle-parse" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c75ac65da39e5fe5ab759307499ddad880d724eed2f6ce5b5e8a26f4f387928c" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.0.2" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "e28923312444cdd728e4738b3f9c9cac739500909bb3d3c94b43551b16517648" +dependencies = [ + "windows-sys 0.52.0", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cd54b81ec8d6180e24654d0b371ad22fc3dd083b6ff8ba325b72e00c87660a7" +dependencies = [ + "anstyle", + "windows-sys 0.52.0", +] + [[package]] name = "anyhow" version = "1.0.79" @@ -648,6 +696,12 @@ dependencies = [ "os_str_bytes", ] +[[package]] +name = "colorchoice" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" + [[package]] name = "comfy-table" version = "6.2.0" @@ -865,7 +919,7 @@ dependencies = [ "half", "hashbrown 0.14.3", "indexmap 2.1.0", - "itertools 0.11.0", + "itertools", "log", "num_cpus", "object_store", @@ -947,7 +1001,7 @@ dependencies = [ "datafusion-expr", "datafusion-physical-expr", "hashbrown 0.14.3", - "itertools 0.11.0", + "itertools", "log", "regex-syntax 0.8.2", ] @@ -973,7 +1027,7 @@ dependencies = [ "hashbrown 0.14.3", "hex", "indexmap 2.1.0", - "itertools 0.11.0", + "itertools", "libc", "log", "md-5", @@ -1006,7 +1060,7 @@ dependencies = [ "half", "hashbrown 0.14.3", "indexmap 2.1.0", - "itertools 0.11.0", + "itertools", "log", "once_cell", "parking_lot", @@ -1072,6 +1126,16 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "env_filter" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a009aa4810eb158359dda09d0c87378e4bbb89b5a801f016885a4707ba24f7ea" +dependencies = [ + "log", + "regex", +] + [[package]] name = "env_logger" version = "0.10.1" @@ -1081,6 +1145,19 @@ dependencies = [ "log", ] +[[package]] +name = "env_logger" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c012a26a7f605efc424dd53697843a72be7dc86ad2d01f7814337794a12231d" +dependencies = [ + "anstream", + "anstyle", + "env_filter", + "humantime", + "log", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -1571,15 +1648,6 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" -[[package]] -name = "itertools" -version = "0.10.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" -dependencies = [ - "either", -] - [[package]] name = "itertools" version = "0.11.0" @@ -1715,6 +1783,8 @@ dependencies = [ "proto", "rand", "rand_chacha", + "regex", + "serde", "serde_json", "test-log", "thiserror", @@ -1730,9 +1800,12 @@ dependencies = [ name = "lakesoul-metadata-c" version = "0.1.0" dependencies = [ + "env_logger 0.11.2", "lakesoul-metadata", + "log", "prost", "proto", + "serde_json", ] [[package]] @@ -2056,7 +2129,7 @@ dependencies = [ "futures", "humantime", "hyper", - "itertools 0.11.0", + "itertools", "parking_lot", "percent-encoding", "quick-xml", @@ -2337,12 +2410,12 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "prettyplease" -version = "0.1.25" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c8646e95016a7a6c4adea95bafa8a16baab64b583356217f2c85db4a39d9a86" +checksum = "a41cf62165e97c7f814d2221421dbb9afcbcdb0a88068e5ea206e19951c2cbb5" dependencies = [ "proc-macro2", - 
"syn 1.0.109", + "syn 2.0.48", ] [[package]] @@ -2366,9 +2439,9 @@ dependencies = [ [[package]] name = "prost" -version = "0.11.9" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" +checksum = "146c289cda302b98a28d40c8b3b90498d6e526dd24ac2ecea73e4e491685b94a" dependencies = [ "bytes", "prost-derive", @@ -2376,44 +2449,44 @@ dependencies = [ [[package]] name = "prost-build" -version = "0.11.9" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "119533552c9a7ffacc21e099c24a0ac8bb19c2a2a3f363de84cd9b844feab270" +checksum = "c55e02e35260070b6f716a2423c2ff1c3bb1642ddca6f99e1f26d06268a0e2d2" dependencies = [ "bytes", "heck", - "itertools 0.10.5", - "lazy_static", + "itertools", "log", "multimap", + "once_cell", "petgraph", "prettyplease", "prost", "prost-types", "regex", - "syn 1.0.109", + "syn 2.0.48", "tempfile", "which", ] [[package]] name = "prost-derive" -version = "0.11.9" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4" +checksum = "efb6c9a1dd1def8e2124d17e83a20af56f1570d6c2d2bd9e266ccb768df3840e" dependencies = [ "anyhow", - "itertools 0.10.5", + "itertools", "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.48", ] [[package]] name = "prost-types" -version = "0.11.9" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13" +checksum = "193898f59edcf43c26227dcd4c8427f00d99d61e95dcde58dabd49fa291d470e" dependencies = [ "prost", ] @@ -2503,13 +2576,13 @@ dependencies = [ [[package]] name = "regex" -version = "1.10.2" +version = "1.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343" +checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.4.3", + "regex-automata 0.4.5", "regex-syntax 0.8.2", ] @@ -2524,9 +2597,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.3" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f" +checksum = "5bb987efffd3c6d0d8f5f89510bb458559eab11e4f869acb20bf845e016259cd" dependencies = [ "aho-corasick", "memchr", @@ -3050,7 +3123,7 @@ version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6159ab4116165c99fc88cce31f99fa2c9dbe08d3691cb38da02fc3b45f357d2b" dependencies = [ - "env_logger", + "env_logger 0.10.1", "test-log-macros", "tracing-subscriber", ] @@ -3400,6 +3473,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "utf8parse" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" + [[package]] name = "uuid" version = "1.6.1" diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 299242ca7..414ebd914 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -37,7 +37,8 @@ url = "2.2" async-trait = "0.1" serde_json = "1.0" log = "^0.4" -prost = "0.11" +prost = "0.12.3" +prost-build = "0.12.3" uuid = { version = "1.4.0", features = ["v4", "fast-rng", "macro-diagnostics"] } serde = { version = "1.0", features = ["derive", "std", 
"rc"] } rand = "^0.8" diff --git a/rust/lakesoul-datafusion/.gitignore b/rust/lakesoul-datafusion/.gitignore new file mode 100644 index 000000000..4935f67e3 --- /dev/null +++ b/rust/lakesoul-datafusion/.gitignore @@ -0,0 +1,2 @@ +default/ +test_data/ diff --git a/rust/lakesoul-datafusion/src/benchmarks/mod.rs b/rust/lakesoul-datafusion/src/benchmarks/mod.rs index 5ca4531c9..7fa0324a5 100644 --- a/rust/lakesoul-datafusion/src/benchmarks/mod.rs +++ b/rust/lakesoul-datafusion/src/benchmarks/mod.rs @@ -2,4 +2,4 @@ // // SPDX-License-Identifier: Apache-2.0 -pub mod tpch; \ No newline at end of file +pub mod tpch; diff --git a/rust/lakesoul-datafusion/src/benchmarks/tpch/mod.rs b/rust/lakesoul-datafusion/src/benchmarks/tpch/mod.rs index 282116660..20e3e4144 100644 --- a/rust/lakesoul-datafusion/src/benchmarks/tpch/mod.rs +++ b/rust/lakesoul-datafusion/src/benchmarks/tpch/mod.rs @@ -3,62 +3,35 @@ // SPDX-License-Identifier: Apache-2.0 mod run; -use arrow::datatypes::{Schema, SchemaBuilder, Field, DataType}; +use arrow::datatypes::{DataType, Field, Schema, SchemaBuilder}; pub const TPCH_TABLES: &[&str] = &[ - "part", "supplier", "partsupp", "customer", "orders", - "lineitem", - "nation", "region", + "part", "supplier", "partsupp", "customer", "orders", "lineitem", "nation", "region", ]; /// The `.tbl` file contains a trailing column pub fn get_tbl_tpch_table_primary_keys(table: &str) -> Vec { match table { - "part" => vec![ - String::from("p_partkey"), - String::from("p_name"), - ], - - "supplier" => vec![ - String::from("s_suppkey"), - String::from("s_name"), - ], - - "partsupp" => vec![ - String::from("ps_partkey"), - String::from("ps_suppkey"), - ], - - "customer" => vec![ - String::from("c_custkey"), - String::from("c_name"), - ], - - "orders" => vec![ - String::from("o_orderkey"), - String::from("o_custkey"), - ], - - "lineitem" => vec![ - String::from("l_orderkey"), - String::from("l_partkey"), - ], - - "nation" => vec![ - String::from("n_nationkey"), - String::from("n_name"), - ], - - "region" => vec![ - String::from("r_regionkey"), - String::from("r_name"), - ], + "part" => vec![String::from("p_partkey"), String::from("p_name")], + + "supplier" => vec![String::from("s_suppkey"), String::from("s_name")], + + "partsupp" => vec![String::from("ps_partkey"), String::from("ps_suppkey")], + + "customer" => vec![String::from("c_custkey"), String::from("c_name")], + + "orders" => vec![String::from("o_orderkey"), String::from("o_custkey")], + + "lineitem" => vec![String::from("l_orderkey"), String::from("l_partkey")], + + "nation" => vec![String::from("n_nationkey"), String::from("n_name")], + + "region" => vec![String::from("r_regionkey"), String::from("r_name")], _ => unimplemented!(), } } - /// The `.tbl` file contains a trailing column pub fn get_tbl_tpch_table_schema(table: &str) -> Schema { let mut schema = SchemaBuilder::from(get_tpch_table_schema(table).fields); @@ -161,5 +134,3 @@ pub fn get_tpch_table_schema(table: &str) -> Schema { _ => unimplemented!(), } } - - diff --git a/rust/lakesoul-datafusion/src/benchmarks/tpch/run.rs b/rust/lakesoul-datafusion/src/benchmarks/tpch/run.rs index b360b4009..ca6ae19a4 100644 --- a/rust/lakesoul-datafusion/src/benchmarks/tpch/run.rs +++ b/rust/lakesoul-datafusion/src/benchmarks/tpch/run.rs @@ -1,5 +1,3 @@ // SPDX-FileCopyrightText: 2023 LakeSoul Contributors // // SPDX-License-Identifier: Apache-2.0 - - diff --git a/rust/lakesoul-datafusion/src/datasource/file_format/metadata_format.rs 
b/rust/lakesoul-datafusion/src/datasource/file_format/metadata_format.rs index 0afb30254..1b9b23431 100644 --- a/rust/lakesoul-datafusion/src/datasource/file_format/metadata_format.rs +++ b/rust/lakesoul-datafusion/src/datasource/file_format/metadata_format.rs @@ -561,15 +561,10 @@ impl ExecutionPlan for LakeSoulHashSinkExec { } } - fn make_sink_batch(count: u64, msg: String) -> RecordBatch { let count_array = Arc::new(UInt64Array::from(vec![count])) as ArrayRef; let msg_array = Arc::new(StringArray::from(vec![msg])) as ArrayRef; - RecordBatch::try_from_iter_with_nullable( - vec![ - ("count", count_array, false), - ("msg", msg_array, false) - ]).unwrap() + RecordBatch::try_from_iter_with_nullable(vec![("count", count_array, false), ("msg", msg_array, false)]).unwrap() } fn make_sink_schema() -> SchemaRef { @@ -577,6 +572,5 @@ fn make_sink_schema() -> SchemaRef { Arc::new(Schema::new(vec![ Field::new("count", DataType::UInt64, false), Field::new("msg", DataType::Utf8, false), - ]) - ) + ])) } diff --git a/rust/lakesoul-datafusion/src/lib.rs b/rust/lakesoul-datafusion/src/lib.rs index be56e303a..5389b08d4 100644 --- a/rust/lakesoul-datafusion/src/lib.rs +++ b/rust/lakesoul-datafusion/src/lib.rs @@ -6,13 +6,13 @@ // after finished. remove above attr extern crate core; +mod benchmarks; mod catalog; mod datasource; mod error; mod lakesoul_table; mod planner; mod serialize; -mod benchmarks; #[cfg(test)] mod test; diff --git a/rust/lakesoul-datafusion/src/test/catalog_tests.rs b/rust/lakesoul-datafusion/src/test/catalog_tests.rs index de24b6198..2b4517c72 100644 --- a/rust/lakesoul-datafusion/src/test/catalog_tests.rs +++ b/rust/lakesoul-datafusion/src/test/catalog_tests.rs @@ -78,7 +78,12 @@ mod catalog_tests { } v }; - let path = format!("{}{}/{}", env::current_dir().unwrap_or(env::temp_dir()).to_str().unwrap(), &np.namespace, &table_name); + let path = format!( + "{}/test_data/{}/{}", + env::current_dir().unwrap_or(env::temp_dir()).to_str().unwrap(), + &np.namespace, + &table_name + ); let table_id = format!( "table_{}", (&mut rng) @@ -104,7 +109,12 @@ mod catalog_tests { } fn table_info(table_name: &str, namespace: &str, schema: SchemaRef) -> TableInfo { - let path = format!("{}{}/{}", env::current_dir().unwrap_or(env::temp_dir()).to_str().unwrap(), namespace, table_name); + let path = format!( + "{}/test_data/{}/{}", + env::current_dir().unwrap_or(env::temp_dir()).to_str().unwrap(), + namespace, + table_name + ); let schema = serde_json::to_string::(&schema.into()).unwrap(); TableInfo { table_id: "table_000000001".into(), diff --git a/rust/lakesoul-datafusion/src/test/integration_tests.rs b/rust/lakesoul-datafusion/src/test/integration_tests.rs index 46eb7c66b..fcd7a7969 100644 --- a/rust/lakesoul-datafusion/src/test/integration_tests.rs +++ b/rust/lakesoul-datafusion/src/test/integration_tests.rs @@ -5,25 +5,35 @@ mod integration_tests { use std::{path::Path, sync::Arc}; - use datafusion::{execution::context::SessionContext, datasource::{TableProvider, file_format::{FileFormat, csv::CsvFormat}, listing::{ListingOptions, ListingTableUrl, ListingTableConfig, ListingTable}}}; + use datafusion::{ + datasource::{ + file_format::{csv::CsvFormat, FileFormat}, + listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl}, + TableProvider, + }, + execution::context::SessionContext, + }; use lakesoul_io::lakesoul_io_config::{create_session_context_with_planner, LakeSoulIOConfigBuilder}; use lakesoul_metadata::MetaDataClient; - use crate::{error::{Result, LakeSoulError}, 
benchmarks::tpch::{TPCH_TABLES, get_tbl_tpch_table_schema, get_tpch_table_schema, get_tbl_tpch_table_primary_keys}, lakesoul_table::LakeSoulTable, catalog::{create_io_config_builder, create_table}, planner::query_planner::LakeSoulQueryPlanner}; - - async fn get_table( - ctx: &SessionContext, - table: &str, - ) -> Result> { + use crate::{ + benchmarks::tpch::{ + get_tbl_tpch_table_primary_keys, get_tbl_tpch_table_schema, get_tpch_table_schema, TPCH_TABLES, + }, + catalog::{create_io_config_builder, create_table}, + error::{LakeSoulError, Result}, + lakesoul_table::LakeSoulTable, + planner::query_planner::LakeSoulQueryPlanner, + }; + + async fn get_table(ctx: &SessionContext, table: &str) -> Result> { let path = get_tpch_data_path()?; // Obtain a snapshot of the SessionState let state = ctx.state(); let (format, path, extension): (Arc, String, &'static str) = { let path = format!("{path}/{table}.tbl"); - let format = CsvFormat::default() - .with_delimiter(b'|') - .with_has_header(false); + let format = CsvFormat::default().with_delimiter(b'|').with_has_header(false); (Arc::new(format), path, ".tbl") }; @@ -35,16 +45,12 @@ mod integration_tests { let table_path = ListingTableUrl::parse(path)?; let config = ListingTableConfig::new(table_path).with_listing_options(options); - let config = { - config.with_schema(Arc::new(get_tbl_tpch_table_schema(table))) - }; + let config = { config.with_schema(Arc::new(get_tbl_tpch_table_schema(table))) }; Ok(Arc::new(ListingTable::try_new(config)?)) - } - + fn get_tpch_data_path() -> Result { - let path = - std::env::var("TPCH_DATA").unwrap_or_else(|_| "benchmarks/data".to_string()); + let path = std::env::var("TPCH_DATA").unwrap_or_else(|_| "benchmarks/data".to_string()); if !Path::new(&path).exists() { return Err(LakeSoulError::Internal(format!( "Benchmark data not found (set TPCH_DATA env var to override): {}", @@ -54,19 +60,17 @@ mod integration_tests { Ok(path) } - #[tokio::test] async fn load_tpch_data() -> Result<()> { let client = Arc::new(MetaDataClient::from_env().await?); let builder = create_io_config_builder(client.clone(), None, false, "default").await?; let ctx = create_session_context_with_planner(&mut builder.clone().build(), Some(LakeSoulQueryPlanner::new_ref()))?; - + for table in TPCH_TABLES { let table_provider = get_table(&ctx, table).await?; ctx.register_table(*table, table_provider)?; - let dataframe = ctx.sql(format!("select * from {}", table).as_str()) - .await?; + let dataframe = ctx.sql(format!("select * from {}", table).as_str()).await?; let schema = get_tpch_table_schema(table); @@ -82,4 +86,4 @@ mod integration_tests { Ok(()) } -} \ No newline at end of file +} diff --git a/rust/lakesoul-io-c/src/lib.rs b/rust/lakesoul-io-c/src/lib.rs index 6a7178506..0d707ef92 100644 --- a/rust/lakesoul-io-c/src/lib.rs +++ b/rust/lakesoul-io-c/src/lib.rs @@ -17,10 +17,9 @@ pub use arrow::array::StructArray; use arrow::datatypes::Schema; use arrow::ffi::from_ffi; pub use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema}; - -use lakesoul_io::lakesoul_io_config::{LakeSoulIOConfig, LakeSoulIOConfigBuilder}; use tokio::runtime::{Builder, Runtime}; +use lakesoul_io::lakesoul_io_config::{LakeSoulIOConfig, LakeSoulIOConfigBuilder}; use lakesoul_io::lakesoul_reader::{LakeSoulReader, RecordBatch, Result, SyncSendableMutableLakeSoulReader}; use lakesoul_io::lakesoul_writer::SyncSendableMutableLakeSoulWriter; @@ -493,7 +492,9 @@ pub extern "C" fn next_record_batch_blocked( struct Cvoid { data: *const c_void, } + unsafe impl Send for Cvoid {} + unsafe impl Sync 
for Cvoid {} #[no_mangle] @@ -740,6 +741,14 @@ pub extern "C" fn free_tokio_runtime(runtime: NonNull>) { #[cfg(test)] mod tests { + use core::ffi::c_ptrdiff_t; + use std::ffi::{CStr, CString}; + use std::os::raw::c_char; + use std::ptr::NonNull; + use std::sync::{Condvar, Mutex}; + + use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema}; + use crate::{ create_lakesoul_io_config_from_builder, create_lakesoul_reader_from_config, create_lakesoul_writer_from_config, flush_and_close_writer, free_lakesoul_reader, lakesoul_config_builder_add_single_file, @@ -748,12 +757,6 @@ mod tests { lakesoul_config_builder_set_schema, lakesoul_config_builder_set_thread_num, lakesoul_reader_get_schema, next_record_batch, start_reader, tokio_runtime_builder_set_thread_num, write_record_batch, IOConfigBuilder, }; - use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema}; - use core::ffi::c_ptrdiff_t; - use std::ffi::{CStr, CString}; - use std::os::raw::c_char; - use std::ptr::NonNull; - use std::sync::{Condvar, Mutex}; fn set_object_store_kv(builder: NonNull, key: &str, value: &str) -> NonNull { unsafe { @@ -796,6 +799,7 @@ mod tests { } static mut CALL_BACK_I32_CV: (Mutex, Condvar) = (Mutex::new(-1), Condvar::new()); + #[no_mangle] pub extern "C" fn reader_i32_callback(status: i32, err: *const c_char) { unsafe { diff --git a/rust/lakesoul-io/src/datasource/parquet_source.rs b/rust/lakesoul-io/src/datasource/parquet_source.rs new file mode 100644 index 000000000..c1e952ee5 --- /dev/null +++ b/rust/lakesoul-io/src/datasource/parquet_source.rs @@ -0,0 +1,580 @@ +// SPDX-FileCopyrightText: 2023 LakeSoul Contributors +// +// SPDX-License-Identifier: Apache-2.0 + +use std::any::Any; +use std::collections::HashMap; +use std::fmt::{self, Debug}; +use std::sync::Arc; + +use async_trait::async_trait; +use datafusion::arrow::datatypes::{Field, Schema, SchemaRef}; +use datafusion::common::{DFSchemaRef, ToDFSchema}; +use datafusion::datasource::{provider_as_source, TableProvider}; +use datafusion::error::Result; +use datafusion::execution::context::{SessionState, TaskContext}; +use datafusion::logical_expr::{ + Expr, Expr::Column, LogicalPlan, LogicalPlanBuilder, TableProviderFilterPushDown, TableType, +}; +use datafusion::physical_expr::PhysicalSortExpr; +use datafusion::physical_plan::{ + project_schema, DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream, +}; +use datafusion::prelude::{DataFrame, SessionContext}; +use datafusion_common::DataFusionError; + +use crate::default_column_stream::empty_schema_stream::EmptySchemaStream; +use crate::default_column_stream::DefaultColumnStream; +use crate::filter::parser::Parser as FilterParser; +use crate::lakesoul_io_config::LakeSoulIOConfig; +use crate::projection::ProjectionStream; +use crate::sorted_merge::merge_operator::MergeOperator; +use crate::sorted_merge::sorted_stream_merger::{SortedStream, SortedStreamMerger}; +use crate::transform::uniform_schema; + +use super::empty_schema::EmptySchemaProvider; + +#[derive(Clone, Debug)] +pub struct LakeSoulParquetProvider { + config: LakeSoulIOConfig, + plans: Vec, + full_schema: SchemaRef, +} + +impl LakeSoulParquetProvider { + pub fn from_config(config: LakeSoulIOConfig) -> Self { + Self { + config, + plans: vec![], + full_schema: SchemaRef::new(Schema::empty()), + } + } + + pub async fn build(&self) -> Result { + let context = SessionContext::default(); + self.build_with_context(&context).await + } + + pub async fn build_with_context(&self, context: &SessionContext) -> Result { + let mut plans = vec![]; + let mut 
full_schema = uniform_schema(self.config.schema.0.clone()).to_dfschema()?; + // one file is a table scan + for i in 0..self.config.files.len() { + let file = self.config.files[i].clone(); + let df = context.read_parquet(file, Default::default()).await?; + full_schema.merge(&Schema::from(df.schema()).to_dfschema()?); + let plan = df.into_unoptimized_plan(); + plans.push(plan); + } + Ok(Self { + config: self.config.clone(), + plans, + full_schema: SchemaRef::new(Schema::from(full_schema)), + }) + } + + pub(crate) fn get_full_schema(&self) -> SchemaRef { + self.full_schema.clone() + } + + pub(crate) async fn create_physical_plan( + &self, + projections: Option<&Vec>, + full_schema: SchemaRef, + inputs: Vec>, + ) -> Result> { + Ok(Arc::new(LakeSoulParquetScanExec::new( + projections, + full_schema, + inputs, + Arc::new(self.config.default_column_value.clone()), + Arc::new(self.config.merge_operators.clone()), + Arc::new(self.config.primary_keys.clone()), + )?)) + } +} + +#[async_trait] +impl TableProvider for LakeSoulParquetProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.full_schema.clone() + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + _state: &SessionState, + projections: Option<&Vec>, + // filters and limit can be used here to inject some push-down operations if needed + _filters: &[Expr], + _limit: Option, + ) -> Result> { + let projected_schema = project_schema(&self.get_full_schema(), projections)?; + let mut inputs = vec![]; + for i in 0..self.plans.len() { + let df = DataFrame::new(_state.clone(), self.plans[i].clone()); + let df = _filters + .iter() + .fold(df, |df, f| df.clone().filter(f.clone()).unwrap_or(df)); + let df_schema = Arc::new(df.schema().clone()); + let projected_cols = schema_intersection(df_schema, projected_schema.clone(), &self.config.primary_keys); + let df = if projected_cols.is_empty() { + let plan = LogicalPlanBuilder::scan( + "empty", + provider_as_source(Arc::new(EmptySchemaProvider::new(df.count().await?))), + None, + )? + .build()?; + DataFrame::new(_state.clone(), plan) + } else { + df.select(projected_cols)? 
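+                // A file that contains none of the requested columns still contributes its row count
+                // (through EmptySchemaProvider); otherwise the per-file plan is pruned to the projected
+                // columns plus the primary keys needed later for the sorted merge.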
+ }; + + let physical_plan = df.create_physical_plan().await?; + inputs.push(physical_plan); + } + + let full_schema = SchemaRef::new(Schema::new( + self.get_full_schema() + .fields() + .iter() + .map(|field| { + Field::new( + field.name(), + field.data_type().clone(), + field.is_nullable() + | inputs.iter().any(|plan| { + if let Some((_, plan_field)) = plan.schema().column_with_name(field.name()) { + plan_field.is_nullable() + } else { + true + } + }), + ) + }) + .collect::>(), + )); + + self.create_physical_plan(projections, full_schema, inputs).await + } + + fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result> { + if self.config.primary_keys.is_empty() { + Ok(vec![TableProviderFilterPushDown::Exact; filters.len()]) + } else { + filters + .iter() + .map(|f| { + if let Ok(cols) = f.to_columns() { + if cols.iter().all(|col| self.config.primary_keys.contains(&col.name)) { + Ok(TableProviderFilterPushDown::Inexact) + } else { + Ok(TableProviderFilterPushDown::Unsupported) + } + } else { + Ok(TableProviderFilterPushDown::Unsupported) + } + }) + .collect() + } + } +} + +#[derive(Debug, Clone)] +struct LakeSoulParquetScanExec { + projections: Vec, + origin_schema: SchemaRef, + target_schema_with_pks: SchemaRef, + target_schema: SchemaRef, + inputs: Vec>, + default_column_value: Arc>, + merge_operators: Arc>, + primary_keys: Arc>, +} + +impl LakeSoulParquetScanExec { + fn new( + projections: Option<&Vec>, + full_schema: SchemaRef, + inputs: Vec>, + default_column_value: Arc>, + merge_operators: Arc>, + primary_keys: Arc>, + ) -> Result { + let target_schema_with_pks = if let Some(proj) = projections { + let mut proj_with_pks = proj.clone(); + for idx in 0..primary_keys.len() { + let field_idx = full_schema.index_of(primary_keys[idx].as_str())?; + if !proj.contains(&field_idx) { + proj_with_pks.push(field_idx); + } + } + project_schema(&full_schema, Some(&proj_with_pks))? + } else { + full_schema.clone() + }; + + Ok(Self { + projections: projections + .ok_or(DataFusionError::Internal("no projection".into()))? 
+ .clone(), + origin_schema: full_schema.clone(), + target_schema_with_pks, + target_schema: project_schema(&full_schema, projections)?, + inputs, + default_column_value, + merge_operators, + primary_keys, + }) + } + + fn origin_schema(&self) -> SchemaRef { + self.origin_schema.clone() + } +} + +impl DisplayAs for LakeSoulParquetScanExec { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "LakeSoulParquetScanExec") + } +} + +impl ExecutionPlan for LakeSoulParquetScanExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.target_schema.clone() + } + + fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning { + datafusion::physical_plan::Partitioning::UnknownPartitioning(1) + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + + fn children(&self) -> Vec> { + self.inputs.clone() + } + + fn with_new_children(self: Arc, _: Vec>) -> Result> { + Ok(self) + } + + fn execute(&self, _partition: usize, _context: Arc) -> Result { + let mut stream_init_futs = Vec::with_capacity(self.inputs.len()); + for i in 0..self.inputs.len() { + let plan = self + .inputs + .get(i) + .ok_or(DataFusionError::Internal("get plan failed".into()))?; + let stream = plan.execute(_partition, _context.clone())?; + stream_init_futs.push(stream); + } + let merged_stream = merge_stream( + stream_init_futs, + self.target_schema_with_pks.clone(), + self.primary_keys.clone(), + self.default_column_value.clone(), + self.merge_operators.clone(), + _context.session_config().batch_size(), + )?; + + let result = ProjectionStream { + expr: self + .projections + .iter() + .map(|&idx| { + datafusion::physical_expr::expressions::col(self.origin_schema().field(idx).name(), &self.schema()) + }) + .collect::>>()?, + schema: self.target_schema.clone(), + input: merged_stream, + }; + + Ok(Box::pin(result)) + } +} + +pub fn merge_stream( + streams: Vec, + schema: SchemaRef, + primary_keys: Arc>, + default_column_value: Arc>, + merge_operators: Arc>, + batch_size: usize, +) -> Result { + let merge_stream = if primary_keys.is_empty() { + Box::pin(DefaultColumnStream::new_from_streams_with_default( + streams, + schema, + default_column_value, + )) + } else { + let merge_schema: SchemaRef = Arc::new(Schema::new( + schema + .fields + .iter() + .filter_map(|field| { + if default_column_value.get(field.name()).is_none() { + Some(field.clone()) + } else { + None + } + }) + .collect::>(), + )); // merge_schema + let merge_ops = schema + .fields() + .iter() + .map(|field| { + MergeOperator::from_name(merge_operators.get(field.name()).unwrap_or(&String::from("UseLast"))) + }) + .collect::>(); + + let streams = streams + .into_iter() + .map(|s| SortedStream::new(Box::pin(DefaultColumnStream::new_from_stream(s, merge_schema.clone())))) + .collect(); + let merge_stream = SortedStreamMerger::new_from_streams( + streams, + merge_schema, + primary_keys.iter().cloned().collect(), + batch_size, + merge_ops, + )?; + Box::pin(DefaultColumnStream::new_from_streams_with_default( + vec![Box::pin(merge_stream)], + schema, + default_column_value, + )) + }; + Ok(merge_stream) +} + +fn schema_intersection(df_schema: DFSchemaRef, request_schema: SchemaRef, primary_keys: &[String]) -> Vec { + let mut exprs = primary_keys + .iter() + .map(|pk| Column(datafusion::common::Column::new_unqualified(pk))) + .collect::>(); + for field in request_schema.fields() { + if primary_keys.contains(field.name()) { + continue; + } + if 
df_schema.field_with_unqualified_name(field.name()).is_ok() { + exprs.push(Column(datafusion::common::Column::new_unqualified(field.name()))); + } + } + exprs +} + +pub async fn prune_filter_and_execute( + df: DataFrame, + request_schema: SchemaRef, + filter_str: Vec, + batch_size: usize, +) -> Result { + let df_schema = df.schema().clone(); + // find columns requested and prune others + let cols = schema_intersection(Arc::new(df_schema.clone()), request_schema.clone(), &[]); + if cols.is_empty() { + Ok(Box::pin(EmptySchemaStream::new(batch_size, df.count().await?))) + } else { + // row filtering should go first since filter column may not in the selected cols + let arrow_schema = Arc::new(Schema::from(df_schema)); + let df = filter_str.iter().try_fold(df, |df, f| { + let filter = FilterParser::parse(f.clone(), arrow_schema.clone())?; + df.filter(filter) + })?; + // column pruning + let df = df.select(cols)?; + // return a stream + df.execute_stream().await + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + use std::time::Duration; + + use arrow::datatypes::DataType; + use arrow::util::pretty::print_batches; + use datafusion::datasource::provider_as_source; + use datafusion::logical_expr::Expr; + use datafusion::logical_expr::LogicalPlanBuilder; + use datafusion::prelude::*; + use datafusion::scalar::ScalarValue; + use tokio::time::timeout; + + use crate::filter::parser::Parser; + use crate::lakesoul_io_config::LakeSoulIOConfigBuilder; + + use super::*; + + #[tokio::test] + async fn test_lakesoul_parquet_source_with_pk() -> Result<()> { + // create our custom datasource and adding some users + let schema = SchemaRef::new(Schema::new(vec![ + Field::new("hash", DataType::Int32, false), + Field::new("value", DataType::Int32, true), + Field::new("name", DataType::Int32, true), + Field::new("range", DataType::Int32, true), + ])); + let builder = LakeSoulIOConfigBuilder::new() + .with_file("/Users/ceng/Desktop/test/range=20201101/2.parquet".to_owned()) + .with_file("/Users/ceng/Desktop/test/range=20201101/1-3-before.parquet".to_owned()) + .with_file("/Users/ceng/Desktop/test/range=20201101/1-3-after.parquet".to_owned()) + .with_file("/Users/ceng/Desktop/test/range=20201101/4.parquet".to_owned()) + .with_schema(schema.clone()) + .with_default_column_value("range".to_string(), "20201101".to_string()) + .with_primary_keys(vec!["hash".to_string()]); + + query( + LakeSoulParquetProvider::from_config(builder.build()), + Some(Parser::parse("gt(value,0)".to_string(), schema.clone())?), + ) + .await?; + + Ok(()) + } + + #[tokio::test] + async fn test_lakesoul_parquet_source_exclude_pk() -> Result<()> { + let schema = SchemaRef::new(Schema::new(vec![ + // Field::new("hash", DataType::Int32, false), + Field::new("value", DataType::Int32, true), + Field::new("name", DataType::Int32, true), + Field::new("range", DataType::Int32, true), + ])); + let builder = LakeSoulIOConfigBuilder::new() + .with_file("/Users/ceng/Desktop/test/range=20201101/2.parquet".to_owned()) + .with_file("/Users/ceng/Desktop/test/range=20201101/1-3-before.parquet".to_owned()) + .with_file("/Users/ceng/Desktop/test/range=20201101/1-3-after.parquet".to_owned()) + .with_file("/Users/ceng/Desktop/test/range=20201101/4.parquet".to_owned()) + .with_schema(schema.clone()) + .with_default_column_value("range".to_string(), "20201101".to_string()) + .with_primary_keys(vec!["hash".to_string()]); + + query( + LakeSoulParquetProvider::from_config(builder.build()), + Some(Parser::parse("gt(hash,0)".to_string(), schema.clone())?), + ) + 
.await?; + + Ok(()) + } + + async fn query(db: LakeSoulParquetProvider, filter: Option) -> Result<()> { + // create local execution context + let config = SessionConfig::default(); + let ctx = SessionContext::new_with_config(config); + + let db = db.build_with_context(&ctx).await?; + + // create logical plan composed of a single TableScan + let logical_plan = + LogicalPlanBuilder::scan_with_filters("name", provider_as_source(Arc::new(db)), None, vec![])?.build()?; + + let mut dataframe = DataFrame::new(ctx.state(), logical_plan); + + if let Some(f) = filter { + dataframe = dataframe.filter(f)?; + } + dataframe = dataframe.select_columns(&["hash", "value", "name", "range"])?; + dataframe = dataframe.explain(true, false)?; + + timeout(Duration::from_secs(10), async move { + let result = dataframe.collect().await?; + // let record_batch = result.get(0).unwrap(); + + // assert_eq!(expected_result_length, record_batch.column(1).len()); + let _ = print_batches(&result); + Ok(()) + }) + .await + .map_err(|e| DataFusionError::External(Box::new(e)))? + } + + #[tokio::test] + async fn test_lakesoul_parquet_source_by_select_from_sql_and_filter_api() -> Result<()> { + let ctx = SessionContext::new(); + let schema = SchemaRef::new(Schema::new(vec![ + Field::new("hash", DataType::Int32, false), + Field::new("value", DataType::Int32, true), + Field::new("name", DataType::Int32, true), + Field::new("range", DataType::Int32, true), + ])); + let builder = LakeSoulIOConfigBuilder::new() + .with_file("/Users/ceng/Desktop/test/range=20201101/2.parquet".to_owned()) + .with_file("/Users/ceng/Desktop/test/range=20201101/1-3-before.parquet".to_owned()) + .with_file("/Users/ceng/Desktop/test/range=20201101/1-3-after.parquet".to_owned()) + .with_file("/Users/ceng/Desktop/test/range=20201101/4.parquet".to_owned()) + .with_schema(schema.clone()) + .with_default_column_value("range".to_string(), "20201101".to_string()) + .with_primary_keys(vec!["hash".to_string()]); + + let provider = LakeSoulParquetProvider::from_config(builder.build()) + .build_with_context(&ctx) + .await?; + ctx.register_table("lakesoul", Arc::new(provider))?; + + let results = ctx + .sql("SELECT * FROM lakesoul") + .await? + .filter(col("value").gt(Expr::Literal(ScalarValue::Int32(Some(1)))))? + .select(vec![col("hash")])? + .explain(true, false)? 
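+            // explain(true, false) turns the frame into its plan description, so the collect() below
+            // yields the textual logical/physical plans rather than table rows.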
+ .collect() + .await?; + + let _ = print_batches(&results); + + Ok(()) + } + + #[tokio::test] + async fn test_lakesoul_parquet_source_by_select_from_where_sql() -> Result<()> { + let ctx = SessionContext::new(); + let schema = SchemaRef::new(Schema::new(vec![ + Field::new("hash", DataType::Int32, false), + Field::new("value", DataType::Int32, true), + Field::new("name", DataType::Int32, true), + Field::new("range", DataType::Int32, true), + ])); + let builder = LakeSoulIOConfigBuilder::new() + .with_file("/Users/ceng/Desktop/test/range=20201101/2.parquet".to_owned()) + .with_file("/Users/ceng/Desktop/test/range=20201101/1-3-before.parquet".to_owned()) + .with_file("/Users/ceng/Desktop/test/range=20201101/1-3-after.parquet".to_owned()) + .with_file("/Users/ceng/Desktop/test/range=20201101/4.parquet".to_owned()) + .with_schema(schema.clone()) + .with_default_column_value("range".to_string(), "20201101".to_string()) + .with_primary_keys(vec!["hash".to_string()]); + + let provider = LakeSoulParquetProvider::from_config(builder.build()) + .build_with_context(&ctx) + .await?; + ctx.register_table("lakesoul", Arc::new(provider))?; + + let results = ctx + .sql("SELECT hash FROM lakesoul where value > 1") + .await? + .explain(true, false)? + .collect() + .await?; + + let _ = print_batches(&results); + + Ok(()) + } +} diff --git a/rust/lakesoul-metadata-c/.gitignore b/rust/lakesoul-metadata-c/.gitignore new file mode 100644 index 000000000..c26939f47 --- /dev/null +++ b/rust/lakesoul-metadata-c/.gitignore @@ -0,0 +1,2 @@ +**/build/ +**/.cache/ diff --git a/rust/lakesoul-metadata-c/Cargo.toml b/rust/lakesoul-metadata-c/Cargo.toml index 6e26989f0..ad3b5db88 100644 --- a/rust/lakesoul-metadata-c/Cargo.toml +++ b/rust/lakesoul-metadata-c/Cargo.toml @@ -15,4 +15,7 @@ crate-type = ["cdylib"] [dependencies] lakesoul-metadata = { path = "../lakesoul-metadata" } proto = { path = "../proto" } -prost = "0.11" +prost = {workspace = true} +serde_json = "1.0.111" +log = {workspace = true} +env_logger = "0.11" diff --git a/rust/lakesoul-metadata-c/src/lib.rs b/rust/lakesoul-metadata-c/src/lib.rs index 6603a30eb..5c939dddd 100644 --- a/rust/lakesoul-metadata-c/src/lib.rs +++ b/rust/lakesoul-metadata-c/src/lib.rs @@ -7,31 +7,36 @@ extern crate core; use core::ffi::c_ptrdiff_t; +use std::collections::HashMap; use std::ffi::{c_char, c_uchar, CStr, CString}; use std::io::Write; -use std::ptr::NonNull; +use std::ptr::{NonNull, null, null_mut}; -use lakesoul_metadata::{Builder, Client, MetaDataClient, PreparedStatementMap, Runtime}; +use log::debug; use prost::bytes::BufMut; use prost::Message; + +use lakesoul_metadata::{Builder, Client, MetaDataClient, PreparedStatementMap, Runtime}; +use lakesoul_metadata::error::LakeSoulMetaDataError; +use lakesoul_metadata::transfusion::SplitDesc; use proto::proto::entity; #[repr(C)] -pub struct Result { +pub struct CResult { ptr: *mut OpaqueT, err: *const c_char, } -impl Result { +impl CResult { pub fn new(obj: T) -> Self { - Result { + CResult { ptr: convert_to_opaque_raw::(obj), err: std::ptr::null(), } } pub fn error(err_msg: &str) -> Self { - Result { + CResult { ptr: std::ptr::null_mut(), err: CString::new(err_msg).unwrap().into_raw(), } @@ -40,6 +45,7 @@ impl Result { pub fn free(&mut self) { unsafe { if !self.ptr.is_null() { + // Invoking `std::mem::drop` with a value that implements `Copy` does nothing drop(from_opaque::(NonNull::new_unchecked(self.ptr))); } if !self.err.is_null() { @@ -49,6 +55,44 @@ impl Result { } } +pub type ResultCallback = extern "C" fn(bool, *const 
c_char); + +pub type IntegerResultCallBack = extern "C" fn(i32, *const c_char); + +// pub type DataResultCallback = extern "C" fn(bool, *const c_char, *const c_void); + +/// for jnr +/// can use as_ptr instead of into_raw? +#[allow(unused)] +fn call_result_callback(callback: ResultCallback, status: bool, err: *const c_char) { + callback(status, err); + // release error string + if !err.is_null() { + unsafe { + let _ = CString::from_raw(err as *mut c_char); + } + } +} + +fn call_integer_result_callback(callback: IntegerResultCallBack, status: i32, err: *const c_char) { + // release error string + callback(status, err); + if !err.is_null() { + unsafe { + let _ = CString::from_raw(err as *mut c_char); + } + } +} + +// #[repr(C)] +// struct CVoid { +// data: *const c_void, +// } + +// unsafe impl Send for CVoid {} + +// unsafe impl Sync for CVoid {} + #[repr(C)] pub struct PreparedStatement { private: [u8; 0], @@ -89,14 +133,12 @@ fn string_from_ptr(ptr: *const c_char) -> String { unsafe { CStr::from_ptr(ptr).to_str().unwrap().to_string() } } -pub type ResultCallback = extern "C" fn(T, *const c_char); - #[no_mangle] pub extern "C" fn execute_insert( callback: extern "C" fn(i32, *const c_char), - runtime: NonNull>, - client: NonNull>, - prepared: NonNull>, + runtime: NonNull>, + client: NonNull>, + prepared: NonNull>, insert_type: i32, addr: c_ptrdiff_t, len: i32, @@ -118,9 +160,9 @@ pub extern "C" fn execute_insert( #[no_mangle] pub extern "C" fn execute_update( callback: extern "C" fn(i32, *const c_char), - runtime: NonNull>, - client: NonNull>, - prepared: NonNull>, + runtime: NonNull>, + client: NonNull>, + prepared: NonNull>, update_type: i32, joined_string: *const c_char, ) { @@ -140,9 +182,9 @@ pub extern "C" fn execute_update( #[no_mangle] pub extern "C" fn execute_query_scalar( callback: extern "C" fn(*const c_char, *const c_char), - runtime: NonNull>, - client: NonNull>, - prepared: NonNull>, + runtime: NonNull>, + client: NonNull>, + prepared: NonNull>, update_type: i32, joined_string: *const c_char, ) { @@ -172,12 +214,12 @@ pub extern "C" fn execute_query_scalar( #[no_mangle] pub extern "C" fn execute_query( callback: extern "C" fn(i32, *const c_char), - runtime: NonNull>, - client: NonNull>, - prepared: NonNull>, + runtime: NonNull>, + client: NonNull>, + prepared: NonNull>, query_type: i32, joined_string: *const c_char, -) -> NonNull> { +) -> NonNull> { let runtime = unsafe { NonNull::new_unchecked(runtime.as_ref().ptr as *mut Runtime).as_ref() }; let client = unsafe { NonNull::new_unchecked(client.as_ref().ptr as *mut Client).as_ref() }; let prepared = unsafe { NonNull::new_unchecked(prepared.as_ref().ptr as *mut PreparedStatementMap).as_mut() }; @@ -189,11 +231,11 @@ pub extern "C" fn execute_query( Ok(u8_vec) => { let len = u8_vec.len(); callback(len as i32, CString::new("").unwrap().into_raw()); - convert_to_nonnull(Result::::new::>(u8_vec)) + convert_to_nonnull(CResult::::new::>(u8_vec)) } Err(e) => { callback(-1, CString::new(e.to_string().as_str()).unwrap().into_raw()); - convert_to_nonnull(Result::::new::>(vec![])) + convert_to_nonnull(CResult::::new::>(vec![])) } } } @@ -201,7 +243,7 @@ pub extern "C" fn execute_query( #[no_mangle] pub extern "C" fn export_bytes_result( callback: extern "C" fn(bool, *const c_char), - bytes: NonNull>, + bytes: NonNull>, len: i32, addr: c_ptrdiff_t, ) { @@ -228,15 +270,15 @@ pub extern "C" fn export_bytes_result( } #[no_mangle] -pub extern "C" fn free_bytes_result(bytes: NonNull>) { +pub extern "C" fn free_bytes_result(bytes: NonNull>) { 
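+    // Reclaims the Vec<u8> that execute_query boxed into this CResult; call at most once per result.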
from_nonnull(bytes).free::>(); } #[no_mangle] pub extern "C" fn clean_meta_for_test( callback: extern "C" fn(i32, *const c_char), - runtime: NonNull>, - client: NonNull>, + runtime: NonNull>, + client: NonNull>, ) { let runtime = unsafe { NonNull::new_unchecked(runtime.as_ref().ptr as *mut Runtime).as_ref() }; let client = unsafe { NonNull::new_unchecked(client.as_ref().ptr as *mut Client).as_ref() }; @@ -248,18 +290,18 @@ pub extern "C" fn clean_meta_for_test( } #[no_mangle] -pub extern "C" fn create_tokio_runtime() -> NonNull> { +pub extern "C" fn create_tokio_runtime() -> NonNull> { let runtime = Builder::new_multi_thread() .enable_all() .worker_threads(2) .max_blocking_threads(8) .build() .unwrap(); - convert_to_nonnull(Result::::new(runtime)) + convert_to_nonnull(CResult::::new(runtime)) } #[no_mangle] -pub extern "C" fn free_tokio_runtime(runtime: NonNull>) { +pub extern "C" fn free_tokio_runtime(runtime: NonNull>) { from_nonnull(runtime).free::(); } @@ -267,8 +309,8 @@ pub extern "C" fn free_tokio_runtime(runtime: NonNull>) { pub extern "C" fn create_tokio_postgres_client( callback: extern "C" fn(bool, *const c_char), config: *const c_char, - runtime: NonNull>, -) -> NonNull> { + runtime: NonNull>, +) -> NonNull> { let config = string_from_ptr(config); let runtime = unsafe { NonNull::new_unchecked(runtime.as_ref().ptr as *mut Runtime).as_ref() }; @@ -277,39 +319,145 @@ pub extern "C" fn create_tokio_postgres_client( let result = match result { Ok(client) => { callback(true, CString::new("").unwrap().into_raw()); - Result::::new(client) + CResult::::new(client) } Err(e) => { callback(false, CString::new(e.to_string().as_str()).unwrap().into_raw()); - Result::::error(format!("{}", e).as_str()) + CResult::::error(format!("{}", e).as_str()) } }; convert_to_nonnull(result) } #[no_mangle] -pub extern "C" fn free_tokio_postgres_client(client: NonNull>) { +pub extern "C" fn free_tokio_postgres_client(client: NonNull>) { from_nonnull(client).free::(); } #[no_mangle] -pub extern "C" fn create_prepared_statement() -> NonNull> { +pub extern "C" fn create_prepared_statement() -> NonNull> { let prepared = PreparedStatementMap::new(); - convert_to_nonnull(Result::::new(prepared)) + convert_to_nonnull(CResult::::new(prepared)) } #[no_mangle] -pub extern "C" fn free_prepared_statement(prepared: NonNull>) { +pub extern "C" fn free_prepared_statement(prepared: NonNull>) { from_nonnull(prepared).free::(); } #[no_mangle] -pub extern "C" fn create_lakesoul_metadata_client() -> NonNull> { +pub extern "C" fn create_lakesoul_metadata_client() -> NonNull> { let client = MetaDataClient::from_env(); - convert_to_nonnull(Result::::new(client)) + convert_to_nonnull(CResult::::new(client)) +} + +#[no_mangle] +pub extern "C" fn free_lakesoul_metadata_client(client: NonNull>) { + from_nonnull(client).free::(); +} + +/// # Safety +/// check nothing +fn c_char2str<'a>(ptr: *const c_char) -> &'a str { + unsafe { + let c_str = CStr::from_ptr(ptr); + c_str.to_str().unwrap() + } +} + +/// USE: JNR +/// return split(partition) desc array in json format by table_name, namespace , filter(WIP) +/// 0: success +/// -100: ffi::restore error +/// -1: error +#[no_mangle] +pub extern "C" fn create_split_desc_array( + callback: IntegerResultCallBack, + client: NonNull>, + prepared: NonNull>, + runtime: NonNull>, + table_name: *const c_char, + namespace: *const c_char, +) -> *mut c_char { + let runtime = unsafe { NonNull::new_unchecked(runtime.as_ref().ptr as *mut Runtime).as_ref() }; + let client = unsafe { 
NonNull::new_unchecked(client.as_ref().ptr as *mut Client).as_ref() }; + let prepared = unsafe { NonNull::new_unchecked(prepared.as_ref().ptr as *mut PreparedStatementMap).as_ref() }; + let table_name = c_char2str(table_name); + let namespace = c_char2str(namespace); + let result: Result<*mut c_char, LakeSoulMetaDataError> = runtime.block_on(async { + let ret = lakesoul_metadata::transfusion::split_desc_array(client, prepared, table_name, namespace).await?; + let v = serde_json::to_vec(&ret)?; + Ok(CString::new(v) + .map_err(|e| LakeSoulMetaDataError::Internal(e.to_string()))? + .into_raw()) + }); + + let (ret, status, e) = match result { + Ok(ptr) => (ptr, 0, null()), + Err(e) => match e { + LakeSoulMetaDataError::FfiError(ffi) => { + (null_mut(), -100, CString::new(ffi).unwrap().into_raw() as *const c_char) + } + other => ( + null_mut(), + -1, + CString::new(other.to_string()).unwrap().into_raw() as *const c_char, + ), + }, + }; + call_integer_result_callback(callback, status, e); + ret +} + +/// # Safety +/// caller should keep it safe +#[no_mangle] +pub unsafe extern "C" fn free_split_desc_array(json: *mut c_char) { + free_c_string(json) +} + +#[no_mangle] +pub extern "C" fn debug(callback: extern "C" fn(bool, *const c_char)) -> *mut c_char { + debug!("in debug"); + let x = vec![ + SplitDesc { + file_paths: vec!["hello jnr".into()], + primary_keys: vec![], + partition_desc: HashMap::new(), + table_schema: "".to_string(), + };1]; + let array = lakesoul_metadata::transfusion::SplitDescArray(x); + let json_vec = serde_json::to_vec(&array).unwrap(); + let c_string = CString::new(json_vec).unwrap(); + let x = CString::new("oops").unwrap().into_raw(); + callback(false, x); + unsafe { + let _s = CString::from_raw(x); + } + c_string.into_raw() +} + +/// # Safety +/// c_string should be valid +#[no_mangle] +pub unsafe extern "C" fn free_c_string(c_string: *mut c_char) { + unsafe { + // only check ptr is not null + if c_string.is_null() { + debug!("early return due to null ptr"); + return; + } + debug!("free c string start"); + let _s = CString::from_raw(c_string); + debug!("free c string finished"); + } } +/// init a global logger for rust code +/// now use RUST_LOG=LEVEL to activate +/// TODO use tokio::tracing +/// TODO add logger format #[no_mangle] -pub extern "C" fn free_lakesoul_metadata_client(prepared: NonNull>) { - from_nonnull(prepared).free::(); +pub extern "C" fn rust_logger_init() { + let _ = env_logger::try_init(); } diff --git a/rust/lakesoul-metadata-c/test_bindings/CMakeLists.txt b/rust/lakesoul-metadata-c/test_bindings/CMakeLists.txt new file mode 100644 index 000000000..aac608e47 --- /dev/null +++ b/rust/lakesoul-metadata-c/test_bindings/CMakeLists.txt @@ -0,0 +1,38 @@ +cmake_minimum_required(VERSION 3.14) +project(lakesoul-c-bindings) + +# GoogleTest requires at least C++14 +set(CMAKE_CXX_STANDARD 14) +set(CMAKE_CXX_STANDARD_REQUIRED ON) + + +add_compile_options(-fsanitize=address) +add_link_options(-fsanitize=address) + +include(FetchContent) +FetchContent_Declare( + googletest + URL https://github.com/google/googletest/archive/03597a01ee50ed33e9dfd640b249b4be3799d395.zip +) +# For Windows: Prevent overriding the parent project's compiler/linker settings +set(gtest_force_shared_crt ON CACHE BOOL "" FORCE) +set(CMAKE_EXPORT_COMPILE_COMMANDS on) +FetchContent_MakeAvailable(googletest) + +link_directories("../../target/debug") + + +enable_testing() + +add_executable( + binding_test + binding_test.cc +) +target_link_libraries( + binding_test + GTest::gtest_main + 
lakesoul_metadata_c +) + +include(GoogleTest) +gtest_discover_tests(binding_test) diff --git a/rust/lakesoul-metadata-c/test_bindings/binding.h b/rust/lakesoul-metadata-c/test_bindings/binding.h new file mode 100644 index 000000000..2e4c1d39e --- /dev/null +++ b/rust/lakesoul-metadata-c/test_bindings/binding.h @@ -0,0 +1,21 @@ +#include + +template struct Result { + OpaqueT *ptr; + const char *err; +}; + +struct TokioRuntime { + uint8_t private_[0]; +}; + +extern "C" { +char *create_split_desc_array(const char *, const char *, + Result *runtime); + +void free_split_desc_array(char *); + +char *debug(); + +void rust_logger_init(); +} diff --git a/rust/lakesoul-metadata-c/test_bindings/binding_test.cc b/rust/lakesoul-metadata-c/test_bindings/binding_test.cc new file mode 100644 index 000000000..eb2ce9173 --- /dev/null +++ b/rust/lakesoul-metadata-c/test_bindings/binding_test.cc @@ -0,0 +1,20 @@ +#include "binding.h" +#include "gtest/gtest.h" +#include + +TEST(TokioRuntimeTest, GlobalTest) { + rust_logger_init(); + char *s = debug(); + std::cout << s; +} + +/* TEST(SplitDescArrayTest, HeapUseAfterFree) { */ +/* ASSERT_DEATH( */ +/* { */ +/* auto *json = create_split_desc_array(nullptr, nullptr, nullptr); */ +/* std::string js(json); */ +/* free_split_desc_array(json); */ +/* auto _x = *json; */ +/* }, */ +/* "heap-use-after-free"); */ +/* } */ diff --git a/rust/lakesoul-metadata/Cargo.toml b/rust/lakesoul-metadata/Cargo.toml index ee89ea7e7..cc2f07e81 100644 --- a/rust/lakesoul-metadata/Cargo.toml +++ b/rust/lakesoul-metadata/Cargo.toml @@ -26,6 +26,9 @@ url = { workspace = true } tracing = { workspace = true } thiserror = { workspace = true } anyhow = { workspace = true } +regex = "1.10.3" +serde = { workspace = true } + [dev-dependencies] test-log = "0.2.14" diff --git a/rust/lakesoul-metadata/src/error.rs b/rust/lakesoul-metadata/src/error.rs index 40ba1792b..f5fbde97a 100644 --- a/rust/lakesoul-metadata/src/error.rs +++ b/rust/lakesoul-metadata/src/error.rs @@ -3,6 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 use std::{io, num, result, sync::Arc}; + use thiserror::Error; /// Result type for operations that could result in an [LakeSoulMetaDataError] @@ -33,6 +34,8 @@ pub enum LakeSoulMetaDataError { ProstDecodeError(#[from] prost::DecodeError), #[error("prost encode error: {0}")] ProstEncodeError(#[from] prost::EncodeError), + #[error("ffi error: {0}")] + FfiError(String), #[error( "Internal error: {0}\nThis was likely caused by a bug in LakeSoul's \ code and we would welcome that you file an bug report in our issue tracker" diff --git a/rust/lakesoul-metadata/src/lib.rs b/rust/lakesoul-metadata/src/lib.rs index 213ab7e9d..91801aaae 100644 --- a/rust/lakesoul-metadata/src/lib.rs +++ b/rust/lakesoul-metadata/src/lib.rs @@ -2,24 +2,24 @@ // // SPDX-License-Identifier: Apache-2.0 -pub mod error; -mod metadata_client; - use std::str::FromStr; use std::{collections::HashMap, io::ErrorKind}; -use error::{LakeSoulMetaDataError, Result}; +use postgres_types::{FromSql, ToSql}; use prost::Message; -use proto::proto::entity; - pub use tokio::runtime::{Builder, Runtime}; - -use postgres_types::{FromSql, ToSql}; use tokio::spawn; pub use tokio_postgres::{Client, NoTls, Statement}; use tokio_postgres::{Error, Row}; +use error::{LakeSoulMetaDataError, Result}; pub use metadata_client::{MetaDataClient, MetaDataClientRef}; +use proto::proto::entity; + +pub mod transfusion; + +pub mod error; +mod metadata_client; pub const DAO_TYPE_QUERY_ONE_OFFSET: i32 = 0; pub const DAO_TYPE_QUERY_LIST_OFFSET: i32 = 
100; @@ -56,10 +56,7 @@ impl DataFileOp { fn from_proto_data_file_op(data_file_op: &entity::DataFileOp) -> Result { Ok(DataFileOp { path: data_file_op.path.clone(), - file_op: entity::FileOp::from_i32(data_file_op.file_op) - .ok_or(LakeSoulMetaDataError::ProstDecodeError(prost::DecodeError::new( - "file_op decode failed", - )))? + file_op: entity::FileOp::try_from(data_file_op.file_op)? .as_str_name() .to_string(), size: data_file_op.size, @@ -170,95 +167,95 @@ async fn get_prepared_statement( let statement = match dao_type { // Select Namespace DaoType::SelectNamespaceByNamespace => - "select namespace, properties, comment, domain - from namespace + "select namespace, properties, comment, domain + from namespace where namespace = $1::TEXT", DaoType::ListNamespaces => - "select namespace, properties, comment, domain + "select namespace, properties, comment, domain from namespace", // Select TablePathId DaoType::SelectTablePathIdByTablePath => - "select table_path, table_id, table_namespace, domain - from table_path_id + "select table_path, table_id, table_namespace, domain + from table_path_id where table_path = $1::TEXT", DaoType::ListAllTablePath => - "select table_path, table_id, table_namespace, domain + "select table_path, table_id, table_namespace, domain from table_path_id", DaoType::ListAllPathTablePathByNamespace => - "select table_path - from table_path_id + "select table_path + from table_path_id where table_namespace = $1::TEXT ", // Select TableNameId DaoType::SelectTableNameIdByTableName => - "select table_name, table_id, table_namespace, domain - from table_name_id + "select table_name, table_id, table_namespace, domain + from table_name_id where table_name = $1::TEXT and table_namespace = $2::TEXT", DaoType::ListTableNameByNamespace => - "select table_name, table_id, table_namespace, domain - from table_name_id + "select table_name, table_id, table_namespace, domain + from table_name_id where table_namespace = $1::TEXT", // Select TableInfo DaoType::SelectTableInfoByTableId => - "select table_id, table_name, table_path, table_schema, properties, partitions, table_namespace, domain - from table_info + "select table_id, table_name, table_path, table_schema, properties, partitions, table_namespace, domain + from table_info where table_id = $1::TEXT", DaoType::SelectTableInfoByTableNameAndNameSpace => - "select table_id, table_name, table_path, table_schema, properties, partitions, table_namespace, domain - from table_info + "select table_id, table_name, table_path, table_schema, properties, partitions, table_namespace, domain + from table_info where table_name = $1::TEXT and table_namespace=$2::TEXT", DaoType::SelectTableInfoByTablePath => - "select table_id, table_name, table_path, table_schema, properties, partitions, table_namespace, domain - from table_info + "select table_id, table_name, table_path, table_schema, properties, partitions, table_namespace, domain + from table_info where table_path = $1::TEXT", DaoType::SelectTableInfoByIdAndTablePath => - "select table_id, table_name, table_path, table_schema, properties, partitions, table_namespace, domain - from table_info + "select table_id, table_name, table_path, table_schema, properties, partitions, table_namespace, domain + from table_info where table_id = $1::TEXT and table_path=$2::TEXT", // Select PartitionInfo DaoType::SelectPartitionVersionByTableIdAndDescAndVersion => - "select table_id, partition_desc, version, commit_op, snapshot, expression, domain + "select table_id, partition_desc, version, commit_op, 
snapshot, expression, domain from partition_info where table_id = $1::TEXT and partition_desc = $2::TEXT and version = $3::INT", DaoType::SelectOnePartitionVersionByTableIdAndDesc => "select m.table_id, t.partition_desc, m.version, m.commit_op, m.snapshot, m.expression, m.domain from ( - select table_id,partition_desc,max(version) from partition_info - where table_id = $1::TEXT and partition_desc = $2::TEXT group by table_id, partition_desc) t - left join partition_info m on t.table_id = m.table_id + select table_id,partition_desc,max(version) from partition_info + where table_id = $1::TEXT and partition_desc = $2::TEXT group by table_id, partition_desc) t + left join partition_info m on t.table_id = m.table_id and t.partition_desc = m.partition_desc and t.max = m.version", DaoType::ListPartitionByTableIdAndDesc => - "select table_id, partition_desc, version, commit_op, snapshot, timestamp, expression, domain - from partition_info + "select table_id, partition_desc, version, commit_op, snapshot, timestamp, expression, domain + from partition_info where table_id = $1::TEXT and partition_desc = $2::TEXT ", DaoType::ListPartitionByTableId => - "select m.table_id, t.partition_desc, m.version, m.commit_op, m.snapshot, m.expression, m.domain + "select m.table_id, t.partition_desc, m.version, m.commit_op, m.snapshot, m.expression, m.domain from ( - select table_id,partition_desc,max(version) - from partition_info - where table_id = $1::TEXT - group by table_id,partition_desc) t - left join partition_info m + select table_id,partition_desc,max(version) + from partition_info + where table_id = $1::TEXT + group by table_id,partition_desc) t + left join partition_info m on t.table_id = m.table_id and t.partition_desc = m.partition_desc and t.max = m.version", DaoType::ListPartitionVersionByTableIdAndPartitionDescAndTimestampRange => - "select table_id, partition_desc, version, commit_op, snapshot, timestamp, expression, domain - from partition_info + "select table_id, partition_desc, version, commit_op, snapshot, timestamp, expression, domain + from partition_info where table_id = $1::TEXT and partition_desc = $2::TEXT and timestamp >= $3::BIGINT and timestamp < $4::BIGINT", DaoType::ListCommitOpsBetweenVersions => - "select distinct(commit_op) - from partition_info + "select distinct(commit_op) + from partition_info where table_id = $1::TEXT and partition_desc = $2::TEXT and version between $3::INT and $4::INT", DaoType::ListPartitionVersionByTableIdAndPartitionDescAndVersionRange => - "select table_id, partition_desc, version, commit_op, snapshot, timestamp, expression, domain - from partition_info + "select table_id, partition_desc, version, commit_op, snapshot, timestamp, expression, domain + from partition_info where table_id = $1::TEXT and partition_desc = $2::TEXT and version >= $3::INT and version <= $4::INT", // Select DataCommitInfo DaoType::SelectOneDataCommitInfoByTableIdAndPartitionDescAndCommitId => - "select table_id, partition_desc, commit_id, file_ops, commit_op, timestamp, committed, domain - from data_commit_info + "select table_id, partition_desc, commit_id, file_ops, commit_op, timestamp, committed, domain + from data_commit_info where table_id = $1::TEXT and partition_desc = $2::TEXT and commit_id = $3::UUID", @@ -266,124 +263,124 @@ async fn get_prepared_statement( DaoType::InsertNamespace => "insert into namespace( namespace, - properties, - comment, - domain) + properties, + comment, + domain) values($1::TEXT, $2::JSON, $3::TEXT, $4::TEXT)", DaoType::InsertTableInfo => 
"insert into table_info( - table_id, - table_name, - table_path, - table_schema, - properties, - partitions, - table_namespace, - domain) + table_id, + table_name, + table_path, + table_schema, + properties, + partitions, + table_namespace, + domain) values($1::TEXT, $2::TEXT, $3::TEXT, $4::TEXT, $5::JSON, $6::TEXT, $7::TEXT, $8::TEXT)", DaoType::InsertTableNameId => "insert into table_name_id( - table_id, - table_name, - table_namespace, - domain) + table_id, + table_name, + table_namespace, + domain) values($1::TEXT, $2::TEXT, $3::TEXT, $4::TEXT)", DaoType::InsertTablePathId => "insert into table_path_id( - table_id, - table_path, - table_namespace, - domain) + table_id, + table_path, + table_namespace, + domain) values($1::TEXT, $2::TEXT, $3::TEXT, $4::TEXT)", DaoType::InsertPartitionInfo => "insert into partition_info( - table_id, + table_id, partition_desc, - version, - commit_op, + version, + commit_op, snapshot, expression, domain - ) + ) values($1::TEXT, $2::TEXT, $3::INT, $4::TEXT, $5::_UUID, $6::TEXT, $7::TEXT)", DaoType::InsertDataCommitInfo => "insert into data_commit_info( - table_id, + table_id, partition_desc, - commit_id, - file_ops, + commit_id, + file_ops, commit_op, timestamp, committed, domain - ) + ) values($1::TEXT, $2::TEXT, $3::UUID, $4::_data_file_op, $5::TEXT, $6::BIGINT, $7::BOOL, $8::TEXT)", // Query Scalar DaoType::GetLatestTimestampFromPartitionInfo => - "select max(timestamp) as timestamp - from partition_info + "select max(timestamp) as timestamp + from partition_info where table_id = $1::TEXT and partition_desc = $2::TEXT", DaoType::GetLatestTimestampFromPartitionInfoWithoutPartitionDesc => - "select max(timestamp) as timestamp - from partition_info + "select max(timestamp) as timestamp + from partition_info where table_id = $1::TEXT", DaoType::GetLatestVersionUpToTimeFromPartitionInfo => - "select max(version) as version - from partition_info + "select max(version) as version + from partition_info where table_id = $1::TEXT and partition_desc = $2::TEXT and timestamp < $3::BIGINT", DaoType::GetLatestVersionTimestampUpToTimeFromPartitionInfo => - "select max(timestamp) as timestamp - from partition_info + "select max(timestamp) as timestamp + from partition_info where table_id = $1::TEXT and partition_desc = $2::TEXT and timestamp < $3::BIGINT", // Update / Delete DaoType::DeleteNamespaceByNamespace => - "delete from namespace + "delete from namespace where namespace = $1::TEXT ", DaoType::UpdateNamespacePropertiesByNamespace => - "update namespace + "update namespace set properties = $2::JSON where namespace = $1::TEXT", DaoType::DeleteTableNameIdByTableNameAndNamespace => - "delete from table_name_id + "delete from table_name_id where table_name = $1::TEXT and table_namespace = $2::TEXT", DaoType::DeleteTableNameIdByTableId => - "delete from table_name_id + "delete from table_name_id where table_id = $1::TEXT", DaoType::DeleteTableInfoByIdAndPath => - "delete from table_info + "delete from table_info where table_id = $1::TEXT and table_path = $2::TEXT", DaoType::UpdateTableInfoPropertiesById => - "update table_info + "update table_info set properties = $2::JSON where table_id = $1::TEXT", DaoType::DeleteTablePathIdByTablePath => - "delete from table_path_id + "delete from table_path_id where table_path = $1::TEXT ", DaoType::DeleteTablePathIdByTableId => - "delete from table_path_id + "delete from table_path_id where table_id = $1::TEXT ", DaoType::DeleteOneDataCommitInfoByTableIdAndPartitionDescAndCommitId => - "delete from data_commit_info + "delete from 
data_commit_info where table_id = $1::TEXT and partition_desc = $2::TEXT and commit_id = $3::UUID ", DaoType::DeleteDataCommitInfoByTableIdAndPartitionDesc => - "delete from data_commit_info + "delete from data_commit_info where table_id = $1::TEXT and partition_desc = $2::TEXT", DaoType::DeleteDataCommitInfoByTableId => - "delete from data_commit_info + "delete from data_commit_info where table_id = $1::TEXT", DaoType::DeletePartitionInfoByTableId => - "delete from partition_info + "delete from partition_info where table_id = $1::TEXT", DaoType::DeletePartitionInfoByTableIdAndPartitionDesc => - "delete from partition_info + "delete from partition_info where table_id = $1::TEXT and partition_desc = $2::TEXT", DaoType::DeletePreviousVersionPartition => - "delete from partition_info + "delete from partition_info where table_id = $1::TEXT and partition_desc = $2::TEXT and timestamp <= $3::BIGINT", @@ -553,9 +550,9 @@ pub async fn execute_query( .join("','") + "'"; let statement = format!("select m.table_id, t.partition_desc, m.version, m.commit_op, m.snapshot, m.expression, m.domain from ( - select table_id,partition_desc,max(version) from partition_info - where table_id = $1::TEXT and partition_desc in ({}) - group by table_id,partition_desc) t + select table_id,partition_desc,max(version) from partition_info + where table_id = $1::TEXT and partition_desc in ({}) + group by table_id,partition_desc) t left join partition_info m on t.table_id = m.table_id and t.partition_desc = m.partition_desc and t.max = m.version", partitions); let result = { let statement = client.prepare(&statement).await?; @@ -597,10 +594,10 @@ pub async fn execute_query( let uuid_list_str = uuid_list.join(""); let statement = format!( - "select table_id, partition_desc, commit_id, file_ops, commit_op, timestamp, committed, domain - from data_commit_info - where table_id = $1::TEXT and partition_desc = $2::TEXT - and commit_id in ({}) + "select table_id, partition_desc, commit_id, file_ops, commit_op, timestamp, committed, domain + from data_commit_info + where table_id = $1::TEXT and partition_desc = $2::TEXT + and commit_id in ({}) order by position(commit_id::text in '{}')", uuid_str_list, uuid_list_str ); @@ -959,14 +956,14 @@ pub async fn execute_insert( let prepared = transaction .prepare( "insert into partition_info( - table_id, + table_id, partition_desc, - version, - commit_op, + version, + commit_op, snapshot, expression, domain - ) + ) values($1::TEXT, $2::TEXT, $3::INT, $4::TEXT, $5::_UUID, $6::TEXT, $7::TEXT)", ) .await; @@ -1039,15 +1036,15 @@ pub async fn execute_insert( let prepared = transaction .prepare( "insert into data_commit_info( - table_id, + table_id, partition_desc, - commit_id, - file_ops, + commit_id, + file_ops, commit_op, timestamp, committed, domain - ) + ) values($1::TEXT, $2::TEXT, $3::UUID, $4::_data_file_op, $5::TEXT, $6::BIGINT, $7::BOOL, $8::TEXT)", ) .await; @@ -1219,7 +1216,7 @@ pub async fn execute_update( let uuid_str_list = "'".to_owned() + &uuid_list.join("','") + "'"; let statement = format!( - "delete from data_commit_info + "delete from data_commit_info where table_id = $1::TEXT and partition_desc = $2::TEXT and commit_id in ({}) ", uuid_str_list ); @@ -1359,10 +1356,11 @@ fn row_to_uuid_list(row: &Row) -> Vec { #[cfg(test)] mod tests { use prost::Message; + use proto::proto::entity; - #[tokio::test] - async fn test_entity() -> std::io::Result<()> { + #[test] + fn test_entity() -> std::io::Result<()> { let namespace = entity::Namespace { namespace: "default".to_owned(), 
properties: "{}".to_owned(), @@ -1407,7 +1405,4 @@ mod tests { Ok(()) } - - #[test] - fn test_client() {} } diff --git a/rust/lakesoul-metadata/src/metadata_client.rs b/rust/lakesoul-metadata/src/metadata_client.rs index abda5f8b3..492ce79b9 100644 --- a/rust/lakesoul-metadata/src/metadata_client.rs +++ b/rust/lakesoul-metadata/src/metadata_client.rs @@ -2,22 +2,24 @@ // // SPDX-License-Identifier: Apache-2.0 +use std::collections::HashSet; use std::fmt::{Debug, Formatter}; use std::ops::DerefMut; use std::sync::Arc; use std::{collections::HashMap, env, fs, vec}; use prost::Message; -use proto::proto::entity::{ - self, CommitOp, DataCommitInfo, JniWrapper, MetaInfo, Namespace, PartitionInfo, TableInfo, TableNameId, TablePathId, -}; use tokio::sync::Mutex; use tokio_postgres::Client; use tracing::debug; - use url::Url; +use proto::proto::entity::{ + self, CommitOp, DataCommitInfo, JniWrapper, MetaInfo, Namespace, PartitionInfo, TableInfo, TableNameId, TablePathId, +}; + use crate::error::{LakeSoulMetaDataError, Result}; +use crate::transfusion::DataFileInfo; use crate::{ clean_meta_for_test, create_connection, execute_insert, execute_query, execute_update, DaoType, PreparedStatementMap, PARAM_DELIM, PARTITION_DESC_DELIM, @@ -94,6 +96,28 @@ impl MetaDataClient { }) } + /// Construct Self from raw client and prepared + pub fn compose(client: Client, prepared: PreparedStatementMap, max_retry: usize) -> Self { + Self { + client: Arc::new(Mutex::new(client)), + prepared: Arc::new(Mutex::new(prepared)), + max_retry, + } + } + + /// consume Arc to raw client and prepared + pub fn decompose(db: Self) -> Result<(Client, PreparedStatementMap)> { + debug_assert_eq!(Arc::strong_count(&db.client), 1); + let client = Arc::into_inner(db.client) + .ok_or(LakeSoulMetaDataError::FfiError("restore client failed".to_string()))? + .into_inner(); + debug_assert_eq!(Arc::strong_count(&db.prepared), 1); + let prepared = Arc::into_inner(db.prepared) + .ok_or(LakeSoulMetaDataError::FfiError("restore prepared failed".to_string()))? + .into_inner(); + Ok((client, prepared)) + } + pub async fn create_namespace(&self, namespace: Namespace) -> Result<()> { self.insert_namespace(&namespace).await?; Ok(()) @@ -281,14 +305,13 @@ impl MetaDataClient { pub async fn meta_cleanup(&self) -> Result { clean_meta_for_test(self.client.lock().await.deref_mut()).await?; - self.insert_namespace( - &Namespace { - namespace: "default".to_string(), - properties: "{}".to_string(), - comment: "".to_string(), - domain: "public".to_string() - } - ).await + self.insert_namespace(&Namespace { + namespace: "default".to_string(), + properties: "{}".to_string(), + comment: "".to_string(), + domain: "public".to_string(), + }) + .await } pub async fn commit_data(&self, meta_info: MetaInfo, commit_op: CommitOp) -> Result<()> { @@ -417,7 +440,8 @@ impl MetaDataClient { }], ..Default::default() }, - CommitOp::from_i32(commit_op).ok_or(LakeSoulMetaDataError::Internal("unknown commit_op".to_string()))?, + CommitOp::try_from(commit_op) + .map_err(|_| LakeSoulMetaDataError::Internal("unknown commit_op".to_string()))?, ) .await } @@ -504,6 +528,56 @@ impl MetaDataClient { } } + async fn get_table_data_info_by_partition_info( + &self, + partition_info_arr: Vec, + ) -> Result> { + let mut file_info_buf = Vec::new(); + for pi in &partition_info_arr { + file_info_buf.extend(self.get_single_partition_data_info(pi).await?) 
+ } + Ok(file_info_buf) + } + + /// return file info in this partition that match the current read version + async fn get_single_partition_data_info(&self, partition_info: &PartitionInfo) -> Result> { + let mut file_arr_buf = Vec::new(); + let data_commit_info_list = self.get_data_commit_info_of_single_partition(partition_info).await?; + for data_commit_info in &data_commit_info_list { + for file in &data_commit_info.file_ops { + file_arr_buf.push(DataFileInfo::compose(data_commit_info, file, partition_info)?) + } + } + Ok(self.filter_files(file_arr_buf)) + } + + /// 1:1 fork from scala by chat_gpt + fn filter_files(&self, file_arr_buf: Vec) -> Vec { + let mut dup_check = HashSet::new(); + let mut file_res_arr_buf = Vec::new(); + + if file_arr_buf.len() > 1 { + for i in (0..file_arr_buf.len()).rev() { + if file_arr_buf[i].file_op == "del" { + dup_check.insert(file_arr_buf[i].path.clone()); + } else if dup_check.is_empty() || !dup_check.contains(&file_arr_buf[i].path) { + file_res_arr_buf.push(file_arr_buf[i].clone()); + } + } + file_res_arr_buf.reverse(); + } else { + file_res_arr_buf = file_arr_buf.into_iter().filter(|item| item.file_op == "add").collect(); + } + + file_res_arr_buf + } + + pub async fn get_table_data_info(&self, table_id: &str) -> Result> { + // logic from scala: DataOperation + self.get_table_data_info_by_partition_info(self.get_all_partition_info(table_id).await?) + .await + } + pub async fn get_data_files_by_table_name( &self, table_name: &str, diff --git a/rust/lakesoul-metadata/src/transfusion.rs b/rust/lakesoul-metadata/src/transfusion.rs new file mode 100644 index 000000000..4bb98ceef --- /dev/null +++ b/rust/lakesoul-metadata/src/transfusion.rs @@ -0,0 +1,318 @@ +// SPDX-FileCopyrightText: 2024 LakeSoul Contributors +// +// SPDX-License-Identifier: Apache-2.0 + +//! wrap catalog->split->reader +//! [WIP] +//! 
prototype +use std::collections::HashMap; + +use regex::Regex; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use tokio_postgres::Client; + +use proto::proto::entity::{DataCommitInfo, DataFileOp, FileOp, PartitionInfo, TableInfo}; + +use crate::{error::Result, MetaDataClient, PreparedStatementMap}; +use crate::error::LakeSoulMetaDataError; +use crate::transfusion::config::{ + LAKESOUL_HASH_PARTITION_SPLITTER, LAKESOUL_NON_PARTITION_TABLE_PART_DESC, + LAKESOUL_PARTITION_SPLITTER_OF_RANGE_AND_HASH, LAKESOUL_RANGE_PARTITION_SPLITTER, +}; + +mod config { + #![allow(unused)] + + /// copy from DBConfig + pub const MAX_COMMIT_ATTEMPTS: i32 = 5; + // + pub const LAKESOUL_DEFAULT_NAMESPACE: &str = "default"; + // + pub const LAKESOUL_NAMESPACE_LEVEL_SPLITTER: &str = "."; + + pub const LAKESOUL_NULL_STRING: &str = "__L@KE$OUL_NULL__"; + + pub const LAKESOUL_EMPTY_STRING: &str = "__L@KE$OUL_EMPTY_STRING__"; + + pub const LAKESOUL_PARTITION_SPLITTER_OF_RANGE_AND_HASH: &str = ";"; + + pub const LAKESOUL_RANGE_PARTITION_SPLITTER: &str = ","; + + pub const LAKESOUL_HASH_PARTITION_SPLITTER: &str = ","; + + pub const LAKESOUL_FILE_EXISTS_COLUMN_SPLITTER: &str = ","; + + pub const LAKESOUL_NON_PARTITION_TABLE_PART_DESC: &str = "-5"; + + pub const LAKESOUL_PARTITION_DESC_KV_DELIM: &str = "="; + + pub const HASH_BUCKET_NUM: &str = "hashBucketNum"; + + pub const DROPPED_COLUMN: &str = "droppedColumn"; + // + pub const DROPPED_COLUMN_SPLITTER: &str = ","; + // + pub const LAST_TABLE_SCHEMA_CHANGE_TIME: &str = "last_schema_change_time"; +} + +/// partitiondesc of non-range table is "-5" +pub fn table_without_range(range_key: &str) -> bool { + range_key == LAKESOUL_NON_PARTITION_TABLE_PART_DESC +} + +/// hashbucketnum of Non-primary key table is "-1" +pub fn table_without_pk(hash_bucket_num: &str) -> bool { + hash_bucket_num == "-1" +} + +/// use raw ptr to create `MetadataClientRef` +/// stay origin memory the same +/// see https://users.rust-lang.org/t/dereferencing-a-boxed-value/86768 +pub async fn split_desc_array( + client: &Client, + prepared: &PreparedStatementMap, + table_name: &str, + namespace: &str, +) -> Result { + // make MetadataClient + let (mut boxed_client, mut boxed_prepared) = unsafe { + let boxed_client = Box::from_raw(client as *const Client as *mut Client); + let boxed_prepared = Box::from_raw(prepared as *const PreparedStatementMap as *mut PreparedStatementMap); + (boxed_client, boxed_prepared) + }; + let db = MetaDataClient::compose(*boxed_client, *boxed_prepared, 3); + let ret = split_desc_array_inner(&db, table_name, namespace).await; + + // restore client and prepared + + // the origin ptr disappeared + let (c, p) = MetaDataClient::decompose(db)?; + *boxed_client = c; + *boxed_prepared = p; + Box::into_raw(boxed_client); + Box::into_raw(boxed_prepared); + ret +} + +async fn split_desc_array_inner(db: &MetaDataClient, table_name: &str, namespace: &str) -> Result { + let table_info = db.get_table_info_by_table_name(table_name, namespace).await?; + let data_files = db.get_table_data_info(&table_info.table_id).await?; + + // create splits + let mut splits = Vec::new(); + // // split by range and hash partition + let mut map = HashMap::new(); + + for df in &data_files { + if has_hash_partitions(&table_info) && df.bucket_id() != -1 { + map.entry(df.partition_desc.as_str()) + .or_insert(HashMap::new()) + .entry(df.bucket_id()) + .or_insert(Vec::new()) + .push(df.path.clone()); + } else { + map.entry(df.partition_desc.as_str()) + .or_insert(HashMap::new()) + .entry(-1) + 
.or_insert(Vec::new()) + .push(df.path.clone()); + } + } + // hash keys + let (_rk, pk) = parse_table_info_partitions(&table_info.partitions); + + for (range_key, value_map) in map { + let mut range_desc = HashMap::new(); + if !table_without_range(range_key) { + let keys: Vec = range_key + .split(LAKESOUL_RANGE_PARTITION_SPLITTER) + .map(ToString::to_string) + .collect(); + for k in keys { + let (k, v) = match k.split_once('=') { + None => { + return Err(LakeSoulMetaDataError::Internal("split error".to_string())); + } + Some((k, v)) => { + (k.to_string(), v.to_string()) + } + }; + range_desc.insert(k, v); + } + } + for physical_files in value_map { + let sd = SplitDesc { + file_paths: physical_files.1, + primary_keys: pk.clone(), + partition_desc: range_desc.clone(), + table_schema: table_info.table_schema.clone(), + }; + splits.push(sd) + } + } + Ok(SplitDescArray(splits)) +} + +fn has_hash_partitions(table_info: &TableInfo) -> bool { + let properties: Value = serde_json::from_str(&table_info.properties).expect("wrong properties"); + if properties["hashBucketNum"] != Value::Null && properties["hashBucketNum"] == "-1" { + false + } else { + properties["hashBucketNum"] != Value::Null + } +} + +// The file name of bucketed data should have 3 parts: +// 1. some other information in the head of file name +// 2. bucket id part, some numbers, starts with "_" +// * The other-information part may use `-` as separator and may have numbers at the end, +// e.g. a normal parquet file without bucketing may have name: +// part-r-00000-2dd664f9-d2c4-4ffe-878f-431234567891.gz.parquet, and we will mistakenly +// treat `431234567891` as bucket id. So here we pick `_` as separator. +// 3. optional file extension part, in the tail of file name, starts with `.` +// An example of bucketed parquet file name with bucket id 3: +// part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet + +#[derive(Debug, Clone, Default)] +pub struct DataFileInfo { + // range partitions + pub partition_desc: String, + pub path: String, + pub file_op: String, + pub size: i64, + pub bucket_id: Option, + // unix timestamp + pub modification_time: i64, + pub file_exist_cols: String, +} + +impl DataFileInfo { + const BUCKET_FILE_NAME_REGEX: &'static str = r#".*_(\d+)(?:\..*)?$"#; + pub fn new() -> Self { + Default::default() + } + + pub(crate) fn compose( + data_commit_info: &DataCommitInfo, + data_file_op: &DataFileOp, + partition_info: &PartitionInfo, + ) -> Result { + Ok(Self { + partition_desc: partition_info.partition_desc.clone(), + path: data_file_op.path.clone(), + file_op: FileOp::try_from(data_file_op.file_op)?.as_str_name().to_string(), + size: data_file_op.size, + bucket_id: Self::parse_bucket_id(&data_file_op.path), + modification_time: data_commit_info.timestamp, + file_exist_cols: data_file_op.file_exist_cols.clone(), + }) + } + + fn parse_bucket_id(filename: &str) -> Option { + let re = Regex::new(DataFileInfo::BUCKET_FILE_NAME_REGEX).unwrap(); + let Some(caps) = re.captures(filename) else { + return Some(-1); + }; + caps[1].parse::().ok() + } + + pub fn bucket_id(&self) -> isize { + self.bucket_id.unwrap_or(-1) + } +} + +/// COPY from lakesoul-datafusion +pub fn parse_table_info_partitions(partitions: &str) -> (Vec, Vec) { + let (range_keys, hash_keys) = + partitions.split_at(partitions.find(LAKESOUL_PARTITION_SPLITTER_OF_RANGE_AND_HASH).unwrap()); + let hash_keys = &hash_keys[1..]; + ( + range_keys + .split(LAKESOUL_RANGE_PARTITION_SPLITTER) + .collect::>() + .iter() + .filter_map(|str| if str.is_empty() { 
None } else { Some(str.to_string()) }) + .collect::>(), + hash_keys + .split(LAKESOUL_HASH_PARTITION_SPLITTER) + .collect::>() + .iter() + .filter_map(|str| if str.is_empty() { None } else { Some(str.to_string()) }) + .collect::>(), + ) +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SplitDesc { + pub file_paths: Vec, + pub primary_keys: Vec, + pub partition_desc: HashMap, + pub table_schema: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SplitDescArray(pub Vec); + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn prop_test() { + let s = r##"{"hashBucketNum":"2","hashPartitions":"id"}"##; + println!("{s}"); + let x: Value = serde_json::from_str(s).unwrap(); + assert_eq!(x["hello"], Value::Null); + assert_eq!(x["hashBucketNum"], "2") + } + + #[test] + fn regex_test() { + let file_name = "part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet"; + assert_eq!(Some(3isize), DataFileInfo::parse_bucket_id(file_name)); + let file_name = "part-r-00000-2dd664f9-d2c4-4ffe-878f-431234567891.gz.parquet"; + assert_eq!(Some(-1), DataFileInfo::parse_bucket_id(file_name)); + } + + #[test] + fn ownership_test() { + struct A { + x: u32, + } + impl A { + fn x(&self) -> u32 { + self.x + } + } + unsafe { + let origin = Box::new(A { x: 42 }); + // simulate a ptr from ffi + let raw1 = Box::into_raw(origin); + let mut origin = Box::from_raw(raw1); + // move_back to stack + let a = *origin; + let a_x = a.x(); + // move to origin + *origin = a; + let o_x = origin.x(); + assert_eq!(o_x, a_x); + let raw2 = Box::into_raw(origin); + assert_eq!(raw1, raw2); + // free + let _f = Box::from_raw(raw2); + } + } + + #[test] + fn serialize_test() { + let sd = SplitDesc { + file_paths: vec![], + primary_keys: vec![], + partition_desc: Default::default(), + table_schema: "".to_string(), + }; + let s = serde_json::to_string(&sd).unwrap(); + println!("{s}"); + } +} diff --git a/rust/proto/Cargo.toml b/rust/proto/Cargo.toml index 1cdb5099c..eb32781b5 100644 --- a/rust/proto/Cargo.toml +++ b/rust/proto/Cargo.toml @@ -12,10 +12,10 @@ build = "build.rs" [dependencies] bytes = "1" -prost = "0.11" +prost = { workspace = true } [build-dependencies] -prost-build = "0.11" +prost-build = { workspace = true } [target.'cfg(target_os = "linux")'.build-dependencies] protobuf-src = "1.1.0" \ No newline at end of file
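
A note on the JSON contract behind create_split_desc_array: the C ABI serializes a SplitDescArray with serde_json::to_vec and hands the caller a nul-terminated string built from it. The following is a minimal, self-contained round-trip sketch of that shape; the structs are mirrored locally for illustration only (the real definitions live in lakesoul_metadata::transfusion), and the file path, key names, and values are made up. It assumes serde (with derive) and serde_json, which the patch already depends on.

use std::collections::HashMap;
use serde::{Deserialize, Serialize};

// Local mirror of transfusion::SplitDesc / SplitDescArray, for illustration only.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct SplitDesc {
    file_paths: Vec<String>,
    primary_keys: Vec<String>,
    partition_desc: HashMap<String, String>,
    table_schema: String,
}

#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct SplitDescArray(Vec<SplitDesc>);

fn main() -> Result<(), serde_json::Error> {
    let splits = SplitDescArray(vec![SplitDesc {
        file_paths: vec!["/tmp/range=20201101/part-00000_00003.parquet".to_string()],
        primary_keys: vec!["hash".to_string()],
        partition_desc: HashMap::from([("range".to_string(), "20201101".to_string())]),
        table_schema: "".to_string(),
    }]);

    // This is the payload create_split_desc_array returns as a nul-terminated C string.
    let json = serde_json::to_string(&splits)?;
    println!("{json}");

    // A consumer can decode it back into the same shape.
    let decoded: SplitDescArray = serde_json::from_str(&json)?;
    assert_eq!(decoded, splits);
    Ok(())
}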
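
The per-partition file selection in MetaDataClient::filter_files walks the commit history from newest to oldest, remembering every path whose latest operation is "del" and keeping the remaining files in commit order. Below is a standalone restatement of that rule with a small check; FileRecord and the sample paths are illustrative stand-ins for DataFileInfo, not part of the patch.

use std::collections::HashSet;

// Illustrative stand-in for DataFileInfo: only the fields the filter looks at.
#[derive(Clone, Debug, PartialEq)]
struct FileRecord {
    path: String,
    file_op: String, // "add" or "del", as produced by FileOp::as_str_name()
}

// Same rule as filter_files: scan newest-to-oldest, drop any file whose path is
// deleted later, then restore commit order.
fn filter_files(files: Vec<FileRecord>) -> Vec<FileRecord> {
    if files.len() <= 1 {
        return files.into_iter().filter(|f| f.file_op == "add").collect();
    }
    let mut deleted = HashSet::new();
    let mut kept = Vec::new();
    for f in files.into_iter().rev() {
        if f.file_op == "del" {
            deleted.insert(f.path.clone());
        } else if !deleted.contains(&f.path) {
            kept.push(f);
        }
    }
    kept.reverse();
    kept
}

fn main() {
    let history = vec![
        FileRecord { path: "a.parquet".into(), file_op: "add".into() },
        FileRecord { path: "a.parquet".into(), file_op: "del".into() },
        FileRecord { path: "b.parquet".into(), file_op: "add".into() },
    ];
    let live = filter_files(history);
    assert_eq!(live.len(), 1);
    assert_eq!(live[0].path, "b.parquet");
    println!("{live:?}");
}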
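
Two string conventions drive split_desc_array_inner: table_info.partitions stores range keys and hash keys separated by ";" (each side comma-separated), and a partition_desc stores "key=value" pairs joined by ",", with "-5" standing in for a table without range partitions. The sketch below mirrors both parses under those assumptions, with made-up column names; unlike parse_table_info_partitions in the patch, it returns None instead of panicking when the ";" is missing, which is a deliberate simplification for the example.

use std::collections::HashMap;

// Splitters as defined in transfusion::config.
const SPLITTER_OF_RANGE_AND_HASH: char = ';';
const RANGE_SPLITTER: char = ',';
const HASH_SPLITTER: char = ',';
const NON_PARTITION_TABLE_PART_DESC: &str = "-5";

// Mirrors parse_table_info_partitions: text before ';' lists range keys, text after lists hash keys.
fn parse_partitions(partitions: &str) -> Option<(Vec<String>, Vec<String>)> {
    let (range, hash) = partitions.split_once(SPLITTER_OF_RANGE_AND_HASH)?;
    let split = |s: &str, sep: char| {
        s.split(sep)
            .filter(|p| !p.is_empty())
            .map(str::to_string)
            .collect::<Vec<_>>()
    };
    Some((split(range, RANGE_SPLITTER), split(hash, HASH_SPLITTER)))
}

// Mirrors the range_desc construction in split_desc_array_inner: "k1=v1,k2=v2" -> map.
fn parse_partition_desc(desc: &str) -> Option<HashMap<String, String>> {
    if desc == NON_PARTITION_TABLE_PART_DESC {
        return Some(HashMap::new());
    }
    desc.split(RANGE_SPLITTER)
        .map(|kv| kv.split_once('=').map(|(k, v)| (k.to_string(), v.to_string())))
        .collect()
}

fn main() {
    // Hypothetical table with one range column and one hash column.
    assert_eq!(
        parse_partitions("range;hash"),
        Some((vec!["range".to_string()], vec!["hash".to_string()]))
    );
    let desc = parse_partition_desc("range=20201101").unwrap();
    assert_eq!(desc.get("range").map(String::as_str), Some("20201101"));
    assert!(parse_partition_desc(NON_PARTITION_TABLE_PART_DESC).unwrap().is_empty());
}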