Skip to content

Commit

Permalink
[Rust] (metadata) move metadataclient to rawclient (#451)
Browse files Browse the repository at this point in the history
clean code

Signed-off-by: mag1c1an1 <[email protected]>
  • Loading branch information
mag1c1an1 authored Mar 13, 2024
1 parent 9c670fd commit 57bb458
Show file tree
Hide file tree
Showing 12 changed files with 393 additions and 230 deletions.
1 change: 0 additions & 1 deletion lakesoul-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,6 @@ SPDX-License-Identifier: Apache-2.0
<version>3.3.2</version>
<scope>${local.scope}</scope>
</dependency>

</dependencies>

<profiles>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public interface LibLakeSoulMetaData {

void clean_meta_for_test(IntegerCallback integerCallback, Pointer runtime, Pointer client);

Pointer create_split_desc_array(IntegerCallback integerCallback, Pointer client, Pointer prepared, Pointer runtime, String tableName, String namespace);
Pointer create_split_desc_array(BooleanCallback booleanCallback, Pointer client, Pointer prepared, Pointer runtime, String tableName, String namespace);

void free_split_desc_array(Pointer json);

Expand All @@ -50,6 +50,8 @@ public interface LibLakeSoulMetaData {

void rust_logger_init();

void call_rust(@LongLong long addr, Integer len);

void hello_world(Callback<byte[]> bytesCallback);

void namespace(byte[] bytes, Integer len);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -557,69 +557,64 @@ public static void closeAll() {


/**
* if ffi function failed with -100
* should recreate pg client and prepared map
* if ffi function failed with -100
* should recreate pg client and prepared map
*
* @param tableName name
* @param namespace the np of TableInfo
* @return split(partition) desc array in json format by table_name, namespace , filter(WIP)
*/
public List<SplitDesc> createSplitDescArray(String tableName, String namespace) {
final CompletableFuture<Integer> future = new CompletableFuture<>();
getReadLock();
final CompletableFuture<Boolean> future = new CompletableFuture<>();
Pointer ptr = getLibLakeSoulMetaData()
.create_split_desc_array(
new ReferencedIntegerCallback((result, msg) -> {
new ReferencedBooleanCallback((result, msg) -> {
if (msg != null) {
System.err.println(msg);
}
future.complete(result);
}, getIntegerCallbackObjectReferenceManager()),
}, getbooleanCallbackObjectReferenceManager()),
tokioPostgresClient,
preparedStatement,
tokioRuntime,
tableName,
namespace);
try {
Integer ans = future.get(timeout, TimeUnit.MILLISECONDS);
if (ans == 0) {
Boolean ans = future.get(timeout, TimeUnit.MILLISECONDS);
if (ans) {
// This copies a zero (nul) terminated by array from native memory.
String json = ptr.getString(0);
List<SplitDesc> list = JSON.parseArray(json, SplitDesc.class);
getLibLakeSoulMetaData().free_split_desc_array(ptr);
return list;
}
// errors
if (ans == -100) {
// recreate pg client and prepared
String config = String.format(
"host=%s port=%s dbname=%s user=%s password=%s",
dataBaseProperty.getHost(),
dataBaseProperty.getPort(),
dataBaseProperty.getDbName(),
dataBaseProperty.getUsername(),
dataBaseProperty.getPassword());
final CompletableFuture<Boolean> future_ = new CompletableFuture<>();
tokioPostgresClient = libLakeSoulMetaData.create_tokio_postgres_client(
new ReferencedBooleanCallback((bool, msg) -> {
if (msg.isEmpty()) {
future_.complete(bool);
} else {
System.err.println(msg);
future_.completeExceptionally(new IOException(msg));
}
}, getbooleanCallbackObjectReferenceManager()),
config,
tokioRuntime
);
preparedStatement = libLakeSoulMetaData.create_prepared_statement();
future.get(timeout, TimeUnit.MILLISECONDS);
}
// other errors
throw new RuntimeException("create split desc array failed");
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
LOG.error("create split desc array timeout");
throw new RuntimeException(e);
} finally {
unlockReadLock();
}
}

// void filter_rel(io.substrait.proto.Expression e) {
// byte[] byteArray = e.toByteArray();
// int length = byteArray.length;
// Pointer buffer = fixedBuffer;
// if (length < fixedBuffer.size())
// fixedBuffer.put(0, byteArray, 0, length);
// else if (length < mutableBuffer.size()) {
// mutableBuffer.put(0, byteArray, 0, length);
// buffer = mutableBuffer;
// } else {
// mutableBuffer = Runtime.getRuntime(libLakeSoulMetaData).getMemoryManager().allocateDirect(length);
// mutableBuffer.put(0, byteArray, 0, length);
// buffer = mutableBuffer;
// }
// getLibLakeSoulMetaData().call_rust(buffer.address(), length);
// }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
//package com.dmetasoul.lakesoul.meta.jnr;
//
//import com.dmetasoul.lakesoul.meta.substrait.SubstraitUtil;
//import io.substrait.expression.Expression;
//import io.substrait.expression.ExpressionCreator;
//import io.substrait.expression.FunctionArg;
//import io.substrait.expression.ImmutableExpression;
//import io.substrait.expression.proto.ExpressionProtoConverter;
//import io.substrait.extension.ExtensionCollector;
//import io.substrait.extension.ImmutableSimpleExtension;
//import io.substrait.extension.SimpleExtension;
//import io.substrait.type.Type;
//import io.substrait.type.TypeCreator;
//import org.apache.flink.table.expressions.*;
//import org.apache.flink.table.functions.BuiltInFunctionDefinition;
//import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
//import org.apache.flink.table.types.AtomicDataType;
//import org.apache.flink.table.types.logical.BooleanType;
//import org.apache.flink.table.types.logical.IntType;
//import org.junit.Test;
//
//import java.io.IOException;
//import java.util.ArrayList;
//import java.util.Collections;
//import java.util.List;
//
//public class SubstraitTest {
// @Test
// public void generalExprTest() {
// BuiltInFunctionDefinition equalsFunction = BuiltInFunctionDefinitions.EQUALS;
// ValueLiteralExpression valExpr = new ValueLiteralExpression(3, new AtomicDataType(new IntType(false)));
// FieldReferenceExpression orderId = new FieldReferenceExpression("order_id",
// new AtomicDataType(new IntType())
// , 0, 0);
// List<ResolvedExpression> args = new ArrayList<>();
// args.add(orderId);
// args.add(valExpr);
// CallExpression p = CallExpression.permanent(
// equalsFunction,
// args,
// new AtomicDataType(new BooleanType()));
// Expression substraitExpr = SubstraitUtil.doTransform(p);
// io.substrait.proto.Expression proto = toProto(new ExtensionCollector(), substraitExpr);
// NativeMetadataJavaClient.getInstance().filter_rel(proto);
// }
//
// @Test
// public void literalExprTest() {
// ValueLiteralExpression valExpr = new ValueLiteralExpression(3, new AtomicDataType(new IntType(false)));
// Expression substraitExpr = SubstraitUtil.doTransform(valExpr);
// System.out.println(substraitExpr);
// }
//
// @Test
// public void FieldRefTest() {
// FieldReferenceExpression orderId = new FieldReferenceExpression("order_id",
// new AtomicDataType(new IntType())
// , 0, 0);
// Expression expr = SubstraitUtil.doTransform(orderId);
// System.out.println(expr);
// System.out.println(toProto(null, expr));
// }
//
// private io.substrait.proto.Expression toProto(ExtensionCollector collector, Expression expr) {
// return expr.accept(new ExpressionProtoConverter(collector, null));
// }
//
// @Test
// public void callExprTest() {
// try {
// SimpleExtension.ExtensionCollection extensionCollection = SimpleExtension.loadDefaults();
// SimpleExtension.ScalarFunctionVariant desc = extensionCollection.getScalarFunction(SimpleExtension.FunctionAnchor.of("/functions_comparison.yaml", "equal:any_any"));
// Expression.ScalarFunctionInvocation si = ExpressionCreator.scalarFunction(desc, TypeCreator.NULLABLE.I32);
// io.substrait.proto.Expression p = toProto(new ExtensionCollector(), si);
// System.out.println(p);
//
//
// } catch (IOException e) {
// throw new RuntimeException(e);
// }
// }
//}
Loading

0 comments on commit 57bb458

Please sign in to comment.