From 39ffa682748bec23e85cf769b01985b280776f04 Mon Sep 17 00:00:00 2001 From: PengFei Li Date: Tue, 31 Oct 2023 08:21:53 +0800 Subject: [PATCH] [Feature] Adapt to sink v2 api FLIP-191 Signed-off-by: PengFei Li --- .../manager/StarRocksStreamLoadListener.java | 32 +-- .../sink/StarRocksTableRowTransformer.java | 6 +- .../sink/StarRocksDynamicSinkFunctionV2.java | 2 +- .../table/sink/StarRocksDynamicTableSink.java | 21 +- .../table/sink/StarRocksSinkOptions.java | 11 + .../table/sink/v2/StarRocksCommittable.java | 36 +++ .../v2/StarRocksCommittableSerializer.java | 50 ++++ .../table/sink/v2/StarRocksCommitter.java | 77 ++++++ .../flink/table/sink/v2/StarRocksSink.java | 109 ++++++++ .../flink/table/sink/v2/StarRocksWriter.java | 245 ++++++++++++++++++ .../table/sink/v2/StarRocksWriterState.java | 38 +++ .../v2/StarRocksWriterStateSerializer.java | 51 ++++ .../flink/it/sink/StarRocksSinkITTest.java | 19 +- 13 files changed, 668 insertions(+), 29 deletions(-) create mode 100644 src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksCommittable.java create mode 100644 src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksCommittableSerializer.java create mode 100644 src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksCommitter.java create mode 100644 src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksSink.java create mode 100644 src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksWriter.java create mode 100644 src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksWriterState.java create mode 100644 src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksWriterStateSerializer.java diff --git a/src/main/java/com/starrocks/connector/flink/manager/StarRocksStreamLoadListener.java b/src/main/java/com/starrocks/connector/flink/manager/StarRocksStreamLoadListener.java index c88233f3..dad64948 100644 --- 
a/src/main/java/com/starrocks/connector/flink/manager/StarRocksStreamLoadListener.java +++ b/src/main/java/com/starrocks/connector/flink/manager/StarRocksStreamLoadListener.java @@ -23,9 +23,9 @@ import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions; import com.starrocks.data.load.stream.StreamLoadResponse; import com.starrocks.data.load.stream.v2.StreamLoadListener; -import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram; public class StarRocksStreamLoadListener implements StreamLoadListener { @@ -46,22 +46,22 @@ public class StarRocksStreamLoadListener implements StreamLoadListener { private transient Histogram writeDataTimeMs; private transient Histogram loadTimeMs; - public StarRocksStreamLoadListener(RuntimeContext context, StarRocksSinkOptions sinkOptions) { - totalFlushBytes = context.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_BYTES); - totalFlushRows = context.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_ROWS); - totalFlushTime = context.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_COST_TIME); - totalFlushTimeWithoutRetries = context.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_COST_TIME_WITHOUT_RETRIES); - totalFlushSucceededTimes = context.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_SUCCEEDED_TIMES); - totalFlushFailedTimes = context.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_FAILED_TIMES); - flushTimeNs = context.getMetricGroup().histogram(HISTOGRAM_FLUSH_TIME, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize())); - offerTimeNs = context.getMetricGroup().histogram(HISTOGRAM_OFFER_TIME_NS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize())); + public StarRocksStreamLoadListener(MetricGroup metricGroup, StarRocksSinkOptions sinkOptions) { + totalFlushBytes = 
metricGroup.counter(COUNTER_TOTAL_FLUSH_BYTES); + totalFlushRows = metricGroup.counter(COUNTER_TOTAL_FLUSH_ROWS); + totalFlushTime = metricGroup.counter(COUNTER_TOTAL_FLUSH_COST_TIME); + totalFlushTimeWithoutRetries = metricGroup.counter(COUNTER_TOTAL_FLUSH_COST_TIME_WITHOUT_RETRIES); + totalFlushSucceededTimes = metricGroup.counter(COUNTER_TOTAL_FLUSH_SUCCEEDED_TIMES); + totalFlushFailedTimes = metricGroup.counter(COUNTER_TOTAL_FLUSH_FAILED_TIMES); + flushTimeNs = metricGroup.histogram(HISTOGRAM_FLUSH_TIME, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize())); + offerTimeNs = metricGroup.histogram(HISTOGRAM_OFFER_TIME_NS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize())); - totalFilteredRows = context.getMetricGroup().counter(COUNTER_NUMBER_FILTERED_ROWS); - commitAndPublishTimeMs = context.getMetricGroup().histogram(HISTOGRAM_COMMIT_AND_PUBLISH_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize())); - streamLoadPlanTimeMs = context.getMetricGroup().histogram(HISTOGRAM_STREAM_LOAD_PLAN_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize())); - readDataTimeMs = context.getMetricGroup().histogram(HISTOGRAM_READ_DATA_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize())); - writeDataTimeMs = context.getMetricGroup().histogram(HISTOGRAM_WRITE_DATA_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize())); - loadTimeMs = context.getMetricGroup().histogram(HISTOGRAM_LOAD_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize())); + totalFilteredRows = metricGroup.counter(COUNTER_NUMBER_FILTERED_ROWS); + commitAndPublishTimeMs = metricGroup.histogram(HISTOGRAM_COMMIT_AND_PUBLISH_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize())); + streamLoadPlanTimeMs = metricGroup.histogram(HISTOGRAM_STREAM_LOAD_PLAN_TIME_MS, new 
DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize())); + readDataTimeMs = metricGroup.histogram(HISTOGRAM_READ_DATA_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize())); + writeDataTimeMs = metricGroup.histogram(HISTOGRAM_WRITE_DATA_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize())); + loadTimeMs = metricGroup.histogram(HISTOGRAM_LOAD_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize())); } @Override diff --git a/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformer.java b/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformer.java index 32d36b83..aef0c02b 100644 --- a/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformer.java +++ b/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformer.java @@ -20,7 +20,6 @@ import com.starrocks.connector.flink.tools.JsonWrapper; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.data.ArrayData; import org.apache.flink.table.data.GenericArrayData; @@ -78,8 +77,9 @@ public void setTableSchema(TableSchema ts) { @Override public void setRuntimeContext(RuntimeContext runtimeCtx) { - final TypeSerializer typeSerializer = rowDataTypeInfo.createSerializer(runtimeCtx.getExecutionConfig()); - valueTransform = runtimeCtx.getExecutionConfig().isObjectReuseEnabled() ? 
typeSerializer::copy : Function.identity(); + // No need to copy the value even if object reuse is enabled, + // because the raw RowData value will not be buffered + this.valueTransform = Function.identity(); } @Override diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java index ecf75628..f208b4b1 100644 --- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java +++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java @@ -215,7 +215,7 @@ public void open(Configuration parameters) throws Exception { if (serializer != null) { this.serializer.open(new StarRocksISerializer.SerializerContext(getOrCreateJsonWrapper())); } - this.streamLoadListener = new StarRocksStreamLoadListener(getRuntimeContext(), sinkOptions); + this.streamLoadListener = new StarRocksStreamLoadListener(getRuntimeContext().getMetricGroup(), sinkOptions); sinkManager.setStreamLoadListener(streamLoadListener); LabelGeneratorFactory labelGeneratorFactory; diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSink.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSink.java index c5e38eff..f67fd140 100644 --- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSink.java +++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSink.java @@ -15,11 +15,13 @@ package com.starrocks.connector.flink.table.sink; import com.starrocks.connector.flink.row.sink.StarRocksTableRowTransformer; +import com.starrocks.connector.flink.table.sink.v2.StarRocksSink; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.sink.DynamicTableSink; 
import org.apache.flink.table.connector.sink.SinkFunctionProvider; +import org.apache.flink.table.connector.sink.SinkV2Provider; import org.apache.flink.table.data.RowData; public class StarRocksDynamicTableSink implements DynamicTableSink { @@ -41,12 +43,19 @@ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { @SuppressWarnings("unchecked") public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { final TypeInformation rowDataTypeInfo = context.createTypeInformation(flinkSchema.toRowDataType()); - StarRocksDynamicSinkFunctionBase starrocksSinkFunction = SinkFunctionFactory.createSinkFunction( - sinkOptions, - flinkSchema, - new StarRocksTableRowTransformer(rowDataTypeInfo) - ); - return SinkFunctionProvider.of(starrocksSinkFunction, sinkOptions.getSinkParallelism()); + if (sinkOptions.isUseUnifiedSinkApi()) { + StarRocksSink starRocksSink = new StarRocksSink<>( + sinkOptions, flinkSchema, new StarRocksTableRowTransformer(rowDataTypeInfo)); + return SinkV2Provider.of(starRocksSink, sinkOptions.getSinkParallelism()); + } else { + StarRocksDynamicSinkFunctionBase starrocksSinkFunction = + SinkFunctionFactory.createSinkFunction( + sinkOptions, + flinkSchema, + new StarRocksTableRowTransformer(rowDataTypeInfo) + ); + return SinkFunctionProvider.of(starrocksSinkFunction, sinkOptions.getSinkParallelism()); + } } @Override diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java index e03daf85..3664fc1b 100644 --- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java +++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java @@ -139,6 +139,13 @@ public enum StreamLoadFormat { "The number of transactions to check if they are lingering. 
-1 indicates that check until finding the first " + "transaction that is not lingering."); + public static final ConfigOption SINK_USE_UNIFIED_SINK_API = ConfigOptions.key("sink.use.unified-sink-api") + .booleanType().defaultValue(false).withDescription("Whether to use the implementation with the unified sink api " + + "described in Flink FLIP-191. There is no difference for users whether to enable this flag. This is just " + + "for adapting some frameworks which only support new sink api, and Flink will also remove the old sink api " + + "in the coming 2.0. Note that it's not compatible after changing the flag, that's, you can't recover from " + + "the previous job after changing the flag."); + public static final ConfigOption SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM; // Sink semantic @@ -351,6 +358,10 @@ public int getAbortCheckNumTxns() { return tableOptions.get(SINK_ABORT_CHECK_NUM_TXNS); } + public boolean isUseUnifiedSinkApi() { + return tableOptions.get(SINK_USE_UNIFIED_SINK_API); + } + private void validateStreamLoadUrl() { tableOptions.getOptional(LOAD_URL).ifPresent(urlList -> { for (String host : urlList) { diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksCommittable.java b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksCommittable.java new file mode 100644 index 00000000..25294257 --- /dev/null +++ b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksCommittable.java @@ -0,0 +1,36 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.connector.flink.table.sink.v2; + +import com.starrocks.data.load.stream.StreamLoadSnapshot; + +public class StarRocksCommittable { + + private final StreamLoadSnapshot labelSnapshot; + + public StarRocksCommittable(StreamLoadSnapshot labelSnapshot) { + this.labelSnapshot = labelSnapshot; + } + + public StreamLoadSnapshot getLabelSnapshot() { + return labelSnapshot; + } +} diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksCommittableSerializer.java b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksCommittableSerializer.java new file mode 100644 index 00000000..bfb69090 --- /dev/null +++ b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksCommittableSerializer.java @@ -0,0 +1,50 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.connector.flink.table.sink.v2; + +import com.starrocks.connector.flink.tools.JsonWrapper; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import java.io.IOException; + +public class StarRocksCommittableSerializer implements SimpleVersionedSerializer { + + private final JsonWrapper jsonWrapper; + + public StarRocksCommittableSerializer() { + this.jsonWrapper = new JsonWrapper(); + } + + @Override + public int getVersion() { + return 1; + } + + @Override + public byte[] serialize(StarRocksCommittable committable) throws IOException { + return jsonWrapper.toJSONBytes(committable); + } + + @Override + public StarRocksCommittable deserialize(int version, byte[] serialized) throws IOException { + return jsonWrapper.parseObject(serialized, StarRocksCommittable.class); + } +} diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksCommitter.java b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksCommitter.java new file mode 100644 index 00000000..e8a61bfb --- /dev/null +++ b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksCommitter.java @@ -0,0 +1,77 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.connector.flink.table.sink.v2; + +import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions; +import com.starrocks.connector.flink.table.sink.StarRocksSinkSemantic; +import com.starrocks.data.load.stream.properties.StreamLoadProperties; +import com.starrocks.data.load.stream.v2.StreamLoadManagerV2; +import org.apache.flink.api.connector.sink2.Committer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collection; + +public class StarRocksCommitter implements Committer { + + private static final Logger LOG = LoggerFactory.getLogger(StarRocksCommitter.class); + + private final StreamLoadManagerV2 sinkManager; + + public StarRocksCommitter( + StarRocksSinkOptions sinkOptions, + StreamLoadProperties streamLoadProperties) { + this.sinkManager = new StreamLoadManagerV2(streamLoadProperties, + sinkOptions.getSemantic() == StarRocksSinkSemantic.AT_LEAST_ONCE); + try { + // TODO no need to start flush thread in sinkManager + sinkManager.init(); + } catch (Exception e) { + LOG.error("Failed to init sink manager.", e); + try { + sinkManager.close(); + } catch (Exception ie) { + LOG.error("Failed to close sink manager after init failure.", ie); + } + throw new RuntimeException("Failed to init sink manager", e); + } + LOG.info("Create StarRocksCommitter."); + } + + @Override + public void commit(Collection> committables) + throws IOException, InterruptedException { + // TODO deal with retry and fatal errors + for (CommitRequest commitRequest : committables) { + 
StarRocksCommittable committable = commitRequest.getCommittable(); + sinkManager.commit(committable.getLabelSnapshot()); + } + } + + @Override + public void close() throws Exception { + LOG.info("Close StarRocksCommitter."); + if(sinkManager != null) { + sinkManager.close(); + } + } +} diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksSink.java b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksSink.java new file mode 100644 index 00000000..56177c32 --- /dev/null +++ b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksSink.java @@ -0,0 +1,109 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.starrocks.connector.flink.table.sink.v2; + +import com.starrocks.connector.flink.manager.StarRocksSinkTable; +import com.starrocks.connector.flink.row.sink.StarRocksIRowTransformer; +import com.starrocks.connector.flink.row.sink.StarRocksISerializer; +import com.starrocks.connector.flink.row.sink.StarRocksSerializerFactory; +import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions; +import com.starrocks.data.load.stream.properties.StreamLoadProperties; +import org.apache.flink.api.connector.sink2.Committer; +import org.apache.flink.api.connector.sink2.StatefulSink; +import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.table.api.TableSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; + +public class StarRocksSink + implements StatefulSink, TwoPhaseCommittingSink { + + private static final Logger LOG = LoggerFactory.getLogger(StarRocksSink.class); + + private final StarRocksSinkOptions sinkOptions; + private final StarRocksISerializer serializer; + private final StarRocksIRowTransformer rowTransformer; + private final StreamLoadProperties streamLoadProperties; + + + public StarRocksSink(StarRocksSinkOptions sinkOptions, TableSchema schema, StarRocksIRowTransformer rowTransformer) { + this.sinkOptions = sinkOptions; + this.rowTransformer = rowTransformer; + StarRocksSinkTable sinkTable = StarRocksSinkTable.builder() + .sinkOptions(sinkOptions) + .build(); + sinkTable.validateTableStructure(sinkOptions, schema); + // StarRocksJsonSerializer depends on SinkOptions#supportUpsertDelete which is decided in + // StarRocksSinkTable#validateTableStructure, so create serializer after validating table structure + this.serializer = StarRocksSerializerFactory.createSerializer(sinkOptions, schema.getFieldNames()); + 
rowTransformer.setStarRocksColumns(sinkTable.getFieldMapping()); + rowTransformer.setTableSchema(schema); + this.streamLoadProperties = sinkOptions.getProperties(sinkTable); + } + + public StarRocksSink(StarRocksSinkOptions sinkOptions) { + this.sinkOptions = sinkOptions; + this.serializer = null; + this.rowTransformer = null; + this.streamLoadProperties = sinkOptions.getProperties(null); + } + + @Override + public StarRocksWriter createWriter(InitContext context) throws IOException { + return restoreWriter(context, Collections.emptyList()); + } + + @Override + public StarRocksWriter restoreWriter(InitContext context, Collection recoveredState) + throws IOException { + try { + return new StarRocksWriter<>( + sinkOptions, + serializer, + rowTransformer, + streamLoadProperties, + context, + recoveredState); // forward the restored writer state; the writer reads its label snapshots + } catch (Exception e) { + throw new RuntimeException("Failed to create writer.", e); + } + } + + @Override + public SimpleVersionedSerializer getWriterStateSerializer() { + return new StarRocksWriterStateSerializer(); + } + + @Override + public Committer createCommitter() throws IOException { + return new StarRocksCommitter(sinkOptions, streamLoadProperties); + } + + @Override + public SimpleVersionedSerializer getCommittableSerializer() { + return new StarRocksCommittableSerializer(); + } +} diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksWriter.java b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksWriter.java new file mode 100644 index 00000000..3ca6771e --- /dev/null +++ b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksWriter.java @@ -0,0 +1,245 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.connector.flink.table.sink.v2; + +import com.google.common.base.Strings; +import com.starrocks.connector.flink.manager.StarRocksStreamLoadListener; +import com.starrocks.connector.flink.row.sink.StarRocksIRowTransformer; +import com.starrocks.connector.flink.row.sink.StarRocksISerializer; +import com.starrocks.connector.flink.table.data.StarRocksRowData; +import com.starrocks.connector.flink.table.sink.ExactlyOnceLabelGeneratorFactory; +import com.starrocks.connector.flink.table.sink.ExactlyOnceLabelGeneratorSnapshot; +import com.starrocks.connector.flink.table.sink.LingeringTransactionAborter; +import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions; +import com.starrocks.connector.flink.table.sink.StarRocksSinkRowDataWithMeta; +import com.starrocks.connector.flink.table.sink.StarRocksSinkSemantic; +import com.starrocks.connector.flink.tools.EnvUtils; +import com.starrocks.connector.flink.tools.JsonWrapper; +import com.starrocks.data.load.stream.LabelGeneratorFactory; +import com.starrocks.data.load.stream.StreamLoadSnapshot; +import com.starrocks.data.load.stream.properties.StreamLoadProperties; +import com.starrocks.data.load.stream.v2.StreamLoadManagerV2; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.StatefulSink; +import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; +import 
org.apache.flink.runtime.checkpoint.CheckpointIDCounter; +import org.apache.flink.table.data.RowData; +import org.apache.flink.types.RowKind; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +public class StarRocksWriter + implements StatefulSink.StatefulSinkWriter, + TwoPhaseCommittingSink.PrecommittingSinkWriter { + + private static final Logger LOG = LoggerFactory.getLogger(StarRocksWriter.class); + + private final StarRocksSinkOptions sinkOptions; + private final StarRocksISerializer serializer; + private final StarRocksIRowTransformer rowTransformer; + private final JsonWrapper jsonWrapper; + private final StarRocksStreamLoadListener streamLoadListener; + private final LabelGeneratorFactory labelGeneratorFactory; + private final StreamLoadManagerV2 sinkManager; + private long totalReceivedRows = 0; + + public StarRocksWriter( + StarRocksSinkOptions sinkOptions, + StarRocksISerializer serializer, + StarRocksIRowTransformer rowTransformer, + StreamLoadProperties streamLoadProperties, + Sink.InitContext initContext, + Collection recoveredState) throws Exception { + this.sinkOptions = sinkOptions; + this.serializer = serializer; + this.rowTransformer = rowTransformer; + + this.jsonWrapper = new JsonWrapper(); + if (this.serializer != null) { + this.serializer.open(new StarRocksISerializer.SerializerContext(jsonWrapper)); + } + if (this.rowTransformer != null) { + this.rowTransformer.setRuntimeContext(null); + this.rowTransformer.setFastJsonWrapper(jsonWrapper); + } + this.streamLoadListener = new StarRocksStreamLoadListener(initContext.metricGroup(), sinkOptions); + + long restoredCheckpointId = initContext.getRestoredCheckpointId() + .orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1); + List restoredGeneratorSnapshots = new ArrayList<>(); + for (StarRocksWriterState writerState : 
recoveredState) { + restoredGeneratorSnapshots.addAll(writerState.getLabelSnapshots()); + } + String labelPrefix = sinkOptions.getLabelPrefix(); + if (labelPrefix == null || + sinkOptions.getSemantic() == StarRocksSinkSemantic.AT_LEAST_ONCE || + !sinkOptions.isEnableExactlyOnceLabelGen()) { + this.labelGeneratorFactory = new LabelGeneratorFactory.DefaultLabelGeneratorFactory( + labelPrefix == null ? "flink" : labelPrefix); + } else { + this.labelGeneratorFactory = new ExactlyOnceLabelGeneratorFactory( + labelPrefix, + initContext.getNumberOfParallelSubtasks(), + initContext.getSubtaskId(), + restoredCheckpointId); + } + + this.sinkManager = new StreamLoadManagerV2(streamLoadProperties, + sinkOptions.getSemantic() == StarRocksSinkSemantic.AT_LEAST_ONCE); + sinkManager.setStreamLoadListener(streamLoadListener); + sinkManager.setLabelGeneratorFactory(labelGeneratorFactory); + try { + sinkManager.init(); + // TODO move out of the try block + if (sinkOptions.getSemantic() == StarRocksSinkSemantic.EXACTLY_ONCE) { + if (sinkOptions.isAbortLingeringTxns()) { + LingeringTransactionAborter aborter = new LingeringTransactionAborter( + sinkOptions.getLabelPrefix(), + restoredCheckpointId, + initContext.getSubtaskId(), + sinkOptions.getAbortCheckNumTxns(), + sinkOptions.getDbTables(), + restoredGeneratorSnapshots, + sinkManager.getStreamLoader()); + aborter.execute(); + } + } + + } catch (Exception e) { + LOG.error("Failed to init sink manager.", e); + try { + sinkManager.close(); + } catch (Exception ie) { + LOG.error("Failed to close sink manager after init failure.", ie); + } + throw new RuntimeException("Failed to init sink manager", e); + } + + LOG.info("Create StarRocksWriter. 
{}", EnvUtils.getGitInformation()); + } + + @Override + public void write(InputT element, Context context) throws IOException, InterruptedException { + if (serializer == null) { + if (element instanceof StarRocksSinkRowDataWithMeta) { + StarRocksSinkRowDataWithMeta data = (StarRocksSinkRowDataWithMeta) element; + if (Strings.isNullOrEmpty(data.getDatabase()) + || Strings.isNullOrEmpty(data.getTable()) + || data.getDataRows() == null) { + LOG.warn(String.format("json row data not fulfilled. {database: %s, table: %s, dataRows: %s}", + data.getDatabase(), data.getTable(), Arrays.toString(data.getDataRows()))); + return; + } + sinkManager.write(null, data.getDatabase(), data.getTable(), data.getDataRows()); + return; + } else if (element instanceof StarRocksRowData) { + StarRocksRowData data = (StarRocksRowData) element; + if (Strings.isNullOrEmpty(data.getDatabase()) + || Strings.isNullOrEmpty(data.getTable()) + || data.getRow() == null) { + LOG.warn(String.format("json row data not fulfilled. {database: %s, table: %s, dataRows: %s}", + data.getDatabase(), data.getTable(), data.getRow())); + return; + } + sinkManager.write(data.getUniqueKey(), data.getDatabase(), data.getTable(), data.getRow()); + return; + } + // raw data sink + sinkManager.write(null, sinkOptions.getDatabaseName(), sinkOptions.getTableName(), element.toString()); + return; + } + + if (element instanceof RowData) { + if (RowKind.UPDATE_BEFORE.equals(((RowData) element).getRowKind()) && + (!sinkOptions.supportUpsertDelete() || sinkOptions.getIgnoreUpdateBefore())) { + return; + } + if (!sinkOptions.supportUpsertDelete() && RowKind.DELETE.equals(((RowData) element).getRowKind())) { + // let go the UPDATE_AFTER and INSERT rows for tables who have a group of `unique` or `duplicate` keys. 
+ return; + } + } + String serializedValue = serializer.serialize(rowTransformer.transform(element, sinkOptions.supportUpsertDelete())); + sinkManager.write( + null, + sinkOptions.getDatabaseName(), + sinkOptions.getTableName(), + serializedValue); + + totalReceivedRows += 1; + if (totalReceivedRows % 100 == 1) { + LOG.debug("Received raw record: {}", element); + LOG.debug("Received serialized record: {}", serializedValue); + } + } + + @Override + public void flush(boolean endOfInput) throws IOException, InterruptedException { + sinkManager.flush(); + } + + @Override + public Collection prepareCommit() throws IOException, InterruptedException { + if (sinkOptions.getSemantic() != StarRocksSinkSemantic.EXACTLY_ONCE) { + return Collections.emptyList(); + } + + StreamLoadSnapshot snapshot = sinkManager.snapshot(); + if (sinkManager.prepare(snapshot)) { + return Collections.singleton(new StarRocksCommittable(snapshot)); + } else { + sinkManager.abort(snapshot); + throw new RuntimeException("Snapshot state failed by prepare"); + } + } + + @Override + public List snapshotState(long checkpointId) throws IOException { + if (sinkOptions.getSemantic() != StarRocksSinkSemantic.EXACTLY_ONCE || + !(labelGeneratorFactory instanceof ExactlyOnceLabelGeneratorFactory)) { + return Collections.emptyList(); + } + + List labelSnapshots = + ((ExactlyOnceLabelGeneratorFactory) labelGeneratorFactory).snapshot(checkpointId); + return Collections.singletonList(new StarRocksWriterState(labelSnapshots)); + } + + @Override + public void close() throws Exception { + LOG.info("Close StarRocksWriter"); + if (sinkManager != null) { + try { + StreamLoadSnapshot snapshot = sinkManager.snapshot(); + sinkManager.abort(snapshot); + } finally { + sinkManager.close(); + } + } + } +} diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksWriterState.java b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksWriterState.java new file mode 100644 index 
00000000..e07680bd --- /dev/null +++ b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksWriterState.java @@ -0,0 +1,38 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.connector.flink.table.sink.v2; + +import com.starrocks.connector.flink.table.sink.ExactlyOnceLabelGeneratorSnapshot; + +import java.util.List; + +/** Writer state that carries a list of {@link ExactlyOnceLabelGeneratorSnapshot} entries. */ public class StarRocksWriterState { + + /* element type spelled out instead of a raw List; the import above exists only for this type */ private List<ExactlyOnceLabelGeneratorSnapshot> labelSnapshots; + + /** Creates a state holding the given snapshots; the list is stored by reference, not copied. */ public StarRocksWriterState(List<ExactlyOnceLabelGeneratorSnapshot> labelSnapshots) { + this.labelSnapshots = labelSnapshots; + } + + /** Returns the list passed to the constructor (the same instance, not a copy). */ public List<ExactlyOnceLabelGeneratorSnapshot> getLabelSnapshots() { + return labelSnapshots; + } +} diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksWriterStateSerializer.java b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksWriterStateSerializer.java new file mode 100644 index 00000000..c7200463 --- /dev/null +++ b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksWriterStateSerializer.java @@ -0,0 +1,51 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.connector.flink.table.sink.v2; + +import com.starrocks.connector.flink.table.sink.StarrocksSnapshotState; +import com.starrocks.connector.flink.tools.JsonWrapper; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import java.io.IOException; + +/** Serializes {@link StarRocksWriterState} to and from JSON bytes through a {@link JsonWrapper}. */ public class StarRocksWriterStateSerializer implements SimpleVersionedSerializer<StarRocksWriterState> { + + private final JsonWrapper jsonWrapper; + + /** Creates a serializer backed by its own JsonWrapper instance. */ public StarRocksWriterStateSerializer() { + this.jsonWrapper = new JsonWrapper(); + } + + /** Returns the serialization format version, currently always 1. */ @Override + public int getVersion() { + return 1; + } + + /** Encodes the given state with JsonWrapper.toJSONBytes. */ @Override + public byte[] serialize(StarRocksWriterState state) throws IOException { + return jsonWrapper.toJSONBytes(state); + } + + /** Decodes bytes produced by serialize(); the version argument is not consulted. */ @Override + public StarRocksWriterState deserialize(int version, byte[] serialized) throws IOException { + /* the target class must be the type serialize() wrote, not the V1 sink's StarrocksSnapshotState */ return jsonWrapper.parseObject(serialized, StarRocksWriterState.class); + } +} diff --git a/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java b/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java index 661c53e6..4438b6ec 100644 --- a/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java +++ b/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java @@ -53,14 +53,21 @@ 
@RunWith(Parameterized.class) public class StarRocksSinkITTest extends StarRocksITTestBase { - @Parameterized.Parameters(name = "sinkV2={0}") - public static List parameters() { - return Arrays.asList(false, true); + /** Parameter sets as (isSinkV2, newSinkApi) pairs, matching the two @Parameter fields declared below. */ @Parameterized.Parameters(name = "sinkV2={0}, newSinkApi={1}") + public static List<Object[]> parameters() { /* JUnit 4's Parameterized runner spreads only an Object[] over several @Parameter fields; any other element counts as one argument */ + return Arrays.asList( + new Object[] {false, false}, + new Object[] {true, false}, + new Object[] {true, true} + ); } @Parameterized.Parameter public boolean isSinkV2; + @Parameterized.Parameter(1) + public boolean newSinkApi; + @Test public void testDupKeyWriteFullColumnsInOrder() throws Exception { String ddl = "c0 INT, c1 FLOAT, c2 STRING"; @@ -149,6 +156,7 @@ private void testDupKeyWriteBase(String flinkDDL, RowTypeInfo rowTypeInfo, "'jdbc-url'='" + sinkOptions.getJdbcUrl() + "'," + "'load-url'='" + String.join(";", sinkOptions.getLoadUrlList()) + "'," + "'sink.version' = '" + (isSinkV2 ? "V2" : "V1") + "'," + + "'sink.use.unified-sink-api' = '" + (newSinkApi ? "true" : "false") + "'," + "'database-name' = '" + DB_NAME + "'," + "'table-name' = '" + sinkOptions.getTableName() + "'," + "'username' = '" + sinkOptions.getUsername() + "'," + @@ -271,6 +279,7 @@ private void testPkWriteForBase(String flinkDDL, RowTypeInfo rowTypeInfo, "'jdbc-url'='" + sinkOptions.getJdbcUrl() + "'," + "'load-url'='" + String.join(";", sinkOptions.getLoadUrlList()) + "'," + "'sink.version' = '" + (isSinkV2 ? "V2" : "V1") + "'," + + "'sink.use.unified-sink-api' = '" + (newSinkApi ? "true" : "false") + "'," + "'database-name' = '" + DB_NAME + "'," + "'table-name' = '" + sinkOptions.getTableName() + "'," + "'username' = '" + sinkOptions.getUsername() + "'," + @@ -321,6 +330,7 @@ public void testPKUpsertAndDelete() throws Exception { "'jdbc-url'='" + sinkOptions.getJdbcUrl() + "'," + "'load-url'='" + String.join(";", sinkOptions.getLoadUrlList()) + "'," + "'sink.version' = '" + (isSinkV2 ? "V2" : "V1") + "'," + + "'sink.use.unified-sink-api' = '" + (newSinkApi ? 
"true" : "false") + "'," + "'database-name' = '" + DB_NAME + "'," + "'table-name' = '" + sinkOptions.getTableName() + "'," + "'username' = '" + sinkOptions.getUsername() + "'," + @@ -374,6 +384,7 @@ public void testPKPartialUpdateDelete() throws Exception { "'jdbc-url'='" + sinkOptions.getJdbcUrl() + "'," + "'load-url'='" + String.join(";", sinkOptions.getLoadUrlList()) + "'," + "'sink.version' = '" + (isSinkV2 ? "V2" : "V1") + "'," + + "'sink.use.unified-sink-api' = '" + (newSinkApi ? "true" : "false") + "'," + "'database-name' = '" + DB_NAME + "'," + "'table-name' = '" + sinkOptions.getTableName() + "'," + "'username' = '" + sinkOptions.getUsername() + "'," + @@ -524,6 +535,7 @@ private void testConfigurationBase(Map options, Function