Skip to content

Commit

Permalink
fix test case
Browse files Browse the repository at this point in the history
Signed-off-by: chenxu <[email protected]>
  • Loading branch information
dmetasoul01 committed Sep 5, 2024
1 parent 4dae785 commit 8d5256c
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
import org.apache.flink.lakesoul.metadata.LakeSoulCatalog;
import org.apache.flink.lakesoul.test.AbstractTestBase;
import org.apache.flink.lakesoul.test.LakeSoulTestUtils;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.ExplainDetail;
Expand All @@ -29,11 +27,9 @@
import java.io.IOException;
import java.net.URI;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;

import static org.apache.flink.lakesoul.LakeSoulOptions.LAKESOUL_TABLE_PATH;
import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.HASH_BUCKET_NUM;
import static org.apache.flink.table.planner.utils.TableTestUtil.*;
import static org.junit.Assert.assertEquals;

Expand Down Expand Up @@ -297,9 +293,6 @@ private void testLakeSoulTableSinkWithParallelismBase(
+ " real_col int"
+ ") WITH ("
+ "'"
+ HASH_BUCKET_NUM.key()
+ "'= '3',"
+ "'"
+ LAKESOUL_TABLE_PATH.key()
+ "'='" +
getTempDirUri("/test_table")
Expand Down Expand Up @@ -338,9 +331,6 @@ private void testLakeSoulTableSinkDeleteWithParallelismBase(
+ " PARTITIONED BY ( part )"
+ " WITH ("
+ "'"
+ HASH_BUCKET_NUM.key()
+ "'= '3',"
+ "'"
+ LAKESOUL_TABLE_PATH.key()
+ "'='" +
getTempDirUri("/test_table")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ public class LakeSoulSinkFailTest extends AbstractTestBase {
public static Map<String, Tuple3<ResolvedSchema, String, MockTableSource.StopBehavior>> parameters;
static String dropSourceSql = "drop table if exists test_source";
static String createSourceSqlFormat = "create table if not exists test_source %s " +
"with ('connector'='lakesoul', 'path'='/', 'hashBucketNum'='2', " + "'discoveryinterval'='1000'" + ")";
"with ('connector'='lakesoul', 'path'='/', %s " + "'discoveryinterval'='1000'" + ")";
static String dropSinkSql = "drop table if exists test_sink";
static String createSinkSqlFormat = "create table if not exists test_sink %s %s" +
"with ('connector'='lakesoul', 'path'='%s', 'hashBucketNum'='%d')";
"with ('connector'='lakesoul', 'path'='%s' %s)";
private static ArrayList<Integer> indexArr;
private static StreamExecutionEnvironment streamExecEnv;
private static StreamTableEnvironment streamTableEnv;
Expand Down Expand Up @@ -334,11 +334,15 @@ private void testLakeSoulSink(ResolvedSchema resolvedSchema, MockTableSource.Sto


streamTableEnv.executeSql(dropSourceSql);
streamTableEnv.executeSql(String.format(createSourceSqlFormat, resolvedSchema));
streamTableEnv.executeSql(String.format(createSourceSqlFormat, resolvedSchema,
resolvedSchema.getPrimaryKey().isPresent() ?
"'hashBucketNum'='2'," : ""));


streamTableEnv.executeSql(dropSinkSql);
streamTableEnv.executeSql(String.format(createSinkSqlFormat, resolvedSchema, partitionBy, path, 2));
streamTableEnv.executeSql(String.format(createSinkSqlFormat, resolvedSchema, partitionBy, path,
resolvedSchema.getPrimaryKey().isPresent() ?
", 'hashBucketNum'='2'" : ""));

streamTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
streamTableEnv.getConfig().setLocalTimeZone(TimeZone.getTimeZone("UTC").toZoneId());
Expand Down
1 change: 0 additions & 1 deletion rust/lakesoul-io/src/lakesoul_io_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,6 @@ fn register_hdfs_object_store(
{
let hdfs = Hdfs::try_new(_host, _config.clone())?;
_runtime.register_object_store(_url, Arc::new(hdfs));
println!("registered hdfs objec store {:?}, {:?}", _host, _url);
Ok(())
}
}
Expand Down

0 comments on commit 8d5256c

Please sign in to comment.