replace inputs_map with split_map
Signed-off-by: mag1c1an1 <[email protected]>
mag1c1an1 committed Apr 12, 2024
1 parent fd8d12b commit d5fd86b
Showing 13 changed files with 122 additions and 88 deletions.
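
The core of the change sits in LakeSoulMetaDataParquetFormat::create_physical_plan (metadata_format.rs below): the old inputs_map keyed parquet scans by the rendered partition-description string, while the new split_map returned by create_split_map keys them by a SplitDesc (file paths, primary keys, partition values, table schema) and also records which columns are nullable in each split. The sketch below is not part of the commit; it shows the before/after shapes with simplified stand-ins for Arc<dyn ExecutionPlan> and for the real lakesoul_metadata::transfusion::SplitDesc (which stores partition_desc as a HashMap<String, String>).

use std::collections::{HashMap, HashSet};

// Stand-in for lakesoul_metadata::transfusion::SplitDesc: same field names,
// but the real struct keeps partition_desc as a HashMap<String, String>.
#[derive(Debug, PartialEq, Eq, Hash)]
struct SplitDescSketch {
    file_paths: Vec<String>,
    primary_keys: Vec<String>,
    partition_desc: Vec<(String, String)>,
    table_schema: String,
}

// Before: partition-desc string -> (partition key/values, parquet scan plans).
#[allow(dead_code)]
type InputsMapSketch = HashMap<String, (Vec<(String, String)>, Vec<String>)>;

// After: split descriptor -> (parquet scan plans, nullable column names in that split).
type SplitMapSketch = HashMap<SplitDescSketch, (Vec<String>, HashSet<String>)>;

fn main() {
    let mut split_map: SplitMapSketch = HashMap::new();
    split_map.insert(
        SplitDescSketch {
            file_paths: vec!["part-00000.parquet".into()],
            primary_keys: vec!["id".into()],
            partition_desc: vec![("date".into(), "2024-04-12".into())],
            table_schema: "{\"fields\":[]}".into(),
        },
        (
            vec!["ParquetExec(part-00000.parquet)".into()],
            HashSet::from(["name".to_string()]),
        ),
    );
    assert_eq!(split_map.len(), 1);
}
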
1 change: 1 addition & 0 deletions rust/lakesoul-datafusion/src/catalog/mod.rs
@@ -20,6 +20,7 @@ use crate::error::{LakeSoulError, Result};
// pub mod lakesoul_sink;
// pub mod lakesoul_source;
mod lakesoul_catalog;
#[allow(unused)]
pub use lakesoul_catalog::*;
mod lakesoul_namespace;
pub use lakesoul_namespace::*;
151 changes: 85 additions & 66 deletions rust/lakesoul-datafusion/src/datasource/file_format/metadata_format.rs
@@ -7,7 +7,6 @@ use std::collections::{HashMap, HashSet};
use std::fmt::{self, Debug};
use std::sync::Arc;

use anyhow::anyhow;
use arrow::array::{ArrayRef, StringArray, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaBuilder, SchemaRef};
use arrow::record_batch::RecordBatch;
@@ -39,13 +38,15 @@ use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tracing::debug;

use lakesoul_io::datasource::file_format::{compute_project_column_indices, flatten_file_scan_config};
use lakesoul_io::datasource::file_format::{compute_project_column_indices/*, flatten_file_scan_config*/};
use lakesoul_io::datasource::physical_plan::MergeParquetExec;
use lakesoul_io::helpers::{columnar_values_to_partition_desc, columnar_values_to_sub_path, get_columnar_values, partition_desc_from_file_scan_config};
use lakesoul_io::helpers::{columnar_values_to_partition_desc, columnar_values_to_sub_path, get_columnar_values};
// use lakesoul_io::datasource::physical_plan::MergeParquetExec;
// use lakesoul_io::helpers::partition_desc_from_file_scan_config;
use lakesoul_io::lakesoul_io_config::LakeSoulIOConfig;
use lakesoul_io::lakesoul_writer::{AsyncBatchWriter, MultiPartAsyncWriter};
use lakesoul_metadata::MetaDataClientRef;
use lakesoul_metadata::transfusion::{split_desc_array, split_desc_array_with_client};
use lakesoul_metadata::transfusion::SplitDesc;
use proto::proto::entity::TableInfo;

use crate::catalog::{commit_data, parse_table_info_partitions};
@@ -140,60 +141,13 @@ impl FileFormat for LakeSoulMetaDataParquetFormat {
let merged_projection = compute_project_column_indices(table_schema.clone(), target_schema.clone(), self.conf.primary_keys_slice());
let merged_schema = project_schema(&table_schema, merged_projection.as_ref())?;

// files to read
let flatten_conf =
flatten_file_scan_config(state, self.parquet_format.clone(), conf, self.conf.primary_keys_slice(), target_schema.clone()).await?;
// (partition desc, (partition_k_v,parquet_execs))
let split_array = split_desc_array_with_client(
Arc::clone(&self.client),
&self.table_info.table_name,
&self.table_info.table_namespace)
.await
.map_err(|_| DataFusionError::External(anyhow!("wrong").into()))?
.to_vec();

// println!("{:#?}", split_array);

let mut inputs_map: HashMap<String, (Arc<HashMap<String, String>>, Vec<Arc<dyn ExecutionPlan>>)> = HashMap::new();
let mut column_nullable = HashSet::<String>::new();

// let mut inputs_map_fork = HashMap::new();
// for desc in &split_array {
// println!("{:#?}", &desc.partition_desc);
// let p_desc = desc.partition_desc.iter().map(|k, v| format!("{}={}", k, v)).collect::<Vec<_>>().join(",");
// let p_c_v = Arc::new(desc.partition_desc.clone());
// let parquet_exec = Arc::new(ParquetExec::new(config.clone(), predicate.clone(), self.parquet_format.metadata_size_hint(state.config_options())));
// for field in parquet_exec.schema().fields().iter() {
// if field.is_nullable() {
// column_nullable.insert(field.name().clone());
// }
// }
// }


for config in &flatten_conf {
let (partition_desc, partition_columnar_value) = partition_desc_from_file_scan_config(&config)?;
// println!("{}", partition_desc);
// println!("{:#?}", partition_columnar_value);
let split_map = create_split_map(&conf, self.parquet_format.clone(), state, self.conf.primary_keys_slice(), target_schema.clone(), predicate.clone()).await?;
let column_nullable = split_map.values().fold(HashSet::new(), |mut acc, v| {
acc.extend(v.1.clone());
acc
});

let partition_columnar_value = Arc::new(partition_columnar_value);

let parquet_exec = Arc::new(ParquetExec::new(config.clone(), predicate.clone(), self.parquet_format.metadata_size_hint(state.config_options())));
for field in parquet_exec.schema().fields().iter() {
if field.is_nullable() {
column_nullable.insert(field.name().clone());
}
}
if let Some((_, inputs)) = inputs_map.get_mut(&partition_desc)
{
inputs.push(parquet_exec);
} else {
inputs_map.insert(
partition_desc.clone(),
(partition_columnar_value.clone(), vec![parquet_exec]),
);
}
}
debug!(?split_map);

let merged_schema = SchemaRef::new(
Schema::new(
@@ -210,17 +164,19 @@
.collect::<Vec<_>>()
)
);
println!("{:#?}", inputs_map);

let mut partitioned_exec = Vec::new();
for (_, (partition_columnar_values, inputs)) in inputs_map {

for (k, v) in split_map {
let merge_exec = Arc::new(MergeParquetExec::new_with_inputs(
merged_schema.clone(),
inputs,
v.0,
self.conf.clone(),
partition_columnar_values.clone(),
Arc::new(k.partition_desc),
)?) as Arc<dyn ExecutionPlan>;
partitioned_exec.push(merge_exec);
}

let exec = if partitioned_exec.len() > 1 {
Arc::new(UnionExec::new(partitioned_exec)) as Arc<dyn ExecutionPlan>
} else {
@@ -601,17 +557,80 @@ fn make_sink_schema() -> SchemaRef {
}


/// Flattens each file group into splits while preserving the relational structure:
/// split -> (Vec<ParquetExec>, Vec<NullableCols>)
async fn create_split_map(
conf: &FileScanConfig,
format: Arc<ParquetFormat>,
state: &SessionState,
pks: &[String],
target_schema: SchemaRef,
predicate: Option<Arc<dyn PhysicalExpr>>,
) -> Result<HashMap<SplitDesc, (Vec<Arc<dyn ExecutionPlan>>, HashSet<String>)>> {
let url = &conf.object_store_url;
let store = state.runtime_env().object_store(url)?;
let mut ret = HashMap::new();
for i in 0..conf.file_groups.len() {
let mut objects = vec![];
let file_group = &conf.file_groups[i];
let mut execs = vec![];
let mut partition_desc = HashMap::new();
let mut nullable_cols = HashSet::new();
for file in file_group {
let objs = vec![file.object_meta.clone()];
let file_groups = vec![vec![file.clone()]];
let file_schema = format.infer_schema(state, &store, &objs).await?;
let statistics = format.infer_stats(state, &store, file_schema.clone(), &file.object_meta).await?;
let projection = compute_project_column_indices(file_schema.clone(), target_schema.clone(), pks);
let limit = conf.limit;
let table_partition_cols = conf.table_partition_cols.clone();
for (idx, f) in table_partition_cols.iter().enumerate() {
partition_desc.insert(f.name().clone(), file.partition_values[idx].to_string());
}
let output_ordering = conf.output_ordering.clone();
let infinite_source = conf.infinite_source;
objects.extend(objs);
let config = FileScanConfig {
object_store_url: url.clone(),
file_schema,
file_groups,
statistics,
projection,
limit,
table_partition_cols,
output_ordering,
infinite_source,
};
let exec = Arc::new(ParquetExec::new(config, predicate.clone(), format.metadata_size_hint(state.config_options()))) as Arc<dyn ExecutionPlan>;
let set = exec.schema().fields.iter().filter(|f| f.is_nullable()).map(|f| f.to_string()).collect::<HashSet<String>>();
nullable_cols.extend(set);
execs.push(exec);
}
let locations: Vec<String> = objects.iter().map(|o| o.location.to_string()).collect();
let schema = format.infer_schema(state, &store, &objects).await?.to_string();
let sd = SplitDesc {
file_paths: locations,
primary_keys: pks.to_vec(),
partition_desc,
table_schema: schema,
};
ret.insert(sd, (execs, nullable_cols));
}
Ok(ret)
}


#[cfg(test)]
mod tests {
use std::sync::Arc;

use datafusion::catalog::CatalogProvider;
use datafusion::prelude::SessionContext;
use tokio::runtime::Runtime;

use lakesoul_io::lakesoul_io_config::{create_session_context, LakeSoulIOConfig, LakeSoulIOConfigBuilder};
use lakesoul_io::lakesoul_io_config::{create_session_context, LakeSoulIOConfigBuilder};
use lakesoul_metadata::MetaDataClient;

#[test_log::test]
// #[test]
fn simple_test() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
Expand All @@ -625,7 +644,7 @@ mod tests {
// let vec = sma.table_names();
// let arc = sma.table("KAtG7Fh6hB").await.unwrap();
// println!("{:?}",.);
let test_sql = "select * from lk.default.h6a49kmgwl;";
let test_sql = "select * from lk.default.fv22cscz03;";
let df = sc.sql(test_sql).await.unwrap();
df.show().await.unwrap()
});
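
For orientation, and not part of the diff: once create_split_map has produced the map, the hunk near the top of this file builds one MergeParquetExec per split and wraps the results in a UnionExec only when there is more than one split. A toy, self-contained sketch of that plan-assembly shape, with an enum standing in for the DataFusion plan nodes:

use std::collections::HashMap;

#[allow(dead_code)]
#[derive(Debug)]
enum PlanSketch {
    Scan(String),           // stands in for ParquetExec
    Merge(Vec<PlanSketch>), // stands in for MergeParquetExec
    Union(Vec<PlanSketch>), // stands in for UnionExec
}

fn build_plan(split_map: HashMap<String, Vec<PlanSketch>>) -> PlanSketch {
    // One merge node per split, then union only when several splits exist.
    let mut partitioned: Vec<PlanSketch> =
        split_map.into_values().map(PlanSketch::Merge).collect();
    if partitioned.len() > 1 {
        PlanSketch::Union(partitioned)
    } else {
        partitioned.pop().expect("expected at least one split")
    }
}

fn main() {
    let mut m = HashMap::new();
    m.insert("date=2024-04-12".to_string(), vec![PlanSketch::Scan("a.parquet".into())]);
    m.insert("date=2024-04-13".to_string(), vec![PlanSketch::Scan("b.parquet".into())]);
    println!("{:#?}", build_plan(m));
}
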
1 change: 0 additions & 1 deletion rust/lakesoul-datafusion/src/datasource/table_provider.rs
@@ -30,7 +30,6 @@ use futures::StreamExt;
use lakesoul_io::helpers::listing_table_from_lakesoul_io_config;
use lakesoul_io::lakesoul_io_config::LakeSoulIOConfig;
use lakesoul_metadata::MetaDataClientRef;
use lakesoul_metadata::transfusion::SplitDesc;
use proto::proto::entity::TableInfo;

use crate::catalog::parse_table_info_partitions;
1 change: 0 additions & 1 deletion rust/lakesoul-datafusion/src/error.rs
@@ -4,7 +4,6 @@

use std::{result, sync::Arc};
use std::num::ParseIntError;
use std::string::ParseError;

use lakesoul_io::lakesoul_reader::{ArrowError, DataFusionError};
use lakesoul_metadata::error::LakeSoulMetaDataError;
5 changes: 3 additions & 2 deletions rust/lakesoul-datafusion/src/lakesoul_table/helpers.rs
@@ -7,7 +7,7 @@ use std::sync::Arc;
use arrow::{array::{Array, ArrayRef, AsArray, StringBuilder}, compute::prep_null_mask_filter, datatypes::{DataType, Field, Fields, Schema}, record_batch::RecordBatch};
use arrow_arith::boolean::and;
use arrow_cast::cast;
use datafusion::{common::{DFField, DFSchema}, error::DataFusionError, execution::context::ExecutionProps, logical_expr::Expr, physical_expr::create_physical_expr, scalar::ScalarValue};
use datafusion::{common::{DFField, DFSchema}, error::DataFusionError, execution::context::ExecutionProps, logical_expr::Expr, physical_expr::create_physical_expr};
use object_store::{ObjectMeta, ObjectStore, path::Path};
use serde_json::Value;
use tracing::{debug, trace};
@@ -18,7 +18,7 @@ use lakesoul_metadata::MetaDataClientRef;
use proto::proto::entity::{PartitionInfo, TableInfo};

use crate::{
catalog::{LakeSoulTableProperty, parse_table_info_partitions},
catalog::parse_table_info_partitions,
serialize::arrow_java::schema_from_metadata_str,
};
use crate::error::{LakeSoulError, Result};
@@ -171,3 +171,4 @@ pub async fn listing_partition_info(partition_info: PartitionInfo, store: &dyn O
files.push(result);
}
Ok((partition_info, files))
}
2 changes: 1 addition & 1 deletion rust/lakesoul-datafusion/src/lib.rs
@@ -14,4 +14,4 @@ mod planner;
mod serialize;

#[cfg(test)]
mod test;
mod test;
2 changes: 1 addition & 1 deletion rust/lakesoul-datafusion/src/test/mod.rs
@@ -30,7 +30,7 @@ fn init() {
.unwrap()
.block_on(async {
let client = Arc::new(MetaDataClient::from_env().await.unwrap());
// client.meta_cleanup().await.unwrap();
client.meta_cleanup().await.unwrap();
debug!("clean metadata");
})
}
8 changes: 4 additions & 4 deletions rust/lakesoul-io-c/src/lib.rs
@@ -1232,15 +1232,15 @@

#[no_mangle]
pub extern "C" fn rust_logger_init() {
let sub = tracing_subscriber::fmt();
let timer = LocalTime::new(format_description!(
"[year]-[month]-[day] [hour]:[minute]:[second]"
));
sub.with_env_filter(EnvFilter::from_default_env())
let _ = tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.with_thread_ids(true)
.with_thread_names(true)
.with_line_number(true)
.with_timer(timer)
.init();
let _ = tracing_subscriber::fmt::try_init();
.try_init();

}
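
Both rust_logger_init hunks in this commit (this one and the matching change in rust/lakesoul-metadata-c below) collapse the subscriber setup into a single chained expression and replace init() with try_init(). A minimal sketch of why, assuming tracing-subscriber 0.3 with the env-filter feature: try_init() returns an Err when a global subscriber is already installed, where init() would panic, so calling the FFI entry point more than once becomes harmless.

use tracing_subscriber::EnvFilter;

fn logger_init_sketch() {
    // Ignore the error from a second initialization instead of panicking.
    let _ = tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::from_default_env())
        .with_thread_ids(true)
        .try_init();
}

fn main() {
    logger_init_sketch();
    logger_init_sketch(); // no-op: a global subscriber is already set
}
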
4 changes: 2 additions & 2 deletions rust/lakesoul-io/src/datasource/file_format.rs
@@ -106,7 +106,7 @@ impl FileFormat for LakeSoulParquetFormat {

// files to read
let flatten_conf =
flatten_file_scan_config(state, self.parquet_format.clone(), conf, self.conf.primary_keys_slice(), target_schema.clone()).await?;
flatten_file_scan_config(state, self.parquet_format.clone(), &conf, self.conf.primary_keys_slice(), target_schema.clone()).await?;


let merge_exec = Arc::new(MergeParquetExec::new(
@@ -152,7 +152,7 @@ impl FileFormat for LakeSoulParquetFormat {
pub async fn flatten_file_scan_config(
state: &SessionState,
format: Arc<ParquetFormat>,
conf: FileScanConfig,
conf: &FileScanConfig,
primary_keys: &[String],
target_schema: SchemaRef,
) -> Result<Vec<FileScanConfig>> {
3 changes: 1 addition & 2 deletions rust/lakesoul-io/src/helpers.rs
@@ -231,5 +231,4 @@ pub async fn infer_schema(sc: &SessionState, table_paths: &[ListingTableUrl], fi

// Resolve the schema
file_format.infer_schema(sc, &store, &objects).await
}

}
2 changes: 2 additions & 0 deletions rust/lakesoul-io/src/lakesoul_io_config.rs
@@ -99,6 +99,7 @@ pub struct LakeSoulIOConfig {
pub(crate) use_dynamic_partition: bool,
}


impl LakeSoulIOConfig {
pub fn schema(&self) -> SchemaRef {
self.schema.0.clone()
@@ -521,6 +522,7 @@ pub fn create_session_context_with_planner(
Ok(SessionContext::new_with_state(state))
}


#[cfg(test)]
mod tests {
use crate::lakesoul_io_config::{create_session_context, LakeSoulIOConfigBuilder};
7 changes: 3 additions & 4 deletions rust/lakesoul-metadata-c/src/lib.rs
@@ -454,15 +454,14 @@ pub unsafe extern "C" fn free_c_string(c_string: *mut c_char) {
/// TODO refactor this
#[no_mangle]
pub extern "C" fn rust_logger_init() {
let sub = tracing_subscriber::fmt();
let timer = LocalTime::new(format_description!(
"[year]-[month]-[day] [hour]:[minute]:[second]"
));
sub.with_env_filter(EnvFilter::from_default_env())
let _ = tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.with_thread_ids(true)
.with_thread_names(true)
.with_line_number(true)
.with_timer(timer)
.init();
let _ = tracing_subscriber::fmt::try_init();
.try_init();
}