Skip to content

Commit

Permalink
[Enhancement] Refactor the input record serializer for new sink api (#307)
Browse files Browse the repository at this point in the history

Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy authored Nov 27, 2023
1 parent bfa8c97 commit 70f6685
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,17 @@

package com.starrocks.connector.flink.table.sink;

import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;

import com.starrocks.connector.flink.manager.StarRocksSinkTable;
import com.starrocks.connector.flink.row.sink.StarRocksIRowTransformer;
import com.starrocks.connector.flink.row.sink.StarRocksISerializer;
import com.starrocks.connector.flink.row.sink.StarRocksSerializerFactory;
import com.starrocks.connector.flink.table.sink.v2.RecordSerializer;
import com.starrocks.connector.flink.table.sink.v2.RowDataSerializer;
import com.starrocks.connector.flink.table.sink.v2.StarRocksSink;
import org.apache.flink.table.api.TableSchema;
import com.starrocks.data.load.stream.properties.StreamLoadProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -116,21 +124,40 @@ public static <T> StarRocksDynamicSinkFunctionBase<T> createSinkFunction(StarRoc
}
}

public static <T> StarRocksSink<T> createSink(
StarRocksSinkOptions sinkOptions, TableSchema schema, StarRocksIRowTransformer<T> rowTransformer) {
public static StarRocksSink<RowData> createSink(
StarRocksSinkOptions sinkOptions, TableSchema schema, StarRocksIRowTransformer<RowData> rowTransformer) {
detectStarRocksFeature(sinkOptions);
SinkVersion sinkVersion = getSinkVersion(sinkOptions);
if (sinkVersion == SinkVersion.V2) {
return new StarRocksSink<>(sinkOptions, schema, rowTransformer);
StarRocksSinkTable sinkTable = StarRocksSinkTable.builder()
.sinkOptions(sinkOptions)
.build();
sinkTable.validateTableStructure(sinkOptions, schema);
// StarRocksJsonSerializer depends on SinkOptions#supportUpsertDelete which is decided in
// StarRocksSinkTable#validateTableStructure, so create serializer after validating table structure
StarRocksISerializer serializer = StarRocksSerializerFactory.createSerializer(sinkOptions, schema.getFieldNames());
rowTransformer.setStarRocksColumns(sinkTable.getFieldMapping());
rowTransformer.setTableSchema(schema);
RowDataSerializer recordSerializer = new RowDataSerializer(
sinkOptions.getDatabaseName(),
sinkOptions.getTableName(),
sinkOptions.supportUpsertDelete(),
sinkOptions.getIgnoreUpdateBefore(),
serializer,
rowTransformer);
StreamLoadProperties streamLoadProperties = sinkOptions.getProperties(sinkTable);
return new StarRocksSink<>(sinkOptions, recordSerializer, streamLoadProperties);
}
throw new UnsupportedOperationException("New sink api don't support sink type " + sinkVersion.name());
}

public static <T> StarRocksSink<T> createSink(StarRocksSinkOptions sinkOptions) {
public static <T> StarRocksSink<T> createSink(
StarRocksSinkOptions sinkOptions, RecordSerializer<T> recordSerializer) {
detectStarRocksFeature(sinkOptions);
SinkVersion sinkVersion = getSinkVersion(sinkOptions);
if (sinkVersion == SinkVersion.V2) {
return new StarRocksSink<>(sinkOptions);
StreamLoadProperties streamLoadProperties = sinkOptions.getProperties(null);
return new StarRocksSink<>(sinkOptions, recordSerializer, streamLoadProperties);
}
throw new UnsupportedOperationException("New sink api don't support sink type " + sinkVersion.name());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2021-present StarRocks, Inc. All rights reserved.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.starrocks.connector.flink.table.sink.v2;

import com.starrocks.connector.flink.table.data.StarRocksRowData;

import java.io.Serializable;

/**
 * Interface for the input record serialization.
 *
 * <p>{@link #open()} and {@link #close()} are {@code default} no-ops so that simple
 * implementations only need to provide {@link #serialize(Object)}; this also makes the
 * interface usable as a functional interface. Implementations must be {@link Serializable}
 * because they are shipped to task managers as part of the sink.
 *
 * @param <T> the type of input record being serialized
 */
public interface RecordSerializer<T> extends Serializable {

    /** Opens the serializer. No-op by default. */
    default void open() {
    }

    /**
     * Serializes the input record.
     *
     * @param record the record to serialize
     * @return the serialized row data, or {@code null} if the record should be skipped
     */
    StarRocksRowData serialize(T record);

    /** Closes the serializer. No-op by default. */
    default void close() {
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright 2021-present StarRocks, Inc. All rights reserved.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.starrocks.connector.flink.table.sink.v2;

import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;

import com.starrocks.connector.flink.row.sink.StarRocksIRowTransformer;
import com.starrocks.connector.flink.row.sink.StarRocksISerializer;
import com.starrocks.connector.flink.table.data.DefaultStarRocksRowData;
import com.starrocks.connector.flink.table.data.StarRocksRowData;
import com.starrocks.connector.flink.tools.JsonWrapper;

/** Serializer for the {@link RowData} record. */
public class RowDataSerializer implements RecordSerializer<RowData> {

    private static final long serialVersionUID = 1L;

    /** Database name set on every emitted row. */
    private final String databaseName;
    /** Table name set on every emitted row. */
    private final String tableName;
    // private final: these were package-private and mutable, inconsistent with the
    // other configuration fields; nothing reassigns them after construction.
    /** Whether the sink can emit upsert/delete operations for the target table. */
    private final boolean supportUpsertDelete;
    /** Whether UPDATE_BEFORE rows are dropped even when upsert/delete is supported. */
    private final boolean ignoreUpdateBefore;
    private final StarRocksISerializer serializer;
    private final StarRocksIRowTransformer<RowData> rowTransformer;
    // Reused for every serialized record to avoid per-record allocation. Transient, so it
    // is null after deserialization and must be (re)created in open().
    private transient DefaultStarRocksRowData reusableRowData;

    /**
     * @param databaseName        target database name
     * @param tableName           target table name
     * @param supportUpsertDelete whether upsert/delete rows can be loaded
     * @param ignoreUpdateBefore  whether UPDATE_BEFORE rows should be ignored
     * @param serializer          serializer producing the row payload string
     * @param rowTransformer      transformer converting {@link RowData} to column values
     */
    public RowDataSerializer(
            String databaseName,
            String tableName,
            boolean supportUpsertDelete,
            boolean ignoreUpdateBefore,
            StarRocksISerializer serializer,
            StarRocksIRowTransformer<RowData> rowTransformer) {
        this.databaseName = databaseName;
        this.tableName = tableName;
        this.supportUpsertDelete = supportUpsertDelete;
        this.ignoreUpdateBefore = ignoreUpdateBefore;
        this.serializer = serializer;
        this.rowTransformer = rowTransformer;
    }

    @Override
    public void open() {
        JsonWrapper jsonWrapper = new JsonWrapper();
        this.serializer.open(new StarRocksISerializer.SerializerContext(jsonWrapper));
        // NOTE(review): a null runtime context is passed here — confirm the transformer
        // tolerates it on this code path.
        this.rowTransformer.setRuntimeContext(null);
        this.rowTransformer.setFastJsonWrapper(jsonWrapper);
        this.reusableRowData = new DefaultStarRocksRowData();
        reusableRowData.setDatabase(databaseName);
        reusableRowData.setTable(tableName);
    }

    /**
     * Serializes the record, or returns {@code null} if the record should be skipped.
     *
     * <p>The returned object is a shared reusable instance: it is overwritten by the next
     * {@code serialize} call, so callers must consume it before then.
     */
    @Override
    public StarRocksRowData serialize(RowData record) {
        // Skip UPDATE_BEFORE when deletes cannot be applied, or when explicitly ignored.
        if (RowKind.UPDATE_BEFORE.equals(record.getRowKind()) &&
                (!supportUpsertDelete || ignoreUpdateBefore)) {
            return null;
        }
        if (!supportUpsertDelete && RowKind.DELETE.equals(record.getRowKind())) {
            // let go the UPDATE_AFTER and INSERT rows for tables who have a group of `unique` or `duplicate` keys.
            return null;
        }
        String serializedRow = serializer.serialize(rowTransformer.transform(record, supportUpsertDelete));
        reusableRowData.setRow(serializedRow);
        return reusableRowData;
    }

    @Override
    public void close() {
        // Nothing to release: no external resources are held by this serializer.
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,13 @@

package com.starrocks.connector.flink.table.sink.v2;

import com.starrocks.connector.flink.manager.StarRocksSinkTable;
import com.starrocks.connector.flink.row.sink.StarRocksIRowTransformer;
import com.starrocks.connector.flink.row.sink.StarRocksISerializer;
import com.starrocks.connector.flink.row.sink.StarRocksSerializerFactory;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import com.starrocks.data.load.stream.properties.StreamLoadProperties;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.table.api.TableSchema;

import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import com.starrocks.data.load.stream.properties.StreamLoadProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -41,34 +37,21 @@
public class StarRocksSink<InputT>
implements StatefulSink<InputT, StarRocksWriterState>, TwoPhaseCommittingSink<InputT, StarRocksCommittable> {

private static final long serialVersionUID = 1L;

private static final Logger LOG = LoggerFactory.getLogger(StarRocksSink.class);

private final StarRocksSinkOptions sinkOptions;
private final StarRocksISerializer serializer;
private final StarRocksIRowTransformer<InputT> rowTransformer;
private final RecordSerializer<InputT> recordSerializer;
private final StreamLoadProperties streamLoadProperties;


public StarRocksSink(StarRocksSinkOptions sinkOptions, TableSchema schema, StarRocksIRowTransformer<InputT> rowTransformer) {
this.sinkOptions = sinkOptions;
this.rowTransformer = rowTransformer;
StarRocksSinkTable sinkTable = StarRocksSinkTable.builder()
.sinkOptions(sinkOptions)
.build();
sinkTable.validateTableStructure(sinkOptions, schema);
// StarRocksJsonSerializer depends on SinkOptions#supportUpsertDelete which is decided in
// StarRocksSinkTable#validateTableStructure, so create serializer after validating table structure
this.serializer = StarRocksSerializerFactory.createSerializer(sinkOptions, schema.getFieldNames());
rowTransformer.setStarRocksColumns(sinkTable.getFieldMapping());
rowTransformer.setTableSchema(schema);
this.streamLoadProperties = sinkOptions.getProperties(sinkTable);
}

public StarRocksSink(StarRocksSinkOptions sinkOptions) {
public StarRocksSink(
StarRocksSinkOptions sinkOptions,
RecordSerializer<InputT> recordSerializer,
StreamLoadProperties streamLoadProperties) {
this.sinkOptions = sinkOptions;
this.serializer = null;
this.rowTransformer = null;
this.streamLoadProperties = sinkOptions.getProperties(null);
this.recordSerializer = recordSerializer;
this.streamLoadProperties = streamLoadProperties;
}

@Override
Expand All @@ -82,8 +65,7 @@ public StarRocksWriter<InputT> restoreWriter(InitContext context, Collection<Sta
try {
return new StarRocksWriter<>(
sinkOptions,
serializer,
rowTransformer,
recordSerializer,
streamLoadProperties,
context,
Collections.emptyList());
Expand Down
Loading

0 comments on commit 70f6685

Please sign in to comment.