From d12e163205137d4fd418c98128263c5a0069c3c6 Mon Sep 17 00:00:00 2001 From: PengFei Li Date: Fri, 2 Aug 2024 14:12:54 +0800 Subject: [PATCH] [Feature] Support flink 1.19 (#379) Signed-off-by: PengFei Li --- common.sh | 6 +-- pom.xml | 13 +++-- .../it/sink/kafka/KafkaTableTestUtils.java | 53 ------------------- 3 files changed, 13 insertions(+), 59 deletions(-) diff --git a/common.sh b/common.sh index 89e5584d..5b7d95cc 100644 --- a/common.sh +++ b/common.sh @@ -32,9 +32,9 @@ if ! ${MVN_CMD} --version; then fi export MVN_CMD -SUPPORTED_MINOR_VERSION=("1.15" "1.16" "1.17" "1.18") +SUPPORTED_MINOR_VERSION=("1.15" "1.16" "1.17" "1.18" "1.19") # version formats are different among flink versions -SUPPORTED_KAFKA_CONNECTOR_VERSION=("1.15.0" "1.16.0" "1.17.0" "3.0.1-1.18") +SUPPORTED_KAFKA_CONNECTOR_VERSION=("1.15.0" "1.16.0" "1.17.0" "3.0.1-1.18" "3.2.0-1.19") VERSION_MESSAGE=$(IFS=, ; echo "${SUPPORTED_MINOR_VERSION[*]}") function check_flink_version_supported() { @@ -69,4 +69,4 @@ function get_kafka_connector_version() { echo "Can't find kafka connector version for flink-${FLINK_MINOR_VERSION}" exit 1 fi -} \ No newline at end of file +} diff --git a/pom.xml b/pom.xml index 07d15745..35b7dd17 100644 --- a/pom.xml +++ b/pom.xml @@ -53,9 +53,9 @@ limitations under the License. UTF-8 3.0.0-M3 3.0.0-M4 - 1.18 - 1.18.0 - 3.0.1-1.18 + 1.19 + 1.19.0 + 3.2.0-1.19 5.0.0 2.8.1 2.12 @@ -163,6 +163,13 @@ limitations under the License. test + + org.apache.flink + flink-connector-base + ${flink.version} + test + + org.apache.flink flink-core diff --git a/src/test/java/com/starrocks/connector/flink/it/sink/kafka/KafkaTableTestUtils.java b/src/test/java/com/starrocks/connector/flink/it/sink/kafka/KafkaTableTestUtils.java index a9ac0d73..1fce0ccb 100644 --- a/src/test/java/com/starrocks/connector/flink/it/sink/kafka/KafkaTableTestUtils.java +++ b/src/test/java/com/starrocks/connector/flink/it/sink/kafka/KafkaTableTestUtils.java @@ -49,26 +49,6 @@ * Utils for kafka table tests. Refer to Flink's KafkaTableTestUtils. */ public class KafkaTableTestUtils { - public static List collectRows(Table table, int expectedSize) throws Exception { - final TableResult result = table.execute(); - final List collectedRows = new ArrayList<>(); - try (CloseableIterator iterator = result.collect()) { - while (collectedRows.size() < expectedSize && iterator.hasNext()) { - collectedRows.add(iterator.next()); - } - } - result.getJobClient() - .ifPresent( - jc -> { - try { - jc.cancel().get(5, TimeUnit.SECONDS); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - - return collectedRows; - } public static List readLines(String resource) throws IOException { final URL url = KafkaToStarRocksITTest.class.getClassLoader().getResource(resource); @@ -76,38 +56,5 @@ public static List readLines(String resource) throws IOException { Path path = new File(url.getFile()).toPath(); return Files.readAllLines(path); } - - public static void waitingExpectedResults( - String sinkName, List expected, Duration timeout) - throws InterruptedException, TimeoutException { - Collections.sort(expected); - CommonTestUtils.waitUtil( - () -> { - List actual = TestValuesTableFactory.getResults(sinkName); - Collections.sort(actual); - return expected.equals(actual); - }, - timeout, - "Can not get the expected result."); - } - - public static void comparedWithKeyAndOrder( - Map> expectedData, List actual, int[] keyLoc) { - Map> actualData = new HashMap<>(); - for (Row row : actual) { - Row key = Row.project(row, keyLoc); - // ignore row kind - key.setKind(RowKind.INSERT); - actualData.computeIfAbsent(key, k -> new LinkedList<>()).add(row); - } - // compare key first - assertEquals("Actual result: " + actual, expectedData.size(), actualData.size()); - // compare by value - for (Row key : expectedData.keySet()) { - assertThat( - actualData.get(key), - TableTestMatchers.deepEqualTo(expectedData.get(key), false)); - } - } }