Skip to content

Commit

Permalink
fix statement set with connection level cache
Browse files Browse the repository at this point in the history
Signed-off-by: chenxu <[email protected]>
  • Loading branch information
dmetasoul01 committed Nov 8, 2024
1 parent db4f800 commit aa19309
Show file tree
Hide file tree
Showing 9 changed files with 361 additions and 359 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,25 @@ public interface LibLakeSoulMetaData {

void free_tokio_runtime(Pointer runtime);

Pointer create_prepared_statement();

void free_prepared_statement(Pointer prepared);

Pointer create_tokio_postgres_client(BooleanCallback booleanCallback, String config, Pointer runtime);

void free_tokio_postgres_client(Pointer client);

Pointer execute_query(IntegerCallback integerCallback, Pointer runtime, Pointer client, Pointer prepared, Integer type, String texts);
Pointer execute_query(IntegerCallback integerCallback, Pointer runtime, Pointer client, Integer type, String texts);

void export_bytes_result(BooleanCallback booleanCallback, Pointer bytes, Integer len, @LongLong long addr);

void free_bytes_result(Pointer bytes);

void execute_update(IntegerCallback integerCallback, Pointer runtime, Pointer client, Pointer prepared, Integer type, String texts);
void execute_update(IntegerCallback integerCallback, Pointer runtime, Pointer client, Integer type, String texts);

void execute_query_scalar(StringCallback stringCallback, Pointer runtime, Pointer client, Pointer prepared, Integer type, String texts);
void execute_query_scalar(StringCallback stringCallback, Pointer runtime, Pointer client, Integer type, String texts);

void execute_insert(IntegerCallback integerCallback, Pointer runtime, Pointer client, Pointer prepared, Integer type, @LongLong long addr, int length);
void execute_insert(IntegerCallback integerCallback, Pointer runtime, Pointer client, Integer type, @LongLong long addr, int length);

void clean_meta_for_test(IntegerCallback integerCallback, Pointer runtime, Pointer client);

Pointer create_split_desc_array(BooleanCallback booleanCallback, Pointer client, Pointer prepared, Pointer runtime, String tableName, String namespace);
Pointer create_split_desc_array(BooleanCallback booleanCallback, Pointer client, Pointer runtime, String tableName, String namespace);

void free_split_desc_array(Pointer json);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package com.dmetasoul.lakesoul.meta.jnr;

import com.alibaba.fastjson.JSON;
import com.dmetasoul.lakesoul.meta.DBConnector;
import com.dmetasoul.lakesoul.meta.DBUtil;
import com.dmetasoul.lakesoul.meta.DataBaseProperty;
import com.dmetasoul.lakesoul.meta.entity.JniWrapper;
Expand Down Expand Up @@ -39,9 +38,6 @@ public class NativeMetadataJavaClient implements AutoCloseable {
private Pointer tokioPostgresClient = null;
private Pointer tokioRuntime = null;

private Pointer preparedStatement = null;


protected final LibLakeSoulMetaData libLakeSoulMetaData;

protected final ObjectReferenceManager<LibLakeSoulMetaData.BooleanCallback> booleanCallbackObjectReferenceManager;
Expand Down Expand Up @@ -212,7 +208,6 @@ private void initialize() {
config,
tokioRuntime
);
preparedStatement = libLakeSoulMetaData.create_prepared_statement();
try {
future.get(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException e) {
Expand Down Expand Up @@ -241,7 +236,6 @@ public JniWrapper executeQuery(Integer queryType, List<String> params) {
}, getIntegerCallbackObjectReferenceManager()),
tokioRuntime,
tokioPostgresClient,
preparedStatement,
queryType,
String.join(PARAM_DELIM, params)
);
Expand Down Expand Up @@ -340,7 +334,6 @@ public Integer executeInsert(Integer insertType, JniWrapper jniWrapper) {
}, getIntegerCallbackObjectReferenceManager()),
tokioRuntime,
tokioPostgresClient,
preparedStatement,
insertType,
buffer.address(),
bytes.length
Expand Down Expand Up @@ -388,7 +381,6 @@ public Integer executeUpdate(Integer updateType, List<String> params) {
}, getIntegerCallbackObjectReferenceManager()),
tokioRuntime,
tokioPostgresClient,
preparedStatement,
updateType,
String.join(PARAM_DELIM, params)
);
Expand Down Expand Up @@ -435,7 +427,6 @@ public List<String> executeQueryScalar(Integer queryScalarType, List<String> par
}, getStringCallbackObjectReferenceManager()),
tokioRuntime,
tokioPostgresClient,
preparedStatement,
queryScalarType,
String.join(PARAM_DELIM, params)
);
Expand Down Expand Up @@ -529,10 +520,6 @@ public void close() {
libLakeSoulMetaData.free_tokio_postgres_client(tokioPostgresClient);
tokioPostgresClient = null;
}
if (preparedStatement != null) {
libLakeSoulMetaData.free_prepared_statement(preparedStatement);
preparedStatement = null;
}
}

