Skip to content

Commit

Permalink
[Refactor] Rename RecordSerializer to StarRocksRecordSerializationSchema
Browse files Browse the repository at this point in the history
Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy committed Nov 28, 2023
1 parent cfe73c0 commit c88c133
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import com.starrocks.connector.flink.row.sink.StarRocksIRowTransformer;
import com.starrocks.connector.flink.row.sink.StarRocksISerializer;
import com.starrocks.connector.flink.row.sink.StarRocksSerializerFactory;
import com.starrocks.connector.flink.table.sink.v2.RecordSerializer;
import com.starrocks.connector.flink.table.sink.v2.RowDataSerializer;
import com.starrocks.connector.flink.table.sink.v2.StarRocksRecordSerializationSchema;
import com.starrocks.connector.flink.table.sink.v2.RowDataSerializationSchema;
import com.starrocks.connector.flink.table.sink.v2.StarRocksSink;
import com.starrocks.data.load.stream.properties.StreamLoadProperties;
import org.slf4j.Logger;
Expand Down Expand Up @@ -138,21 +138,21 @@ public static StarRocksSink<RowData> createSink(
StarRocksISerializer serializer = StarRocksSerializerFactory.createSerializer(sinkOptions, schema.getFieldNames());
rowTransformer.setStarRocksColumns(sinkTable.getFieldMapping());
rowTransformer.setTableSchema(schema);
RowDataSerializer recordSerializer = new RowDataSerializer(
RowDataSerializationSchema serializationSchema = new RowDataSerializationSchema(
sinkOptions.getDatabaseName(),
sinkOptions.getTableName(),
sinkOptions.supportUpsertDelete(),
sinkOptions.getIgnoreUpdateBefore(),
serializer,
rowTransformer);
StreamLoadProperties streamLoadProperties = sinkOptions.getProperties(sinkTable);
return new StarRocksSink<>(sinkOptions, recordSerializer, streamLoadProperties);
return new StarRocksSink<>(sinkOptions, serializationSchema, streamLoadProperties);
}
throw new UnsupportedOperationException("New sink api don't support sink type " + sinkVersion.name());
}

public static <T> StarRocksSink<T> createSink(
StarRocksSinkOptions sinkOptions, RecordSerializer<T> recordSerializer) {
StarRocksSinkOptions sinkOptions, StarRocksRecordSerializationSchema<T> recordSerializer) {
detectStarRocksFeature(sinkOptions);
SinkVersion sinkVersion = getSinkVersion(sinkOptions);
if (sinkVersion == SinkVersion.V2) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import com.starrocks.connector.flink.tools.JsonWrapper;

/** Serializer for the {@link RowData} record. */
public class RowDataSerializer implements RecordSerializer<RowData> {
public class RowDataSerializationSchema implements StarRocksRecordSerializationSchema<RowData> {

private static final long serialVersionUID = 1L;

Expand All @@ -42,7 +42,7 @@ public class RowDataSerializer implements RecordSerializer<RowData> {
private final StarRocksIRowTransformer<RowData> rowTransformer;
private transient DefaultStarRocksRowData reusableRowData;

public RowDataSerializer(
public RowDataSerializationSchema(
String databaseName,
String tableName,
boolean supportUpsertDelete,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@
import java.io.Serializable;

/**
* Interface for the input record serialization.
* A serialization schema which defines how to convert a value
* of type {@code T} to {@link StarRocksRowData}.
*
* @param <T> the type of input record being serialized
*/
public interface RecordSerializer<T> extends Serializable {
public interface StarRocksRecordSerializationSchema<T> extends Serializable {

/** Open the serializer. */
void open();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ public class StarRocksSink<InputT>
private static final Logger LOG = LoggerFactory.getLogger(StarRocksSink.class);

private final StarRocksSinkOptions sinkOptions;
private final RecordSerializer<InputT> recordSerializer;
private final StarRocksRecordSerializationSchema<InputT> serializationSchema;
private final StreamLoadProperties streamLoadProperties;

public StarRocksSink(
StarRocksSinkOptions sinkOptions,
RecordSerializer<InputT> recordSerializer,
StarRocksRecordSerializationSchema<InputT> serializationSchema,
StreamLoadProperties streamLoadProperties) {
this.sinkOptions = sinkOptions;
this.recordSerializer = recordSerializer;
this.serializationSchema = serializationSchema;
this.streamLoadProperties = streamLoadProperties;
}

Expand All @@ -65,7 +65,7 @@ public StarRocksWriter<InputT> restoreWriter(InitContext context, Collection<Sta
try {
return new StarRocksWriter<>(
sinkOptions,
recordSerializer,
serializationSchema,
streamLoadProperties,
context,
Collections.emptyList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,21 @@ public class StarRocksWriter<InputT>
private static final Logger LOG = LoggerFactory.getLogger(StarRocksWriter.class);

private final StarRocksSinkOptions sinkOptions;
private final RecordSerializer<InputT> recordSerializer;
private final StarRocksRecordSerializationSchema<InputT> serializationSchema;
private final StarRocksStreamLoadListener streamLoadListener;
private final LabelGeneratorFactory labelGeneratorFactory;
private final StreamLoadManagerV2 sinkManager;
private long totalReceivedRows = 0;

public StarRocksWriter(
StarRocksSinkOptions sinkOptions,
RecordSerializer<InputT> recordSerializer,
StarRocksRecordSerializationSchema<InputT> serializationSchema,
StreamLoadProperties streamLoadProperties,
Sink.InitContext initContext,
Collection<StarRocksWriterState> recoveredState) throws Exception {
this.sinkOptions = sinkOptions;
this.recordSerializer = recordSerializer;
this.recordSerializer.open();
this.serializationSchema = serializationSchema;
this.serializationSchema.open();
this.streamLoadListener = new StarRocksStreamLoadListener(initContext.metricGroup(), sinkOptions);
long restoredCheckpointId = initContext.getRestoredCheckpointId()
.orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1);
Expand Down Expand Up @@ -134,7 +134,7 @@ public StarRocksWriter(

@Override
public void write(InputT element, Context context) throws IOException, InterruptedException {
StarRocksRowData rowData = recordSerializer.serialize(element);
StarRocksRowData rowData = serializationSchema.serialize(element);
sinkManager.write(rowData.getUniqueKey(), rowData.getDatabase(), rowData.getTable(), rowData.getRow());
totalReceivedRows += 1;
if (totalReceivedRows % 100 == 1) {
Expand Down Expand Up @@ -178,7 +178,7 @@ public List<StarRocksWriterState> snapshotState(long checkpointId) throws IOExce
@Override
public void close() throws Exception {
LOG.info("Close StarRocksWriter");
recordSerializer.close();
serializationSchema.close();
if (sinkManager != null) {
try {
StreamLoadSnapshot snapshot = sinkManager.snapshot();
Expand Down

0 comments on commit c88c133

Please sign in to comment.