update catalog test
Signed-off-by: mag1c1an1 <[email protected]>
mag1c1an1 committed Jan 23, 2024
1 parent a1caadc commit df252f0
Showing 25 changed files with 846 additions and 898 deletions.
107 changes: 107 additions & 0 deletions rust/lakesoul-datafusion/src/catalog/lakesoul_catalog.rs
@@ -0,0 +1,107 @@
// SPDX-FileCopyrightText: 2024 LakeSoul Contributors
//
// SPDX-License-Identifier: Apache-2.0

use crate::catalog::LakeSoulNamespace;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::catalog::{CatalogProvider, MemoryCatalogProvider};
use datafusion::prelude::SessionContext;
use lakesoul_metadata::MetaDataClientRef;
use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use tokio::runtime::Handle;

/// A [`CatalogProvider`] backed by the LakeSoul metadata store
pub struct LakeSoulCatalog {
metadata_client: MetaDataClientRef,
context: Arc<SessionContext>,
}

impl Debug for LakeSoulCatalog {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LakeSoulCatalog").finish_non_exhaustive()
}
}

impl LakeSoulCatalog {
pub fn new(meta_data_client_ref: MetaDataClientRef, context: Arc<SessionContext>) -> Self {
Self {
metadata_client: meta_data_client_ref,
context,
}
}
pub fn metadata_client(&self) -> MetaDataClientRef {
self.metadata_client.clone()
}
pub fn context(&self) -> Arc<SessionContext> {
self.context.clone()
}
}

impl CatalogProvider for LakeSoulCatalog {
fn as_any(&self) -> &dyn Any {
self
}

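// `schema_names` is a synchronous trait method while the metadata client is
// async. The pattern below spawns the query onto the current Tokio runtime
// and blocks on its JoinHandle; this assumes a multi-thread runtime is
// driving the call, otherwise `Handle::current()` panics (and a
// current-thread runtime would deadlock).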
fn schema_names(&self) -> Vec<String> {
let client = self.metadata_client.clone();
futures::executor::block_on(async move {
Handle::current()
.spawn(async move { client.get_all_namespace().await.unwrap() })
.await
.unwrap()
.into_iter()
.map(|t| t.namespace)
.collect()
})
}

fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
if self.schema_names().contains(&name.to_string()) {
Some(Arc::new(LakeSoulNamespace::new(
self.metadata_client.clone(),
self.context.clone(),
name,
)))
} else {
None
}
}

/// Adds a new schema to this catalog.
///
/// If a schema of the same name existed before, it is replaced in
/// the catalog and returned.
///
/// This catalog does not support registering schemas and currently panics.
fn register_schema(
&self,
name: &str,
schema: Arc<dyn SchemaProvider>,
) -> lakesoul_io::lakesoul_io_config::Result<Option<Arc<dyn SchemaProvider>>> {
// A `dyn SchemaProvider` carries no LakeSoul-specific metadata,
// so there is nothing we could persist to the metadata store.
let _ = name;
let _ = schema;
unimplemented!("Registering new schemas is not supported")
}

/// Removes a schema from this catalog. Implementations of this method should return
/// errors if the schema exists but cannot be dropped. For example, in DataFusion's
/// default in-memory catalog, [`MemoryCatalogProvider`], a non-empty schema
/// will only be successfully dropped when `cascade` is true.
/// This is equivalent to how DROP SCHEMA works in PostgreSQL.
///
/// Implementations of this method should return None if schema with `name`
/// does not exist.
///
/// This catalog does not support removing schemas and currently panics.
fn deregister_schema(
&self,
_name: &str,
_cascade: bool,
) -> lakesoul_io::lakesoul_io_config::Result<Option<Arc<dyn SchemaProvider>>> {
// A `dyn SchemaProvider` carries no LakeSoul-specific metadata,
// so there is nothing to remove from the metadata store.
unimplemented!("Deregistering schemas is not supported")
}
}
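
For context, a minimal sketch of wiring this catalog into a DataFusion session. It assumes `MetaDataClientRef` is an `Arc` around the metadata client and uses a hypothetical `MetaDataClient::from_env()` constructor as a stand-in for the repo's real client setup:

use std::sync::Arc;
use datafusion::prelude::SessionContext;
use lakesoul_metadata::MetaDataClient;

// A multi-thread runtime is required: `schema_names` blocks on a task
// spawned via `Handle::current()`.
#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let ctx = Arc::new(SessionContext::new());
    // Hypothetical constructor; substitute the real client setup.
    let client = Arc::new(MetaDataClient::from_env().await?);
    let catalog = Arc::new(LakeSoulCatalog::new(client, ctx.clone()));
    // Namespaces then resolve as schemas: `lakesoul.<namespace>.<table>`.
    ctx.register_catalog("lakesoul", catalog.clone());
    println!("namespaces: {:?}", catalog.schema_names());
    Ok(())
}
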
148 changes: 148 additions & 0 deletions rust/lakesoul-datafusion/src/catalog/lakesoul_namespace.rs
@@ -0,0 +1,148 @@
// SPDX-FileCopyrightText: 2023 LakeSoul Contributors
//
// SPDX-License-Identifier: Apache-2.0

use crate::catalog::create_io_config_builder;
use async_trait::async_trait;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::TableProvider;
use datafusion::prelude::SessionContext;
use lakesoul_io::datasource::file_format::LakeSoulParquetFormat;
use lakesoul_io::datasource::listing::LakeSoulListingTable;
use lakesoul_metadata::MetaDataClientRef;
use proto::proto::entity::Namespace;
use std::any::Any;
use std::collections::HashSet;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use tokio::runtime::Handle;
use tracing::debug;

/// A [`SchemaProvider`] that queries the PostgreSQL metadata store to automatically discover tables
pub struct LakeSoulNamespace {
metadata_client: MetaDataClientRef,
context: Arc<SessionContext>,
// the namespace name is the primary key in the metadata store
namespace: String,
}

impl LakeSoulNamespace {
pub fn new(meta_data_client_ref: MetaDataClientRef, context: Arc<SessionContext>, namespace: &str) -> Self {
Self {
metadata_client: meta_data_client_ref,
context,
namespace: namespace.to_string(),
}
}

pub fn metadata_client(&self) -> MetaDataClientRef {
self.metadata_client.clone()
}

pub fn context(&self) -> Arc<SessionContext> {
self.context.clone()
}

pub fn namespace(&self) -> &str {
&self.namespace
}
}

impl Debug for LakeSoulNamespace {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LakeSoulNamespace").finish_non_exhaustive()
}
}

#[async_trait]
impl SchemaProvider for LakeSoulNamespace {
fn as_any(&self) -> &dyn Any {
self
}

/// Query `table_name_id` entries by namespace.
/// ref: https://www.modb.pro/db/618126
fn table_names(&self) -> Vec<String> {
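// Same sync-to-async bridge as `LakeSoulCatalog::schema_names`: spawn the
// async metadata query onto the Tokio runtime and block on the result.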
let client = self.metadata_client.clone();
let np = self.namespace.clone();
futures::executor::block_on(async move {
Handle::current()
.spawn(async move { client.get_all_table_name_id_by_namespace(&np).await.unwrap() })
.await
.unwrap()
.into_iter()
.map(|t| t.table_name)
.collect()
})
}

/// Search for a table by name.
/// Returns a [`LakeSoulListingTable`] wrapped as a [`TableProvider`].
async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
if self
.metadata_client
.get_table_info_by_table_name(name, &self.namespace)
.await
.is_ok()
{
let config = match create_io_config_builder(self.metadata_client.clone(), Some(name), true, self.namespace()).await {
Ok(config_builder) => config_builder.build(),
Err(_) => return None,
};
// Wrap the stock ParquetFormat so LakeSoul can hook in its own read path;
// this wiring may change as the format evolves.
let file_format = Arc::new(LakeSoulParquetFormat::new(
Arc::new(ParquetFormat::new()),
config.clone(),
));
if let Ok(table_provider) = LakeSoulListingTable::new_with_config_and_format(
&self.context.state(),
config,
file_format,
// presumably an "open as sink" flag; hard-coded here to open for reading
false,
)
.await
{
debug!("get table provider success");
return Some(Arc::new(table_provider));
}
debug!("get table provider failed");
None
} else {
debug!("table {} not found in namespace {}", name, self.namespace);
None
}
}

/// If supported by the implementation, adds a new table to this schema.
/// If a table of the same name already exists, a "Table already exists" error is returned.
#[allow(unused_variables)]
fn register_table(
&self,
name: String,
table: Arc<dyn TableProvider>,
) -> lakesoul_io::lakesoul_io_config::Result<Option<Arc<dyn TableProvider>>> {
// A `dyn TableProvider` does not carry enough type information to
// persist a LakeSoul table definition; DDL support would be needed.
unimplemented!("schema provider does not support registering tables")
}
/// If supported by the implementation, removes an existing table from this schema and returns it.
/// If no table of that name exists, returns Ok(None).
#[allow(unused_variables)]
fn deregister_table(&self, name: &str) -> lakesoul_io::lakesoul_io_config::Result<Option<Arc<dyn TableProvider>>> {
// A `dyn TableProvider` does not carry enough type information to
// resolve and remove the underlying LakeSoul table.
unimplemented!("schema provider does not support deregistering tables")
}

fn table_exist(&self, name: &str) -> bool {
// `table_name` is the primary key of `table_name_id`, so membership in the
// listed names is sufficient.
self.table_names().iter().any(|s| s == name)
}
}

#[cfg(test)]
mod test {}
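
The test module is still empty in this commit. A sketch of the kind of smoke test it might hold, assuming a reachable metadata store, an existing "default" namespace, and the same hypothetical `MetaDataClient::from_env()` helper as above:

#[cfg(test)]
mod test {
    use super::*;
    use lakesoul_metadata::MetaDataClient;

    #[tokio::test(flavor = "multi_thread")]
    async fn namespace_lists_tables() {
        let ctx = Arc::new(SessionContext::new());
        // Hypothetical constructor; a real test would reuse the repo's setup.
        let client = Arc::new(MetaDataClient::from_env().await.unwrap());
        let ns = LakeSoulNamespace::new(client, ctx, "default");
        // `table_names` blocks on the runtime internally, so run it off the
        // async worker thread via `spawn_blocking`.
        let names = tokio::task::spawn_blocking(move || ns.table_names())
            .await
            .unwrap();
        assert!(names.iter().all(|n| !n.is_empty()));
    }
}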