From 6031df3f0d8086c7484d1d39a12ef3bfea3c487e Mon Sep 17 00:00:00 2001 From: zenghua Date: Mon, 4 Mar 2024 16:30:49 +0800 Subject: [PATCH 1/5] fix escape path error Signed-off-by: zenghua --- rust/Cargo.lock | 1 + rust/lakesoul-datafusion/Cargo.toml | 2 + .../datasource/file_format/metadata_format.rs | 7 +- .../src/datasource/table_provider.rs | 1 + .../src/lakesoul_table/helpers.rs | 5 +- .../src/{ => test}/benchmarks/mod.rs | 0 .../src/{ => test}/benchmarks/tpch/mod.rs | 85 ++++++++++++++++--- .../src/{ => test}/benchmarks/tpch/run.rs | 0 .../src/test/integration_tests.rs | 52 ++++++------ rust/lakesoul-datafusion/src/test/mod.rs | 3 + rust/lakesoul-io/src/helpers.rs | 5 +- 11 files changed, 119 insertions(+), 42 deletions(-) rename rust/lakesoul-datafusion/src/{ => test}/benchmarks/mod.rs (100%) rename rust/lakesoul-datafusion/src/{ => test}/benchmarks/tpch/mod.rs (75%) rename rust/lakesoul-datafusion/src/{ => test}/benchmarks/tpch/run.rs (100%) diff --git a/rust/Cargo.lock b/rust/Cargo.lock index bd68e7de7..0f499ae26 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1716,6 +1716,7 @@ dependencies = [ "thiserror", "tokio", "tracing", + "url", "uuid", ] diff --git a/rust/lakesoul-datafusion/Cargo.toml b/rust/lakesoul-datafusion/Cargo.toml index aedaf6c19..883f799c8 100644 --- a/rust/lakesoul-datafusion/Cargo.toml +++ b/rust/lakesoul-datafusion/Cargo.toml @@ -33,6 +33,8 @@ bytes = { workspace = true } tracing = "0.1.40" thiserror = { workspace = true } anyhow = { workspace = true } +url = { workspace = true } + [dev-dependencies] ctor = "0.2" diff --git a/rust/lakesoul-datafusion/src/datasource/file_format/metadata_format.rs b/rust/lakesoul-datafusion/src/datasource/file_format/metadata_format.rs index 1b9b23431..e24177e4b 100644 --- a/rust/lakesoul-datafusion/src/datasource/file_format/metadata_format.rs +++ b/rust/lakesoul-datafusion/src/datasource/file_format/metadata_format.rs @@ -46,6 +46,7 @@ use rand::distributions::DistString; use tokio::sync::Mutex; use tokio::task::JoinHandle; use tracing::debug; +use url::Url; use crate::catalog::{commit_data, parse_table_info_partitions}; use crate::lakesoul_table::helpers::{columnar_values_to_partition_desc, columnar_values_to_sub_path, create_io_config_builder_from_table_info, get_columnar_values}; @@ -336,10 +337,13 @@ impl LakeSoulHashSinkExec { let batch_excluding_range = batch.project(&schema_projection_excluding_range)?; let file_absolute_path = format!("{}{}part-{}_{:0>4}.parquet", table_info.table_path, columnar_values_to_sub_path(&columnar_values), write_id, partition); + let file_absolute_path = Url::parse(file_absolute_path.as_str()).map_err(|e| DataFusionError::External(Box::new(e)))?; + + if !partitioned_writer.contains_key(&partition_desc) { let mut config = create_io_config_builder_from_table_info(table_info.clone()) .map_err(|e| DataFusionError::External(Box::new(e)))? 
- .with_files(vec![file_absolute_path.clone()]) + .with_files(vec![file_absolute_path.to_string()]) .with_schema(batch_excluding_range.schema()) .build(); @@ -392,7 +396,6 @@ impl LakeSoulHashSinkExec { let partitioned_file_path_and_row_count = partitioned_file_path_and_row_count.lock().await; for (partition_desc, (files, _)) in partitioned_file_path_and_row_count.iter() { - // let partition_desc = columnar_values_to_partition_desc(columnar_values); commit_data(client.clone(), &table_name, partition_desc.clone(), files) .await .map_err(|e| DataFusionError::External(Box::new(e)))?; diff --git a/rust/lakesoul-datafusion/src/datasource/table_provider.rs b/rust/lakesoul-datafusion/src/datasource/table_provider.rs index 5456f37ce..f7487f270 100644 --- a/rust/lakesoul-datafusion/src/datasource/table_provider.rs +++ b/rust/lakesoul-datafusion/src/datasource/table_provider.rs @@ -229,6 +229,7 @@ impl LakeSoulTableProvider { let mut file_groups = Vec::new(); while let Some((partition, object_metas)) = futures.next().await.transpose()? { + dbg!(&object_metas); let cols = self.table_partition_cols().iter().map(|x| x.0.as_str()); let parsed = parse_partitions_for_partition_desc(&partition.partition_desc, cols); diff --git a/rust/lakesoul-datafusion/src/lakesoul_table/helpers.rs b/rust/lakesoul-datafusion/src/lakesoul_table/helpers.rs index c76a78773..5e86072f8 100644 --- a/rust/lakesoul-datafusion/src/lakesoul_table/helpers.rs +++ b/rust/lakesoul-datafusion/src/lakesoul_table/helpers.rs @@ -10,8 +10,9 @@ use arrow_arith::boolean::and; use datafusion::{common::{DFField, DFSchema}, datasource::listing::ListingTableUrl, error::DataFusionError, execution::context::ExecutionProps, logical_expr::Expr, physical_expr::create_physical_expr, scalar::ScalarValue}; use lakesoul_metadata::MetaDataClientRef; -use object_store::{ObjectMeta, ObjectStore}; +use object_store::{path::Path, ObjectMeta, ObjectStore}; use tracing::{debug, trace}; +use url::Url; use crate::error::Result; use lakesoul_io::lakesoul_io_config::LakeSoulIOConfigBuilder; @@ -200,7 +201,7 @@ pub async fn listing_partition_info(partition_info: PartitionInfo, store: &dyn O .get_data_files_of_single_partition(&partition_info).await.map_err(|_| DataFusionError::External("listing partition info failed".into()))?; let mut files = Vec::new(); for path in paths { - let result = store.head(ListingTableUrl::parse(path.clone())?.prefix()).await?; + let result = store.head(&Path::from(Url::parse(path.as_str()).map_err(|e| DataFusionError::External(Box::new(e)))?.path())).await?; files.push(result); } Ok((partition_info, files)) diff --git a/rust/lakesoul-datafusion/src/benchmarks/mod.rs b/rust/lakesoul-datafusion/src/test/benchmarks/mod.rs similarity index 100% rename from rust/lakesoul-datafusion/src/benchmarks/mod.rs rename to rust/lakesoul-datafusion/src/test/benchmarks/mod.rs diff --git a/rust/lakesoul-datafusion/src/benchmarks/tpch/mod.rs b/rust/lakesoul-datafusion/src/test/benchmarks/tpch/mod.rs similarity index 75% rename from rust/lakesoul-datafusion/src/benchmarks/tpch/mod.rs rename to rust/lakesoul-datafusion/src/test/benchmarks/tpch/mod.rs index 20e3e4144..6c26ced7d 100644 --- a/rust/lakesoul-datafusion/src/benchmarks/tpch/mod.rs +++ b/rust/lakesoul-datafusion/src/test/benchmarks/tpch/mod.rs @@ -3,35 +3,96 @@ // SPDX-License-Identifier: Apache-2.0 mod run; -use arrow::datatypes::{DataType, Field, Schema, SchemaBuilder}; +use arrow::datatypes::{Schema, SchemaBuilder, Field, DataType}; pub const TPCH_TABLES: &[&str] = &[ - "part", "supplier", 
"partsupp", "customer", "orders", "lineitem", "nation", "region", + "part", "supplier", "partsupp", "customer", + "orders", + "lineitem", + "nation", "region", ]; -/// The `.tbl` file contains a trailing column pub fn get_tbl_tpch_table_primary_keys(table: &str) -> Vec { match table { - "part" => vec![String::from("p_partkey"), String::from("p_name")], + "part" => vec![ + String::from("p_partkey"), + String::from("p_name"), + ], + + "supplier" => vec![ + String::from("s_suppkey"), + String::from("s_name"), + ], + + "partsupp" => vec![ + String::from("ps_partkey"), + String::from("ps_suppkey"), + ], + + "customer" => vec![ + String::from("c_custkey"), + String::from("c_name"), + ], + + "orders" => vec![ + String::from("o_orderkey"), + String::from("o_custkey"), + ], + + "lineitem" => vec![ + String::from("l_orderkey"), + String::from("l_partkey"), + ], + + "nation" => vec![ + String::from("n_nationkey"), + String::from("n_name"), + ], + + "region" => vec![ + String::from("r_regionkey"), + String::from("r_name"), + ], + + _ => unimplemented!(), + } +} - "supplier" => vec![String::from("s_suppkey"), String::from("s_name")], +pub fn get_tbl_tpch_table_range_partitions(table: &str) -> Vec { + match table { + "part" => vec![], - "partsupp" => vec![String::from("ps_partkey"), String::from("ps_suppkey")], + "supplier" => vec![ + String::from("s_nationkey"), + ], - "customer" => vec![String::from("c_custkey"), String::from("c_name")], + "partsupp" => vec![], - "orders" => vec![String::from("o_orderkey"), String::from("o_custkey")], + "customer" => vec![ + String::from("c_nationkey"), + ], - "lineitem" => vec![String::from("l_orderkey"), String::from("l_partkey")], + "orders" => vec![ + // String::from("o_orderdate"), + String::from("o_orderpriority"), + ], - "nation" => vec![String::from("n_nationkey"), String::from("n_name")], + "lineitem" => vec![ + ], - "region" => vec![String::from("r_regionkey"), String::from("r_name")], + "nation" => vec![ + String::from("n_regionkey"), + ], + + "region" => vec![ + ], _ => unimplemented!(), } } + + /// The `.tbl` file contains a trailing column pub fn get_tbl_tpch_table_schema(table: &str) -> Schema { let mut schema = SchemaBuilder::from(get_tpch_table_schema(table).fields); @@ -134,3 +195,5 @@ pub fn get_tpch_table_schema(table: &str) -> Schema { _ => unimplemented!(), } } + + diff --git a/rust/lakesoul-datafusion/src/benchmarks/tpch/run.rs b/rust/lakesoul-datafusion/src/test/benchmarks/tpch/run.rs similarity index 100% rename from rust/lakesoul-datafusion/src/benchmarks/tpch/run.rs rename to rust/lakesoul-datafusion/src/test/benchmarks/tpch/run.rs diff --git a/rust/lakesoul-datafusion/src/test/integration_tests.rs b/rust/lakesoul-datafusion/src/test/integration_tests.rs index fcd7a7969..f4badab52 100644 --- a/rust/lakesoul-datafusion/src/test/integration_tests.rs +++ b/rust/lakesoul-datafusion/src/test/integration_tests.rs @@ -5,35 +5,27 @@ mod integration_tests { use std::{path::Path, sync::Arc}; - use datafusion::{ - datasource::{ - file_format::{csv::CsvFormat, FileFormat}, - listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl}, - TableProvider, - }, - execution::context::SessionContext, - }; + use arrow_cast::pretty::print_batches; + use datafusion::{execution::context::SessionContext, datasource::{TableProvider, file_format::{FileFormat, csv::CsvFormat}, listing::{ListingOptions, ListingTableUrl, ListingTableConfig, ListingTable}}}; use lakesoul_io::lakesoul_io_config::{create_session_context_with_planner, 
LakeSoulIOConfigBuilder}; use lakesoul_metadata::MetaDataClient; - use crate::{ - benchmarks::tpch::{ - get_tbl_tpch_table_primary_keys, get_tbl_tpch_table_schema, get_tpch_table_schema, TPCH_TABLES, - }, - catalog::{create_io_config_builder, create_table}, - error::{LakeSoulError, Result}, - lakesoul_table::LakeSoulTable, - planner::query_planner::LakeSoulQueryPlanner, - }; - - async fn get_table(ctx: &SessionContext, table: &str) -> Result> { + use crate::{catalog::{create_io_config_builder, create_table}, error::{LakeSoulError, Result}, lakesoul_table::LakeSoulTable, planner::query_planner::LakeSoulQueryPlanner, test::benchmarks::tpch::get_tbl_tpch_table_range_partitions}; + use crate::test::benchmarks::tpch::{TPCH_TABLES, get_tbl_tpch_table_schema, get_tpch_table_schema, get_tbl_tpch_table_primary_keys}; + + async fn get_table( + ctx: &SessionContext, + table: &str, + ) -> Result> { let path = get_tpch_data_path()?; // Obtain a snapshot of the SessionState let state = ctx.state(); let (format, path, extension): (Arc, String, &'static str) = { let path = format!("{path}/{table}.tbl"); - let format = CsvFormat::default().with_delimiter(b'|').with_has_header(false); + let format = CsvFormat::default() + .with_delimiter(b'|') + .with_has_header(false); (Arc::new(format), path, ".tbl") }; @@ -45,12 +37,16 @@ mod integration_tests { let table_path = ListingTableUrl::parse(path)?; let config = ListingTableConfig::new(table_path).with_listing_options(options); - let config = { config.with_schema(Arc::new(get_tbl_tpch_table_schema(table))) }; + let config = { + config.with_schema(Arc::new(get_tbl_tpch_table_schema(table))) + }; Ok(Arc::new(ListingTable::try_new(config)?)) - } + } + fn get_tpch_data_path() -> Result { - let path = std::env::var("TPCH_DATA").unwrap_or_else(|_| "benchmarks/data".to_string()); + let path = + std::env::var("TPCH_DATA").unwrap_or_else(|_| "benchmarks/data".to_string()); if !Path::new(&path).exists() { return Err(LakeSoulError::Internal(format!( "Benchmark data not found (set TPCH_DATA env var to override): {}", @@ -60,30 +56,34 @@ mod integration_tests { Ok(path) } + #[tokio::test] async fn load_tpch_data() -> Result<()> { let client = Arc::new(MetaDataClient::from_env().await?); let builder = create_io_config_builder(client.clone(), None, false, "default").await?; let ctx = create_session_context_with_planner(&mut builder.clone().build(), Some(LakeSoulQueryPlanner::new_ref()))?; - + for table in TPCH_TABLES { let table_provider = get_table(&ctx, table).await?; ctx.register_table(*table, table_provider)?; - let dataframe = ctx.sql(format!("select * from {}", table).as_str()).await?; + let dataframe = ctx.sql(format!("select * from {}", table).as_str()) + .await?; let schema = get_tpch_table_schema(table); let builder = LakeSoulIOConfigBuilder::new() .with_schema(Arc::new(schema)) .with_primary_keys(get_tbl_tpch_table_primary_keys(table)); + // .with_range_partitions(get_tbl_tpch_table_range_partitions(table)); create_table(client.clone(), &table, builder.build()).await?; let lakesoul_table = LakeSoulTable::for_name(table).await?; lakesoul_table.upsert_dataframe(dataframe).await?; + print_batches(&lakesoul_table.to_dataframe(&ctx).await?.collect().await?); dbg!(table); } Ok(()) } -} +} \ No newline at end of file diff --git a/rust/lakesoul-datafusion/src/test/mod.rs b/rust/lakesoul-datafusion/src/test/mod.rs index 1c4571d87..d7d9b6b2c 100644 --- a/rust/lakesoul-datafusion/src/test/mod.rs +++ b/rust/lakesoul-datafusion/src/test/mod.rs @@ -16,6 +16,9 @@ mod 
upsert_tests; #[cfg(feature = "ci")] mod integration_tests; +#[cfg(feature = "ci")] +mod benchmarks; + mod catalog_tests; // in cargo test, this executed only once diff --git a/rust/lakesoul-io/src/helpers.rs b/rust/lakesoul-io/src/helpers.rs index 8e93bf216..acdc6c226 100644 --- a/rust/lakesoul-io/src/helpers.rs +++ b/rust/lakesoul-io/src/helpers.rs @@ -9,6 +9,8 @@ use datafusion::{ datasource::{file_format::FileFormat, listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl}, physical_plan::FileScanConfig}, execution::context::SessionState, logical_expr::col, physical_expr::{create_physical_expr, PhysicalSortExpr}, physical_plan::PhysicalExpr, physical_planner::create_physical_sort_expr }; use datafusion_common::{DFSchema, DataFusionError, Result}; +use object_store::path::Path; +use url::Url; use crate::{lakesoul_io_config::LakeSoulIOConfig, transform::uniform_schema}; @@ -108,6 +110,7 @@ pub async fn listing_table_from_lakesoul_io_config( .iter() .map(ListingTableUrl::parse) .collect::>>()?; + dbg!(&table_paths); // Create default parquet options let object_store_url = table_paths .first() @@ -124,7 +127,7 @@ pub async fn listing_table_from_lakesoul_io_config( let mut objects = vec![]; for url in &table_paths { - objects.push(store.head(url.prefix()).await?); + objects.push(store.head(&Path::from(>::as_ref(url).path())).await?); } // Resolve the schema let resolved_schema = file_format.infer_schema(session_state, &store, &objects).await?; From 4b156df22d11314afae5e11226562b8fa391b672 Mon Sep 17 00:00:00 2001 From: zenghua Date: Mon, 4 Mar 2024 17:25:32 +0800 Subject: [PATCH 2/5] makeQualified for DataFileInfo Signed-off-by: zenghua --- .../datasources/v2/merge/MergePartitionedFileUtil.scala | 7 +++++-- rust/lakesoul-datafusion/src/datasource/table_provider.rs | 1 - rust/lakesoul-datafusion/src/lakesoul_table/helpers.rs | 2 +- rust/lakesoul-datafusion/src/test/integration_tests.rs | 2 +- rust/lakesoul-io/src/helpers.rs | 1 - 5 files changed, 7 insertions(+), 6 deletions(-) diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/merge/MergePartitionedFileUtil.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/merge/MergePartitionedFileUtil.scala index 40e9c76d4..545dee04a 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/merge/MergePartitionedFileUtil.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/merge/MergePartitionedFileUtil.scala @@ -12,6 +12,8 @@ import org.apache.spark.sql.lakesoul.exception.LakeSoulErrors import org.apache.spark.sql.lakesoul.utils.TableInfo import org.apache.spark.sql.types.StructType +import java.net.URI + object MergePartitionedFileUtil { def notSplitFiles(sparkSession: SparkSession, file: FileStatus, @@ -45,10 +47,11 @@ object MergePartitionedFileUtil { requestPartitionFields: Array[String]): MergePartitionedFile = { val hosts = getBlockHosts(getBlockLocations(file), 0, file.getLen) - val filePathStr = filePath + val fs = filePath .getFileSystem(sparkSession.sessionState.newHadoopConf()) + val filePathStr = fs .makeQualified(filePath).toString - val touchedFileInfo = fileInfo.find(f => filePathStr.equals(f.path)) + val touchedFileInfo = fileInfo.find(f => filePathStr.equals(fs.makeQualified(new Path(new URI(f.path))).toString)) .getOrElse(throw LakeSoulErrors.filePathNotFoundException(filePathStr, fileInfo.mkString(","))) val touchedFileSchema = 
requestFilesSchemaMap(touchedFileInfo.range_version).fieldNames diff --git a/rust/lakesoul-datafusion/src/datasource/table_provider.rs b/rust/lakesoul-datafusion/src/datasource/table_provider.rs index f7487f270..5456f37ce 100644 --- a/rust/lakesoul-datafusion/src/datasource/table_provider.rs +++ b/rust/lakesoul-datafusion/src/datasource/table_provider.rs @@ -229,7 +229,6 @@ impl LakeSoulTableProvider { let mut file_groups = Vec::new(); while let Some((partition, object_metas)) = futures.next().await.transpose()? { - dbg!(&object_metas); let cols = self.table_partition_cols().iter().map(|x| x.0.as_str()); let parsed = parse_partitions_for_partition_desc(&partition.partition_desc, cols); diff --git a/rust/lakesoul-datafusion/src/lakesoul_table/helpers.rs b/rust/lakesoul-datafusion/src/lakesoul_table/helpers.rs index 5e86072f8..a0e8793e7 100644 --- a/rust/lakesoul-datafusion/src/lakesoul_table/helpers.rs +++ b/rust/lakesoul-datafusion/src/lakesoul_table/helpers.rs @@ -8,7 +8,7 @@ use arrow::{array::{Array, ArrayRef, AsArray, StringBuilder}, compute::prep_null use arrow_cast::cast; use arrow_arith::boolean::and; -use datafusion::{common::{DFField, DFSchema}, datasource::listing::ListingTableUrl, error::DataFusionError, execution::context::ExecutionProps, logical_expr::Expr, physical_expr::create_physical_expr, scalar::ScalarValue}; +use datafusion::{common::{DFField, DFSchema}, error::DataFusionError, execution::context::ExecutionProps, logical_expr::Expr, physical_expr::create_physical_expr, scalar::ScalarValue}; use lakesoul_metadata::MetaDataClientRef; use object_store::{path::Path, ObjectMeta, ObjectStore}; use tracing::{debug, trace}; diff --git a/rust/lakesoul-datafusion/src/test/integration_tests.rs b/rust/lakesoul-datafusion/src/test/integration_tests.rs index f4badab52..548d020f5 100644 --- a/rust/lakesoul-datafusion/src/test/integration_tests.rs +++ b/rust/lakesoul-datafusion/src/test/integration_tests.rs @@ -80,7 +80,7 @@ mod integration_tests { create_table(client.clone(), &table, builder.build()).await?; let lakesoul_table = LakeSoulTable::for_name(table).await?; lakesoul_table.upsert_dataframe(dataframe).await?; - print_batches(&lakesoul_table.to_dataframe(&ctx).await?.collect().await?); + // print_batches(&lakesoul_table.to_dataframe(&ctx).await?.collect().await?); dbg!(table); } diff --git a/rust/lakesoul-io/src/helpers.rs b/rust/lakesoul-io/src/helpers.rs index acdc6c226..f311ea804 100644 --- a/rust/lakesoul-io/src/helpers.rs +++ b/rust/lakesoul-io/src/helpers.rs @@ -110,7 +110,6 @@ pub async fn listing_table_from_lakesoul_io_config( .iter() .map(ListingTableUrl::parse) .collect::>>()?; - dbg!(&table_paths); // Create default parquet options let object_store_url = table_paths .first() From ba759142420aab0978ac1a3407d9874b8466ee1b Mon Sep 17 00:00:00 2001 From: zenghua Date: Tue, 5 Mar 2024 11:30:04 +0800 Subject: [PATCH 3/5] well define DataFileOp Signed-off-by: zenghua --- .../v2/merge/MergePartitionedFileUtil.scala | 4 +- .../lakesoul/benchmark/ConsistencyCI.scala | 42 ++++++++++++------- .../datasource/file_format/metadata_format.rs | 8 +--- .../src/lakesoul_table/helpers.rs | 2 +- .../src/test/integration_tests.rs | 4 +- rust/lakesoul-io/src/helpers.rs | 2 +- rust/lakesoul-io/src/lakesoul_writer.rs | 2 +- rust/proto/src/entity.proto | 2 +- 8 files changed, 36 insertions(+), 30 deletions(-) diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/merge/MergePartitionedFileUtil.scala 
b/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/merge/MergePartitionedFileUtil.scala index 545dee04a..c8758b504 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/merge/MergePartitionedFileUtil.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/merge/MergePartitionedFileUtil.scala @@ -12,8 +12,6 @@ import org.apache.spark.sql.lakesoul.exception.LakeSoulErrors import org.apache.spark.sql.lakesoul.utils.TableInfo import org.apache.spark.sql.types.StructType -import java.net.URI - object MergePartitionedFileUtil { def notSplitFiles(sparkSession: SparkSession, file: FileStatus, @@ -51,7 +49,7 @@ object MergePartitionedFileUtil { .getFileSystem(sparkSession.sessionState.newHadoopConf()) val filePathStr = fs .makeQualified(filePath).toString - val touchedFileInfo = fileInfo.find(f => filePathStr.equals(fs.makeQualified(new Path(new URI(f.path))).toString)) + val touchedFileInfo = fileInfo.find(f => filePathStr.equals(fs.makeQualified(new Path(f.path)).toString)) .getOrElse(throw LakeSoulErrors.filePathNotFoundException(filePathStr, fileInfo.mkString(","))) val touchedFileSchema = requestFilesSchemaMap(touchedFileInfo.range_version).fieldNames diff --git a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/benchmark/ConsistencyCI.scala b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/benchmark/ConsistencyCI.scala index bfe598155..f31b865a4 100644 --- a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/benchmark/ConsistencyCI.scala +++ b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/benchmark/ConsistencyCI.scala @@ -20,7 +20,7 @@ object ConsistencyCI { StructField("c_mktsegment", StringType, nullable = false), StructField("c_comment", StringType, nullable = false), )), - "c_custkey, c_name"), + "c_custkey, c_name", Some("c_nationkey")), ("part", StructType(Array( StructField("p_partkey", LongType, nullable = false), @@ -33,7 +33,7 @@ object ConsistencyCI { StructField("p_retailprice", DecimalType(15, 2), nullable = false), StructField("p_comment", StringType, nullable = false), )), - "p_partkey, p_name"), + "p_partkey, p_name", Option.empty), ("supplier", StructType(Array( StructField("s_suppkey", LongType, nullable = false), @@ -44,7 +44,7 @@ object ConsistencyCI { StructField("s_acctbal", DecimalType(15, 2), nullable = false), StructField("s_comment", StringType, nullable = false), )), - "s_suppkey, s_name"), + "s_suppkey, s_name", Some("s_nationkey")), ("partsupp", StructType(Array( StructField("ps_partkey", LongType, nullable = false), @@ -53,7 +53,7 @@ object ConsistencyCI { StructField("ps_supplycost", DecimalType(15, 2), nullable = false), StructField("ps_comment", StringType, nullable = false), )), - "ps_partkey, ps_suppkey"), + "ps_partkey, ps_suppkey", Option.empty), ("orders", StructType(Array( StructField("o_orderkey", LongType, nullable = false), @@ -66,7 +66,7 @@ object ConsistencyCI { StructField("o_shippriority", IntegerType, nullable = false), StructField("o_comment", StringType, nullable = false), )), - "o_orderkey, o_custkey"), + "o_orderkey, o_custkey", Some("o_orderpriority")), ("nation", StructType(Array( @@ -75,14 +75,14 @@ object ConsistencyCI { StructField("n_regionkey", LongType, nullable = false), StructField("n_comment", StringType, nullable = false), )), - "n_nationkey, n_name"), + "n_nationkey, n_name", Some("n_regionkey")), ("region", StructType(Array( StructField("r_regionkey", LongType, nullable = false), 
StructField("r_name", StringType, nullable = false), StructField("r_comment", StringType, nullable = false), )), - "r_regionkey, r_name"), + "r_regionkey, r_name", Option.empty), ("lineitem", StructType(Array( StructField("l_orderkey", LongType, nullable = false), @@ -102,7 +102,7 @@ object ConsistencyCI { StructField("l_shipmode", StringType, nullable = false), StructField("l_comment", StringType, nullable = false), )), - "l_orderkey, l_partkey"), + "l_orderkey, l_partkey", Option.empty), ) def load_data(spark: SparkSession): Unit = { @@ -110,17 +110,29 @@ object ConsistencyCI { val tpchPath = System.getenv("TPCH_DATA") val lakeSoulPath = "/tmp/lakesoul/tpch" tpchTable.foreach(tup => { - val (name, schema, hashPartitions) = tup + val (name, schema, hashPartitions, rangePartitions) = tup val df = spark.read.option("delimiter", "|") .schema(schema) .csv(s"$tpchPath/$name.tbl") // df.show - df.write.format("lakesoul") - .option("shortTableName", name) - .option("hashPartitions", hashPartitions) - .option("hashBucketNum", 5) - .mode("Overwrite") - .save(s"$lakeSoulPath/$name") + rangePartitions match { + case Some(value) => + df.write.format("lakesoul") + .option("shortTableName", name) + .option("hashPartitions", hashPartitions) + .option("rangePartitions", value) + .option("hashBucketNum", 5) + .mode("Overwrite") + .save(s"$lakeSoulPath/$name") + case None => + df.write.format("lakesoul") + .option("shortTableName", name) + .option("hashPartitions", hashPartitions) + .option("hashBucketNum", 5) + .mode("Overwrite") + .save(s"$lakeSoulPath/$name") + } + }) } diff --git a/rust/lakesoul-datafusion/src/datasource/file_format/metadata_format.rs b/rust/lakesoul-datafusion/src/datasource/file_format/metadata_format.rs index e24177e4b..109050467 100644 --- a/rust/lakesoul-datafusion/src/datasource/file_format/metadata_format.rs +++ b/rust/lakesoul-datafusion/src/datasource/file_format/metadata_format.rs @@ -46,7 +46,6 @@ use rand::distributions::DistString; use tokio::sync::Mutex; use tokio::task::JoinHandle; use tracing::debug; -use url::Url; use crate::catalog::{commit_data, parse_table_info_partitions}; use crate::lakesoul_table::helpers::{columnar_values_to_partition_desc, columnar_values_to_sub_path, create_io_config_builder_from_table_info, get_columnar_values}; @@ -335,15 +334,12 @@ impl LakeSoulHashSinkExec { let columnar_values = get_columnar_values(&batch, range_partitions.clone())?; let partition_desc = columnar_values_to_partition_desc(&columnar_values); let batch_excluding_range = batch.project(&schema_projection_excluding_range)?; - let file_absolute_path = format!("{}{}part-{}_{:0>4}.parquet", table_info.table_path, columnar_values_to_sub_path(&columnar_values), write_id, partition); - - let file_absolute_path = Url::parse(file_absolute_path.as_str()).map_err(|e| DataFusionError::External(Box::new(e)))?; - + let file_absolute_path = format!("{}{}part-{}_{:0>4}.parquet", table_info.table_path, columnar_values_to_sub_path(&columnar_values), write_id, partition); if !partitioned_writer.contains_key(&partition_desc) { let mut config = create_io_config_builder_from_table_info(table_info.clone()) .map_err(|e| DataFusionError::External(Box::new(e)))? 
- .with_files(vec![file_absolute_path.to_string()]) + .with_files(vec![file_absolute_path]) .with_schema(batch_excluding_range.schema()) .build(); diff --git a/rust/lakesoul-datafusion/src/lakesoul_table/helpers.rs b/rust/lakesoul-datafusion/src/lakesoul_table/helpers.rs index a0e8793e7..2e049b189 100644 --- a/rust/lakesoul-datafusion/src/lakesoul_table/helpers.rs +++ b/rust/lakesoul-datafusion/src/lakesoul_table/helpers.rs @@ -201,7 +201,7 @@ pub async fn listing_partition_info(partition_info: PartitionInfo, store: &dyn O .get_data_files_of_single_partition(&partition_info).await.map_err(|_| DataFusionError::External("listing partition info failed".into()))?; let mut files = Vec::new(); for path in paths { - let result = store.head(&Path::from(Url::parse(path.as_str()).map_err(|e| DataFusionError::External(Box::new(e)))?.path())).await?; + let result = store.head(&Path::from_url_path(Url::parse(path.as_str()).map_err(|e| DataFusionError::External(Box::new(e)))?.path())?).await?; files.push(result); } Ok((partition_info, files)) diff --git a/rust/lakesoul-datafusion/src/test/integration_tests.rs b/rust/lakesoul-datafusion/src/test/integration_tests.rs index 548d020f5..bb6bdf82f 100644 --- a/rust/lakesoul-datafusion/src/test/integration_tests.rs +++ b/rust/lakesoul-datafusion/src/test/integration_tests.rs @@ -74,8 +74,8 @@ mod integration_tests { let builder = LakeSoulIOConfigBuilder::new() .with_schema(Arc::new(schema)) - .with_primary_keys(get_tbl_tpch_table_primary_keys(table)); - // .with_range_partitions(get_tbl_tpch_table_range_partitions(table)); + .with_primary_keys(get_tbl_tpch_table_primary_keys(table)) + .with_range_partitions(get_tbl_tpch_table_range_partitions(table)); create_table(client.clone(), &table, builder.build()).await?; let lakesoul_table = LakeSoulTable::for_name(table).await?; diff --git a/rust/lakesoul-io/src/helpers.rs b/rust/lakesoul-io/src/helpers.rs index f311ea804..77ef12794 100644 --- a/rust/lakesoul-io/src/helpers.rs +++ b/rust/lakesoul-io/src/helpers.rs @@ -126,7 +126,7 @@ pub async fn listing_table_from_lakesoul_io_config( let mut objects = vec![]; for url in &table_paths { - objects.push(store.head(&Path::from(>::as_ref(url).path())).await?); + objects.push(store.head(&Path::from_url_path(>::as_ref(url).path())?).await?); } // Resolve the schema let resolved_schema = file_format.infer_schema(session_state, &store, &objects).await?; diff --git a/rust/lakesoul-io/src/lakesoul_writer.rs b/rust/lakesoul-io/src/lakesoul_writer.rs index 1c2db6fd2..c71569b32 100644 --- a/rust/lakesoul-io/src/lakesoul_writer.rs +++ b/rust/lakesoul-io/src/lakesoul_writer.rs @@ -184,7 +184,7 @@ impl MultiPartAsyncWriter { task_context .runtime_env() .object_store(ObjectStoreUrl::parse(&url[..url::Position::BeforePath])?)?, - Path::from(url.path()), + Path::from_url_path(url.path())?, )), Err(e) => Err(DataFusionError::External(Box::new(e))), }?; diff --git a/rust/proto/src/entity.proto b/rust/proto/src/entity.proto index c190ec53c..c79a6c35e 100644 --- a/rust/proto/src/entity.proto +++ b/rust/proto/src/entity.proto @@ -96,7 +96,7 @@ enum FileOp { // Singleton Data File information message DataFileOp { - // Physical qualified path of a parquet file + // Physical qualified non-percent-encoded path of a parquet file, string path = 1; // Specific operation of this file FileOp file_op = 2; From 90ae4d3abe68485fcf8549b1ed844d9fb6ed3548 Mon Sep 17 00:00:00 2001 From: zenghua Date: Tue, 5 Mar 2024 11:56:51 +0800 Subject: [PATCH 4/5] fix error from rebase Signed-off-by: zenghua --- 
rust/lakesoul-datafusion/src/lib.rs | 1 - rust/lakesoul-datafusion/src/test/integration_tests.rs | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/rust/lakesoul-datafusion/src/lib.rs b/rust/lakesoul-datafusion/src/lib.rs index 5389b08d4..654a205ac 100644 --- a/rust/lakesoul-datafusion/src/lib.rs +++ b/rust/lakesoul-datafusion/src/lib.rs @@ -6,7 +6,6 @@ // after finished. remove above attr extern crate core; -mod benchmarks; mod catalog; mod datasource; mod error; diff --git a/rust/lakesoul-datafusion/src/test/integration_tests.rs b/rust/lakesoul-datafusion/src/test/integration_tests.rs index bb6bdf82f..31d4138ed 100644 --- a/rust/lakesoul-datafusion/src/test/integration_tests.rs +++ b/rust/lakesoul-datafusion/src/test/integration_tests.rs @@ -5,7 +5,6 @@ mod integration_tests { use std::{path::Path, sync::Arc}; - use arrow_cast::pretty::print_batches; use datafusion::{execution::context::SessionContext, datasource::{TableProvider, file_format::{FileFormat, csv::CsvFormat}, listing::{ListingOptions, ListingTableUrl, ListingTableConfig, ListingTable}}}; use lakesoul_io::lakesoul_io_config::{create_session_context_with_planner, LakeSoulIOConfigBuilder}; use lakesoul_metadata::MetaDataClient; @@ -80,7 +79,7 @@ mod integration_tests { create_table(client.clone(), &table, builder.build()).await?; let lakesoul_table = LakeSoulTable::for_name(table).await?; lakesoul_table.upsert_dataframe(dataframe).await?; - // print_batches(&lakesoul_table.to_dataframe(&ctx).await?.collect().await?); + // arrow_cast::pretty::print_batches(&lakesoul_table.to_dataframe(&ctx).await?.collect().await?); dbg!(table); } From 391d8801d5e7989fabc494617862519cb535cd49 Mon Sep 17 00:00:00 2001 From: zenghua Date: Tue, 5 Mar 2024 13:22:41 +0800 Subject: [PATCH 5/5] fix error from rebase Signed-off-by: zenghua --- .../scala/org/apache/spark/sql/lakesoul/LakeSoulFileIndex.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/LakeSoulFileIndex.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/LakeSoulFileIndex.scala index fddf0611d..2a206fb2f 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/LakeSoulFileIndex.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/LakeSoulFileIndex.scala @@ -150,7 +150,7 @@ case class BatchDataSoulFileIndexV2(override val spark: SparkSession, object LakeSoulFileIndexUtils { def absolutePath(child: String, tableName: String): Path = { - val p = new Path(new URI(child)) + val p = new Path(child) if (p.isAbsolute) { p } else {
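
Note on the path handling in this series: the core of the "escape path error" fix is that a path taken out of a parsed URL is percent-encoded, while the key expected by the object store is not, so `object_store::path::Path::from(url.path())` and `Path::from_url_path(url.path())?` can name different objects. The snippet below is only an illustration of that distinction, not part of the patches; the bucket, table, and partition names are made up, and it assumes object_store's documented behaviour that `From<&str>` does not decode percent-escapes while `from_url_path` does.

use object_store::path::Path;
use url::Url;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A data file under a range-partition directory whose value contains a
    // space; once the location is handled as a URL, the space travels as "%20".
    let url = Url::parse("s3://bucket/tpch/orders/o_orderpriority=2%20HIGH/part-0.parquet")?;

    // `From<&str>` does not percent-decode, so the escape sequence is carried
    // (and possibly re-escaped) into the key, which no longer matches the
    // object that was actually written.
    let literal = Path::from(url.path());

    // `from_url_path` percent-decodes, recovering the real key
    // ".../o_orderpriority=2 HIGH/part-0.parquet".
    let decoded = Path::from_url_path(url.path())?;

    assert_ne!(literal, decoded);
    println!("literal: {literal}");
    println!("decoded: {decoded}");
    Ok(())
}

Without the decode step, `store.head()` is issued against a key that never exists, which is what the first patch fixes for the writer and listing paths. The same reasoning drives the Scala change: `fs.makeQualified(new Path(f.path))` normalises the path recorded in DataFileInfo before comparing it with the qualified path of the scanned file, so both sides of the comparison use one canonical form.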