diff --git a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/DBManager.java b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/DBManager.java
index c2e9385e7..81f2fcc1c 100644
--- a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/DBManager.java
+++ b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/DBManager.java
@@ -156,7 +156,7 @@ public List listTablePathsByNamespace(String tableNamespace) {
         return tablePathIdDao.listAllPathByNamespace(tableNamespace);
     }
 
-    public List getTableInfosByNamespace(String tableNamespace){
+    public List getTableInfosByNamespace(String tableNamespace) {
         return tableInfoDao.selectByNamespace(tableNamespace);
     }
 
@@ -233,7 +233,7 @@ public void deleteSinglePartitionMetaInfo(String tableId, String partitionDesc,
     }
 
     private void getSnapshotAndFilePathInfo(String tableId, String partitionDesc, List fileOps, List deleteFilePathList,
-                                            List filterPartitionInfo, Set snapshotList) {
+                                            List filterPartitionInfo, Set snapshotList) {
         filterPartitionInfo.forEach(p -> snapshotList.addAll(p.getSnapshotList()));
         List filterDataCommitInfo =
                 dataCommitInfoDao.selectByTableIdPartitionDescCommitList(tableId, partitionDesc, snapshotList.stream().collect(Collectors.toList()));
diff --git a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/benchmark/ConsistencyCI.scala b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/benchmark/ConsistencyCI.scala
index 95bc37460..aee243de6 100644
--- a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/benchmark/ConsistencyCI.scala
+++ b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/benchmark/ConsistencyCI.scala
@@ -142,12 +142,15 @@ object ConsistencyCI {
     spark.sql("CREATE NAMESPACE IF NOT EXISTS tpch")
     spark.sql("USE tpch")
 
-    // load_data(spark)
+    load_data(spark)
 
     tpchTable.foreach(tup => {
-      // val sparkDF = spark.sql(s"select * from ${tup._1}")
+      val sparkDF = spark.sql(s"select * from ${tup._1}")
       val rustDF = spark.sql(s"select * from default.${tup._1}")
-      rustDF.show()
+      println(s"${tup._1} sparkDF: ")
+      sparkDF.show
+      println(s"${tup._1} rustDF: ")
+      rustDF.show
       // val diff1 = sparkDF.rdd.subtract(rustDF.rdd)
       // val diff2 = rustDF.rdd.subtract(sparkDF.rdd)
       // val result = diff1.count() == 0 && diff2.count() == 0
diff --git a/rust/lakesoul-datafusion/src/catalog/mod.rs b/rust/lakesoul-datafusion/src/catalog/mod.rs
index 993d9d6e3..83c8bb7db 100644
--- a/rust/lakesoul-datafusion/src/catalog/mod.rs
+++ b/rust/lakesoul-datafusion/src/catalog/mod.rs
@@ -39,7 +39,7 @@ pub(crate) async fn create_table(client: MetaDataClientRef, table_name: &str, co
         table_id: format!("table_{}", uuid::Uuid::new_v4()),
         table_name: table_name.to_string(),
         table_path: format!(
-            "file://{}default/{}",
+            "file:{}default/{}",
             env::temp_dir()
                 .to_str()
                 .ok_or(LakeSoulError::Internal("can not get $TMPDIR".to_string()))?,
diff --git a/rust/lakesoul-datafusion/src/test/integration_tests.rs b/rust/lakesoul-datafusion/src/test/integration_tests.rs
index 94b06c835..46eb7c66b 100644
--- a/rust/lakesoul-datafusion/src/test/integration_tests.rs
+++ b/rust/lakesoul-datafusion/src/test/integration_tests.rs
@@ -5,7 +5,6 @@ mod integration_tests {
     use std::{path::Path, sync::Arc};
 
-    use arrow::util::pretty::print_batches;
     use datafusion::{execution::context::SessionContext, datasource::{TableProvider, file_format::{FileFormat, csv::CsvFormat}, listing::{ListingOptions, ListingTableUrl, ListingTableConfig, ListingTable}}};
     use lakesoul_io::lakesoul_io_config::{create_session_context_with_planner, LakeSoulIOConfigBuilder};
     use lakesoul_metadata::MetaDataClient;
diff --git a/rust/lakesoul-metadata/src/metadata_client.rs b/rust/lakesoul-metadata/src/metadata_client.rs
index 971e471b2..51cbbfeac 100644
--- a/rust/lakesoul-metadata/src/metadata_client.rs
+++ b/rust/lakesoul-metadata/src/metadata_client.rs
@@ -280,7 +280,15 @@ impl MetaDataClient {
     }
 
     pub async fn meta_cleanup(&self) -> Result {
-        clean_meta_for_test(self.client.lock().await.deref_mut()).await
+        clean_meta_for_test(self.client.lock().await.deref_mut()).await?;
+        self.insert_namespace(
+            &Namespace {
+                namespace: "default".to_string(),
+                properties: "{}".to_string(),
+                comment: "".to_string(),
+                domain: "public".to_string()
+            }
+        ).await
     }
 
     pub async fn commit_data(&self, meta_info: MetaInfo, commit_op: CommitOp) -> Result<()> {