From 89fd5664b942f0cec1c51a4a17610aac3015d080 Mon Sep 17 00:00:00 2001 From: Joel Lubinitsky <33523178+joellubi@users.noreply.github.com> Date: Tue, 9 Jul 2024 13:28:16 -0400 Subject: [PATCH] GH-41640: [Go] Implement BYTE_STREAM_SPLIT Parquet Encoding (#43066) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### Rationale for this change This encoding is defined by the [Parquet spec](https://github.com/apache/parquet-format/blob/master/Encodings.md#byte-stream-split-byte_stream_split--9) but does not currently have a Go implementation. ### What changes are included in this PR? Implement BYTE_STREAM_SPLIT encoder/decoder for: - FIXED_LEN_BYTE_ARRAY - FLOAT - DOUBLE - INT32 - INT64 ### Are these changes tested? Yes. See unit tests, file read conformance tests, and benchmarks. **Benchmark results on my machine** ``` ➜ go git:(impl-pq-bytestreamsplit) go test ./parquet/internal/encoding -run=^$ -bench=BenchmarkByteStreamSplit -benchmem goos: darwin goarch: arm64 pkg: github.com/apache/arrow/go/v17/parquet/internal/encoding BenchmarkByteStreamSplitEncodingInt32/len_1024-14 502117 2005 ns/op 2043.37 MB/s 5267 B/op 3 allocs/op BenchmarkByteStreamSplitEncodingInt32/len_2048-14 328921 3718 ns/op 2203.54 MB/s 9879 B/op 3 allocs/op BenchmarkByteStreamSplitEncodingInt32/len_4096-14 169642 7083 ns/op 2313.14 MB/s 18852 B/op 3 allocs/op BenchmarkByteStreamSplitEncodingInt32/len_8192-14 82503 14094 ns/op 2324.99 MB/s 41425 B/op 3 allocs/op BenchmarkByteStreamSplitEncodingInt32/len_16384-14 45006 26841 ns/op 2441.68 MB/s 74286 B/op 3 allocs/op BenchmarkByteStreamSplitEncodingInt32/len_32768-14 23433 51233 ns/op 2558.33 MB/s 140093 B/op 3 allocs/op BenchmarkByteStreamSplitEncodingInt32/len_65536-14 12019 99001 ns/op 2647.90 MB/s 271417 B/op 3 allocs/op BenchmarkByteStreamSplitDecodingInt32/len_1024-14 996573 1199 ns/op 3417.00 MB/s 0 B/op 0 allocs/op BenchmarkByteStreamSplitDecodingInt32/len_2048-14 503200 2380 ns/op 3442.18 MB/s 0 B/op 0 allocs/op BenchmarkByteStreamSplitDecodingInt32/len_4096-14 252038 4748 ns/op 3450.90 MB/s 0 B/op 0 allocs/op BenchmarkByteStreamSplitDecodingInt32/len_8192-14 122419 9793 ns/op 3346.08 MB/s 0 B/op 0 allocs/op BenchmarkByteStreamSplitDecodingInt32/len_16384-14 63321 19040 ns/op 3442.00 MB/s 0 B/op 0 allocs/op BenchmarkByteStreamSplitDecodingInt32/len_32768-14 31051 38677 ns/op 3388.89 MB/s 0 B/op 0 allocs/op BenchmarkByteStreamSplitDecodingInt32/len_65536-14 15792 77931 ns/op 3363.80 MB/s 0 B/op 0 allocs/op BenchmarkByteStreamSplitDecodingInt32Batched/len_1024-14 981043 1221 ns/op 3354.53 MB/s 0 B/op 0 allocs/op BenchmarkByteStreamSplitDecodingInt32Batched/len_2048-14 492319 2424 ns/op 3379.34 MB/s 0 B/op 0 allocs/op BenchmarkByteStreamSplitDecodingInt32Batched/len_4096-14 248062 4850 ns/op 3378.20 MB/s 0 B/op 0 allocs/op BenchmarkByteStreamSplitDecodingInt32Batched/len_8192-14 123064 9903 ns/op 3308.87 MB/s 0 B/op 0 allocs/op BenchmarkByteStreamSplitDecodingInt32Batched/len_16384-14 61845 19567 ns/op 3349.29 MB/s 0 B/op 0 allocs/op BenchmarkByteStreamSplitDecodingInt32Batched/len_32768-14 30568 39456 ns/op 3321.96 MB/s 0 B/op 0 allocs/op BenchmarkByteStreamSplitDecodingInt32Batched/len_65536-14 15172 78762 ns/op 3328.30 MB/s 0 B/op 0 allocs/op BenchmarkByteStreamSplitEncodingInt64/len_1024-14 319006 3690 ns/op 2220.13 MB/s 9880 B/op 3 allocs/op BenchmarkByteStreamSplitEncodingInt64/len_2048-14 161006 7132 ns/op 2297.30 MB/s 18853 B/op 3 allocs/op BenchmarkByteStreamSplitEncodingInt64/len_4096-14 85783 13925 ns/op 2353.12 MB/s 41421 B/op 3 allocs/op BenchmarkByteStreamSplitEncodingInt64/len_8192-14 45015 26943 ns/op 2432.43 MB/s 74312 B/op 3 allocs/op BenchmarkByteStreamSplitEncodingInt64/len_16384-14 20352 59259 ns/op 2211.84 MB/s 139940 B/op 3 allocs/op BenchmarkByteStreamSplitEncodingInt64/len_32768-14 10000 111143 ns/op 2358.61 MB/s 271642 B/op 3 allocs/op BenchmarkByteStreamSplitEncodingInt64/len_65536-14 5529 212652 ns/op 2465.47 MB/s 534805 B/op 3 allocs/op BenchmarkByteStreamSplitDecodingInt64/len_1024-14 528987 2355 ns/op 3478.32 MB/s 0 B/op 0 allocs/op BenchmarkByteStreamSplitDecodingInt64/len_2048-14 262707 4701 ns/op 3485.08 MB/s 0 B/op 0 allocs/op BenchmarkByteStreamSplitDecodingInt64/len_4096-14 129212 9313 ns/op 3518.63 MB/s 0 B/op 0 allocs/op BenchmarkByteStreamSplitDecodingInt64/len_8192-14 53746 23315 ns/op 2810.90 MB/s 0 B/op 0 allocs/op BenchmarkByteStreamSplitDecodingInt64/len_16384-14 28782 41054 ns/op 3192.65 MB/s 0 B/op 0 allocs/op BenchmarkByteStreamSplitDecodingInt64/len_32768-14 14803 80157 ns/op 3270.39 MB/s 0 B/op 0 allocs/op BenchmarkByteStreamSplitDecodingInt64/len_65536-14 7484 164111 ns/op 3194.72 MB/s 0 B/op 0 allocs/op BenchmarkByteStreamSplitEncodingFixedLenByteArray/len_1024-14 291716 4107 ns/op 997.43 MB/s 5276 B/op 3 allocs/op BenchmarkByteStreamSplitEncodingFixedLenByteArray/len_2048-14 148888 7975 ns/op 1027.18 MB/s 9914 B/op 3 allocs/op BenchmarkByteStreamSplitEncodingFixedLenByteArray/len_4096-14 76587 15677 ns/op 1045.11 MB/s 18955 B/op 3 allocs/op BenchmarkByteStreamSplitEncodingFixedLenByteArray/len_8192-14 39758 30277 ns/op 1082.26 MB/s 41752 B/op 3 allocs/op BenchmarkByteStreamSplitEncodingFixedLenByteArray/len_16384-14 20306 59506 ns/op 1101.33 MB/s 74937 B/op 3 allocs/op BenchmarkByteStreamSplitEncodingFixedLenByteArray/len_32768-14 10000 116043 ns/op 1129.52 MB/s 141290 B/op 3 allocs/op BenchmarkByteStreamSplitEncodingFixedLenByteArray/len_65536-14 4770 236887 ns/op 1106.62 MB/s 277583 B/op 3 allocs/op BenchmarkByteStreamSplitDecodingFixedLenByteArray/len_1024-14 601875 1723 ns/op 2376.70 MB/s 0 B/op 0 allocs/op BenchmarkByteStreamSplitDecodingFixedLenByteArray/len_2048-14 363206 3422 ns/op 2394.18 MB/s 0 B/op 0 allocs/op BenchmarkByteStreamSplitDecodingFixedLenByteArray/len_4096-14 173041 6906 ns/op 2372.45 MB/s 0 B/op 0 allocs/op BenchmarkByteStreamSplitDecodingFixedLenByteArray/len_8192-14 81810 14307 ns/op 2290.40 MB/s 0 B/op 0 allocs/op BenchmarkByteStreamSplitDecodingFixedLenByteArray/len_16384-14 40518 29101 ns/op 2252.04 MB/s 1 B/op 0 allocs/op BenchmarkByteStreamSplitDecodingFixedLenByteArray/len_32768-14 21338 56678 ns/op 2312.58 MB/s 6 B/op 1 allocs/op BenchmarkByteStreamSplitDecodingFixedLenByteArray/len_65536-14 10000 111433 ns/op 2352.49 MB/s 26 B/op 6 allocs/op PASS ok github.com/apache/arrow/go/v17/parquet/internal/encoding 69.109s ``` ### Are there any user-facing changes? New ByteStreamSplit encoding option available. Godoc updated to reflect this. * GitHub Issue: #41640 Authored-by: Joel Lubinitsky Signed-off-by: Matt Topol --- go/parquet/doc.go | 14 +- go/parquet/file/column_reader.go | 5 +- go/parquet/file/column_writer_test.go | 9 + go/parquet/file/file_reader_test.go | 136 ++++++ go/parquet/file/file_writer_test.go | 76 ++++ .../internal/encoding/byte_stream_split.go | 389 ++++++++++++++++++ .../encoding/encoding_benchmarks_test.go | 170 ++++++++ go/parquet/internal/encoding/encoding_test.go | 11 + .../encoding/fixed_len_byte_array_decoder.go | 71 ++++ .../encoding/fixed_len_byte_array_encoder.go | 39 ++ .../internal/encoding/typed_encoder.gen.go | 20 + .../encoding/typed_encoder.gen.go.tmpl | 8 + go/parquet/types.go | 2 + 13 files changed, 945 insertions(+), 5 deletions(-) create mode 100644 go/parquet/internal/encoding/byte_stream_split.go diff --git a/go/parquet/doc.go b/go/parquet/doc.go index 6ab08f83f063f..c580b8e317a67 100644 --- a/go/parquet/doc.go +++ b/go/parquet/doc.go @@ -60,8 +60,18 @@ // # Encodings // // The encoding types supported in this package are: -// Plain, Plain/RLE Dictionary, Delta Binary Packed (only integer types), Delta Byte Array -// (only ByteArray), Delta Length Byte Array (only ByteArray) +// +// - Plain +// +// - Plain/RLE Dictionary +// +// - Delta Binary Packed (only integer types) +// +// - Delta Byte Array (only ByteArray) +// +// - Delta Length Byte Array (only ByteArray) +// +// - Byte Stream Split (Float, Double, Int32, Int64, FixedLenByteArray) // // Tip: Some platforms don't necessarily support all kinds of encodings. If you're not // sure what to use, just use Plain and Dictionary encoding. diff --git a/go/parquet/file/column_reader.go b/go/parquet/file/column_reader.go index e441cd3e9c2d2..74a1b4486a703 100644 --- a/go/parquet/file/column_reader.go +++ b/go/parquet/file/column_reader.go @@ -354,13 +354,12 @@ func (c *columnChunkReader) initDataDecoder(page Page, lvlByteLen int64) error { case format.Encoding_PLAIN, format.Encoding_DELTA_BYTE_ARRAY, format.Encoding_DELTA_LENGTH_BYTE_ARRAY, - format.Encoding_DELTA_BINARY_PACKED: + format.Encoding_DELTA_BINARY_PACKED, + format.Encoding_BYTE_STREAM_SPLIT: c.curDecoder = c.decoderTraits.Decoder(parquet.Encoding(encoding), c.descr, false, c.mem) c.decoders[encoding] = c.curDecoder case format.Encoding_RLE_DICTIONARY: return errors.New("parquet: dictionary page must be before data page") - case format.Encoding_BYTE_STREAM_SPLIT: - return fmt.Errorf("parquet: unsupported data encoding %s", encoding) default: return fmt.Errorf("parquet: unknown encoding type %s", encoding) } diff --git a/go/parquet/file/column_writer_test.go b/go/parquet/file/column_writer_test.go index c8d61952064fe..cd2408f4fba5d 100755 --- a/go/parquet/file/column_writer_test.go +++ b/go/parquet/file/column_writer_test.go @@ -459,6 +459,15 @@ func (p *PrimitiveWriterTestSuite) TestRequiredPlain() { p.testRequiredWithEncoding(parquet.Encodings.Plain) } +func (p *PrimitiveWriterTestSuite) TestRequiredByteStreamSplit() { + switch p.Typ { + case reflect.TypeOf(float32(0)), reflect.TypeOf(float64(0)), reflect.TypeOf(int32(0)), reflect.TypeOf(int64(0)), reflect.TypeOf(parquet.FixedLenByteArray{}): + p.testRequiredWithEncoding(parquet.Encodings.ByteStreamSplit) + default: + p.Panics(func() { p.testRequiredWithEncoding(parquet.Encodings.ByteStreamSplit) }) + } +} + func (p *PrimitiveWriterTestSuite) TestRequiredDictionary() { p.testRequiredWithEncoding(parquet.Encodings.PlainDict) } diff --git a/go/parquet/file/file_reader_test.go b/go/parquet/file/file_reader_test.go index 8056a837ea19e..d4faf26086f93 100644 --- a/go/parquet/file/file_reader_test.go +++ b/go/parquet/file/file_reader_test.go @@ -20,6 +20,7 @@ import ( "bytes" "crypto/rand" "encoding/binary" + "fmt" "io" "os" "path" @@ -446,3 +447,138 @@ func TestRleBooleanEncodingFileRead(t *testing.T) { assert.Equal(t, expected, values[:len(expected)]) } + +func TestByteStreamSplitEncodingFileRead(t *testing.T) { + dir := os.Getenv("PARQUET_TEST_DATA") + if dir == "" { + t.Skip("no path supplied with PARQUET_TEST_DATA") + } + require.DirExists(t, dir) + + props := parquet.NewReaderProperties(memory.DefaultAllocator) + fileReader, err := file.OpenParquetFile(path.Join(dir, "byte_stream_split_extended.gzip.parquet"), + false, file.WithReadProps(props)) + require.NoError(t, err) + defer fileReader.Close() + + nRows := 200 + nCols := 14 + require.Equal(t, 1, fileReader.NumRowGroups()) + rgr := fileReader.RowGroup(0) + require.EqualValues(t, nRows, rgr.NumRows()) + require.EqualValues(t, nCols, rgr.NumColumns()) + + // Helper to unpack values from column of a specific type + getValues := func(rdr file.ColumnChunkReader, typ parquet.Type) any { + var ( + vals any + total int64 + read int + err error + ) + + switch typ { + case parquet.Types.FixedLenByteArray: + r, ok := rdr.(*file.FixedLenByteArrayColumnChunkReader) + require.True(t, ok) + + values := make([]parquet.FixedLenByteArray, nRows) + total, read, err = r.ReadBatch(int64(nRows), values, nil, nil) + vals = values + case parquet.Types.Float: + r, ok := rdr.(*file.Float32ColumnChunkReader) + require.True(t, ok) + + values := make([]float32, nRows) + total, read, err = r.ReadBatch(int64(nRows), values, nil, nil) + vals = values + case parquet.Types.Double: + r, ok := rdr.(*file.Float64ColumnChunkReader) + require.True(t, ok) + + values := make([]float64, nRows) + total, read, err = r.ReadBatch(int64(nRows), values, nil, nil) + vals = values + case parquet.Types.Int32: + r, ok := rdr.(*file.Int32ColumnChunkReader) + require.True(t, ok) + + values := make([]int32, nRows) + total, read, err = r.ReadBatch(int64(nRows), values, nil, nil) + vals = values + case parquet.Types.Int64: + r, ok := rdr.(*file.Int64ColumnChunkReader) + require.True(t, ok) + + values := make([]int64, nRows) + total, read, err = r.ReadBatch(int64(nRows), values, nil, nil) + vals = values + default: + t.Fatalf("unrecognized parquet type: %s", typ) + } + + require.NoError(t, err) + require.EqualValues(t, nRows, total) + require.EqualValues(t, nRows, read) + + return vals + } + + // Test conformance against Parquet reference + // Expected structure: https://github.com/apache/parquet-testing/blob/1bf4bd39df2135d132451c281754268f03dc1c0e/data/README.md?plain=1#L358 + for i, tc := range []struct { + PhysicalType parquet.Type + LogicalType schema.LogicalType + }{ + { + PhysicalType: parquet.Types.FixedLenByteArray, + LogicalType: schema.Float16LogicalType{}, + }, + { + PhysicalType: parquet.Types.Float, + LogicalType: schema.NoLogicalType{}, + }, + { + PhysicalType: parquet.Types.Double, + LogicalType: schema.NoLogicalType{}, + }, + { + PhysicalType: parquet.Types.Int32, + LogicalType: schema.NoLogicalType{}, + }, + { + PhysicalType: parquet.Types.Int64, + LogicalType: schema.NoLogicalType{}, + }, + { + PhysicalType: parquet.Types.FixedLenByteArray, + LogicalType: schema.NoLogicalType{}, + }, + { + PhysicalType: parquet.Types.FixedLenByteArray, + LogicalType: schema.NewDecimalLogicalType(7, 3), + }, + } { + t.Run(fmt.Sprintf("(Physical:%s/Logical:%s)", tc.PhysicalType, tc.LogicalType), func(t *testing.T) { + // Iterate through pairs of adjacent columns + colIdx := 2 * i + + // Read Plain-encoded column + rdrPlain, err := rgr.Column(colIdx) + require.NoError(t, err) + + // Read ByteStreamSplit-encoded column + rdrByteStreamSplit, err := rgr.Column(colIdx + 1) + require.NoError(t, err) + + // Logical types match + require.True(t, rdrPlain.Descriptor().LogicalType().Equals(tc.LogicalType)) + require.True(t, rdrByteStreamSplit.Descriptor().LogicalType().Equals(tc.LogicalType)) + + // Decoded values match + valuesPlain := getValues(rdrPlain, tc.PhysicalType) + valuesByteStreamSplit := getValues(rdrByteStreamSplit, tc.PhysicalType) + require.Equal(t, valuesPlain, valuesByteStreamSplit) + }) + } +} diff --git a/go/parquet/file/file_writer_test.go b/go/parquet/file/file_writer_test.go index a183022357d62..e5ad1b07e25de 100644 --- a/go/parquet/file/file_writer_test.go +++ b/go/parquet/file/file_writer_test.go @@ -464,3 +464,79 @@ func TestCloseError(t *testing.T) { writer := file.NewParquetWriter(sink, sc) assert.Error(t, writer.Close()) } + +func TestBatchedByteStreamSplitFileRoundtrip(t *testing.T) { + input := []parquet.FixedLenByteArray{ + {1, 2}, + {3, 4}, + {5, 6}, + {7, 8}, + } + + size := len(input) + chunk := size / 2 + + props := parquet.NewWriterProperties( + parquet.WithEncoding(parquet.Encodings.ByteStreamSplit), + parquet.WithDictionaryDefault(false), + parquet.WithBatchSize(int64(chunk)), + parquet.WithDataPageSize(int64(size)*2), + ) + + field, err := schema.NewPrimitiveNodeLogical("f16", parquet.Repetitions.Required, schema.Float16LogicalType{}, parquet.Types.FixedLenByteArray, 2, 1) + require.NoError(t, err) + + schema, err := schema.NewGroupNode("test", parquet.Repetitions.Required, schema.FieldList{field}, 0) + require.NoError(t, err) + + sink := encoding.NewBufferWriter(0, memory.DefaultAllocator) + writer := file.NewParquetWriter(sink, schema, file.WithWriterProps(props)) + + rgw := writer.AppendRowGroup() + cw, err := rgw.NextColumn() + require.NoError(t, err) + + f16ColumnWriter, ok := cw.(*file.FixedLenByteArrayColumnChunkWriter) + require.True(t, ok) + + nVals, err := f16ColumnWriter.WriteBatch(input[:chunk], nil, nil) + require.NoError(t, err) + require.EqualValues(t, chunk, nVals) + + nVals, err = f16ColumnWriter.WriteBatch(input[chunk:], nil, nil) + require.NoError(t, err) + require.EqualValues(t, chunk, nVals) + + require.NoError(t, cw.Close()) + require.NoError(t, rgw.Close()) + require.NoError(t, writer.Close()) + + rdr, err := file.NewParquetReader(bytes.NewReader(sink.Bytes())) + require.NoError(t, err) + + require.Equal(t, 1, rdr.NumRowGroups()) + require.EqualValues(t, size, rdr.NumRows()) + + rgr := rdr.RowGroup(0) + cr, err := rgr.Column(0) + require.NoError(t, err) + + f16ColumnReader, ok := cr.(*file.FixedLenByteArrayColumnChunkReader) + require.True(t, ok) + + output := make([]parquet.FixedLenByteArray, size) + + total, valuesRead, err := f16ColumnReader.ReadBatch(int64(chunk), output[:chunk], nil, nil) + require.NoError(t, err) + require.EqualValues(t, chunk, total) + require.EqualValues(t, chunk, valuesRead) + + total, valuesRead, err = f16ColumnReader.ReadBatch(int64(chunk), output[chunk:], nil, nil) + require.NoError(t, err) + require.EqualValues(t, chunk, total) + require.EqualValues(t, chunk, valuesRead) + + require.Equal(t, input, output) + + require.NoError(t, rdr.Close()) +} diff --git a/go/parquet/internal/encoding/byte_stream_split.go b/go/parquet/internal/encoding/byte_stream_split.go new file mode 100644 index 0000000000000..3772aa876173f --- /dev/null +++ b/go/parquet/internal/encoding/byte_stream_split.go @@ -0,0 +1,389 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package encoding + +import ( + "fmt" + "math" + + "github.com/apache/arrow/go/v17/arrow" + "github.com/apache/arrow/go/v17/arrow/memory" + "github.com/apache/arrow/go/v17/parquet" + "github.com/apache/arrow/go/v17/parquet/internal/debug" + "golang.org/x/xerrors" +) + +// encodeByteStreamSplit encodes the raw bytes provided by 'in' into the output buffer 'data' using BYTE_STREAM_SPLIT encoding. +// 'data' must have space for at least len(in) bytes. +func encodeByteStreamSplit(data []byte, in []byte, width int) { + debug.Assert(len(data) >= len(in), fmt.Sprintf("not enough space in destination buffer for encoding, dest: %d bytes, src: %d bytes", len(data), len(in))) + numElements := len(in) / width + for stream := 0; stream < width; stream++ { + for element := 0; element < numElements; element++ { + encLoc := numElements*stream + element + decLoc := width*element + stream + data[encLoc] = in[decLoc] + } + } +} + +// encodeByteStreamSplitWidth2 implements encodeByteStreamSplit optimized for types stored using 2 bytes. +// 'data' must have space for at least len(in) bytes. +func encodeByteStreamSplitWidth2(data []byte, in []byte) { + debug.Assert(len(data) >= len(in), fmt.Sprintf("not enough space in destination buffer for encoding, dest: %d bytes, src: %d bytes", len(data), len(in))) + const width = 2 + numElements := len(in) / width + for element := 0; element < numElements; element++ { + decLoc := width * element + data[element] = in[decLoc] + data[numElements+element] = in[decLoc+1] + } +} + +// encodeByteStreamSplitWidth4 implements encodeByteStreamSplit optimized for types stored using 4 bytes. +// 'data' must have space for at least len(in) bytes. +func encodeByteStreamSplitWidth4(data []byte, in []byte) { + debug.Assert(len(data) >= len(in), fmt.Sprintf("not enough space in destination buffer for encoding, dest: %d bytes, src: %d bytes", len(data), len(in))) + const width = 4 + numElements := len(in) / width + for element := 0; element < numElements; element++ { + decLoc := width * element + data[element] = in[decLoc] + data[numElements+element] = in[decLoc+1] + data[numElements*2+element] = in[decLoc+2] + data[numElements*3+element] = in[decLoc+3] + } +} + +// encodeByteStreamSplitWidth8 implements encodeByteStreamSplit optimized for types stored using 8 bytes. +// 'data' must have space for at least len(in) bytes. +func encodeByteStreamSplitWidth8(data []byte, in []byte) { + debug.Assert(len(data) >= len(in), fmt.Sprintf("not enough space in destination buffer for encoding, dest: %d bytes, src: %d bytes", len(data), len(in))) + const width = 8 + numElements := len(in) / width + for element := 0; element < numElements; element++ { + decLoc := width * element + data[element] = in[decLoc] + data[numElements+element] = in[decLoc+1] + data[numElements*2+element] = in[decLoc+2] + data[numElements*3+element] = in[decLoc+3] + data[numElements*4+element] = in[decLoc+4] + data[numElements*5+element] = in[decLoc+5] + data[numElements*6+element] = in[decLoc+6] + data[numElements*7+element] = in[decLoc+7] + } +} + +// decodeByteStreamSplitBatchWidth4 decodes the batch of nValues raw bytes representing a 4-byte datatype provided by 'data', +// into the output buffer 'out' using BYTE_STREAM_SPLIT encoding. +// 'out' must have space for at least len(data) bytes. +func decodeByteStreamSplitBatchWidth4(data []byte, nValues, stride int, out []byte) { + debug.Assert(len(out) >= len(data), fmt.Sprintf("not enough space in output buffer for decoding, out: %d bytes, data: %d bytes", len(out), len(data))) + const width = 4 + for element := 0; element < nValues; element++ { + out[width*element] = data[element] + out[width*element+1] = data[stride+element] + out[width*element+2] = data[2*stride+element] + out[width*element+3] = data[3*stride+element] + } +} + +// decodeByteStreamSplitBatchWidth8 decodes the batch of nValues raw bytes representing a 8-byte datatype provided by 'data', +// into the output buffer 'out' using BYTE_STREAM_SPLIT encoding. +// 'out' must have space for at least len(data) bytes. +func decodeByteStreamSplitBatchWidth8(data []byte, nValues, stride int, out []byte) { + debug.Assert(len(out) >= len(data), fmt.Sprintf("not enough space in output buffer for decoding, out: %d bytes, data: %d bytes", len(out), len(data))) + const width = 8 + for element := 0; element < nValues; element++ { + out[width*element] = data[element] + out[width*element+1] = data[stride+element] + out[width*element+2] = data[2*stride+element] + out[width*element+3] = data[3*stride+element] + out[width*element+4] = data[4*stride+element] + out[width*element+5] = data[5*stride+element] + out[width*element+6] = data[6*stride+element] + out[width*element+7] = data[7*stride+element] + } +} + +// decodeByteStreamSplitBatchFLBA decodes the batch of nValues FixedLenByteArrays provided by 'data', +// into the output slice 'out' using BYTE_STREAM_SPLIT encoding. +// 'out' must have space for at least nValues slices. +func decodeByteStreamSplitBatchFLBA(data []byte, nValues, stride, width int, out []parquet.FixedLenByteArray) { + debug.Assert(len(out) >= nValues, fmt.Sprintf("not enough space in output slice for decoding, out: %d values, data: %d values", len(out), nValues)) + for stream := 0; stream < width; stream++ { + for element := 0; element < nValues; element++ { + encLoc := stride*stream + element + out[element][stream] = data[encLoc] + } + } +} + +// decodeByteStreamSplitBatchFLBAWidth2 decodes the batch of nValues FixedLenByteArrays of length 2 provided by 'data', +// into the output slice 'out' using BYTE_STREAM_SPLIT encoding. +// 'out' must have space for at least nValues slices. +func decodeByteStreamSplitBatchFLBAWidth2(data []byte, nValues, stride int, out []parquet.FixedLenByteArray) { + debug.Assert(len(out) >= nValues, fmt.Sprintf("not enough space in output slice for decoding, out: %d values, data: %d values", len(out), nValues)) + for element := 0; element < nValues; element++ { + out[element][0] = data[element] + out[element][1] = data[stride+element] + } +} + +// decodeByteStreamSplitBatchFLBAWidth4 decodes the batch of nValues FixedLenByteArrays of length 4 provided by 'data', +// into the output slice 'out' using BYTE_STREAM_SPLIT encoding. +// 'out' must have space for at least nValues slices. +func decodeByteStreamSplitBatchFLBAWidth4(data []byte, nValues, stride int, out []parquet.FixedLenByteArray) { + debug.Assert(len(out) >= nValues, fmt.Sprintf("not enough space in output slice for decoding, out: %d values, data: %d values", len(out), nValues)) + for element := 0; element < nValues; element++ { + out[element][0] = data[element] + out[element][1] = data[stride+element] + out[element][2] = data[stride*2+element] + out[element][3] = data[stride*3+element] + } +} + +// decodeByteStreamSplitBatchFLBAWidth8 decodes the batch of nValues FixedLenByteArrays of length 8 provided by 'data', +// into the output slice 'out' using BYTE_STREAM_SPLIT encoding. +// 'out' must have space for at least nValues slices. +func decodeByteStreamSplitBatchFLBAWidth8(data []byte, nValues, stride int, out []parquet.FixedLenByteArray) { + debug.Assert(len(out) >= nValues, fmt.Sprintf("not enough space in output slice for decoding, out: %d values, data: %d values", len(out), nValues)) + for element := 0; element < nValues; element++ { + out[element][0] = data[element] + out[element][1] = data[stride+element] + out[element][2] = data[stride*2+element] + out[element][3] = data[stride*3+element] + out[element][4] = data[stride*4+element] + out[element][5] = data[stride*5+element] + out[element][6] = data[stride*6+element] + out[element][7] = data[stride*7+element] + } +} + +func releaseBufferToPool(pooled *PooledBufferWriter) { + buf := pooled.buf + memory.Set(buf.Buf(), 0) + buf.ResizeNoShrink(0) + bufferPool.Put(buf) +} + +func validateByteStreamSplitPageData(typeLen, nvals int, data []byte) (int, error) { + if nvals*typeLen < len(data) { + return 0, fmt.Errorf("data size (%d) is too small for the number of values in in BYTE_STREAM_SPLIT (%d)", len(data), nvals) + } + + if len(data)%typeLen != 0 { + return 0, fmt.Errorf("ByteStreamSplit data size %d not aligned with byte_width: %d", len(data), typeLen) + } + + return len(data) / typeLen, nil +} + +// ByteStreamSplitFloat32Encoder writes the underlying bytes of the Float32 +// into interlaced streams as defined by the BYTE_STREAM_SPLIT encoding +type ByteStreamSplitFloat32Encoder struct { + PlainFloat32Encoder + flushBuffer *PooledBufferWriter +} + +func (enc *ByteStreamSplitFloat32Encoder) FlushValues() (Buffer, error) { + in, err := enc.PlainFloat32Encoder.FlushValues() + if err != nil { + return nil, err + } + + if enc.flushBuffer == nil { + enc.flushBuffer = NewPooledBufferWriter(in.Len()) + } + + enc.flushBuffer.buf.Resize(in.Len()) + encodeByteStreamSplitWidth4(enc.flushBuffer.Bytes(), in.Bytes()) + return enc.flushBuffer.Finish(), nil +} + +func (enc *ByteStreamSplitFloat32Encoder) Release() { + enc.PlainFloat32Encoder.Release() + releaseBufferToPool(enc.flushBuffer) + enc.flushBuffer = nil +} + +// ByteStreamSplitFloat64Encoder writes the underlying bytes of the Float64 +// into interlaced streams as defined by the BYTE_STREAM_SPLIT encoding +type ByteStreamSplitFloat64Encoder struct { + PlainFloat64Encoder + flushBuffer *PooledBufferWriter +} + +func (enc *ByteStreamSplitFloat64Encoder) FlushValues() (Buffer, error) { + in, err := enc.PlainFloat64Encoder.FlushValues() + if err != nil { + return nil, err + } + + if enc.flushBuffer == nil { + enc.flushBuffer = NewPooledBufferWriter(in.Len()) + } + + enc.flushBuffer.buf.Resize(in.Len()) + encodeByteStreamSplitWidth8(enc.flushBuffer.Bytes(), in.Bytes()) + return enc.flushBuffer.Finish(), nil +} + +func (enc *ByteStreamSplitFloat64Encoder) Release() { + enc.PlainFloat64Encoder.Release() + releaseBufferToPool(enc.flushBuffer) + enc.flushBuffer = nil +} + +// ByteStreamSplitInt32Encoder writes the underlying bytes of the Int32 +// into interlaced streams as defined by the BYTE_STREAM_SPLIT encoding +type ByteStreamSplitInt32Encoder struct { + PlainInt32Encoder + flushBuffer *PooledBufferWriter +} + +func (enc *ByteStreamSplitInt32Encoder) FlushValues() (Buffer, error) { + in, err := enc.PlainInt32Encoder.FlushValues() + if err != nil { + return nil, err + } + + if enc.flushBuffer == nil { + enc.flushBuffer = NewPooledBufferWriter(in.Len()) + } + + enc.flushBuffer.buf.Resize(in.Len()) + encodeByteStreamSplitWidth4(enc.flushBuffer.Bytes(), in.Bytes()) + return enc.flushBuffer.Finish(), nil +} + +func (enc *ByteStreamSplitInt32Encoder) Release() { + enc.PlainInt32Encoder.Release() + releaseBufferToPool(enc.flushBuffer) + enc.flushBuffer = nil +} + +// ByteStreamSplitInt64Encoder writes the underlying bytes of the Int64 +// into interlaced streams as defined by the BYTE_STREAM_SPLIT encoding +type ByteStreamSplitInt64Encoder struct { + PlainInt64Encoder + flushBuffer *PooledBufferWriter +} + +func (enc *ByteStreamSplitInt64Encoder) FlushValues() (Buffer, error) { + in, err := enc.PlainInt64Encoder.FlushValues() + if err != nil { + return nil, err + } + + if enc.flushBuffer == nil { + enc.flushBuffer = NewPooledBufferWriter(in.Len()) + } + + enc.flushBuffer.buf.Resize(in.Len()) + encodeByteStreamSplitWidth8(enc.flushBuffer.Bytes(), in.Bytes()) + return enc.flushBuffer.Finish(), nil +} + +func (enc *ByteStreamSplitInt64Encoder) Release() { + enc.PlainInt64Encoder.Release() + releaseBufferToPool(enc.flushBuffer) + enc.flushBuffer = nil +} + +// ByteStreamSplitFloat32Decoder is a decoder for BYTE_STREAM_SPLIT-encoded +// bytes representing Float32 values +type ByteStreamSplitFloat32Decoder = ByteStreamSplitDecoder[float32] + +// ByteStreamSplitFloat64Decoder is a decoder for BYTE_STREAM_SPLIT-encoded +// bytes representing Float64 values +type ByteStreamSplitFloat64Decoder = ByteStreamSplitDecoder[float64] + +// ByteStreamSplitInt32Decoder is a decoder for BYTE_STREAM_SPLIT-encoded +// bytes representing Int32 values +type ByteStreamSplitInt32Decoder = ByteStreamSplitDecoder[int32] + +// ByteStreamSplitInt64Decoder is a decoder for BYTE_STREAM_SPLIT-encoded +// bytes representing Int64 values +type ByteStreamSplitInt64Decoder = ByteStreamSplitDecoder[int64] + +type ByteStreamSplitDecoder[T float32 | float64 | int32 | int64] struct { + decoder + stride int +} + +func (dec *ByteStreamSplitDecoder[T]) Type() parquet.Type { + switch v := any(dec).(type) { + case *ByteStreamSplitDecoder[float32]: + return parquet.Types.Float + case *ByteStreamSplitDecoder[float64]: + return parquet.Types.Double + case *ByteStreamSplitDecoder[int32]: + return parquet.Types.Int32 + case *ByteStreamSplitDecoder[int64]: + return parquet.Types.Int64 + default: + panic(fmt.Sprintf("ByteStreamSplitDecoder is not supported for type: %T", v)) + } +} + +func (dec *ByteStreamSplitDecoder[T]) SetData(nvals int, data []byte) error { + nvals, err := validateByteStreamSplitPageData(dec.Type().ByteSize(), nvals, data) + if err != nil { + return err + } + + dec.stride = nvals + return dec.decoder.SetData(nvals, data) +} + +func (dec *ByteStreamSplitDecoder[T]) Decode(out []T) (int, error) { + typeLen := dec.Type().ByteSize() + toRead := len(out) + numBytesNeeded := toRead * typeLen + if numBytesNeeded > len(dec.data) || numBytesNeeded > math.MaxInt32 { + return 0, xerrors.New("parquet: eof exception") + } + + outBytes := arrow.GetBytes(out) + switch typeLen { + case 4: + decodeByteStreamSplitBatchWidth4(dec.data, toRead, dec.stride, outBytes) + case 8: + decodeByteStreamSplitBatchWidth8(dec.data, toRead, dec.stride, outBytes) + default: + return 0, fmt.Errorf("encoding ByteStreamSplit is only defined for numeric type of width 4 or 8, found: %d", typeLen) + } + + dec.nvals -= toRead + dec.data = dec.data[toRead:] + + return toRead, nil +} + +func (dec *ByteStreamSplitDecoder[T]) DecodeSpaced(out []T, nullCount int, validBits []byte, validBitsOffset int64) (int, error) { + toRead := len(out) - nullCount + valuesRead, err := dec.Decode(out[:toRead]) + if err != nil { + return valuesRead, err + } + if valuesRead != toRead { + return valuesRead, xerrors.New("parquet: number of values / definitions levels read did not match") + } + + return spacedExpand(out, nullCount, validBits, validBitsOffset), nil +} diff --git a/go/parquet/internal/encoding/encoding_benchmarks_test.go b/go/parquet/internal/encoding/encoding_benchmarks_test.go index 0252aa4801430..6bf0aec0d2035 100644 --- a/go/parquet/internal/encoding/encoding_benchmarks_test.go +++ b/go/parquet/internal/encoding/encoding_benchmarks_test.go @@ -464,3 +464,173 @@ func BenchmarkDecodeDictByteArray(b *testing.B) { dictDec.Decode(out) } } + +func BenchmarkByteStreamSplitEncodingInt32(b *testing.B) { + for sz := MINSIZE; sz < MAXSIZE+1; sz *= 2 { + b.Run(fmt.Sprintf("len %d", sz), func(b *testing.B) { + values := make([]int32, sz) + for idx := range values { + values[idx] = 64 + } + encoder := encoding.NewEncoder(parquet.Types.Int32, parquet.Encodings.ByteStreamSplit, + false, nil, memory.DefaultAllocator).(encoding.Int32Encoder) + b.ResetTimer() + b.SetBytes(int64(len(values) * arrow.Int32SizeBytes)) + for n := 0; n < b.N; n++ { + encoder.Put(values) + buf, _ := encoder.FlushValues() + buf.Release() + } + }) + } +} + +func BenchmarkByteStreamSplitDecodingInt32(b *testing.B) { + for sz := MINSIZE; sz < MAXSIZE+1; sz *= 2 { + b.Run(fmt.Sprintf("len %d", sz), func(b *testing.B) { + output := make([]int32, sz) + values := make([]int32, sz) + for idx := range values { + values[idx] = 64 + } + encoder := encoding.NewEncoder(parquet.Types.Int32, parquet.Encodings.ByteStreamSplit, + false, nil, memory.DefaultAllocator).(encoding.Int32Encoder) + encoder.Put(values) + buf, _ := encoder.FlushValues() + defer buf.Release() + + decoder := encoding.NewDecoder(parquet.Types.Int32, parquet.Encodings.ByteStreamSplit, nil, memory.DefaultAllocator) + b.ResetTimer() + b.SetBytes(int64(len(values) * arrow.Int32SizeBytes)) + for n := 0; n < b.N; n++ { + decoder.SetData(sz, buf.Bytes()) + decoder.(encoding.Int32Decoder).Decode(output) + } + }) + } +} + +func BenchmarkByteStreamSplitDecodingInt32Batched(b *testing.B) { + const batchSize = 512 + for sz := MINSIZE; sz < MAXSIZE+1; sz *= 2 { + b.Run(fmt.Sprintf("len %d", sz), func(b *testing.B) { + output := make([]int32, sz) + values := make([]int32, sz) + for idx := range values { + values[idx] = 64 + } + encoder := encoding.NewEncoder(parquet.Types.Int32, parquet.Encodings.ByteStreamSplit, + false, nil, memory.DefaultAllocator).(encoding.Int32Encoder) + encoder.Put(values) + buf, _ := encoder.FlushValues() + defer buf.Release() + + decoder := encoding.NewDecoder(parquet.Types.Int32, parquet.Encodings.ByteStreamSplit, nil, memory.DefaultAllocator) + b.ResetTimer() + b.SetBytes(int64(len(values) * arrow.Int32SizeBytes)) + for n := 0; n < b.N; n++ { + decoder.SetData(sz, buf.Bytes()) + for batch := 0; batch*batchSize < sz; batch++ { + offset := batch * batchSize + decoder.(encoding.Int32Decoder).Decode(output[offset : offset+batchSize]) + } + } + }) + } +} + +func BenchmarkByteStreamSplitEncodingInt64(b *testing.B) { + for sz := MINSIZE; sz < MAXSIZE+1; sz *= 2 { + b.Run(fmt.Sprintf("len %d", sz), func(b *testing.B) { + values := make([]int64, sz) + for idx := range values { + values[idx] = 64 + } + encoder := encoding.NewEncoder(parquet.Types.Int64, parquet.Encodings.ByteStreamSplit, + false, nil, memory.DefaultAllocator).(encoding.Int64Encoder) + b.ResetTimer() + b.SetBytes(int64(len(values) * arrow.Int64SizeBytes)) + for n := 0; n < b.N; n++ { + encoder.Put(values) + buf, _ := encoder.FlushValues() + buf.Release() + } + }) + } +} + +func BenchmarkByteStreamSplitDecodingInt64(b *testing.B) { + for sz := MINSIZE; sz < MAXSIZE+1; sz *= 2 { + b.Run(fmt.Sprintf("len %d", sz), func(b *testing.B) { + output := make([]int64, sz) + values := make([]int64, sz) + for idx := range values { + values[idx] = 64 + } + encoder := encoding.NewEncoder(parquet.Types.Int64, parquet.Encodings.ByteStreamSplit, + false, nil, memory.DefaultAllocator).(encoding.Int64Encoder) + encoder.Put(values) + buf, _ := encoder.FlushValues() + defer buf.Release() + + decoder := encoding.NewDecoder(parquet.Types.Int64, parquet.Encodings.ByteStreamSplit, nil, memory.DefaultAllocator) + b.ResetTimer() + b.SetBytes(int64(len(values) * arrow.Int64SizeBytes)) + for n := 0; n < b.N; n++ { + decoder.SetData(sz, buf.Bytes()) + decoder.(encoding.Int64Decoder).Decode(output) + } + }) + } +} + +func BenchmarkByteStreamSplitEncodingFixedLenByteArray(b *testing.B) { + for sz := MINSIZE; sz < MAXSIZE+1; sz *= 2 { + b.Run(fmt.Sprintf("len %d", sz), func(b *testing.B) { + values := make([]parquet.FixedLenByteArray, sz) + for idx := range values { + values[idx] = []byte{0x12, 0x34, 0x56, 0x78} + } + + arraySize := len(values[0]) + col := schema.NewColumn(schema.NewFixedLenByteArrayNode("fixedlenbytearray", parquet.Repetitions.Required, int32(arraySize), -1), 0, 0) + encoder := encoding.NewEncoder(parquet.Types.FixedLenByteArray, parquet.Encodings.ByteStreamSplit, + false, col, memory.DefaultAllocator).(encoding.FixedLenByteArrayEncoder) + b.ResetTimer() + b.SetBytes(int64(len(values) * arraySize)) + for n := 0; n < b.N; n++ { + encoder.Put(values) + buf, _ := encoder.FlushValues() + buf.Release() + } + }) + } +} + +func BenchmarkByteStreamSplitDecodingFixedLenByteArray(b *testing.B) { + for sz := MINSIZE; sz < MAXSIZE+1; sz *= 2 { + b.Run(fmt.Sprintf("len %d", sz), func(b *testing.B) { + output := make([]parquet.FixedLenByteArray, sz) + values := make([]parquet.FixedLenByteArray, sz) + for idx := range values { + values[idx] = []byte{0x12, 0x34, 0x56, 0x78} + } + + arraySize := len(values[0]) + col := schema.NewColumn(schema.NewFixedLenByteArrayNode("fixedlenbytearray", parquet.Repetitions.Required, int32(arraySize), -1), 0, 0) + encoder := encoding.NewEncoder(parquet.Types.FixedLenByteArray, parquet.Encodings.ByteStreamSplit, + false, col, memory.DefaultAllocator).(encoding.FixedLenByteArrayEncoder) + encoder.Put(values) + buf, _ := encoder.FlushValues() + defer buf.Release() + + decoder := encoding.NewDecoder(parquet.Types.FixedLenByteArray, parquet.Encodings.ByteStreamSplit, col, memory.DefaultAllocator) + b.ResetTimer() + b.SetBytes(int64(len(values) * arraySize)) + for n := 0; n < b.N; n++ { + decoder.SetData(sz, buf.Bytes()) + decoder.(encoding.FixedLenByteArrayDecoder).Decode(output) + } + }) + } +} diff --git a/go/parquet/internal/encoding/encoding_test.go b/go/parquet/internal/encoding/encoding_test.go index f2d1e31236adf..5e95ab16e2676 100644 --- a/go/parquet/internal/encoding/encoding_test.go +++ b/go/parquet/internal/encoding/encoding_test.go @@ -406,6 +406,17 @@ func (b *BaseEncodingTestSuite) TestDeltaByteArrayRoundTrip() { } } +func (b *BaseEncodingTestSuite) TestByteStreamSplitRoundTrip() { + b.initData(10000, 1) + + switch b.typ { + case reflect.TypeOf(float32(0)), reflect.TypeOf(float64(0)), reflect.TypeOf(int32(0)), reflect.TypeOf(int64(0)), reflect.TypeOf(parquet.FixedLenByteArray{}): + b.checkRoundTrip(parquet.Encodings.ByteStreamSplit) + default: + b.Panics(func() { b.checkRoundTrip(parquet.Encodings.ByteStreamSplit) }) + } +} + func (b *BaseEncodingTestSuite) TestSpacedRoundTrip() { exec := func(vals, repeats int, validBitsOffset int64, nullProb float64) { b.Run(fmt.Sprintf("%d vals %d repeats %d offset %0.3f null", vals, repeats, validBitsOffset, 1-nullProb), func() { diff --git a/go/parquet/internal/encoding/fixed_len_byte_array_decoder.go b/go/parquet/internal/encoding/fixed_len_byte_array_decoder.go index ceb9f5a2e4a22..e264697a8c547 100644 --- a/go/parquet/internal/encoding/fixed_len_byte_array_decoder.go +++ b/go/parquet/internal/encoding/fixed_len_byte_array_decoder.go @@ -17,6 +17,7 @@ package encoding import ( + "fmt" "math" "github.com/apache/arrow/go/v17/internal/utils" @@ -64,3 +65,73 @@ func (pflba *PlainFixedLenByteArrayDecoder) DecodeSpaced(out []parquet.FixedLenB return spacedExpand(out, nullCount, validBits, validBitsOffset), nil } + +// ByteStreamSplitFixedLenByteArrayDecoder is a decoder for BYTE_STREAM_SPLIT-encoded +// bytes representing FixedLenByteArray values +type ByteStreamSplitFixedLenByteArrayDecoder struct { + decoder + stride int +} + +func (dec *ByteStreamSplitFixedLenByteArrayDecoder) Type() parquet.Type { + return parquet.Types.FixedLenByteArray +} + +func (dec *ByteStreamSplitFixedLenByteArrayDecoder) SetData(nvals int, data []byte) error { + if nvals*dec.typeLen < len(data) { + return fmt.Errorf("data size (%d) is too small for the number of values in in BYTE_STREAM_SPLIT (%d)", len(data), nvals) + } + + if len(data)%dec.typeLen != 0 { + return fmt.Errorf("ByteStreamSplit data size %d not aligned with type %s and byte_width: %d", len(data), dec.Type(), dec.typeLen) + } + + nvals = len(data) / dec.typeLen + dec.stride = nvals + + return dec.decoder.SetData(nvals, data) +} + +func (dec *ByteStreamSplitFixedLenByteArrayDecoder) Decode(out []parquet.FixedLenByteArray) (int, error) { + toRead := len(out) + numBytesNeeded := toRead * dec.typeLen + if numBytesNeeded > len(dec.data) || numBytesNeeded > math.MaxInt32 { + return 0, xerrors.New("parquet: eof exception") + } + + for i := range out { + if cap(out[i]) < dec.typeLen { + out[i] = make(parquet.FixedLenByteArray, dec.typeLen) + } else { + out[i] = out[i][:dec.typeLen] + } + } + + switch dec.typeLen { + case 2: + decodeByteStreamSplitBatchFLBAWidth2(dec.data, toRead, dec.stride, out) + case 4: + decodeByteStreamSplitBatchFLBAWidth4(dec.data, toRead, dec.stride, out) + case 8: + decodeByteStreamSplitBatchFLBAWidth8(dec.data, toRead, dec.stride, out) + default: + decodeByteStreamSplitBatchFLBA(dec.data, toRead, dec.stride, dec.typeLen, out) + } + + dec.nvals -= toRead + dec.data = dec.data[toRead:] + return toRead, nil +} + +func (dec *ByteStreamSplitFixedLenByteArrayDecoder) DecodeSpaced(out []parquet.FixedLenByteArray, nullCount int, validBits []byte, validBitsOffset int64) (int, error) { + toRead := len(out) - nullCount + valuesRead, err := dec.Decode(out[:toRead]) + if err != nil { + return valuesRead, err + } + if valuesRead != toRead { + return valuesRead, xerrors.New("parquet: number of values / definitions levels read did not match") + } + + return spacedExpand(out, nullCount, validBits, validBitsOffset), nil +} diff --git a/go/parquet/internal/encoding/fixed_len_byte_array_encoder.go b/go/parquet/internal/encoding/fixed_len_byte_array_encoder.go index 1cdb3c84d9212..a93164e305fdf 100644 --- a/go/parquet/internal/encoding/fixed_len_byte_array_encoder.go +++ b/go/parquet/internal/encoding/fixed_len_byte_array_encoder.go @@ -75,6 +75,45 @@ func (PlainFixedLenByteArrayEncoder) Type() parquet.Type { return parquet.Types.FixedLenByteArray } +// ByteStreamSplitFixedLenByteArrayEncoder writes the underlying bytes of the FixedLenByteArray +// into interlaced streams as defined by the BYTE_STREAM_SPLIT encoding +type ByteStreamSplitFixedLenByteArrayEncoder struct { + PlainFixedLenByteArrayEncoder + flushBuffer *PooledBufferWriter +} + +func (enc *ByteStreamSplitFixedLenByteArrayEncoder) FlushValues() (Buffer, error) { + in, err := enc.PlainFixedLenByteArrayEncoder.FlushValues() + if err != nil { + return nil, err + } + + if enc.flushBuffer == nil { + enc.flushBuffer = NewPooledBufferWriter(in.Len()) + } + + enc.flushBuffer.buf.ResizeNoShrink(in.Len()) + + switch enc.typeLen { + case 2: + encodeByteStreamSplitWidth2(enc.flushBuffer.Bytes(), in.Bytes()) + case 4: + encodeByteStreamSplitWidth4(enc.flushBuffer.Bytes(), in.Bytes()) + case 8: + encodeByteStreamSplitWidth8(enc.flushBuffer.Bytes(), in.Bytes()) + default: + encodeByteStreamSplit(enc.flushBuffer.Bytes(), in.Bytes(), enc.typeLen) + } + + return enc.flushBuffer.Finish(), nil +} + +func (enc *ByteStreamSplitFixedLenByteArrayEncoder) Release() { + enc.PlainFixedLenByteArrayEncoder.Release() + releaseBufferToPool(enc.flushBuffer) + enc.flushBuffer = nil +} + // WriteDict overrides the embedded WriteDict function to call a specialized function // for copying out the Fixed length values from the dictionary more efficiently. func (enc *DictFixedLenByteArrayEncoder) WriteDict(out []byte) { diff --git a/go/parquet/internal/encoding/typed_encoder.gen.go b/go/parquet/internal/encoding/typed_encoder.gen.go index 0c473a989ef71..663c1164c565e 100644 --- a/go/parquet/internal/encoding/typed_encoder.gen.go +++ b/go/parquet/internal/encoding/typed_encoder.gen.go @@ -88,6 +88,8 @@ func (int32EncoderTraits) Encoder(e format.Encoding, useDict bool, descr *schema case format.Encoding_DELTA_BINARY_PACKED: return DeltaBitPackInt32Encoder{&deltaBitPackEncoder{ encoder: newEncoderBase(e, descr, mem)}} + case format.Encoding_BYTE_STREAM_SPLIT: + return &ByteStreamSplitInt32Encoder{PlainInt32Encoder: PlainInt32Encoder{encoder: newEncoderBase(e, descr, mem)}} default: panic("unimplemented encoding type") } @@ -120,6 +122,8 @@ func (int32DecoderTraits) Decoder(e parquet.Encoding, descr *schema.Column, useD decoder: newDecoderBase(format.Encoding(e), descr), mem: mem, }} + case parquet.Encodings.ByteStreamSplit: + return &ByteStreamSplitInt32Decoder{decoder: newDecoderBase(format.Encoding(e), descr)} default: panic("unimplemented encoding type") } @@ -325,6 +329,8 @@ func (int64EncoderTraits) Encoder(e format.Encoding, useDict bool, descr *schema case format.Encoding_DELTA_BINARY_PACKED: return DeltaBitPackInt64Encoder{&deltaBitPackEncoder{ encoder: newEncoderBase(e, descr, mem)}} + case format.Encoding_BYTE_STREAM_SPLIT: + return &ByteStreamSplitInt64Encoder{PlainInt64Encoder: PlainInt64Encoder{encoder: newEncoderBase(e, descr, mem)}} default: panic("unimplemented encoding type") } @@ -357,6 +363,8 @@ func (int64DecoderTraits) Decoder(e parquet.Encoding, descr *schema.Column, useD decoder: newDecoderBase(format.Encoding(e), descr), mem: mem, }} + case parquet.Encodings.ByteStreamSplit: + return &ByteStreamSplitInt64Decoder{decoder: newDecoderBase(format.Encoding(e), descr)} default: panic("unimplemented encoding type") } @@ -774,6 +782,8 @@ func (float32EncoderTraits) Encoder(e format.Encoding, useDict bool, descr *sche switch e { case format.Encoding_PLAIN: return &PlainFloat32Encoder{encoder: newEncoderBase(e, descr, mem)} + case format.Encoding_BYTE_STREAM_SPLIT: + return &ByteStreamSplitFloat32Encoder{PlainFloat32Encoder: PlainFloat32Encoder{encoder: newEncoderBase(e, descr, mem)}} default: panic("unimplemented encoding type") } @@ -797,6 +807,8 @@ func (float32DecoderTraits) Decoder(e parquet.Encoding, descr *schema.Column, us switch e { case parquet.Encodings.Plain: return &PlainFloat32Decoder{decoder: newDecoderBase(format.Encoding(e), descr)} + case parquet.Encodings.ByteStreamSplit: + return &ByteStreamSplitFloat32Decoder{decoder: newDecoderBase(format.Encoding(e), descr)} default: panic("unimplemented encoding type") } @@ -999,6 +1011,8 @@ func (float64EncoderTraits) Encoder(e format.Encoding, useDict bool, descr *sche switch e { case format.Encoding_PLAIN: return &PlainFloat64Encoder{encoder: newEncoderBase(e, descr, mem)} + case format.Encoding_BYTE_STREAM_SPLIT: + return &ByteStreamSplitFloat64Encoder{PlainFloat64Encoder: PlainFloat64Encoder{encoder: newEncoderBase(e, descr, mem)}} default: panic("unimplemented encoding type") } @@ -1022,6 +1036,8 @@ func (float64DecoderTraits) Decoder(e parquet.Encoding, descr *schema.Column, us switch e { case parquet.Encodings.Plain: return &PlainFloat64Decoder{decoder: newDecoderBase(format.Encoding(e), descr)} + case parquet.Encodings.ByteStreamSplit: + return &ByteStreamSplitFloat64Decoder{decoder: newDecoderBase(format.Encoding(e), descr)} default: panic("unimplemented encoding type") } @@ -1492,6 +1508,8 @@ func (fixedLenByteArrayEncoderTraits) Encoder(e format.Encoding, useDict bool, d switch e { case format.Encoding_PLAIN: return &PlainFixedLenByteArrayEncoder{encoder: newEncoderBase(e, descr, mem)} + case format.Encoding_BYTE_STREAM_SPLIT: + return &ByteStreamSplitFixedLenByteArrayEncoder{PlainFixedLenByteArrayEncoder: PlainFixedLenByteArrayEncoder{encoder: newEncoderBase(e, descr, mem)}} default: panic("unimplemented encoding type") } @@ -1515,6 +1533,8 @@ func (fixedLenByteArrayDecoderTraits) Decoder(e parquet.Encoding, descr *schema. switch e { case parquet.Encodings.Plain: return &PlainFixedLenByteArrayDecoder{decoder: newDecoderBase(format.Encoding(e), descr)} + case parquet.Encodings.ByteStreamSplit: + return &ByteStreamSplitFixedLenByteArrayDecoder{decoder: newDecoderBase(format.Encoding(e), descr)} default: panic("unimplemented encoding type") } diff --git a/go/parquet/internal/encoding/typed_encoder.gen.go.tmpl b/go/parquet/internal/encoding/typed_encoder.gen.go.tmpl index 57d7e641fb5df..ebd7733135a52 100644 --- a/go/parquet/internal/encoding/typed_encoder.gen.go.tmpl +++ b/go/parquet/internal/encoding/typed_encoder.gen.go.tmpl @@ -93,6 +93,10 @@ func ({{.lower}}EncoderTraits) Encoder(e format.Encoding, useDict bool, descr *s return &DeltaByteArrayEncoder{ encoder: newEncoderBase(e, descr, mem), } +{{- end}} +{{- if or (eq .Name "FixedLenByteArray") (eq .Name "Float32") (eq .Name "Float64") (eq .Name "Int32") (eq .Name "Int64")}} + case format.Encoding_BYTE_STREAM_SPLIT: + return &ByteStreamSplit{{.Name}}Encoder{Plain{{.Name}}Encoder: Plain{{.Name}}Encoder{encoder: newEncoderBase(e,descr,mem)}} {{- end}} default: panic("unimplemented encoding type") @@ -154,6 +158,10 @@ func ({{.lower}}DecoderTraits) Decoder(e parquet.Encoding, descr *schema.Column, decoder: newDecoderBase(format.Encoding(e), descr), mem: mem, }} +{{- end}} +{{- if or (eq .Name "FixedLenByteArray") (eq .Name "Float32") (eq .Name "Float64") (eq .Name "Int32") (eq .Name "Int64")}} + case parquet.Encodings.ByteStreamSplit: + return &ByteStreamSplit{{.Name}}Decoder{decoder: newDecoderBase(format.Encoding(e), descr)} {{- end}} default: panic("unimplemented encoding type") diff --git a/go/parquet/types.go b/go/parquet/types.go index c1ab3788ca577..71336a7987cd8 100644 --- a/go/parquet/types.go +++ b/go/parquet/types.go @@ -296,6 +296,7 @@ var ( DeltaByteArray Encoding DeltaBinaryPacked Encoding DeltaLengthByteArray Encoding + ByteStreamSplit Encoding }{ Plain: Encoding(format.Encoding_PLAIN), PlainDict: Encoding(format.Encoding_PLAIN_DICTIONARY), @@ -305,6 +306,7 @@ var ( DeltaByteArray: Encoding(format.Encoding_DELTA_BYTE_ARRAY), DeltaBinaryPacked: Encoding(format.Encoding_DELTA_BINARY_PACKED), DeltaLengthByteArray: Encoding(format.Encoding_DELTA_LENGTH_BYTE_ARRAY), + ByteStreamSplit: Encoding(format.Encoding_BYTE_STREAM_SPLIT), } // ColumnOrders contains constants for the Column Ordering fields