Skip to content

Commit

Permalink
[Rust] Create split in rust code (#448)
Browse files Browse the repository at this point in the history
bump up prost



copy from doris



add c bindings



add tests



write interface in java



interface 0.1.0



cargo clippy && cargo fmt



use try_logger



clean code



change splitdesc def

Signed-off-by: mag1c1an1 <[email protected]>
  • Loading branch information
mag1c1an1 authored Mar 5, 2024
1 parent 2224829 commit 0d377b8
Show file tree
Hide file tree
Showing 30 changed files with 1,941 additions and 308 deletions.
2 changes: 2 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
cmake_minimum_required(VERSION 3.13 FATAL_ERROR)
project(LakeSoulCPP VERSION 1.0.0.0 LANGUAGES CXX)

set(CMAKE_EXPORT_COMPILE_COMMANDS ON)

# Find Python interpreter
find_package(Python REQUIRED COMPONENTS Interpreter)
message(STATUS "Found Python ${Python_VERSION} at ${Python_EXECUTABLE}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,21 @@ public interface LibLakeSoulMetaData {

void clean_meta_for_test(IntegerCallback integerCallback, Pointer runtime, Pointer client);

Pointer create_split_desc_array(IntegerCallback integerCallback, Pointer client, Pointer prepared, Pointer runtime, String tableName, String namespace);

void free_split_desc_array(Pointer json);

/**
* caller should ensure that ptr is valid
*
* @param c_string ptr
*/
void free_c_string(Pointer c_string);

String debug(BooleanCallback booleanCallback);

void rust_logger_init();

void hello_world(Callback<byte[]> bytesCallback);

void namespace(byte[] bytes, Integer len);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// SPDX-License-Identifier: Apache-2.0
package com.dmetasoul.lakesoul.meta.jnr;

import com.alibaba.fastjson.JSON;
import com.dmetasoul.lakesoul.meta.DBUtil;
import com.dmetasoul.lakesoul.meta.DataBaseProperty;
import com.dmetasoul.lakesoul.meta.entity.JniWrapper;
Expand Down Expand Up @@ -187,6 +188,7 @@ public void close() {
}

private void initialize() {
libLakeSoulMetaData.rust_logger_init();
DataBaseProperty dataBaseProperty = NativeMetadataJavaClient.dataBaseProperty;
if (dataBaseProperty == null) {
dataBaseProperty = DBUtil.getDBInfo();
Expand Down Expand Up @@ -552,4 +554,72 @@ public static void closeAll() {
instance = null;
}
}


/**
* if ffi function failed with -100
* should recreate pg client and prepared map
* @param tableName name
* @param namespace the np of TableInfo
* @return split(partition) desc array in json format by table_name, namespace , filter(WIP)
*/
public List<SplitDesc> createSplitDescArray(String tableName, String namespace) {
final CompletableFuture<Integer> future = new CompletableFuture<>();
Pointer ptr = getLibLakeSoulMetaData()
.create_split_desc_array(
new ReferencedIntegerCallback((result, msg) -> {
if (msg != null) {
System.err.println(msg);
}
future.complete(result);
}, getIntegerCallbackObjectReferenceManager()),
tokioPostgresClient,
preparedStatement,
tokioRuntime,
tableName,
namespace);
try {
Integer ans = future.get(timeout, TimeUnit.MILLISECONDS);
if (ans == 0) {
// This copies a zero (nul) terminated by array from native memory.
String json = ptr.getString(0);
List<SplitDesc> list = JSON.parseArray(json, SplitDesc.class);
getLibLakeSoulMetaData().free_split_desc_array(ptr);
return list;
}
// errors
if (ans == -100) {
// recreate pg client and prepared
String config = String.format(
"host=%s port=%s dbname=%s user=%s password=%s",
dataBaseProperty.getHost(),
dataBaseProperty.getPort(),
dataBaseProperty.getDbName(),
dataBaseProperty.getUsername(),
dataBaseProperty.getPassword());
final CompletableFuture<Boolean> future_ = new CompletableFuture<>();
tokioPostgresClient = libLakeSoulMetaData.create_tokio_postgres_client(
new ReferencedBooleanCallback((bool, msg) -> {
if (msg.isEmpty()) {
future_.complete(bool);
} else {
System.err.println(msg);
future_.completeExceptionally(new IOException(msg));
}
}, getbooleanCallbackObjectReferenceManager()),
config,
tokioRuntime
);
preparedStatement = libLakeSoulMetaData.create_prepared_statement();
future.get(timeout, TimeUnit.MILLISECONDS);
}
// other errors
throw new RuntimeException("create split desc array failed");
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
LOG.error("create split desc array timeout");
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.dmetasoul.lakesoul.meta.jnr;

import com.alibaba.fastjson.annotation.JSONField;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.hadoop.util.hash.Hash;

import java.util.HashMap;
import java.util.List;

public class SplitDesc {
@JSONField(name = "file_paths")
private List<String> filePaths;
@JSONField(name = "primary_keys")
private List<String> primaryKeys;
@JSONField(name = "partition_desc")
private HashMap<String, String> partitionDesc;
@JSONField(name = "table_schema")
private String tableSchema;

public List<String> getFilePaths() {
return filePaths;
}

public void setFilePaths(List<String> filePaths) {
this.filePaths = filePaths;
}

public List<String> getPrimaryKeys() {
return primaryKeys;
}

public void setPrimaryKeys(List<String> primaryKeys) {
this.primaryKeys = primaryKeys;
}

public HashMap<String, String> getPartitionDesc() {
return partitionDesc;
}

public void setPartitionDesc(HashMap<String, String> partitionDesc) {
this.partitionDesc= partitionDesc;
}

public String getTableSchema() {
return tableSchema;
}

public void setTableSchema(String tableSchema) {
this.tableSchema = tableSchema;
}

@Override
public String toString() {
return new ToStringBuilder(this, ToStringStyle.MULTI_LINE_STYLE)
.append("file_paths", filePaths)
.append("primary_keys", primaryKeys)
.append("partition_desc", partitionDesc)
.append("table_schema", tableSchema)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class LakeSoulPendingSplits {
private final List<LakeSoulSplit> splits;

/**
* Already discovered lastest version's timestamp
* Already discovered latest version's timestamp
* For streaming only
*/
private final long lastReadTimestamp;
Expand Down
Loading

0 comments on commit 0d377b8

Please sign in to comment.