public static void closeAll() {
Expand Down Expand Up @@ -566,7 +553,6 @@ public List<SplitDesc> createSplitDescArray(String tableName, String namespace)
future.complete(result);
}, getbooleanCallbackObjectReferenceManager()),
tokioPostgresClient,
preparedStatement,
tokioRuntime,
tableName,
namespace);
Expand Down
1 change: 1 addition & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 5 additions & 40 deletions rust/lakesoul-metadata-c/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,20 +86,6 @@ fn _call_integer_result_callback(callback: IntegerResultCallBack, status: i32, e
}
}

// #[repr(C)]
// struct CVoid {
// data: *const c_void,
// }

// unsafe impl Send for CVoid {}

// unsafe impl Sync for CVoid {}

#[repr(C)]
pub struct PreparedStatement {
private: [u8; 0],
}

#[repr(C)]
pub struct TokioPostgresClient {
private: [u8; 0],
Expand Down Expand Up @@ -140,19 +126,17 @@ pub extern "C" fn execute_insert(
callback: extern "C" fn(i32, *const c_char),
runtime: NonNull<CResult<TokioRuntime>>,
client: NonNull<CResult<TokioPostgresClient>>,
prepared: NonNull<CResult<PreparedStatement>>,
insert_type: i32,
addr: c_ptrdiff_t,
len: i32,
) {
let runtime = unsafe { NonNull::new_unchecked(runtime.as_ref().ptr as *mut Runtime).as_ref() };
let client = unsafe { NonNull::new_unchecked(client.as_ref().ptr as *mut PooledClient).as_mut() };
let prepared = unsafe { NonNull::new_unchecked(prepared.as_ref().ptr as *mut PreparedStatementMap).as_mut() };

let raw_parts = unsafe { std::slice::from_raw_parts(addr as *const u8, len as usize) };
let wrapper = entity::JniWrapper::decode(prost::bytes::Bytes::from(raw_parts)).unwrap();
let result =
runtime.block_on(async { lakesoul_metadata::execute_insert(client, prepared, insert_type, wrapper).await });
runtime.block_on(async { lakesoul_metadata::execute_insert(client, insert_type, wrapper).await });
match result {
Ok(count) => call_result_callback(callback, count, null()),
Err(e) => call_result_callback(callback, -1, CString::new(e.to_string().as_str()).unwrap().into_raw()),
Expand All @@ -164,16 +148,14 @@ pub extern "C" fn execute_update(
callback: extern "C" fn(i32, *const c_char),
runtime: NonNull<CResult<TokioRuntime>>,
client: NonNull<CResult<TokioPostgresClient>>,
prepared: NonNull<CResult<PreparedStatement>>,
update_type: i32,
joined_string: *const c_char,
) {
let runtime = unsafe { NonNull::new_unchecked(runtime.as_ref().ptr as *mut Runtime).as_ref() };
let client = unsafe { NonNull::new_unchecked(client.as_ref().ptr as *mut PooledClient).as_mut() };
let prepared = unsafe { NonNull::new_unchecked(prepared.as_ref().ptr as *mut PreparedStatementMap).as_mut() };

let result = runtime.block_on(async {
lakesoul_metadata::execute_update(client, prepared, update_type, string_from_ptr(joined_string)).await
lakesoul_metadata::execute_update(client, update_type, string_from_ptr(joined_string)).await
});
match result {
Ok(count) => call_result_callback(callback, count, null()),
Expand All @@ -186,16 +168,14 @@ pub extern "C" fn execute_query_scalar(
callback: extern "C" fn(*const c_char, *const c_char),
runtime: NonNull<CResult<TokioRuntime>>,
client: NonNull<CResult<TokioPostgresClient>>,
prepared: NonNull<CResult<PreparedStatement>>,
update_type: i32,
joined_string: *const c_char,
) {
let runtime = unsafe { NonNull::new_unchecked(runtime.as_ref().ptr as *mut Runtime).as_ref() };
let client = unsafe { NonNull::new_unchecked(client.as_ref().ptr as *mut PooledClient).as_mut() };
let prepared = unsafe { NonNull::new_unchecked(prepared.as_ref().ptr as *mut PreparedStatementMap).as_mut() };

let result = runtime.block_on(async {
lakesoul_metadata::execute_query_scalar(client, prepared, update_type, string_from_ptr(joined_string)).await
lakesoul_metadata::execute_query_scalar(client, update_type, string_from_ptr(joined_string)).await
});
let (result, err): (*mut c_char, *const c_char) = match result {
Ok(Some(result)) => (CString::new(result.as_str()).unwrap().into_raw(), null()),
Expand All @@ -216,16 +196,14 @@ pub extern "C" fn execute_query(
callback: extern "C" fn(i32, *const c_char),
runtime: NonNull<CResult<TokioRuntime>>,
client: NonNull<CResult<TokioPostgresClient>>,
prepared: NonNull<CResult<PreparedStatement>>,
query_type: i32,
joined_string: *const c_char,
) -> NonNull<CResult<BytesResult>> {
let runtime = unsafe { NonNull::new_unchecked(runtime.as_ref().ptr as *mut Runtime).as_ref() };
let client = unsafe { NonNull::new_unchecked(client.as_ref().ptr as *mut PooledClient).as_ref() };
let prepared = unsafe { NonNull::new_unchecked(prepared.as_ref().ptr as *mut PreparedStatementMap).as_mut() };

let result = runtime.block_on(async {
lakesoul_metadata::execute_query(client, prepared, query_type, string_from_ptr(joined_string)).await
lakesoul_metadata::execute_query(client, query_type, string_from_ptr(joined_string)).await
});
match result {
Ok(u8_vec) => {
Expand Down Expand Up @@ -339,17 +317,6 @@ pub extern "C" fn free_tokio_postgres_client(client: NonNull<CResult<TokioPostgr
from_nonnull(client).free::<PooledClient>();
}

#[no_mangle]
pub extern "C" fn create_prepared_statement() -> NonNull<CResult<PreparedStatement>> {
let prepared = PreparedStatementMap::new();
convert_to_nonnull(CResult::<PreparedStatement>::new(prepared))
}

#[no_mangle]
pub extern "C" fn free_prepared_statement(prepared: NonNull<CResult<PreparedStatement>>) {
from_nonnull(prepared).free::<PreparedStatementMap>();
}

#[no_mangle]
pub extern "C" fn create_lakesoul_metadata_client() -> NonNull<CResult<MetaDataClient>> {
let client = MetaDataClient::from_env();
Expand All @@ -376,18 +343,16 @@ fn c_char2str<'a>(ptr: *const c_char) -> &'a str {
pub extern "C" fn create_split_desc_array(
callback: ResultCallback,
client: NonNull<CResult<TokioPostgresClient>>,
prepared: NonNull<CResult<PreparedStatement>>,
runtime: NonNull<CResult<TokioRuntime>>,
table_name: *const c_char,
namespace: *const c_char,
) -> *mut c_char {
let runtime = unsafe { NonNull::new_unchecked(runtime.as_ref().ptr as *mut Runtime).as_ref() };
let client = unsafe { NonNull::new_unchecked(client.as_ref().ptr as *mut PooledClient).as_ref() };
let prepared = unsafe { NonNull::new_unchecked(prepared.as_ref().ptr as *mut PreparedStatementMap).as_mut() };
let table_name = c_char2str(table_name);
let namespace = c_char2str(namespace);
let result: Result<*mut c_char, LakeSoulMetaDataError> = runtime.block_on(async {
let ret = lakesoul_metadata::transfusion::split_desc_array(client, prepared, table_name, namespace).await?;
let ret = lakesoul_metadata::transfusion::split_desc_array(client, table_name, namespace).await?;
let v = serde_json::to_vec(&ret)?;
Ok(CString::new(v)
.map_err(|e| LakeSoulMetaDataError::Internal(e.to_string()))?
Expand Down
1 change: 1 addition & 0 deletions rust/lakesoul-metadata/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ thiserror = { workspace = true }
anyhow = { workspace = true }
regex = "1.10.3"
serde = { workspace = true }
async-trait = "0.1.82"


[dev-dependencies]
Expand Down
Loading

0 comments on commit aa19309

Please sign in to comment.