From 14e72f20dd683781f79f5d8cd4919558ced03c3e Mon Sep 17 00:00:00 2001 From: zenghua Date: Fri, 1 Mar 2024 10:50:36 +0800 Subject: [PATCH] datafusion connector supports partition column Signed-off-by: zenghua --- rust/Cargo.lock | 2 + rust/Cargo.toml | 2 + rust/lakesoul-datafusion/Cargo.toml | 2 + rust/lakesoul-datafusion/src/catalog/mod.rs | 21 +- .../datasource/file_format/metadata_format.rs | 225 +++++++++--- .../src/datasource/table_provider.rs | 343 ++++++++++++++++-- rust/lakesoul-datafusion/src/error.rs | 4 + .../src/lakesoul_table/helpers.rs | 184 +++++++++- .../src/lakesoul_table/mod.rs | 18 +- .../src/planner/physical_planner.rs | 32 +- .../src/test/insert_tests.rs | 47 +-- rust/lakesoul-datafusion/src/test/mod.rs | 33 ++ .../src/test/upsert_tests.rs | 93 +++-- .../lakesoul-io/src/datasource/file_format.rs | 17 +- rust/lakesoul-io/src/datasource/listing.rs | 194 +--------- rust/lakesoul-io/src/datasource/mod.rs | 2 +- .../src/datasource/parquet_source.rs | 2 +- .../physical_plan/defatul_column.rs | 88 +++++ .../src/datasource/physical_plan/merge.rs | 30 +- .../src/datasource/physical_plan/mod.rs | 1 + rust/lakesoul-io/src/helpers.rs | 135 ++++++- rust/lakesoul-io/src/lakesoul_io_config.rs | 10 + rust/lakesoul-io/src/lakesoul_writer.rs | 6 + rust/lakesoul-io/src/lib.rs | 2 +- .../src/repartition/distributor_channels.rs | 3 + rust/lakesoul-io/src/repartition/mod.rs | 316 +++++++++++----- rust/lakesoul-metadata/src/metadata_client.rs | 59 +-- 27 files changed, 1382 insertions(+), 489 deletions(-) create mode 100644 rust/lakesoul-io/src/datasource/physical_plan/defatul_column.rs diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 848529aa2..15754e7ae 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1625,6 +1625,8 @@ version = "0.1.0" dependencies = [ "anyhow", "arrow", + "arrow-arith", + "arrow-cast", "async-trait", "bytes", "chrono", diff --git a/rust/Cargo.toml b/rust/Cargo.toml index af42168c3..299242ca7 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -22,6 +22,8 @@ arrow = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-r arrow-schema = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-48-parquet-bufferred" } arrow-array = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-48-parquet-bufferred" } arrow-buffer = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-48-parquet-bufferred" } +arrow-cast = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-48-parquet-bufferred" } +arrow-arith = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-48-parquet-bufferred" } parquet = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-48-parquet-bufferred" } object_store = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "object_store_0.7_opt", features = ["aws", "http"] } diff --git a/rust/lakesoul-datafusion/Cargo.toml b/rust/lakesoul-datafusion/Cargo.toml index 541ad63d3..aedaf6c19 100644 --- a/rust/lakesoul-datafusion/Cargo.toml +++ b/rust/lakesoul-datafusion/Cargo.toml @@ -13,6 +13,8 @@ edition = "2021" datafusion = { workspace = true } object_store = { workspace = true } arrow = { workspace = true } +arrow-cast = { workspace = true } +arrow-arith = { workspace = true } parquet = { workspace = true } lakesoul-io = { path = "../lakesoul-io" } lakesoul-metadata = { path = "../lakesoul-metadata" } diff --git a/rust/lakesoul-datafusion/src/catalog/mod.rs b/rust/lakesoul-datafusion/src/catalog/mod.rs index 5cb7b633a..435cedce4 100644 --- a/rust/lakesoul-datafusion/src/catalog/mod.rs +++ b/rust/lakesoul-datafusion/src/catalog/mod.rs @@ -53,7 +53,12 @@ pub(crate) async fn create_table(client: MetaDataClientRef, table_name: &str, co })?, partitions: format!( "{};{}", - "", + config + .range_partitions_slice() + .iter() + .map(String::as_str) + .collect::>() + .join(","), config .primary_keys_slice() .iter() @@ -77,7 +82,7 @@ pub(crate) async fn create_io_config_builder( let table_info = client.get_table_info_by_table_name(table_name, namespace).await?; let data_files = if fetch_files { client - .get_data_files_by_table_name(table_name, vec![], namespace) + .get_data_files_by_table_name(table_name, namespace) .await? } else { vec![] @@ -114,7 +119,7 @@ pub(crate) fn parse_table_info_partitions(partitions: String) -> Result<(Vec, + partition_desc: String, files: &[String], ) -> Result<()> { let table_ref = TableReference::from(table_name); @@ -124,15 +129,7 @@ pub(crate) async fn commit_data( client .commit_data_commit_info(DataCommitInfo { table_id: table_name_id.table_id, - partition_desc: if partitions.is_empty() { - "-5".to_string() - } else { - partitions - .iter() - .map(|(k, v)| format!("{}={}", k, v)) - .collect::>() - .join(",") - }, + partition_desc, file_ops: files .iter() .map(|file| DataFileOp { diff --git a/rust/lakesoul-datafusion/src/datasource/file_format/metadata_format.rs b/rust/lakesoul-datafusion/src/datasource/file_format/metadata_format.rs index d428a92c0..5ea7665ab 100644 --- a/rust/lakesoul-datafusion/src/datasource/file_format/metadata_format.rs +++ b/rust/lakesoul-datafusion/src/datasource/file_format/metadata_format.rs @@ -1,5 +1,9 @@ +// SPDX-FileCopyrightText: 2023 LakeSoul Contributors +// +// SPDX-License-Identifier: Apache-2.0 + use std::any::Any; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fmt::{self, Debug}; use std::sync::Arc; @@ -7,14 +11,16 @@ use arrow::array::{ArrayRef, StringArray, UInt64Array}; use arrow::record_batch::RecordBatch; use async_trait::async_trait; -use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow::datatypes::{DataType, Field, Schema, SchemaBuilder, SchemaRef}; use datafusion::common::{FileType, Statistics}; +use datafusion::datasource::physical_plan::ParquetExec; use datafusion::error::DataFusionError; use datafusion::execution::TaskContext; use datafusion::physical_expr::PhysicalSortExpr; +use datafusion::physical_plan::projection::ProjectionExec; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use datafusion::physical_plan::union::UnionExec; use datafusion::physical_plan::{DisplayAs, DisplayFormatType, Distribution, Partitioning, SendableRecordBatchStream}; -use datafusion::scalar::ScalarValue; use datafusion::sql::TableReference; use datafusion::{ datasource::{ @@ -27,8 +33,12 @@ use datafusion::{ physical_plan::{ExecutionPlan, PhysicalExpr}, }; use futures::StreamExt; +use lakesoul_io::datasource::file_format::flatten_file_scan_config; +use lakesoul_io::datasource::physical_plan::MergeParquetExec; +use lakesoul_io::helpers::partition_desc_from_file_scan_config; +use lakesoul_io::lakesoul_io_config::LakeSoulIOConfig; use lakesoul_io::lakesoul_writer::{AsyncBatchWriter, MultiPartAsyncWriter}; -use lakesoul_metadata::{MetaDataClient, MetaDataClientRef}; +use lakesoul_metadata::MetaDataClientRef; use object_store::{ObjectMeta, ObjectStore}; use proto::proto::entity::TableInfo; use rand::distributions::DistString; @@ -37,13 +47,14 @@ use tokio::sync::Mutex; use tokio::task::JoinHandle; use tracing::debug; -use crate::catalog::commit_data; -use crate::lakesoul_table::helpers::{create_io_config_builder_from_table_info, get_columnar_value}; +use crate::catalog::{commit_data, parse_table_info_partitions}; +use crate::lakesoul_table::helpers::{columnar_values_to_partition_desc, columnar_values_to_sub_path, create_io_config_builder_from_table_info, get_columnar_values}; pub struct LakeSoulMetaDataParquetFormat { parquet_format: Arc, client: MetaDataClientRef, table_info: Arc, + conf: LakeSoulIOConfig, } impl Debug for LakeSoulMetaDataParquetFormat { @@ -53,12 +64,12 @@ impl Debug for LakeSoulMetaDataParquetFormat { } impl LakeSoulMetaDataParquetFormat { - pub async fn new(parquet_format: Arc, table_info: Arc) -> crate::error::Result { - let client = Arc::new(MetaDataClient::from_env().await?); + pub async fn new(client: MetaDataClientRef, parquet_format: Arc, table_info: Arc, conf: LakeSoulIOConfig) -> crate::error::Result { Ok(Self { parquet_format, client, table_info, + conf, }) } @@ -104,7 +115,113 @@ impl FileFormat for LakeSoulMetaDataParquetFormat { conf: FileScanConfig, filters: Option<&Arc>, ) -> Result> { - self.parquet_format.create_physical_plan(state, conf, filters).await + // If enable pruning then combine the filters to build the predicate. + // If disable pruning then set the predicate to None, thus readers + // will not prune data based on the statistics. + let predicate = self + .parquet_format + .enable_pruning(state.config_options()) + .then(|| filters.cloned()) + .flatten(); + + let file_schema = conf.file_schema.clone(); + let mut builder = SchemaBuilder::from(file_schema.fields()); + for field in &conf.table_partition_cols { + builder.push(Field::new(field.name(), field.data_type().clone(), false)); + } + let projection = conf.projection.clone(); + + // files to read + let flatten_conf = + flatten_file_scan_config(state, self.parquet_format.clone(), conf, self.conf.primary_keys_slice()).await?; + + let merge_schema = Arc::new(builder.finish()); + + let mut inputs_map: HashMap>, Vec>) > = HashMap::new(); + let mut column_nullable = HashSet::::new(); + + for config in &flatten_conf { + let (partition_desc, partition_columnar_value) = partition_desc_from_file_scan_config(&config)?; + let partition_columnar_value = Arc::new(partition_columnar_value); + + let parquet_exec = Arc::new(ParquetExec::new(config.clone(), predicate.clone(), self.parquet_format.metadata_size_hint(state.config_options()))); + for field in parquet_exec.schema().fields().iter() { + if field.is_nullable() { + column_nullable.insert(field.name().clone()); + } + } + let single_exec = if partition_desc.eq("-5") { + parquet_exec + } else { + // let mut builder = SchemaBuilder::from(parquet_exec.schema().fields()); + // for field in &config.table_partition_cols { + // builder.push(field.clone()) + // } + + // Arc::new(DefaultColumnExec::new( + // parquet_exec, + // Arc::new(builder.finish()), + // partition_columnar_value.clone(), + // )?) as Arc + parquet_exec + }; + + if let Some((_, inputs)) = inputs_map.get_mut(&partition_desc) + { + inputs.push(single_exec); + } else { + inputs_map.insert( + partition_desc.clone(), + (partition_columnar_value.clone(), vec![single_exec]), + ); + } + } + + let target_schema = SchemaRef::new( + Schema::new( + merge_schema + .fields() + .iter() + .map(|field| { + Field::new( + field.name(), + field.data_type().clone(), + field.is_nullable() | column_nullable.contains(field.name()) + ) + }) + .collect::>() + ) + ); + + let mut partitioned_exec = Vec::new(); + for (_, (partition_columnar_values, inputs)) in inputs_map { + let merge_exec = Arc::new(MergeParquetExec::new_with_inputs( + target_schema.clone(), + inputs, + self.conf.clone(), + partition_columnar_values.clone(), + )?) as Arc; + partitioned_exec.push(merge_exec); + } + let exec = if partitioned_exec.len() > 1 { + Arc::new(UnionExec::new(partitioned_exec)) as Arc + } else { + partitioned_exec.first().unwrap().clone() + }; + + if let Some(projection) = projection { + let mut projection_expr = vec![]; + for idx in projection { + projection_expr.push(( + datafusion::physical_expr::expressions::col(merge_schema.field(idx).name(), &merge_schema)?, + merge_schema.field(idx).name().clone(), + )); + } + Ok(Arc::new(ProjectionExec::try_new(projection_expr, exec)?)) + } else { + Ok(exec) + } + } async fn create_writer_physical_plan( @@ -147,6 +264,8 @@ pub struct LakeSoulHashSinkExec { table_info: Arc, metadata_client: MetaDataClientRef, + + range_partitions: Arc>, } impl Debug for LakeSoulHashSinkExec { @@ -163,12 +282,15 @@ impl LakeSoulHashSinkExec { table_info: Arc, metadata_client: MetaDataClientRef, ) -> Result { + let (range_partitions, _) = parse_table_info_partitions(table_info.partitions.clone()).map_err(|_| DataFusionError::External("parse table_info.partitions failed".into()))?; + let range_partitions = Arc::new(range_partitions); Ok(Self { input, sink_schema: make_sink_schema(), sort_order, table_info, metadata_client, + range_partitions, }) } @@ -195,49 +317,66 @@ impl LakeSoulHashSinkExec { partition: usize, context: Arc, table_info: Arc, + range_partitions: Arc>, write_id: String, - partitioned_file_path_and_row_count: Arc, (Vec, u64)>>>, + partitioned_file_path_and_row_count: Arc, u64)>>>, ) -> Result { let mut data = input.execute(partition, context.clone())?; + let schema_projection_excluding_range = + data.schema() + .fields() + .iter() + .enumerate() + .filter_map(|(idx, field)| + match range_partitions.contains(field.name()) { + true => None, + false => Some(idx) + }) + .collect::>(); let mut row_count = 0; // let mut async_writer = MultiPartAsyncWriter::try_new(lakesoul_io_config).await?; - let mut partitioned_writer = HashMap::, Box>::new(); + let mut partitioned_writer = HashMap::>::new(); let mut partitioned_file_path_and_row_count_locked = partitioned_file_path_and_row_count.lock().await; while let Some(batch) = data.next().await.transpose()? { debug!("write record_batch with {} rows", batch.num_rows()); - let columnar_value = get_columnar_value(&batch); - let file_absolute_path = format!("{}/part-{}_{:0>4}.parquet", table_info.table_path, write_id, partition); - if !partitioned_writer.contains_key(&columnar_value) { + let columnar_values = get_columnar_values(&batch, range_partitions.clone())?; + let partition_desc = columnar_values_to_partition_desc(&columnar_values); + let batch_excluding_range = batch.project(&schema_projection_excluding_range)?; + let file_absolute_path = format!("{}{}part-{}_{:0>4}.parquet", table_info.table_path, columnar_values_to_sub_path(&columnar_values), write_id, partition); + + if !partitioned_writer.contains_key(&partition_desc) { let mut config = create_io_config_builder_from_table_info(table_info.clone()) .map_err(|e| DataFusionError::External(Box::new(e)))? .with_files(vec![file_absolute_path.clone()]) - .with_schema(batch.schema()) + .with_schema(batch_excluding_range.schema()) .build(); let writer = MultiPartAsyncWriter::try_new_with_context(&mut config, context.clone()).await?; - partitioned_writer.insert(columnar_value.clone(), Box::new(writer)); + partitioned_writer.insert(partition_desc.clone(), Box::new(writer)); } - if let Some(async_writer) = partitioned_writer.get_mut(&columnar_value) { - if let Some(file_path_and_row_count) = - partitioned_file_path_and_row_count_locked.get_mut(&columnar_value) - { - file_path_and_row_count.0.push(file_absolute_path); - file_path_and_row_count.1 += batch.num_rows() as u64; - } else { - partitioned_file_path_and_row_count_locked.insert( - columnar_value.clone(), - (vec![file_absolute_path], batch.num_rows() as u64), - ); - } - row_count += batch.num_rows(); - async_writer.write_record_batch(batch).await?; + if let Some(async_writer) = partitioned_writer.get_mut(&partition_desc) { + row_count += batch_excluding_range.num_rows(); + async_writer.write_record_batch(batch_excluding_range).await?; } } - let partitioned_writer = partitioned_writer.into_values().collect::>(); - for writer in partitioned_writer { + // TODO: apply rolling strategy + for (partition_desc, writer) in partitioned_writer.into_iter() { + let file_absolute_path = writer.absolute_path(); + let num_rows = writer.nun_rows(); + if let Some(file_path_and_row_count) = + partitioned_file_path_and_row_count_locked.get_mut(&partition_desc) + { + file_path_and_row_count.0.push(file_absolute_path); + file_path_and_row_count.1 += num_rows; + } else { + partitioned_file_path_and_row_count_locked.insert( + partition_desc.clone(), + (vec![file_absolute_path], num_rows), + ); + } writer.flush_and_close().await?; } @@ -248,7 +387,7 @@ impl LakeSoulHashSinkExec { join_handles: Vec>>, client: MetaDataClientRef, table_name: String, - partitioned_file_path_and_row_count: Arc, (Vec, u64)>>>, + partitioned_file_path_and_row_count: Arc, u64)>>>, ) -> Result { let count = futures::future::join_all(join_handles) @@ -261,12 +400,9 @@ impl LakeSoulHashSinkExec { })?; let partitioned_file_path_and_row_count = partitioned_file_path_and_row_count.lock().await; - for (columnar_value, (files, _)) in partitioned_file_path_and_row_count.iter() { - let partition_desc = columnar_value - .iter() - .map(|(column, value)| (column.to_string(), value.to_string())) - .collect::>(); - commit_data(client.clone(), &table_name, partition_desc, files) + for (partition_desc, (files, _)) in partitioned_file_path_and_row_count.iter() { + // let partition_desc = columnar_values_to_partition_desc(columnar_values); + commit_data(client.clone(), &table_name, partition_desc.clone(), files) .await .map_err(|e| DataFusionError::External(Box::new(e)))?; debug!( @@ -351,6 +487,7 @@ impl ExecutionPlan for LakeSoulHashSinkExec { sink_schema: self.sink_schema.clone(), sort_order: self.sort_order.clone(), table_info: self.table_info.clone(), + range_partitions: self.range_partitions.clone(), metadata_client: self.metadata_client.clone(), })) } @@ -370,16 +507,18 @@ impl ExecutionPlan for LakeSoulHashSinkExec { let write_id = rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 16); - let partitioned_file_path_and_row_count = Arc::new(Mutex::new(HashMap::< - Vec<(String, ScalarValue)>, - (Vec, u64), - >::new())); + let partitioned_file_path_and_row_count = + Arc::new( + Mutex::new( + HashMap::, u64)>::new() + )); for i in 0..num_input_partitions { let sink_task = tokio::spawn(Self::pull_and_sink( self.input().clone(), i, context.clone(), self.table_info(), + self.range_partitions.clone(), write_id.clone(), partitioned_file_path_and_row_count.clone(), )); diff --git a/rust/lakesoul-datafusion/src/datasource/table_provider.rs b/rust/lakesoul-datafusion/src/datasource/table_provider.rs index 8118db171..be01a4dc1 100644 --- a/rust/lakesoul-datafusion/src/datasource/table_provider.rs +++ b/rust/lakesoul-datafusion/src/datasource/table_provider.rs @@ -3,27 +3,42 @@ // SPDX-License-Identifier: Apache-2.0 use std::any::Any; +use std::ops::Deref; use std::sync::Arc; -use arrow::datatypes::SchemaRef; +use arrow::compute::SortOptions; +use arrow::datatypes::{DataType, Schema, SchemaRef}; use async_trait::async_trait; +use datafusion::common::{project_schema, FileTypeWriterOptions, Statistics, ToDFSchema}; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::file_format::FileFormat; +use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableUrl, PartitionedFile}; +use datafusion::datasource::physical_plan::{FileScanConfig, FileSinkConfig}; use datafusion::datasource::TableProvider; -use datafusion::error::Result; +use datafusion::error::{DataFusionError, Result}; +use datafusion::logical_expr::expr::Sort; use datafusion::logical_expr::{TableProviderFilterPushDown, TableType}; +use datafusion::optimizer::utils::conjunction; +use datafusion::physical_expr::{create_physical_expr, LexOrdering, PhysicalSortExpr}; +use datafusion::physical_plan::empty::EmptyExec; use datafusion::physical_plan::ExecutionPlan; +use datafusion::scalar::ScalarValue; use datafusion::{execution::context::SessionState, logical_expr::Expr}; -use lakesoul_io::datasource::file_format::LakeSoulParquetFormat; -use lakesoul_io::datasource::listing::LakeSoulListingTable; +use futures::stream::FuturesUnordered; +use futures::StreamExt; + + +use lakesoul_io::helpers::listing_table_from_lakesoul_io_config; use lakesoul_io::lakesoul_io_config::LakeSoulIOConfig; +use lakesoul_metadata::MetaDataClientRef; use proto::proto::entity::TableInfo; use crate::catalog::parse_table_info_partitions; +use crate::lakesoul_table::helpers::{listing_partition_info, parse_partitions_for_partition_desc, prune_partitions}; use crate::serialize::arrow_java::schema_from_metadata_str; use super::file_format::LakeSoulMetaDataParquetFormat; @@ -42,47 +57,62 @@ use super::file_format::LakeSoulMetaDataParquetFormat; /// /// ``` pub struct LakeSoulTableProvider { - listing_table: Arc, + listing_table: Arc, + client: MetaDataClientRef, table_info: Arc, - schema: SchemaRef, + table_schema: SchemaRef, + file_schema: SchemaRef, primary_keys: Vec, + range_partitions: Vec, } impl LakeSoulTableProvider { pub async fn try_new( session_state: &SessionState, + client: MetaDataClientRef, lakesoul_io_config: LakeSoulIOConfig, table_info: Arc, as_sink: bool, ) -> crate::error::Result { - let schema = schema_from_metadata_str(&table_info.table_schema); - let (_, hash_partitions) = parse_table_info_partitions(table_info.partitions.clone())?; - - let file_format: Arc = match as_sink { - true => { - Arc::new(LakeSoulMetaDataParquetFormat::new(Arc::new(ParquetFormat::new()), table_info.clone()).await?) - } - false => Arc::new(LakeSoulParquetFormat::new( - Arc::new(ParquetFormat::new()), - lakesoul_io_config.clone(), - )), - }; + let table_schema = schema_from_metadata_str(&table_info.table_schema); + let (range_partitions, hash_partitions) = parse_table_info_partitions(table_info.partitions.clone())?; + let mut range_partition_projection = Vec::with_capacity(range_partitions.len()); + let mut file_schema_projection = Vec::with_capacity(table_schema.fields().len() - range_partitions.len()); + for (idx, field) in table_schema.fields().iter().enumerate() { + match range_partitions.contains(field.name()) { + false => file_schema_projection.push(idx), + true => range_partition_projection.push(idx) + }; + } + + let file_schema = Arc::new(table_schema.project(&file_schema_projection)?); + let table_schema = Arc::new(table_schema.project(&[file_schema_projection, range_partition_projection].concat())?); + + let file_format: Arc = + Arc::new(LakeSoulMetaDataParquetFormat::new( + client.clone(), + Arc::new(ParquetFormat::new()), + table_info.clone(), + lakesoul_io_config.clone() + ).await?); + + let listing_table = Arc::new(listing_table_from_lakesoul_io_config(session_state, lakesoul_io_config.clone(), file_format, as_sink).await?); + Ok(Self { - listing_table: Arc::new( - LakeSoulListingTable::new_with_config_and_format( - session_state, - lakesoul_io_config, - file_format, - as_sink, - ) - .await?, - ), + listing_table, + client, table_info, - schema, + table_schema, + file_schema, primary_keys: hash_partitions, + range_partitions, }) } + fn client(&self) -> MetaDataClientRef { + self.client.clone() + } + fn primary_keys(&self) -> &[String] { &self.primary_keys } @@ -90,6 +120,144 @@ impl LakeSoulTableProvider { fn table_info(&self) -> Arc { self.table_info.clone() } + + fn table_name(&self) -> &str { + &self.table_info.table_name + } + + fn table_namespace(&self) -> &str { + &self.table_info.table_namespace + } + + fn table_id(&self) -> &str { + &self.table_info.table_id + } + + + fn is_partition_filter(&self, f: &Expr) -> bool { + if let Ok(cols) = f.to_columns() { + cols + .iter() + .all(|col| self.range_partitions.contains(&col.name)) + } else { + false + } + } + + pub fn options(&self) -> &ListingOptions { + self.listing_table.options() + } + + pub fn table_paths(&self) -> &Vec { + self.listing_table.table_paths() + } + + pub fn file_schema(&self) -> SchemaRef { + self.file_schema.clone() + } + + pub fn table_partition_cols(&self) -> &[(String, DataType)]{ + &self.options().table_partition_cols + } + + /// If file_sort_order is specified, creates the appropriate physical expressions + pub fn try_create_output_ordering(&self) -> Result> { + let mut all_sort_orders = vec![]; + + for exprs in &self.options().file_sort_order { + // Construct PhsyicalSortExpr objects from Expr objects: + let sort_exprs = exprs + .iter() + .map(|expr| { + if let Expr::Sort(Sort { expr, asc, nulls_first }) = expr { + if let Expr::Column(col) = expr.as_ref() { + let expr = datafusion::physical_plan::expressions::col(&col.name, self.schema().as_ref())?; + Ok(PhysicalSortExpr { + expr, + options: SortOptions { + descending: !asc, + nulls_first: *nulls_first, + }, + }) + } else { + return Err(DataFusionError::Plan( + // Return an error if schema of the input query does not match with the table schema. + format!("Expected single column references in output_ordering, got {}", expr) + )); + } + } else { + return Err(DataFusionError::Plan( + format!("Expected Expr::Sort in output_ordering, but got {}", expr) + )); + } + }) + .collect::>>()?; + all_sort_orders.push(sort_exprs); + } + Ok(all_sort_orders) + } + + + + async fn list_files_for_scan<'a>( + &'a self, + ctx: &'a SessionState, + filters: &'a [Expr], + _limit: Option, + ) -> Result<(Vec>, Statistics)> { + let store = if let Some(url) = self.table_paths().first() { + ctx.runtime_env().object_store(url)? + } else { + return Ok((vec![], Statistics::new_unknown(&self.file_schema()))); + }; + + let all_partition_info = self.client + .get_all_partition_info(self.table_id()) + .await + .map_err(|_| DataFusionError::External(format!("get all partition_info of table {} failed", &self.table_info().table_name).into()))?; + + let prune_partition_info = + prune_partitions(all_partition_info, filters, self.table_partition_cols()) + .await + .map_err(|_| DataFusionError::External(format!("get all partition_info of table {} failed", &self.table_info().table_name).into()))?; + + let mut futures = FuturesUnordered::new(); + for partition in prune_partition_info { + futures.push(listing_partition_info(partition, store.as_ref(), self.client())) + } + + let mut file_groups = Vec::new(); + + while let Some((partition, object_metas)) = futures.next().await.transpose()? { + let cols = self.table_partition_cols().iter().map(|x| x.0.as_str()); + let parsed = parse_partitions_for_partition_desc(&partition.partition_desc, cols); + + let partition_values = parsed + .into_iter() + .flatten() + .zip(self.table_partition_cols()) + .map(|(parsed, (_, datatype))| { + ScalarValue::try_from_string(parsed.to_string(), datatype) + }) + .collect::>>()?; + + let files = object_metas + .into_iter() + .map(|object_meta| + PartitionedFile { + object_meta, + partition_values: partition_values.clone(), + range: None, + extensions: None, + } + ) + .collect::>(); + file_groups.push(files) + } + + Ok((file_groups, Statistics::new_unknown(self.schema().deref()))) + } + } #[async_trait] @@ -99,7 +267,7 @@ impl TableProvider for LakeSoulTableProvider { } fn schema(&self) -> SchemaRef { - self.schema.clone() + self.table_schema.clone() } fn table_type(&self) -> TableType { @@ -113,11 +281,80 @@ impl TableProvider for LakeSoulTableProvider { filters: &[Expr], limit: Option, ) -> Result> { - self.listing_table.scan(state, projection, filters, limit).await + let (partitioned_file_lists, _) = + self.list_files_for_scan(state, filters, limit).await?; + + // if no files need to be read, return an `EmptyExec` + if partitioned_file_lists.is_empty() { + let schema = self.schema(); + let projected_schema = project_schema(&schema, projection)?; + return Ok(Arc::new(EmptyExec::new(false, projected_schema))); + } + + // extract types of partition columns + let table_partition_cols = self + .listing_table + .options() + .table_partition_cols + .iter() + .map(|col| Ok(self.schema().field_with_name(&col.0)?.clone())) + .collect::>>()?; + + let filters = if let Some(expr) = conjunction(filters.to_vec()) { + // NOTE: Use the table schema (NOT file schema) here because `expr` may contain references to partition columns. + let table_df_schema = self.schema().as_ref().clone().to_dfschema()?; + let filters = create_physical_expr( + &expr, + &table_df_schema, + &self.schema(), + state.execution_props(), + )?; + Some(filters) + } else { + None + }; + + let object_store_url = if let Some(url) = self.listing_table.table_paths().get(0) { + url.object_store() + } else { + return Ok(Arc::new(EmptyExec::new(false, Arc::new(Schema::empty())))); + }; + + // create the execution plan + self.listing_table + .options() + .format + .create_physical_plan( + state, + FileScanConfig { + object_store_url, + file_schema: Arc::clone(&self.file_schema()), + file_groups: partitioned_file_lists, + statistics: Statistics::new_unknown(self.schema().deref()), + projection: projection.cloned(), + limit, + output_ordering: self.try_create_output_ordering()?, + table_partition_cols, + infinite_source: false, + }, + filters.as_ref(), + ) + .await + } fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result> { - self.listing_table.supports_filters_pushdown(filters) + filters + .iter() + .map(|f| { + if self.is_partition_filter(f) { + Ok(TableProviderFilterPushDown::Exact) + } else { + Ok(TableProviderFilterPushDown::Unsupported) + } + }) + .collect() + } async fn insert_into( @@ -126,6 +363,48 @@ impl TableProvider for LakeSoulTableProvider { input: Arc, overwrite: bool, ) -> Result> { - self.listing_table.insert_into(state, input, overwrite).await + + let table_path = &self.listing_table.table_paths()[0]; + // Get the object store for the table path. + let _store = state.runtime_env().object_store(table_path)?; + + let file_format = self.options().format.as_ref(); + + let file_type_writer_options = match &self.options().file_type_write_options { + Some(opt) => opt.clone(), + None => FileTypeWriterOptions::build_default(&file_format.file_type(), state.config_options())?, + }; + + // Sink related option, apart from format + let config = FileSinkConfig { + object_store_url: self.table_paths()[0].object_store(), + table_paths: self.table_paths().clone(), + file_groups: vec![], + output_schema: self.schema(), + table_partition_cols: self.options().table_partition_cols.clone(), + writer_mode: datafusion::datasource::file_format::write::FileWriterMode::PutMultipart, + // A plan can produce finite number of rows even if it has unbounded sources, like LIMIT + // queries. Thus, we can check if the plan is streaming to ensure file sink input is + // unbounded. When `unbounded_input` flag is `true` for sink, we occasionally call `yield_now` + // to consume data at the input. When `unbounded_input` flag is `false` (e.g. non-streaming data), + // all the data at the input is sink after execution finishes. See discussion for rationale: + // https://github.com/apache/arrow-datafusion/pull/7610#issuecomment-1728979918 + unbounded_input: false, + single_file_output: self.options().single_file, + overwrite, + file_type_writer_options, + }; + + let unsorted: Vec> = vec![]; + let order_requirements = if self.options().file_sort_order != unsorted { + todo!() + } else { + None + }; + + self.options() + .format + .create_writer_physical_plan(input, state, config, order_requirements) + .await } } diff --git a/rust/lakesoul-datafusion/src/error.rs b/rust/lakesoul-datafusion/src/error.rs index 934afd461..4e2f2b8d8 100644 --- a/rust/lakesoul-datafusion/src/error.rs +++ b/rust/lakesoul-datafusion/src/error.rs @@ -30,6 +30,10 @@ pub enum LakeSoulError { TokioJoinError(#[from] tokio::task::JoinError), #[error("sys time error: {0}")] SysTimeError(#[from] std::time::SystemTimeError), + // #[error("object store path error: {0}")] + // ObjectStorePathError(#[from] object_store::path::Error), + // #[error("object store error: {0}")] + // ObjectStoreError(#[from] object_store::path::Error), #[error( "Internal error: {0}.\nThis was likely caused by a bug in LakeSoul's \ code and we would welcome that you file an bug report in our issue tracker" diff --git a/rust/lakesoul-datafusion/src/lakesoul_table/helpers.rs b/rust/lakesoul-datafusion/src/lakesoul_table/helpers.rs index 191a2f5d5..c76a78773 100644 --- a/rust/lakesoul-datafusion/src/lakesoul_table/helpers.rs +++ b/rust/lakesoul-datafusion/src/lakesoul_table/helpers.rs @@ -4,13 +4,18 @@ use std::sync::Arc; -use arrow::record_batch::RecordBatch; +use arrow::{array::{Array, ArrayRef, AsArray, StringBuilder}, compute::prep_null_mask_filter, datatypes::{DataType, Field, Fields, Schema}, record_batch::RecordBatch}; +use arrow_cast::cast; +use arrow_arith::boolean::and; -use datafusion::scalar::ScalarValue; +use datafusion::{common::{DFField, DFSchema}, datasource::listing::ListingTableUrl, error::DataFusionError, execution::context::ExecutionProps, logical_expr::Expr, physical_expr::create_physical_expr, scalar::ScalarValue}; +use lakesoul_metadata::MetaDataClientRef; +use object_store::{ObjectMeta, ObjectStore}; +use tracing::{debug, trace}; use crate::error::Result; use lakesoul_io::lakesoul_io_config::LakeSoulIOConfigBuilder; -use proto::proto::entity::TableInfo; +use proto::proto::entity::{PartitionInfo, TableInfo}; use crate::{ catalog::{parse_table_info_partitions, LakeSoulTableProperty}, @@ -28,6 +33,175 @@ pub(crate) fn create_io_config_builder_from_table_info(table_info: Arc Vec<(String, ScalarValue)> { - vec![] + +pub fn get_columnar_values(batch: &RecordBatch, range_partitions: Arc>) -> datafusion::error::Result> { + range_partitions + .iter() + .map(|range_col| { + if let Some(array) = batch.column_by_name(&range_col) { + match ScalarValue::try_from_array(array, 0) { + Ok(scalar) => Ok((range_col.clone(), scalar)), + Err(e) => Err(e) + } + } else { + Err(datafusion::error::DataFusionError::External(format!("").into())) + } + }) + .collect::>>() +} + +pub fn columnar_values_to_sub_path(columnar_values: &Vec<(String, ScalarValue)>) -> String { + if columnar_values.is_empty() { + "/".to_string() + } else { + format!("/{}/", columnar_values + .iter() + .map(|(k, v)| format!("{}={}", k, v)) + .collect::>() + .join("/")) + } } + +pub fn columnar_values_to_partition_desc(columnar_values: &Vec<(String, ScalarValue)>) -> String { + if columnar_values.is_empty() { + "-5".to_string() + } else { + columnar_values + .iter() + .map(|(k, v)| format!("{}={}", k, v)) + .collect::>() + .join(",") + } +} + +pub async fn prune_partitions( + all_partition_info: Vec, + filters: &[Expr], + partition_cols: &[(String, DataType)], +) -> Result> { + if filters.is_empty() { + return Ok(all_partition_info); + } + + let mut builders: Vec<_> = (0..partition_cols.len()) + .map(|_| StringBuilder::with_capacity(all_partition_info.len(), all_partition_info.len() * 10)) + .collect(); + + for partition in &all_partition_info { + let cols = partition_cols.iter().map(|x| x.0.as_str()); + let parsed = parse_partitions_for_partition_desc(&partition.partition_desc, cols).unwrap_or_default(); + + let mut builders = builders.iter_mut(); + for (p, b) in parsed.iter().zip(&mut builders) { + b.append_value(p); + } + builders.for_each(|b| b.append_null()); + } + + let arrays = partition_cols + .iter() + .zip(builders) + .map(|((_, d), mut builder)| { + let array = builder.finish(); + cast(&array, d) + }) + .collect::>()?; + + let fields: Fields = partition_cols + .iter() + .map(|(n, d)| Field::new(n, d.clone(), true)) + .collect(); + let schema = Arc::new(Schema::new(fields)); + + let df_schema = DFSchema::new_with_metadata( + partition_cols + .iter() + .map(|(n, d)| DFField::new_unqualified(n, d.clone(), true)) + .collect(), + Default::default(), + )?; + + let batch = RecordBatch::try_new(schema.clone(), arrays)?; + + // TODO: Plumb this down + let props = ExecutionProps::new(); + + // Applies `filter` to `batch` returning `None` on error + let do_filter = |filter| -> Option { + let expr = create_physical_expr(filter, &df_schema, &schema, &props).ok()?; + expr.evaluate(&batch) + .ok()? + .into_array(all_partition_info.len()) + .ok() + }; + + //.Compute the conjunction of the filters, ignoring errors + let mask = filters + .iter() + .fold(None, |acc, filter| match (acc, do_filter(filter)) { + (Some(a), Some(b)) => Some(and(&a, b.as_boolean()).unwrap_or(a)), + (None, Some(r)) => Some(r.as_boolean().clone()), + (r, None) => r, + }); + + let mask = match mask { + Some(mask) => mask, + None => return Ok(all_partition_info), + }; + + // Don't retain partitions that evaluated to null + let prepared = match mask.null_count() { + 0 => mask, + _ => prep_null_mask_filter(&mask), + }; + + // Sanity check + assert_eq!(prepared.len(), all_partition_info.len()); + + let filtered = all_partition_info + .into_iter() + .zip(prepared.values()) + .filter_map(|(p, f)| f.then_some(p)) + .collect(); + + Ok(filtered) +} + +pub fn parse_partitions_for_partition_desc<'a, I>( + partition_desc: &'a str, + table_partition_cols: I, +) -> Option> +where + I: IntoIterator, +{ + let mut part_values = vec![]; + for (part, pn) in partition_desc.split(",").zip(table_partition_cols) { + match part.split_once('=') { + Some((name, val)) if name == pn => part_values.push(val), + _ => { + debug!( + "Ignoring file: partition_desc='{}', part='{}', partition_col='{}'", + partition_desc, + part, + pn, + ); + return None; + } + } + } + Some(part_values) + +} + + +pub async fn listing_partition_info(partition_info: PartitionInfo, store: &dyn ObjectStore, client: MetaDataClientRef) -> datafusion::error::Result<(PartitionInfo, Vec)> { + trace!("Listing partition {:?}", partition_info); + let paths = client + .get_data_files_of_single_partition(&partition_info).await.map_err(|_| DataFusionError::External("listing partition info failed".into()))?; + let mut files = Vec::new(); + for path in paths { + let result = store.head(ListingTableUrl::parse(path.clone())?.prefix()).await?; + files.push(result); + } + Ok((partition_info, files)) +} \ No newline at end of file diff --git a/rust/lakesoul-datafusion/src/lakesoul_table/mod.rs b/rust/lakesoul-datafusion/src/lakesoul_table/mod.rs index 66e68e486..277e19d2e 100644 --- a/rust/lakesoul-datafusion/src/lakesoul_table/mod.rs +++ b/rust/lakesoul-datafusion/src/lakesoul_table/mod.rs @@ -7,6 +7,7 @@ pub mod helpers; use std::{ops::Deref, sync::Arc}; use arrow::datatypes::{SchemaRef, Schema}; +use arrow_cast::pretty::pretty_format_batches; use datafusion::sql::TableReference; use datafusion::{ dataframe::DataFrame, @@ -17,6 +18,7 @@ use datafusion::{ use lakesoul_io::{lakesoul_io_config::create_session_context_with_planner, lakesoul_reader::RecordBatch}; use lakesoul_metadata::{MetaDataClient, MetaDataClientRef}; use proto::proto::entity::TableInfo; +use tracing::debug; use crate::{ catalog::{create_io_config_builder, parse_table_info_partitions, LakeSoulTableProperty}, @@ -33,6 +35,7 @@ pub struct LakeSoulTable { table_name: String, table_schema: SchemaRef, primary_keys: Vec, + range_partitions: Vec, properties: LakeSoulTableProperty, } @@ -62,7 +65,7 @@ impl LakeSoulTable { let table_name = table_info.table_name.clone(); let properties = serde_json::from_str::(&table_info.properties)?; - let (_, hash_partitions) = parse_table_info_partitions(table_info.partitions.clone())?; + let (range_partitions, hash_partitions) = parse_table_info_partitions(table_info.partitions.clone())?; Ok(Self { client, @@ -70,6 +73,7 @@ impl LakeSoulTable { table_name, table_schema, primary_keys: hash_partitions, + range_partitions, properties, }) } @@ -114,20 +118,20 @@ impl LakeSoulTable { .build()?; let dataframe = DataFrame::new(sess_ctx.state(), logical_plan); - let _results = dataframe + let results = dataframe // .explain(true, false)? .collect() .await?; + debug!("{}", pretty_format_batches(&results)?); Ok(()) - // Ok(print_batches(&results)?) } pub async fn to_dataframe(&self, context: &SessionContext) -> Result { let config_builder = create_io_config_builder(self.client(), Some(self.table_name()), true, self.table_namespace()).await?; let provider = Arc::new( - LakeSoulTableProvider::try_new(&context.state(), config_builder.build(), self.table_info(), false).await?, + LakeSoulTableProvider::try_new(&context.state(), self.client(), config_builder.build(), self.table_info(), false).await?, ); Ok(context.read_table(provider)?) } @@ -138,7 +142,7 @@ impl LakeSoulTable { .await? .with_prefix(self.table_info.table_path.clone()); Ok(Arc::new( - LakeSoulTableProvider::try_new(session_state, config_builder.build(), self.table_info(), true).await?, + LakeSoulTableProvider::try_new(session_state, self.client(), config_builder.build(), self.table_info(), true).await?, )) } @@ -158,6 +162,10 @@ impl LakeSoulTable { &self.primary_keys } + pub fn range_partitions(&self) -> &Vec { + &self.range_partitions + } + pub fn hash_bucket_num(&self) -> usize { self.properties.hash_bucket_num.unwrap_or(1) } diff --git a/rust/lakesoul-datafusion/src/planner/physical_planner.rs b/rust/lakesoul-datafusion/src/planner/physical_planner.rs index 32f180d85..a5a830574 100644 --- a/rust/lakesoul-datafusion/src/planner/physical_planner.rs +++ b/rust/lakesoul-datafusion/src/planner/physical_planner.rs @@ -13,13 +13,13 @@ use datafusion::logical_expr::{Expr, LogicalPlan}; use datafusion::physical_expr::PhysicalExpr; use datafusion::physical_plan::sorts::sort::SortExec; -use datafusion::physical_plan::ExecutionPlan; +use datafusion::physical_plan::{ExecutionPlan, Partitioning}; use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner}; use async_trait::async_trait; use datafusion::logical_expr::{DmlStatement, WriteOp}; -use lakesoul_io::helpers::{create_hash_partitioning, create_sort_exprs}; +use lakesoul_io::helpers::{column_names_to_physical_sort_expr, column_names_to_physical_expr}; use lakesoul_io::repartition::RepartitionByRangeAndHashExec; use crate::lakesoul_table::LakeSoulTable; @@ -62,7 +62,7 @@ impl PhysicalPlanner for LakeSoulPhysicalPlanner { Ok(provider) => { let physical_input = self.create_physical_plan(input, session_state).await?; - let physical_input = if lakesoul_table.primary_keys().is_empty() { + if lakesoul_table.primary_keys().is_empty() { if !lakesoul_table .schema() .logically_equivalent_names_and_types(&Schema::from(input.schema().as_ref())) @@ -72,25 +72,37 @@ impl PhysicalPlanner for LakeSoulPhysicalPlanner { "Inserting query must have the same schema with the table.".to_string(), )); } - physical_input - } else { + } + let physical_input = if !lakesoul_table.primary_keys().is_empty() || !lakesoul_table.range_partitions().is_empty() { let input_schema = physical_input.schema(); let input_dfschema = input.as_ref().schema(); - let sort_expr = create_sort_exprs( - lakesoul_table.primary_keys(), + let sort_expr = column_names_to_physical_sort_expr( + [ + lakesoul_table.range_partitions().clone(), + lakesoul_table.primary_keys().clone(), + ].concat().as_slice(), input_dfschema, &input_schema, session_state, )?; - let hash_partitioning = create_hash_partitioning( + let hash_partitioning_expr = column_names_to_physical_expr( lakesoul_table.primary_keys(), - lakesoul_table.hash_bucket_num(), + input_dfschema, + &input_schema, + session_state, + )?; + + let hash_partitioning = Partitioning::Hash(hash_partitioning_expr, lakesoul_table.hash_bucket_num()); + let range_partitioning_expr = column_names_to_physical_expr( + lakesoul_table.range_partitions(), input_dfschema, &input_schema, session_state, )?; let sort_exec = Arc::new(SortExec::new(sort_expr, physical_input)); - Arc::new(RepartitionByRangeAndHashExec::try_new(sort_exec, hash_partitioning)?) + Arc::new(RepartitionByRangeAndHashExec::try_new(sort_exec, range_partitioning_expr, hash_partitioning)?) + } else { + physical_input }; provider.insert_into(session_state, physical_input, false).await diff --git a/rust/lakesoul-datafusion/src/test/insert_tests.rs b/rust/lakesoul-datafusion/src/test/insert_tests.rs index 00cb3b550..8ecfa1a1f 100644 --- a/rust/lakesoul-datafusion/src/test/insert_tests.rs +++ b/rust/lakesoul-datafusion/src/test/insert_tests.rs @@ -12,12 +12,13 @@ mod insert_tests { datatypes::{DataType, Field, Schema, SchemaRef}, record_batch::RecordBatch, }; - use datafusion::assert_batches_eq; - use lakesoul_io::filter::parser::Parser; + use datafusion::logical_expr::Expr; + use datafusion::prelude::col; use lakesoul_io::lakesoul_io_config::{create_session_context, LakeSoulIOConfigBuilder}; use lakesoul_metadata::{MetaDataClient, MetaDataClientRef}; use crate::lakesoul_table::LakeSoulTable; + use crate::test::assert_batches_eq; use crate::{ catalog::{create_io_config_builder, create_table}, error::Result, @@ -38,7 +39,8 @@ mod insert_tests { // todo: partitioned table is replaced by primary key table currently let builder = LakeSoulIOConfigBuilder::new() .with_schema(schema.clone()) - .with_primary_keys(partition_key.into_iter().map(String::from).collect()); + // .with_primary_keys(partition_key.into_iter().map(String::from).collect()); + .with_range_partitions(partition_key.into_iter().map(String::from).collect()); create_table(client, table_name, builder.build()).await } @@ -51,7 +53,7 @@ mod insert_tests { client: MetaDataClientRef, table_name: &str, selected_cols: Vec<&str>, - filters: Option, + filters: Option, expected: &[&str], ) -> Result<()> { let lakesoul_table = LakeSoulTable::for_name(table_name).await?; @@ -60,10 +62,9 @@ mod insert_tests { let sess_ctx = create_session_context(&mut builder.clone().build())?; let dataframe = lakesoul_table.to_dataframe(&sess_ctx).await?; - let schema = SchemaRef::new(dataframe.schema().into()); let dataframe = if let Some(f) = filters { - dataframe.filter(Parser::parse(f.clone(), schema)?)? + dataframe.filter(f)? } else { dataframe }; @@ -74,14 +75,16 @@ mod insert_tests { dataframe.select_columns(&selected_cols)? }; - let result = dataframe.collect().await?; + // print_batches(&dataframe.clone().explain(true, false)?.collect().await?); - assert_batches_eq!(expected, &result); + let results = dataframe.collect().await?; + + assert_batches_eq(table_name, expected, &results); Ok(()) } fn create_batch_i32(names: Vec<&str>, values: Vec<&[i32]>) -> RecordBatch { - let values = values + let values: Vec> = values .into_iter() .map(|vec| Arc::new(Int32Array::from(Vec::from(vec))) as ArrayRef) .collect::>(); @@ -175,7 +178,7 @@ mod insert_tests { client.clone(), table_name, vec!["data", "id"], - Some("and(noteq(id, null), lteq(id, 2))".to_string()), + Some(col("id").lt_eq(Expr::Literal(datafusion::scalar::ScalarValue::Int32(Some(2))))), &[ "+------+----+", "| data | id |", @@ -198,7 +201,7 @@ mod insert_tests { client.clone(), table_name, vec!["data", "id"], - Some("and(noteq(id, null), lteq(id, 2))".to_string()), + Some(col("id").lt_eq(Expr::Literal(datafusion::scalar::ScalarValue::Int32(Some(2))))), &[ "+------+----+", "| data | id |", @@ -359,7 +362,7 @@ mod insert_tests { ), ( "Float64", - Arc::new(Float64Array::from(vec![1.0, -1.0])) as ArrayRef, + Arc::new(Float64Array::from(vec![3000.6, 300.6])) as ArrayRef, true, ), ("Int8", Arc::new(Int8Array::from(vec![1i8, -2i8])) as ArrayRef, true), @@ -438,22 +441,22 @@ mod insert_tests { ), ( "Time32Millisecond", - Arc::new(Time32MillisecondArray::from(vec![1i32, -2i32])) as ArrayRef, + Arc::new(Time32MillisecondArray::from(vec![1i32, 2i32])) as ArrayRef, true, ), ( "Time32Second", - Arc::new(Time32SecondArray::from(vec![1i32, -2i32])) as ArrayRef, + Arc::new(Time32SecondArray::from(vec![1i32, 2i32])) as ArrayRef, true, ), ( "Time64Microsecond", - Arc::new(Time64MicrosecondArray::from(vec![1i64, -2i64])) as ArrayRef, + Arc::new(Time64MicrosecondArray::from(vec![1i64, 2i64])) as ArrayRef, true, ), ( "Time64Nanosecond", - Arc::new(Time64NanosecondArray::from(vec![1i64, -2i64])) as ArrayRef, + Arc::new(Time64NanosecondArray::from(vec![1i64, 2i64])) as ArrayRef, true, ), ( @@ -501,12 +504,12 @@ mod insert_tests { init_table(client.clone(), record_batch.schema(), table_name).await?; do_insert(record_batch, table_name).await?; check_insert(client.clone(), table_name, vec![], None| Boolean | Binary | Date32 | Date64 | Decimal128 | Decimal256 | FixedSizeBinary | FixedSizeList | Float32 | Float64 | Int8 | Int16 | Int32 | Int64 | Map | Null | LargeBinary | LargeString | List | String | Struct | Time32Millisecond | Time32Second | Time64Microsecond | Time64Nanosecond | TimestampMicrosecond | TimestampMillisecond | TimestampNanosecond | TimestampSecond | UInt8 | UInt16 | UInt32 | UInt64 |", - "+---------+--------+------------+---------------------+---------------+--------------+-----------------+---------------+---------+---------+------+-------+-------+-------+--------------------+------+-------------+-------------+-----------+--------+-------------------+-----------------------------------------------------------------------------+------------------------------------------------------------------------+-----------------------------------------------------------------------------+----------------------------------------------------------------------------+----------------------------+-------------------------+-------------------------------+---------------------+-------+--------+--------+--------+", - "| true | 01 | 1970-01-02 | 1970-01-01T00:00:00 | 0.0000000001 | 0.0000000000 | 01 | [0, 1, 2] | 1.0 | 1.0 | 1 | 1 | 1 | 1 | {joe: 1} | | 01 | 1 | [0, 1, 2] | 1 | {b: false, c: 42} | 00:00:00.001 | 00:00:01 | 00:00:00.000001 | 00:00:00.000000001 | 1970-01-01T00:00:00.000001 | 1970-01-01T00:00:00.001 | 1970-01-01T00:00:00.000000001 | 1970-01-01T00:00:01 | 1 | 1 | 1 | 1 |", - "| false | 0203 | 1969-12-30 | 1970-01-01T00:00:00 | -0.0000000002 | | 02 | | -1.0 | -1.0 | -2 | -2 | -2 | -2 | {blogs: 2, foo: 4} | | 0203 | | | | {b: true, c: 31} | ERROR: Cast error: Failed to convert -2 to temporal for Time32(Millisecond) | ERROR: Cast error: Failed to convert -2 to temporal for Time32(Second) | ERROR: Cast error: Failed to convert -2 to temporal for Time64(Microsecond) | ERROR: Cast error: Failed to convert -2 to temporal for Time64(Nanosecond) | 1969-12-31T23:59:59.999998 | 1969-12-31T23:59:59.998 | 1969-12-31T23:59:59.999999998 | 1969-12-31T23:59:58 | 2 | 2 | 2 | 2 || Boolean | Binary | Date32 | Date64 | Decimal128 | Decimal256 | FixedSizeBinary | FixedSizeList | Float32 | Float64 | Int8 | Int16 | Int32 | Int64 | Map | Null | LargeBinary | LargeString | List | String | Struct | Time32Millisecond | Time32Second | Time64Microsecond | Time64Nanosecond | TimestampMicrosecond | TimestampMillisecond | TimestampNanosecond | TimestampSecond | UInt8 | UInt16 | UInt32 | UInt64 || true | 01 | 1970-01-02 | 1970-01-01T00:00:00 | 0.0000000001 | 0.0000000000 | 01 | [0, 1, 2] | 1.0 | 3000.6 | 1 | 1 | 1 | 1 | {joe: 1} | | 01 | 1 | [0, 1, 2] | 1 | {b: false, c: 42} | 00:00:00.001 | 00:00:01 | 00:00:00.000001 | 00:00:00.000000001 | 1970-01-01T00:00:00.000001 | 1970-01-01T00:00:00.001 | 1970-01-01T00:00:00.000000001 | 1970-01-01T00:00:01 | 1 | 1 | 1 | 1 |", + "| false | 0203 | 1969-12-30 | 1970-01-01T00:00:00 | -0.0000000002 | | 02 | | -1.0 | 300.6 | -2 | -2 | -2 | -2 | {blogs: 2, foo: 4} | | 0203 | | | | {b: true, c: 31} | 00:00:00.002 | 00:00:02 | 00:00:00.000002 | 00:00:00.000000002 | 1969-12-31T23:59:59.999998 | 1969-12-31T23:59:59.998 | 1969-12-31T23:59:59.999999998 | 1969-12-31T23:59:58 | 2 | 2 | 2 | 2 |await } diff --git a/rust/lakesoul-datafusion/src/test/mod.rs b/rust/lakesoul-datafusion/src/test/mod.rs index e98cb0fe2..1c4571d87 100644 --- a/rust/lakesoul-datafusion/src/test/mod.rs +++ b/rust/lakesoul-datafusion/src/test/mod.rs @@ -3,6 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 use std::sync::Arc; +use arrow::array::RecordBatch; use tracing::debug; use lakesoul_metadata::MetaDataClient; @@ -30,3 +31,35 @@ fn init() { debug!("clean metadata"); }) } + +fn assert_batches_eq(table_name: &str, expected: &[&str], results: &[RecordBatch]) { + // let expected_lines: Vec = + // expected.iter().map(|&s| s.into()).collect(); + let (schema, remain)= expected.split_at(3); + let (expected, end) = remain.split_at(remain.len() - 1); + let mut expected = Vec::from(expected); + + expected.sort(); + + let expected_lines = [schema, &expected, end].concat(); + + + let formatted = datafusion::arrow::util::pretty::pretty_format_batches(results) + .unwrap() + .to_string(); + + let actual_lines: Vec<&str> = formatted.trim().lines().collect(); + let (schema, remain)= actual_lines.split_at(3); + let (result, end) = remain.split_at(remain.len() - 1); + let mut result = Vec::from(result); + + result.sort(); + + let result = [schema, &result, end].concat(); + + assert_eq!( + expected_lines, result, + "\n\n{}\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", + table_name, expected_lines, result + ); +} \ No newline at end of file diff --git a/rust/lakesoul-datafusion/src/test/upsert_tests.rs b/rust/lakesoul-datafusion/src/test/upsert_tests.rs index a2ad95de1..86b58da44 100644 --- a/rust/lakesoul-datafusion/src/test/upsert_tests.rs +++ b/rust/lakesoul-datafusion/src/test/upsert_tests.rs @@ -1573,7 +1573,7 @@ mod upsert_with_metadata_tests { use crate::error::Result; use crate::lakesoul_table::LakeSoulTable; - use datafusion::assert_batches_eq; + use crate::test::assert_batches_eq; use lakesoul_io::lakesoul_io_config::{create_session_context, LakeSoulIOConfigBuilder}; @@ -1715,7 +1715,7 @@ mod upsert_with_metadata_tests { .await?; // print_batches(&result); - assert_batches_eq!(expected, &result); + assert_batches_eq(table_name, expected, &result); Ok(()) } @@ -1724,11 +1724,13 @@ mod upsert_with_metadata_tests { table_name: &str, schema: SchemaRef, pks: Vec, + range_partitions: Vec, client: MetaDataClientRef, ) -> Result<()> { let builder = LakeSoulIOConfigBuilder::new() .with_schema(schema) - .with_primary_keys(pks); + .with_primary_keys(pks) + .with_range_partitions(range_partitions); create_table(client.clone(), table_name, builder.build()).await?; let lakesoul_table = LakeSoulTable::for_name(table_name).await?; lakesoul_table.execute_upsert(batch).await @@ -1768,7 +1770,7 @@ mod upsert_with_metadata_tests { .await?; // print_batches(&result); - assert_batches_eq!(expected, &result); + assert_batches_eq(table_name, expected, &result); Ok(()) } @@ -1787,7 +1789,8 @@ mod upsert_with_metadata_tests { .map(|name| Field::new(name, DataType::Int32, true)) .collect::>(), )), - vec!["range".to_string(), "hash".to_string()], + vec!["hash".to_string()], + vec!["range".to_string()], client.clone(), ) .await?; @@ -1832,7 +1835,8 @@ mod upsert_with_metadata_tests { .map(|name| Field::new(name, DataType::Int32, true)) .collect::>(), )), - vec!["range".to_string(), "hash".to_string()], + vec!["hash".to_string()], + vec!["range".to_string()], client.clone(), ) .await?; @@ -1877,7 +1881,8 @@ mod upsert_with_metadata_tests { .map(|name| Field::new(name, DataType::Int32, true)) .collect::>(), )), - vec!["range".to_string(), "hash".to_string()], + vec!["hash".to_string()], + vec!["range".to_string()], client.clone(), ) .await?; @@ -1925,7 +1930,8 @@ mod upsert_with_metadata_tests { .map(|name| Field::new(name, DataType::Int32, true)) .collect::>(), )), - vec!["range".to_string(), "hash".to_string()], + vec!["hash".to_string()], + vec!["range".to_string()], client.clone(), ) .await?; @@ -1968,7 +1974,8 @@ mod upsert_with_metadata_tests { .map(|name| Field::new(name, DataType::Int32, true)) .collect::>(), )), - vec!["range".to_string(), "hash".to_string()], + vec!["hash".to_string()], + vec!["range".to_string()], client.clone(), ) .await?; @@ -2016,7 +2023,8 @@ mod upsert_with_metadata_tests { .map(|name| Field::new(name, DataType::Int32, true)) .collect::>(), )), - vec!["range".to_string(), "hash".to_string()], + vec!["hash".to_string()], + vec!["range".to_string()], client.clone(), ) .await?; @@ -2069,7 +2077,8 @@ mod upsert_with_metadata_tests { .map(|name| Field::new(name, DataType::Int32, true)) .collect::>(), )), - vec!["range".to_string(), "hash".to_string()], + vec!["hash".to_string()], + vec!["range".to_string()], client.clone(), ) .await?; @@ -2139,7 +2148,8 @@ mod upsert_with_metadata_tests { .map(|name| Field::new(name, DataType::Int32, true)) .collect::>(), )), - vec!["range".to_string(), "hash".to_string()], + vec!["hash".to_string()], + vec!["range".to_string()], client.clone(), ) .await?; @@ -2224,6 +2234,7 @@ mod upsert_with_metadata_tests { .collect::>(), )), vec!["hash".to_string()], + vec![], client.clone(), ) .await?; @@ -2279,11 +2290,13 @@ mod upsert_with_metadata_tests { .collect::>(), )), vec![ - "range1".to_string(), - "range2".to_string(), "hash1".to_string(), "hash2".to_string(), ], + vec![ + "range1".to_string(), + "range2".to_string(), + ], client.clone(), ) .await?; @@ -2350,7 +2363,8 @@ mod upsert_with_metadata_tests { .map(|name| Field::new(name, DataType::Int32, true)) .collect::>(), )), - vec!["range".to_string(), "hash".to_string()], + vec!["hash".to_string()], + vec!["range".to_string()], client.clone(), ) .await?; @@ -2399,7 +2413,8 @@ mod upsert_with_metadata_tests { .map(|name| Field::new(name, DataType::Int32, true)) .collect::>(), )), - vec!["range".to_string(), "hash".to_string()], + vec!["hash".to_string()], + vec!["range".to_string()], client.clone(), ) .await?; @@ -2464,7 +2479,8 @@ mod upsert_with_metadata_tests { .map(|name| Field::new(name, DataType::Int32, true)) .collect::>(), )), - vec!["range".to_string(), "hash".to_string()], + vec!["hash".to_string()], + vec!["range".to_string()], client.clone(), ) .await?; @@ -2551,7 +2567,8 @@ mod upsert_with_metadata_tests { .map(|name| Field::new(name, DataType::Int32, true)) .collect::>(), )), - vec!["range".to_string(), "hash".to_string()], + vec!["hash".to_string()], + vec!["range".to_string()], client.clone(), ) .await?; @@ -2588,7 +2605,8 @@ mod upsert_with_metadata_tests { .map(|name| Field::new(name, DataType::Int32, true)) .collect::>(), )), - vec!["range".to_string(), "hash".to_string()], + vec!["hash".to_string()], + vec!["range".to_string()], client.clone(), ) .await?; @@ -2618,7 +2636,15 @@ mod upsert_with_metadata_tests { None, client.clone(), &[ - "+-----+", "| age |", "+-----+", "| 1 |", "| 2 |", "| |", "| |", "| |", "| |", + "+-----+", + "| age |", + "+-----+", + "| 1 |", + "| 2 |", + "| |", + "| |", + "| |", + "| |", "+-----+", ], ) @@ -2641,7 +2667,8 @@ mod upsert_with_metadata_tests { .map(|name| Field::new(name, DataType::Int32, true)) .collect::>(), )), - vec!["range".to_string(), "hash1".to_string(), "hash2".to_string()], + vec!["hash1".to_string(), "hash2".to_string()], + vec!["range".to_string()], client.clone(), ) .await?; @@ -2684,7 +2711,8 @@ mod upsert_with_metadata_tests { .map(|name| Field::new(name, DataType::Int32, true)) .collect::>(), )), - vec!["range".to_string(), "hash1".to_string(), "hash2".to_string()], + vec!["hash1".to_string(), "hash2".to_string()], + vec!["range".to_string()], client.clone(), ) .await?; @@ -2743,7 +2771,8 @@ mod upsert_with_metadata_tests { .map(|name| Field::new(name, DataType::Int32, true)) .collect::>(), )), - vec!["range".to_string(), "hash1".to_string(), "hash2".to_string()], + vec!["hash1".to_string(), "hash2".to_string()], + vec!["range".to_string()], client.clone(), ) .await?; @@ -2824,7 +2853,8 @@ mod upsert_with_metadata_tests { .map(|name| Field::new(name, DataType::Utf8, true)) .collect::>(), )), - vec!["range".to_string(), "hash1".to_string(), "hash2".to_string()], + vec!["hash1".to_string(), "hash2".to_string()], + vec!["range".to_string()], client.clone(), ) .await?; @@ -2879,7 +2909,8 @@ mod upsert_with_metadata_tests { .map(|name| Field::new(name, DataType::Utf8, true)) .collect::>(), )), - vec!["range".to_string(), "hash1".to_string(), "hash2".to_string()], + vec!["hash1".to_string(), "hash2".to_string()], + vec!["range".to_string()], client.clone(), ) .await?; @@ -2950,7 +2981,8 @@ mod upsert_with_metadata_tests { .map(|name| Field::new(name, DataType::Utf8, true)) .collect::>(), )), - vec!["range".to_string(), "hash1".to_string(), "hash2".to_string()], + vec!["hash1".to_string(), "hash2".to_string()], + vec!["range".to_string()], client.clone(), ) .await?; @@ -3065,7 +3097,8 @@ mod upsert_with_metadata_tests { }) .collect::>(), )), - vec!["range".to_string(), "hash1".to_string(), "hash2".to_string()], + vec!["hash1".to_string(), "hash2".to_string()], + vec!["range".to_string()], client.clone(), ) .await?; @@ -3240,7 +3273,8 @@ mod upsert_with_metadata_tests { }) .collect::>(), )), - vec!["range".to_string(), "hash".to_string()], + vec!["hash".to_string()], + vec!["range".to_string()], client.clone(), ) .await?; @@ -3313,7 +3347,8 @@ mod upsert_with_metadata_tests { }) .collect::>(), )), - vec!["range".to_string(), "hash".to_string()], + vec!["hash".to_string()], + vec!["range".to_string()], client.clone(), ) .await?; diff --git a/rust/lakesoul-io/src/datasource/file_format.rs b/rust/lakesoul-io/src/datasource/file_format.rs index fa382e90c..6a2d161b6 100644 --- a/rust/lakesoul-io/src/datasource/file_format.rs +++ b/rust/lakesoul-io/src/datasource/file_format.rs @@ -102,20 +102,20 @@ impl FileFormat for LakeSoulParquetFormat { for field in &conf.table_partition_cols { builder.push(Field::new(field.name(), field.data_type().clone(), false)); } + + let projection = conf.projection.clone(); // files to read - let (summary_conf, flatten_conf) = + let flatten_conf = flatten_file_scan_config(state, self.parquet_format.clone(), conf, self.conf.primary_keys_slice()).await?; - let projection = summary_conf.projection.clone(); let merge_schema = Arc::new(builder.finish()); let merge_exec = Arc::new(MergeParquetExec::new( merge_schema.clone(), - summary_conf, flatten_conf, predicate, self.parquet_format.metadata_size_hint(state.config_options()), self.conf.clone(), - )); + )?); if let Some(projection) = projection { let mut projection_expr = vec![]; for idx in projection { @@ -147,13 +147,12 @@ impl FileFormat for LakeSoulParquetFormat { } } -async fn flatten_file_scan_config( +pub async fn flatten_file_scan_config( state: &SessionState, format: Arc, conf: FileScanConfig, primary_keys: &[String], -) -> Result<(FileScanConfig, Vec)> { - let summary_conf = conf.clone(); +) -> Result> { let object_store_url = conf.object_store_url.clone(); let store = state.runtime_env().object_store(object_store_url.clone())?; let projected_schema = project_schema(&conf.file_schema.clone(), conf.projection.as_ref())?; @@ -171,7 +170,7 @@ async fn flatten_file_scan_config( let projection = compute_project_column_indices(file_schema.clone(), projected_schema.clone(), primary_keys); let limit = conf.limit; - let table_partition_cols = vec![]; + let table_partition_cols = conf.table_partition_cols.clone(); let output_ordering = conf.output_ordering.clone(); let infinite_source = conf.infinite_source; let config = FileScanConfig { @@ -188,7 +187,7 @@ async fn flatten_file_scan_config( flatten_configs.push(config); } } - Ok((summary_conf, flatten_configs)) + Ok(flatten_configs) } fn compute_project_column_indices( diff --git a/rust/lakesoul-io/src/datasource/listing.rs b/rust/lakesoul-io/src/datasource/listing.rs index c59742117..784f6eb75 100644 --- a/rust/lakesoul-io/src/datasource/listing.rs +++ b/rust/lakesoul-io/src/datasource/listing.rs @@ -8,21 +8,20 @@ use std::sync::Arc; use async_trait::async_trait; -use arrow::datatypes::{SchemaBuilder, SchemaRef}; +use arrow::datatypes::SchemaRef; use datafusion::datasource::file_format::FileFormat; -use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl}; -use datafusion::datasource::physical_plan::FileSinkConfig; +use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableUrl}; use datafusion::execution::context::SessionState; use datafusion::physical_plan::ExecutionPlan; use datafusion::{datasource::TableProvider, logical_expr::Expr}; use datafusion::logical_expr::{TableProviderFilterPushDown, TableType}; -use datafusion_common::{DataFusionError, FileTypeWriterOptions, Result}; +use datafusion_common::Result; use tracing::{debug, instrument}; +use crate::helpers::listing_table_from_lakesoul_io_config; use crate::lakesoul_io_config::LakeSoulIOConfig; -use crate::transform::uniform_schema; pub struct LakeSoulListingTable { listing_table: Arc, @@ -36,12 +35,6 @@ impl Debug for LakeSoulListingTable { } impl LakeSoulListingTable { - pub fn new(listing_table: Arc, lakesoul_io_config: LakeSoulIOConfig) -> Self { - Self { - listing_table, - lakesoul_io_config, - } - } pub async fn new_with_config_and_format( session_state: &SessionState, @@ -49,67 +42,8 @@ impl LakeSoulListingTable { file_format: Arc, as_sink: bool, ) -> Result { - let config = match as_sink { - false => { - // Parse the path - let table_paths = lakesoul_io_config - .files - .iter() - .map(ListingTableUrl::parse) - .collect::>>()?; - // Create default parquet options - let object_store_url = table_paths - .first() - .ok_or(DataFusionError::Internal("no table path".to_string()))? - .object_store(); - let store = session_state.runtime_env().object_store(object_store_url.clone())?; - let target_schema = uniform_schema(lakesoul_io_config.schema()); - - let listing_options = ListingOptions::new(file_format.clone()).with_file_extension(".parquet"); - // .with_table_partition_cols(table_partition_cols); - - let mut objects = vec![]; - - for url in &table_paths { - objects.push(store.head(url.prefix()).await?); - } - // Resolve the schema - let resolved_schema = file_format.infer_schema(session_state, &store, &objects).await?; - let mut builder = SchemaBuilder::from(target_schema.fields()); - for field in resolved_schema.fields() { - if target_schema.field_with_name(field.name()).is_err() { - builder.push(field.clone()); - } - } - - ListingTableConfig::new_with_multi_paths(table_paths) - .with_listing_options(listing_options) - .with_schema(Arc::new(builder.finish())) - } - true => { - let target_schema = uniform_schema(lakesoul_io_config.schema()); - let table_partition_cols = lakesoul_io_config - .range_partitions - .iter() - .map(|col| Ok((col.clone(), target_schema.field_with_name(col)?.data_type().clone()))) - .collect::>>()?; - - let listing_options = ListingOptions::new(file_format.clone()) - .with_file_extension(".parquet") - .with_table_partition_cols(table_partition_cols) - .with_insert_mode(datafusion::datasource::listing::ListingTableInsertMode::AppendNewFiles); - let prefix = - ListingTableUrl::parse_create_local_if_not_exists(lakesoul_io_config.prefix.clone(), true)?; - - ListingTableConfig::new(prefix) - .with_listing_options(listing_options) - .with_schema(target_schema) - } - }; - - // Create a new TableProvider - let listing_table = Arc::new(ListingTable::try_new(config)?); + let listing_table = Arc::new(listing_table_from_lakesoul_io_config(session_state, lakesoul_io_config.clone(), file_format, as_sink).await?); Ok(Self { listing_table, @@ -117,13 +51,14 @@ impl LakeSoulListingTable { }) } - fn options(&self) -> &ListingOptions { + pub fn options(&self) -> &ListingOptions { self.listing_table.options() } - fn table_paths(&self) -> &Vec { + pub fn table_paths(&self) -> &Vec { self.listing_table.table_paths() } + } #[async_trait] @@ -189,117 +124,6 @@ impl TableProvider for LakeSoulListingTable { input: Arc, overwrite: bool, ) -> Result> { - // Check that the schema of the plan matches the schema of this table. - // if !self - // .schema() - // .logically_equivalent_names_and_types(&input.schema()) - // { - // return plan_err!( - // // Return an error if schema of the input query does not match with the table schema. - // "Inserting query must have the same schema with the table." - // ); - // } - - let table_path = &self.listing_table.table_paths()[0]; - // Get the object store for the table path. - let _store = state.runtime_env().object_store(table_path)?; - - // let file_list_stream = pruned_partition_list( - // state, - // store.as_ref(), - // table_path, - // &[], - // &self.options.file_extension, - // &self.options.table_partition_cols, - // ) - // .await?; - - // let file_groups = file_list_stream.try_collect::>().await?; - // if we are writing a single output_partition to a table backed by a single file - // we can append to that file. Otherwise, we can write new files into the directory - // adding new files to the listing table in order to insert to the table. - let _input_partitions = input.output_partitioning().partition_count(); - // let writer_mode = match self.options().insert_mode { - // ListingTableInsertMode::AppendToFile => { - // if input_partitions > file_groups.len() { - // return Err(DataFusionError::Plan( - // format!("Cannot append {input_partitions} partitions to {} files!", - // file_groups.len()) - // )); - // } - - // datafusion::datasource::file_format::write::FileWriterMode::Append - // } - // ListingTableInsertMode::AppendNewFiles => { - // datafusion::datasource::file_format::write::FileWriterMode::PutMultipart - // } - // ListingTableInsertMode::Error => { - // return Err(DataFusionError::Plan( - // format!("Invalid plan attempting to write to table with TableWriteMode::Error!") - // )); - // } - // }; - - let file_format = self.options().format.as_ref(); - - let file_type_writer_options = match &self.options().file_type_write_options { - Some(opt) => opt.clone(), - None => FileTypeWriterOptions::build_default(&file_format.file_type(), state.config_options())?, - }; - - // Sink related option, apart from format - let config = FileSinkConfig { - object_store_url: self.table_paths()[0].object_store(), - table_paths: self.table_paths().clone(), - file_groups: vec![], - output_schema: self.schema(), - table_partition_cols: self.options().table_partition_cols.clone(), - writer_mode: datafusion::datasource::file_format::write::FileWriterMode::PutMultipart, - // A plan can produce finite number of rows even if it has unbounded sources, like LIMIT - // queries. Thus, we can check if the plan is streaming to ensure file sink input is - // unbounded. When `unbounded_input` flag is `true` for sink, we occasionally call `yield_now` - // to consume data at the input. When `unbounded_input` flag is `false` (e.g. non-streaming data), - // all the data at the input is sink after execution finishes. See discussion for rationale: - // https://github.com/apache/arrow-datafusion/pull/7610#issuecomment-1728979918 - unbounded_input: false, - single_file_output: self.options().single_file, - overwrite, - file_type_writer_options, - }; - - let unsorted: Vec> = vec![]; - let order_requirements = if self.options().file_sort_order != unsorted { - // if matches!( - // self.options().insert_mode, - // ListingTableInsertMode::AppendToFile - // ) { - // return Err(DataFusionError::Plan( - // format!("Cannot insert into a sorted ListingTable with mode append!") - // )); - // } - // // Multiple sort orders in outer vec are equivalent, so we pass only the first one - // let ordering = self - // .try_create_output_ordering()? - // .get(0) - // .ok_or(DataFusionError::Internal( - // "Expected ListingTable to have a sort order, but none found!".into(), - // ))? - // .clone(); - // // Converts Vec> into type required by execution plan to specify its required input ordering - // Some( - // ordering - // .into_iter() - // .map(PhysicalSortRequirement::from) - // .collect::>(), - // ) - todo!() - } else { - None - }; - - self.options() - .format - .create_writer_physical_plan(input, state, config, order_requirements) - .await + self.listing_table.insert_into(state, input, overwrite).await } } diff --git a/rust/lakesoul-io/src/datasource/mod.rs b/rust/lakesoul-io/src/datasource/mod.rs index fb202adea..e38799182 100644 --- a/rust/lakesoul-io/src/datasource/mod.rs +++ b/rust/lakesoul-io/src/datasource/mod.rs @@ -6,4 +6,4 @@ pub mod empty_schema; pub mod file_format; pub mod listing; pub mod parquet_source; -mod physical_plan; +pub mod physical_plan; diff --git a/rust/lakesoul-io/src/datasource/parquet_source.rs b/rust/lakesoul-io/src/datasource/parquet_source.rs index 0de6d1241..25c0725ce 100644 --- a/rust/lakesoul-io/src/datasource/parquet_source.rs +++ b/rust/lakesoul-io/src/datasource/parquet_source.rs @@ -323,7 +323,7 @@ pub fn merge_stream( default_column_value, )) } else { - let merge_schema: SchemaRef = Arc::new(Schema::new( + let merge_schema = Arc::new(Schema::new( schema .fields .iter() diff --git a/rust/lakesoul-io/src/datasource/physical_plan/defatul_column.rs b/rust/lakesoul-io/src/datasource/physical_plan/defatul_column.rs new file mode 100644 index 000000000..512ae6f73 --- /dev/null +++ b/rust/lakesoul-io/src/datasource/physical_plan/defatul_column.rs @@ -0,0 +1,88 @@ +// SPDX-FileCopyrightText: 2023 LakeSoul Contributors +// +// SPDX-License-Identifier: Apache-2.0 + +use std::{any::Any, collections::HashMap}; +use std::sync::Arc; + +use arrow_schema::SchemaRef; +use datafusion::{ + execution::TaskContext, + physical_expr::PhysicalSortExpr, + physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream}, +}; +use datafusion_common::{DataFusionError, Result}; + +use crate::default_column_stream::DefaultColumnStream; + +#[derive(Debug)] +pub struct DefaultColumnExec { + input: Arc, + target_schema: SchemaRef, + default_column_value: Arc> +} + +impl DefaultColumnExec { + pub fn new( + input: Arc, + target_schema: SchemaRef, + default_column_value: Arc> + ) -> Result { + Ok(Self { + input, + target_schema, + default_column_value + }) + } +} + +impl DisplayAs for DefaultColumnExec { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "DefaultColumnExec") + } +} + +impl ExecutionPlan for DefaultColumnExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.target_schema.clone() + } + + fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning { + datafusion::physical_plan::Partitioning::UnknownPartitioning(1) + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + + fn children(&self) -> Vec> { + vec![] + } + + fn with_new_children(self: Arc, _: Vec>) -> Result> { + Ok(self) + } + + fn execute(&self, partition: usize, context: Arc) -> Result { + if partition != 0 { + return Err(DataFusionError::Internal(format!( + "Invalid requested partition {partition}. InsertExec requires a single input partition." + ))); + } + + let mut streams = Vec::with_capacity(self.input.output_partitioning().partition_count()); + for i in 0..self.input.output_partitioning().partition_count() { + let stream = self.input.execute(i, context.clone())?; + streams.push(stream); + } + Ok(Box::pin(DefaultColumnStream::new_from_streams_with_default( + streams, + self.schema(), + self.default_column_value.clone(), + ))) + } +} diff --git a/rust/lakesoul-io/src/datasource/physical_plan/merge.rs b/rust/lakesoul-io/src/datasource/physical_plan/merge.rs index a9cc7f588..769c0865e 100644 --- a/rust/lakesoul-io/src/datasource/physical_plan/merge.rs +++ b/rust/lakesoul-io/src/datasource/physical_plan/merge.rs @@ -23,7 +23,6 @@ pub struct MergeParquetExec { primary_keys: Arc>, default_column_value: Arc>, merge_operators: Arc>, - config: FileScanConfig, inputs: Vec>, } @@ -31,12 +30,11 @@ impl MergeParquetExec { /// Create a new Parquet reader execution plan provided file list and schema. pub fn new( schema: SchemaRef, - config: FileScanConfig, flatten_configs: Vec, predicate: Option>, metadata_size_hint: Option, io_config: LakeSoulIOConfig, - ) -> Self { + ) -> Result { // source file parquet scan let mut inputs = Vec::>::new(); for config in flatten_configs { @@ -68,16 +66,35 @@ impl MergeParquetExec { let default_column_value = Arc::new(io_config.default_column_value); let merge_operators = Arc::new(io_config.merge_operators); - Self { + Ok(Self { schema, inputs, - config, primary_keys, default_column_value, merge_operators, - } + }) } + pub fn new_with_inputs( + schema: SchemaRef, + inputs: Vec>, + io_config: LakeSoulIOConfig, + default_column_value: Arc>, + ) -> Result { + + let primary_keys = Arc::new(io_config.primary_keys); + let merge_operators = Arc::new(io_config.merge_operators); + + Ok(Self { + schema, + inputs, + primary_keys, + default_column_value, + merge_operators, + }) + } + + pub fn primary_keys(&self) -> Arc> { self.primary_keys.clone() } @@ -125,7 +142,6 @@ impl ExecutionPlan for MergeParquetExec { primary_keys: self.primary_keys(), default_column_value: self.default_column_value(), merge_operators: self.merge_operators(), - config: self.config.clone(), })) } diff --git a/rust/lakesoul-io/src/datasource/physical_plan/mod.rs b/rust/lakesoul-io/src/datasource/physical_plan/mod.rs index f5d46b0b1..6fd06f368 100644 --- a/rust/lakesoul-io/src/datasource/physical_plan/mod.rs +++ b/rust/lakesoul-io/src/datasource/physical_plan/mod.rs @@ -5,5 +5,6 @@ pub use empty_schema::EmptySchemaScanExec; pub use merge::MergeParquetExec; +pub mod defatul_column; mod empty_schema; mod merge; diff --git a/rust/lakesoul-io/src/helpers.rs b/rust/lakesoul-io/src/helpers.rs index 81805a964..4280efec5 100644 --- a/rust/lakesoul-io/src/helpers.rs +++ b/rust/lakesoul-io/src/helpers.rs @@ -2,17 +2,17 @@ // // SPDX-License-Identifier: Apache-2.0 -use arrow_schema::Schema; +use std::{collections::HashMap, sync::Arc}; + +use arrow_schema::{DataType, Schema, SchemaBuilder, SchemaRef}; use datafusion::{ - execution::context::SessionState, - logical_expr::col, - physical_expr::{create_physical_expr, PhysicalSortExpr}, - physical_plan::Partitioning, - physical_planner::create_physical_sort_expr, + datasource::{file_format::FileFormat, listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl}, physical_plan::FileScanConfig}, execution::context::SessionState, logical_expr::col, physical_expr::{create_physical_expr, PhysicalSortExpr}, physical_plan::PhysicalExpr, physical_planner::create_physical_sort_expr }; -use datafusion_common::{DFSchema, Result}; +use datafusion_common::{DFSchema, DataFusionError, Result}; + +use crate::{lakesoul_io_config::LakeSoulIOConfig, transform::uniform_schema}; -pub fn create_sort_exprs( +pub fn column_names_to_physical_sort_expr( columns: &[String], input_dfschema: &DFSchema, input_schema: &Schema, @@ -31,14 +31,12 @@ pub fn create_sort_exprs( .collect::>>() } -//noinspection ALL -pub fn create_hash_partitioning( +pub fn column_names_to_physical_expr( columns: &[String], - partitioning_num: usize, input_dfschema: &DFSchema, input_schema: &Schema, session_state: &SessionState, -) -> Result { +) -> Result>> { let runtime_expr = columns .iter() .map(|column| { @@ -50,5 +48,116 @@ pub fn create_hash_partitioning( ) }) .collect::>>()?; - Ok(Partitioning::Hash(runtime_expr, partitioning_num)) + Ok(runtime_expr) +} + +fn range_partition_to_partition_cols( + schema: SchemaRef, + range_partitions: &[String], +) -> Result> { + range_partitions + .iter() + .map(|col| Ok((col.clone(), schema.field_with_name(col)?.data_type().clone()))) + .collect::>>() +} + +pub fn partition_desc_from_file_scan_config( + conf: &FileScanConfig +) -> Result<(String, HashMap)> { + if conf.table_partition_cols.is_empty() { + Ok(("-5".to_string(), HashMap::default())) + } else { + match conf.file_groups.first().unwrap().first() { + Some(file) => Ok( + (conf + .table_partition_cols + .iter() + .enumerate() + .map(|(idx, col)| { + format!("{}={}", col.name().clone(), file.partition_values[idx].to_string()) + }) + .collect::>() + .join(","), + HashMap::from_iter( + conf + .table_partition_cols + .iter() + .enumerate() + .map(|(idx, col)| { + (col.name().clone(), file.partition_values[idx].to_string()) + }) + )) + ), + None => Err(DataFusionError::External(format!("Invalid file_group {:?}", conf.file_groups).into())), + } + + } +} + +pub async fn listing_table_from_lakesoul_io_config( + session_state: &SessionState, + lakesoul_io_config: LakeSoulIOConfig, + file_format: Arc, + as_sink: bool +) -> Result { + let config = match as_sink { + false => { + // Parse the path + let table_paths = lakesoul_io_config + .files + .iter() + .map(ListingTableUrl::parse) + .collect::>>()?; + // Create default parquet options + let object_store_url = table_paths + .first() + .ok_or(DataFusionError::Internal("no table path".to_string()))? + .object_store(); + let store = session_state.runtime_env().object_store(object_store_url.clone())?; + let target_schema = uniform_schema(lakesoul_io_config.schema()); + + let table_partition_cols = range_partition_to_partition_cols(target_schema.clone(), lakesoul_io_config.range_partitions_slice())?; + let listing_options = ListingOptions::new(file_format.clone()) + .with_file_extension(".parquet") + .with_table_partition_cols(table_partition_cols); + + let mut objects = vec![]; + + for url in &table_paths { + objects.push(store.head(url.prefix()).await?); + } + // Resolve the schema + let resolved_schema = file_format.infer_schema(session_state, &store, &objects).await?; + + let mut builder = SchemaBuilder::from(target_schema.fields()); + for field in resolved_schema.fields() { + if target_schema.field_with_name(field.name()).is_err() { + builder.push(field.clone()); + } + } + + ListingTableConfig::new_with_multi_paths(table_paths) + .with_listing_options(listing_options) + // .with_schema(Arc::new(builder.finish())) + .with_schema(resolved_schema) + } + true => { + let target_schema = uniform_schema(lakesoul_io_config.schema()); + let table_partition_cols = range_partition_to_partition_cols(target_schema.clone(), lakesoul_io_config.range_partitions_slice())?; + + let listing_options = ListingOptions::new(file_format.clone()) + .with_file_extension(".parquet") + .with_table_partition_cols(table_partition_cols) + .with_insert_mode(datafusion::datasource::listing::ListingTableInsertMode::AppendNewFiles); + let prefix = + ListingTableUrl::parse_create_local_if_not_exists(lakesoul_io_config.prefix.clone(), true)?; + + ListingTableConfig::new(prefix) + .with_listing_options(listing_options) + .with_schema(target_schema) + } + }; + + ListingTable::try_new(config) } + diff --git a/rust/lakesoul-io/src/lakesoul_io_config.rs b/rust/lakesoul-io/src/lakesoul_io_config.rs index d89627d87..a6d5b82cc 100644 --- a/rust/lakesoul-io/src/lakesoul_io_config.rs +++ b/rust/lakesoul-io/src/lakesoul_io_config.rs @@ -95,6 +95,10 @@ impl LakeSoulIOConfig { &self.primary_keys } + pub fn range_partitions_slice(&self) -> &[String] { + &self.range_partitions + } + pub fn files_slice(&self) -> &[String] { &self.files } @@ -142,6 +146,12 @@ impl LakeSoulIOConfigBuilder { self } + pub fn with_range_partition(mut self, range_partition: String) -> Self { + self.config.range_partitions.push(range_partition); + self + } + + pub fn with_range_partitions(mut self, range_partitions: Vec) -> Self { self.config.range_partitions = range_partitions; self diff --git a/rust/lakesoul-io/src/lakesoul_writer.rs b/rust/lakesoul-io/src/lakesoul_writer.rs index 47fe4da9c..1c2db6fd2 100644 --- a/rust/lakesoul-io/src/lakesoul_writer.rs +++ b/rust/lakesoul-io/src/lakesoul_writer.rs @@ -68,6 +68,7 @@ pub struct MultiPartAsyncWriter { _config: LakeSoulIOConfig, object_store: Arc, path: Path, + absolute_path: String, num_rows: u64, } @@ -217,6 +218,7 @@ impl MultiPartAsyncWriter { _config: config.clone(), object_store, path, + absolute_path: file_name.to_string(), num_rows: 0, }) } @@ -261,6 +263,10 @@ impl MultiPartAsyncWriter { self.path.clone() } + pub fn absolute_path(&self) -> String { + self.absolute_path.clone() + } + pub fn task_ctx(&self) -> Arc { self.task_context.clone() } diff --git a/rust/lakesoul-io/src/lib.rs b/rust/lakesoul-io/src/lib.rs index 3b218287c..f9b584741 100644 --- a/rust/lakesoul-io/src/lib.rs +++ b/rust/lakesoul-io/src/lib.rs @@ -16,7 +16,7 @@ pub mod sorted_merge; #[cfg(feature = "hdfs")] mod hdfs; -mod constant; +pub(crate) mod constant; mod default_column_stream; mod transform; diff --git a/rust/lakesoul-io/src/repartition/distributor_channels.rs b/rust/lakesoul-io/src/repartition/distributor_channels.rs index 9b52fa6a1..b23088597 100644 --- a/rust/lakesoul-io/src/repartition/distributor_channels.rs +++ b/rust/lakesoul-io/src/repartition/distributor_channels.rs @@ -81,12 +81,15 @@ pub fn channels(n: usize) -> (Vec>, Vec = Vec>>; +#[allow(dead_code)] type PartitionAwareReceivers = Vec>>; /// Create `n_out` empty channels for each of the `n_in` inputs. /// This way, each distinct partition will communicate via a dedicated channel. /// This SPSC structure enables us to track which partition input data comes from. +#[allow(dead_code)] pub fn partition_aware_channels( n_in: usize, n_out: usize, diff --git a/rust/lakesoul-io/src/repartition/mod.rs b/rust/lakesoul-io/src/repartition/mod.rs index 9500bd81e..5ad0864dc 100644 --- a/rust/lakesoul-io/src/repartition/mod.rs +++ b/rust/lakesoul-io/src/repartition/mod.rs @@ -3,15 +3,11 @@ // SPDX-License-Identifier: Apache-2.0 use std::{ - any::Any, - collections::HashMap, - pin::Pin, - sync::Arc, - task::{Context, Poll}, + any::Any, collections::HashMap, pin::Pin, sync::Arc, task::{Context, Poll} }; use arrow_schema::SchemaRef; -use datafusion::physical_plan::metrics; +use datafusion::{physical_expr::physical_exprs_equal, physical_plan::metrics}; use datafusion::{ execution::{ memory_pool::{MemoryConsumer, MemoryReservation}, @@ -19,9 +15,8 @@ use datafusion::{ }, physical_expr::PhysicalSortExpr, physical_plan::{ - common::{transpose, AbortOnDropMany, AbortOnDropSingle}, - metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder}, - sorts::streaming_merge::streaming_merge, + common::{AbortOnDropMany, AbortOnDropSingle}, + metrics::{ExecutionPlanMetricsSet, MetricBuilder}, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, }, @@ -32,7 +27,7 @@ use arrow_array::{builder::UInt64Builder, ArrayRef, RecordBatch}; use futures::{FutureExt, Stream, StreamExt}; use tokio::task::JoinHandle; -use crate::{hash_utils::create_hashes, repartition::distributor_channels::partition_aware_channels}; +use crate::{hash_utils::create_hashes, repartition::distributor_channels::channels}; use self::distributor_channels::{DistributionReceiver, DistributionSender}; @@ -74,7 +69,6 @@ pub struct BatchPartitioner { struct BatchPartitionerState { // random_state: ahash::RandomState, - #[allow(dead_code)] range_exprs: Vec>, hash_exprs: Vec>, num_partitions: usize, @@ -85,14 +79,16 @@ impl BatchPartitioner { /// Create a new [`BatchPartitioner`] with the provided [`Partitioning`] /// /// The time spent repartitioning will be recorded to `timer` - pub fn try_new(partitioning: Partitioning, timer: metrics::Time) -> Result { - let state = match partitioning { + pub fn try_new( + range_partitioning_expr:Vec>, + hash_partitioning: Partitioning, + timer: metrics::Time + ) -> Result { + let state = match hash_partitioning { Partitioning::Hash(exprs, num_partitions) => BatchPartitionerState { - range_exprs: vec![], + range_exprs: range_partitioning_expr, hash_exprs: exprs, num_partitions, - // Use fixed random hash - // random_state: ahash::RandomState::with_seeds(0, 0, 0, 0), hash_buffer: vec![], }, other => { @@ -135,7 +131,7 @@ impl BatchPartitioner { ) -> Result> + Send + '_> { let BatchPartitionerState { // random_state, - range_exprs: _, + range_exprs, hash_exprs, num_partitions: partitions, hash_buffer, @@ -143,6 +139,11 @@ impl BatchPartitioner { let it: Box> + Send> = { let timer = self.timer.timer(); + let range_arrays = [range_exprs.clone()].concat() + .iter() + .map(|expr| expr.evaluate(&batch)?.into_array(batch.num_rows())) + .collect::>>()?; + let hash_arrays = hash_exprs .iter() .map(|expr| expr.evaluate(&batch)?.into_array(batch.num_rows())) @@ -151,22 +152,35 @@ impl BatchPartitioner { hash_buffer.clear(); hash_buffer.resize(batch.num_rows(), 0); + let mut range_buffer = Vec::::new(); + range_buffer.resize(batch.num_rows(), 0); + create_hashes(&hash_arrays, hash_buffer)?; + create_hashes(&range_arrays, &mut range_buffer)?; - let mut indices: Vec<_> = (0..*partitions) - .map(|_| UInt64Builder::with_capacity(batch.num_rows())) + let mut indices: Vec> = (0..*partitions) + .map(|_| HashMap::new()) + // .map(|_| UInt64Builder::with_capacity(batch.num_rows())) .collect(); - for (index, hash) in hash_buffer.iter().enumerate() { - indices[(*hash % *partitions as u32) as usize].append_value(index as u64); + for (index, (hash, range_hash)) in hash_buffer.iter().zip(range_buffer).enumerate() { + if !indices[(*hash % *partitions as u32) as usize].contains_key(&range_hash) { + indices[(*hash % *partitions as u32) as usize].insert(range_hash, UInt64Builder::with_capacity(batch.num_rows())); + } + if let Some(entry) = indices[(*hash % *partitions as u32) as usize].get_mut(&range_hash) { + entry.append_value(index as u64); + } } let it = indices .into_iter() .enumerate() - .filter_map(|(partition, mut indices)| { - let indices = indices.finish(); - (!indices.is_empty()).then_some((partition, indices)) + .flat_map(|(partition, mut indices_map)| { + let mut indices_vec = Vec::new(); + for indices in indices_map.values_mut() { + indices_vec.push((partition, indices.finish())); + } + indices_vec }) .map(move |(partition, indices)| { // Produce batches based on indices @@ -237,6 +251,9 @@ pub struct RepartitionByRangeAndHashExec { /// Input execution plan input: Arc, + /// Partitioning scheme to use + range_partitioning_expr: Vec>, + /// Partitioning scheme to use hash_partitioning: Partitioning, @@ -253,9 +270,14 @@ impl RepartitionByRangeAndHashExec { &self.input } - /// Partitioning scheme to use - pub fn hash_partitioning(&self) -> &Partitioning { - &self.hash_partitioning + /// Range Partitioning scheme to use + pub fn range_partitioning(&self) -> Vec> { + self.range_partitioning_expr.clone() + } + + /// Hash Partitioning scheme to use + pub fn hash_partitioning(&self) -> Partitioning { + self.hash_partitioning.clone() } /// Get name used to display this Exec @@ -289,16 +311,39 @@ impl RepartitionByRangeAndHashExec { /// Create a new RepartitionExec, that produces output `partitioning`, and /// does not preserve the order of the input (see [`Self::with_preserve_order`] /// for more details) - pub fn try_new(input: Arc, hash_partitioning: Partitioning) -> Result { - Ok(Self { - input, - hash_partitioning, - state: Arc::new(Mutex::new(RepartitionByRangeAndHashExecState { - channels: HashMap::new(), - abort_helper: Arc::new(AbortOnDropMany::<()>(vec![])), - })), - metrics: ExecutionPlanMetricsSet::new(), - }) + pub fn try_new(input: Arc, range_partitioning_expr:Vec>, hash_partitioning: Partitioning) -> Result { + if let Some(ordering) = input.output_ordering() { + let lhs = ordering.iter().map(|sort_expr| sort_expr.expr.clone()).collect::>(); + let rhs = [ + range_partitioning_expr.clone(), + match &hash_partitioning { + Partitioning::Hash(hash_exprs, _) => hash_exprs.clone(), + _ => return Err(DataFusionError::Plan(format!("Invalid hash_partitioning={} for RepartitionByRangeAndHashExec", hash_partitioning))), + }, + ].concat(); + + if physical_exprs_equal(&lhs, &rhs) { + return Ok(Self { + input, + range_partitioning_expr, + hash_partitioning, + state: Arc::new(Mutex::new(RepartitionByRangeAndHashExecState { + channels: HashMap::new(), + abort_helper: Arc::new(AbortOnDropMany::<()>(vec![])), + })), + metrics: ExecutionPlanMetricsSet::new(), + }) + } + } + Err(DataFusionError::Plan( + format!( + "Input ordering {:?} mismatch for RepartitionByRangeAndHashExec with range_partitioning_expr={:?}, hash_partitioning={}", + input.output_ordering(), + range_partitioning_expr, + hash_partitioning, + )) + ) + } /// Return the sort expressions that are used to merge @@ -314,11 +359,16 @@ impl RepartitionByRangeAndHashExec { input: Arc, partition: usize, mut output_channels: HashMap, SharedMemoryReservation)>, - partitioning: Partitioning, + range_partitioning: Vec>, + hash_partitioning: Partitioning, metrics: RepartitionMetrics, context: Arc, ) -> Result<()> { - let mut partitioner = BatchPartitioner::try_new(partitioning, metrics.repartition_time.clone())?; + let mut partitioner = BatchPartitioner::try_new( + range_partitioning, + hash_partitioning, + metrics.repartition_time.clone() + )?; // execute the child operator let timer = metrics.fetch_time.timer(); @@ -470,7 +520,11 @@ impl ExecutionPlan for RepartitionByRangeAndHashExec { fn with_new_children(self: Arc, mut children: Vec>) -> Result> { let repartition = - RepartitionByRangeAndHashExec::try_new(children.swap_remove(0), self.hash_partitioning.clone())?; + RepartitionByRangeAndHashExec::try_new( + children.swap_remove(0), + self.range_partitioning_expr.clone(), + self.hash_partitioning.clone() + )?; Ok(Arc::new(repartition)) } @@ -486,23 +540,23 @@ impl ExecutionPlan for RepartitionByRangeAndHashExec { // if this is the first partition to be invoked then we need to set up initial state if state.channels.is_empty() { let (txs, rxs) = { - let (txs, rxs) = partition_aware_channels(num_input_partitions, num_output_partitions); - // Take transpose of senders and receivers. `state.channels` keeps track of entries per output partition - let txs = transpose(txs); - let rxs = transpose(rxs); - (txs, rxs) + // let (txs, rxs) = partition_aware_channels(num_input_partitions, num_output_partitions); + // // Take transpose of senders and receivers. `state.channels` keeps track of entries per output partition + // let txs = transpose(txs); + // let rxs = transpose(rxs); + // (txs, rxs) // } else { - // // create one channel per *output* partition - // // note we use a custom channel that ensures there is always data for each receiver - // // but limits the amount of buffering if required. - // let (txs, rxs) = channels(num_output_partitions); - // // Clone sender for each input partitions - // let txs = txs - // .into_iter() - // .map(|item| vec![item; num_input_partitions]) - // .collect::>(); - // let rxs = rxs.into_iter().map(|item| vec![item]).collect::>(); - // (txs, rxs) + // create one channel per *output* partition + // note we use a custom channel that ensures there is always data for each receiver + // but limits the amount of buffering if required. + let (txs, rxs) = channels(num_output_partitions); + // Clone sender for each input partitions + let txs = txs + .into_iter() + .map(|item| vec![item; num_input_partitions]) + .collect::>(); + let rxs = rxs.into_iter().map(|item| vec![item]).collect::>(); + (txs, rxs) }; for (partition, (tx, rx)) in txs.into_iter().zip(rxs).enumerate() { let reservation = Arc::new(Mutex::new( @@ -526,7 +580,8 @@ impl ExecutionPlan for RepartitionByRangeAndHashExec { self.input.clone(), i, txs.clone(), - self.hash_partitioning().clone(), + self.range_partitioning(), + self.hash_partitioning(), r_metrics, context.clone(), )); @@ -553,45 +608,128 @@ impl ExecutionPlan for RepartitionByRangeAndHashExec { // now return stream for the specified *output* partition which will // read from the channel - let (_tx, rx, reservation) = state + let (_tx, mut rx, reservation) = state .channels .remove(&partition) .ok_or(DataFusionError::Internal("partition not used yet".to_string()))?; - // Store streams from all the input partitions: - let input_streams = rx - .into_iter() - .map(|receiver| { - Box::pin(PerPartitionStream { - schema: self.schema(), - receiver, - drop_helper: Arc::clone(&state.abort_helper), - reservation: reservation.clone(), - }) as SendableRecordBatchStream - }) - .collect::>(); - // Note that receiver size (`rx.len()`) and `num_input_partitions` are same. - - // Get existing ordering to use for merging - let sort_exprs = self.sort_exprs().unwrap_or(&[]); - - // Merge streams (while preserving ordering) coming from - // input partitions to this partition: - let fetch = None; - let merge_reservation = - MemoryConsumer::new(format!("{}[Merge {partition}]", self.name())).register(context.memory_pool()); - streaming_merge( - input_streams, - self.schema(), - sort_exprs, - BaselineMetrics::new(&self.metrics, partition), - context.session_config().batch_size(), - fetch, - merge_reservation, - ) + // if self.preserve_order { + + // // Store streams from all the input partitions: + // let input_streams = rx + // .into_iter() + // .map(|receiver| { + // Box::pin(PerPartitionStream { + // schema: self.schema(), + // receiver, + // drop_helper: Arc::clone(&state.abort_helper), + // reservation: reservation.clone(), + // }) as SendableRecordBatchStream + // }) + // .collect::>(); + // // Note that receiver size (`rx.len()`) and `num_input_partitions` are same. + + // // Get existing ordering to use for merging + // let sort_exprs = self.sort_exprs().unwrap_or(&[]); + + // // Merge streams (while preserving ordering) coming from + // // input partitions to this partition: + // let fetch = None; + // let merge_reservation = + // MemoryConsumer::new(format!("{}[Merge {partition}]", self.name())).register(context.memory_pool()); + // streaming_merge( + // input_streams, + // self.schema(), + // sort_exprs, + // BaselineMetrics::new(&self.metrics, partition), + // context.session_config().batch_size(), + // fetch, + // merge_reservation, + // ) + // } else { + Ok(Box::pin(RepartitionStream { + num_input_partitions, + num_input_partitions_processed: 0, + schema: self.input.schema(), + input: rx.swap_remove(0), + drop_helper: Arc::clone(&state.abort_helper), + reservation, + })) + // } + + } } +struct RepartitionStream { + /// Number of input partitions that will be sending batches to this output channel + num_input_partitions: usize, + + /// Number of input partitions that have finished sending batches to this output channel + num_input_partitions_processed: usize, + + /// Schema wrapped by Arc + schema: SchemaRef, + + /// channel containing the repartitioned batches + input: DistributionReceiver, + + /// Handle to ensure background tasks are killed when no longer needed. + #[allow(dead_code)] + drop_helper: Arc>, + + /// Memory reservation. + reservation: SharedMemoryReservation, +} + +impl Stream for RepartitionStream { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + loop { + match self.input.recv().poll_unpin(cx) { + Poll::Ready(Some(Some(v))) => { + if let Ok(batch) = &v { + self.reservation + .lock() + .shrink(batch.get_array_memory_size()); + } + + return Poll::Ready(Some(v)); + } + Poll::Ready(Some(None)) => { + self.num_input_partitions_processed += 1; + + if self.num_input_partitions == self.num_input_partitions_processed { + // all input partitions have finished sending batches + return Poll::Ready(None); + } else { + // other partitions still have data to send + continue; + } + } + Poll::Ready(None) => { + return Poll::Ready(None); + } + Poll::Pending => { + return Poll::Pending; + } + } + } + } +} + +impl RecordBatchStream for RepartitionStream { + /// Get the schema + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} + + /// This struct converts a receiver to a stream. /// Receiver receives data on an SPSC channel. struct PerPartitionStream { diff --git a/rust/lakesoul-metadata/src/metadata_client.rs b/rust/lakesoul-metadata/src/metadata_client.rs index 51cbbfeac..abda5f8b3 100644 --- a/rust/lakesoul-metadata/src/metadata_client.rs +++ b/rust/lakesoul-metadata/src/metadata_client.rs @@ -507,13 +507,8 @@ impl MetaDataClient { pub async fn get_data_files_by_table_name( &self, table_name: &str, - partitions: Vec<(&str, &str)>, namespace: &str, ) -> Result> { - let partition_filter = partitions - .iter() - .map(|(k, v)| format!("{}={}", k, v)) - .collect::>(); let table_info = self.get_table_info_by_table_name(table_name, namespace).await?; debug!("table_info: {:?}", table_info); let partition_list = self.get_all_partition_info(table_info.table_id.as_str()).await?; @@ -522,30 +517,44 @@ impl MetaDataClient { table_info.table_id.as_str(), partition_list ); - let mut data_commit_info_list = Vec::::new(); + self.get_data_files_of_partitions(partition_list).await + } + + pub async fn get_data_files_of_partitions( + &self, + partition_list: Vec, + ) -> Result> { + let mut data_files = Vec::::new(); for partition_info in &partition_list { - let partition_desc = partition_info.partition_desc.clone(); - if partition_filter.contains(&partition_desc) { - continue; - } else { - let _data_commit_info_list = self.get_data_commit_info_of_single_partition(partition_info).await?; - // let data_commit_info_list = Vec::::new(); - let _data_file_list = _data_commit_info_list - .iter() - .flat_map(|data_commit_info| { - data_commit_info - .file_ops - .iter() - .map(|file_op| file_op.path.clone()) - .collect::>() - }) - .collect::>(); - data_commit_info_list.extend_from_slice(&_data_file_list); - } + let _data_file_list = self.get_data_files_of_single_partition(partition_info).await?; + data_files.extend_from_slice(&_data_file_list); + } - Ok(data_commit_info_list) + Ok(data_files) + } + pub async fn get_data_files_of_single_partition( + &self, + partition_info: &PartitionInfo, + ) -> Result> { + let data_commit_info_list = self.get_data_commit_info_of_single_partition(partition_info).await?; + // let data_commit_info_list = Vec::::new(); + let data_file_list = data_commit_info_list + .iter() + .flat_map(|data_commit_info| { + data_commit_info + .file_ops + .iter() + .map(|file_op| file_op.path.clone()) + .collect::>() + }) + .collect::>(); + Ok(data_file_list) + + } + + async fn get_data_commit_info_of_single_partition( &self, partition_info: &PartitionInfo,