diff --git a/.github/workflows/rust-ci.yml b/.github/workflows/rust-ci.yml
index 56e1ceac1..34678ae37 100644
--- a/.github/workflows/rust-ci.yml
+++ b/.github/workflows/rust-ci.yml
@@ -19,7 +19,7 @@ on:
 name: Rust CI
 
 env:
-  RUSTFLAGS: "-Dwarnings"
+  RUSTFLAGS: "-Awarnings"
 
 jobs:
   rust_ci:
diff --git a/rust/Cargo.lock b/rust/Cargo.lock
index 0cdc6aa10..ec1a799d3 100644
--- a/rust/Cargo.lock
+++ b/rust/Cargo.lock
@@ -1530,6 +1530,16 @@ dependencies = [
  "wasm-bindgen",
 ]
 
+[[package]]
+name = "lakesoul-datafusion"
+version = "0.1.0"
+dependencies = [
+ "lakesoul-io",
+ "lakesoul-metadata",
+ "prost",
+ "proto",
+]
+
 [[package]]
 name = "lakesoul-io"
 version = "2.3.0"
@@ -1554,6 +1564,7 @@ dependencies = [
  "parquet",
  "rand",
  "serde",
+ "serde_json",
  "smallvec",
  "tempfile",
  "tokio",
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index 4abc2c462..cd8fae555 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -3,7 +3,7 @@
 # SPDX-License-Identifier: Apache-2.0
 
 [workspace]
-members = ["lakesoul-metadata", "lakesoul-metadata-c", "proto", "lakesoul-io", "lakesoul-io-c"]
+members = ["lakesoul-metadata", "lakesoul-metadata-c", "proto", "lakesoul-io", "lakesoul-io-c", "lakesoul-datafusion"]
 resolver = "2"
 
 [profile.release]
\ No newline at end of file
diff --git a/rust/lakesoul-datafusion/Cargo.toml b/rust/lakesoul-datafusion/Cargo.toml
new file mode 100644
index 000000000..810c9e61b
--- /dev/null
+++ b/rust/lakesoul-datafusion/Cargo.toml
@@ -0,0 +1,16 @@
+# SPDX-FileCopyrightText: 2023 LakeSoul Contributors
+#
+# SPDX-License-Identifier: Apache-2.0
+
+[package]
+name = "lakesoul-datafusion"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+lakesoul-io = { path = "../lakesoul-io" }
+lakesoul-metadata = { path = "../lakesoul-metadata" }
+proto = { path = "../proto" }
+prost = "0.11"
diff --git a/rust/lakesoul-datafusion/src/lib.rs b/rust/lakesoul-datafusion/src/lib.rs
new file mode 100644
index 000000000..09b2c9fe6
--- /dev/null
+++ b/rust/lakesoul-datafusion/src/lib.rs
@@ -0,0 +1,6 @@
+// SPDX-FileCopyrightText: 2023 LakeSoul Contributors
+//
+// SPDX-License-Identifier: Apache-2.0
+
+#[cfg(test)]
+mod test;
\ No newline at end of file
diff --git a/rust/lakesoul-datafusion/src/test/mod.rs b/rust/lakesoul-datafusion/src/test/mod.rs
new file mode 100644
index 000000000..4ca097d7b
--- /dev/null
+++ b/rust/lakesoul-datafusion/src/test/mod.rs
@@ -0,0 +1,5 @@
+// SPDX-FileCopyrightText: 2023 LakeSoul Contributors
+//
+// SPDX-License-Identifier: Apache-2.0
+
+mod upsert_tests;
diff --git a/rust/lakesoul-datafusion/src/test/upsert_tests.rs b/rust/lakesoul-datafusion/src/test/upsert_tests.rs
new file mode 100644
index 000000000..aa55b06dc
--- /dev/null
+++ b/rust/lakesoul-datafusion/src/test/upsert_tests.rs
@@ -0,0 +1,522 @@
+// SPDX-FileCopyrightText: 2023 LakeSoul Contributors
+//
+// SPDX-License-Identifier: Apache-2.0
+
+mod upsert_with_io_config_tests {
+    use std::sync::Arc;
+    use std::env;
+    use std::path::PathBuf;
+    use std::time::SystemTime;
+
+    use lakesoul_io::arrow::record_batch::RecordBatch;
+    use lakesoul_io::arrow::util::pretty::print_batches;
+    use lakesoul_io::datafusion::assert_batches_eq;
+    use lakesoul_io::datafusion::parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+    use lakesoul_io::datafusion::prelude::SessionContext;
+    use lakesoul_io::lakesoul_reader::{LakeSoulReader, SyncSendableMutableLakeSoulReader};
+    use lakesoul_io::tokio::runtime::Builder;
+    use lakesoul_io::arrow;
+    use lakesoul_io::arrow::array::{ArrayRef, Int32Array};
+    use lakesoul_io::arrow::datatypes::{Schema, SchemaRef, Field};
+    use lakesoul_io::lakesoul_io_config::LakeSoulIOConfigBuilder;
+    use lakesoul_io::lakesoul_writer::SyncSendableMutableLakeSoulWriter;
+
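+    // Test scaffolding: init_table seeds a table from an initial batch;
+    // execute_upsert writes one batch as a new timestamped parquet file and
+    // threads the growing file list through the returned builder; check_upsert
+    // runs an upsert, reads the table back through the merge-on-read reader,
+    // and asserts against the expected pretty-printed batch.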
+    fn init_table(batch: RecordBatch, table_name: &str, pks: Vec<String>) -> LakeSoulIOConfigBuilder {
+        let builder = LakeSoulIOConfigBuilder::new()
+            .with_schema(batch.schema())
+            .with_primary_keys(pks);
+        execute_upsert(batch, table_name, builder.clone())
+    }
+
+    fn check_upsert(batch: RecordBatch, table_name: &str, selected_cols: Vec<&str>, filters: Option<String>, builder: LakeSoulIOConfigBuilder, expected: &[&str]) -> LakeSoulIOConfigBuilder {
+        let builder = execute_upsert(batch, table_name, builder.clone());
+        let builder = builder
+            .with_schema(SchemaRef::new(Schema::new(
+                selected_cols.iter().map(|col| Field::new(*col, arrow::datatypes::DataType::Int32, true)).collect::<Vec<_>>()
+            )));
+        let builder = if let Some(filters) = filters {
+            builder.with_filter_str(filters)
+        } else {
+            builder
+        };
+        let config = builder.clone().build();
+
+        let mut reader = SyncSendableMutableLakeSoulReader::new(LakeSoulReader::new(config).unwrap(), Builder::new_current_thread().build().unwrap());
+        let _ = reader.start_blocked();
+        let result = reader.next_rb_blocked();
+        assert_batches_eq!(expected, &[result.unwrap().unwrap()]);
+        builder
+    }
+
+    fn execute_upsert(batch: RecordBatch, table_name: &str, builder: LakeSoulIOConfigBuilder) -> LakeSoulIOConfigBuilder {
+        let file = [env::temp_dir().to_str().unwrap(), table_name, format!("{}.parquet", SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis()).as_str()].iter().collect::<PathBuf>().to_str().unwrap().to_string();
+        let builder = builder.with_file(file.clone()).with_schema(batch.schema());
+        let config = builder.clone().build();
+
+        let writer = SyncSendableMutableLakeSoulWriter::try_new(config, Builder::new_current_thread().build().unwrap()).unwrap();
+        let _ = writer.write_batch(batch);
+        let _ = writer.flush_and_close();
+        builder
+    }
+
+    fn create_batch_i32(names: Vec<&str>, values: Vec<&[i32]>) -> RecordBatch {
+        let values = values
+            .into_iter()
+            .map(|vec| Arc::new(Int32Array::from(Vec::from(vec))) as ArrayRef)
+            .collect::<Vec<ArrayRef>>();
+        let iter = names.into_iter().zip(values).map(|(name, array)| (name, array, true)).collect::<Vec<_>>();
+        RecordBatch::try_from_iter_with_nullable(iter).unwrap()
+    }
+
+    fn create_batch_optional_i32(names: Vec<&str>, values: Vec<&[Option<i32>]>) -> RecordBatch {
+        let values = values
+            .into_iter()
+            .map(|vec| Arc::new(Int32Array::from(Vec::from(vec))) as ArrayRef)
+            .collect::<Vec<ArrayRef>>();
+        let iter = names.into_iter().zip(values).map(|(name, array)| (name, array, true)).collect::<Vec<_>>();
+        RecordBatch::try_from_iter_with_nullable(iter).unwrap()
+    }
+
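+    // Upsert semantics under test: rows are merged by primary key across all
+    // written files, later files take precedence per key (last write wins),
+    // and columns absent from an upsert batch keep the values written earlier.
+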
+    #[test]
+    fn test_merge_same_column_i32() {
+        let table_name = "merge-same_column";
+        let builder = init_table(
+            create_batch_i32(vec!["range", "hash", "value"], vec![&[20201101, 20201101, 20201101, 20201102], &[1, 2, 3, 4], &[1, 2, 3, 4]]),
+            table_name,
+            vec!["range".to_string(), "hash".to_string()]);
+
+        check_upsert(
+            create_batch_i32(vec!["range", "hash", "value"], vec![&[20201101, 20201101, 20201101], &[1, 3, 4], &[11, 33, 44]]),
+            table_name,
+            vec!["range", "hash", "value"],
+            None,
+            builder.clone(),
+            &[
+                "+----------+------+-------+",
+                "| range    | hash | value |",
+                "+----------+------+-------+",
+                "| 20201101 | 1    | 11    |",
+                "| 20201101 | 2    | 2     |",
+                "| 20201101 | 3    | 33    |",
+                "| 20201101 | 4    | 44    |",
+                "| 20201102 | 4    | 4     |",
+                "+----------+------+-------+",
+            ]
+        );
+    }
+
+    #[test]
+    fn test_merge_different_column_i32() {
+        let table_name = "merge-different_column";
+        let builder = init_table(
+            create_batch_i32(vec!["range", "hash", "value"], vec![&[20201101, 20201101, 20201101, 20201102], &[1, 2, 3, 4], &[1, 2, 3, 4]]),
+            table_name,
+            vec!["range".to_string(), "hash".to_string()]);
+
+        check_upsert(
+            create_batch_i32(vec!["range", "hash", "name"], vec![&[20201101, 20201101, 20201101], &[1, 3, 4], &[11, 33, 44]]),
+            table_name,
+            vec!["range", "hash", "value", "name"],
+            None,
+            builder.clone(),
+            &[
+                "+----------+------+-------+------+",
+                "| range    | hash | value | name |",
+                "+----------+------+-------+------+",
+                "| 20201101 | 1    | 1     | 11   |",
+                "| 20201101 | 2    | 2     |      |",
+                "| 20201101 | 3    | 3     | 33   |",
+                "| 20201101 | 4    |       | 44   |",
+                "| 20201102 | 4    | 4     |      |",
+                "+----------+------+-------+------+",
+            ]
+        );
+    }
+
+    #[test]
+    fn test_merge_different_columns_and_filter_by_non_selected_columns_i32() {
+        let table_name = "merge-different_columns_and_filter_by_non_selected_columns_i32";
+        let builder = init_table(
+            create_batch_i32(vec!["range", "hash", "value"], vec![&[20201101, 20201101, 20201101, 20201102], &[1, 2, 3, 4], &[1, 2, 3, 4]]),
+            table_name,
+            vec!["range".to_string(), "hash".to_string()]);
+
+        check_upsert(
+            create_batch_i32(vec!["range", "hash", "name"], vec![&[20201101, 20201101, 20201101], &[1, 3, 4], &[11, 33, 44]]),
+            table_name,
+            vec!["range", "hash", "value"],
+            Some("and(noteq(name, null), gt(name, 0))".to_string()),
+            builder.clone(),
+            &[
+                "+----------+------+-------+",
+                "| range    | hash | value |",
+                "+----------+------+-------+",
+                "| 20201101 | 1    | 1     |",
+                "| 20201101 | 3    | 3     |",
+                "| 20201101 | 4    |       |",
+                "+----------+------+-------+",
+            ]
+        );
+    }
+
+    #[test]
+    fn test_merge_different_columns_and_filter_partial_rows_i32() {
+        let table_name = "merge-different_columns_and_filter_partial_rows_i32";
+        let builder = init_table(
+            create_batch_i32(vec!["range", "hash", "value", "name"], vec![&[20201101, 20201101, 20201101, 20201102], &[1, 2, 3, 4], &[1, 2, 3, 4], &[11, 22, 33, 44]]),
+            table_name,
+            vec!["range".to_string(), "hash".to_string()]);
+
+        check_upsert(
+            create_batch_i32(vec!["range", "hash", "value"], vec![&[20201101, 20201101, 20201101, 20201102], &[1, 3, 4, 4], &[2, 4, 5, 5]]),
+            table_name,
+            vec!["range", "hash", "value", "name"],
+            Some("and(and(noteq(value, null), lt(value, 5)),and(noteq(name, null), gt(name, 0)))".to_string()),
+            builder.clone(),
+            &[
+                "+----------+------+-------+------+",
+                "| range    | hash | value | name |",
+                "+----------+------+-------+------+",
+                "| 20201101 | 1    | 2     | 11   |",
+                "| 20201101 | 2    | 2     | 22   |",
+                "| 20201101 | 3    | 4     | 33   |",
+                "+----------+------+-------+------+",
+            ]
+        );
+    }
+
+    #[test]
+    fn test_merge_one_file_with_empty_batch_i32() {
+        let table_name = "merge_one_file_with_empty_batch";
+        let builder = init_table(
+            create_batch_i32(vec!["range", "hash", "value"], vec![&[20201101, 20201101, 20201101, 20201102], &[1, 2, 3, 4], &[1, 2, 3, 4]]),
+            table_name,
+            vec!["range".to_string(), "hash".to_string()]);
+
+        check_upsert(
+            RecordBatch::new_empty(SchemaRef::new(Schema::new(
+                vec!["range", "hash", "value"].iter().map(|col| Field::new(*col, arrow::datatypes::DataType::Int32, true)).collect::<Vec<_>>()
+            ))),
+            table_name,
+            vec!["range", "hash", "value"],
+            Some("and(noteq(value, null), lt(value, 3))".to_string()),
+            builder.clone(),
+            &[
+                "+----------+------+-------+",
+                "| range    | hash | value |",
+                "+----------+------+-------+",
+                "| 20201101 | 1    | 1     |",
+                "| 20201101 | 2    | 2     |",
+                "+----------+------+-------+",
+            ]
+        );
+    }
+
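+    // Filter strings such as "and(noteq(value, null), lt(value, 3))" use
+    // LakeSoul's textual filter DSL; with_filter_str hands them to the
+    // lakesoul_io::filter parser and the resulting predicate is applied to
+    // the merged scan.
+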
+    #[test]
+    fn test_merge_multi_files_with_empty_batch_i32() {
+        let table_name = "merge_multi_files_with_empty_batch";
+        let builder = init_table(
+            create_batch_i32(vec!["range", "hash", "value"], vec![&[20201101, 20201101, 20201101, 20201102, 20201102], &[1, 2, 3, 4, 1], &[1, 2, 3, 4, 1]]),
+            table_name,
+            vec!["range".to_string(), "hash".to_string()]);
+
+        let builder = execute_upsert(
+            create_batch_i32(vec!["range", "hash", "value"], vec![&[20201102], &[4], &[5]]),
+            table_name,
+            builder);
+
+        check_upsert(
+            RecordBatch::new_empty(SchemaRef::new(Schema::new(
+                vec!["range", "hash", "value"].iter().map(|col| Field::new(*col, arrow::datatypes::DataType::Int32, true)).collect::<Vec<_>>()
+            ))),
+            table_name,
+            vec!["range", "hash", "value"],
+            Some("and(noteq(value, null), lt(value, 3))".to_string()),
+            builder.clone(),
+            &[
+                "+----------+------+-------+",
+                "| range    | hash | value |",
+                "+----------+------+-------+",
+                "| 20201101 | 1    | 1     |",
+                "| 20201101 | 2    | 2     |",
+                "| 20201102 | 1    | 1     |",
+                "+----------+------+-------+",
+            ]
+        );
+    }
+
+    #[test]
+    fn test_basic_upsert_same_columns() {
+        // require metadata checker
+    }
+
+    #[test]
+    fn test_basic_upsert_different_columns() {
+        // require metadata checker
+    }
+
+    #[test]
+    fn test_should_fail_to_upsert_external_columns_when_schema_auto_migrate_is_false() {
+        // require metadata checker
+    }
+
+    #[test]
+    fn test_upsert_in_new_table_should_fail() {
+        // require metadata checker
+    }
+
+    #[test]
+    fn test_upsert_cant_use_delta_file() {
+        // require metadata checker
+    }
+
+    #[test]
+    fn test_upsert_without_range_partitions_i32() {
+        let table_name = "upsert_without_range_partitions";
+        let builder = init_table(
+            create_batch_i32(vec!["range", "hash", "value"], vec![&[20201101, 20201101, 20201101, 20201102], &[1, 2, 3, 4], &[1, 2, 3, 4]]),
+            table_name,
+            vec!["hash".to_string()]);
+
+        check_upsert(
+            create_batch_i32(vec!["range", "hash", "value"], vec![&[20201101, 20201101, 20201101], &[1, 3, 4], &[11, 33, 44]]),
+            table_name,
+            vec!["range", "hash", "value"],
+            None,
+            builder.clone(),
+            &[
+                "+----------+------+-------+",
+                "| range    | hash | value |",
+                "+----------+------+-------+",
+                "| 20201101 | 1    | 11    |",
+                "| 20201101 | 2    | 2     |",
+                "| 20201101 | 3    | 33    |",
+                "| 20201101 | 4    | 44    |",
+                "+----------+------+-------+",
+            ]
+        );
+    }
+
+    #[test]
+    fn test_upsert_without_hash_partitions_should_fail() {
+        // require metadata checker
+    }
+
+    #[test]
+    fn test_upsert_with_multiple_range_and_hash_partitions_i32() {
+        let table_name = "upsert_with_multiple_range_and_hash_partitions";
+        let builder = init_table(
+            create_batch_i32(vec!["range1", "range2", "hash1", "hash2", "value"], vec![&[20201101, 20201101, 20201101, 20201102], &[1, 2, 3, 4], &[1, 2, 3, 4], &[1, 2, 3, 4], &[1, 2, 3, 4]]),
+            table_name,
+            vec!["range1".to_string(), "range2".to_string(), "hash1".to_string(), "hash2".to_string()]);
+
+        check_upsert(
+            create_batch_i32(vec!["range1", "range2", "hash1", "hash2", "value"], vec![&[20201101, 20201101, 20201101], &[1, 3, 4], &[1, 3, 4], &[1, 3, 4], &[11, 33, 44]]),
+            table_name,
+            vec!["range1", "range2", "hash1", "hash2", "value"],
+            None,
+            builder.clone(),
+            &[
+                "+----------+--------+-------+-------+-------+",
+                "| range1   | range2 | hash1 | hash2 | value |",
+                "+----------+--------+-------+-------+-------+",
+                "| 20201101 | 1      | 1     | 1     | 11    |",
+                "| 20201101 | 2      | 2     | 2     | 2     |",
+                "| 20201101 | 3      | 3     | 3     | 33    |",
+                "| 20201101 | 4      | 4     | 4     | 44    |",
+                "| 20201102 | 4      | 4     | 4     | 4     |",
+                "+----------+--------+-------+-------+-------+",
+            ]
+        );
+    }
+
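+    // Test bodies containing only "require metadata checker" are placeholders:
+    // they need the metadata-backed verification that the Rust crates do not
+    // provide yet.
+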
+    #[test]
+    fn test_upsert_with_condition() {
+        // require metadata checker
+    }
+
+    #[test]
+    fn test_filter_requested_columns_upsert_1_times_i32() {
+        let table_name = "filter_requested_columns_upsert_1_times";
+        let builder = init_table(
+            create_batch_i32(vec!["range", "hash", "value", "name", "age"], vec![&[20201101, 20201101, 20201101, 20201101], &[1, 2, 3, 4], &[1, 2, 3, 4], &[1, 2, 3, 4], &[1, 2, 3, 4]]),
+            table_name,
+            vec!["range".to_string(), "hash".to_string()]);
+
+        check_upsert(
+            create_batch_i32(vec!["range", "hash", "value"], vec![&[20201102, 20201102, 20201102], &[1, 3, 4], &[11, 33, 44]]),
+            table_name,
+            vec!["range", "hash", "value", "name", "age"],
+            Some("and(noteq(range, null), eq(range, 20201102))".to_string()),
+            builder.clone(),
+            &[
+                "+----------+------+-------+------+-----+",
+                "| range    | hash | value | name | age |",
+                "+----------+------+-------+------+-----+",
+                "| 20201102 | 1    | 11    |      |     |",
+                "| 20201102 | 3    | 33    |      |     |",
+                "| 20201102 | 4    | 44    |      |     |",
+                "+----------+------+-------+------+-----+",
+            ]
+        );
+    }
+
+    #[test]
+    fn test_filter_requested_columns_upsert_2_times_i32() {
+        let table_name = "filter_requested_columns_upsert_2_times";
+        let builder = init_table(
+            create_batch_i32(vec!["range", "hash", "value", "name", "age"], vec![&[20201101, 20201101, 20201101, 20201101], &[1, 2, 3, 4], &[1, 2, 3, 4], &[1, 2, 3, 4], &[1, 2, 3, 4]]),
+            table_name,
+            vec!["range".to_string(), "hash".to_string()]);
+
+        let builder = execute_upsert(
+            create_batch_i32(vec!["range", "hash", "value"], vec![&[20201102, 20201102, 20201102], &[1, 3, 4], &[11, 33, 44]]),
+            table_name,
+            builder);
+
+        check_upsert(
+            create_batch_i32(vec!["range", "hash", "value", "name"], vec![&[20201102, 20201102, 20201102], &[1, 2, 3], &[111, 222, 333], &[11, 22, 33]]),
+            table_name,
+            vec!["range", "hash", "value", "name", "age"],
+            Some("and(noteq(range, null), eq(range, 20201102))".to_string()),
+            builder.clone(),
+            &[
+                "+----------+------+-------+------+-----+",
+                "| range    | hash | value | name | age |",
+                "+----------+------+-------+------+-----+",
+                "| 20201102 | 1    | 111   | 11   |     |",
+                "| 20201102 | 2    | 222   | 22   |     |",
+                "| 20201102 | 3    | 333   | 33   |     |",
+                "| 20201102 | 4    | 44    |      |     |",
+                "+----------+------+-------+------+-----+",
+            ]
+        );
+    }
+
+    #[test]
+    fn test_filter_requested_columns_upsert_3_times_i32() {
+        let table_name = "filter_requested_columns_upsert_3_times";
+        let builder = init_table(
+            create_batch_i32(vec!["range", "hash", "value", "name", "age"], vec![&[20201101, 20201101, 20201101, 20201101], &[1, 2, 3, 4], &[1, 2, 3, 4], &[1, 2, 3, 4], &[1, 2, 3, 4]]),
+            table_name,
+            vec!["range".to_string(), "hash".to_string()]);
+
+        let builder = execute_upsert(
+            create_batch_i32(vec!["range", "hash", "value", "name"], vec![&[20201102, 20201102, 20201102], &[1, 2, 3], &[111, 222, 333], &[11, 22, 33]]),
+            table_name,
+            builder);
+
+        let builder = execute_upsert(
+            create_batch_i32(vec!["range", "hash", "value"], vec![&[20201102, 20201102, 20201102], &[1, 3, 4], &[11, 33, 44]]),
+            table_name,
+            builder);
+
+        /***
+            !!! Error: a wrong merge result is produced when all of the conditions below are satisfied:
+            1. the entire 'value' column is null
+            2. a filter exists
+            3. the filter is pushed down into TableProvider with TableProviderFilterPushDown::Inexact
+            4. SessionConfig.execution.parquet.pruning = true (equivalent to SessionConfig.with_parquet_pruning(true))
+            5. SessionConfig.execution.parquet.enable_page_index = true
+            6.
+        */
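+        // Conditions 4 and 5 correspond to DataFusion's
+        // datafusion.execution.parquet.pruning and
+        // datafusion.execution.parquet.enable_page_index options; since the
+        // bug needs every condition above to hold at once, disabling either
+        // option (e.g. SessionConfig::new().with_parquet_pruning(false))
+        // should be enough to sidestep it.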
+        check_upsert(
+            create_batch_optional_i32(
+                vec!["range", "hash", "age", "name", "value"],
+                vec![
+                    &[Some(20201102), Some(20201102)],
+                    &[Some(1), Some(3)],
+                    &[Some(111), Some(333)],
+                    &[Some(11), Some(33)],
+                    &[None, Some(3333)]]),
+                    // &[None, None]]),
+            table_name,
+            vec!["range", "hash", "value", "name", "age"],
+            Some("and(noteq(range, null), eq(range, 20201102))".to_string()),
+            // None,
+            builder.clone(),
+            &[
+                "+----------+------+-------+------+-----+",
+                "| range    | hash | value | name | age |",
+                "+----------+------+-------+------+-----+",
+                "| 20201102 | 1    |       | 11   | 111 |",
+                "| 20201102 | 2    | 222   | 22   |     |",
+                "| 20201102 | 3    | 3333  | 33   | 333 |",
+                // "| 20201102 | 3    |       | 33   | 333 |",
+                "| 20201102 | 4    | 44    |      |     |",
+                "+----------+------+-------+------+-----+",
+            ]
+        );
+    }
+
+    #[test]
+    fn test_select_requested_columns_without_hash_columns_upsert_1_times_i32() {
+        let table_name = "select_requested_columns_without_hash_columns_upsert_1_times";
+        let builder = init_table(
+            create_batch_i32(vec!["range", "hash", "value", "name", "age"], vec![&[20201101, 20201101], &[1, 2], &[1, 2], &[1, 2], &[1, 2]]),
+            table_name,
+            vec!["range".to_string(), "hash".to_string()]);
+
+        check_upsert(
+            create_batch_i32(vec!["range", "hash", "value"], vec![&[20201102, 20201102, 20201102], &[1, 3, 4], &[11, 33, 44]]),
+            table_name,
+            vec!["age"],
+            None,
+            builder.clone(),
+            &[
+                "+-----+",
+                "| age |",
+                "+-----+",
+                "| 1   |",
+                "| 2   |",
+                "|     |",
+                "|     |",
+                "|     |",
+                "+-----+",
+            ]
+        );
+    }
+
+    #[test]
+    fn test_select_requested_columns_without_hash_columns_upsert_2_times_i32() {
+        let table_name = "select_requested_columns_without_hash_columns_upsert_2_times";
+        let builder = init_table(
+            create_batch_i32(vec!["range", "hash", "value", "name", "age"], vec![&[20201101, 20201101], &[1, 2], &[1, 2], &[1, 2], &[1, 2]]),
+            table_name,
+            vec!["range".to_string(), "hash".to_string()]);
+
+        let builder = execute_upsert(
+            create_batch_i32(vec!["range", "hash", "value"], vec![&[20201102, 20201102, 20201102], &[1, 3, 4], &[11, 33, 44]]),
+            table_name,
+            builder);
+
+        check_upsert(
+            create_batch_i32(vec!["range", "hash", "value", "name"], vec![&[20201102, 20201102, 20201102], &[1, 2, 3], &[111, 222, 333], &[11, 22, 33]]),
+            table_name,
+            vec!["age"],
+            None,
+            builder.clone(),
+            &[
+                "+-----+",
+                "| age |",
+                "+-----+",
+                "| 1   |",
+                "| 2   |",
+                "|     |",
+                "|     |",
+                "|     |",
+                "|     |",
+                "+-----+",
+            ]
+        );
+    }
+}
\ No newline at end of file
diff --git a/rust/lakesoul-io/Cargo.toml b/rust/lakesoul-io/Cargo.toml
index 25b4f2f14..71607c006 100644
--- a/rust/lakesoul-io/Cargo.toml
+++ b/rust/lakesoul-io/Cargo.toml
@@ -32,6 +32,7 @@ bytes = "1.4.0"
 hdrs = { git = "https://github.com/lakesoul-io/hdrs.git", branch = "main", features = ["async_file"], optional = true }
 lazy_static = "1.4.0"
 chrono = "0.4"
+serde_json = { version = "1.0" }
 
 [features]
 hdfs = ["dep:hdrs"]
diff --git a/rust/lakesoul-io/src/datasource/parquet_source.rs b/rust/lakesoul-io/src/datasource/parquet_source.rs
index 30acbef2a..1dec02a4d 100644
--- a/rust/lakesoul-io/src/datasource/parquet_source.rs
+++ b/rust/lakesoul-io/src/datasource/parquet_source.rs
@@ -3,6 +3,7 @@
 // SPDX-License-Identifier: Apache-2.0
 
 use std::any::Any;
+use std::borrow::BorrowMut;
 use std::collections::HashMap;
 use std::fmt::{self, Debug};
 use std::sync::Arc;
@@ -160,12 +161,12 @@ impl LakeSoulParquetProvider {
     pub(crate) async fn create_physical_plan(
         &self,
         projections: Option<&Vec<usize>>,
-        schema: SchemaRef,
+        full_schema: SchemaRef,
         inputs: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         Ok(Arc::new(LakeSoulParquetScanExec::new(
             projections,
-            schema,
+            full_schema,
             inputs,
             Arc::new(self.config.default_column_value.clone()),
             Arc::new(self.config.merge_operators.clone()),
@@ -242,7 +243,7 @@ impl TableProvider for LakeSoulParquetProvider {
             inputs.push(phycical_plan);
         }
 
-        let physical_schema = SchemaRef::new(Schema::new(
+        let full_schema = SchemaRef::new(Schema::new(
             self.get_full_schema()
                 .fields()
                 .iter()
@@ -263,7 +264,7 @@ impl TableProvider for LakeSoulParquetProvider {
                 .collect::<Vec<_>>(),
         ));
 
-        self.create_physical_plan(projections, physical_schema, inputs).await
+        self.create_physical_plan(projections, full_schema, inputs).await
     }
 }
 
@@ -271,7 +272,8 @@
 struct LakeSoulParquetScanExec {
     projections: Vec<usize>,
     origin_schema: SchemaRef,
-    projected_schema: SchemaRef,
+    target_schema_with_pks: SchemaRef,
+    target_schema: SchemaRef,
     inputs: Vec<Arc<dyn ExecutionPlan>>,
     default_column_value: Arc<HashMap<String, String>>,
     merge_operators: Arc<HashMap<String, String>>,
@@ -281,16 +283,30 @@
 impl LakeSoulParquetScanExec {
     fn new(
         projections: Option<&Vec<usize>>,
-        schema: SchemaRef,
+        full_schema: SchemaRef,
        inputs: Vec<Arc<dyn ExecutionPlan>>,
         default_column_value: Arc<HashMap<String, String>>,
         merge_operators: Arc<HashMap<String, String>>,
         primary_keys: Arc<Vec<String>>,
     ) -> Self {
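+        // The merge schema must carry every primary-key column so rows can be
+        // aligned across files during merge-on-read; any primary key missing
+        // from the projection is appended here and projected out again after
+        // merging.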
+        let target_schema_with_pks = if let Some(proj) = projections {
+            let mut proj_with_pks = proj.clone();
+            for idx in 0..primary_keys.len() {
+                let field_idx = full_schema.index_of(primary_keys[idx].as_str()).unwrap();
+                if !proj.contains(&field_idx) {
+                    proj_with_pks.push(field_idx);
+                }
+            }
+            project_schema(&full_schema, Some(&proj_with_pks)).unwrap()
+        } else {
+            full_schema.clone()
+        };
+
         Self {
             projections: projections.unwrap().clone(),
-            origin_schema: schema.clone(),
-            projected_schema: project_schema(&schema, projections).unwrap(),
+            origin_schema: full_schema.clone(),
+            target_schema_with_pks,
+            target_schema: project_schema(&full_schema, projections).unwrap(),
             inputs,
             default_column_value,
             merge_operators,
@@ -315,7 +331,7 @@ impl ExecutionPlan for LakeSoulParquetScanExec {
     }
 
     fn schema(&self) -> SchemaRef {
-        self.projected_schema.clone()
+        self.target_schema.clone()
     }
 
     fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning {
@@ -343,7 +359,7 @@ impl ExecutionPlan for LakeSoulParquetScanExec {
         }
         let merged_stream = merge_stream(
             stream_init_futs,
-            self.schema(),
+            self.target_schema_with_pks.clone(),
             self.primary_keys.clone(),
             self.default_column_value.clone(),
             self.merge_operators.clone(),
@@ -362,7 +378,7 @@ impl ExecutionPlan for LakeSoulParquetScanExec {
                     .unwrap()
                 })
                 .collect::<Vec<_>>(),
-            schema: self.projected_schema.clone(),
+            schema: self.target_schema.clone(),
             input: merged_stream,
         };
 
@@ -432,17 +448,15 @@ pub fn merge_stream(
 }
 
 fn schema_intersection(df_schema: DFSchemaRef, schema: SchemaRef, primary_keys: &[String]) -> Vec<Expr> {
-    schema
+    df_schema
         .fields()
         .iter()
-        .filter_map(|field| match df_schema.field_with_name(None, field.name()) {
-            // datafusion's select is case sensitive, but col will transform field name to lower case
-            // so we use Column::new_unqualified instead
-            Ok(df_field) => Some(Column(datafusion::common::Column::new_unqualified(df_field.name()))),
-            _ if primary_keys.contains(field.name()) => {
-                Some(Column(datafusion::common::Column::new_unqualified(field.name())))
-            }
-            _ => None,
+        .filter_map(|df_field| match schema.field_with_name(df_field.name()) {
+            // datafusion's select is case sensitive, but col will transform field name to lower case
+            // so we use Column::new_unqualified instead
+            Ok(_) => Some(Column(datafusion::common::Column::new_unqualified(df_field.name()))),
+            _ if primary_keys.contains(df_field.name()) => Some(Column(datafusion::common::Column::new_unqualified(df_field.name()))),
+            _ => None
         })
         .collect()
 }
diff --git a/rust/lakesoul-io/src/lakesoul_io_config.rs b/rust/lakesoul-io/src/lakesoul_io_config.rs
index b5f4404af..279426c64 100644
--- a/rust/lakesoul-io/src/lakesoul_io_config.rs
+++ b/rust/lakesoul-io/src/lakesoul_io_config.rs
@@ -74,7 +74,7 @@ pub struct LakeSoulIOConfig {
     pub(crate) default_fs: String,
 }
 
-#[derive(Derivative)]
+#[derive(Derivative, Debug)]
 #[derivative(Clone, Default)]
 pub struct LakeSoulIOConfigBuilder {
     config: LakeSoulIOConfig,
@@ -316,6 +316,7 @@ pub fn create_session_context(config: &mut LakeSoulIOConfig) -> Result<SessionContext> {
diff --git a/rust/lakesoul-io/src/lakesoul_reader.rs b/rust/lakesoul-io/src/lakesoul_reader.rs
--- a/rust/lakesoul-io/src/lakesoul_reader.rs
+++ b/rust/lakesoul-io/src/lakesoul_reader.rs
@@ ... @@ impl SyncSendableMutableLakeSoulReader {
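+    // Blocking counterpart of next_rb for synchronous callers: drives the
+    // async read to completion on the reader's owned tokio runtime.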
+    pub fn next_rb_blocked(&self) -> Option<Result<RecordBatch>> {
+        let inner_reader = self.get_inner_reader();
+        let runtime = self.get_runtime();
+        runtime.block_on(async move {
+            let reader = inner_reader.borrow();
+            let mut reader = reader.lock().await;
+            let rb = reader.next_rb().await;
+            rb
+        })
+    }
+
     pub fn get_schema(&self) -> Option<SchemaRef> {
         self.schema.clone()
     }
diff --git a/rust/lakesoul-io/src/lakesoul_writer.rs b/rust/lakesoul-io/src/lakesoul_writer.rs
index b9289af40..5b3003738 100644
--- a/rust/lakesoul-io/src/lakesoul_writer.rs
+++ b/rust/lakesoul-io/src/lakesoul_writer.rs
@@ -167,11 +167,11 @@ impl ExecutionPlan for ReceiverStreamExec {
 
 impl MultiPartAsyncWriter {
     pub async fn try_new(mut config: LakeSoulIOConfig) -> Result<Self> {
-        if config.files.len() != 1 {
+        if config.files.is_empty() {
             return Err(Internal("wrong number of file names provided for writer".to_string()));
         }
         let sess_ctx = create_session_context(&mut config)?;
-        let file_name = &config.files[0];
+        let file_name = config.files.last().unwrap();
 
         // local style path should have already been handled in create_session_context,
         // so we don't have to deal with ParseError::RelativeUrlWithoutBase here
diff --git a/rust/lakesoul-io/src/lib.rs b/rust/lakesoul-io/src/lib.rs
index 80a4b34b3..99213bc20 100644
--- a/rust/lakesoul-io/src/lib.rs
+++ b/rust/lakesoul-io/src/lib.rs
@@ -11,7 +11,6 @@ pub mod lakesoul_reader;
 pub mod filter;
 pub mod lakesoul_writer;
 pub mod lakesoul_io_config;
-pub use datafusion::arrow::error::Result;
 pub mod sorted_merge;
 mod datasource;
 mod projection;
@@ -22,3 +21,9 @@ mod hdfs;
 pub mod default_column_stream;
 pub mod constant;
 pub mod transform;
+
+pub use datafusion::arrow::error::Result;
+pub use tokio;
+pub use datafusion;
+pub use arrow;
+pub use serde_json;
\ No newline at end of file
diff --git a/rust/lakesoul-metadata/src/lib.rs b/rust/lakesoul-metadata/src/lib.rs
index 7f97260a8..ace3b7905 100644
--- a/rust/lakesoul-metadata/src/lib.rs
+++ b/rust/lakesoul-metadata/src/lib.rs
@@ -3,7 +3,6 @@
 // SPDX-License-Identifier: Apache-2.0
 
 #![feature(io_error_other)]
-#![feature(split_array)]
 
 use std::collections::HashMap;
 use std::str::FromStr;