From 8d5256c40cccc2f6801d10c42bea64c528409be6 Mon Sep 17 00:00:00 2001 From: chenxu Date: Thu, 5 Sep 2024 15:01:37 +0800 Subject: [PATCH] fix test case Signed-off-by: chenxu --- .../test/connector/sink/LakeSoulTableSinkCase.java | 10 ---------- .../lakesoul/test/fail/LakeSoulSinkFailTest.java | 12 ++++++++---- rust/lakesoul-io/src/lakesoul_io_config.rs | 1 - 3 files changed, 8 insertions(+), 15 deletions(-) diff --git a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/connector/sink/LakeSoulTableSinkCase.java b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/connector/sink/LakeSoulTableSinkCase.java index 0e8c6d0e8..dcd62bf96 100644 --- a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/connector/sink/LakeSoulTableSinkCase.java +++ b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/connector/sink/LakeSoulTableSinkCase.java @@ -8,8 +8,6 @@ import org.apache.flink.lakesoul.metadata.LakeSoulCatalog; import org.apache.flink.lakesoul.test.AbstractTestBase; import org.apache.flink.lakesoul.test.LakeSoulTestUtils; -import org.apache.flink.streaming.api.CheckpointingMode; -import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.ExplainDetail; @@ -29,11 +27,9 @@ import java.io.IOException; import java.net.URI; import java.util.*; -import java.util.concurrent.ExecutionException; import java.util.function.Consumer; import static org.apache.flink.lakesoul.LakeSoulOptions.LAKESOUL_TABLE_PATH; -import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.HASH_BUCKET_NUM; import static org.apache.flink.table.planner.utils.TableTestUtil.*; import static org.junit.Assert.assertEquals; @@ -297,9 +293,6 @@ private void testLakeSoulTableSinkWithParallelismBase( + " real_col int" + ") WITH (" + "'" - + HASH_BUCKET_NUM.key() - + "'= '3'," - + "'" + LAKESOUL_TABLE_PATH.key() + "'='" + getTempDirUri("/test_table") @@ -338,9 +331,6 @@ private void testLakeSoulTableSinkDeleteWithParallelismBase( + " PARTITIONED BY ( part )" + " WITH (" + "'" - + HASH_BUCKET_NUM.key() - + "'= '3'," - + "'" + LAKESOUL_TABLE_PATH.key() + "'='" + getTempDirUri("/test_table") diff --git a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/fail/LakeSoulSinkFailTest.java b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/fail/LakeSoulSinkFailTest.java index a32a8b25c..b53256382 100644 --- a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/fail/LakeSoulSinkFailTest.java +++ b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/fail/LakeSoulSinkFailTest.java @@ -53,10 +53,10 @@ public class LakeSoulSinkFailTest extends AbstractTestBase { public static Map> parameters; static String dropSourceSql = "drop table if exists test_source"; static String createSourceSqlFormat = "create table if not exists test_source %s " + - "with ('connector'='lakesoul', 'path'='/', 'hashBucketNum'='2', " + "'discoveryinterval'='1000'" + ")"; + "with ('connector'='lakesoul', 'path'='/', %s " + "'discoveryinterval'='1000'" + ")"; static String dropSinkSql = "drop table if exists test_sink"; static String createSinkSqlFormat = "create table if not exists test_sink %s %s" + - "with ('connector'='lakesoul', 'path'='%s', 'hashBucketNum'='%d')"; + "with ('connector'='lakesoul', 'path'='%s' %s)"; private static ArrayList indexArr; private static StreamExecutionEnvironment streamExecEnv; private static StreamTableEnvironment streamTableEnv; @@ -334,11 +334,15 @@ private void testLakeSoulSink(ResolvedSchema resolvedSchema, MockTableSource.Sto streamTableEnv.executeSql(dropSourceSql); - streamTableEnv.executeSql(String.format(createSourceSqlFormat, resolvedSchema)); + streamTableEnv.executeSql(String.format(createSourceSqlFormat, resolvedSchema, + resolvedSchema.getPrimaryKey().isPresent() ? + "'hashBucketNum'='2'," : "")); streamTableEnv.executeSql(dropSinkSql); - streamTableEnv.executeSql(String.format(createSinkSqlFormat, resolvedSchema, partitionBy, path, 2)); + streamTableEnv.executeSql(String.format(createSinkSqlFormat, resolvedSchema, partitionBy, path, + resolvedSchema.getPrimaryKey().isPresent() ? + ", 'hashBucketNum'='2'" : "")); streamTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); streamTableEnv.getConfig().setLocalTimeZone(TimeZone.getTimeZone("UTC").toZoneId()); diff --git a/rust/lakesoul-io/src/lakesoul_io_config.rs b/rust/lakesoul-io/src/lakesoul_io_config.rs index 87d9c3c37..64c65db52 100644 --- a/rust/lakesoul-io/src/lakesoul_io_config.rs +++ b/rust/lakesoul-io/src/lakesoul_io_config.rs @@ -393,7 +393,6 @@ fn register_hdfs_object_store( { let hdfs = Hdfs::try_new(_host, _config.clone())?; _runtime.register_object_store(_url, Arc::new(hdfs)); - println!("registered hdfs objec store {:?}, {:?}", _host, _url); Ok(()) } }