diff --git a/docs/content/connector-sink.md b/docs/content/connector-sink.md index 6ac099d3..ad78b168 100644 --- a/docs/content/connector-sink.md +++ b/docs/content/connector-sink.md @@ -113,7 +113,7 @@ In your Maven project's `pom.xml` file, add the Flink connector as a dependency | sink.properties.row_delimiter | No | \n | The row delimiter for CSV-formatted data. | | sink.properties.max_filter_ratio | No | 0 | The maximum error tolerance of the Stream Load. It's the maximum percentage of data records that can be filtered out due to inadequate data quality. Valid values: `0` to `1`. Default value: `0`. See [Stream Load](https://docs.starrocks.io/en-us/latest/sql-reference/sql-statements/data-manipulation/STREAM%20LOAD) for details. | | sink.properties.strict_mode | No | false | Specifies whether to enable the strict mode for Stream Load. It affects the loading behavior when there are unqualified rows, such as inconsistent column values. Valid values: `true` and `false`. Default value: `false`. See [Stream Load](https://docs.starrocks.io/en-us/latest/sql-reference/sql-statements/data-manipulation/STREAM%20LOAD) for details. | -| sink.properties.compression | No | NONE | Supported since 1.2.10. The compression algorithm used for Stream Load. Currently, compression is only supported for the JSON format. Valid values: `lz4_frame`. Compression for json format is supported only in StarRocks v3.2.7 and later. | +| sink.properties.compression | No | NONE | The compression algorithm used for Stream Load. Valid values: `lz4_frame`. Compression for json format needs connector 1.2.10 and StarRocks v3.2.7 or later. Compression for csv format needs connector 1.2.11 and there is no requirements for StarRocks version. | ## Data type mapping between Flink and StarRocks diff --git a/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java b/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java index 2db72436..fe1ee891 100644 --- a/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java +++ b/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java @@ -741,6 +741,20 @@ public void testJsonLz4Compression() throws Exception { testConfigurationBase(map, env -> null); } + @Test + public void testCsvLz4Compression() throws Exception { + assumeTrue(isSinkV2); + Map map = new HashMap<>(); + map.put("sink.properties.compression", "lz4_frame"); + testConfigurationBase(map, env -> null); + + map.put("sink.properties.format", "csv"); + testConfigurationBase(map, env -> null); + + map.put("sink.at-least-once.use-transaction-stream-load", "false"); + testConfigurationBase(map, env -> null); + } + @Test public void testTimestampType() throws Exception { String tableName = createDatetimeTable("testTimestampType"); diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/DefaultStreamLoader.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/DefaultStreamLoader.java index 151d98b5..7f83bbe8 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/DefaultStreamLoader.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/DefaultStreamLoader.java @@ -25,7 +25,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.starrocks.data.load.stream.exception.StreamLoadFailException; import com.starrocks.data.load.stream.properties.StreamLoadProperties; -import com.starrocks.data.load.stream.properties.StreamLoadTableProperties; import org.apache.http.Header; import org.apache.http.HttpEntity; import org.apache.http.HttpHeaders; @@ -297,8 +296,7 @@ protected StreamLoadResponse sendToSR(TableRegion region) { httpPut.setEntity(region.getHttpEntity()); httpPut.setHeaders(defaultHeaders); - StreamLoadTableProperties tableProperties = region.getProperties(); - for (Map.Entry entry : tableProperties.getProperties().entrySet()) { + for (Map.Entry entry : region.getHeaders().entrySet()) { httpPut.removeHeaders(entry.getKey()); httpPut.addHeader(entry.getKey(), entry.getValue()); } diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/TableRegion.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/TableRegion.java index a9912ae8..07e0c6de 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/TableRegion.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/TableRegion.java @@ -25,11 +25,15 @@ import com.starrocks.data.load.stream.properties.StreamLoadTableProperties; import org.apache.http.HttpEntity; +import java.util.Map; import java.util.concurrent.Future; public interface TableRegion { StreamLoadTableProperties getProperties(); + default Map getHeaders() { + throw new UnsupportedOperationException(); + } String getUniqueKey(); String getDatabase(); String getTable(); diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CompressionCodec.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CompressionCodec.java index a6050bd1..848907aa 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CompressionCodec.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CompressionCodec.java @@ -47,9 +47,7 @@ static Optional createCompressionCodec(StreamLoadDataFormat da } if (LZ4FrameCompressionCodec.NAME.equalsIgnoreCase(compressionType.get())) { - if (dataFormat instanceof StreamLoadDataFormat.JSONFormat) { - return Optional.of(LZ4FrameCompressionCodec.create(properties)); - } + return Optional.of(LZ4FrameCompressionCodec.create(properties)); } throw new UnsupportedOperationException( diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java index c1f410c4..1bedba9e 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java @@ -20,6 +20,7 @@ import com.starrocks.data.load.stream.Chunk; import com.starrocks.data.load.stream.LabelGenerator; +import com.starrocks.data.load.stream.StreamLoadDataFormat; import com.starrocks.data.load.stream.StreamLoadManager; import com.starrocks.data.load.stream.StreamLoadResponse; import com.starrocks.data.load.stream.StreamLoadSnapshot; @@ -27,6 +28,7 @@ import com.starrocks.data.load.stream.TableRegion; import com.starrocks.data.load.stream.compress.CompressionCodec; import com.starrocks.data.load.stream.compress.CompressionHttpEntity; +import com.starrocks.data.load.stream.compress.LZ4FrameCompressionCodec; import com.starrocks.data.load.stream.exception.StreamLoadFailException; import com.starrocks.data.load.stream.http.StreamLoadEntityMeta; import com.starrocks.data.load.stream.properties.StreamLoadTableProperties; @@ -34,6 +36,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Future; @@ -60,6 +64,7 @@ enum State { private final String database; private final String table; private final StreamLoadTableProperties properties; + private final Map headers = new HashMap<>(); private final Optional compressionCodec; private final AtomicLong age = new AtomicLong(0L); private final AtomicLong cacheBytes = new AtomicLong(); @@ -95,6 +100,7 @@ public TransactionTableRegion(String uniqueKey, this.properties = properties; this.streamLoader = streamLoader; this.labelGenerator = labelGenerator; + initHeaders(properties); this.compressionCodec = CompressionCodec.createCompressionCodec( properties.getDataFormat(), properties.getProperty("compression"), @@ -104,6 +110,28 @@ public TransactionTableRegion(String uniqueKey, this.activeChunk = new Chunk(properties.getDataFormat()); this.maxRetries = maxRetries; this.retryIntervalInMs = retryIntervalInMs; + initHeaders(properties); + } + + private void initHeaders(StreamLoadTableProperties properties) { + headers.putAll(properties.getProperties()); + Optional compressionType = properties.getProperty("compression"); + // To enable csv compression, at the connector side, the user need to set two properties: + // "format = csv" and "compression = ". It needs to be converted to one + // header "format = " which matches the server usage. In the future, the + // server will be refactored to configure the compression type in the same way as the connector, + // and this conversion will be removed. + if (properties.getDataFormat() instanceof StreamLoadDataFormat.CSVFormat && compressionType.isPresent()) { + // You can see the format name for different compression types here + // https://github.com/StarRocks/starrocks/blob/main/be/src/http/action/stream_load.cpp#L96 + if (LZ4FrameCompressionCodec.NAME.equalsIgnoreCase(compressionType.get())) { + headers.put("format", "lz4"); + } else { + throw new UnsupportedOperationException( + "CSV format does not support compression type: " + compressionType.get()); + } + + } } @Override @@ -111,6 +139,11 @@ public StreamLoadTableProperties getProperties() { return properties; } + @Override + public Map getHeaders() { + return headers; + } + @Override public String getUniqueKey() { return uniqueKey; diff --git a/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/compress/CompressionCodecTestBase.java b/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/compress/CompressionCodecTestBase.java index 4f63c806..1b7db9e2 100644 --- a/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/compress/CompressionCodecTestBase.java +++ b/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/compress/CompressionCodecTestBase.java @@ -24,6 +24,7 @@ import com.starrocks.data.load.stream.v2.ChunkHttpEntity; import java.io.ByteArrayOutputStream; +import java.util.Arrays; import static com.starrocks.data.load.stream.ChunkInputStreamTest.genChunk; import static org.junit.Assert.assertArrayEquals; @@ -37,10 +38,16 @@ public void testCompressBase(CompressionCodec compressionCodec) throws Exception ChunkInputStreamTest.ChunkMeta chunkMeta = genChunk(); ChunkHttpEntity entity = new ChunkHttpEntity("test", chunkMeta.chunk); CompressionHttpEntity compressionEntity = new CompressionHttpEntity(entity, compressionCodec); + ChunkInputStreamTest.ChunkMeta chunkMeta1 = genChunk(); + ChunkHttpEntity entity1 = new ChunkHttpEntity("test", chunkMeta1.chunk); + CompressionHttpEntity compressionEntity1 = new CompressionHttpEntity(entity1, compressionCodec); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); compressionEntity.writeTo(outputStream); + compressionEntity1.writeTo(outputStream); byte[] result = outputStream.toByteArray(); - byte[] descompressData = decompress(result, (int) entity.getContentLength()); - assertArrayEquals(chunkMeta.expectedData, descompressData); + byte[] descompressData = decompress(result, (int) (entity.getContentLength() + entity1.getContentLength())); + assertArrayEquals(chunkMeta.expectedData, Arrays.copyOfRange(descompressData, 0, (int) entity.getContentLength())); + assertArrayEquals(chunkMeta1.expectedData, + Arrays.copyOfRange(descompressData, (int) entity.getContentLength(), descompressData.length)); } } diff --git a/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/compress/LZ4FrameCompressionCodecTest.java b/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/compress/LZ4FrameCompressionCodecTest.java index ac930224..9a8f784e 100644 --- a/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/compress/LZ4FrameCompressionCodecTest.java +++ b/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/compress/LZ4FrameCompressionCodecTest.java @@ -36,7 +36,12 @@ public class LZ4FrameCompressionCodecTest extends CompressionCodecTestBase { protected byte[] decompress(byte[] compressedData, int rawSize) throws Exception { byte[] result = new byte[rawSize]; LZ4FrameInputStream inputStream = new LZ4FrameInputStream(new ByteArrayInputStream(compressedData)); - inputStream.read(result); + int totalRead = 0; + int n = inputStream.read(result, totalRead, rawSize - totalRead); + while (n > 0) { + totalRead += n; + n = inputStream.read(result, totalRead, rawSize - totalRead); + } inputStream.close(); return result; }