[Rust] Fix Consistency CI #443

Merged · 2 commits · Feb 20, 2024
2 changes: 1 addition & 1 deletion .github/workflows/consistency-ci.yml
@@ -118,7 +118,7 @@ jobs:
git clone https://github.com/databricks/tpch-dbgen.git
cd tpch-dbgen
make
- ./dbgen -f -s 0.001
+ ./dbgen -f -s 0.1
mv *.tbl ../lakesoul/test_files/tpch/data
- name: Verify that benchmark queries return expected results
run: |
@@ -151,16 +151,16 @@ object ConsistencyCI {
sparkDF.show
println(s"${tup._1} rustDF: ")
rustDF.show
- // val diff1 = sparkDF.rdd.subtract(rustDF.rdd)
- // val diff2 = rustDF.rdd.subtract(sparkDF.rdd)
- // val result = diff1.count() == 0 && diff2.count() == 0
- // if (!result) {
- // println("sparkDF: ")
- // println(sparkDF.collectAsList())
- // println("rustDF: ")
- // println(rustDF.collectAsList())
- // System.exit(1)
- // }
+ val diff1 = sparkDF.rdd.subtract(rustDF.rdd)
+ val diff2 = rustDF.rdd.subtract(sparkDF.rdd)
+ val result = diff1.count() == 0 && diff2.count() == 0
+ if (!result) {
+   println("sparkDF: ")
+   println(sparkDF.collectAsList())
+   println("rustDF: ")
+   println(rustDF.collectAsList())
+   System.exit(1)
+ }
})

}
5 changes: 3 additions & 2 deletions rust/lakesoul-datafusion/src/catalog/mod.rs
@@ -39,8 +39,9 @@ pub(crate) async fn create_table(client: MetaDataClientRef, table_name: &str, co
table_id: format!("table_{}", uuid::Uuid::new_v4()),
table_name: table_name.to_string(),
table_path: format!(
"file:{}default/{}",
env::temp_dir()
"file:{}/default/{}",
env::current_dir()
.unwrap()
.to_str()
.ok_or(LakeSoulError::Internal("can not get $TMPDIR".to_string()))?,
table_name
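For context, this hunk fixes two things at once: the missing path separator before `default` (on Linux, `env::temp_dir()` is `/tmp` with no trailing slash, so the old format string produced paths like `file:/tmpdefault/<table>`), and the switch from the temp dir to the process working directory. A minimal standalone sketch of the new construction, with a hypothetical table name:

```rust
use std::env;

fn main() {
    // "my_table" is a hypothetical name used only for this illustration.
    let table_name = "my_table";

    // New format string: explicit "/" before "default", rooted at the
    // working directory instead of $TMPDIR.
    let table_path = format!(
        "file:{}/default/{}",
        env::current_dir()
            .unwrap()
            .to_str()
            .expect("working directory is not valid UTF-8"),
        table_name
    );

    // e.g. "file:/home/runner/lakesoul/default/my_table"
    println!("{table_path}");
}
```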
@@ -3,7 +3,7 @@ use std::collections::HashMap;
use std::fmt::{self, Debug};
use std::sync::Arc;

- use arrow::array::{ArrayRef, UInt64Array};
+ use arrow::array::{ArrayRef, StringArray, UInt64Array};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;

@@ -12,7 +12,6 @@ use datafusion::common::{FileType, Statistics};
use datafusion::error::DataFusionError;
use datafusion::execution::TaskContext;
use datafusion::physical_expr::PhysicalSortExpr;
- use datafusion::physical_plan::common::AbortOnDropSingle;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, Distribution, Partitioning, SendableRecordBatchStream};
use datafusion::scalar::ScalarValue;
@@ -140,7 +139,8 @@ pub struct LakeSoulHashSinkExec {
input: Arc<dyn ExecutionPlan>,

/// Schema describing the structure of the output data.
- count_schema: SchemaRef,
+ sink_schema: SchemaRef,

/// Optional required sort order for output data.
sort_order: Option<Vec<PhysicalSortRequirement>>,

@@ -151,7 +151,7 @@

impl Debug for LakeSoulHashSinkExec {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "LakeSoulHashSinkExec schema: {:?}", self.count_schema)
write!(f, "LakeSoulHashSinkExec schema: {:?}", self.sink_schema)
}
}

@@ -165,7 +165,7 @@ impl LakeSoulHashSinkExec {
) -> Result<Self> {
Ok(Self {
input,
- count_schema: make_count_schema(),
+ sink_schema: make_sink_schema(),
sort_order,
table_info,
metadata_client,
@@ -205,7 +205,7 @@ impl LakeSoulHashSinkExec {
let mut partitioned_writer = HashMap::<Vec<(String, ScalarValue)>, Box<MultiPartAsyncWriter>>::new();
let mut partitioned_file_path_and_row_count_locked = partitioned_file_path_and_row_count.lock().await;
while let Some(batch) = data.next().await.transpose()? {
- dbg!(&batch.num_rows());
+ debug!("write record_batch with {} rows", batch.num_rows());
let columnar_value = get_columnar_value(&batch);
let file_absolute_path = format!("{}/part-{}_{:0>4}.parquet", table_info.table_path, write_id, partition);
if !partitioned_writer.contains_key(&columnar_value) {
@@ -293,7 +293,7 @@ impl ExecutionPlan for LakeSoulHashSinkExec {

/// Get the schema for this execution plan
fn schema(&self) -> SchemaRef {
- self.count_schema.clone()
+ self.sink_schema.clone()
}

fn output_partitioning(&self) -> Partitioning {
@@ -348,7 +348,7 @@ impl ExecutionPlan for LakeSoulHashSinkExec {
fn with_new_children(self: Arc<Self>, children: Vec<Arc<dyn ExecutionPlan>>) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(Self {
input: children[0].clone(),
- count_schema: self.count_schema.clone(),
+ sink_schema: self.sink_schema.clone(),
sort_order: self.sort_order.clone(),
table_info: self.table_info.clone(),
metadata_client: self.metadata_client.clone(),
@@ -392,18 +392,18 @@ impl ExecutionPlan for LakeSoulHashSinkExec {
schema: self.table_info().table_namespace.clone().into(),
table: self.table_info().table_name.clone().into(),
};
- let join_handle = AbortOnDropSingle::new(tokio::spawn(Self::wait_for_commit(
+ let join_handle = tokio::spawn(Self::wait_for_commit(
join_handles,
self.metadata_client(),
table_ref.to_string(),
partitioned_file_path_and_row_count,
- )));
+ ));

// });

// let abort_helper = Arc::new(AbortOnDropMany(join_handles));

- let count_schema = self.count_schema.clone();
+ let sink_schema = self.sink_schema.clone();
// let count = futures::future::join_all(join_handles).await;
// for (columnar_values, result) in partitioned_file_path_and_row_count.lock().await.iter() {
// match commit_data(self.metadata_client(), self.table_info().table_name.as_str(), &result.0).await {
@@ -414,32 +414,39 @@

let stream = futures::stream::once(async move {
match join_handle.await {
- Ok(Ok(count)) => Ok(make_count_batch(count)),
- _other => Ok(make_count_batch(u64::MAX)),
+ Ok(Ok(count)) => Ok(make_sink_batch(count, String::from(""))),
+ Ok(Err(e)) => {
+     debug!("{}", e.to_string());
+     Ok(make_sink_batch(u64::MAX, e.to_string()))
+ }
+ Err(e) => {
+     debug!("{}", e.to_string());
+     Ok(make_sink_batch(u64::MAX, e.to_string()))
+ }
}
})
.boxed();

- Ok(Box::pin(RecordBatchStreamAdapter::new(count_schema, stream)))
+ Ok(Box::pin(RecordBatchStreamAdapter::new(sink_schema, stream)))
}
}

- /// Create an output record batch with a count
- ///
- /// ```text
- /// +-------+,
- /// | count |,
- /// +-------+,
- /// | 6 |,
- /// +-------+,
- /// ```
- fn make_count_batch(count: u64) -> RecordBatch {
-     let array = Arc::new(UInt64Array::from(vec![count])) as ArrayRef;
-
-     RecordBatch::try_from_iter_with_nullable(vec![("count", array, false)]).unwrap()
-
+ fn make_sink_batch(count: u64, msg: String) -> RecordBatch {
+     let count_array = Arc::new(UInt64Array::from(vec![count])) as ArrayRef;
+     let msg_array = Arc::new(StringArray::from(vec![msg])) as ArrayRef;
+     RecordBatch::try_from_iter_with_nullable(
+         vec![
+             ("count", count_array, false),
+             ("msg", msg_array, false)
+         ]).unwrap()
}

- fn make_count_schema() -> SchemaRef {
+ fn make_sink_schema() -> SchemaRef {
// define a schema.
- Arc::new(Schema::new(vec![Field::new("count", DataType::UInt64, false)]))
+ Arc::new(Schema::new(vec![
+     Field::new("count", DataType::UInt64, false),
+     Field::new("msg", DataType::Utf8, false),
+ ])
+ )
}
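With this change the sink no longer reports only a row count: it also carries the error message from a failed write or commit, and `u64::MAX` in the count column signals failure. A self-contained sketch of the resulting batch shape, mirroring `make_sink_batch`/`make_sink_schema` from the diff (arrow imports assumed to match the versions used in the crate):

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, StringArray, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;

// Mirrors make_sink_schema: a row-count column plus an error-message column.
fn sink_schema() -> SchemaRef {
    Arc::new(Schema::new(vec![
        Field::new("count", DataType::UInt64, false),
        Field::new("msg", DataType::Utf8, false),
    ]))
}

// Mirrors make_sink_batch: a single-row batch carrying the count and message.
fn sink_batch(count: u64, msg: String) -> RecordBatch {
    let count_array = Arc::new(UInt64Array::from(vec![count])) as ArrayRef;
    let msg_array = Arc::new(StringArray::from(vec![msg])) as ArrayRef;
    RecordBatch::try_from_iter_with_nullable(vec![
        ("count", count_array, false),
        ("msg", msg_array, false),
    ])
    .unwrap()
}

fn main() {
    // Success: real count, empty message. Failure: u64::MAX plus the error text.
    let ok = sink_batch(6, String::new());
    let failed = sink_batch(u64::MAX, "commit failed".to_string());
    assert_eq!(ok.schema(), sink_schema());
    assert_eq!(failed.num_rows(), 1);
}
```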
1 change: 0 additions & 1 deletion rust/lakesoul-datafusion/src/lakesoul_table/mod.rs
@@ -7,7 +7,6 @@ pub mod helpers;
use std::{ops::Deref, sync::Arc};

use arrow::datatypes::{SchemaRef, Schema};
- use datafusion::dataframe;
use datafusion::sql::TableReference;
use datafusion::{
dataframe::DataFrame,
4 changes: 2 additions & 2 deletions rust/lakesoul-datafusion/src/test/catalog_tests.rs
@@ -78,7 +78,7 @@ mod catalog_tests {
}
v
};
let path = format!("{}{}/{}", env::temp_dir().to_str().unwrap(), &np.namespace, &table_name);
let path = format!("{}{}/{}", env::current_dir().unwrap_or(env::temp_dir()).to_str().unwrap(), &np.namespace, &table_name);
let table_id = format!(
"table_{}",
(&mut rng)
@@ -104,7 +104,7 @@
}

fn table_info(table_name: &str, namespace: &str, schema: SchemaRef) -> TableInfo {
let path = format!("{}{}/{}", env::temp_dir().to_str().unwrap(), namespace, table_name);
let path = format!("{}{}/{}", env::current_dir().unwrap_or(env::temp_dir()).to_str().unwrap(), namespace, table_name);
let schema = serde_json::to_string::<ArrowJavaSchema>(&schema.into()).unwrap();
TableInfo {
table_id: "table_000000001".into(),
2 changes: 1 addition & 1 deletion rust/lakesoul-datafusion/src/test/update_tests.rs
@@ -37,7 +37,7 @@ mod update_tests {
}

async fn execute_append(batch: RecordBatch, table_name: &str, client: MetaDataClientRef) -> Result<()> {
- let file = [env::temp_dir().to_str().unwrap(), table_name, format!("{}.parquet", SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis().to_string()).as_str()].iter().collect::<PathBuf>().to_str().unwrap().to_string();
+ let file = [env::current_dir().unwrap_or(env::temp_dir()).to_str().unwrap(), table_name, format!("{}.parquet", SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis().to_string()).as_str()].iter().collect::<PathBuf>().to_str().unwrap().to_string();
let builder = create_io_config_builder(client.clone(), Some(table_name)).await?.with_file(file.clone()).with_schema(batch.schema());
let config = builder.clone().build();

2 changes: 1 addition & 1 deletion rust/lakesoul-datafusion/src/test/upsert_tests.rs
@@ -67,7 +67,7 @@ mod upsert_with_io_config_tests {
builder: LakeSoulIOConfigBuilder,
) -> LakeSoulIOConfigBuilder {
let file = [
- env::temp_dir().to_str().unwrap(),
+ env::current_dir().unwrap_or(env::temp_dir()).to_str().unwrap(),
table_name,
format!(
"{}.parquet",
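All of the test files make the same substitution, so the pattern is worth calling out: prefer the process working directory and fall back to the temp dir only if it cannot be read. A small sketch of that pattern in isolation (the helper name and file names are illustrative, not part of this PR):

```rust
use std::env;
use std::path::PathBuf;

// Illustrative helper: env::current_dir() returns io::Result<PathBuf>, while
// env::temp_dir() is infallible, so unwrap_or keeps the tests runnable even
// when the current directory is unavailable.
fn test_base_dir() -> PathBuf {
    env::current_dir().unwrap_or(env::temp_dir())
}

fn main() {
    let base = test_base_dir();
    // Hypothetical table and file names, mirroring how the tests build parquet paths.
    let file: PathBuf = [base.to_str().unwrap(), "my_table", "1708387200000.parquet"]
        .iter()
        .collect();
    println!("{}", file.display());
}
```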