From e336b3a85d3bf0496568802a9ff8f65422618eda Mon Sep 17 00:00:00 2001 From: Matthias Loibl Date: Tue, 26 Nov 2024 15:26:59 +0100 Subject: [PATCH] pqarrow/arrowutils: Add sorting support for Struct, RunEndEncoded, FixedSizeBinary (#936) * pqarrow/arrowutils: Add FixedSizeBinaryDictionary support * pqarrow/arrowutils: Add support for Struct and RunEndEncoded * pqarrow/arrowutils: Handle empty structs correctly * pqarrow/arrowutils: Retain record if unmodified * query/physicalplan: Sampler requires LessOrEqual 1024 bytes allocations * remove trailing newline * pqarrow/arrowutils: Limit by using the arrowutils Take helper This will also allow us to limit all the newly supported column types, timestamp, struct and runendencoded * query/physicalplan: Guard against s.size==0 panic Please take a look at this fix, @thorfour. I'm not sure why this started panicking now. * pqarrow/arrowutils: Release List ValueBuilder --- pqarrow/arrowutils/sort.go | 174 ++++++++++++++++-- pqarrow/arrowutils/sort_test.go | 279 +++++++++++++++++++++++++++-- query/physicalplan/limit.go | 54 +----- query/physicalplan/sampler.go | 4 + query/physicalplan/sampler_test.go | 2 +- 5 files changed, 428 insertions(+), 85 deletions(-) diff --git a/pqarrow/arrowutils/sort.go b/pqarrow/arrowutils/sort.go index 9a69c2254..a6b470bf4 100644 --- a/pqarrow/arrowutils/sort.go +++ b/pqarrow/arrowutils/sort.go @@ -72,7 +72,10 @@ func Take(ctx context.Context, r arrow.Record, indices *array.Int32) (arrow.Reco // does not have these columns. var customTake bool for i := 0; i < int(r.NumCols()); i++ { - if r.Column(i).DataType().ID() == arrow.DICTIONARY || r.Column(i).DataType().ID() == arrow.LIST { + if r.Column(i).DataType().ID() == arrow.DICTIONARY || + r.Column(i).DataType().ID() == arrow.RUN_END_ENCODED || + r.Column(i).DataType().ID() == arrow.LIST || + r.Column(i).DataType().ID() == arrow.STRUCT { customTake = true break } @@ -108,8 +111,12 @@ func Take(ctx context.Context, r arrow.Record, indices *array.Int32) (arrow.Reco switch arr := r.Column(i).(type) { case *array.Dictionary: g.Go(func() error { return TakeDictColumn(ctx, arr, i, resArr, indices) }) + case *array.RunEndEncoded: + g.Go(func() error { return TakeRunEndEncodedColumn(ctx, arr, i, resArr, indices) }) case *array.List: g.Go(func() error { return TakeListColumn(ctx, arr, i, resArr, indices) }) + case *array.Struct: + g.Go(func() error { return TakeStructColumn(ctx, arr, i, resArr, indices) }) default: g.Go(func() error { return TakeColumn(ctx, col, i, resArr, indices) }) } @@ -140,22 +147,91 @@ func TakeColumn(ctx context.Context, a arrow.Array, idx int, arr []arrow.Array, } func TakeDictColumn(ctx context.Context, a *array.Dictionary, idx int, arr []arrow.Array, indices *array.Int32) error { - r := array.NewDictionaryBuilderWithDict( - compute.GetAllocator(ctx), a.DataType().(*arrow.DictionaryType), a.Dictionary(), - ).(*array.BinaryDictionaryBuilder) - defer r.Release() + switch a.Dictionary().(type) { + case *array.String, *array.Binary: + r := array.NewDictionaryBuilderWithDict( + compute.GetAllocator(ctx), a.DataType().(*arrow.DictionaryType), a.Dictionary(), + ).(*array.BinaryDictionaryBuilder) + defer r.Release() + + r.Reserve(indices.Len()) + idxBuilder := r.IndexBuilder() + for _, i := range indices.Int32Values() { + if a.IsNull(int(i)) { + r.AppendNull() + continue + } + idxBuilder.Append(a.GetValueIndex(int(i))) + } - r.Reserve(indices.Len()) - idxBuilder := r.IndexBuilder() - for _, i := range indices.Int32Values() { - if a.IsNull(int(i)) { - r.AppendNull() + arr[idx] = r.NewArray() + return nil + case *array.FixedSizeBinary: + r := array.NewDictionaryBuilderWithDict( + compute.GetAllocator(ctx), a.DataType().(*arrow.DictionaryType), a.Dictionary(), + ).(*array.FixedSizeBinaryDictionaryBuilder) + defer r.Release() + + r.Reserve(indices.Len()) + idxBuilder := r.IndexBuilder() + for _, i := range indices.Int32Values() { + if a.IsNull(int(i)) { + r.AppendNull() + continue + } + // TODO: Improve this by not copying actual values. + idxBuilder.Append(a.GetValueIndex(int(i))) + } + + arr[idx] = r.NewArray() + return nil + } + + return nil +} + +func TakeRunEndEncodedColumn(ctx context.Context, a *array.RunEndEncoded, idx int, arr []arrow.Array, indices *array.Int32) error { + expandedIndexBuilder := array.NewInt32Builder(compute.GetAllocator(ctx)) + defer expandedIndexBuilder.Release() + + dict := a.Values().(*array.Dictionary) + for i := 0; i < a.Len(); i++ { + if dict.IsNull(a.GetPhysicalIndex(i)) { + expandedIndexBuilder.AppendNull() + } else { + expandedIndexBuilder.Append(int32(dict.GetValueIndex(a.GetPhysicalIndex(i)))) + } + } + expandedIndex := expandedIndexBuilder.NewInt32Array() + defer expandedIndex.Release() + + expandedReorderedArr := make([]arrow.Array, 1) + if err := TakeColumn(ctx, expandedIndex, 0, expandedReorderedArr, indices); err != nil { + return err + } + expandedReordered := expandedReorderedArr[0].(*array.Int32) + defer expandedReordered.Release() + + b := array.NewRunEndEncodedBuilder( + compute.GetAllocator(ctx), a.RunEndsArr().DataType(), a.Values().DataType(), + ) + defer b.Release() + b.Reserve(indices.Len()) + + dictValues := dict.Dictionary().(*array.String) + for i := 0; i < expandedReordered.Len(); i++ { + if expandedReordered.IsNull(i) { + b.AppendNull() continue } - idxBuilder.Append(a.GetValueIndex(int(i))) + reorderedIndex := expandedReordered.Value(i) + v := dictValues.Value(int(reorderedIndex)) + if err := b.AppendValueFromString(v); err != nil { + return err + } } - arr[idx] = r.NewArray() + arr[idx] = b.NewRunEndEncodedArray() return nil } @@ -165,6 +241,7 @@ func TakeListColumn(ctx context.Context, a *array.List, idx int, arr []arrow.Arr if !ok { return fmt.Errorf("unexpected value builder type %T for list column", r.ValueBuilder()) } + defer valueBuilder.Release() listValues := a.ListValues().(*array.Dictionary) switch dictV := listValues.Dictionary().(type) { @@ -200,6 +277,54 @@ func TakeListColumn(ctx context.Context, a *array.List, idx int, arr []arrow.Arr return nil } +func TakeStructColumn(ctx context.Context, a *array.Struct, idx int, arr []arrow.Array, indices *array.Int32) error { + aType := a.Data().DataType().(*arrow.StructType) + + // Immediately, return this struct if it has no fields/columns + if a.NumField() == 0 { + // If the original record is released and this is released once more, + // as usually done, we want to retain it once more. + a.Retain() + arr[idx] = a + return nil + } + + cols := make([]arrow.Array, a.NumField()) + names := make([]string, a.NumField()) + defer func() { + for _, col := range cols { + if col != nil { + col.Release() + } + } + }() + + for i := 0; i < a.NumField(); i++ { + names[i] = aType.Field(i).Name + + switch f := a.Field(i).(type) { + case *array.RunEndEncoded: + err := TakeRunEndEncodedColumn(ctx, f, i, cols, indices) + if err != nil { + return err + } + default: + err := TakeColumn(ctx, f, i, cols, indices) + if err != nil { + return err + } + } + } + + takeStruct, err := array.NewStructArray(cols, names) + if err != nil { + return err + } + + arr[idx] = takeStruct + return nil +} + type multiColSorter struct { indices *builder.OptInt32Builder comparisons []comparator @@ -263,13 +388,21 @@ func newMultiColSorter( }, bytes.Compare, ) + case *array.FixedSizeBinary: + ms.comparisons[i] = newOrderedSorter[[]byte]( + &fixedSizeBinaryDictionary{ + dict: e, + elem: elem, + }, + bytes.Compare, + ) default: ms.Release() - return nil, fmt.Errorf("unsupported dictionary column type for sorting %T", e) + return nil, fmt.Errorf("unsupported dictionary column type for sorting %T for column %s", e, r.Schema().Field(col.Index).Name) } default: ms.Release() - return nil, fmt.Errorf("unsupported column type for sorting %T", e) + return nil, fmt.Errorf("unsupported column type for sorting %T for column %s", e, r.Schema().Field(col.Index).Name) } } return ms, nil @@ -417,3 +550,16 @@ func (s *binaryDictionary) IsNull(i int) bool { func (s *binaryDictionary) Value(i int) []byte { return s.elem.Value(s.dict.GetValueIndex(i)) } + +type fixedSizeBinaryDictionary struct { + dict *array.Dictionary + elem *array.FixedSizeBinary +} + +func (s *fixedSizeBinaryDictionary) IsNull(i int) bool { + return s.dict.IsNull(i) +} + +func (s *fixedSizeBinaryDictionary) Value(i int) []byte { + return s.elem.Value(s.dict.GetValueIndex(i)) +} diff --git a/pqarrow/arrowutils/sort_test.go b/pqarrow/arrowutils/sort_test.go index 05a4eb14c..c061e9b40 100644 --- a/pqarrow/arrowutils/sort_test.go +++ b/pqarrow/arrowutils/sort_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sort" + "strings" "testing" "github.com/apache/arrow/go/v16/arrow" @@ -116,7 +117,7 @@ func TestSortRecord(t *testing.T) { {Timestamp: 2}, {Timestamp: 1}, }, - Columns: []SortingColumn{{Index: 5}}, + Columns: []SortingColumn{{Index: 6}}, Indices: []int32{2, 1, 0}, }, { @@ -126,7 +127,7 @@ func TestSortRecord(t *testing.T) { {Timestamp: 2}, {Timestamp: 3}, }, - Columns: []SortingColumn{{Index: 5, Direction: Descending}}, + Columns: []SortingColumn{{Index: 6, Direction: Descending}}, Indices: []int32{2, 1, 0}, }, { @@ -149,6 +150,26 @@ func TestSortRecord(t *testing.T) { Columns: []SortingColumn{{Index: 3, Direction: Descending}}, Indices: []int32{2, 1, 0}, }, + { + Name: "By DictFixed column ascending", + Samples: Samples{ + {DictFixed: [2]byte{0, 3}}, + {DictFixed: [2]byte{0, 2}}, + {DictFixed: [2]byte{0, 1}}, + }, + Columns: []SortingColumn{{Index: 4}}, + Indices: []int32{2, 1, 0}, + }, + { + Name: "By DictFixed column descending", + Samples: Samples{ + {DictFixed: [2]byte{0, 1}}, + {DictFixed: [2]byte{0, 2}}, + {DictFixed: [2]byte{0, 3}}, + }, + Columns: []SortingColumn{{Index: 4, Direction: Descending}}, + Indices: []int32{2, 1, 0}, + }, { Name: "By Null column ascending", Samples: Samples{ @@ -156,7 +177,7 @@ func TestSortRecord(t *testing.T) { {}, {Nullable: null(1)}, }, - Columns: []SortingColumn{{Index: 4}}, + Columns: []SortingColumn{{Index: 5}}, Indices: []int32{2, 0, 1}, }, { @@ -166,7 +187,7 @@ func TestSortRecord(t *testing.T) { {}, {Nullable: null(1)}, }, - Columns: []SortingColumn{{Index: 4, NullsFirst: true}}, + Columns: []SortingColumn{{Index: 5, NullsFirst: true}}, Indices: []int32{0, 1, 2}, }, { @@ -176,7 +197,7 @@ func TestSortRecord(t *testing.T) { {}, {Nullable: null(1)}, }, - Columns: []SortingColumn{{Index: 4, Direction: Descending}}, + Columns: []SortingColumn{{Index: 5, Direction: Descending}}, Indices: []int32{2, 0, 1}, }, { @@ -186,7 +207,7 @@ func TestSortRecord(t *testing.T) { {}, {Nullable: null(1)}, }, - Columns: []SortingColumn{{Index: 4, Direction: Descending, NullsFirst: true}}, + Columns: []SortingColumn{{Index: 5, Direction: Descending, NullsFirst: true}}, Indices: []int32{0, 1, 2}, }, { @@ -261,8 +282,26 @@ func TestSortRecordBuilderReuse(t *testing.T) { } func TestReorderRecord(t *testing.T) { + readRunEndEncodedDictionary := func(arr *array.RunEndEncoded) string { + arrDict := arr.Values().(*array.Dictionary) + arrDictValues := arrDict.Dictionary().(*array.String) + + values := make([]string, arr.Len()) + for i := 0; i < arr.Len(); i++ { + physicalIndex := arr.GetPhysicalIndex(i) + if arrDict.IsNull(physicalIndex) { + values[i] = array.NullValueStr + continue + } + valueIndex := arrDict.GetValueIndex(physicalIndex) + values[i] = arrDictValues.Value(valueIndex) + } + return "[" + strings.Join(values, " ") + "]" + } + t.Run("Simple", func(t *testing.T) { - mem := memory.NewGoAllocator() + mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer mem.AssertSize(t, 0) b := array.NewRecordBuilder(mem, arrow.NewSchema( []arrow.Field{ { @@ -279,16 +318,17 @@ func TestReorderRecord(t *testing.T) { indices := array.NewInt32Builder(mem) indices.AppendValues([]int32{2, 1, 0}, nil) by := indices.NewInt32Array() - result, err := Take( - compute.WithAllocator(context.Background(), mem), r, by) + defer by.Release() + result, err := Take(compute.WithAllocator(context.Background(), mem), r, by) require.Nil(t, err) defer result.Release() want := []int64{1, 2, 3} require.Equal(t, want, result.Column(0).(*array.Int64).Int64Values()) }) - t.Run("WithDict", func(t *testing.T) { - mem := memory.NewGoAllocator() + t.Run("WithStringDict", func(t *testing.T) { + mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer mem.AssertSize(t, 0) b := array.NewRecordBuilder(mem, arrow.NewSchema( []arrow.Field{ { @@ -312,7 +352,9 @@ func TestReorderRecord(t *testing.T) { indices := array.NewInt32Builder(mem) indices.AppendValues([]int32{2, 1, 4, 0, 3}, nil) - result, err := Take(compute.WithAllocator(context.Background(), mem), r, indices.NewInt32Array()) + by := indices.NewInt32Array() + defer by.Release() + result, err := Take(compute.WithAllocator(context.Background(), mem), r, by) require.NoError(t, err) defer result.Release() @@ -327,8 +369,99 @@ func TestReorderRecord(t *testing.T) { require.Equal(t, want[i], got.ValueStr(i)) } }) + t.Run("RunEndEncoded", func(t *testing.T) { + mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer mem.AssertSize(t, 0) + + b := array.NewRecordBuilder(mem, arrow.NewSchema( + []arrow.Field{ + { + Name: "ree", + Type: arrow.RunEndEncodedOf( + arrow.PrimitiveTypes.Int32, + &arrow.DictionaryType{ + IndexType: arrow.PrimitiveTypes.Uint32, + ValueType: arrow.BinaryTypes.String, + }), + }, + }, nil, + )) + defer b.Release() + + ree := b.Field(0).(*array.RunEndEncodedBuilder) + require.NoError(t, ree.AppendValueFromString("3")) + require.NoError(t, ree.AppendValueFromString("2")) + require.NoError(t, ree.AppendValueFromString("1")) + ree.AppendNull() + require.NoError(t, ree.AppendValueFromString("3")) + r := b.NewRecord() + defer r.Release() + + indices := array.NewInt32Builder(mem) + indices.AppendValues([]int32{2, 1, 4, 0, 3}, nil) + by := indices.NewInt32Array() + defer by.Release() + + // Reordering + + result, err := Take(compute.WithAllocator(context.Background(), mem), r, by) + require.NoError(t, err) + defer result.Release() + + // Testing + + sorted := result.Column(0).(*array.RunEndEncoded) + sortedEnds := sorted.RunEndsArr().(*array.Int32) + // notice how the index to 3 is runEndEncoded + require.Equal(t, "[1 2 4 5]", sortedEnds.String()) + require.Equal(t, "[1 2 3 3 (null)]", readRunEndEncodedDictionary(sorted)) + }) + t.Run("WithFixedSizeBinaryDict", func(t *testing.T) { + mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer mem.AssertSize(t, 0) + b := array.NewRecordBuilder(mem, arrow.NewSchema( + []arrow.Field{ + { + Name: "dict", + Type: &arrow.DictionaryType{ + IndexType: arrow.PrimitiveTypes.Int32, + ValueType: &arrow.FixedSizeBinaryType{ByteWidth: 2}, + }, + }, + }, nil, + )) + defer b.Release() + d := b.Field(0).(*array.FixedSizeBinaryDictionaryBuilder) + require.NoError(t, d.Append([]byte{0, 3})) + require.NoError(t, d.Append([]byte{0, 2})) + require.NoError(t, d.Append([]byte{0, 1})) + d.AppendNull() + require.NoError(t, d.Append([]byte{0, 3})) + r := b.NewRecord() + defer r.Release() + + indices := array.NewInt32Builder(mem) + indices.AppendValues([]int32{2, 1, 4, 0, 3}, nil) + by := indices.NewInt32Array() + defer by.Release() + result, err := Take(compute.WithAllocator(context.Background(), mem), r, by) + require.NoError(t, err) + defer result.Release() + + want := [][]byte{{0, 1}, {0, 2}, {0, 3}, {0, 3}, {}} + got := result.Column(0).(*array.Dictionary) + require.Equal(t, len(want), got.Len()) + for i, v := range want { + if len(v) == 0 { + require.True(t, got.IsNull(i)) + continue + } + require.Equal(t, want[i], got.Dictionary().(*array.FixedSizeBinary).Value(got.GetValueIndex(i))) + } + }) t.Run("List", func(t *testing.T) { - mem := memory.NewGoAllocator() + mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer mem.AssertSize(t, 0) b := array.NewRecordBuilder(mem, arrow.NewSchema( []arrow.Field{ { @@ -360,8 +493,10 @@ func TestReorderRecord(t *testing.T) { indices := array.NewInt32Builder(mem) indices.AppendValues([]int32{2, 1, 0, 3}, nil) + by := indices.NewInt32Array() + defer by.Release() result, err := Take( - compute.WithAllocator(context.Background(), mem), r, indices.NewInt32Array()) + compute.WithAllocator(context.Background(), mem), r, by) require.Nil(t, err) defer result.Release() @@ -381,6 +516,104 @@ func TestReorderRecord(t *testing.T) { require.Equal(t, expected[i], got.ValueStr(i), "unexpected value at %d", i) } }) + t.Run("Struct", func(t *testing.T) { + LabelArrowType := arrow.RunEndEncodedOf( + arrow.PrimitiveTypes.Int32, + &arrow.DictionaryType{ + IndexType: arrow.PrimitiveTypes.Uint32, + ValueType: arrow.BinaryTypes.String, + }, + ) + + mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer mem.AssertSize(t, 0) + + b := array.NewRecordBuilder(mem, arrow.NewSchema( + []arrow.Field{ + { + Name: "struct", + Type: arrow.StructOf( + arrow.Field{Name: "first", Type: LabelArrowType, Nullable: true}, + arrow.Field{Name: "second", Type: LabelArrowType, Nullable: true}, + arrow.Field{Name: "third", Type: arrow.PrimitiveTypes.Int64, Nullable: true}, + ), + }, + }, &arrow.Metadata{}, + )) + defer b.Release() + + sb := b.Field(0).(*array.StructBuilder) + firstFieldBuilder := sb.FieldBuilder(0).(*array.RunEndEncodedBuilder) + secondFieldBuilder := sb.FieldBuilder(1).(*array.RunEndEncodedBuilder) + thirdFieldBuilder := sb.FieldBuilder(2).(*array.Int64Builder) + + sb.Append(true) + require.NoError(t, firstFieldBuilder.AppendValueFromString("3")) + require.NoError(t, secondFieldBuilder.AppendValueFromString("1")) + thirdFieldBuilder.Append(1) + sb.Append(true) + require.NoError(t, firstFieldBuilder.AppendValueFromString("2")) + require.NoError(t, secondFieldBuilder.AppendValueFromString("2")) + thirdFieldBuilder.Append(2) + sb.Append(true) + require.NoError(t, firstFieldBuilder.AppendValueFromString("1")) + require.NoError(t, secondFieldBuilder.AppendValueFromString("3")) + thirdFieldBuilder.Append(3) + sb.Append(true) + firstFieldBuilder.AppendNull() + require.NoError(t, secondFieldBuilder.AppendValueFromString("4")) + thirdFieldBuilder.Append(4) + sb.Append(true) + require.NoError(t, firstFieldBuilder.AppendValueFromString("3")) + require.NoError(t, secondFieldBuilder.AppendValueFromString("5")) + thirdFieldBuilder.Append(5) + + r := b.NewRecord() + defer r.Release() + + indices := array.NewInt32Builder(mem) + indices.AppendValues([]int32{2, 1, 4, 0, 3}, nil) + by := indices.NewInt32Array() + defer by.Release() + result, err := Take(compute.WithAllocator(context.Background(), mem), r, by) + require.Nil(t, err) + defer result.Release() + resultStruct := result.Column(0).(*array.Struct) + + require.Equal(t, "[1 2 3 3 (null)]", readRunEndEncodedDictionary(resultStruct.Field(0).(*array.RunEndEncoded))) + require.Equal(t, "[3 2 5 1 4]", readRunEndEncodedDictionary(resultStruct.Field(1).(*array.RunEndEncoded))) + require.Equal(t, "[3 2 5 1 4]", resultStruct.Field(2).(*array.Int64).String()) + }) + + t.Run("StructEmpty", func(t *testing.T) { + mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer mem.AssertSize(t, 0) + + b := array.NewRecordBuilder(mem, arrow.NewSchema( + []arrow.Field{ + { + Name: "struct", + Type: arrow.StructOf(), + }, + }, &arrow.Metadata{}, + )) + defer b.Release() + b.Field(0).AppendNulls(5) + + r := b.NewRecord() + defer r.Release() + + indices := array.NewInt32Builder(mem) + indices.AppendValues([]int32{2, 1, 4, 0, 3}, nil) + by := indices.NewInt32Array() + defer by.Release() + + result, err := Take(compute.WithAllocator(context.Background(), mem), r, by) + require.Nil(t, err) + defer result.Release() + resultStruct := result.Column(0).(*array.Struct) + resultStruct.Len() + }) } // Use all supported sort field. @@ -389,6 +622,7 @@ type Sample struct { Double float64 String string Dict string + DictFixed [2]byte Nullable *int64 Timestamp arrow.Timestamp } @@ -417,6 +651,13 @@ func (s Samples) Record() arrow.Record { ValueType: arrow.BinaryTypes.String, }, }, + { + Name: "dictFixed", + Type: &arrow.DictionaryType{ + IndexType: arrow.PrimitiveTypes.Int32, + ValueType: &arrow.FixedSizeBinaryType{ByteWidth: 2}, + }, + }, { Name: "nullable", Type: arrow.PrimitiveTypes.Int64, @@ -433,9 +674,10 @@ func (s Samples) Record() arrow.Record { fInt := b.Field(0).(*array.Int64Builder) fDouble := b.Field(1).(*array.Float64Builder) fString := b.Field(2).(*array.StringBuilder) - fDict := b.Field(3).(*array.BinaryDictionaryBuilder) - fNullable := b.Field(4).(*array.Int64Builder) - fTimestamp := b.Field(5).(*array.TimestampBuilder) + fBinaryDict := b.Field(3).(*array.BinaryDictionaryBuilder) + fFixedDict := b.Field(4).(*array.FixedSizeBinaryDictionaryBuilder) + fNullable := b.Field(5).(*array.Int64Builder) + fTimestamp := b.Field(6).(*array.TimestampBuilder) for _, v := range s { fInt.Append(v.Int) @@ -446,7 +688,8 @@ func (s Samples) Record() arrow.Record { } else { fTimestamp.Append(v.Timestamp) } - _ = fDict.AppendString(v.Dict) + _ = fBinaryDict.AppendString(v.Dict) + _ = fFixedDict.Append(v.DictFixed[:]) if v.Nullable != nil { fNullable.Append(*v.Nullable) } else { diff --git a/query/physicalplan/limit.go b/query/physicalplan/limit.go index 96cbdc19a..3489ccf5e 100644 --- a/query/physicalplan/limit.go +++ b/query/physicalplan/limit.go @@ -6,11 +6,9 @@ import ( "github.com/apache/arrow/go/v16/arrow" "github.com/apache/arrow/go/v16/arrow/array" - "github.com/apache/arrow/go/v16/arrow/compute" "github.com/apache/arrow/go/v16/arrow/memory" "github.com/apache/arrow/go/v16/arrow/scalar" "go.opentelemetry.io/otel/trace" - "golang.org/x/sync/errgroup" "github.com/polarsignals/frostdb/pqarrow/arrowutils" "github.com/polarsignals/frostdb/query/logicalplan" @@ -87,60 +85,12 @@ func (l *Limiter) Callback(ctx context.Context, r arrow.Record) error { indices := indicesBuilder.NewInt32Array() defer indices.Release() - // compute.Take doesn't support dictionaries. Use take on r when r does not have - // dictionary column. - var hasDictionary bool - for i := 0; i < int(r.NumCols()); i++ { - if r.Column(i).DataType().ID() == arrow.DICTIONARY { - hasDictionary = true - break - } - } - if !hasDictionary { - res, err := compute.Take( - ctx, - compute.TakeOptions{BoundsCheck: true}, - compute.NewDatumWithoutOwning(r), - compute.NewDatumWithoutOwning(indices), - ) - if err != nil { - return err - } - r.Release() - return l.next.Callback(ctx, res.(*compute.RecordDatum).Value) - } - resArr := make([]arrow.Array, r.NumCols()) - - defer func() { - for _, a := range resArr { - if a != nil { - a.Release() - } - } - }() - var g errgroup.Group - for i := 0; i < int(r.NumCols()); i++ { - i := i - col := r.Column(i) - if d, ok := col.(*array.Dictionary); ok { - g.Go(func() error { - return arrowutils.TakeDictColumn(ctx, d, i, resArr, indices) - }) - } else { - g.Go(func() error { - return arrowutils.TakeColumn(ctx, col, i, resArr, indices) - }) - } - } - err := g.Wait() + limitedRecord, err := arrowutils.Take(ctx, r, indices) if err != nil { return err } - if err := l.next.Callback( - ctx, - array.NewRecord(r.Schema(), resArr, int64(indices.Len())), - ); err != nil { + if err := l.next.Callback(ctx, limitedRecord); err != nil { return err } diff --git a/query/physicalplan/sampler.go b/query/physicalplan/sampler.go index 5f29f4846..f0211ae03 100644 --- a/query/physicalplan/sampler.go +++ b/query/physicalplan/sampler.go @@ -174,6 +174,10 @@ func (s *ReservoirSampler) sliceReservoir() { // sample implements the reservoir sampling algorithm found https://en.wikipedia.org/wiki/Reservoir_sampling. func (s *ReservoirSampler) sample(r arrow.Record, ref *referencedRecord) { + // The size can be 0 and in that case we don't want to sample. + if s.size == 0 { + return + } n := s.n + r.NumRows() if s.i == 0 { s.i = float64(s.n) - 1 diff --git a/query/physicalplan/sampler_test.go b/query/physicalplan/sampler_test.go index 660d64aad..b3fadd359 100644 --- a/query/physicalplan/sampler_test.go +++ b/query/physicalplan/sampler_test.go @@ -291,7 +291,7 @@ func Test_Sampler_MaxSizeAllocation(t *testing.T) { allocator: memory.NewCheckedAllocator(memory.NewGoAllocator()), } t.Cleanup(func() { - require.Equal(t, allocator.maxUsed, 1024) // Expect the most we allocated was 1024 bytes during materialization + require.LessOrEqual(t, allocator.maxUsed, 1024) // Expect the most we allocated was 1024 bytes during materialization allocator.allocator.AssertSize(t, 0) }) s := NewReservoirSampler(10, 200, allocator)