Skip to content

Commit

Permalink
[Rust] (metadata) move metadataclient to rawclient
Browse files Browse the repository at this point in the history
Signed-off-by: mag1c1an1 <[email protected]>
  • Loading branch information
mag1c1an1 committed Mar 10, 2024
1 parent 9c670fd commit a7dccd0
Show file tree
Hide file tree
Showing 15 changed files with 1,024 additions and 229 deletions.
36 changes: 36 additions & 0 deletions lakesoul-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,42 @@ SPDX-License-Identifier: Apache-2.0
<version>3.3.2</version>
<scope>${local.scope}</scope>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.apache.parquet</groupId>-->
<!-- <artifactId>parquet-column</artifactId>-->
<!-- <version>1.13.1</version>-->
<!-- <scope>compile</scope>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>io.substrait</groupId>-->
<!-- <artifactId>core</artifactId>-->
<!-- <exclusions>-->
<!-- <exclusion>-->
<!-- <groupId>org.slf4j</groupId>-->
<!-- <artifactId>slf4j-jdk14</artifactId>-->
<!-- </exclusion>-->
<!-- </exclusions>-->
<!-- <version>0.28.0</version>-->
<!-- <scope>compile</scope>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-table-common</artifactId>-->
<!-- <version>1.17.1</version>-->
<!-- <scope>compile</scope>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.spark</groupId>-->
<!-- <artifactId>spark-catalyst_2.12</artifactId>-->
<!-- <version>3.3.1</version>-->
<!-- <scope>compile</scope>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.spark</groupId>-->
<!-- <artifactId>spark-catalyst_2.12</artifactId>-->
<!-- <version>3.3.1</version>-->
<!-- <scope>compile</scope>-->
<!-- </dependency>-->

</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public interface LibLakeSoulMetaData {

void clean_meta_for_test(IntegerCallback integerCallback, Pointer runtime, Pointer client);

Pointer create_split_desc_array(IntegerCallback integerCallback, Pointer client, Pointer prepared, Pointer runtime, String tableName, String namespace);
Pointer create_split_desc_array(BooleanCallback booleanCallback, Pointer client, Pointer prepared, Pointer runtime, String tableName, String namespace);

void free_split_desc_array(Pointer json);

Expand All @@ -50,6 +50,8 @@ public interface LibLakeSoulMetaData {

void rust_logger_init();

void call_rust(@LongLong long addr, Integer len);

void hello_world(Callback<byte[]> bytesCallback);

void namespace(byte[] bytes, Integer len);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -557,69 +557,64 @@ public static void closeAll() {


/**
* if ffi function failed with -100
* should recreate pg client and prepared map
* if ffi function failed with -100
* should recreate pg client and prepared map
*
* @param tableName name
* @param namespace the np of TableInfo
* @return split(partition) desc array in json format by table_name, namespace , filter(WIP)
*/
public List<SplitDesc> createSplitDescArray(String tableName, String namespace) {
final CompletableFuture<Integer> future = new CompletableFuture<>();
getReadLock();
final CompletableFuture<Boolean> future = new CompletableFuture<>();
Pointer ptr = getLibLakeSoulMetaData()
.create_split_desc_array(
new ReferencedIntegerCallback((result, msg) -> {
new ReferencedBooleanCallback((result, msg) -> {
if (msg != null) {
System.err.println(msg);
}
future.complete(result);
}, getIntegerCallbackObjectReferenceManager()),
}, getbooleanCallbackObjectReferenceManager()),
tokioPostgresClient,
preparedStatement,
tokioRuntime,
tableName,
namespace);
try {
Integer ans = future.get(timeout, TimeUnit.MILLISECONDS);
if (ans == 0) {
Boolean ans = future.get(timeout, TimeUnit.MILLISECONDS);
if (ans) {
// This copies a zero (nul) terminated by array from native memory.
String json = ptr.getString(0);
List<SplitDesc> list = JSON.parseArray(json, SplitDesc.class);
getLibLakeSoulMetaData().free_split_desc_array(ptr);
return list;
}
// errors
if (ans == -100) {
// recreate pg client and prepared
String config = String.format(
"host=%s port=%s dbname=%s user=%s password=%s",
dataBaseProperty.getHost(),
dataBaseProperty.getPort(),
dataBaseProperty.getDbName(),
dataBaseProperty.getUsername(),
dataBaseProperty.getPassword());
final CompletableFuture<Boolean> future_ = new CompletableFuture<>();
tokioPostgresClient = libLakeSoulMetaData.create_tokio_postgres_client(
new ReferencedBooleanCallback((bool, msg) -> {
if (msg.isEmpty()) {
future_.complete(bool);
} else {
System.err.println(msg);
future_.completeExceptionally(new IOException(msg));
}
}, getbooleanCallbackObjectReferenceManager()),
config,
tokioRuntime
);
preparedStatement = libLakeSoulMetaData.create_prepared_statement();
future.get(timeout, TimeUnit.MILLISECONDS);
}
// other errors
throw new RuntimeException("create split desc array failed");
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
LOG.error("create split desc array timeout");
throw new RuntimeException(e);
} finally {
unlockReadLock();
}
}

// void filter_rel(io.substrait.proto.Expression e) {
// byte[] byteArray = e.toByteArray();
// int length = byteArray.length;
// Pointer buffer = fixedBuffer;
// if (length < fixedBuffer.size())
// fixedBuffer.put(0, byteArray, 0, length);
// else if (length < mutableBuffer.size()) {
// mutableBuffer.put(0, byteArray, 0, length);
// buffer = mutableBuffer;
// } else {
// mutableBuffer = Runtime.getRuntime(libLakeSoulMetaData).getMemoryManager().allocateDirect(length);
// mutableBuffer.put(0, byteArray, 0, length);
// buffer = mutableBuffer;
// }
// getLibLakeSoulMetaData().call_rust(buffer.address(), length);
// }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
//package com.dmetasoul.lakesoul.meta.substrait;
//
//import io.substrait.expression.ImmutableExpression;
//import org.apache.flink.table.api.TableException;
//import org.apache.flink.table.expressions.Expression;
//import org.apache.flink.table.expressions.ExpressionDefaultVisitor;
//import org.apache.flink.table.expressions.ValueLiteralExpression;
//
//public class SubstraitLiteralVisitor extends ExpressionDefaultVisitor<io.substrait.expression.Expression.Literal> {
// public io.substrait.expression.Expression.Literal visit(ValueLiteralExpression valueExpr) {
// return null;
// }
//
// @Override
// protected io.substrait.expression.Expression.Literal defaultMethod(Expression expression) {
// throw new TableException("Unexpected expression: " + expression);
// }
//}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
//package com.dmetasoul.lakesoul.meta.substrait;
//
//import io.substrait.expression.Expression;
//
//import io.substrait.expression.proto.ExpressionProtoConverter;
//import org.apache.flink.table.expressions.ResolvedExpression;
//
//public class SubstraitUtil {
//
// public static Expression doTransform(ResolvedExpression flinkExpression) {
// // TODO
// SubstraitVisitor substraitVisitor = new SubstraitVisitor();
// return flinkExpression.accept(substraitVisitor);
// }
//
// public static io.substrait.proto.Expression toProto(Expression expr) {
// ExpressionProtoConverter converter = new ExpressionProtoConverter(null, null);
// return expr.accept(converter);
// }
//
//}
//
Loading

0 comments on commit a7dccd0

Please sign in to comment.