diff --git a/build.sh b/build.sh
index 3d503c58..21bcf4dc 100755
--- a/build.sh
+++ b/build.sh
@@ -18,40 +18,25 @@
 # specific language governing permissions and limitations
 # under the License.
 
-set -eo pipefail
+source "$(dirname "$0")"/common.sh
 
-# check maven
-MVN_CMD=mvn
-if [[ ! -z ${CUSTOM_MVN} ]]; then
-    MVN_CMD=${CUSTOM_MVN}
-fi
-if ! ${MVN_CMD} --version; then
-    echo "Error: mvn is not found"
-    exit 1
-fi
-export MVN_CMD
-
-supported_minor_version=("1.15" "1.16" "1.17")
-version_msg=$(IFS=, ; echo "${supported_minor_version[*]}")
 if [ ! $1 ]
 then
     echo "Usage:"
     echo "   sh build.sh <flink_version>"
-    echo "   supported flink version: ${version_msg}"
+    echo "   supported flink version: ${VERSION_MESSAGE}"
     exit 1
 fi
 
 flink_minor_version=$1
-if [[ " ${supported_minor_version[*]} " == *" $flink_minor_version "* ]];
-then
-    echo "Compiling connector for flink version $flink_minor_version"
-else
-    echo "Error: only support flink version: ${version_msg}"
-    exit 1
-fi
+check_flink_version_supported $flink_minor_version
+flink_version="$(get_flink_version $flink_minor_version)"
+kafka_connector_version="$(get_kafka_connector_version $flink_minor_version)"
 
-flink_version=${flink_minor_version}.0
-${MVN_CMD} clean package -DskipTests -Dflink.minor.version=${flink_minor_version} -Dflink.version=${flink_version}
+${MVN_CMD} clean package -DskipTests \
+    -Dflink.minor.version=${flink_minor_version} \
+    -Dflink.version=${flink_version} \
+    -Dkafka.connector.version=${kafka_connector_version}
 
 echo "*********************************************************************"
 echo "Successfully build Flink StarRocks Connector for Flink $flink_minor_version"
diff --git a/common.sh b/common.sh
new file mode 100644
index 00000000..89e5584d
--- /dev/null
+++ b/common.sh
@@ -0,0 +1,72 @@
+#!/usr/bin/env bash
+#
+# Copyright 2021-present StarRocks, Inc. All rights reserved.
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+set -eo pipefail
+
+# check maven
+MVN_CMD=mvn
+if [[ ! -z ${CUSTOM_MVN} ]]; then
+    MVN_CMD=${CUSTOM_MVN}
+fi
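+# Fail fast if the selected Maven command cannot be executed.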
+if ! ${MVN_CMD} --version; then
+    echo "Error: mvn is not found"
+    exit 1
+fi
+export MVN_CMD
+
+SUPPORTED_MINOR_VERSION=("1.15" "1.16" "1.17" "1.18")
+# kafka connector version formats differ among flink versions
+SUPPORTED_KAFKA_CONNECTOR_VERSION=("1.15.0" "1.16.0" "1.17.0" "3.0.1-1.18")
+VERSION_MESSAGE=$(IFS=, ; echo "${SUPPORTED_MINOR_VERSION[*]}")
+
+function check_flink_version_supported() {
+    local FLINK_MINOR_VERSION=$1
+    if [[ " ${SUPPORTED_MINOR_VERSION[*]} " != *" $FLINK_MINOR_VERSION "* ]];
+    then
+        echo "Error: only support flink version: ${VERSION_MESSAGE}"
+        exit 1
+    fi
+}
+
+function get_flink_version() {
+    local FLINK_MINOR_VERSION=$1
+    echo "${FLINK_MINOR_VERSION}.0"
+}
+
+function get_kafka_connector_version() {
+    local FLINK_MINOR_VERSION=$1
+    local index=-1
+    for ((i=0; i<${#SUPPORTED_MINOR_VERSION[@]}; i++)); do
+        if [ "${SUPPORTED_MINOR_VERSION[i]}" = "$FLINK_MINOR_VERSION" ]; then
+            index=$i
+            break
+        fi
+    done
+
+    if [ "$index" != -1 ];
+    then
+        local KAFKA_CONNECTOR_VERSION="${SUPPORTED_KAFKA_CONNECTOR_VERSION[index]}"
+        echo "$KAFKA_CONNECTOR_VERSION"
+    else
+        # write the error to stderr so it is not captured by command substitution
+        echo "Can't find kafka connector version for flink-${FLINK_MINOR_VERSION}" >&2
+        exit 1
+    fi
+}
\ No newline at end of file
diff --git a/deploy.sh b/deploy.sh
index 678316de..e44b110f 100644
--- a/deploy.sh
+++ b/deploy.sh
@@ -18,40 +18,25 @@
 # specific language governing permissions and limitations
 # under the License.
 
-set -eo pipefail
+source "$(dirname "$0")"/common.sh
 
-# check maven
-MVN_CMD=mvn
-if [[ ! -z ${CUSTOM_MVN} ]]; then
-    MVN_CMD=${CUSTOM_MVN}
-fi
-if ! ${MVN_CMD} --version; then
-    echo "Error: mvn is not found"
-    exit 1
-fi
-export MVN_CMD
-
-supported_minor_version=("1.15" "1.16" "1.17")
-version_msg=$(IFS=, ; echo "${supported_minor_version[*]}")
 if [ ! $1 ]
 then
     echo "Usage:"
-    echo "   sh build.sh <flink_version>"
-    echo "   supported flink version: ${version_msg}"
+    echo "   sh deploy.sh <flink_version>"
+    echo "   supported flink version: ${VERSION_MESSAGE}"
     exit 1
 fi
 
 flink_minor_version=$1
-if [[ " ${supported_minor_version[*]} " == *" $flink_minor_version "* ]];
-then
-    echo "Compiling connector for flink version $flink_minor_version"
-else
-    echo "Error: only support flink version: ${version_msg}"
-    exit 1
-fi
+check_flink_version_supported $flink_minor_version
+flink_version="$(get_flink_version $flink_minor_version)"
+kafka_connector_version="$(get_kafka_connector_version $flink_minor_version)"
 
-flink_version=${flink_minor_version}.0
-${MVN_CMD} clean deploy -Prelease -DskipTests -Dflink.minor.version=${flink_minor_version} -Dflink.version=${flink_version}
+${MVN_CMD} clean deploy -Prelease -DskipTests \
+    -Dflink.minor.version=${flink_minor_version} \
+    -Dflink.version=${flink_version} \
+    -Dkafka.connector.version=${kafka_connector_version}
 
 echo "*********************************************************************"
 echo "Successfully deploy Flink StarRocks Connector for Flink $flink_minor_version"
diff --git a/examples/pom.xml b/examples/pom.xml
index b8b3fd5c..ba789af6 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -25,10 +25,10 @@ limitations under the License.
         <maven.compiler.source>1.8</maven.compiler.source>
         <maven.compiler.target>1.8</maven.compiler.target>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <flink.minor.version>1.15</flink.minor.version>
-        <flink.version>1.15.0</flink.version>
+        <flink.minor.version>1.17</flink.minor.version>
+        <flink.version>1.17.0</flink.version>
         <scala.version>2.12</scala.version>
-        <connector.version>1.2.7</connector.version>
+        <connector.version>1.2.8</connector.version>
         <log4j.version>2.17.1</log4j.version>
     </properties>
 
diff --git a/examples/src/main/java/com/starrocks/connector/flink/examples/datastream/WriteMultipleTables.java b/examples/src/main/java/com/starrocks/connector/flink/examples/datastream/WriteMultipleTables.java
new file mode 100644
index 00000000..798bffe2
--- /dev/null
+++ b/examples/src/main/java/com/starrocks/connector/flink/examples/datastream/WriteMultipleTables.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2021-present StarRocks, Inc. All rights reserved.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.starrocks.connector.flink.examples.datastream;
+
+import com.starrocks.connector.flink.table.data.DefaultStarRocksRowData;
+import com.starrocks.connector.flink.table.data.StarRocksRowData;
+import com.starrocks.connector.flink.table.sink.SinkFunctionFactory;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
+import com.starrocks.data.load.stream.properties.StreamLoadTableProperties;
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+/**
+ * This example shows how to use one sink to write data to multiple StarRocks tables.
+ * Note this example requires connector version >= 1.2.9.
+ */
+public class WriteMultipleTables {
+
+    public static void main(String[] args) throws Exception {
+        // To run the example, you need to prepare the following steps
+        //
+        // 1. create two primary key tables in your StarRocks cluster. The DDL is
+        //  CREATE DATABASE `test`;
+        //    CREATE TABLE `test`.`tbl1`
+        //    (
+        //        `id` int(11) NOT NULL COMMENT "",
+        //        `name` varchar(65533) NULL DEFAULT "" COMMENT "",
+        //        `score` int(11) DEFAULT "0" COMMENT ""
+        //    )
+        //    ENGINE=OLAP
+        //    PRIMARY KEY(`id`)
+        //    COMMENT "OLAP"
+        //    DISTRIBUTED BY HASH(`id`)
+        //    PROPERTIES(
+        //        "replication_num" = "1"
+        //    );
+        //
+        //    CREATE TABLE `test`.`tbl2`
+        //    (
+        //        `order_id` BIGINT NOT NULL COMMENT "",
+        //        `order_state` INT DEFAULT "0" COMMENT "",
+        //        `total_price` BIGINT DEFAULT "0" COMMENT ""
+        //    )
+        //    ENGINE=OLAP
+        //    PRIMARY KEY(`order_id`)
+        //    COMMENT "OLAP"
+        //    DISTRIBUTED BY HASH(`order_id`)
+        //    PROPERTIES(
+        //        "replication_num" = "1"
+        //    );
+        //
+        // 2. replace the connector options with your cluster configurations
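+        //    The options can also be passed as command-line arguments, e.g.
+        //    --jdbcUrl jdbc:mysql://<fe_host>:9030 --loadUrl <fe_host>:8030 --userName root --password ""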
+        MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
+        String jdbcUrl = params.get("jdbcUrl", "jdbc:mysql://127.0.0.1:11903");
+        String loadUrl = params.get("loadUrl", "127.0.0.1:11901");
+        String userName = params.get("userName", "root");
+        String password = params.get("password", "");
+
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        // Generate records for tables `test`.`tbl1` and `test`.`tbl2`. Each record is represented with
+        // the structure StarRocksRowData. A StarRocksRowData includes the meta (which db and table)
+        // and data. The data is a json-format string whose fields correspond to partial or all columns
+        // of the table.
+        StarRocksRowData[] records =
+                new StarRocksRowData[]{
+                        // write full columns of `test`.`tbl1`: `id`, `name` and `score`
+                        buildRow("test", "tbl1", "{\"id\":1, \"name\":\"starrocks-json\", \"score\":100}"),
+                        // write partial columns of `test`.`tbl1`: `id`, `name`
+                        buildRow("test", "tbl1", "{\"id\":2, \"name\":\"flink-json\"}"),
+                        // write full columns of `test`.`tbl2`: `order_id`, `order_state` and `total_price`
+                        buildRow("test", "tbl2", "{\"order_id\":1, \"order_state\":1, \"total_price\":100}"),
+                        // write partial columns of `test`.`tbl2`: `order_id`, `order_state`
+                        buildRow("test", "tbl2", "{\"order_id\":2, \"order_state\":2}"),
+                };
+        DataStream<StarRocksRowData> source = env.fromElements(records);
+
+        // Configure the connector with the required properties, and you also need to add properties
+        // "sink.properties.format" and "sink.properties.strip_outer_array" to tell the connector the
+        // input records are json-format.
+        StarRocksSinkOptions options = StarRocksSinkOptions.builder()
+                .withProperty("jdbc-url", jdbcUrl)
+                .withProperty("load-url", loadUrl)
+                .withProperty("database-name", "*")
+                .withProperty("table-name", "*")
+                .withProperty("username", userName)
+                .withProperty("password", password)
+                .withProperty("sink.properties.format", "json")
+                .withProperty("sink.properties.strip_outer_array", "true")
+                .withProperty("sink.properties.ignore_json_size", "true")
+                .build();
+
+        // By default, all tables use the stream load properties defined when building
+        // StarRocksSinkOptions. You can also customize the stream load properties for each
+        // table. Here we create custom properties for the table `test`.`tbl2`, and the main
+        // difference is that partial_update is enabled. The table `test`.`tbl1` still uses
+        // the default properties.
+        StreamLoadTableProperties tbl2Properties = StreamLoadTableProperties.builder()
+                .database("test")
+                .table("tbl2")
+                .addProperty("format", "json")
+                .addProperty("strip_outer_array", "true")
+                .addProperty("ignore_json_size", "true")
+                .addProperty("partial_update", "true")
+                .addProperty("columns", "`order_id`,`order_state`")
+                .build();
+        options.addTableProperties(tbl2Properties);
+
+        // Create the sink with the options
+        SinkFunction<StarRocksRowData> starRockSink = SinkFunctionFactory.createSinkFunction(options);
+        source.addSink(starRockSink);
+
+        env.execute("WriteMultipleTables");
+    }
+
+    private static StarRocksRowData buildRow(String db, String table, String data) {
+        return new DefaultStarRocksRowData(null, db, table, data);
+    }
+}
diff --git a/pom.xml b/pom.xml
index 5a4765b6..6c0120df 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,11 +53,14 @@ limitations under the License.
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <maven-failsafe-plugin.version>3.0.0-M3</maven-failsafe-plugin.version>
         <maven-surefire-plugin.version>3.0.0-M4</maven-surefire-plugin.version>
-        <flink.minor.version>1.17</flink.minor.version>
-        <flink.version>1.17.0</flink.version>
+        <flink.minor.version>1.18</flink.minor.version>
+        <flink.version>1.18.0</flink.version>
+        <kafka.connector.version>3.0.1-1.18</kafka.connector.version>
         <arrow.version>5.0.0</arrow.version>
         <kafka.version>2.8.1</kafka.version>
         <scala.version>2.12</scala.version>
+        <guava.version>31.1-jre</guava.version>
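+        <!-- prefix applied by the maven-shade-plugin relocations below -->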
+        <shading.prefix>com.starrocks.shade</shading.prefix>
     </properties>
 
@@ -97,37 +100,14 @@ limitations under the License.
         <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
-            <version>31.1-jre</version>
-        </dependency>
-        <dependency>
-            <groupId>commons-codec</groupId>
-            <artifactId>commons-codec</artifactId>
-            <version>1.15</version>
-        </dependency>
-        <dependency>
-            <groupId>com.github.jsqlparser</groupId>
-            <artifactId>jsqlparser</artifactId>
-            <version>4.0</version>
-        </dependency>
-        <dependency>
-            <groupId>commons-logging</groupId>
-            <artifactId>commons-logging</artifactId>
-            <version>1.1.1</version>
-        </dependency>
-        <dependency>
-            <groupId>commons-io</groupId>
-            <artifactId>commons-io</artifactId>
-            <version>2.11.0</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.httpcomponents</groupId>
-            <artifactId>httpcore</artifactId>
-            <version>4.4.6</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.httpcomponents</groupId>
-            <artifactId>httpclient</artifactId>
-            <version>4.5.13</version>
+            <version>${guava.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>listenablefuture</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>com.alibaba</groupId>
@@ -159,14 +139,14 @@ limitations under the License.
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-connector-kafka</artifactId>
-            <version>${flink.version}</version>
+            <version>${kafka.connector.version}</version>
             <scope>test</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-connector-kafka</artifactId>
-            <version>${flink.version}</version>
+            <version>${kafka.connector.version}</version>
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
@@ -450,59 +430,39 @@ limitations under the License.
                                 <relocation>
                                     <pattern>com.alibaba</pattern>
-                                    <shadedPattern>com.starrocks.shade.com.alibaba</shadedPattern>
-                                </relocation>
-                                <relocation>
-                                    <pattern>org.apache.http</pattern>
-                                    <shadedPattern>com.starrocks.shade.org.apache.http</shadedPattern>
-                                </relocation>
-                                <relocation>
-                                    <pattern>org.apache.commons</pattern>
-                                    <shadedPattern>com.starrocks.shade.org.apache.commons</shadedPattern>
+                                    <shadedPattern>${shading.prefix}.com.alibaba</shadedPattern>
                                 </relocation>
                                 <relocation>
                                     <pattern>org.apache.arrow</pattern>
-                                    <shadedPattern>com.starrocks.shade.org.apache.arrow</shadedPattern>
+                                    <shadedPattern>${shading.prefix}.org.apache.arrow</shadedPattern>
                                 </relocation>
                                 <relocation>
                                     <pattern>io.netty</pattern>
-                                    <shadedPattern>com.starrocks.shade.io.netty</shadedPattern>
-                                </relocation>
-                                <relocation>
-                                    <pattern>com.google.flatbuffers</pattern>
-                                    <shadedPattern>com.starrocks.shade.com.google.flatbuffers</shadedPattern>
+                                    <shadedPattern>${shading.prefix}.io.netty</shadedPattern>
                                 </relocation>
                                 <relocation>
-                                    <pattern>com.google.common</pattern>
-                                    <shadedPattern>com.starrocks.shade.com.google.common</shadedPattern>
+                                    <pattern>com.google</pattern>
+                                    <shadedPattern>${shading.prefix}.com.google</shadedPattern>
                                 </relocation>
                                 <relocation>
                                     <pattern>com.fasterxml.jackson</pattern>
-                                    <shadedPattern>com.starrocks.shade.com.fasterxml.jackson</shadedPattern>
+                                    <shadedPattern>com.starrocks.streamload.shade.com.fasterxml.jackson</shadedPattern>
                                 </relocation>
                             </relocations>
                             <artifactSet>
                                 <includes>
-                                    <include>commons-codec:commons-codec</include>
-                                    <include>commons-io:commons-io</include>
-                                    <include>commons-logging:*</include>
-                                    <include>org.apache.httpcomponents:httpclient</include>
-                                    <include>org.apache.httpcomponents:httpcore</include>
                                     <include>com.alibaba:fastjson</include>
                                     <include>mysql:mysql-connector-java</include>
                                     <include>com.starrocks:starrocks-stream-load-sdk</include>
                                     <include>com.starrocks:starrocks-thrift-sdk</include>
-                                    <include>org.apache.arrow:arrow-vector</include>
-                                    <include>org.apache.arrow:arrow-memory-netty</include>
-                                    <include>org.apache.arrow:arrow-memory-core</include>
-                                    <include>org.apache.arrow:arrow-format</include>
-                                    <include>io.netty:netty-buffer</include>
-                                    <include>io.netty:netty-common</include>
+                                    <include>org.apache.arrow:*</include>
+                                    <include>io.netty:*</include>
                                     <include>com.google.flatbuffers:flatbuffers-java</include>
-                                    <include>com.google.guava:guava</include>
-                                    <include>com.fasterxml.jackson.core:jackson-annotations</include>
-                                    <include>com.fasterxml.jackson.core:jackson-core</include>
-                                    <include>com.fasterxml.jackson.core:jackson-databind</include>
+                                    <include>com.google.guava:*</include>
                                 </includes>
                             </artifactSet>
diff --git a/src/main/java/com/starrocks/connector/flink/catalog/StarRocksCatalog.java b/src/main/java/com/starrocks/connector/flink/catalog/StarRocksCatalog.java
index ccb5db3b..63eaae1b 100644
--- a/src/main/java/com/starrocks/connector/flink/catalog/StarRocksCatalog.java
+++ b/src/main/java/com/starrocks/connector/flink/catalog/StarRocksCatalog.java
@@ -22,8 +22,8 @@
 
 import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
 import com.starrocks.connector.flink.table.source.StarRocksSourceOptions;
-import org.apache.arrow.util.VisibleForTesting;
 import org.apache.commons.compress.utils.Lists;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.Schema;
diff --git a/src/main/java/com/starrocks/connector/flink/manager/StarRocksQueryPlanVisitor.java b/src/main/java/com/starrocks/connector/flink/manager/StarRocksQueryPlanVisitor.java
index 6006e012..576f5cc0 100644
--- a/src/main/java/com/starrocks/connector/flink/manager/StarRocksQueryPlanVisitor.java
+++ b/src/main/java/com/starrocks/connector/flink/manager/StarRocksQueryPlanVisitor.java
@@ -20,14 +20,14 @@
 
 import com.starrocks.connector.flink.table.source.struct.QueryInfo;
 import
com.starrocks.connector.flink.table.source.struct.QueryPlan; import com.starrocks.connector.flink.tools.JsonWrapper; +import com.starrocks.streamload.shade.org.apache.http.HttpEntity; +import com.starrocks.streamload.shade.org.apache.http.client.methods.CloseableHttpResponse; +import com.starrocks.streamload.shade.org.apache.http.client.methods.HttpPost; +import com.starrocks.streamload.shade.org.apache.http.entity.ByteArrayEntity; +import com.starrocks.streamload.shade.org.apache.http.impl.client.CloseableHttpClient; +import com.starrocks.streamload.shade.org.apache.http.impl.client.HttpClients; +import com.starrocks.streamload.shade.org.apache.http.util.EntityUtils; import org.apache.commons.codec.binary.Base64; -import org.apache.http.HttpEntity; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.entity.ByteArrayEntity; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkBufferEntity.java b/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkBufferEntity.java index e9570720..9ea62853 100644 --- a/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkBufferEntity.java +++ b/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkBufferEntity.java @@ -14,12 +14,12 @@ package com.starrocks.connector.flink.manager; +import com.google.common.base.Strings; + import java.io.Serializable; import java.util.ArrayList; import java.util.UUID; -import org.apache.flink.shaded.guava30.com.google.common.base.Strings; - public class StarRocksSinkBufferEntity implements Serializable { private static final long serialVersionUID = 1L; diff --git a/src/main/java/com/starrocks/connector/flink/manager/StarRocksStreamLoadListener.java b/src/main/java/com/starrocks/connector/flink/manager/StarRocksStreamLoadListener.java index c88233f3..dad64948 100644 --- a/src/main/java/com/starrocks/connector/flink/manager/StarRocksStreamLoadListener.java +++ b/src/main/java/com/starrocks/connector/flink/manager/StarRocksStreamLoadListener.java @@ -23,9 +23,9 @@ import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions; import com.starrocks.data.load.stream.StreamLoadResponse; import com.starrocks.data.load.stream.v2.StreamLoadListener; -import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram; public class StarRocksStreamLoadListener implements StreamLoadListener { @@ -46,22 +46,22 @@ public class StarRocksStreamLoadListener implements StreamLoadListener { private transient Histogram writeDataTimeMs; private transient Histogram loadTimeMs; - public StarRocksStreamLoadListener(RuntimeContext context, StarRocksSinkOptions sinkOptions) { - totalFlushBytes = context.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_BYTES); - totalFlushRows = context.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_ROWS); - totalFlushTime = context.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_COST_TIME); - totalFlushTimeWithoutRetries = context.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_COST_TIME_WITHOUT_RETRIES); - totalFlushSucceededTimes = 
context.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_SUCCEEDED_TIMES); - totalFlushFailedTimes = context.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_FAILED_TIMES); - flushTimeNs = context.getMetricGroup().histogram(HISTOGRAM_FLUSH_TIME, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize())); - offerTimeNs = context.getMetricGroup().histogram(HISTOGRAM_OFFER_TIME_NS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize())); + public StarRocksStreamLoadListener(MetricGroup metricGroup, StarRocksSinkOptions sinkOptions) { + totalFlushBytes = metricGroup.counter(COUNTER_TOTAL_FLUSH_BYTES); + totalFlushRows = metricGroup.counter(COUNTER_TOTAL_FLUSH_ROWS); + totalFlushTime = metricGroup.counter(COUNTER_TOTAL_FLUSH_COST_TIME); + totalFlushTimeWithoutRetries = metricGroup.counter(COUNTER_TOTAL_FLUSH_COST_TIME_WITHOUT_RETRIES); + totalFlushSucceededTimes = metricGroup.counter(COUNTER_TOTAL_FLUSH_SUCCEEDED_TIMES); + totalFlushFailedTimes = metricGroup.counter(COUNTER_TOTAL_FLUSH_FAILED_TIMES); + flushTimeNs = metricGroup.histogram(HISTOGRAM_FLUSH_TIME, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize())); + offerTimeNs = metricGroup.histogram(HISTOGRAM_OFFER_TIME_NS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize())); - totalFilteredRows = context.getMetricGroup().counter(COUNTER_NUMBER_FILTERED_ROWS); - commitAndPublishTimeMs = context.getMetricGroup().histogram(HISTOGRAM_COMMIT_AND_PUBLISH_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize())); - streamLoadPlanTimeMs = context.getMetricGroup().histogram(HISTOGRAM_STREAM_LOAD_PLAN_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize())); - readDataTimeMs = context.getMetricGroup().histogram(HISTOGRAM_READ_DATA_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize())); - writeDataTimeMs = context.getMetricGroup().histogram(HISTOGRAM_WRITE_DATA_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize())); - loadTimeMs = context.getMetricGroup().histogram(HISTOGRAM_LOAD_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize())); + totalFilteredRows = metricGroup.counter(COUNTER_NUMBER_FILTERED_ROWS); + commitAndPublishTimeMs = metricGroup.histogram(HISTOGRAM_COMMIT_AND_PUBLISH_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize())); + streamLoadPlanTimeMs = metricGroup.histogram(HISTOGRAM_STREAM_LOAD_PLAN_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize())); + readDataTimeMs = metricGroup.histogram(HISTOGRAM_READ_DATA_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize())); + writeDataTimeMs = metricGroup.histogram(HISTOGRAM_WRITE_DATA_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize())); + loadTimeMs = metricGroup.histogram(HISTOGRAM_LOAD_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize())); } @Override diff --git a/src/main/java/com/starrocks/connector/flink/manager/StarRocksStreamLoadVisitor.java b/src/main/java/com/starrocks/connector/flink/manager/StarRocksStreamLoadVisitor.java index 6777667f..ef89fce3 100644 --- a/src/main/java/com/starrocks/connector/flink/manager/StarRocksStreamLoadVisitor.java +++ b/src/main/java/com/starrocks/connector/flink/manager/StarRocksStreamLoadVisitor.java @@ -19,18 +19,18 @@ import 
com.starrocks.connector.flink.table.sink.StarRocksSinkOptions; import com.starrocks.connector.flink.tools.JsonWrapper; import com.starrocks.data.load.stream.exception.StreamLoadFailException; +import com.starrocks.streamload.shade.org.apache.http.HttpEntity; +import com.starrocks.streamload.shade.org.apache.http.client.config.RequestConfig; +import com.starrocks.streamload.shade.org.apache.http.client.methods.CloseableHttpResponse; +import com.starrocks.streamload.shade.org.apache.http.client.methods.HttpGet; +import com.starrocks.streamload.shade.org.apache.http.client.methods.HttpPut; +import com.starrocks.streamload.shade.org.apache.http.entity.ByteArrayEntity; +import com.starrocks.streamload.shade.org.apache.http.impl.client.CloseableHttpClient; +import com.starrocks.streamload.shade.org.apache.http.impl.client.DefaultRedirectStrategy; +import com.starrocks.streamload.shade.org.apache.http.impl.client.HttpClientBuilder; +import com.starrocks.streamload.shade.org.apache.http.impl.client.HttpClients; +import com.starrocks.streamload.shade.org.apache.http.util.EntityUtils; import org.apache.commons.codec.binary.Base64; -import org.apache.http.HttpEntity; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.client.methods.HttpPut; -import org.apache.http.entity.ByteArrayEntity; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.DefaultRedirectStrategy; -import org.apache.http.impl.client.HttpClientBuilder; -import org.apache.http.impl.client.HttpClients; -import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformer.java b/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformer.java index 32d36b83..aef0c02b 100644 --- a/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformer.java +++ b/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformer.java @@ -20,7 +20,6 @@ import com.starrocks.connector.flink.tools.JsonWrapper; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.data.ArrayData; import org.apache.flink.table.data.GenericArrayData; @@ -78,8 +77,9 @@ public void setTableSchema(TableSchema ts) { @Override public void setRuntimeContext(RuntimeContext runtimeCtx) { - final TypeSerializer typeSerializer = rowDataTypeInfo.createSerializer(runtimeCtx.getExecutionConfig()); - valueTransform = runtimeCtx.getExecutionConfig().isObjectReuseEnabled() ? 
typeSerializer::copy : Function.identity();
+        // No need to copy the value even if object reuse is enabled,
+        // because the raw RowData value will not be buffered
+        this.valueTransform = Function.identity();
     }
 
     @Override
diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/ExactlyOnceLabelGeneratorFactory.java b/src/main/java/com/starrocks/connector/flink/table/sink/ExactlyOnceLabelGeneratorFactory.java
index 6c6ae15f..4d2d575e 100644
--- a/src/main/java/com/starrocks/connector/flink/table/sink/ExactlyOnceLabelGeneratorFactory.java
+++ b/src/main/java/com/starrocks/connector/flink/table/sink/ExactlyOnceLabelGeneratorFactory.java
@@ -21,7 +21,7 @@
 package com.starrocks.connector.flink.table.sink;
 
 import com.starrocks.data.load.stream.LabelGeneratorFactory;
-import org.apache.arrow.util.VisibleForTesting;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/SinkFunctionFactory.java b/src/main/java/com/starrocks/connector/flink/table/sink/SinkFunctionFactory.java
index 4409f977..bee4b2ad 100644
--- a/src/main/java/com/starrocks/connector/flink/table/sink/SinkFunctionFactory.java
+++ b/src/main/java/com/starrocks/connector/flink/table/sink/SinkFunctionFactory.java
@@ -19,6 +19,7 @@
 package com.starrocks.connector.flink.table.sink;
 
 import com.starrocks.connector.flink.row.sink.StarRocksIRowTransformer;
+import com.starrocks.connector.flink.table.sink.v2.StarRocksSink;
 import org.apache.flink.table.api.TableSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -114,4 +115,23 @@ public static StarRocksDynamicSinkFunctionBase createSinkFunction(StarRoc
             throw new UnsupportedOperationException("Unsupported sink type " + sinkVersion.name());
         }
     }
+
+    public static <T> StarRocksSink<T> createSink(
+            StarRocksSinkOptions sinkOptions, TableSchema schema, StarRocksIRowTransformer<T> rowTransformer) {
+        detectStarRocksFeature(sinkOptions);
+        SinkVersion sinkVersion = getSinkVersion(sinkOptions);
+        if (sinkVersion == SinkVersion.V2) {
+            return new StarRocksSink<>(sinkOptions, schema, rowTransformer);
+        }
+        throw new UnsupportedOperationException("New sink API doesn't support sink type " + sinkVersion.name());
+    }
+
+    public static <T> StarRocksSink<T> createSink(StarRocksSinkOptions sinkOptions) {
+        detectStarRocksFeature(sinkOptions);
+        SinkVersion sinkVersion = getSinkVersion(sinkOptions);
+        if (sinkVersion == SinkVersion.V2) {
+            return new StarRocksSink<>(sinkOptions);
+        }
+        throw new UnsupportedOperationException("New sink API doesn't support sink type " + sinkVersion.name());
+    }
 }
diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunction.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunction.java
index bfd6a950..201502f5 100644
--- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunction.java
+++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunction.java
@@ -25,10 +25,6 @@
 import com.starrocks.connector.flink.row.sink.StarRocksSerializerFactory;
 import com.starrocks.connector.flink.tools.EnvUtils;
 import com.starrocks.connector.flink.tools.JsonWrapper;
-import net.sf.jsqlparser.parser.CCJSqlParserUtil;
-import net.sf.jsqlparser.statement.Statement;
-import net.sf.jsqlparser.statement.alter.Alter;
-import net.sf.jsqlparser.statement.truncate.Truncate;
 import org.apache.flink.api.common.state.ListState;
 import
org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeHint; @@ -39,13 +35,10 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.data.binary.NestedRowData; import org.apache.flink.types.RowKind; -import org.apache.flink.util.InstantiationUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; import java.util.Map; public class StarRocksDynamicSinkFunction extends StarRocksDynamicSinkFunctionBase { @@ -126,33 +119,6 @@ public synchronized void invoke(T value, Context context) throws Exception { totalInvokeRowsTime.inc(System.nanoTime() - start); return; } - if (value instanceof NestedRowData) { - final int headerSize = 256; - NestedRowData ddlData = (NestedRowData) value; - if (ddlData.getSegments().length != 1 || ddlData.getSegments()[0].size() < headerSize) { - return; - } - int totalSize = ddlData.getSegments()[0].size(); - byte[] data = new byte[totalSize - headerSize]; - ddlData.getSegments()[0].get(headerSize, data); - Map ddlMap = InstantiationUtil.deserializeObject(data, HashMap.class.getClassLoader()); - if (null == ddlMap - || "true".equals(ddlMap.get("snapshot")) - || Strings.isNullOrEmpty(ddlMap.get("ddl")) - || Strings.isNullOrEmpty(ddlMap.get("databaseName"))) { - return; - } - Statement stmt = CCJSqlParserUtil.parse(ddlMap.get("ddl")); - if (stmt instanceof Truncate) { - Truncate truncate = (Truncate) stmt; - if (!sinkOptions.getTableName().equalsIgnoreCase(truncate.getTable().getName())) { - return; - } - // TODO: add ddl to queue - } else if (stmt instanceof Alter) { - Alter alter = (Alter) stmt; - } - } if (value instanceof RowData) { if (RowKind.UPDATE_BEFORE.equals(((RowData)value).getRowKind()) && (!sinkOptions.supportUpsertDelete() || sinkOptions.getIgnoreUpdateBefore())) { diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java index ecf75628..63536236 100644 --- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java +++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java @@ -33,10 +33,6 @@ import com.starrocks.data.load.stream.LabelGeneratorFactory; import com.starrocks.data.load.stream.StreamLoadSnapshot; import com.starrocks.data.load.stream.v2.StreamLoadManagerV2; -import net.sf.jsqlparser.parser.CCJSqlParserUtil; -import net.sf.jsqlparser.statement.Statement; -import net.sf.jsqlparser.statement.alter.Alter; -import net.sf.jsqlparser.statement.truncate.Truncate; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeHint; @@ -48,9 +44,7 @@ import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.data.binary.NestedRowData; import org.apache.flink.types.RowKind; -import org.apache.flink.util.InstantiationUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,7 +52,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ 
-157,33 +150,6 @@ public void invoke(T value, Context context) throws Exception { return; } - if (value instanceof NestedRowData) { - NestedRowData ddlData = (NestedRowData) value; - if (ddlData.getSegments().length != 1 || ddlData.getSegments()[0].size() < NESTED_ROW_DATA_HEADER_SIZE) { - return; - } - - int totalSize = ddlData.getSegments()[0].size(); - byte[] data = new byte[totalSize - NESTED_ROW_DATA_HEADER_SIZE]; - ddlData.getSegments()[0].get(NESTED_ROW_DATA_HEADER_SIZE, data); - Map ddlMap = InstantiationUtil.deserializeObject(data, HashMap.class.getClassLoader()); - if (ddlMap == null - || "true".equals(ddlMap.get("snapshot")) - || Strings.isNullOrEmpty(ddlMap.get("ddl")) - || Strings.isNullOrEmpty(ddlMap.get("databaseName"))) { - return; - } - Statement statement = CCJSqlParserUtil.parse(ddlMap.get("ddl")); - if (statement instanceof Truncate) { - Truncate truncate = (Truncate) statement; - if (!sinkOptions.getTableName().equalsIgnoreCase(truncate.getTable().getName())) { - return; - } - // TODO: add ddl to queue - } else if (statement instanceof Alter) { - - } - } if (value instanceof RowData) { if (RowKind.UPDATE_BEFORE.equals(((RowData)value).getRowKind()) && (!sinkOptions.supportUpsertDelete() || sinkOptions.getIgnoreUpdateBefore())) { @@ -215,7 +181,7 @@ public void open(Configuration parameters) throws Exception { if (serializer != null) { this.serializer.open(new StarRocksISerializer.SerializerContext(getOrCreateJsonWrapper())); } - this.streamLoadListener = new StarRocksStreamLoadListener(getRuntimeContext(), sinkOptions); + this.streamLoadListener = new StarRocksStreamLoadListener(getRuntimeContext().getMetricGroup(), sinkOptions); sinkManager.setStreamLoadListener(streamLoadListener); LabelGeneratorFactory labelGeneratorFactory; diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSink.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSink.java index c5e38eff..41c80227 100644 --- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSink.java +++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSink.java @@ -15,11 +15,13 @@ package com.starrocks.connector.flink.table.sink; import com.starrocks.connector.flink.row.sink.StarRocksTableRowTransformer; +import com.starrocks.connector.flink.table.sink.v2.StarRocksSink; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.sink.SinkFunctionProvider; +import org.apache.flink.table.connector.sink.SinkV2Provider; import org.apache.flink.table.data.RowData; public class StarRocksDynamicTableSink implements DynamicTableSink { @@ -41,12 +43,23 @@ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { @SuppressWarnings("unchecked") public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { final TypeInformation rowDataTypeInfo = context.createTypeInformation(flinkSchema.toRowDataType()); - StarRocksDynamicSinkFunctionBase starrocksSinkFunction = SinkFunctionFactory.createSinkFunction( - sinkOptions, - flinkSchema, - new StarRocksTableRowTransformer(rowDataTypeInfo) - ); - return SinkFunctionProvider.of(starrocksSinkFunction, sinkOptions.getSinkParallelism()); + if (sinkOptions.isUseUnifiedSinkApi()) { + StarRocksSink starRocksSink = + SinkFunctionFactory.createSink( + sinkOptions, + 
flinkSchema, + new StarRocksTableRowTransformer(rowDataTypeInfo) + ); + return SinkV2Provider.of(starRocksSink, sinkOptions.getSinkParallelism()); + } else { + StarRocksDynamicSinkFunctionBase starrocksSinkFunction = + SinkFunctionFactory.createSinkFunction( + sinkOptions, + flinkSchema, + new StarRocksTableRowTransformer(rowDataTypeInfo) + ); + return SinkFunctionProvider.of(starrocksSinkFunction, sinkOptions.getSinkParallelism()); + } } @Override diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSinkFactory.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSinkFactory.java index 39f858d1..21a2b7c6 100644 --- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSinkFactory.java +++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSinkFactory.java @@ -85,6 +85,7 @@ public Set> optionalOptions() { optionalOptions.add(StarRocksSinkOptions.SINK_ENABLE_EXACTLY_ONCE_LABEL_GEN); optionalOptions.add(StarRocksSinkOptions.SINK_ABORT_LINGERING_TXNS); optionalOptions.add(StarRocksSinkOptions.SINK_ABORT_CHECK_NUM_TXNS); + optionalOptions.add(StarRocksSinkOptions.SINK_USE_NEW_SINK_API); return optionalOptions; } } diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java index e03daf85..e0fac69c 100644 --- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java +++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java @@ -81,7 +81,7 @@ public enum StreamLoadFormat { public static final ConfigOption SINK_LABEL_PREFIX = ConfigOptions.key("sink.label-prefix") .stringType().noDefaultValue().withDescription("The prefix of the stream load label. Available values are within [-_A-Za-z0-9]"); public static final ConfigOption SINK_CONNECT_TIMEOUT = ConfigOptions.key("sink.connect.timeout-ms") - .intType().defaultValue(1000).withDescription("Timeout in millisecond for connecting to the `load-url`."); + .intType().defaultValue(30000).withDescription("Timeout in millisecond for connecting to the `load-url`."); public static final ConfigOption SINK_WAIT_FOR_CONTINUE_TIMEOUT = ConfigOptions.key("sink.wait-for-continue.timeout-ms") .intType().defaultValue(30000).withDescription("Timeout in millisecond to wait for 100-continue response for http client."); public static final ConfigOption SINK_IO_THREAD_COUNT = ConfigOptions.key("sink.io.thread-count") @@ -139,6 +139,13 @@ public enum StreamLoadFormat { "The number of transactions to check if they are lingering. -1 indicates that check until finding the first " + "transaction that is not lingering."); + public static final ConfigOption SINK_USE_NEW_SINK_API = ConfigOptions.key("sink.use.new-sink-api") + .booleanType().defaultValue(false).withDescription("Whether to use the implementation with the unified sink api " + + "described in Flink FLIP-191. There is no difference for users whether to enable this flag. This is just " + + "for adapting some frameworks which only support new sink api, and Flink will also remove the old sink api " + + "in the coming 2.0. 
Note that the two implementations are not state compatible; that is, you cannot " +
+            "recover from the previous job after changing the flag.");
+
     public static final ConfigOption<Integer> SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM;
 
     // Sink semantic
@@ -351,6 +358,10 @@ public int getAbortCheckNumTxns() {
         return tableOptions.get(SINK_ABORT_CHECK_NUM_TXNS);
     }
 
+    public boolean isUseUnifiedSinkApi() {
+        return tableOptions.get(SINK_USE_NEW_SINK_API);
+    }
+
     private void validateStreamLoadUrl() {
         tableOptions.getOptional(LOAD_URL).ifPresent(urlList -> {
             for (String host : urlList) {
diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksCommittable.java b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksCommittable.java
new file mode 100644
index 00000000..25294257
--- /dev/null
+++ b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksCommittable.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2021-present StarRocks, Inc. All rights reserved.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.starrocks.connector.flink.table.sink.v2;
+
+import com.starrocks.data.load.stream.StreamLoadSnapshot;
+
+public class StarRocksCommittable {
+
+    private final StreamLoadSnapshot labelSnapshot;
+
+    public StarRocksCommittable(StreamLoadSnapshot labelSnapshot) {
+        this.labelSnapshot = labelSnapshot;
+    }
+
+    public StreamLoadSnapshot getLabelSnapshot() {
+        return labelSnapshot;
+    }
+}
diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksCommittableSerializer.java b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksCommittableSerializer.java
new file mode 100644
index 00000000..bfb69090
--- /dev/null
+++ b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksCommittableSerializer.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2021-present StarRocks, Inc. All rights reserved.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.starrocks.connector.flink.table.sink.v2;
+
+import com.starrocks.connector.flink.tools.JsonWrapper;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+
+public class StarRocksCommittableSerializer implements SimpleVersionedSerializer<StarRocksCommittable> {
+
+    private final JsonWrapper jsonWrapper;
+
+    public StarRocksCommittableSerializer() {
+        this.jsonWrapper = new JsonWrapper();
+    }
+
+    @Override
+    public int getVersion() {
+        return 1;
+    }
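+
+    // The committable (and the StreamLoadSnapshot inside it) is simply JSON-encoded;
+    // bump getVersion() and branch on the version in deserialize() if the wire format changes.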
+    @Override
+    public byte[] serialize(StarRocksCommittable committable) throws IOException {
+        return jsonWrapper.toJSONBytes(committable);
+    }
+
+    @Override
+    public StarRocksCommittable deserialize(int version, byte[] serialized) throws IOException {
+        return jsonWrapper.parseObject(serialized, StarRocksCommittable.class);
+    }
+}
diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksCommitter.java b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksCommitter.java
new file mode 100644
index 00000000..4a004268
--- /dev/null
+++ b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksCommitter.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2021-present StarRocks, Inc. All rights reserved.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.starrocks.connector.flink.table.sink.v2;
+
+import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkSemantic;
+import com.starrocks.connector.flink.tools.EnvUtils;
+import com.starrocks.data.load.stream.properties.StreamLoadProperties;
+import com.starrocks.data.load.stream.v2.StreamLoadManagerV2;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+
+public class StarRocksCommitter implements Committer<StarRocksCommittable> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StarRocksCommitter.class);
+
+    private final int maxRetries;
+    private final StreamLoadManagerV2 sinkManager;
+
+    public StarRocksCommitter(
+            StarRocksSinkOptions sinkOptions,
+            StreamLoadProperties streamLoadProperties) {
+        this.maxRetries = sinkOptions.getSinkMaxRetries();
+        this.sinkManager = new StreamLoadManagerV2(streamLoadProperties,
+                sinkOptions.getSemantic() == StarRocksSinkSemantic.AT_LEAST_ONCE);
+        try {
+            // TODO no need to start flush thread in sinkManager
+            sinkManager.init();
+        } catch (Exception e) {
+            LOG.error("Failed to init sink manager.", e);
+            try {
+                sinkManager.close();
+            } catch (Exception ie) {
+                LOG.error("Failed to close sink manager after init failure.", ie);
+            }
+            throw new RuntimeException("Failed to init sink manager", e);
+        }
+        LOG.info("Create StarRocksCommitter, maxRetries: {}. {}", maxRetries, EnvUtils.getGitInformation());
+    }
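+
+    // Each committable carries the StreamLoadSnapshot prepared by the writer; commit()
+    // retries up to maxRetries times and rethrows the first failure if no attempt succeeds.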
+    @Override
+    public void commit(Collection<CommitRequest<StarRocksCommittable>> committables)
+            throws IOException, InterruptedException {
+        for (CommitRequest<StarRocksCommittable> commitRequest : committables) {
+            StarRocksCommittable committable = commitRequest.getCommittable();
+            boolean committed = false;
+            RuntimeException firstException = null;
+            for (int i = 0; i <= maxRetries; i++) {
+                try {
+                    boolean success = sinkManager.commit(committable.getLabelSnapshot());
+                    if (success) {
+                        committed = true;
+                        break;
+                    }
+                    throw new RuntimeException("Please see the taskmanager log for the failure reason");
+                } catch (Exception e) {
+                    LOG.error("Failed to commit on attempt {}, max retries: {}", i, maxRetries, e);
+                    if (firstException == null) {
+                        firstException = new RuntimeException("Failed to commit", e);
+                    }
+                }
+            }
+            if (!committed && firstException != null) {
+                throw firstException;
+            }
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        LOG.info("Close StarRocksCommitter.");
+        if (sinkManager != null) {
+            sinkManager.close();
+        }
+    }
+}
diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksSink.java b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksSink.java
new file mode 100644
index 00000000..56177c32
--- /dev/null
+++ b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksSink.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2021-present StarRocks, Inc. All rights reserved.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.connector.flink.table.sink.v2; + +import com.starrocks.connector.flink.manager.StarRocksSinkTable; +import com.starrocks.connector.flink.row.sink.StarRocksIRowTransformer; +import com.starrocks.connector.flink.row.sink.StarRocksISerializer; +import com.starrocks.connector.flink.row.sink.StarRocksSerializerFactory; +import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions; +import com.starrocks.data.load.stream.properties.StreamLoadProperties; +import org.apache.flink.api.connector.sink2.Committer; +import org.apache.flink.api.connector.sink2.StatefulSink; +import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.table.api.TableSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; + +public class StarRocksSink + implements StatefulSink, TwoPhaseCommittingSink { + + private static final Logger LOG = LoggerFactory.getLogger(StarRocksSink.class); + + private final StarRocksSinkOptions sinkOptions; + private final StarRocksISerializer serializer; + private final StarRocksIRowTransformer rowTransformer; + private final StreamLoadProperties streamLoadProperties; + + + public StarRocksSink(StarRocksSinkOptions sinkOptions, TableSchema schema, StarRocksIRowTransformer rowTransformer) { + this.sinkOptions = sinkOptions; + this.rowTransformer = rowTransformer; + StarRocksSinkTable sinkTable = StarRocksSinkTable.builder() + .sinkOptions(sinkOptions) + .build(); + sinkTable.validateTableStructure(sinkOptions, schema); + // StarRocksJsonSerializer depends on SinkOptions#supportUpsertDelete which is decided in + // StarRocksSinkTable#validateTableStructure, so create serializer after validating table structure + this.serializer = StarRocksSerializerFactory.createSerializer(sinkOptions, schema.getFieldNames()); + rowTransformer.setStarRocksColumns(sinkTable.getFieldMapping()); + rowTransformer.setTableSchema(schema); + this.streamLoadProperties = sinkOptions.getProperties(sinkTable); + } + + public StarRocksSink(StarRocksSinkOptions sinkOptions) { + this.sinkOptions = sinkOptions; + this.serializer = null; + this.rowTransformer = null; + this.streamLoadProperties = sinkOptions.getProperties(null); + } + + @Override + public StarRocksWriter createWriter(InitContext context) throws IOException { + return restoreWriter(context, Collections.emptyList()); + } + + @Override + public StarRocksWriter restoreWriter(InitContext context, Collection recoveredState) + throws IOException { + try { + return new StarRocksWriter<>( + sinkOptions, + serializer, + rowTransformer, + streamLoadProperties, + context, + Collections.emptyList()); + } catch (Exception e) { + throw new RuntimeException("Failed to create writer.", e); + } + } + + @Override + public SimpleVersionedSerializer getWriterStateSerializer() { + return new StarRocksWriterStateSerializer(); + } + + @Override + public Committer createCommitter() throws IOException { + return new 
StarRocksCommitter(sinkOptions, streamLoadProperties); + } + + @Override + public SimpleVersionedSerializer getCommittableSerializer() { + return new StarRocksCommittableSerializer(); + } +} diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksWriter.java b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksWriter.java new file mode 100644 index 00000000..cfbe1f7b --- /dev/null +++ b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksWriter.java @@ -0,0 +1,254 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.connector.flink.table.sink.v2; + +import com.google.common.base.Strings; +import com.starrocks.connector.flink.manager.StarRocksStreamLoadListener; +import com.starrocks.connector.flink.row.sink.StarRocksIRowTransformer; +import com.starrocks.connector.flink.row.sink.StarRocksISerializer; +import com.starrocks.connector.flink.table.data.StarRocksRowData; +import com.starrocks.connector.flink.table.sink.ExactlyOnceLabelGeneratorFactory; +import com.starrocks.connector.flink.table.sink.ExactlyOnceLabelGeneratorSnapshot; +import com.starrocks.connector.flink.table.sink.LingeringTransactionAborter; +import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions; +import com.starrocks.connector.flink.table.sink.StarRocksSinkRowDataWithMeta; +import com.starrocks.connector.flink.table.sink.StarRocksSinkSemantic; +import com.starrocks.connector.flink.tools.EnvUtils; +import com.starrocks.connector.flink.tools.JsonWrapper; +import com.starrocks.data.load.stream.LabelGeneratorFactory; +import com.starrocks.data.load.stream.StreamLoadSnapshot; +import com.starrocks.data.load.stream.properties.StreamLoadProperties; +import com.starrocks.data.load.stream.v2.StreamLoadManagerV2; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.StatefulSink; +import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; +import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; +import org.apache.flink.table.data.RowData; +import org.apache.flink.types.RowKind; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +public class StarRocksWriter + implements StatefulSink.StatefulSinkWriter, + TwoPhaseCommittingSink.PrecommittingSinkWriter { + + private static final Logger LOG = LoggerFactory.getLogger(StarRocksWriter.class); + + private final StarRocksSinkOptions sinkOptions; + private final StarRocksISerializer serializer; + private final StarRocksIRowTransformer 
rowTransformer; + private final JsonWrapper jsonWrapper; + private final StarRocksStreamLoadListener streamLoadListener; + private final LabelGeneratorFactory labelGeneratorFactory; + private final StreamLoadManagerV2 sinkManager; + private long totalReceivedRows = 0; + + public StarRocksWriter( + StarRocksSinkOptions sinkOptions, + StarRocksISerializer serializer, + StarRocksIRowTransformer rowTransformer, + StreamLoadProperties streamLoadProperties, + Sink.InitContext initContext, + Collection recoveredState) throws Exception { + this.sinkOptions = sinkOptions; + this.serializer = serializer; + this.rowTransformer = rowTransformer; + + this.jsonWrapper = new JsonWrapper(); + if (this.serializer != null) { + this.serializer.open(new StarRocksISerializer.SerializerContext(jsonWrapper)); + } + if (this.rowTransformer != null) { + this.rowTransformer.setRuntimeContext(null); + this.rowTransformer.setFastJsonWrapper(jsonWrapper); + } + this.streamLoadListener = new StarRocksStreamLoadListener(initContext.metricGroup(), sinkOptions); + + long restoredCheckpointId = initContext.getRestoredCheckpointId() + .orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1); + List restoredGeneratorSnapshots = new ArrayList<>(); + for (StarRocksWriterState writerState : recoveredState) { + restoredGeneratorSnapshots.addAll(writerState.getLabelSnapshots()); + } + String labelPrefix = sinkOptions.getLabelPrefix(); + if (labelPrefix == null || + sinkOptions.getSemantic() == StarRocksSinkSemantic.AT_LEAST_ONCE || + !sinkOptions.isEnableExactlyOnceLabelGen()) { + this.labelGeneratorFactory = new LabelGeneratorFactory.DefaultLabelGeneratorFactory( + labelPrefix == null ? "flink" : labelPrefix); + } else { + this.labelGeneratorFactory = new ExactlyOnceLabelGeneratorFactory( + labelPrefix, + initContext.getNumberOfParallelSubtasks(), + initContext.getSubtaskId(), + restoredCheckpointId); + } + + this.sinkManager = new StreamLoadManagerV2(streamLoadProperties, + sinkOptions.getSemantic() == StarRocksSinkSemantic.AT_LEAST_ONCE); + sinkManager.setStreamLoadListener(streamLoadListener); + sinkManager.setLabelGeneratorFactory(labelGeneratorFactory); + try { + sinkManager.init(); + } catch (Exception e) { + LOG.error("Failed to init sink manager.", e); + try { + sinkManager.close(); + } catch (Exception ie) { + LOG.error("Failed to close sink manager after init failure.", ie); + } + throw new RuntimeException("Failed to init sink manager", e); + } + + try { + if (sinkOptions.getSemantic() == StarRocksSinkSemantic.EXACTLY_ONCE + && sinkOptions.isAbortLingeringTxns()) { + LingeringTransactionAborter aborter = new LingeringTransactionAborter( + sinkOptions.getLabelPrefix(), + restoredCheckpointId, + initContext.getSubtaskId(), + sinkOptions.getAbortCheckNumTxns(), + sinkOptions.getDbTables(), + restoredGeneratorSnapshots, + sinkManager.getStreamLoader()); + + aborter.execute(); + } + } catch (Exception e) { + LOG.error("Failed to abort lingering transactions.", e); + try { + sinkManager.close(); + } catch (Exception ie) { + LOG.error("Failed to close sink manager after aborting lingering transaction failure.", ie); + } + throw new RuntimeException("Failed to abort lingering transactions", e); + } + + LOG.info("Create StarRocksWriter. 
+    @Override
+    public void write(InputT element, Context context) throws IOException, InterruptedException {
+        if (serializer == null) {
+            if (element instanceof StarRocksSinkRowDataWithMeta) {
+                StarRocksSinkRowDataWithMeta data = (StarRocksSinkRowDataWithMeta) element;
+                if (Strings.isNullOrEmpty(data.getDatabase())
+                        || Strings.isNullOrEmpty(data.getTable())
+                        || data.getDataRows() == null) {
+                    LOG.warn(String.format("json row data not fulfilled. {database: %s, table: %s, dataRows: %s}",
+                            data.getDatabase(), data.getTable(), Arrays.toString(data.getDataRows())));
+                    return;
+                }
+                sinkManager.write(null, data.getDatabase(), data.getTable(), data.getDataRows());
+                return;
+            } else if (element instanceof StarRocksRowData) {
+                StarRocksRowData data = (StarRocksRowData) element;
+                if (Strings.isNullOrEmpty(data.getDatabase())
+                        || Strings.isNullOrEmpty(data.getTable())
+                        || data.getRow() == null) {
+                    LOG.warn(String.format("json row data not fulfilled. {database: %s, table: %s, dataRows: %s}",
+                            data.getDatabase(), data.getTable(), data.getRow()));
+                    return;
+                }
+                sinkManager.write(data.getUniqueKey(), data.getDatabase(), data.getTable(), data.getRow());
+                return;
+            }
+            // raw data sink
+            sinkManager.write(null, sinkOptions.getDatabaseName(), sinkOptions.getTableName(), element.toString());
+            return;
+        }
+
+        if (element instanceof RowData) {
+            if (RowKind.UPDATE_BEFORE.equals(((RowData) element).getRowKind()) &&
+                    (!sinkOptions.supportUpsertDelete() || sinkOptions.getIgnoreUpdateBefore())) {
+                return;
+            }
+            if (!sinkOptions.supportUpsertDelete() && RowKind.DELETE.equals(((RowData) element).getRowKind())) {
+                // Let UPDATE_AFTER and INSERT rows pass through for tables that use unique or duplicate keys.
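+                // DELETE rows cannot be applied to those table models, so they are skipped here.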
+                return;
+            }
+        }
+        String serializedValue = serializer.serialize(rowTransformer.transform(element, sinkOptions.supportUpsertDelete()));
+        sinkManager.write(
+                null,
+                sinkOptions.getDatabaseName(),
+                sinkOptions.getTableName(),
+                serializedValue);
+
+        totalReceivedRows += 1;
+        if (totalReceivedRows % 100 == 1) {
+            LOG.debug("Received raw record: {}", element);
+            LOG.debug("Received serialized record: {}", serializedValue);
+        }
+    }
+
+    @Override
+    public void flush(boolean endOfInput) throws IOException, InterruptedException {
+        sinkManager.flush();
+    }
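+
+    // Exactly-once pre-commit: snapshot the in-flight stream-load transactions and hand them to
+    // the committer; if the transactions cannot be prepared, abort them and fail the checkpoint.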
+    @Override
+    public Collection<StarRocksCommittable> prepareCommit() throws IOException, InterruptedException {
+        if (sinkOptions.getSemantic() != StarRocksSinkSemantic.EXACTLY_ONCE) {
+            return Collections.emptyList();
+        }
+
+        StreamLoadSnapshot snapshot = sinkManager.snapshot();
+        if (sinkManager.prepare(snapshot)) {
+            return Collections.singleton(new StarRocksCommittable(snapshot));
+        } else {
+            sinkManager.abort(snapshot);
+            throw new RuntimeException("Snapshot state failed by prepare");
+        }
+    }
+
+    @Override
+    public List<StarRocksWriterState> snapshotState(long checkpointId) throws IOException {
+        if (sinkOptions.getSemantic() != StarRocksSinkSemantic.EXACTLY_ONCE ||
+                !(labelGeneratorFactory instanceof ExactlyOnceLabelGeneratorFactory)) {
+            return Collections.emptyList();
+        }
+
+        List<ExactlyOnceLabelGeneratorSnapshot> labelSnapshots =
+                ((ExactlyOnceLabelGeneratorFactory) labelGeneratorFactory).snapshot(checkpointId);
+        return Collections.singletonList(new StarRocksWriterState(labelSnapshots));
+    }
+
+    @Override
+    public void close() throws Exception {
+        LOG.info("Close StarRocksWriter");
+        if (sinkManager != null) {
+            try {
+                StreamLoadSnapshot snapshot = sinkManager.snapshot();
+                sinkManager.abort(snapshot);
+            } finally {
+                sinkManager.close();
+            }
+        }
+    }
+}
diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksWriterState.java b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksWriterState.java
new file mode 100644
index 00000000..e07680bd
--- /dev/null
+++ b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksWriterState.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2021-present StarRocks, Inc. All rights reserved.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.starrocks.connector.flink.table.sink.v2;
+
+import com.starrocks.connector.flink.table.sink.ExactlyOnceLabelGeneratorSnapshot;
+
+import java.util.List;
+
+public class StarRocksWriterState {
+
+    private List<ExactlyOnceLabelGeneratorSnapshot> labelSnapshots;
+
+    public StarRocksWriterState(List<ExactlyOnceLabelGeneratorSnapshot> labelSnapshots) {
+        this.labelSnapshots = labelSnapshots;
+    }
+
+    public List<ExactlyOnceLabelGeneratorSnapshot> getLabelSnapshots() {
+        return labelSnapshots;
+    }
+}
diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksWriterStateSerializer.java b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksWriterStateSerializer.java
new file mode 100644
index 00000000..c7200463
--- /dev/null
+++ b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksWriterStateSerializer.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2021-present StarRocks, Inc. All rights reserved.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.starrocks.connector.flink.table.sink.v2;
+
+import com.starrocks.connector.flink.tools.JsonWrapper;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+
+public class StarRocksWriterStateSerializer implements SimpleVersionedSerializer<StarRocksWriterState> {
+
+    private final JsonWrapper jsonWrapper;
+
+    public StarRocksWriterStateSerializer() {
+        this.jsonWrapper = new JsonWrapper();
+    }
+
+    @Override
+    public int getVersion() {
+        return 1;
+    }
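+
+    // Writer state is encoded as plain JSON via JsonWrapper; getVersion() above guards against
+    // future changes to the JSON layout of StarRocksWriterState.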
+    @Override
+    public byte[] serialize(StarRocksWriterState state) throws IOException {
+        return jsonWrapper.toJSONBytes(state);
+    }
+
+    @Override
+    public StarRocksWriterState deserialize(int version, byte[] serialized) throws IOException {
+        return jsonWrapper.parseObject(serialized, StarRocksWriterState.class);
+    }
+}
diff --git a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicLRUFunction.java b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicLRUFunction.java
index 368bc6ee..06a7cd1b 100644
--- a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicLRUFunction.java
+++ b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicLRUFunction.java
@@ -14,13 +14,12 @@
 package com.starrocks.connector.flink.table.source;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.starrocks.connector.flink.table.source.struct.ColumnRichInfo;
 import com.starrocks.connector.flink.table.source.struct.QueryBeXTablets;
 import com.starrocks.connector.flink.table.source.struct.QueryInfo;
 import com.starrocks.connector.flink.table.source.struct.SelectColumn;
-import com.starrocks.connector.flink.table.source.struct.ColumnRichInfo;
-
-import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
-import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.TimestampData;
diff --git a/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java b/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java
index 661c53e6..dc486fd0 100644
--- a/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java
+++ b/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java
@@ -53,14 +53,21 @@
 @RunWith(Parameterized.class)
 public class StarRocksSinkITTest extends StarRocksITTestBase {
 
-    @Parameterized.Parameters(name = "sinkV2={0}")
-    public static List<Boolean> parameters() {
-        return Arrays.asList(false, true);
+    @Parameterized.Parameters(name = "sinkV2={0}, newSinkApi={1}")
+    public static List<Object[]> parameters() {
+        return Arrays.asList(
+                new Object[] {false, false},
+                new Object[] {true, false},
+                new Object[] {true, true}
+        );
     }
 
     @Parameterized.Parameter
     public boolean isSinkV2;
 
+    @Parameterized.Parameter(1)
+    public boolean newSinkApi;
+
     @Test
     public void testDupKeyWriteFullColumnsInOrder() throws Exception {
         String ddl = "c0 INT, c1 FLOAT, c2 STRING";
@@ -149,6 +156,7 @@ private void testDupKeyWriteBase(String flinkDDL, RowTypeInfo rowTypeInfo,
                 "'jdbc-url'='" + sinkOptions.getJdbcUrl() + "'," +
                 "'load-url'='" + String.join(";", sinkOptions.getLoadUrlList()) + "'," +
                 "'sink.version' = '" + (isSinkV2 ? "V2" : "V1") + "'," +
+                "'sink.use.new-sink-api' = '" + (newSinkApi ? "true" : "false") + "'," +
                 "'database-name' = '" + DB_NAME + "'," +
                 "'table-name' = '" + sinkOptions.getTableName() + "'," +
                 "'username' = '" + sinkOptions.getUsername() + "'," +
@@ -271,6 +279,7 @@ private void testPkWriteForBase(String flinkDDL, RowTypeInfo rowTypeInfo,
                 "'jdbc-url'='" + sinkOptions.getJdbcUrl() + "'," +
                 "'load-url'='" + String.join(";", sinkOptions.getLoadUrlList()) + "'," +
                 "'sink.version' = '" + (isSinkV2 ? "V2" : "V1") + "'," +
+                "'sink.use.new-sink-api' = '" + (newSinkApi ? "true" : "false") + "'," +
                 "'database-name' = '" + DB_NAME + "'," +
                 "'table-name' = '" + sinkOptions.getTableName() + "'," +
                 "'username' = '" + sinkOptions.getUsername() + "'," +
@@ -321,6 +330,7 @@ public void testPKUpsertAndDelete() throws Exception {
                 "'jdbc-url'='" + sinkOptions.getJdbcUrl() + "'," +
                 "'load-url'='" + String.join(";", sinkOptions.getLoadUrlList()) + "'," +
                 "'sink.version' = '" + (isSinkV2 ? "V2" : "V1") + "'," +
+                "'sink.use.new-sink-api' = '" + (newSinkApi ? "true" : "false") + "'," +
                 "'database-name' = '" + DB_NAME + "'," +
                 "'table-name' = '" + sinkOptions.getTableName() + "'," +
                 "'username' = '" + sinkOptions.getUsername() + "'," +
@@ -374,6 +384,7 @@ public void testPKPartialUpdateDelete() throws Exception {
                 "'jdbc-url'='" + sinkOptions.getJdbcUrl() + "'," +
                 "'load-url'='" + String.join(";", sinkOptions.getLoadUrlList()) + "'," +
                 "'sink.version' = '" + (isSinkV2 ? "V2" : "V1") + "'," +
+                "'sink.use.new-sink-api' = '" + (newSinkApi ? "true" : "false") + "'," +
                 "'database-name' = '" + DB_NAME + "'," +
                 "'table-name' = '" + sinkOptions.getTableName() + "'," +
                 "'username' = '" + sinkOptions.getUsername() + "'," +
@@ -524,6 +535,7 @@ private void testConfigurationBase(Map<String, String> options, Function