Commit

add lakesoul_namespace
Signed-off-by: mag1c1an1 <[email protected]>
mag1c1an1 committed Jan 23, 2024
1 parent b8cd75b commit 0e9cb73
Showing 7 changed files with 285 additions and 128 deletions.
42 changes: 36 additions & 6 deletions rust/Cargo.lock

Some generated files are not rendered by default.

6 changes: 4 additions & 2 deletions rust/lakesoul-datafusion/Cargo.toml
@@ -25,11 +25,13 @@ chrono = { version = "0.4", features = ["unstable-locales"] }
json = "0.12"
serde = { workspace = true }
serde_json = { workspace = true }
log = { workspace = true }
tokio = { workspace = true }
rand = { workspace = true }
bytes = { workspace = true }
tracing = "0.1.40"

[dev-dependencies]
ctor = "0.2"
test-log = "0.2.14"
test-log = { version = "0.2.14", features = ["trace"] }
rand = "0.8.5"
rand_chacha = "0.3.1"
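Since the dev-dependencies now enable test-log's `trace` feature (to pair with the new `tracing` dependency), tests can install a tracing subscriber through an attribute. A minimal, illustrative sketch — the test name and body are placeholders, not part of this commit:

```rust
// Illustrative only. With test-log's `trace` feature, this attribute installs a
// tracing subscriber for the duration of the test, so `tracing::debug!` events
// from the catalog code become visible (subject to RUST_LOG filtering).
#[test_log::test(tokio::test)]
async fn namespace_tracing_smoke() {
    tracing::debug!("tracing output captured by test-log");
}
```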
138 changes: 113 additions & 25 deletions rust/lakesoul-datafusion/src/catalog/mod.rs
@@ -7,16 +7,25 @@ use std::sync::{Arc};
use std::time::SystemTime;
use std::{env};
use std::any::Any;
use std::fmt::{Debug, Formatter, Pointer};


use async_trait::async_trait;
use datafusion::catalog::{CatalogProvider};
use datafusion::catalog::schema::SchemaProvider;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::TableProvider;
use datafusion::prelude::SessionContext;
use futures::future::try_maybe_done;
use tokio::runtime::Runtime;
use tracing::debug;
use tracing::field::debug;
use lakesoul_io::datasource::file_format::LakeSoulParquetFormat;
use lakesoul_io::datasource::listing::LakeSoulListingTable;

use lakesoul_io::lakesoul_io_config::{LakeSoulIOConfig, LakeSoulIOConfigBuilder};
use lakesoul_metadata::{MetaDataClientRef};
use proto::proto::entity::{CommitOp, DataCommitInfo, DataFileOp, FileOp, TableInfo, Uuid};
use proto::proto::entity::{CommitOp, DataCommitInfo, DataFileOp, FileOp, Namespace, TableInfo, Uuid};

use crate::lakesoul_table::helpers::create_io_config_builder_from_table_info;
use crate::serialize::arrow_java::{ArrowJavaSchema};
@@ -148,55 +157,134 @@ struct LakeSoulCatalog {
}


impl LakeSoulCatalog {}
impl LakeSoulCatalog {
pub fn new() -> Self {
Self {}
}
}

impl CatalogProvider for LakeSoulCatalog {
fn as_any(&self) -> &dyn Any {
self
}

fn schema_names(&self) -> Vec<String> {
todo!()
}

fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
todo!()
}
}


/// A [`SchemaProvider`] that queries pg metadata to automatically discover tables
#[derive(Debug)]
struct LakeSoulNamespace {
pub struct LakeSoulNamespace {
metadata_client: MetaDataClientRef,
runtime: Arc<Runtime>,
context: Arc<SessionContext>,
// primary key
namespace: String,
}


impl LakeSoulNamespace {
pub fn new(meta_data_client_ref: MetaDataClientRef) -> Self {
pub fn new(
meta_data_client_ref: MetaDataClientRef,
runtime: Arc<Runtime>,
context: Arc<SessionContext>,
namespace: &str) -> Self {
Self {
metadata_client: meta_data_client_ref
metadata_client: meta_data_client_ref,
runtime,
context,
namespace: namespace.to_string(),
}
}


pub fn metadata_client(&self) -> MetaDataClientRef {
self.metadata_client.clone()
}
pub fn runtime(&self) -> Arc<Runtime> {
self.runtime.clone()
}
pub fn context(&self) -> Arc<SessionContext> {
self.context.clone()
}
pub fn namespace(&self) -> &str {
&self.namespace
}
}

impl Debug for LakeSoulNamespace {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LakeSoulNamespace").finish_non_exhaustive()
}
}


#[async_trait]
impl SchemaProvider for LakeSoulNamespace {
fn as_any(&self) -> &dyn Any {
self
}

/// query table_name_id by namespace
/// ref: https://www.modb.pro/db/618126
fn table_names(&self) -> Vec<String> {
// self.metadata_client.
todo!()
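// SchemaProvider::table_names is synchronous: run the async metadata query on the
// dedicated runtime and block the calling thread until it completes.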
let client = self.metadata_client.clone();
let np = self.namespace.clone();
futures::executor::block_on(
async move {
self.runtime.spawn(async move {
client.get_all_table_name_id_by_namespace(&np).await.unwrap()
}).await.unwrap().into_iter().map(|t| t.table_name).collect()
}
)
}

/// Search a table by name.
/// Returns a `LakeSoulListingTable`-backed provider.
async fn table(&self, _name: &str) -> Option<Arc<dyn TableProvider>> {
todo!()
}

fn table_exist(&self, _name: &str) -> bool {
todo!()
async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
if let Ok(t) = self.metadata_client.get_table_info_by_table_name(name, &self.namespace).await
{
let config = create_io_config_builder_from_table_info(Arc::new(t)).build();
let file_format = Arc::new(LakeSoulParquetFormat::new(
Arc::new(ParquetFormat::new()),
config.clone(),
));
if let Ok(table_provider) = LakeSoulListingTable::new_with_config_and_format(
&self.context.state(),
config,
file_format,
false,
).await {
debug!("create table provider success");
return Some(Arc::new(table_provider));
}
debug!("create table provider failed");
return None;
} else {
debug!("failed to get table info, cannot create table provider");
None
}
}
}


#[cfg(test)]
mod tests {
use datafusion::prelude::SessionContext;

#[test]
fn simple() {
let ctx = SessionContext::new();
let vec = ctx.catalog_names();
println!("{vec:?}");
fn table_exist(&self, name: &str) -> bool {
let client = self.metadata_client.clone();
let name = name.to_string();
let np = self.namespace.clone();
futures::executor::block_on(
async move {
self.runtime.spawn(async move {
if let Ok(t) = client.get_table_name_id_by_table_name(&name, &np).await {
t.table_name == name
} else {
false
}
}).await.unwrap()
}
)
}
}
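
One way the new types might be wired together, as an illustrative sketch rather than code from this commit. Assumptions: the `lakesoul_datafusion::catalog` module is publicly reachable, `MetaDataClient::from_env()` exists as a constructor (substitute the real one), and `"default"` / `"my_table"` are placeholder names.

```rust
// Illustrative wiring of the new catalog types; not code from this commit.
use std::sync::Arc;

use datafusion::catalog::schema::SchemaProvider;
use datafusion::prelude::SessionContext;
use lakesoul_datafusion::catalog::LakeSoulNamespace; // assumed public path
use lakesoul_metadata::MetaDataClient;
use tokio::runtime::Builder;

fn main() -> datafusion::error::Result<()> {
    // Dedicated runtime handed to the namespace; the synchronous SchemaProvider
    // methods (table_names, table_exist) block on it internally.
    let runtime = Arc::new(
        Builder::new_multi_thread()
            .enable_all()
            .build()
            .expect("build tokio runtime"),
    );
    let context = Arc::new(SessionContext::new());

    // Assumed constructor for the pg metadata client.
    let client = Arc::new(
        runtime
            .block_on(MetaDataClient::from_env())
            .expect("connect to LakeSoul metadata"),
    );

    let ns = LakeSoulNamespace::new(client, runtime.clone(), context.clone(), "default");

    // Table discovery goes through pg metadata, not the filesystem.
    for name in ns.table_names() {
        println!("table in `default`: {name}");
    }

    // Resolve one table into a LakeSoulListingTable-backed provider and register
    // it with the SessionContext so it can be queried with SQL.
    if let Some(provider) = runtime.block_on(ns.table("my_table")) {
        context.register_table("my_table", provider)?;
    }
    Ok(())
}
```

Passing a dedicated `Runtime` into the namespace is what lets the synchronous `SchemaProvider` methods bridge to the async metadata client via spawn-then-block_on without nesting runtimes.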