fix ConsistencyCI
Signed-off-by: zenghua <[email protected]>
zenghua committed Feb 6, 2024
1 parent a3d6fd0 commit b5f1f74
Showing 5 changed files with 18 additions and 8 deletions.
@@ -156,7 +156,7 @@ public List<String> listTablePathsByNamespace(String tableNamespace) {
return tablePathIdDao.listAllPathByNamespace(tableNamespace);
}

-    public List<TableInfo> getTableInfosByNamespace(String tableNamespace){
+    public List<TableInfo> getTableInfosByNamespace(String tableNamespace) {
return tableInfoDao.selectByNamespace(tableNamespace);
}

@@ -233,7 +233,7 @@ public void deleteSinglePartitionMetaInfo(String tableId, String partitionDesc,
}

private void getSnapshotAndFilePathInfo(String tableId, String partitionDesc, List<DataFileOp> fileOps, List<String> deleteFilePathList,
-                                              List<PartitionInfo> filterPartitionInfo, Set<Uuid> snapshotList) {
+                                             List<PartitionInfo> filterPartitionInfo, Set<Uuid> snapshotList) {
filterPartitionInfo.forEach(p -> snapshotList.addAll(p.getSnapshotList()));
List<DataCommitInfo> filterDataCommitInfo =
dataCommitInfoDao.selectByTableIdPartitionDescCommitList(tableId, partitionDesc, snapshotList.stream().collect(Collectors.toList()));
@@ -142,12 +142,15 @@ object ConsistencyCI {
spark.sql("CREATE NAMESPACE IF NOT EXISTS tpch")
spark.sql("USE tpch")

-    // load_data(spark)
+    load_data(spark)

tpchTable.foreach(tup => {
-      // val sparkDF = spark.sql(s"select * from ${tup._1}")
+      val sparkDF = spark.sql(s"select * from ${tup._1}")
val rustDF = spark.sql(s"select * from default.${tup._1}")
-      rustDF.show()
+      println(s"${tup._1} sparkDF: ")
+      sparkDF.show
+      println(s"${tup._1} rustDF: ")
+      rustDF.show
// val diff1 = sparkDF.rdd.subtract(rustDF.rdd)
// val diff2 = rustDF.rdd.subtract(sparkDF.rdd)
// val result = diff1.count() == 0 && diff2.count() == 0
rust/lakesoul-datafusion/src/catalog/mod.rs (2 changes: 1 addition & 1 deletion)
@@ -39,7 +39,7 @@ pub(crate) async fn create_table(client: MetaDataClientRef, table_name: &str, co
table_id: format!("table_{}", uuid::Uuid::new_v4()),
table_name: table_name.to_string(),
table_path: format!(
"file://{}default/{}",
"file:{}default/{}",
env::temp_dir()
.to_str()
.ok_or(LakeSoulError::Internal("can not get $TMPDIR".to_string()))?,
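As a quick illustration (a sketch under assumptions, not code from this commit): the corrected single-slash "file:" form simply concatenates the temp directory and table name, so the trailing path separator is expected to come from the temp-dir string itself. The directory and table names below are made up.

fn table_path(tmp_dir: &str, table_name: &str) -> String {
    // Mirrors the fixed format string above: "file:" + temp dir + "default/" + table name.
    format!("file:{}default/{}", tmp_dir, table_name)
}

fn main() {
    // Prints "file:/tmp/default/lineitem" for an assumed temp dir of "/tmp/".
    println!("{}", table_path("/tmp/", "lineitem"));
}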
rust/lakesoul-datafusion/src/test/integration_tests.rs (1 change: 0 additions & 1 deletion)
@@ -5,7 +5,6 @@
mod integration_tests {
use std::{path::Path, sync::Arc};

-    use arrow::util::pretty::print_batches;
use datafusion::{execution::context::SessionContext, datasource::{TableProvider, file_format::{FileFormat, csv::CsvFormat}, listing::{ListingOptions, ListingTableUrl, ListingTableConfig, ListingTable}}};
use lakesoul_io::lakesoul_io_config::{create_session_context_with_planner, LakeSoulIOConfigBuilder};
use lakesoul_metadata::MetaDataClient;
rust/lakesoul-metadata/src/metadata_client.rs (10 changes: 9 additions & 1 deletion)
@@ -280,7 +280,15 @@ impl MetaDataClient {
}

pub async fn meta_cleanup(&self) -> Result<i32> {
-        clean_meta_for_test(self.client.lock().await.deref_mut()).await
+        clean_meta_for_test(self.client.lock().await.deref_mut()).await?;
+        self.insert_namespace(
+            &Namespace {
+                namespace: "default".to_string(),
+                properties: "{}".to_string(),
+                comment: "".to_string(),
+                domain: "public".to_string()
+            }
+        ).await
}

pub async fn commit_data(&self, meta_info: MetaInfo, commit_op: CommitOp) -> Result<()> {
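For readers unfamiliar with the pattern, here is a self-contained sketch of what meta_cleanup now does conceptually: wipe all metadata, then immediately re-seed the "default" namespace. It models the metadata store with an in-memory map and is not LakeSoul's actual API.

use std::collections::HashMap;

struct Namespace {
    namespace: String,
    properties: String,
    comment: String,
    domain: String,
}

struct MetaStore {
    namespaces: HashMap<String, Namespace>,
}

impl MetaStore {
    fn cleanup(&mut self) {
        // Clear everything, then restore "default" so tests that create tables
        // under the default namespace still work after a reset.
        self.namespaces.clear();
        self.namespaces.insert(
            "default".to_string(),
            Namespace {
                namespace: "default".to_string(),
                properties: "{}".to_string(),
                comment: "".to_string(),
                domain: "public".to_string(),
            },
        );
    }
}

fn main() {
    let mut store = MetaStore { namespaces: HashMap::new() };
    store.cleanup();
    assert!(store.namespaces.contains_key("default"));
}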
