Skip to content

Commit

Permalink
create split in rust code
Browse files Browse the repository at this point in the history
Signed-off-by: mag1c1an1 <[email protected]>

bump up prost

Signed-off-by: mag1c1an1 <[email protected]>

copy from doris

Signed-off-by: mag1c1an1 <[email protected]>

add c bindings

Signed-off-by: mag1c1an1 <[email protected]>

add tests

Signed-off-by: mag1c1an1 <[email protected]>

write interface in java

Signed-off-by: mag1c1an1 <[email protected]>

interface 0.1.0

Signed-off-by: mag1c1an1 <[email protected]>

cargo clippy && cargo fmt

Signed-off-by: mag1c1an1 <[email protected]>

use try_logger

Signed-off-by: mag1c1an1 <[email protected]>
  • Loading branch information
mag1c1an1 committed Mar 1, 2024
1 parent 78a0456 commit 77727a0
Show file tree
Hide file tree
Showing 31 changed files with 1,225 additions and 234 deletions.
2 changes: 2 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
# Refuse to configure with a CMake older than 3.13.
cmake_minimum_required(VERSION 3.13 FATAL_ERROR)
# Project declaration: version 1.0.0.0, C++ is the only enabled language.
project(LakeSoulCPP VERSION 1.0.0.0 LANGUAGES CXX)

# Ask CMake to write compile_commands.json into the build tree
# (honoured by the Makefile and Ninja generators; consumed by clang tooling).
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)

