[Feature] Support csv lz4 compression #408

Merged 2 commits on Jan 2, 2025

docs/content/connector-sink.md (2 changes: 1 addition & 1 deletion)
@@ -113,7 +113,7 @@ In your Maven project's `pom.xml` file, add the Flink connector as a dependency
| sink.properties.row_delimiter | No | \n | The row delimiter for CSV-formatted data. |
| sink.properties.max_filter_ratio | No | 0 | The maximum error tolerance of the Stream Load. It's the maximum percentage of data records that can be filtered out due to inadequate data quality. Valid values: `0` to `1`. Default value: `0`. See [Stream Load](https://docs.starrocks.io/en-us/latest/sql-reference/sql-statements/data-manipulation/STREAM%20LOAD) for details. |
| sink.properties.strict_mode | No | false | Specifies whether to enable the strict mode for Stream Load. It affects the loading behavior when there are unqualified rows, such as inconsistent column values. Valid values: `true` and `false`. Default value: `false`. See [Stream Load](https://docs.starrocks.io/en-us/latest/sql-reference/sql-statements/data-manipulation/STREAM%20LOAD) for details. |
| sink.properties.compression | No | NONE | Supported since 1.2.10. The compression algorithm used for Stream Load. Currently, compression is only supported for the JSON format. Valid values: `lz4_frame`. Compression for json format is supported only in StarRocks v3.2.7 and later. |
| sink.properties.compression | No | NONE | The compression algorithm used for Stream Load. Valid values: `lz4_frame`. Compression for the JSON format requires connector 1.2.10 or later and StarRocks v3.2.7 or later. Compression for the CSV format requires connector 1.2.11 or later and has no StarRocks version requirement. An example configuration is shown below this table. |

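For example, enabling LZ4 compression for CSV-formatted loads via the DataStream API could look like the following sketch. The connection settings (`jdbc-url`, `load-url`, database, table, credentials) are placeholders, and the `StarRocksSinkOptions` builder usage is assumed from the connector's public API:

import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;

// Placeholder connection settings; replace with your own FE addresses, database, table, and credentials.
StarRocksSinkOptions options = StarRocksSinkOptions.builder()
        .withProperty("jdbc-url", "jdbc:mysql://127.0.0.1:9030")
        .withProperty("load-url", "127.0.0.1:8030")
        .withProperty("database-name", "example_db")
        .withProperty("table-name", "example_table")
        .withProperty("username", "root")
        .withProperty("password", "")
        .withProperty("sink.properties.format", "csv")
        // lz4_frame compression for CSV needs connector 1.2.11 or later.
        .withProperty("sink.properties.compression", "lz4_frame")
        .build();

With these options the connector sends Stream Load requests whose body is LZ4-frame-compressed CSV; the `format` header is rewritten to `lz4` by `TransactionTableRegion`, as shown in the diff below.
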
## Data type mapping between Flink and StarRocks

@@ -741,6 +741,20 @@ public void testJsonLz4Compression() throws Exception {
testConfigurationBase(map, env -> null);
}

@Test
public void testCsvLz4Compression() throws Exception {
assumeTrue(isSinkV2);
Map<String, String> map = new HashMap<>();
map.put("sink.properties.compression", "lz4_frame");
testConfigurationBase(map, env -> null);

map.put("sink.properties.format", "csv");
testConfigurationBase(map, env -> null);

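// Also exercise the non-transactional Stream Load path with compression enabled.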
map.put("sink.at-least-once.use-transaction-stream-load", "false");
testConfigurationBase(map, env -> null);
}

@Test
public void testTimestampType() throws Exception {
String tableName = createDatetimeTable("testTimestampType");
@@ -25,7 +25,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.starrocks.data.load.stream.exception.StreamLoadFailException;
import com.starrocks.data.load.stream.properties.StreamLoadProperties;
import com.starrocks.data.load.stream.properties.StreamLoadTableProperties;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
@@ -297,8 +296,7 @@ protected StreamLoadResponse sendToSR(TableRegion region) {
httpPut.setEntity(region.getHttpEntity());

httpPut.setHeaders(defaultHeaders);
StreamLoadTableProperties tableProperties = region.getProperties();
for (Map.Entry<String, String> entry : tableProperties.getProperties().entrySet()) {
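// Apply the headers prepared by the region (which may rewrite the format header for compressed CSV) instead of the raw table properties.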
for (Map.Entry<String, String> entry : region.getHeaders().entrySet()) {
httpPut.removeHeaders(entry.getKey());
httpPut.addHeader(entry.getKey(), entry.getValue());
}
@@ -25,11 +25,15 @@
import com.starrocks.data.load.stream.properties.StreamLoadTableProperties;
import org.apache.http.HttpEntity;

import java.util.Map;
import java.util.concurrent.Future;

public interface TableRegion {

StreamLoadTableProperties getProperties();
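// HTTP headers to apply to the Stream Load request; implementations that customize headers (such as TransactionTableRegion) override this.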
default Map<String, String> getHeaders() {
throw new UnsupportedOperationException();
}
String getUniqueKey();
String getDatabase();
String getTable();
@@ -47,9 +47,7 @@ static Optional<CompressionCodec> createCompressionCodec(StreamLoadDataFormat da
}

if (LZ4FrameCompressionCodec.NAME.equalsIgnoreCase(compressionType.get())) {
if (dataFormat instanceof StreamLoadDataFormat.JSONFormat) {
return Optional.of(LZ4FrameCompressionCodec.create(properties));
}
return Optional.of(LZ4FrameCompressionCodec.create(properties));
}

throw new UnsupportedOperationException(
@@ -20,20 +20,24 @@

import com.starrocks.data.load.stream.Chunk;
import com.starrocks.data.load.stream.LabelGenerator;
import com.starrocks.data.load.stream.StreamLoadDataFormat;
import com.starrocks.data.load.stream.StreamLoadManager;
import com.starrocks.data.load.stream.StreamLoadResponse;
import com.starrocks.data.load.stream.StreamLoadSnapshot;
import com.starrocks.data.load.stream.StreamLoader;
import com.starrocks.data.load.stream.TableRegion;
import com.starrocks.data.load.stream.compress.CompressionCodec;
import com.starrocks.data.load.stream.compress.CompressionHttpEntity;
import com.starrocks.data.load.stream.compress.LZ4FrameCompressionCodec;
import com.starrocks.data.load.stream.exception.StreamLoadFailException;
import com.starrocks.data.load.stream.http.StreamLoadEntityMeta;
import com.starrocks.data.load.stream.properties.StreamLoadTableProperties;
import org.apache.http.HttpEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
@@ -60,6 +64,7 @@ enum State {
private final String database;
private final String table;
private final StreamLoadTableProperties properties;
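// Stream Load HTTP headers derived from the table properties; populated by initHeaders().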
private final Map<String, String> headers = new HashMap<>();
private final Optional<CompressionCodec> compressionCodec;
private final AtomicLong age = new AtomicLong(0L);
private final AtomicLong cacheBytes = new AtomicLong();
@@ -95,6 +100,7 @@ public TransactionTableRegion(String uniqueKey,
this.properties = properties;
this.streamLoader = streamLoader;
this.labelGenerator = labelGenerator;
initHeaders(properties);
this.compressionCodec = CompressionCodec.createCompressionCodec(
properties.getDataFormat(),
properties.getProperty("compression"),
@@ -104,13 +110,40 @@ public TransactionTableRegion(String uniqueKey,
this.activeChunk = new Chunk(properties.getDataFormat());
this.maxRetries = maxRetries;
this.retryIntervalInMs = retryIntervalInMs;
initHeaders(properties);
}

private void initHeaders(StreamLoadTableProperties properties) {
headers.putAll(properties.getProperties());
Optional<String> compressionType = properties.getProperty("compression");
// To enable CSV compression, on the connector side the user needs to set two properties:
// "format = csv" and "compression = <compression type>". These are converted into a single
// header "format = <compressed format name>" which matches the current server usage. In the
// future, the server will be refactored to configure the compression type in the same way as
// the connector, and this conversion will be removed.
if (properties.getDataFormat() instanceof StreamLoadDataFormat.CSVFormat && compressionType.isPresent()) {
// The format names for the different compression types are listed here:
// https://github.com/StarRocks/starrocks/blob/main/be/src/http/action/stream_load.cpp#L96
if (LZ4FrameCompressionCodec.NAME.equalsIgnoreCase(compressionType.get())) {
headers.put("format", "lz4");
} else {
throw new UnsupportedOperationException(
"CSV format does not support compression type: " + compressionType.get());
}

}
}

@Override
public StreamLoadTableProperties getProperties() {
return properties;
}

@Override
public Map<String, String> getHeaders() {
return headers;
}

@Override
public String getUniqueKey() {
return uniqueKey;
@@ -24,6 +24,7 @@
import com.starrocks.data.load.stream.v2.ChunkHttpEntity;

import java.io.ByteArrayOutputStream;
import java.util.Arrays;

import static com.starrocks.data.load.stream.ChunkInputStreamTest.genChunk;
import static org.junit.Assert.assertArrayEquals;
@@ -37,10 +38,16 @@ public void testCompressBase(CompressionCodec compressionCodec) throws Exception
ChunkInputStreamTest.ChunkMeta chunkMeta = genChunk();
ChunkHttpEntity entity = new ChunkHttpEntity("test", chunkMeta.chunk);
CompressionHttpEntity compressionEntity = new CompressionHttpEntity(entity, compressionCodec);
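// Write a second compressed entity into the same stream to verify that concatenated LZ4 frames decompress correctly.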
ChunkInputStreamTest.ChunkMeta chunkMeta1 = genChunk();
ChunkHttpEntity entity1 = new ChunkHttpEntity("test", chunkMeta1.chunk);
CompressionHttpEntity compressionEntity1 = new CompressionHttpEntity(entity1, compressionCodec);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
compressionEntity.writeTo(outputStream);
compressionEntity1.writeTo(outputStream);
byte[] result = outputStream.toByteArray();
byte[] descompressData = decompress(result, (int) entity.getContentLength());
assertArrayEquals(chunkMeta.expectedData, descompressData);
byte[] decompressedData = decompress(result, (int) (entity.getContentLength() + entity1.getContentLength()));
assertArrayEquals(chunkMeta.expectedData, Arrays.copyOfRange(decompressedData, 0, (int) entity.getContentLength()));
assertArrayEquals(chunkMeta1.expectedData,
Arrays.copyOfRange(decompressedData, (int) entity.getContentLength(), decompressedData.length));
}
}
@@ -36,7 +36,12 @@ public class LZ4FrameCompressionCodecTest extends CompressionCodecTestBase {
protected byte[] decompress(byte[] compressedData, int rawSize) throws Exception {
byte[] result = new byte[rawSize];
LZ4FrameInputStream inputStream = new LZ4FrameInputStream(new ByteArrayInputStream(compressedData));
inputStream.read(result);
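// A single read() call may return fewer than rawSize bytes, so keep reading until the buffer is full or the stream ends.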
int totalRead = 0;
int n = inputStream.read(result, totalRead, rawSize - totalRead);
while (n > 0) {
totalRead += n;
n = inputStream.read(result, totalRead, rawSize - totalRead);
}
inputStream.close();
return result;
}