[NativeIO/Fix]add error info of native writer && fix case of aux_sort_cols #547

Merged
merged 3 commits on Sep 27, 2024
1 change: 0 additions & 1 deletion rust/lakesoul-io/src/async_writer/mod.rs
@@ -32,7 +32,6 @@ use datafusion::{
use datafusion_common::{DataFusionError, Result};
use parquet::format::FileMetaData;


// The result of a flush operation with format (partition_desc, file_path, file_meta)
pub type WriterFlushResult = Result<Vec<(String, String, FileMetaData)>>;

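The `WriterFlushResult` alias above carries `(partition_desc, file_path, file_meta)` tuples. As a hedged illustration of how a caller might regroup the flushed entries per partition (the helper name is made up and not part of this PR):

```rust
use std::collections::HashMap;

// Hypothetical helper: fold flushed (partition_desc, file_path, file_meta) tuples
// back into a per-partition map. Generic over the metadata type so it does not
// depend on parquet::format::FileMetaData directly.
fn group_by_partition<M>(entries: Vec<(String, String, M)>) -> HashMap<String, Vec<(String, M)>> {
    let mut grouped: HashMap<String, Vec<(String, M)>> = HashMap::new();
    for (partition_desc, file_path, file_meta) in entries {
        grouped
            .entry(partition_desc)
            .or_default()
            .push((file_path, file_meta));
    }
    grouped
}
```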
9 changes: 5 additions & 4 deletions rust/lakesoul-io/src/async_writer/multipart_writer.rs
@@ -15,10 +15,13 @@ use tokio::io::{AsyncWrite, AsyncWriteExt};
use url::Url;

use crate::{
constant::TBD_PARTITION_DESC, helpers::get_batch_memory_size, lakesoul_io_config::{create_session_context, LakeSoulIOConfig}, transform::{uniform_record_batch, uniform_schema}
constant::TBD_PARTITION_DESC,
helpers::get_batch_memory_size,
lakesoul_io_config::{create_session_context, LakeSoulIOConfig},
transform::{uniform_record_batch, uniform_schema},
};

use super::{AsyncBatchWriter, WriterFlushResult, InMemBuf};
use super::{AsyncBatchWriter, InMemBuf, WriterFlushResult};

/// An async writer using object_store's multi-part upload feature for cloud storage.
/// This writer uses a `VecDeque<u8>` as `std::io::Write` for arrow-rs's ArrowWriter.
@@ -169,7 +172,6 @@ impl MultiPartAsyncWriter {

#[async_trait::async_trait]
impl AsyncBatchWriter for MultiPartAsyncWriter {

async fn write_record_batch(&mut self, batch: RecordBatch) -> Result<()> {
let batch = uniform_record_batch(batch)?;
self.num_rows += batch.num_rows() as u64;
Expand Down Expand Up @@ -213,5 +215,4 @@ impl AsyncBatchWriter for MultiPartAsyncWriter {
fn buffered_size(&self) -> u64 {
self.buffered_size
}

}
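For orientation, the trait implemented here pairs `write_record_batch` with a `buffered_size` accessor so callers can decide when to flush. Below is a minimal sketch of that driving loop against a simplified stand-in trait; the `BatchSink` trait, `drain` function, and size-threshold policy are illustrative assumptions, not the crate's API:

```rust
// A simplified stand-in for AsyncBatchWriter: write batches, report buffered bytes,
// and flush on demand. Generic over the batch type to stay self-contained.
#[async_trait::async_trait]
trait BatchSink<B: Send + 'static>: Send {
    async fn write(&mut self, batch: B) -> anyhow::Result<()>;
    fn buffered_size(&self) -> u64;
    async fn flush(&mut self) -> anyhow::Result<()>;
}

// Drive a sink, flushing whenever the buffered bytes cross a threshold.
async fn drain<B, S>(sink: &mut S, batches: Vec<B>, max_buffered: u64) -> anyhow::Result<()>
where
    B: Send + 'static,
    S: BatchSink<B>,
{
    for batch in batches {
        sink.write(batch).await?;
        if sink.buffered_size() >= max_buffered {
            sink.flush().await?;
        }
    }
    sink.flush().await
}
```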
37 changes: 13 additions & 24 deletions rust/lakesoul-io/src/async_writer/partitioning_writer.rs
@@ -13,29 +13,26 @@ use datafusion::{
PhysicalSortExpr,
},
physical_plan::{
projection::ProjectionExec,
sorts::sort::SortExec,
stream::RecordBatchReceiverStream,
ExecutionPlan, Partitioning, PhysicalExpr,
projection::ProjectionExec, sorts::sort::SortExec, stream::RecordBatchReceiverStream, ExecutionPlan,
Partitioning, PhysicalExpr,
},
};
use datafusion_common::{DataFusionError, Result};

use rand::distributions::DistString;
use tokio::{
sync::mpsc::Sender,
task::JoinHandle,
};
use tokio::{sync::mpsc::Sender, task::JoinHandle};
use tokio_stream::StreamExt;
use tracing::debug;

use crate::{
helpers::{columnar_values_to_partition_desc, columnar_values_to_sub_path, get_batch_memory_size, get_columnar_values},
helpers::{
columnar_values_to_partition_desc, columnar_values_to_sub_path, get_batch_memory_size, get_columnar_values,
},
lakesoul_io_config::{create_session_context, LakeSoulIOConfig, LakeSoulIOConfigBuilder},
repartition::RepartitionByRangeAndHashExec,
};

use super::{AsyncBatchWriter, WriterFlushResult, MultiPartAsyncWriter, ReceiverStreamExec};
use super::{AsyncBatchWriter, MultiPartAsyncWriter, ReceiverStreamExec, WriterFlushResult};

// type PartitionedWriterInfo = Arc<Mutex<HashMap<String, Vec<WriterFlushResult>>>>;

@@ -75,7 +72,7 @@ impl PartitioningAsyncWriter {
task_context.clone(),
config.clone().into(),
Arc::new(config.range_partitions.clone()),
write_id.clone()
write_id.clone(),
));
// // In a separate task, wait for each input to be done
// // (and pass along any errors, including panic!s)
@@ -198,7 +195,6 @@ impl PartitioningAsyncWriter {

let mut err = None;


let mut partitioned_writer = HashMap::<String, Box<MultiPartAsyncWriter>>::new();
let mut flush_join_handle_list = Vec::new();
// let mut partitioned_flush_result_locked = partitioned_flush_result.lock().await;
@@ -230,7 +226,6 @@ impl PartitioningAsyncWriter {
// row_count += batch_excluding_range.num_rows();
async_writer.write_record_batch(batch_excluding_range).await?;
}

}
// received abort signal
Err(e) => {
@@ -256,19 +251,13 @@
}
Ok(flush_join_handle_list)
} else {

for (partition_desc, writer) in partitioned_writer.into_iter() {

let flush_result = tokio::spawn(async move {
let writer_flush_results =writer.flush_and_close().await?;
Ok(
writer_flush_results.into_iter().map(
|(_, path, file_metadata)|
{
(partition_desc.clone(), path, file_metadata)
}
).collect::<Vec<_>>()
)
let writer_flush_results = writer.flush_and_close().await?;
Ok(writer_flush_results
.into_iter()
.map(|(_, path, file_metadata)| (partition_desc.clone(), path, file_metadata))
.collect::<Vec<_>>())
});
flush_join_handle_list.push(flush_result);
}
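The rewritten flush branch above spawns one Tokio task per partitioned writer, tags each flushed file with its partition description, and collects the join handles. A hedged sketch of that fan-out/join shape, with a stand-in body in place of `flush_and_close` (the names below are illustrative, not the crate's API):

```rust
use tokio::task::JoinHandle;

// One task per partition; each task returns its results tagged with the
// partition description, and the caller awaits all join handles.
async fn flush_all(
    writers: Vec<(String, Vec<String>)>, // (partition_desc, pending file paths)
) -> Result<Vec<(String, String)>, tokio::task::JoinError> {
    let mut handles: Vec<JoinHandle<Vec<(String, String)>>> = Vec::new();
    for (partition_desc, paths) in writers {
        handles.push(tokio::spawn(async move {
            // stand-in for `writer.flush_and_close().await`
            paths
                .into_iter()
                .map(|path| (partition_desc.clone(), path))
                .collect::<Vec<_>>()
        }));
    }
    let mut flushed = Vec::new();
    for handle in handles {
        flushed.extend(handle.await?);
    }
    Ok(flushed)
}
```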
8 changes: 3 additions & 5 deletions rust/lakesoul-io/src/async_writer/sort_writer.rs
@@ -12,10 +12,8 @@ use datafusion::{
PhysicalSortExpr,
},
physical_plan::{
projection::ProjectionExec,
sorts::sort::SortExec,
stream::RecordBatchReceiverStream,
ExecutionPlan, PhysicalExpr,
projection::ProjectionExec, sorts::sort::SortExec, stream::RecordBatchReceiverStream, ExecutionPlan,
PhysicalExpr,
},
};
use datafusion_common::{DataFusionError, Result};
@@ -24,7 +22,7 @@ use tokio_stream::StreamExt;

use crate::{helpers::get_batch_memory_size, lakesoul_io_config::LakeSoulIOConfig};

use super::{AsyncBatchWriter, WriterFlushResult, MultiPartAsyncWriter, ReceiverStreamExec};
use super::{AsyncBatchWriter, MultiPartAsyncWriter, ReceiverStreamExec, WriterFlushResult};

/// Wrap the above async writer with a SortExec to
/// sort the batches before write to async writer
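The writer described by the doc comment above sorts incoming batches (via a DataFusion `SortExec`) before handing them to the inner async writer. As a hedged, single-batch illustration of the same idea using Arrow's sort kernels instead of an execution plan (the function and column name are illustrative, not this crate's code):

```rust
use arrow::compute::{sort_to_indices, take};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

// Sort one RecordBatch by a single key column: compute a sort permutation over the
// key, then apply it to every column with `take`.
fn sort_batch_by(batch: &RecordBatch, sort_col: &str) -> Result<RecordBatch, ArrowError> {
    let key = batch
        .column_by_name(sort_col)
        .ok_or_else(|| ArrowError::InvalidArgumentError(format!("missing column {sort_col}")))?;
    let indices = sort_to_indices(key.as_ref(), None, None)?;
    let columns = batch
        .columns()
        .iter()
        .map(|col| take(col.as_ref(), &indices, None))
        .collect::<Result<Vec<_>, ArrowError>>()?;
    RecordBatch::try_new(batch.schema(), columns)
}
```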
7 changes: 4 additions & 3 deletions rust/lakesoul-io/src/filter/parser.rs
@@ -229,7 +229,6 @@ impl Parser {
}

pub(crate) fn parse_proto(plan: &Plan, df_schema: &DFSchema) -> Result<Expr> {

let function_extension = plan
.extensions
.iter()
@@ -733,7 +732,10 @@ fn _from_nullability(nullability: Nullability) -> bool {
mod tests {
use std::result::Result;

use datafusion::{logical_expr::{LogicalPlan, TableScan}, prelude::{ParquetReadOptions, SessionContext}};
use datafusion::{
logical_expr::{LogicalPlan, TableScan},
prelude::{ParquetReadOptions, SessionContext},
};
use prost::Message;

use super::*;
@@ -750,7 +752,6 @@ mod tests {

#[tokio::test]
async fn tt() {

let ctx = SessionContext::new();
let options = ParquetReadOptions::default();
let table_path = "/var/folders/_b/qyl87wbn1119cvw8kts6fqtw0000gn/T/lakeSource/type/part-00000-97db3149-f99e-404a-aa9a-2af4ab3f7a44_00000.c000.parquet";
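The `tt` test above builds a `SessionContext` and points it at a local Parquet file; the rest of the test is not shown in the diff. A minimal, hedged sketch of that setup (the path is a placeholder, not the temporary file referenced in the test):

```rust
use datafusion::prelude::{ParquetReadOptions, SessionContext};

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Register a Parquet file and inspect the DFSchema that column references
    // would be resolved against.
    let ctx = SessionContext::new();
    let df = ctx
        .read_parquet("/tmp/example.parquet", ParquetReadOptions::default())
        .await?;
    println!("{:?}", df.schema());
    Ok(())
}
```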
37 changes: 20 additions & 17 deletions rust/lakesoul-io/src/helpers.rs
@@ -32,9 +32,9 @@ use url::Url;

use crate::{
constant::{
DATE32_FORMAT, FLINK_TIMESTAMP_FORMAT, LAKESOUL_EMPTY_STRING, LAKESOUL_NULL_STRING,
TIMESTAMP_MICROSECOND_FORMAT, TIMESTAMP_MILLSECOND_FORMAT, TIMESTAMP_NANOSECOND_FORMAT,
TIMESTAMP_SECOND_FORMAT, LAKESOUL_COMMA, LAKESOUL_EQ
DATE32_FORMAT, FLINK_TIMESTAMP_FORMAT, LAKESOUL_COMMA, LAKESOUL_EMPTY_STRING, LAKESOUL_EQ,
LAKESOUL_NULL_STRING, TIMESTAMP_MICROSECOND_FORMAT, TIMESTAMP_MILLSECOND_FORMAT, TIMESTAMP_NANOSECOND_FORMAT,
TIMESTAMP_SECOND_FORMAT,
},
filter::parser::Parser,
lakesoul_io_config::LakeSoulIOConfig,
@@ -169,12 +169,10 @@ pub fn format_scalar_value(v: &ScalarValue) -> String {
}
ScalarValue::Decimal128(Some(s), _, _) => format!("{}", s),
ScalarValue::Decimal256(Some(s), _, _) => format!("{}", s),
ScalarValue::Binary(e)
| ScalarValue::FixedSizeBinary(_, e)
| ScalarValue::LargeBinary(e) => match e {
Some(bytes) => hex::encode(bytes),
None => LAKESOUL_NULL_STRING.to_string(),
}
ScalarValue::Binary(e) | ScalarValue::FixedSizeBinary(_, e) | ScalarValue::LargeBinary(e) => match e {
Some(bytes) => hex::encode(bytes),
None => LAKESOUL_NULL_STRING.to_string(),
},
other => other.to_string(),
}
}
@@ -192,7 +190,7 @@ pub fn into_scalar_value(val: &str, data_type: &DataType) -> Result<ScalarValue>
},
DataType::Decimal128(p, s) => Ok(ScalarValue::Decimal128(None, *p, *s)),
DataType::Decimal256(p, s) => Ok(ScalarValue::Decimal256(None, *p, *s)),
DataType::Binary=> Ok(ScalarValue::Binary(None)),
DataType::Binary => Ok(ScalarValue::Binary(None)),
DataType::FixedSizeBinary(size) => Ok(ScalarValue::FixedSizeBinary(*size, None)),
DataType::LargeBinary => Ok(ScalarValue::LargeBinary(None)),
_ => Ok(ScalarValue::Null),
@@ -204,7 +202,9 @@ pub fn into_scalar_value(val: &str, data_type: &DataType) -> Result<ScalarValue>
if val.eq(LAKESOUL_EMPTY_STRING) {
Ok(ScalarValue::Utf8(Some("".to_string())))
} else {
Ok(ScalarValue::Utf8(Some(val.replace(LAKESOUL_EQ, "=").replace(LAKESOUL_COMMA, ","))))
Ok(ScalarValue::Utf8(Some(
val.replace(LAKESOUL_EQ, "=").replace(LAKESOUL_COMMA, ","),
)))
}
}
DataType::Timestamp(unit, timezone) => match unit {
@@ -264,7 +264,7 @@ pub fn into_scalar_value(val: &str, data_type: &DataType) -> Result<ScalarValue>
},
DataType::Decimal128(p, s) => Ok(ScalarValue::Decimal128(Some(val.parse::<i128>().unwrap()), *p, *s)),
DataType::Decimal256(p, s) => Ok(ScalarValue::Decimal256(Some(i256::from_string(val).unwrap()), *p, *s)),
DataType::Binary=> Ok(ScalarValue::Binary(Some(hex::decode(val).unwrap()))),
DataType::Binary => Ok(ScalarValue::Binary(Some(hex::decode(val).unwrap()))),
DataType::FixedSizeBinary(size) => Ok(ScalarValue::FixedSizeBinary(*size, Some(hex::decode(val).unwrap()))),
DataType::LargeBinary => Ok(ScalarValue::LargeBinary(Some(hex::decode(val).unwrap()))),
_ => ScalarValue::try_from_string(val.to_string(), data_type),
@@ -526,7 +526,11 @@ pub fn timestamp_str_to_unix_time(value: &str, fmt: &str) -> Result<Duration> {
Ok(datetime.signed_duration_since(epoch_time.naive_utc()))
}

pub fn column_with_name_and_name2index<'a>(schema: &'a SchemaRef, name: &str, name_to_index: &Option<HashMap<String, usize>>) -> Option<(usize, &'a Field)> {
pub fn column_with_name_and_name2index<'a>(
schema: &'a SchemaRef,
name: &str,
name_to_index: &Option<HashMap<String, usize>>,
) -> Option<(usize, &'a Field)> {
if let Some(name_to_index) = name_to_index {
name_to_index.get(name).map(|index| (*index, schema.field(*index)))
} else {
@@ -535,12 +539,11 @@ pub fn column_with_name_and_name2index<'a>(schema: &'a SchemaRef, name: &str, na
}

pub fn get_batch_memory_size(batch: &RecordBatch) -> Result<usize> {
Ok(
batch.columns()
Ok(batch
.columns()
.iter()
.map(|array| array.to_data().get_slice_memory_size())
.collect::<std::result::Result<Vec<usize>, ArrowError>>()?
.into_iter()
.sum()
)
.sum())
}
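The `format_scalar_value` / `into_scalar_value` pair above hex-encodes binary partition values on the way out and decodes them with `hex::decode` on the way back in, using `LAKESOUL_NULL_STRING` for missing values. A small hedged round-trip sketch of that encoding (the null marker below is a stand-in, not the crate's actual constant):

```rust
// Round-trip a binary partition value the way the helpers above do: hex-encode
// Some(bytes), emit a null marker for None, and decode hex back into bytes.
fn main() {
    let null_marker = "NULL"; // stand-in for LAKESOUL_NULL_STRING
    let value: Option<Vec<u8>> = Some(vec![0xde, 0xad, 0xbe, 0xef]);

    let encoded = match &value {
        Some(bytes) => hex::encode(bytes),
        None => null_marker.to_string(),
    };
    assert_eq!(encoded, "deadbeef");

    let decoded = if encoded == null_marker {
        None
    } else {
        Some(hex::decode(&encoded).expect("valid hex"))
    };
    assert_eq!(decoded, value);
}
```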
4 changes: 1 addition & 3 deletions rust/lakesoul-io/src/lakesoul_io_config.rs
@@ -47,11 +47,9 @@ pub static OPTION_DEFAULT_VALUE_KEEP_ORDERS: &str = "false";

pub static OPTION_KEY_MEM_LIMIT: &str = "mem_limit";
pub static OPTION_KEY_POOL_SIZE: &str = "pool_size";
pub static OPTION_KEY_HASH_BUCKET_ID : &str = "hash_bucket_id";
pub static OPTION_KEY_HASH_BUCKET_ID: &str = "hash_bucket_id";
pub static OPTION_KEY_MAX_FILE_SIZE: &str = "max_file_size";



#[derive(Debug, Derivative)]
#[derivative(Default, Clone)]
pub struct LakeSoulIOConfig {