Skip to content

Commit

Permalink
[Feature] Support flink 1.19 (#379)
Browse files Browse the repository at this point in the history
Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy authored Aug 2, 2024
1 parent cc8689d commit d12e163
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 59 deletions.
6 changes: 3 additions & 3 deletions common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ if ! ${MVN_CMD} --version; then
fi
export MVN_CMD

SUPPORTED_MINOR_VERSION=("1.15" "1.16" "1.17" "1.18")
SUPPORTED_MINOR_VERSION=("1.15" "1.16" "1.17" "1.18" "1.19")
# version formats are different among flink versions
SUPPORTED_KAFKA_CONNECTOR_VERSION=("1.15.0" "1.16.0" "1.17.0" "3.0.1-1.18")
SUPPORTED_KAFKA_CONNECTOR_VERSION=("1.15.0" "1.16.0" "1.17.0" "3.0.1-1.18" "3.2.0-1.19")
VERSION_MESSAGE=$(IFS=, ; echo "${SUPPORTED_MINOR_VERSION[*]}")

function check_flink_version_supported() {
Expand Down Expand Up @@ -69,4 +69,4 @@ function get_kafka_connector_version() {
echo "Can't find kafka connector version for flink-${FLINK_MINOR_VERSION}"
exit 1
fi
}
}
13 changes: 10 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ limitations under the License.
<file_encoding>UTF-8</file_encoding>
<maven-enforcer-plugin.version>3.0.0-M3</maven-enforcer-plugin.version>
<maven-surefire-plugin.version>3.0.0-M4</maven-surefire-plugin.version>
<flink.minor.version>1.18</flink.minor.version>
<flink.version>1.18.0</flink.version>
<kafka.connector.version>3.0.1-1.18</kafka.connector.version>
<flink.minor.version>1.19</flink.minor.version>
<flink.version>1.19.0</flink.version>
<kafka.connector.version>3.2.0-1.19</kafka.connector.version>
<arrow.version>5.0.0</arrow.version>
<kafka.version>2.8.1</kafka.version>
<scala.binary.version>2.12</scala.binary.version>
Expand Down Expand Up @@ -163,6 +163,13 @@ limitations under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,65 +49,12 @@
* Utils for kafka table tests. Refer to Flink's KafkaTableTestUtils.
*/
public class KafkaTableTestUtils {

    /**
     * Executes the given table and collects at most {@code expectedSize} rows from its
     * changelog, then cancels the backing streaming job (best effort, 5s grace).
     *
     * @param table the table to execute
     * @param expectedSize maximum number of rows to pull before stopping
     * @return the rows collected, in arrival order
     * @throws Exception if executing, iterating, or cancelling the job fails
     */
    public static List<Row> collectRows(Table table, int expectedSize) throws Exception {
        final TableResult tableResult = table.execute();
        final List<Row> rows = new ArrayList<>();
        try (CloseableIterator<Row> rowIterator = tableResult.collect()) {
            while (rows.size() < expectedSize && rowIterator.hasNext()) {
                rows.add(rowIterator.next());
            }
        }
        // The collect() job keeps running after we stop iterating; cancel it so the
        // test does not leak a streaming job. Wrap checked failures as unchecked.
        tableResult.getJobClient().ifPresent(jobClient -> {
            try {
                jobClient.cancel().get(5, TimeUnit.SECONDS);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        return rows;
    }

    /**
     * Reads all lines of a classpath resource.
     *
     * @param resource resource path relative to the test classloader root
     * @return the lines of the resource file
     * @throws IOException if the file cannot be read
     */
    public static List<String> readLines(String resource) throws IOException {
        final URL resourceUrl = KafkaToStarRocksITTest.class.getClassLoader().getResource(resource);
        assert resourceUrl != null;
        return Files.readAllLines(new File(resourceUrl.getFile()).toPath());
    }

    /**
     * Polls the named values sink until its results equal {@code expected}
     * (order-insensitive), or the timeout elapses.
     *
     * <p>Note: sorts the caller-supplied {@code expected} list in place.
     *
     * @param sinkName name of the registered TestValues sink to read from
     * @param expected expected rows as strings; sorted in place by this method
     * @param timeout how long to keep polling before failing
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws TimeoutException if the expected result never appears in time
     */
    public static void waitingExpectedResults(
            String sinkName, List<String> expected, Duration timeout)
            throws InterruptedException, TimeoutException {
        Collections.sort(expected);
        CommonTestUtils.waitUtil(
                () -> {
                    final List<String> sortedActual = TestValuesTableFactory.getResults(sinkName);
                    Collections.sort(sortedActual);
                    return expected.equals(sortedActual);
                },
                timeout,
                "Can not get the expected result.");
    }

    /**
     * Asserts that {@code actual}, grouped by the key columns at {@code keyLoc},
     * matches {@code expectedData} per key, preserving per-key row order.
     *
     * @param expectedData expected rows grouped by key
     * @param actual flat list of actual rows
     * @param keyLoc positions of the key fields within each row
     */
    public static void comparedWithKeyAndOrder(
            Map<Row, List<Row>> expectedData, List<Row> actual, int[] keyLoc) {
        final Map<Row, LinkedList<Row>> groupedActual = new HashMap<>();
        for (Row row : actual) {
            final Row key = Row.project(row, keyLoc);
            // Normalize the changelog kind so keys compare regardless of +I/-U/+U/-D.
            key.setKind(RowKind.INSERT);
            groupedActual.computeIfAbsent(key, unused -> new LinkedList<>()).add(row);
        }
        // Same set of keys first...
        assertEquals("Actual result: " + actual, expectedData.size(), groupedActual.size());
        // ...then deep-compare the ordered rows under each key.
        for (Row key : expectedData.keySet()) {
            assertThat(
                    groupedActual.get(key),
                    TableTestMatchers.deepEqualTo(expectedData.get(key), false));
        }
    }
}

0 comments on commit d12e163

Please sign in to comment.