# Find Python interpreter
find_package(Python REQUIRED COMPONENTS Interpreter)
# Report which interpreter was picked up so configure logs show it.
message(STATUS "Found Python ${Python_VERSION} at ${Python_EXECUTABLE}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,21 @@ public interface LibLakeSoulMetaData {

/**
 * NOTE(review): judging by the name this clears metadata for test runs only —
 * confirm against the native implementation before calling it anywhere else.
 *
 * @param integerCallback callback handed to the native side (presumably receives a status code — TODO confirm)
 * @param runtime         pointer to the native runtime
 * @param client          pointer to the native client
 */
void clean_meta_for_test(IntegerCallback integerCallback, Pointer runtime, Pointer client);

/**
 * Native entry point that builds the split descriptor array for a table.
 *
 * <p>The Java client in this module treats a callback result of {@code 0} as success and then
 * reads the returned pointer as a nul-terminated JSON string; it treats {@code -100} as a signal
 * to recreate the Postgres client and prepared statements. The returned pointer is released with
 * {@link #free_split_desc_array(Pointer)}.
 *
 * @param integerCallback callback receiving the integer result and an optional message
 * @param client          pointer to the native Postgres client
 * @param prepared        pointer to the native prepared-statement holder
 * @param runtime         pointer to the native runtime
 * @param tableName       table name to look up
 * @param namespace       namespace of the table
 * @return pointer to the native result buffer
 */
Pointer create_split_desc_array(IntegerCallback integerCallback, Pointer client, Pointer prepared, Pointer runtime, String tableName, String namespace);

/**
 * Releases a buffer obtained from {@code create_split_desc_array}.
 * NOTE(review): presumably must be called exactly once per returned pointer — confirm on the native side.
 *
 * @param json pointer previously returned by {@code create_split_desc_array}
 */
void free_split_desc_array(Pointer json);

/**
 * Frees a C string through the native library.
 *
 * <p>The caller is responsible for passing a valid pointer; no validation is promised here.
 * NOTE(review): presumably the string must have been allocated by the native library itself — confirm.
 *
 * @param c_string pointer to the C string to free
 */
void free_c_string(Pointer c_string);

/**
 * NOTE(review): looks like a diagnostic hook; the meaning of the returned string and of the
 * callback's boolean is not visible from this interface — confirm against the native code.
 *
 * @param booleanCallback callback handed to the native side
 * @return string produced by the native side
 */
String debug(BooleanCallback booleanCallback);

/**
 * Initializes the logger of the native library. The Java client calls this at the start of
 * its {@code initialize()} method.
 * NOTE(review): confirm that calling it more than once per process is harmless.
 */
void rust_logger_init();

/**
 * NOTE(review): appears to be a smoke-test binding that hands bytes back through the callback —
 * confirm whether it is still needed.
 *
 * @param bytesCallback callback receiving a byte array from the native side
 */
void hello_world(Callback<byte[]> bytesCallback);

/**
 * NOTE(review): purpose is not visible from this interface; presumably {@code len} is the
 * number of valid bytes in {@code bytes} — confirm against the native signature.
 *
 * @param bytes byte buffer passed to the native side
 * @param len   length value passed alongside the buffer
 */
void namespace(byte[] bytes, Integer len);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// SPDX-License-Identifier: Apache-2.0
package com.dmetasoul.lakesoul.meta.jnr;

import com.alibaba.fastjson.JSON;
import com.dmetasoul.lakesoul.meta.DBUtil;
import com.dmetasoul.lakesoul.meta.DataBaseProperty;
import com.dmetasoul.lakesoul.meta.entity.JniWrapper;
Expand Down Expand Up @@ -187,6 +188,7 @@ public void close() {
}

private void initialize() {
libLakeSoulMetaData.rust_logger_init();
DataBaseProperty dataBaseProperty = NativeMetadataJavaClient.dataBaseProperty;
if (dataBaseProperty == null) {
dataBaseProperty = DBUtil.getDBInfo();
Expand Down Expand Up @@ -552,4 +554,72 @@ public static void closeAll() {
instance = null;
}
}


/**
 * Asks the native metadata library for the split (partition) descriptors of a table and
 * decodes the JSON it returns.
 *
 * <p>The native call reports its status through an integer callback:
 * <ul>
 *   <li>{@code 0} - the returned pointer holds a nul-terminated JSON array, which is parsed
 *       into {@link SplitDesc} objects; the native buffer is always released afterwards.</li>
 *   <li>{@code -100} - the Postgres client and prepared statements are recreated (and the
 *       reconnect is awaited) so that a later call can succeed; the current call still fails.</li>
 *   <li>anything else - the call fails.</li>
 * </ul>
 *
 * @param tableName name of the table
 * @param namespace namespace of the table's TableInfo
 * @return split (partition) descriptors for the table; filtering is still WIP
 * @throws RuntimeException if the native call reports an error, the reconnect fails,
 *                          a wait times out, or the calling thread is interrupted
 */
public List<SplitDesc> createSplitDescArray(String tableName, String namespace) {
    final CompletableFuture<Integer> future = new CompletableFuture<>();
    Pointer ptr = getLibLakeSoulMetaData()
            .create_split_desc_array(
                    new ReferencedIntegerCallback((result, msg) -> {
                        if (msg != null) {
                            System.err.println(msg);
                        }
                        future.complete(result);
                    }, getIntegerCallbackObjectReferenceManager()),
                    tokioPostgresClient,
                    preparedStatement,
                    tokioRuntime,
                    tableName,
                    namespace);
    try {
        Integer ans = future.get(timeout, TimeUnit.MILLISECONDS);
        if (ans == 0) {
            try {
                // getString copies the nul-terminated bytes out of native memory,
                // so the buffer can be released as soon as parsing is done.
                String json = ptr.getString(0);
                return JSON.parseArray(json, SplitDesc.class);
            } finally {
                // Release the native buffer even when decoding throws.
                getLibLakeSoulMetaData().free_split_desc_array(ptr);
            }
        }
        // errors
        if (ans == -100) {
            // recreate pg client and prepared
            String config = String.format(
                    "host=%s port=%s dbname=%s user=%s password=%s",
                    dataBaseProperty.getHost(),
                    dataBaseProperty.getPort(),
                    dataBaseProperty.getDbName(),
                    dataBaseProperty.getUsername(),
                    dataBaseProperty.getPassword());
            final CompletableFuture<Boolean> reconnectFuture = new CompletableFuture<>();
            tokioPostgresClient = libLakeSoulMetaData.create_tokio_postgres_client(
                    new ReferencedBooleanCallback((bool, msg) -> {
                        // A missing message is treated the same as an empty one: no error reported.
                        if (msg == null || msg.isEmpty()) {
                            reconnectFuture.complete(bool);
                        } else {
                            System.err.println(msg);
                            reconnectFuture.completeExceptionally(new IOException(msg));
                        }
                    }, getbooleanCallbackObjectReferenceManager()),
                    config,
                    tokioRuntime
            );
            preparedStatement = libLakeSoulMetaData.create_prepared_statement();
            // Wait for the reconnect itself; the split-desc future is already completed here.
            reconnectFuture.get(timeout, TimeUnit.MILLISECONDS);
        }
        // other errors
        throw new RuntimeException("create split desc array failed");
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up can still observe it.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    } catch (ExecutionException e) {
        throw new RuntimeException(e);
    } catch (TimeoutException e) {
        LOG.error("create split desc array timeout");
        throw new RuntimeException(e);
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.dmetasoul.lakesoul.meta.jnr;

import com.alibaba.fastjson.annotation.JSONField;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;

import java.util.List;

/**
 * Bean for one split descriptor, mapped from JSON with fastjson.
 *
 * <p>Each field is bound to a snake_case JSON key through {@code @JSONField}; the same
 * keys are used as labels in {@link #toString()}.
 */
public class SplitDesc {

    /** Bound to the JSON key {@code file_paths}. */
    @JSONField(name = "file_paths")
    private List<String> filePaths;

    /** Bound to the JSON key {@code primary_keys}. */
    @JSONField(name = "primary_keys")
    private List<String> primaryKeys;

    /** Bound to the JSON key {@code partition_desc_array}. */
    @JSONField(name = "partition_desc_array")
    private List<String> partitionDescArray;

    /** Bound to the JSON key {@code table_schema}. */
    @JSONField(name = "table_schema")
    private String tableSchema;

    /** @return the current {@code file_paths} value, possibly {@code null} if never set */
    public List<String> getFilePaths() {
        return this.filePaths;
    }

    /** @param filePaths new {@code file_paths} value */
    public void setFilePaths(List<String> filePaths) {
        this.filePaths = filePaths;
    }

    /** @return the current {@code primary_keys} value, possibly {@code null} if never set */
    public List<String> getPrimaryKeys() {
        return this.primaryKeys;
    }

    /** @param primaryKeys new {@code primary_keys} value */
    public void setPrimaryKeys(List<String> primaryKeys) {
        this.primaryKeys = primaryKeys;
    }

    /** @return the current {@code partition_desc_array} value, possibly {@code null} if never set */
    public List<String> getPartitionDescArray() {
        return this.partitionDescArray;
    }

    /** @param partitionDescArray new {@code partition_desc_array} value */
    public void setPartitionDescArray(List<String> partitionDescArray) {
        this.partitionDescArray = partitionDescArray;
    }

    /** @return the current {@code table_schema} value, possibly {@code null} if never set */
    public String getTableSchema() {
        return this.tableSchema;
    }

    /** @param tableSchema new {@code table_schema} value */
    public void setTableSchema(String tableSchema) {
        this.tableSchema = tableSchema;
    }

    /**
     * Renders all four fields in commons-lang multi-line style, labelled with their JSON keys.
     */
    @Override
    public String toString() {
        ToStringBuilder text = new ToStringBuilder(this, ToStringStyle.MULTI_LINE_STYLE);
        text.append("file_paths", this.filePaths);
        text.append("primary_keys", this.primaryKeys);
        text.append("partition_desc_array", this.partitionDescArray);
        text.append("table_schema", this.tableSchema);
        return text.toString();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class LakeSoulPendingSplits {
private final List<LakeSoulSplit> splits;

/**
* Already discovered lastest version's timestamp
* Already discovered latest version's timestamp
* For streaming only
*/
private final long lastReadTimestamp;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package org.apache.spark.sql.lakesoul

import com.dmetasoul.lakesoul.meta.jnr.NativeMetadataJavaClient
import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
import org.junit.runner.RunWith
import org.scalatest.funsuite.AnyFunSuite
import org.scalatestplus.junit.JUnitRunner

// TODO scala_test
//object JnrSuite {
// def main(args: Array[String]): Unit = {
// val spark = SparkSession.builder.master("local")
// .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
// // The following two settings are also required to use SQL features
// .config("spark.sql.catalog.lakesoul", "org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog")
// .config("spark.sql.defaultCatalog", "lakesoul")
// .getOrCreate()
// import spark.implicits._
// val df = Seq(
// ("2021-01-01", 1, "rice1"),
// ("2021-01-02", 2, "rice2"),
// ("2021-01-03", 3, "rice3"),
// ("2021-01-04", 4, "rice4"),
// ("2021-01-05", 5, "rice5"),
// ("2021-01-06", 6, "bread"),
// ("2021-01-01", 7, "rice7")
// ).toDF("date", "id", "name")
// val tablePath = "file:///tmp/lakesoul/lakesoul-test-bucket/data"
// //create table
// //spark batch
// df.write
// .mode("append")
// .format("lakesoul")
// .option("rangePartitions", "date")
// .option("hashPartitions", "id")
// .option("hashBucketNum", "2")
// .save(tablePath)
// val descs = NativeMetadataJavaClient.getInstance().createSplitDescArray("", "default");
// println(descs)
// }
//}


/**
 * Manual integration tests that write, read and delete a LakeSoul table stored on an
 * S3-compatible endpoint at http://localhost:9000.
 *
 * Every test obtains its SparkSession from `lakeSoulSession()`, which builds (or reuses,
 * via `getOrCreate`) a local session with the LakeSoul extension, the LakeSoul catalog and
 * the S3A settings below.
 */
@RunWith(classOf[JUnitRunner])
class JnrSuite extends AnyFunSuite {

  /** Location of the table that all tests operate on. */
  private val tablePath = "s3://lakesoul-test-bucket/test_table"

  /** Builds or reuses the local SparkSession shared by every test in this suite. */
  private def lakeSoulSession(): SparkSession =
    SparkSession.builder.master("local")
      .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
      // The following two settings are also required to use SQL features
      .config("spark.sql.catalog.lakesoul", "org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog")
      .config("spark.sql.defaultCatalog", "lakesoul")
      .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
      .config("spark.hadoop.fs.s3a.buffer.dir", "/tmp/spark")
      .config("spark.hadoop.fs.s3a.path.style.access", "true")
      .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000")
      .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
      .getOrCreate()

  test("basic") {
    val spark = lakeSoulSession()
    import spark.implicits._
    val df = Seq(("2021-01-01", 1, "rice"), ("2021-01-01", 2, "bread")).toDF("date", "id", "name")
    // Append two rows, range-partitioned by date and hash-partitioned by id into 2 buckets.
    df.write
      .mode("append")
      .format("lakesoul")
      .option("rangePartitions", "date")
      .option("hashPartitions", "id")
      .option("hashBucketNum", "2")
      .save(tablePath)
    println("hello scala test")
  }

  test("read") {
    val spark = lakeSoulSession()
    val df = spark.read.format("lakesoul").load(tablePath)
    df.show()
  }

  test("delete") {
    // The session is created first, as in the original test, before LakeSoulTable is used.
    val spark = lakeSoulSession()
    LakeSoulTable.forPath(tablePath).delete()
  }

  test("clean bucket") {
    // Currently only creates the session; no cleanup is performed yet.
    val spark = lakeSoulSession()
  }

}
Loading

0 comments on commit 77727a0

Please sign in to comment.