[Feature] Support csv lz4 compression (#408)
Signed-off-by: PengFei Li <[email protected]>
banmoy authored Jan 2, 2025
1 parent 18f7f1e commit c79c53a
Showing 8 changed files with 69 additions and 10 deletions.
2 changes: 1 addition & 1 deletion docs/content/connector-sink.md
@@ -113,7 +113,7 @@ In your Maven project's `pom.xml` file, add the Flink connector as a dependency
| sink.properties.row_delimiter | No | \n | The row delimiter for CSV-formatted data. |
| sink.properties.max_filter_ratio | No | 0 | The maximum error tolerance of the Stream Load. It's the maximum percentage of data records that can be filtered out due to inadequate data quality. Valid values: `0` to `1`. Default value: `0`. See [Stream Load](https://docs.starrocks.io/en-us/latest/sql-reference/sql-statements/data-manipulation/STREAM%20LOAD) for details. |
| sink.properties.strict_mode | No | false | Specifies whether to enable the strict mode for Stream Load. It affects the loading behavior when there are unqualified rows, such as inconsistent column values. Valid values: `true` and `false`. Default value: `false`. See [Stream Load](https://docs.starrocks.io/en-us/latest/sql-reference/sql-statements/data-manipulation/STREAM%20LOAD) for details. |
| sink.properties.compression | No | NONE | Supported since 1.2.10. The compression algorithm used for Stream Load. Currently, compression is only supported for the JSON format. Valid values: `lz4_frame`. Compression for json format is supported only in StarRocks v3.2.7 and later. |
| sink.properties.compression | No | NONE | The compression algorithm used for Stream Load. Valid values: `lz4_frame`. Compression for the JSON format requires connector 1.2.10 or later and StarRocks v3.2.7 or later. Compression for the CSV format requires connector 1.2.11 or later and has no StarRocks version requirement. |

## Data type mapping between Flink and StarRocks

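Before the code changes, here is a minimal usage sketch of the `sink.properties.compression` option documented above for the CSV case. It only mirrors the option keys from the documentation row and from the integration test added in this commit; in a real job these keys would be supplied as Flink `WITH` options or through the sink builder, and the surrounding sink setup (URLs, credentials, table name) is omitted.

import java.util.HashMap;
import java.util.Map;

public class CsvLz4SinkOptionsExample {
    public static void main(String[] args) {
        Map<String, String> sinkOptions = new HashMap<>();
        // Send the Stream Load payload as CSV and compress it with the LZ4 frame format.
        sinkOptions.put("sink.properties.format", "csv");
        sinkOptions.put("sink.properties.compression", "lz4_frame");
        // Requires connector 1.2.11 or later; unlike JSON compression, no minimum
        // StarRocks version is needed.
        sinkOptions.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}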
@@ -741,6 +741,20 @@ public void testJsonLz4Compression() throws Exception {
testConfigurationBase(map, env -> null);
}

@Test
public void testCsvLz4Compression() throws Exception {
assumeTrue(isSinkV2);
Map<String, String> map = new HashMap<>();
map.put("sink.properties.compression", "lz4_frame");
testConfigurationBase(map, env -> null);

map.put("sink.properties.format", "csv");
testConfigurationBase(map, env -> null);

map.put("sink.at-least-once.use-transaction-stream-load", "false");
testConfigurationBase(map, env -> null);
}

@Test
public void testTimestampType() throws Exception {
String tableName = createDatetimeTable("testTimestampType");
@@ -25,7 +25,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.starrocks.data.load.stream.exception.StreamLoadFailException;
import com.starrocks.data.load.stream.properties.StreamLoadProperties;
import com.starrocks.data.load.stream.properties.StreamLoadTableProperties;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
@@ -297,8 +296,7 @@ protected StreamLoadResponse sendToSR(TableRegion region) {
httpPut.setEntity(region.getHttpEntity());

httpPut.setHeaders(defaultHeaders);
StreamLoadTableProperties tableProperties = region.getProperties();
for (Map.Entry<String, String> entry : tableProperties.getProperties().entrySet()) {
for (Map.Entry<String, String> entry : region.getHeaders().entrySet()) {
httpPut.removeHeaders(entry.getKey());
httpPut.addHeader(entry.getKey(), entry.getValue());
}
@@ -25,11 +25,15 @@
import com.starrocks.data.load.stream.properties.StreamLoadTableProperties;
import org.apache.http.HttpEntity;

import java.util.Map;
import java.util.concurrent.Future;

public interface TableRegion {

StreamLoadTableProperties getProperties();
default Map<String, String> getHeaders() {
throw new UnsupportedOperationException();
}
String getUniqueKey();
String getDatabase();
String getTable();
@@ -47,9 +47,7 @@ static Optional<CompressionCodec> createCompressionCodec(StreamLoadDataFormat da
}

if (LZ4FrameCompressionCodec.NAME.equalsIgnoreCase(compressionType.get())) {
if (dataFormat instanceof StreamLoadDataFormat.JSONFormat) {
return Optional.of(LZ4FrameCompressionCodec.create(properties));
}
return Optional.of(LZ4FrameCompressionCodec.create(properties));
}

throw new UnsupportedOperationException(
@@ -20,20 +20,24 @@

import com.starrocks.data.load.stream.Chunk;
import com.starrocks.data.load.stream.LabelGenerator;
import com.starrocks.data.load.stream.StreamLoadDataFormat;
import com.starrocks.data.load.stream.StreamLoadManager;
import com.starrocks.data.load.stream.StreamLoadResponse;
import com.starrocks.data.load.stream.StreamLoadSnapshot;
import com.starrocks.data.load.stream.StreamLoader;
import com.starrocks.data.load.stream.TableRegion;
import com.starrocks.data.load.stream.compress.CompressionCodec;
import com.starrocks.data.load.stream.compress.CompressionHttpEntity;
import com.starrocks.data.load.stream.compress.LZ4FrameCompressionCodec;
import com.starrocks.data.load.stream.exception.StreamLoadFailException;
import com.starrocks.data.load.stream.http.StreamLoadEntityMeta;
import com.starrocks.data.load.stream.properties.StreamLoadTableProperties;
import org.apache.http.HttpEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
@@ -60,6 +64,7 @@ enum State {
private final String database;
private final String table;
private final StreamLoadTableProperties properties;
private final Map<String, String> headers = new HashMap<>();
private final Optional<CompressionCodec> compressionCodec;
private final AtomicLong age = new AtomicLong(0L);
private final AtomicLong cacheBytes = new AtomicLong();
@@ -95,6 +100,7 @@ public TransactionTableRegion(String uniqueKey,
this.properties = properties;
this.streamLoader = streamLoader;
this.labelGenerator = labelGenerator;
initHeaders(properties);
this.compressionCodec = CompressionCodec.createCompressionCodec(
properties.getDataFormat(),
properties.getProperty("compression"),
@@ -104,13 +110,40 @@
this.activeChunk = new Chunk(properties.getDataFormat());
this.maxRetries = maxRetries;
this.retryIntervalInMs = retryIntervalInMs;
initHeaders(properties);
}

private void initHeaders(StreamLoadTableProperties properties) {
headers.putAll(properties.getProperties());
Optional<String> compressionType = properties.getProperty("compression");
// To enable CSV compression, the user needs to set two properties on the connector side:
// "format = csv" and "compression = <compression type>". They are converted into a single
// header "format = <compression type>" that matches the server-side usage. In the future, the
// server will be refactored to configure the compression type in the same way as the connector,
// and this conversion will be removed.
if (properties.getDataFormat() instanceof StreamLoadDataFormat.CSVFormat && compressionType.isPresent()) {
// The format names for the different compression types are listed at
// https://github.com/StarRocks/starrocks/blob/main/be/src/http/action/stream_load.cpp#L96
if (LZ4FrameCompressionCodec.NAME.equalsIgnoreCase(compressionType.get())) {
headers.put("format", "lz4");
} else {
throw new UnsupportedOperationException(
"CSV format does not support compression type: " + compressionType.get());
}
}
}

@Override
public StreamLoadTableProperties getProperties() {
return properties;
}

@Override
public Map<String, String> getHeaders() {
return headers;
}

@Override
public String getUniqueKey() {
return uniqueKey;
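As a standalone illustration of what `initHeaders` above produces, the sketch below collapses the connector-side pair `format = csv` / `compression = lz4_frame` into the single `format = lz4` header that the Stream Load endpoint expects. The helper name and the plain-map input are hypothetical; the actual implementation reads a `StreamLoadTableProperties` instance and copies all of its properties first.

import java.util.HashMap;
import java.util.Map;

public class CsvCompressionHeaderExample {

    // Hypothetical helper mirroring TransactionTableRegion#initHeaders for the CSV + lz4_frame case.
    static Map<String, String> toStreamLoadHeaders(Map<String, String> connectorProps) {
        Map<String, String> headers = new HashMap<>(connectorProps);
        if ("csv".equalsIgnoreCase(connectorProps.get("format"))
                && "lz4_frame".equalsIgnoreCase(connectorProps.get("compression"))) {
            // The server currently encodes the compression type in the "format" header for CSV.
            headers.put("format", "lz4");
        }
        return headers;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("format", "csv");
        props.put("compression", "lz4_frame");
        // Prints the converted headers, with "format" rewritten to "lz4".
        System.out.println(toStreamLoadHeaders(props));
    }
}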
@@ -24,6 +24,7 @@
import com.starrocks.data.load.stream.v2.ChunkHttpEntity;

import java.io.ByteArrayOutputStream;
import java.util.Arrays;

import static com.starrocks.data.load.stream.ChunkInputStreamTest.genChunk;
import static org.junit.Assert.assertArrayEquals;
@@ -37,10 +38,16 @@ public void testCompressBase(CompressionCodec compressionCodec) throws Exception {
ChunkInputStreamTest.ChunkMeta chunkMeta = genChunk();
ChunkHttpEntity entity = new ChunkHttpEntity("test", chunkMeta.chunk);
CompressionHttpEntity compressionEntity = new CompressionHttpEntity(entity, compressionCodec);
ChunkInputStreamTest.ChunkMeta chunkMeta1 = genChunk();
ChunkHttpEntity entity1 = new ChunkHttpEntity("test", chunkMeta1.chunk);
CompressionHttpEntity compressionEntity1 = new CompressionHttpEntity(entity1, compressionCodec);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
compressionEntity.writeTo(outputStream);
compressionEntity1.writeTo(outputStream);
byte[] result = outputStream.toByteArray();
byte[] descompressData = decompress(result, (int) entity.getContentLength());
assertArrayEquals(chunkMeta.expectedData, descompressData);
byte[] descompressData = decompress(result, (int) (entity.getContentLength() + entity1.getContentLength()));
assertArrayEquals(chunkMeta.expectedData, Arrays.copyOfRange(descompressData, 0, (int) entity.getContentLength()));
assertArrayEquals(chunkMeta1.expectedData,
Arrays.copyOfRange(descompressData, (int) entity.getContentLength(), descompressData.length));
}
}
@@ -36,7 +36,12 @@ public class LZ4FrameCompressionCodecTest extends CompressionCodecTestBase {
protected byte[] decompress(byte[] compressedData, int rawSize) throws Exception {
byte[] result = new byte[rawSize];
LZ4FrameInputStream inputStream = new LZ4FrameInputStream(new ByteArrayInputStream(compressedData));
inputStream.read(result);
int totalRead = 0;
int n = inputStream.read(result, totalRead, rawSize - totalRead);
while (n > 0) {
totalRead += n;
n = inputStream.read(result, totalRead, rawSize - totalRead);
}
inputStream.close();
return result;
}
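The read loop above is needed because `InputStream.read(byte[], int, int)` may return fewer bytes than requested, which the updated test exercises by concatenating two compressed entities into one stream. A minimal alternative sketch, assuming Java 11 or later (the commit keeps the explicit loop, presumably to remain compatible with older JDKs), since `readNBytes` performs the same looping internally:

import java.io.ByteArrayInputStream;

import net.jpountz.lz4.LZ4FrameInputStream;

public class Lz4DecompressSketch {

    // Reads until rawSize bytes are available or the stream ends; equivalent to the loop above.
    static byte[] decompress(byte[] compressedData, int rawSize) throws Exception {
        try (LZ4FrameInputStream inputStream =
                new LZ4FrameInputStream(new ByteArrayInputStream(compressedData))) {
            return inputStream.readNBytes(rawSize);
        }
    }
}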
