From b4fd15287286ea1fd96edc1c7ffccc2267751020 Mon Sep 17 00:00:00 2001
From: Xu Chen
Date: Mon, 25 Nov 2024 17:38:59 +0800
Subject: [PATCH] [Flink] upgrade to sink v2 api (#566)

Signed-off-by: chenxu
Co-authored-by: chenxu
---
 .../sink/LakeSoulMultiTablesSink.java         | 133 +++++++++++-------
 .../lakesoul/sink/bucket/BucketsBuilder.java  |   4 +-
 .../sink/bucket/BulkFormatBuilder.java        |   4 +-
 .../DefaultMultiTablesArrowFormatBuilder.java |   2 +-
 .../DefaultMultiTablesBulkFormatBuilder.java  |   2 +-
 .../DefaultOneTableBulkFormatBuilder.java     |   2 +-
 .../sink/committer/LakeSoulSinkCommitter.java |  24 ++--
 .../LakeSoulSinkGlobalCommitter.java          |   2 +-
 .../AbstractLakeSoulMultiTableSinkWriter.java |  44 +++---
 .../writer/LakeSoulMultiTableSinkWriter.java  |   4 +-
 .../LakeSoulRowDataOneTableSinkWriter.java    |   5 +-
 .../sink/writer/LakeSoulWriterBucket.java     |   2 +-
 .../LakeSoulArrowMultiTableSinkWriter.java    |  11 +-
 .../arrow/LakeSoulArrowWriterBucket.java      |   5 +-
 14 files changed, 138 insertions(+), 106 deletions(-)

diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/LakeSoulMultiTablesSink.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/LakeSoulMultiTablesSink.java
index 8a8984000..66d1daf6e 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/LakeSoulMultiTablesSink.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/LakeSoulMultiTablesSink.java
@@ -4,10 +4,10 @@
 
 package org.apache.flink.lakesoul.sink;
 
-import org.apache.flink.api.connector.sink.Committer;
 import org.apache.flink.api.connector.sink.GlobalCommitter;
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
@@ -21,21 +21,23 @@ import org.apache.flink.lakesoul.sink.writer.AbstractLakeSoulMultiTableSinkWriter;
 import org.apache.flink.lakesoul.tool.LakeSoulSinkOptions;
 import org.apache.flink.lakesoul.types.TableSchemaIdentity;
-import org.apache.flink.table.data.RowData;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies;
+import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
+import java.io.UncheckedIOException;
+import java.util.*;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 public class LakeSoulMultiTablesSink<IN> implements
-        Sink<IN, LakeSoulMultiTableSinkCommittable, LakeSoulWriterBucketState, LakeSoulMultiTableSinkGlobalCommittable> {
+        StatefulSink<IN, LakeSoulWriterBucketState>,
+        TwoPhaseCommittingSink<IN, LakeSoulMultiTableSinkCommittable>,
+        WithPostCommitTopology<IN, LakeSoulMultiTableSinkCommittable> {
 
     private final BucketsBuilder<IN, ? extends BucketsBuilder<IN, ?>> bucketsBuilder;
 
@@ -60,18 +62,24 @@ public static DefaultMultiTablesArrowFormatBuilder forMultiTablesArrowFormat(Con
     }
 
     @Override
-    public SinkWriter<IN, LakeSoulMultiTableSinkCommittable, LakeSoulWriterBucketState> createWriter(
-            InitContext context, List<LakeSoulWriterBucketState> states) throws IOException {
+    public AbstractLakeSoulMultiTableSinkWriter<IN> createWriter(InitContext context) throws IOException {
         int subTaskId = context.getSubtaskId();
         AbstractLakeSoulMultiTableSinkWriter<IN> writer =
                 bucketsBuilder.createWriter(context, subTaskId);
-        writer.initializeState(states);
         return writer;
     }
 
     @Override
-    public Optional<SimpleVersionedSerializer<LakeSoulWriterBucketState>> getWriterStateSerializer() {
+    public StatefulSinkWriter<IN, LakeSoulWriterBucketState> restoreWriter(InitContext context, Collection<LakeSoulWriterBucketState> recoveredState) throws IOException {
+        int subTaskId = context.getSubtaskId();
+        AbstractLakeSoulMultiTableSinkWriter<IN> writer = bucketsBuilder.createWriter(context, subTaskId);
+        writer.initializeState(new ArrayList<>(recoveredState));
+        return writer;
+    }
+
+    @Override
+    public SimpleVersionedSerializer<LakeSoulWriterBucketState> getWriterStateSerializer() {
         try {
-            return Optional.of(bucketsBuilder.getWriterStateSerializer());
+            return bucketsBuilder.getWriterStateSerializer();
         } catch (IOException e) {
             // it's not optimal that we have to do this but creating the serializers for the
             // LakeSoulMultiTablesSink requires (among other things) a call to FileSystem.get() which declares
             // IOException.
@@ -83,25 +91,21 @@ public Optional<SimpleVersionedSerializer<LakeSoulWriterBucketState>> getWriterS
     // committer must not be null since flink requires it to enable
     // StatefulGlobalTwoPhaseCommittingSinkAdapter
     @Override
-    public Optional<Committer<LakeSoulMultiTableSinkCommittable>> createCommitter() throws IOException {
-        return Optional.of(new Committer<LakeSoulMultiTableSinkCommittable>() {
+    public Committer<LakeSoulMultiTableSinkCommittable> createCommitter() throws IOException {
+        return new Committer<LakeSoulMultiTableSinkCommittable>() {
             @Override
-            public List<LakeSoulMultiTableSinkCommittable> commit(List<LakeSoulMultiTableSinkCommittable> committables)
-                    throws IOException, InterruptedException {
-                System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(System.currentTimeMillis()) + " org.apache.flink.api.connector.sink.Committer.commit: " + committables);
-                return Collections.emptyList();
+            public void close() throws Exception {
             }
 
-            @Override
-            public void close() throws Exception {
+            public void commit(Collection<CommitRequest<LakeSoulMultiTableSinkCommittable>> committables) throws IOException, InterruptedException {
             }
-        });
+        };
     }
 
     @Override
-    public Optional<SimpleVersionedSerializer<LakeSoulMultiTableSinkCommittable>> getCommittableSerializer() {
+    public SimpleVersionedSerializer<LakeSoulMultiTableSinkCommittable> getCommittableSerializer() {
        try {
-            return Optional.of(bucketsBuilder.getCommittableSerializer());
+            return bucketsBuilder.getCommittableSerializer();
         } catch (IOException e) {
             // it's not optimal that we have to do this but creating the serializers for the
             // LakeSoulMultiTablesSink requires (among other things) a call to FileSystem.get() which declares
             // IOException.
@@ -110,30 +114,65 @@ public Optional<SimpleVersionedSerializer<LakeSoulMultiTableSinkCommittable>> ge
         }
     }
 
-    @Override
-    public Optional<GlobalCommitter<LakeSoulMultiTableSinkCommittable, LakeSoulMultiTableSinkGlobalCommittable>> createGlobalCommitter()
-            throws IOException {
-        return Optional.ofNullable(bucketsBuilder.createGlobalCommitter());
+    public BucketsBuilder<IN, ? extends BucketsBuilder<IN, ?>> getBucketsBuilder(){
+        return this.bucketsBuilder;
     }
 
     @Override
-    public Optional<SimpleVersionedSerializer<LakeSoulMultiTableSinkGlobalCommittable>> getGlobalCommittableSerializer() {
-        try {
-            return Optional.of(bucketsBuilder.getGlobalCommittableSerializer());
-        } catch (IOException e) {
-            // it's not optimal that we have to do this but creating the serializers for the
-            // LakeSoulMultiTablesSink requires (among other things) a call to FileSystem.get() which declares
-            // IOException.
-            throw new FlinkRuntimeException("Could not create global committable serializer.", e);
-        }
+    public void addPostCommitTopology(DataStream<CommittableMessage<LakeSoulMultiTableSinkCommittable>> committables) {
+        StandardSinkTopologies.addGlobalCommitter(
+                committables,
+                GlobalCommitterAdapter::new,
+                this::getCommittableSerializer);
     }
 
-    @Override
-    public Collection<String> getCompatibleStateNames() {
-        // StreamingFileSink
-        return Collections.singleton("lakesoul-cdc-multitable-bucket-states");
-    }
-
-    public BucketsBuilder<IN, ? extends BucketsBuilder<IN, ?>> getBucketsBuilder(){
-        return this.bucketsBuilder;
+    public class GlobalCommitterAdapter implements Committer<LakeSoulMultiTableSinkCommittable> {
+        final GlobalCommitter<LakeSoulMultiTableSinkCommittable, LakeSoulMultiTableSinkGlobalCommittable> globalCommitter;
+        final SimpleVersionedSerializer<LakeSoulMultiTableSinkGlobalCommittable> globalCommittableSerializer;
+
+        GlobalCommitterAdapter() {
+            try {
+                globalCommitter = LakeSoulMultiTablesSink.this.bucketsBuilder.createGlobalCommitter();
+                globalCommittableSerializer = LakeSoulMultiTablesSink.this.bucketsBuilder.getGlobalCommittableSerializer();
+            } catch (IOException e) {
+                throw new UncheckedIOException("Cannot create global committer", e);
+            }
+        }
+
+        @Override
+        public void close() throws Exception {
+            globalCommitter.close();
+        }
+
+        @Override
+        public void commit(Collection<CommitRequest<LakeSoulMultiTableSinkCommittable>> committables)
+                throws IOException, InterruptedException {
+            if (committables.isEmpty()) {
+                return;
+            }
+
+            List<LakeSoulMultiTableSinkCommittable> rawCommittables =
+                    committables.stream()
+                            .map(CommitRequest::getCommittable)
+                            .collect(Collectors.toList());
+            List<LakeSoulMultiTableSinkGlobalCommittable> globalCommittables =
+                    Collections.singletonList(globalCommitter.combine(rawCommittables));
+            List<LakeSoulMultiTableSinkGlobalCommittable> failures = globalCommitter.commit(globalCommittables);
+            // Only individual committables are retriable, and a committable cannot be traced back to
+            // the global committable it belongs to, so the complete batch of committables is retried.
+            // This might lead to committing the same global committable twice, but we assume that
+            // the GlobalCommitter commit call is idempotent.
+            if (!failures.isEmpty()) {
+                committables.forEach(CommitRequest::retryLater);
+            }
+        }
+
+        public GlobalCommitter<LakeSoulMultiTableSinkCommittable, LakeSoulMultiTableSinkGlobalCommittable> getGlobalCommitter() {
+            return globalCommitter;
+        }
+
+        public SimpleVersionedSerializer<LakeSoulMultiTableSinkGlobalCommittable> getGlobalCommittableSerializer() {
+            return globalCommittableSerializer;
+        }
     }
 }
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/BucketsBuilder.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/BucketsBuilder.java
index 5db30c60c..70b33bb7d 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/BucketsBuilder.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/BucketsBuilder.java
@@ -4,7 +4,7 @@
 
 package org.apache.flink.lakesoul.sink.bucket;
 
-import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.lakesoul.sink.committer.LakeSoulSinkCommitter;
 import org.apache.flink.lakesoul.sink.committer.LakeSoulSinkGlobalCommitter;
@@ -41,7 +41,7 @@ public abstract SimpleVersionedSerializer<LakeSoulWriterBucketState> getWriterSt
 
     public abstract SimpleVersionedSerializer<LakeSoulMultiTableSinkCommittable> getCommittableSerializer() throws IOException;
 
-    public abstract LakeSoulSinkGlobalCommitter createGlobalCommitter() throws IOException;
+    public abstract LakeSoulSinkGlobalCommitter createGlobalCommitter();
 
     public abstract SimpleVersionedSerializer<LakeSoulMultiTableSinkGlobalCommittable> getGlobalCommittableSerializer() throws IOException;
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/BulkFormatBuilder.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/BulkFormatBuilder.java
index 433094f79..e89f83501 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/BulkFormatBuilder.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/BulkFormatBuilder.java
@@ -4,7 +4,7 @@
 
 package org.apache.flink.lakesoul.sink.bucket;
 
-import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
@@ -110,7 +110,7 @@ public SimpleVersionedSerializer<LakeSoulMultiTableSinkCommittable> getCommittab
     }
 
     @Override
-    public LakeSoulSinkGlobalCommitter createGlobalCommitter() throws IOException {
+    public LakeSoulSinkGlobalCommitter createGlobalCommitter() {
         return new LakeSoulSinkGlobalCommitter(conf);
     }
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/DefaultMultiTablesArrowFormatBuilder.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/DefaultMultiTablesArrowFormatBuilder.java
index e8b93a9e0..beb313f79 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/DefaultMultiTablesArrowFormatBuilder.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/DefaultMultiTablesArrowFormatBuilder.java
@@ -4,7 +4,7 @@
 
 package org.apache.flink.lakesoul.sink.bucket;
 
-import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.lakesoul.sink.writer.AbstractLakeSoulMultiTableSinkWriter;
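A note on the committer contract these changes target: in the sink v2 API, org.apache.flink.api.connector.sink2.Committer receives each committable wrapped in a CommitRequest, and failures are signalled through the request object (for example with retryLater()) rather than by returning a list of retriable committables, as the deleted v1 code did. A minimal, hedged sketch of that contract, with a plain String standing in for a real committable type (PrintingCommitter is illustrative, not part of this patch):

    import java.util.Collection;

    import org.apache.flink.api.connector.sink2.Committer;

    // Illustrative only: shows the sink2 Committer shape used throughout this patch.
    class PrintingCommitter implements Committer<String> {

        @Override
        public void commit(Collection<CommitRequest<String>> requests) {
            for (CommitRequest<String> request : requests) {
                try {
                    // the real commit action for request.getCommittable() goes here
                    System.out.println("committing " + request.getCommittable());
                } catch (Exception e) {
                    // ask the framework to redeliver this committable in a later attempt
                    request.retryLater();
                }
            }
        }

        @Override
        public void close() {
        }
    }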
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/DefaultMultiTablesBulkFormatBuilder.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/DefaultMultiTablesBulkFormatBuilder.java
index 84955655a..d4ffd6f45 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/DefaultMultiTablesBulkFormatBuilder.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/DefaultMultiTablesBulkFormatBuilder.java
@@ -4,7 +4,7 @@
 
 package org.apache.flink.lakesoul.sink.bucket;
 
-import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.lakesoul.sink.writer.AbstractLakeSoulMultiTableSinkWriter;
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/DefaultOneTableBulkFormatBuilder.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/DefaultOneTableBulkFormatBuilder.java
index 489e65a77..047ec1bb2 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/DefaultOneTableBulkFormatBuilder.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/DefaultOneTableBulkFormatBuilder.java
@@ -4,7 +4,7 @@
 
 package org.apache.flink.lakesoul.sink.bucket;
 
-import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.lakesoul.sink.LakeSoulMultiTablesSink;
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkCommitter.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkCommitter.java
index 2c60e96a2..8a7236cb0 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkCommitter.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkCommitter.java
@@ -7,7 +7,7 @@
 import com.dmetasoul.lakesoul.meta.DBManager;
 import com.dmetasoul.lakesoul.meta.DBUtil;
 import com.dmetasoul.lakesoul.meta.entity.*;
-import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
@@ -44,13 +44,11 @@ public class LakeSoulSinkCommitter implements Committer<LakeSoulMultiTableSinkCo
 
-    @Override
-    public List<LakeSoulMultiTableSinkCommittable> commit(List<LakeSoulMultiTableSinkCommittable> committables)
-            throws IOException {
-        LOG.info("Found {} committable for LakeSoul to commit", committables.size());
-        // commit by file creation time in ascending order
-        committables.sort(LakeSoulMultiTableSinkCommittable::compareTo);
-
+    public void commit(List<LakeSoulMultiTableSinkCommittable> committables, boolean sort)
+            throws IOException, InterruptedException {
+        if (sort) {
+            committables.sort(LakeSoulMultiTableSinkCommittable::compareTo);
+        }
         DBManager lakeSoulDBManager = new DBManager();
         for (LakeSoulMultiTableSinkCommittable committable : committables) {
             LOG.info("Committing {}", committable);
@@ -140,8 +138,16 @@ public List<LakeSoulMultiTableSinkCommittable> commit(List<LakeSoulMultiTableSi
+    @Override
+    public void commit(Collection<CommitRequest<LakeSoulMultiTableSinkCommittable>> commits)
+            throws IOException, InterruptedException {
+        LOG.info("Found {} committables for LakeSoul to commit", commits.size());
+        // commit by file creation time in ascending order
+        List<LakeSoulMultiTableSinkCommittable> committables =
+                commits.stream().map(CommitRequest::getCommittable).sorted(LakeSoulMultiTableSinkCommittable::compareTo).collect(Collectors.toList());
+        this.commit(committables, false);
     }
 
     @Override
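LakeSoulSinkGlobalCommitter, changed below, keeps the v1 GlobalCommitter interface: after this patch it is no longer wired in through Sink#createGlobalCommitter but is constructed inside GlobalCommitterAdapter and driven from the post-commit topology (see addPostCommitTopology above). For reference, the v1 contract the adapter calls into — combine() folds a batch into one global committable and commit() reports failures by returning them — in a hedged, self-contained sketch (CountingGlobalCommitter and its types are illustrative, not LakeSoul code):

    import java.util.Collections;
    import java.util.List;

    import org.apache.flink.api.connector.sink.GlobalCommitter;

    // Illustrative v1 GlobalCommitter: folds per-checkpoint committables into a
    // single count and "commits" it; an empty failure list means success.
    class CountingGlobalCommitter implements GlobalCommitter<String, Integer> {

        @Override
        public List<Integer> filterRecoveredCommittables(List<Integer> globalCommittables) {
            return globalCommittables; // nothing is already committed in this sketch
        }

        @Override
        public Integer combine(List<String> committables) {
            return committables.size(); // fold the batch into one global committable
        }

        @Override
        public List<Integer> commit(List<Integer> globalCommittables) {
            globalCommittables.forEach(c -> System.out.println("globally committing " + c));
            return Collections.emptyList(); // failures would be returned for retry
        }

        @Override
        public void endOfInput() {
        }

        @Override
        public void close() {
        }
    }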
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkGlobalCommitter.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkGlobalCommitter.java
index a8956b0f1..ab5ab0f84 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkGlobalCommitter.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkGlobalCommitter.java
@@ -257,7 +257,7 @@ public List<LakeSoulMultiTableSinkGlobalCommittable> commit(
                 }
             }
 
-            committer.commit(lakeSoulMultiTableSinkCommittable);
+            committer.commit(lakeSoulMultiTableSinkCommittable, true);
         }
         return Collections.emptyList();
     }
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/AbstractLakeSoulMultiTableSinkWriter.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/AbstractLakeSoulMultiTableSinkWriter.java
index c98fb2aa4..f4f2a2f93 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/AbstractLakeSoulMultiTableSinkWriter.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/AbstractLakeSoulMultiTableSinkWriter.java
@@ -4,8 +4,9 @@
 
 package org.apache.flink.lakesoul.sink.writer;
 
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
@@ -34,7 +35,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A {@link SinkWriter} implementation for {@link LakeSoulMultiTablesSink}.
+ * A sink writer implementation for {@link LakeSoulMultiTablesSink}.
  *
  * <p>It writes data to and manages the different active {@link LakeSoulWriterBucket buckets} in the
  * {@link LakeSoulMultiTablesSink}.
@@ -42,8 +43,9 @@
  * @param <IN> The type of input elements.
  */
 public abstract class AbstractLakeSoulMultiTableSinkWriter<IN>
-        implements SinkWriter<IN, LakeSoulMultiTableSinkCommittable, LakeSoulWriterBucketState>,
-        Sink.ProcessingTimeService.ProcessingTimeCallback {
+        implements
+        StatefulSink.StatefulSinkWriter<IN, LakeSoulWriterBucketState>,
+        TwoPhaseCommittingSink.PrecommittingSinkWriter<IN, LakeSoulMultiTableSinkCommittable> {
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractLakeSoulMultiTableSinkWriter.class);
 
@@ -53,8 +55,6 @@ public abstract class AbstractLakeSoulMultiTableSinkWriter<IN>
 
     private final RollingPolicy rollingPolicy;
 
-    protected final Sink.ProcessingTimeService processingTimeService;
-
     private final long bucketCheckInterval;
 
     // --------------------------- runtime fields -----------------------------
@@ -69,13 +69,15 @@ public abstract class AbstractLakeSoulMultiTableSinkWriter<IN>
 
     protected final Configuration conf;
 
+    protected final ProcessingTimeService processingTimeService;
+
     public AbstractLakeSoulMultiTableSinkWriter(
             int subTaskId,
             final SinkWriterMetricGroup metricGroup,
             final LakeSoulWriterBucketFactory bucketFactory,
             final RollingPolicy rollingPolicy,
             final OutputFileConfig outputFileConfig,
-            final Sink.ProcessingTimeService processingTimeService,
+            final ProcessingTimeService processingTimeService,
             final long bucketCheckInterval,
             final Configuration conf) {
         this.subTaskId = subTaskId;
@@ -121,8 +123,6 @@ public void initializeState(List<LakeSoulWriterBucketState> bucketStates) throws
             updateActiveBucketId(identity, bucketId, restoredBucket);
         }
-
-        registerNextBucketInspectionTimer();
     }
 
     private void updateActiveBucketId(TableSchemaIdentity tableId, String bucketId, LakeSoulWriterBucket restoredBucket)
@@ -178,7 +178,7 @@ public void write(IN element, Context context) throws IOException {
     }
 
     @Override
-    public List<LakeSoulMultiTableSinkCommittable> prepareCommit(boolean flush) throws IOException {
+    public Collection<LakeSoulMultiTableSinkCommittable> prepareCommit() throws IOException {
         List<LakeSoulMultiTableSinkCommittable> committables = new ArrayList<>();
         String dmlType = this.conf.getString(LakeSoulSinkOptions.DML_TYPE);
         String sourcePartitionInfo = this.conf.getString(LakeSoulSinkOptions.SOURCE_PARTITION_INFO);
@@ -192,7 +192,7 @@
             if (!entry.getValue().isActive()) {
                 activeBucketIt.remove();
             } else {
-                committables.addAll(entry.getValue().prepareCommit(flush, dmlType, sourcePartitionInfo));
+                committables.addAll(entry.getValue().prepareCommit(dmlType, sourcePartitionInfo));
             }
         }
         LOG.info("PrepareCommit with conf={}, \n activeBuckets={}, \n committables={}", conf, activeBuckets, committables);
@@ -237,6 +237,11 @@ protected LakeSoulWriterBucket getOrCreateBucketForBucketId(
         return bucket;
     }
 
+    @Override
+    public void flush(boolean endOfInput) throws IOException, InterruptedException {
+
+    }
+
     @Override
     public void close() {
         if (activeBuckets != null) {
@@ -251,21 +256,6 @@ private Path assembleBucketPath(Path basePath, String bucketId) {
         return new Path(basePath, bucketId);
     }
 
-    @Override
-    public void onProcessingTime(long time) throws IOException {
-        for (LakeSoulWriterBucket bucket : activeBuckets.values()) {
-            bucket.onProcessingTime(time);
-        }
-
-        registerNextBucketInspectionTimer();
-    }
-
-    protected void registerNextBucketInspectionTimer() {
-        final long nextInspectionTime =
-                processingTimeService.getCurrentProcessingTime() + bucketCheckInterval;
-        processingTimeService.registerProcessingTimer(nextInspectionTime, this);
-    }
-
     protected int getSubTaskId() {
         return subTaskId;
     }
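The writer hierarchy above swaps the v1 SinkWriter-plus-ProcessingTimeCallback pair for the two sink v2 writer mix-ins: StatefulSinkWriter supplies snapshotState() for checkpointed bucket state, and PrecommittingSinkWriter supplies prepareCommit(), which no longer takes a flush flag and runs at the pre-commit barrier. A minimal, self-contained sketch of that dual-interface shape (BufferingWriter and its String payloads are illustrative, not LakeSoul code):

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;

    import org.apache.flink.api.connector.sink2.StatefulSink;
    import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;

    // Illustrative writer combining both sink v2 writer roles.
    class BufferingWriter implements
            StatefulSink.StatefulSinkWriter<String, String>,
            TwoPhaseCommittingSink.PrecommittingSinkWriter<String, String> {

        private final List<String> buffer = new ArrayList<>();

        @Override
        public void write(String element, Context context) {
            buffer.add(element);
        }

        @Override
        public void flush(boolean endOfInput) {
            // nothing extra to do here; pending work is handed over in prepareCommit()
        }

        @Override
        public Collection<String> prepareCommit() {
            // called once per checkpoint: emit the committables for the committer
            List<String> pending = new ArrayList<>(buffer);
            buffer.clear();
            return pending;
        }

        @Override
        public List<String> snapshotState(long checkpointId) {
            // state handed back through restoreWriter()/initializeState() on recovery
            return new ArrayList<>(buffer);
        }

        @Override
        public void close() {
        }
    }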
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulMultiTableSinkWriter.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulMultiTableSinkWriter.java
index 1d0a90218..369d0eac4 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulMultiTableSinkWriter.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulMultiTableSinkWriter.java
@@ -4,7 +4,7 @@
 
 package org.apache.flink.lakesoul.sink.writer;
 
-import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.lakesoul.types.BinarySourceRecord;
@@ -32,7 +32,7 @@ public LakeSoulMultiTableSinkWriter(int subTaskId,
                                     LakeSoulWriterBucketFactory bucketFactory,
                                     RollingPolicy rollingPolicy,
                                     OutputFileConfig outputFileConfig,
-                                    Sink.ProcessingTimeService processingTimeService,
+                                    ProcessingTimeService processingTimeService,
                                     long bucketCheckInterval,
                                     Configuration conf) {
         super(subTaskId, metricGroup, bucketFactory, rollingPolicy, outputFileConfig, processingTimeService,
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulRowDataOneTableSinkWriter.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulRowDataOneTableSinkWriter.java
index 999b2bbe3..ea0c1cc15 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulRowDataOneTableSinkWriter.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulRowDataOneTableSinkWriter.java
@@ -4,7 +4,7 @@
 
 package org.apache.flink.lakesoul.sink.writer;
 
-import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.lakesoul.types.LakeSoulRecordConvert;
@@ -20,7 +20,6 @@
 import java.util.stream.IntStream;
 
 import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.SERVER_TIME_ZONE;
-import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.USE_CDC;
 
 public class LakeSoulRowDataOneTableSinkWriter extends AbstractLakeSoulMultiTableSinkWriter<RowData> {
@@ -39,7 +38,7 @@ public LakeSoulRowDataOneTableSinkWriter(
             LakeSoulWriterBucketFactory bucketFactory,
             RollingPolicy rollingPolicy,
             OutputFileConfig outputFileConfig,
-            Sink.ProcessingTimeService processingTimeService,
+            ProcessingTimeService processingTimeService,
             long bucketCheckInterval,
             Configuration conf) throws IOException {
         super(subTaskId, metricGroup, bucketFactory, rollingPolicy, outputFileConfig,
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulWriterBucket.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulWriterBucket.java
index 778a2335a..62283586d 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulWriterBucket.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulWriterBucket.java
@@ -173,7 +173,7 @@ void write(RowData element, long currentTime, long tsMs) throws IOException {
         inProgressPartWriter.write(element, currentTime);
     }
 
-    List<LakeSoulMultiTableSinkCommittable> prepareCommit(boolean flush, String dmlType, String sourcePartitionInfo)
+    List<LakeSoulMultiTableSinkCommittable> prepareCommit(String dmlType, String sourcePartitionInfo)
             throws IOException {
         // we always close part file and do not keep in-progress file
         // since the native parquet writer doesn't support resume
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/arrow/LakeSoulArrowMultiTableSinkWriter.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/arrow/LakeSoulArrowMultiTableSinkWriter.java
index 5f2e0ba93..90402fb0a 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/arrow/LakeSoulArrowMultiTableSinkWriter.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/arrow/LakeSoulArrowMultiTableSinkWriter.java
@@ -4,6 +4,7 @@
 
 package org.apache.flink.lakesoul.sink.writer.arrow;
 
+import org.apache.flink.api.common.operators.ProcessingTimeService;
 import org.apache.flink.api.connector.sink.Sink;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
@@ -43,7 +44,7 @@ public LakeSoulArrowMultiTableSinkWriter(int subTaskId,
                                          LakeSoulArrowWriterBucketFactory bucketFactory,
                                          RollingPolicy rollingPolicy,
                                          OutputFileConfig outputFileConfig,
-                                         Sink.ProcessingTimeService processingTimeService,
+                                         ProcessingTimeService processingTimeService,
                                          long bucketCheckInterval,
                                          Configuration conf) {
         super(subTaskId, metricGroup, new DefaultLakeSoulWriterBucketFactory(conf), rollingPolicy, outputFileConfig, processingTimeService,
@@ -122,8 +123,6 @@ public void initializeState(List<LakeSoulWriterBucketState> bucketStates) throws
             updateActiveBucketId(identity, restoredBucket);
         }
-
-        registerNextBucketInspectionTimer();
     }
 
     private void updateActiveBucketId(TableSchemaIdentity tableId, LakeSoulArrowWriterBucket restoredBucket)
@@ -158,7 +157,7 @@ public void close() {
     }
 
     @Override
-    public List<LakeSoulMultiTableSinkCommittable> prepareCommit(boolean flush) throws IOException {
+    public List<LakeSoulMultiTableSinkCommittable> prepareCommit() throws IOException {
         long timer = System.currentTimeMillis();
         List<LakeSoulMultiTableSinkCommittable> committables = new ArrayList<>();
         String dmlType = this.conf.getString(LakeSoulSinkOptions.DML_TYPE);
@@ -173,10 +172,10 @@
             if (!entry.getValue().isActive()) {
                 activeBucketIt.remove();
             } else {
-                committables.addAll(entry.getValue().prepareCommit(flush, dmlType, sourcePartitionInfo));
+                committables.addAll(entry.getValue().prepareCommit(dmlType, sourcePartitionInfo));
             }
         }
-        LOG.info("LakeSoulArrowMultiTableSinkWriter.prepareCommit done, costTime={}ms, subTaskId={}, flush={}, {}", String.format("%06d", System.currentTimeMillis() - timer), getSubTaskId(), flush, committables);
+        LOG.info("LakeSoulArrowMultiTableSinkWriter.prepareCommit done, costTime={}ms, subTaskId={}, {}", String.format("%06d", System.currentTimeMillis() - timer), getSubTaskId(), committables);
         return committables;
     }
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/arrow/LakeSoulArrowWriterBucket.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/arrow/LakeSoulArrowWriterBucket.java
index 5f90f195f..856811c0e 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/arrow/LakeSoulArrowWriterBucket.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/arrow/LakeSoulArrowWriterBucket.java
@@ -155,15 +155,14 @@ void write(LakeSoulArrowWrapper element, long currentTime, long tsMs) throws IOE
         inProgressPartWriter.write(element, currentTime);
     }
 
-    List<LakeSoulMultiTableSinkCommittable> prepareCommit(boolean flush, String dmlType, String sourcePartitionInfo)
+    List<LakeSoulMultiTableSinkCommittable> prepareCommit(String dmlType, String sourcePartitionInfo)
             throws IOException {
         // we always close part file and do not keep in-progress file
         // since the native parquet writer doesn't support resume
         if (inProgressPartWriter != null) {
             closePartFile();
             LOG.info(
-                    "Closing in-progress part file for flush={} bucket id={} subTaskId={} tableId={} pendingFilesMap={} on checkpoint.",
-                    flush,
+                    "Closing in-progress part file for bucket id={} subTaskId={} tableId={} pendingFilesMap={} on checkpoint.",
                     getBucketId(),
                     subTaskId,
                     tableId,
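Operationally, the visible effect of this migration is at job-wiring time: a sink v2 implementation is attached with DataStream#sinkTo, which drives the writer, the committer, and the post-commit global committer topology added above. A hedged sketch of that wiring (the SinkWiring helper and the generic stream are illustrative, not part of this patch):

    import org.apache.flink.lakesoul.sink.LakeSoulMultiTablesSink;
    import org.apache.flink.streaming.api.datastream.DataStream;

    final class SinkWiring {
        // Attach an already-built LakeSoulMultiTablesSink to a stream; sinkTo() is
        // the sink v2 entry point (the v1 path went through addSink/SinkFunction).
        static <T> void attach(DataStream<T> stream, LakeSoulMultiTablesSink<T> sink) {
            stream.sinkTo(sink).name("lakesoul-multi-tables-sink");
        }

        private SinkWiring() {
        }
    }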