From eb499f3f156be212a5ff5a9a27b6d14a82d6430e Mon Sep 17 00:00:00 2001 From: mag1c1an1 Date: Thu, 7 Mar 2024 17:53:48 +0800 Subject: [PATCH] [Rust] (metadata) move metadataclient to rawclient Signed-off-by: mag1c1an1 clean code Signed-off-by: mag1c1an1 --- lakesoul-common/pom.xml | 1 - .../meta/jnr/LibLakeSoulMetaData.java | 4 +- .../meta/jnr/NativeMetadataJavaClient.java | 61 +++--- .../lakesoul/meta/jnr/SubstraitTest.java | 82 ++++++++ .../spark/sql/lakesoul/SplitDescSuite.scala | 104 +++++++--- rust/Cargo.lock | 89 ++++----- rust/Cargo.toml | 1 + rust/lakesoul-metadata-c/src/lib.rs | 27 +-- rust/lakesoul-metadata/Cargo.toml | 1 + rust/lakesoul-metadata/src/error.rs | 2 - rust/lakesoul-metadata/src/metadata_client.rs | 73 ------- rust/lakesoul-metadata/src/transfusion.rs | 178 +++++++++++++++--- 12 files changed, 393 insertions(+), 230 deletions(-) create mode 100644 lakesoul-common/src/test/java/com/dmetasoul/lakesoul/meta/jnr/SubstraitTest.java diff --git a/lakesoul-common/pom.xml b/lakesoul-common/pom.xml index a7d80ac1c..2b24dcaec 100644 --- a/lakesoul-common/pom.xml +++ b/lakesoul-common/pom.xml @@ -237,7 +237,6 @@ SPDX-License-Identifier: Apache-2.0 3.3.2 ${local.scope} - diff --git a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/LibLakeSoulMetaData.java b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/LibLakeSoulMetaData.java index be85d019c..4c7fff7a9 100644 --- a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/LibLakeSoulMetaData.java +++ b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/LibLakeSoulMetaData.java @@ -35,7 +35,7 @@ public interface LibLakeSoulMetaData { void clean_meta_for_test(IntegerCallback integerCallback, Pointer runtime, Pointer client); - Pointer create_split_desc_array(IntegerCallback integerCallback, Pointer client, Pointer prepared, Pointer runtime, String tableName, String namespace); + Pointer create_split_desc_array(BooleanCallback booleanCallback, Pointer client, Pointer prepared, Pointer runtime, String tableName, String namespace); void free_split_desc_array(Pointer json); @@ -50,6 +50,8 @@ public interface LibLakeSoulMetaData { void rust_logger_init(); + void call_rust(@LongLong long addr, Integer len); + void hello_world(Callback bytesCallback); void namespace(byte[] bytes, Integer len); diff --git a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/NativeMetadataJavaClient.java b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/NativeMetadataJavaClient.java index 96ff9aeb1..64ddf59da 100644 --- a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/NativeMetadataJavaClient.java +++ b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/NativeMetadataJavaClient.java @@ -557,62 +557,38 @@ public static void closeAll() { /** - * if ffi function failed with -100 - * should recreate pg client and prepared map + * if ffi function failed with -100 + * should recreate pg client and prepared map + * * @param tableName name * @param namespace the np of TableInfo * @return split(partition) desc array in json format by table_name, namespace , filter(WIP) */ public List createSplitDescArray(String tableName, String namespace) { - final CompletableFuture future = new CompletableFuture<>(); + getReadLock(); + final CompletableFuture future = new CompletableFuture<>(); Pointer ptr = getLibLakeSoulMetaData() .create_split_desc_array( - new ReferencedIntegerCallback((result, msg) -> { + new ReferencedBooleanCallback((result, msg) -> { if (msg != null) { 
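                                // msg is non-null only when the native call failed; surface the
                                // error before completing the future with the boolean status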
System.err.println(msg); } future.complete(result); - }, getIntegerCallbackObjectReferenceManager()), + }, getbooleanCallbackObjectReferenceManager()), tokioPostgresClient, preparedStatement, tokioRuntime, tableName, namespace); try { - Integer ans = future.get(timeout, TimeUnit.MILLISECONDS); - if (ans == 0) { + Boolean ans = future.get(timeout, TimeUnit.MILLISECONDS); + if (ans) { // This copies a zero (nul) terminated by array from native memory. String json = ptr.getString(0); List list = JSON.parseArray(json, SplitDesc.class); getLibLakeSoulMetaData().free_split_desc_array(ptr); return list; } - // errors - if (ans == -100) { - // recreate pg client and prepared - String config = String.format( - "host=%s port=%s dbname=%s user=%s password=%s", - dataBaseProperty.getHost(), - dataBaseProperty.getPort(), - dataBaseProperty.getDbName(), - dataBaseProperty.getUsername(), - dataBaseProperty.getPassword()); - final CompletableFuture future_ = new CompletableFuture<>(); - tokioPostgresClient = libLakeSoulMetaData.create_tokio_postgres_client( - new ReferencedBooleanCallback((bool, msg) -> { - if (msg.isEmpty()) { - future_.complete(bool); - } else { - System.err.println(msg); - future_.completeExceptionally(new IOException(msg)); - } - }, getbooleanCallbackObjectReferenceManager()), - config, - tokioRuntime - ); - preparedStatement = libLakeSoulMetaData.create_prepared_statement(); - future.get(timeout, TimeUnit.MILLISECONDS); - } // other errors throw new RuntimeException("create split desc array failed"); } catch (InterruptedException | ExecutionException e) { @@ -620,6 +596,25 @@ public List createSplitDescArray(String tableName, String namespace) } catch (TimeoutException e) { LOG.error("create split desc array timeout"); throw new RuntimeException(e); + } finally { + unlockReadLock(); } } + +// void filter_rel(io.substrait.proto.Expression e) { +// byte[] byteArray = e.toByteArray(); +// int length = byteArray.length; +// Pointer buffer = fixedBuffer; +// if (length < fixedBuffer.size()) +// fixedBuffer.put(0, byteArray, 0, length); +// else if (length < mutableBuffer.size()) { +// mutableBuffer.put(0, byteArray, 0, length); +// buffer = mutableBuffer; +// } else { +// mutableBuffer = Runtime.getRuntime(libLakeSoulMetaData).getMemoryManager().allocateDirect(length); +// mutableBuffer.put(0, byteArray, 0, length); +// buffer = mutableBuffer; +// } +// getLibLakeSoulMetaData().call_rust(buffer.address(), length); +// } } diff --git a/lakesoul-common/src/test/java/com/dmetasoul/lakesoul/meta/jnr/SubstraitTest.java b/lakesoul-common/src/test/java/com/dmetasoul/lakesoul/meta/jnr/SubstraitTest.java new file mode 100644 index 000000000..865843611 --- /dev/null +++ b/lakesoul-common/src/test/java/com/dmetasoul/lakesoul/meta/jnr/SubstraitTest.java @@ -0,0 +1,82 @@ +//package com.dmetasoul.lakesoul.meta.jnr; +// +//import com.dmetasoul.lakesoul.meta.substrait.SubstraitUtil; +//import io.substrait.expression.Expression; +//import io.substrait.expression.ExpressionCreator; +//import io.substrait.expression.FunctionArg; +//import io.substrait.expression.ImmutableExpression; +//import io.substrait.expression.proto.ExpressionProtoConverter; +//import io.substrait.extension.ExtensionCollector; +//import io.substrait.extension.ImmutableSimpleExtension; +//import io.substrait.extension.SimpleExtension; +//import io.substrait.type.Type; +//import io.substrait.type.TypeCreator; +//import org.apache.flink.table.expressions.*; +//import org.apache.flink.table.functions.BuiltInFunctionDefinition; +//import 
org.apache.flink.table.functions.BuiltInFunctionDefinitions; +//import org.apache.flink.table.types.AtomicDataType; +//import org.apache.flink.table.types.logical.BooleanType; +//import org.apache.flink.table.types.logical.IntType; +//import org.junit.Test; +// +//import java.io.IOException; +//import java.util.ArrayList; +//import java.util.Collections; +//import java.util.List; +// +//public class SubstraitTest { +// @Test +// public void generalExprTest() { +// BuiltInFunctionDefinition equalsFunction = BuiltInFunctionDefinitions.EQUALS; +// ValueLiteralExpression valExpr = new ValueLiteralExpression(3, new AtomicDataType(new IntType(false))); +// FieldReferenceExpression orderId = new FieldReferenceExpression("order_id", +// new AtomicDataType(new IntType()) +// , 0, 0); +// List args = new ArrayList<>(); +// args.add(orderId); +// args.add(valExpr); +// CallExpression p = CallExpression.permanent( +// equalsFunction, +// args, +// new AtomicDataType(new BooleanType())); +// Expression substraitExpr = SubstraitUtil.doTransform(p); +// io.substrait.proto.Expression proto = toProto(new ExtensionCollector(), substraitExpr); +// NativeMetadataJavaClient.getInstance().filter_rel(proto); +// } +// +// @Test +// public void literalExprTest() { +// ValueLiteralExpression valExpr = new ValueLiteralExpression(3, new AtomicDataType(new IntType(false))); +// Expression substraitExpr = SubstraitUtil.doTransform(valExpr); +// System.out.println(substraitExpr); +// } +// +// @Test +// public void FieldRefTest() { +// FieldReferenceExpression orderId = new FieldReferenceExpression("order_id", +// new AtomicDataType(new IntType()) +// , 0, 0); +// Expression expr = SubstraitUtil.doTransform(orderId); +// System.out.println(expr); +// System.out.println(toProto(null, expr)); +// } +// +// private io.substrait.proto.Expression toProto(ExtensionCollector collector, Expression expr) { +// return expr.accept(new ExpressionProtoConverter(collector, null)); +// } +// +// @Test +// public void callExprTest() { +// try { +// SimpleExtension.ExtensionCollection extensionCollection = SimpleExtension.loadDefaults(); +// SimpleExtension.ScalarFunctionVariant desc = extensionCollection.getScalarFunction(SimpleExtension.FunctionAnchor.of("/functions_comparison.yaml", "equal:any_any")); +// Expression.ScalarFunctionInvocation si = ExpressionCreator.scalarFunction(desc, TypeCreator.NULLABLE.I32); +// io.substrait.proto.Expression p = toProto(new ExtensionCollector(), si); +// System.out.println(p); +// +// +// } catch (IOException e) { +// throw new RuntimeException(e); +// } +// } +//} diff --git a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/SplitDescSuite.scala b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/SplitDescSuite.scala index c60121d1e..7dd4a508b 100644 --- a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/SplitDescSuite.scala +++ b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/SplitDescSuite.scala @@ -1,6 +1,7 @@ package org.apache.spark.sql.lakesoul import com.dmetasoul.lakesoul.meta.jnr.NativeMetadataJavaClient +import com.dmetasoul.lakesoul.tables.LakeSoulTable import org.apache.spark.sql._ import org.apache.spark.sql.lakesoul.RandomStringGenerator.generateRandomString import org.apache.spark.sql.lakesoul.test.LakeSoulTestUtils @@ -19,15 +20,8 @@ class SplitDescSuite extends QueryTest import testImplicits._ val names: Seq[String] = Seq.empty - val base_path = "/tmp/spark_test" - val name_length = 10 - -// override protected def afterAll(): Unit = { 
-// for (tName <- names) { -// val tablePath = s"$base_path/$tName" -// // LakeSoulTable.forPath(tablePath).dropTable() -// } -// } + val basePath = "/tmp/spark_test" + val nameLength = 10 private def create_dataframe(): DataFrame = { val df = Seq( @@ -43,10 +37,53 @@ class SplitDescSuite extends QueryTest df } + private def withUpsert(tableName: String): Unit = { + + val df1 = Seq( + ("2021-01-01", 1, 1, "apple"), + ("2021-01-02", 2, 2, "banana"), + ).toDF("date", "id", "num", "name") + + val tablePath = s"$basePath/$tableName" + + df1.write + .mode("append") + .format("lakesoul") + .option("shortTableName", tableName) + .option("rangePartitions", "date") + .option("hashPartitions", "id,num") + .option("hashBucketNum", "4") + .save(tablePath) + + val lake = LakeSoulTable.forPath(tablePath) + + val df2 = Seq( + ("2021-01-01", 1, 1, "pear"), + ("2021-01-02", 2, 2, "lemon"), + ).toDF("date", "id", "num", "name") + + lake.upsert(df2) + + val df3 = Seq( + ("2021-01-01", 1, 2, "watermelon"), + ("2021-01-02", 1, 2, "grape"), + ).toDF("date", "id", "num", "name") + + lake.upsert(df3) + + val df4 = Seq( + ("2021-01-01", 1, 1, "cherry"), + ("2021-01-02", 2, 2, "pineapple"), + ).toDF("date", "id", "num", "name") + + lake.upsert(df4) + } + + test("no range, no hash") { - val tName = generateRandomString(name_length); + val tName = generateRandomString(nameLength); withTable(tName) { - val tablePath = s"$base_path/$tName" + val tablePath = s"$basePath/$tName" val df = create_dataframe() df.write .mode("append") @@ -68,9 +105,9 @@ class SplitDescSuite extends QueryTest } test("one range, no hash") { - val tName = generateRandomString(name_length); + val tName = generateRandomString(nameLength); withTable(tName) { - val tablePath = s"$base_path/$tName" + val tablePath = s"$basePath/$tName" val df = create_dataframe() df.write .mode("append") @@ -89,9 +126,9 @@ class SplitDescSuite extends QueryTest } test("multiple range, no hash") { - val tName = generateRandomString(name_length); + val tName = generateRandomString(nameLength); withTable(tName) { - val tablePath = s"$base_path/$tName" + val tablePath = s"$basePath/$tName" val df = create_dataframe() df.write .mode("append") @@ -110,9 +147,9 @@ class SplitDescSuite extends QueryTest } test("no range, one hash") { - val tName = generateRandomString(name_length); + val tName = generateRandomString(nameLength); withTable(tName) { - val tablePath = s"$base_path/$tName" + val tablePath = s"$basePath/$tName" val df = create_dataframe() df.write .mode("append") @@ -132,15 +169,15 @@ class SplitDescSuite extends QueryTest } test("one range, one hash") { - val tName = generateRandomString(name_length); + val tName = generateRandomString(nameLength); withTable(tName) { - val tablePath = s"$base_path/$tName" + val tablePath = s"$basePath/$tName" val df = create_dataframe() df.write .mode("append") .format("lakesoul") .option("shortTableName", tName) - .option("rangePartitions","date") + .option("rangePartitions", "date") .option("hashPartitions", "id") .option("hashBucketNum", "4") .save(tablePath) @@ -155,15 +192,15 @@ class SplitDescSuite extends QueryTest } test("multiple range, one hash") { - val tName = generateRandomString(name_length); + val tName = generateRandomString(nameLength); withTable(tName) { - val tablePath = s"$base_path/$tName" + val tablePath = s"$basePath/$tName" val df = create_dataframe() df.write .mode("append") .format("lakesoul") .option("shortTableName", tName) - .option("rangePartitions","date,name") + .option("rangePartitions", 
"date,name") .option("hashPartitions", "id") .option("hashBucketNum", "4") .save(tablePath) @@ -178,15 +215,15 @@ class SplitDescSuite extends QueryTest } test("multiple range, multiple hash") { - val tName = generateRandomString(name_length); + val tName = generateRandomString(nameLength); withTable(tName) { - val tablePath = s"$base_path/$tName" + val tablePath = s"$basePath/$tName" val df = create_dataframe() df.write .mode("append") .format("lakesoul") .option("shortTableName", tName) - .option("rangePartitions","date,name") + .option("rangePartitions", "date,name") .option("hashPartitions", "id,num") .option("hashBucketNum", "4") .save(tablePath) @@ -199,8 +236,23 @@ class SplitDescSuite extends QueryTest assert(descs.length == 8) } } + + + test("multiple range, multiple hash , with upsert") { + val tName = generateRandomString(nameLength); + withTable(tName) { + withUpsert(tName) + val descs = NativeMetadataJavaClient + .getInstance() + .createSplitDescArray(tName, "default") + .asScala + .toSeq + descs.foreach(println) + } + } } + object RandomStringGenerator { val random = new Random() diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 0f499ae26..7c7cfbbfb 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -281,7 +281,7 @@ dependencies = [ "arrow-schema", "chrono", "half", - "indexmap 2.1.0", + "indexmap 2.2.5", "lexical-core", "num", "serde", @@ -418,7 +418,7 @@ checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.52", ] [[package]] @@ -869,7 +869,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "30d2b3721e861707777e3195b0158f950ae6dc4a27e4d02ff9f67e3eb3de199e" dependencies = [ "quote", - "syn 2.0.48", + "syn 2.0.52", ] [[package]] @@ -918,7 +918,7 @@ dependencies = [ "glob", "half", "hashbrown 0.14.3", - "indexmap 2.1.0", + "indexmap 2.2.5", "itertools", "log", "num_cpus", @@ -1026,7 +1026,7 @@ dependencies = [ "half", "hashbrown 0.14.3", "hex", - "indexmap 2.1.0", + "indexmap 2.2.5", "itertools", "libc", "log", @@ -1059,7 +1059,7 @@ dependencies = [ "futures", "half", "hashbrown 0.14.3", - "indexmap 2.1.0", + "indexmap 2.2.5", "itertools", "log", "once_cell", @@ -1326,7 +1326,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.52", ] [[package]] @@ -1404,7 +1404,7 @@ dependencies = [ "futures-sink", "futures-util", "http", - "indexmap 2.1.0", + "indexmap 2.2.5", "slab", "tokio", "tokio-util", @@ -1628,9 +1628,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.1.0" +version = "2.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d530e1a18b1cb4c484e6e34556a0d948706958449fca0cab753d649f2bce3d1f" +checksum = "7b0b929d511467233429c45a44ac1dcaa21ba0f5ba11e4879e6ed28ddb4f9df4" dependencies = [ "equivalent", "hashbrown 0.14.3", @@ -1777,6 +1777,7 @@ name = "lakesoul-metadata" version = "0.1.0" dependencies = [ "anyhow", + "bytes", "num_enum", "postgres", "postgres-types", @@ -2273,7 +2274,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9" dependencies = [ "fixedbitset", - "indexmap 2.1.0", + "indexmap 2.2.5", ] [[package]] @@ -2366,7 +2367,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.52", ] [[package]] @@ -2416,7 +2417,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "a41cf62165e97c7f814d2221421dbb9afcbcdb0a88068e5ea206e19951c2cbb5" dependencies = [ "proc-macro2", - "syn 2.0.48", + "syn 2.0.52", ] [[package]] @@ -2431,9 +2432,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.76" +version = "1.0.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95fc56cda0b5c3325f5fbbd7ff9fda9e02bb00bb3dac51252d2f1bfa1cb8cc8c" +checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae" dependencies = [ "unicode-ident", ] @@ -2465,7 +2466,7 @@ dependencies = [ "prost", "prost-types", "regex", - "syn 2.0.48", + "syn 2.0.52", "tempfile", "which", ] @@ -2480,7 +2481,7 @@ dependencies = [ "itertools", "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.52", ] [[package]] @@ -2788,9 +2789,9 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.21" +version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b97ed7a9823b74f99c7742f5336af7be5ecd3eeafcb1507d1fa93347b1d589b0" +checksum = "92d43fe69e652f3df9bdc2b85b2854a0825b86e4fb76bc44d945137d053639ca" [[package]] name = "seq-macro" @@ -2800,29 +2801,29 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.195" +version = "1.0.197" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63261df402c67811e9ac6def069e4786148c4563f4b50fd4bf30aa370d626b02" +checksum = "3fb1c873e1b9b056a4dc4c0c198b24c3ffa059243875552b2bd0933b1aee4ce2" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.195" +version = "1.0.197" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46fe8f8603d81ba86327b23a2e9cdf49e1255fb94a4c5f297f6ee0547178ea2c" +checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.52", ] [[package]] name = "serde_json" -version = "1.0.111" +version = "1.0.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "176e46fa42316f18edd598015a5166857fc835ec732f5215eac6b7bdbf0a84f4" +checksum = "c5f09b1bd632ef549eaa9f60a1f8de742bdbc698e6cee2095fc84dde5f549ae0" dependencies = [ "itoa", "ryu", @@ -3044,7 +3045,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.48", + "syn 2.0.52", ] [[package]] @@ -3066,9 +3067,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.48" +version = "2.0.52" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f3531638e407dfc0814761abb7c00a5b54992b849452a0646b7f65c9f770f3f" +checksum = "b699d15b36d1f02c3e7c69f8ffef53de37aefae075d8488d4ba1a7788d574a07" dependencies = [ "proc-macro2", "quote", @@ -3137,7 +3138,7 @@ checksum = "7ba277e77219e9eea169e8508942db1bf5d8a41ff2db9b20aab5a5aadc9fa25d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.52", ] [[package]] @@ -3148,22 +3149,22 @@ checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d" [[package]] name = "thiserror" -version = "1.0.56" +version = "1.0.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d54378c645627613241d077a3a79db965db602882668f9136ac42af9ecb730ad" +checksum = "1e45bcbe8ed29775f228095caf2cd67af7a4ccf756ebff23a306bf3e8b47b24b" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.56" +version = "1.0.57" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa0faa943b50f3db30a20aa7e265dbc66076993efed8463e8de414e5d06d3471" +checksum = "a953cb265bef375dae3de6663da4d3804eee9682ea80d8e2542529b73c531c81" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.52", ] [[package]] @@ -3238,7 +3239,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.52", ] [[package]] @@ -3324,7 +3325,7 @@ version = "0.19.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" dependencies = [ - "indexmap 2.1.0", + "indexmap 2.2.5", "toml_datetime", "winnow", ] @@ -3354,7 +3355,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.52", ] [[package]] @@ -3499,7 +3500,7 @@ checksum = "f49e7f3f3db8040a100710a11932239fd30697115e2ba4107080d8252939845e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.52", ] [[package]] @@ -3516,9 +3517,9 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" [[package]] name = "walkdir" -version = "2.4.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d71d857dc86794ca4c280d616f7da00d2dbfd8cd788846559a6813e6aa4b54ee" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" dependencies = [ "same-file", "winapi-util", @@ -3560,7 +3561,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.52", "wasm-bindgen-shared", ] @@ -3594,7 +3595,7 @@ checksum = "f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.52", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -3873,7 +3874,7 @@ checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.52", ] [[package]] diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 414ebd914..4a40dd388 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -18,6 +18,7 @@ resolver = "2" [workspace.dependencies] datafusion = { git = "https://github.com/lakesoul-io/arrow-datafusion.git", branch = "datafusion-33-parquet-prefetch" } datafusion-common = { git = "https://github.com/lakesoul-io/arrow-datafusion.git", branch = "datafusion-33-parquet-prefetch" } +datafusion-substrait = { git = "https://github.com/lakesoul-io/arrow-datafusion.git", branch = "datafusion-33-parquet-prefetch" } arrow = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-48-parquet-bufferred" } arrow-schema = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-48-parquet-bufferred" } arrow-array = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-48-parquet-bufferred" } diff --git a/rust/lakesoul-metadata-c/src/lib.rs b/rust/lakesoul-metadata-c/src/lib.rs index 5c939dddd..872edcbd0 100644 --- a/rust/lakesoul-metadata-c/src/lib.rs +++ b/rust/lakesoul-metadata-c/src/lib.rs @@ -74,7 +74,7 @@ fn call_result_callback(callback: ResultCallback, status: bool, err: *const c_ch } } -fn call_integer_result_callback(callback: IntegerResultCallBack, status: i32, err: *const c_char) { +fn _call_integer_result_callback(callback: IntegerResultCallBack, status: i32, err: *const c_char) { // release error string callback(status, 
err); if !err.is_null() { @@ -367,12 +367,9 @@ fn c_char2str<'a>(ptr: *const c_char) -> &'a str { /// USE: JNR /// return split(partition) desc array in json format by table_name, namespace , filter(WIP) -/// 0: success -/// -100: ffi::restore error -/// -1: error #[no_mangle] pub extern "C" fn create_split_desc_array( - callback: IntegerResultCallBack, + callback: ResultCallback, client: NonNull>, prepared: NonNull>, runtime: NonNull>, @@ -381,7 +378,7 @@ pub extern "C" fn create_split_desc_array( ) -> *mut c_char { let runtime = unsafe { NonNull::new_unchecked(runtime.as_ref().ptr as *mut Runtime).as_ref() }; let client = unsafe { NonNull::new_unchecked(client.as_ref().ptr as *mut Client).as_ref() }; - let prepared = unsafe { NonNull::new_unchecked(prepared.as_ref().ptr as *mut PreparedStatementMap).as_ref() }; + let prepared = unsafe { NonNull::new_unchecked(prepared.as_ref().ptr as *mut PreparedStatementMap).as_mut() }; let table_name = c_char2str(table_name); let namespace = c_char2str(namespace); let result: Result<*mut c_char, LakeSoulMetaDataError> = runtime.block_on(async { @@ -393,19 +390,11 @@ pub extern "C" fn create_split_desc_array( }); let (ret, status, e) = match result { - Ok(ptr) => (ptr, 0, null()), - Err(e) => match e { - LakeSoulMetaDataError::FfiError(ffi) => { - (null_mut(), -100, CString::new(ffi).unwrap().into_raw() as *const c_char) - } - other => ( - null_mut(), - -1, - CString::new(other.to_string()).unwrap().into_raw() as *const c_char, - ), - }, + Ok(ptr) => (ptr, true, null()), + Err(e) => + (null_mut(), false, CString::new(e.to_string()).unwrap().into_raw() as *const c_char), }; - call_integer_result_callback(callback, status, e); + call_result_callback(callback, status, e); ret } @@ -425,7 +414,7 @@ pub extern "C" fn debug(callback: extern "C" fn(bool, *const c_char)) -> *mut c_ primary_keys: vec![], partition_desc: HashMap::new(), table_schema: "".to_string(), - };1]; + }; 1]; let array = lakesoul_metadata::transfusion::SplitDescArray(x); let json_vec = serde_json::to_vec(&array).unwrap(); let c_string = CString::new(json_vec).unwrap(); diff --git a/rust/lakesoul-metadata/Cargo.toml b/rust/lakesoul-metadata/Cargo.toml index cc2f07e81..5008c344a 100644 --- a/rust/lakesoul-metadata/Cargo.toml +++ b/rust/lakesoul-metadata/Cargo.toml @@ -14,6 +14,7 @@ edition = "2021" postgres = "0.19.5" tokio-postgres = { version = "0.7.8", features = ["default", "with-serde_json-1", "with-uuid-1", "array-impls"] } postgres-types = { version = "0.2.5", features = ["derive"] } +bytes = {version = "1.5.0"} tokio = { workspace = true } proto = { path = "../proto" } diff --git a/rust/lakesoul-metadata/src/error.rs b/rust/lakesoul-metadata/src/error.rs index f5fbde97a..4d6b53eb2 100644 --- a/rust/lakesoul-metadata/src/error.rs +++ b/rust/lakesoul-metadata/src/error.rs @@ -34,8 +34,6 @@ pub enum LakeSoulMetaDataError { ProstDecodeError(#[from] prost::DecodeError), #[error("prost encode error: {0}")] ProstEncodeError(#[from] prost::EncodeError), - #[error("ffi error: {0}")] - FfiError(String), #[error( "Internal error: {0}\nThis was likely caused by a bug in LakeSoul's \ code and we would welcome that you file an bug report in our issue tracker" diff --git a/rust/lakesoul-metadata/src/metadata_client.rs b/rust/lakesoul-metadata/src/metadata_client.rs index 492ce79b9..c784359cd 100644 --- a/rust/lakesoul-metadata/src/metadata_client.rs +++ b/rust/lakesoul-metadata/src/metadata_client.rs @@ -2,7 +2,6 @@ // // SPDX-License-Identifier: Apache-2.0 -use std::collections::HashSet; use 
std::fmt::{Debug, Formatter}; use std::ops::DerefMut; use std::sync::Arc; @@ -19,7 +18,6 @@ use proto::proto::entity::{ }; use crate::error::{LakeSoulMetaDataError, Result}; -use crate::transfusion::DataFileInfo; use crate::{ clean_meta_for_test, create_connection, execute_insert, execute_query, execute_update, DaoType, PreparedStatementMap, PARAM_DELIM, PARTITION_DESC_DELIM, @@ -96,28 +94,6 @@ impl MetaDataClient { }) } - /// Construct Self from raw client and prepared - pub fn compose(client: Client, prepared: PreparedStatementMap, max_retry: usize) -> Self { - Self { - client: Arc::new(Mutex::new(client)), - prepared: Arc::new(Mutex::new(prepared)), - max_retry, - } - } - - /// consume Arc to raw client and prepared - pub fn decompose(db: Self) -> Result<(Client, PreparedStatementMap)> { - debug_assert_eq!(Arc::strong_count(&db.client), 1); - let client = Arc::into_inner(db.client) - .ok_or(LakeSoulMetaDataError::FfiError("restore client failed".to_string()))? - .into_inner(); - debug_assert_eq!(Arc::strong_count(&db.prepared), 1); - let prepared = Arc::into_inner(db.prepared) - .ok_or(LakeSoulMetaDataError::FfiError("restore prepared failed".to_string()))? - .into_inner(); - Ok((client, prepared)) - } - pub async fn create_namespace(&self, namespace: Namespace) -> Result<()> { self.insert_namespace(&namespace).await?; Ok(()) @@ -528,55 +504,6 @@ impl MetaDataClient { } } - async fn get_table_data_info_by_partition_info( - &self, - partition_info_arr: Vec, - ) -> Result> { - let mut file_info_buf = Vec::new(); - for pi in &partition_info_arr { - file_info_buf.extend(self.get_single_partition_data_info(pi).await?) - } - Ok(file_info_buf) - } - - /// return file info in this partition that match the current read version - async fn get_single_partition_data_info(&self, partition_info: &PartitionInfo) -> Result> { - let mut file_arr_buf = Vec::new(); - let data_commit_info_list = self.get_data_commit_info_of_single_partition(partition_info).await?; - for data_commit_info in &data_commit_info_list { - for file in &data_commit_info.file_ops { - file_arr_buf.push(DataFileInfo::compose(data_commit_info, file, partition_info)?) - } - } - Ok(self.filter_files(file_arr_buf)) - } - - /// 1:1 fork from scala by chat_gpt - fn filter_files(&self, file_arr_buf: Vec) -> Vec { - let mut dup_check = HashSet::new(); - let mut file_res_arr_buf = Vec::new(); - - if file_arr_buf.len() > 1 { - for i in (0..file_arr_buf.len()).rev() { - if file_arr_buf[i].file_op == "del" { - dup_check.insert(file_arr_buf[i].path.clone()); - } else if dup_check.is_empty() || !dup_check.contains(&file_arr_buf[i].path) { - file_res_arr_buf.push(file_arr_buf[i].clone()); - } - } - file_res_arr_buf.reverse(); - } else { - file_res_arr_buf = file_arr_buf.into_iter().filter(|item| item.file_op == "add").collect(); - } - - file_res_arr_buf - } - - pub async fn get_table_data_info(&self, table_id: &str) -> Result> { - // logic from scala: DataOperation - self.get_table_data_info_by_partition_info(self.get_all_partition_info(table_id).await?) - .await - } pub async fn get_data_files_by_table_name( &self, diff --git a/rust/lakesoul-metadata/src/transfusion.rs b/rust/lakesoul-metadata/src/transfusion.rs index 4bb98ceef..70bae9253 100644 --- a/rust/lakesoul-metadata/src/transfusion.rs +++ b/rust/lakesoul-metadata/src/transfusion.rs @@ -5,16 +5,19 @@ //! wrap catalog->split->reader //! [WIP] //! 
prototype -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; +use std::ops::{Deref, DerefMut}; +use prost::Message; use regex::Regex; use serde::{Deserialize, Serialize}; use serde_json::Value; +use tokio::sync::Mutex; use tokio_postgres::Client; -use proto::proto::entity::{DataCommitInfo, DataFileOp, FileOp, PartitionInfo, TableInfo}; +use proto::proto::entity::{DataCommitInfo, DataFileOp, FileOp, JniWrapper, PartitionInfo, TableInfo}; -use crate::{error::Result, MetaDataClient, PreparedStatementMap}; +use crate::{DaoType, error::Result, execute_query, PARAM_DELIM, PreparedStatementMap}; use crate::error::LakeSoulMetaDataError; use crate::transfusion::config::{ LAKESOUL_HASH_PARTITION_SPLITTER, LAKESOUL_NON_PARTITION_TABLE_PART_DESC, @@ -71,31 +74,11 @@ pub fn table_without_pk(hash_bucket_num: &str) -> bool { /// see https://users.rust-lang.org/t/dereferencing-a-boxed-value/86768 pub async fn split_desc_array( client: &Client, - prepared: &PreparedStatementMap, + prepared: &mut PreparedStatementMap, table_name: &str, namespace: &str, ) -> Result { - // make MetadataClient - let (mut boxed_client, mut boxed_prepared) = unsafe { - let boxed_client = Box::from_raw(client as *const Client as *mut Client); - let boxed_prepared = Box::from_raw(prepared as *const PreparedStatementMap as *mut PreparedStatementMap); - (boxed_client, boxed_prepared) - }; - let db = MetaDataClient::compose(*boxed_client, *boxed_prepared, 3); - let ret = split_desc_array_inner(&db, table_name, namespace).await; - - // restore client and prepared - - // the origin ptr disappeared - let (c, p) = MetaDataClient::decompose(db)?; - *boxed_client = c; - *boxed_prepared = p; - Box::into_raw(boxed_client); - Box::into_raw(boxed_prepared); - ret -} - -async fn split_desc_array_inner(db: &MetaDataClient, table_name: &str, namespace: &str) -> Result { + let db = RawClient::new(client, prepared); let table_info = db.get_table_info_by_table_name(table_name, namespace).await?; let data_files = db.get_table_data_info(&table_info.table_id).await?; @@ -154,6 +137,139 @@ async fn split_desc_array_inner(db: &MetaDataClient, table_name: &str, namespace Ok(SplitDescArray(splits)) } + +struct RawClient<'a> { + client: Mutex<&'a Client>, + prepared: Mutex<&'a mut PreparedStatementMap>, +} +// struct RawClient<'a> { +// client: &'a Client, +// prepared: &'a mut PreparedStatementMap, +// } + +impl<'a> RawClient<'_> { + fn new(client: &'a Client, prepared: &'a mut PreparedStatementMap) -> RawClient<'a> { + RawClient { + client: Mutex::new(client), + prepared: Mutex::new(prepared), + } + } + pub async fn get_table_info_by_table_name(&self, table_name: &str, namespace: &str) -> Result { + match self + .query( + DaoType::SelectTableInfoByTableNameAndNameSpace as i32, + [table_name, namespace].join(PARAM_DELIM), + ) + .await + { + Ok(wrapper) if wrapper.table_info.is_empty() => Err(crate::error::LakeSoulMetaDataError::NotFound( + format!("Table '{}' not found", table_name), + )), + Ok(wrapper) => Ok(wrapper.table_info[0].clone()), + Err(err) => Err(err), + } + } + + pub async fn get_table_data_info(&self, table_id: &str) -> Result> { + // logic from scala: DataOperation + let vec = self.get_all_partition_info(table_id).await?; + self.get_table_data_info_by_partition_info(vec) + .await + } + + async fn get_table_data_info_by_partition_info( + &self, + partition_info_arr: Vec, + ) -> Result> { + let mut file_info_buf = Vec::new(); + for pi in &partition_info_arr { + 
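            // scan partitions sequentially; each call returns only the files
            // that are live in that partition's current read version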
file_info_buf.extend(self.get_single_partition_data_info(pi).await?) + } + Ok(file_info_buf) + } + + + /// return file info in this partition that match the current read version + async fn get_single_partition_data_info(&self, partition_info: &PartitionInfo) -> Result> { + let mut file_arr_buf = Vec::new(); + let data_commit_info_list = self.get_data_commit_info_of_single_partition(partition_info).await?; + for data_commit_info in &data_commit_info_list { + for file in &data_commit_info.file_ops { + file_arr_buf.push(DataFileInfo::compose(data_commit_info, file, partition_info)?) + } + } + Ok(self.filter_files(file_arr_buf)) + } + + /// 1:1 fork from scala by chat_gpt + fn filter_files(&self, file_arr_buf: Vec) -> Vec { + let mut dup_check = HashSet::new(); + let mut file_res_arr_buf = Vec::new(); + + if file_arr_buf.len() > 1 { + for i in (0..file_arr_buf.len()).rev() { + if file_arr_buf[i].file_op == "del" { + dup_check.insert(file_arr_buf[i].path.clone()); + } else if dup_check.is_empty() || !dup_check.contains(&file_arr_buf[i].path) { + file_res_arr_buf.push(file_arr_buf[i].clone()); + } + } + file_res_arr_buf.reverse(); + } else { + file_res_arr_buf = file_arr_buf.into_iter().filter(|item| item.file_op == "add").collect(); + } + + file_res_arr_buf + } + + pub async fn get_all_partition_info(&self, table_id: &str) -> Result> { + match self + .query(DaoType::ListPartitionByTableId as i32, table_id.to_string()) + .await + { + Ok(wrapper) => Ok(wrapper.partition_info), + Err(e) => Err(e), + } + } + + /// maybe use AnyMap + async fn query(&self, query_type: i32, joined_string: String) -> Result { + let encoded = execute_query( + self.client.lock().await.deref(), + self.prepared.lock().await.deref_mut(), + query_type, + joined_string.clone(), + ) + .await?; + Ok(JniWrapper::decode(prost::bytes::Bytes::from(encoded))?) + } + + async fn get_data_commit_info_of_single_partition( + &self, + partition_info: &PartitionInfo, + ) -> Result> { + let table_id = &partition_info.table_id; + let partition_desc = &partition_info.partition_desc; + let joined_commit_id = &partition_info + .snapshot + .iter() + .map(|commit_id| format!("{:0>16x}{:0>16x}", commit_id.high, commit_id.low)) + .collect::>() + .join(""); + let joined_string = [table_id.as_str(), partition_desc.as_str(), joined_commit_id.as_str()].join(PARAM_DELIM); + match self + .query( + DaoType::ListDataCommitInfoByTableIdAndPartitionDescAndCommitList as i32, + joined_string, + ) + .await + { + Ok(wrapper) => Ok(wrapper.data_commit_info), + Err(e) => Err(e), + } + } +} + fn has_hash_partitions(table_info: &TableInfo) -> bool { let properties: Value = serde_json::from_str(&table_info.properties).expect("wrong properties"); if properties["hashBucketNum"] != Value::Null && properties["hashBucketNum"] == "-1" { @@ -306,12 +422,12 @@ mod test { #[test] fn serialize_test() { - let sd = SplitDesc { - file_paths: vec![], - primary_keys: vec![], - partition_desc: Default::default(), - table_schema: "".to_string(), - }; + let sd = SplitDesc { + file_paths: vec![], + primary_keys: vec![], + partition_desc: Default::default(), + table_schema: "".to_string(), + }; let s = serde_json::to_string(&sd).unwrap(); println!("{s}"); }
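
Note on the FFI contract this patch simplifies: create_split_desc_array now reports
success through a plain boolean callback, because dropping the Box::from_raw round-trip
(in favor of RawClient borrowing the Client and PreparedStatementMap) removes the
-100 "recreate pg client" error path entirely. For reference, a minimal standalone
sketch of the JSON hand-off to the JVM follows; it is not part of the patch, and it
assumes SplitDesc keeps serde's default field naming:

    use std::collections::HashMap;

    use lakesoul_metadata::transfusion::{SplitDesc, SplitDescArray};

    fn main() {
        let desc = SplitDesc {
            file_paths: vec!["file:/tmp/spark_test/part-0.parquet".to_string()],
            primary_keys: vec!["id".to_string(), "num".to_string()],
            partition_desc: HashMap::from([("date".to_string(), "2021-01-01".to_string())]),
            table_schema: String::new(),
        };
        // SplitDescArray is a newtype tuple struct, so it serializes as its inner
        // Vec: a plain JSON array. The JVM side can therefore consume the string
        // with JSON.parseArray(json, SplitDesc.class) and must then call
        // free_split_desc_array to release the native buffer.
        let json = serde_json::to_string(&SplitDescArray(vec![desc])).unwrap();
        println!("{json}");
    }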