From 672bc29f4b8ce48cc2585f1f82537ee2c597ea76 Mon Sep 17 00:00:00 2001 From: Matthias Loibl Date: Thu, 14 Dec 2023 12:05:43 +0100 Subject: [PATCH 01/11] pqarrow/arrowutils: Add SortRecord and ReorderRecord This is extracted from a previous PR #461. --- pqarrow/arrowutils/sort.go | 105 ++++++++++++++++++++++++++++++++ pqarrow/arrowutils/sort_test.go | 77 +++++++++++++++++++++++ 2 files changed, 182 insertions(+) create mode 100644 pqarrow/arrowutils/sort.go create mode 100644 pqarrow/arrowutils/sort_test.go diff --git a/pqarrow/arrowutils/sort.go b/pqarrow/arrowutils/sort.go new file mode 100644 index 000000000..8c1a14552 --- /dev/null +++ b/pqarrow/arrowutils/sort.go @@ -0,0 +1,105 @@ +package arrowutils + +import ( + "bytes" + "fmt" + "sort" + + "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v14/arrow/array" + "github.com/apache/arrow/go/v14/arrow/memory" + "golang.org/x/exp/constraints" + + "github.com/polarsignals/frostdb/pqarrow/builder" +) + +// SortRecord sorts the given record's rows by the given column. Currently only supports int64, string and binary columns. +func SortRecord(r arrow.Record, col int) ([]int, error) { + if r.NumRows() == 0 { + return nil, nil + } + if r.NumRows() == 1 { + return []int{0}, nil + } + + indices := make([]int, r.NumRows()) + // populate indices + for i := range indices { + indices[i] = i + } + + switch c := r.Column(col).(type) { + case *array.Int64: + sort.Sort(orderedSorter[int64]{array: c, indices: indices}) + case *array.String: + sort.Sort(orderedSorter[string]{array: c, indices: indices}) + case *array.Binary: + sort.Sort(binarySort{array: c, indices: indices}) + default: + return nil, fmt.Errorf("unsupported column type for sorting %T", c) + } + + return indices, nil +} + +// ReorderRecord reorders the given record's rows by the given indices. 
+func ReorderRecord(mem memory.Allocator, r arrow.Record, indices []int) (arrow.Record, error) { + // if the indices are already sorted, we can return the original record to save memory allocations + if sort.SliceIsSorted(indices, func(i, j int) bool { return indices[i] < indices[j] }) { + return r, nil + } + + recordBuilder := builder.NewRecordBuilder(mem, r.Schema()) + recordBuilder.Reserve(int(r.NumRows())) + for i := 0; i < int(r.NumRows()); i++ { + for colIdx, b := range recordBuilder.Fields() { + // here we read the value from the original record, + // but we the correct index and then write it to the new record + if err := builder.AppendValue(b, r.Column(colIdx), indices[i]); err != nil { + return nil, err + } + } + } + + return recordBuilder.NewRecord(), nil +} + +type orderedArray[T constraints.Ordered] interface { + Value(int) T + Len() int +} + +type orderedSorter[T constraints.Ordered] struct { + array orderedArray[T] + indices []int +} + +func (s orderedSorter[T]) Len() int { + return s.array.Len() +} + +func (s orderedSorter[T]) Less(i, j int) bool { + return s.array.Value(s.indices[i]) < s.array.Value(s.indices[j]) +} + +func (s orderedSorter[T]) Swap(i, j int) { + s.indices[i], s.indices[j] = s.indices[j], s.indices[i] +} + +type binarySort struct { + array *array.Binary + indices []int +} + +func (s binarySort) Len() int { + return s.array.Len() +} + +func (s binarySort) Less(i, j int) bool { + // we need to read the indices from the indices slice, as they might have already been swapped. 
+ return bytes.Compare(s.array.Value(s.indices[i]), s.array.Value(s.indices[j])) == -1 +} + +func (s binarySort) Swap(i, j int) { + s.indices[i], s.indices[j] = s.indices[j], s.indices[i] +} diff --git a/pqarrow/arrowutils/sort_test.go b/pqarrow/arrowutils/sort_test.go new file mode 100644 index 000000000..8ead7897c --- /dev/null +++ b/pqarrow/arrowutils/sort_test.go @@ -0,0 +1,77 @@ +package arrowutils + +import ( + "testing" + + "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v14/arrow/array" + "github.com/apache/arrow/go/v14/arrow/memory" + "github.com/stretchr/testify/require" +) + +func TestSortRecord(t *testing.T) { + schema := arrow.NewSchema( + []arrow.Field{ + {Name: "int", Type: arrow.PrimitiveTypes.Int64}, + {Name: "string", Type: arrow.BinaryTypes.String}, + }, + nil, + ) + + mem := memory.DefaultAllocator + ib := array.NewInt64Builder(mem) + ib.Append(0) + ib.Append(3) + ib.Append(5) + ib.Append(1) + + sb := array.NewStringBuilder(mem) + sb.Append("d") + sb.Append("c") + sb.Append("b") + sb.Append("a") + + record := array.NewRecord(schema, []arrow.Array{ib.NewArray(), sb.NewArray()}, int64(4)) + + // Sort the record by the first column - int64 + { + sortedIndices, err := SortRecord(record, 0) + require.NoError(t, err) + require.Equal(t, []int{0, 3, 1, 2}, sortedIndices) + + sortedByInts, err := ReorderRecord(mem, record, sortedIndices) + require.NoError(t, err) + + // check that the column got sortedIndices + intCol := sortedByInts.Column(0).(*array.Int64) + require.Equal(t, []int64{0, 1, 3, 5}, intCol.Int64Values()) + // make sure the other column got updated too + strings := make([]string, 4) + stringCol := sortedByInts.Column(1).(*array.String) + for i := 0; i < 4; i++ { + strings[i] = stringCol.Value(i) + } + require.Equal(t, []string{"d", "a", "c", "b"}, strings) + } + + // Sort the record by the second column - string + { + sortedIndices, err := SortRecord(record, 1) + require.NoError(t, err) + require.Equal(t, []int{3, 2, 1, 
0}, sortedIndices) + + sortedByStrings, err := ReorderRecord(mem, record, sortedIndices) + require.NoError(t, err) + + // check that the column got sortedByInts + intCol := sortedByStrings.Column(0).(*array.Int64) + require.Equal(t, []int64{1, 5, 3, 0}, intCol.Int64Values()) + // make sure the other column got updated too + strings := make([]string, 4) + stringCol := sortedByStrings.Column(1).(*array.String) + for i := 0; i < 4; i++ { + strings[i] = stringCol.Value(i) + } + require.Equal(t, []string{"a", "b", "c", "d"}, strings) + } +} From e61df89816c535d5e7ebb16e97a9f368917d4183 Mon Sep 17 00:00:00 2001 From: Matthias Loibl Date: Thu, 14 Dec 2023 12:23:15 +0100 Subject: [PATCH 02/11] pqarrow/arrowutils: Update SortRecord to allow for multiple sort columns This isn't implemented yet, just the function signature is future proof. --- pqarrow/arrowutils/sort.go | 7 +++++-- pqarrow/arrowutils/sort_test.go | 4 ++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/pqarrow/arrowutils/sort.go b/pqarrow/arrowutils/sort.go index 8c1a14552..8563a4d6a 100644 --- a/pqarrow/arrowutils/sort.go +++ b/pqarrow/arrowutils/sort.go @@ -14,7 +14,10 @@ import ( ) // SortRecord sorts the given record's rows by the given column. Currently only supports int64, string and binary columns. 
-func SortRecord(r arrow.Record, col int) ([]int, error) { +func SortRecord(r arrow.Record, cols []int) ([]int, error) { + if len(cols) > 1 { + return nil, fmt.Errorf("sorting by multiple columns isn't implemented yet") + } if r.NumRows() == 0 { return nil, nil } @@ -28,7 +31,7 @@ func SortRecord(r arrow.Record, col int) ([]int, error) { indices[i] = i } - switch c := r.Column(col).(type) { + switch c := r.Column(cols[0]).(type) { case *array.Int64: sort.Sort(orderedSorter[int64]{array: c, indices: indices}) case *array.String: diff --git a/pqarrow/arrowutils/sort_test.go b/pqarrow/arrowutils/sort_test.go index 8ead7897c..c7c7ea001 100644 --- a/pqarrow/arrowutils/sort_test.go +++ b/pqarrow/arrowutils/sort_test.go @@ -35,7 +35,7 @@ func TestSortRecord(t *testing.T) { // Sort the record by the first column - int64 { - sortedIndices, err := SortRecord(record, 0) + sortedIndices, err := SortRecord(record, []int{0}) require.NoError(t, err) require.Equal(t, []int{0, 3, 1, 2}, sortedIndices) @@ -56,7 +56,7 @@ func TestSortRecord(t *testing.T) { // Sort the record by the second column - string { - sortedIndices, err := SortRecord(record, 1) + sortedIndices, err := SortRecord(record, []int{1}) require.NoError(t, err) require.Equal(t, []int{3, 2, 1, 0}, sortedIndices) From 382716be338069b805f3918399f76a1504851e20 Mon Sep 17 00:00:00 2001 From: Matthias Loibl Date: Thu, 14 Dec 2023 16:48:55 +0100 Subject: [PATCH 03/11] pqarrow/arrowutils: Use compute.Take for ReorderRecord --- go.mod | 1 + go.sum | 4 +-- pqarrow/arrowutils/sort.go | 57 ++++++++++++++++----------------- pqarrow/arrowutils/sort_test.go | 12 +++---- 4 files changed, 36 insertions(+), 38 deletions(-) diff --git a/go.mod b/go.mod index 1f4c7b016..fdc58a5c0 100644 --- a/go.mod +++ b/go.mod @@ -27,6 +27,7 @@ require ( ) require ( + github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect github.com/andybalholm/brotli v1.0.5 // indirect github.com/benbjohnson/clock v1.3.5 // indirect 
github.com/benbjohnson/immutable v0.4.0 // indirect diff --git a/go.sum b/go.sum index 2f84ddda6..1282c14b5 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,6 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:RGWPOewvKIROun94nF7v2cua9qP+thov/7M50KEoeSU= +github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk= github.com/RoaringBitmap/roaring v0.9.4 h1:ckvZSX5gwCRaJYBNe7syNawCU5oruY9gQmjXlp4riwo= github.com/RoaringBitmap/roaring v0.9.4/go.mod h1:icnadbWcNyfEHlYdr+tDlOTih1Bf/h+rzPpv4sbomAA= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= @@ -79,8 +81,6 @@ github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= -github.com/parquet-go/parquet-go v0.19.1-0.20231129084429-9010539a4f7a h1:NxS5GxNgZa5nJeLjJFidbzhwn+YuhdV5pXHtOw7VKB8= -github.com/parquet-go/parquet-go v0.19.1-0.20231129084429-9010539a4f7a/go.mod h1:4YfUo8TkoGoqwzhA/joZKZ8f77wSMShOLHESY4Ys0bY= github.com/parquet-go/parquet-go v0.20.0 h1:a6tV5XudF893P1FMuyp01zSReXbBelquKQgRxBgJ29w= github.com/parquet-go/parquet-go v0.20.0/go.mod h1:4YfUo8TkoGoqwzhA/joZKZ8f77wSMShOLHESY4Ys0bY= github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= diff --git a/pqarrow/arrowutils/sort.go b/pqarrow/arrowutils/sort.go index 8563a4d6a..c94db0e72 100644 --- a/pqarrow/arrowutils/sort.go +++ b/pqarrow/arrowutils/sort.go @@ -2,33 +2,36 @@ package arrowutils import ( "bytes" + "context" "fmt" "sort" "github.com/apache/arrow/go/v14/arrow" "github.com/apache/arrow/go/v14/arrow/array" + 
"github.com/apache/arrow/go/v14/arrow/compute" "github.com/apache/arrow/go/v14/arrow/memory" "golang.org/x/exp/constraints" - - "github.com/polarsignals/frostdb/pqarrow/builder" ) // SortRecord sorts the given record's rows by the given column. Currently only supports int64, string and binary columns. -func SortRecord(r arrow.Record, cols []int) ([]int, error) { +func SortRecord(mem memory.Allocator, r arrow.Record, cols []int) (*array.Int64, error) { if len(cols) > 1 { return nil, fmt.Errorf("sorting by multiple columns isn't implemented yet") } + indicesBuilder := array.NewInt64Builder(mem) + if r.NumRows() == 0 { - return nil, nil + return indicesBuilder.NewInt64Array(), nil } if r.NumRows() == 1 { - return []int{0}, nil + indicesBuilder.Append(0) + return indicesBuilder.NewInt64Array(), nil } - indices := make([]int, r.NumRows()) + indices := make([]int64, r.NumRows()) // populate indices for i := range indices { - indices[i] = i + indices[i] = int64(i) } switch c := r.Column(cols[0]).(type) { @@ -42,29 +45,23 @@ func SortRecord(r arrow.Record, cols []int) ([]int, error) { return nil, fmt.Errorf("unsupported column type for sorting %T", c) } - return indices, nil + indicesBuilder.AppendValues(indices, nil) + return indicesBuilder.NewInt64Array(), nil } // ReorderRecord reorders the given record's rows by the given indices. -func ReorderRecord(mem memory.Allocator, r arrow.Record, indices []int) (arrow.Record, error) { - // if the indices are already sorted, we can return the original record to save memory allocations - if sort.SliceIsSorted(indices, func(i, j int) bool { return indices[i] < indices[j] }) { - return r, nil +// This is a wrapper around compute.Take which handles the type castings. 
+func ReorderRecord(r arrow.Record, indices arrow.Array) (arrow.Record, error) { + res, err := compute.Take( + context.Background(), + *compute.DefaultTakeOptions(), + compute.NewDatum(r), + compute.NewDatum(indices), + ) + if err != nil { + return nil, err } - - recordBuilder := builder.NewRecordBuilder(mem, r.Schema()) - recordBuilder.Reserve(int(r.NumRows())) - for i := 0; i < int(r.NumRows()); i++ { - for colIdx, b := range recordBuilder.Fields() { - // here we read the value from the original record, - // but we the correct index and then write it to the new record - if err := builder.AppendValue(b, r.Column(colIdx), indices[i]); err != nil { - return nil, err - } - } - } - - return recordBuilder.NewRecord(), nil + return res.(*compute.RecordDatum).Value, nil } type orderedArray[T constraints.Ordered] interface { @@ -74,7 +71,7 @@ type orderedArray[T constraints.Ordered] interface { type orderedSorter[T constraints.Ordered] struct { array orderedArray[T] - indices []int + indices []int64 } func (s orderedSorter[T]) Len() int { @@ -82,7 +79,7 @@ func (s orderedSorter[T]) Len() int { } func (s orderedSorter[T]) Less(i, j int) bool { - return s.array.Value(s.indices[i]) < s.array.Value(s.indices[j]) + return s.array.Value(int(s.indices[i])) < s.array.Value(int(s.indices[j])) } func (s orderedSorter[T]) Swap(i, j int) { @@ -91,7 +88,7 @@ func (s orderedSorter[T]) Swap(i, j int) { type binarySort struct { array *array.Binary - indices []int + indices []int64 } func (s binarySort) Len() int { @@ -100,7 +97,7 @@ func (s binarySort) Len() int { func (s binarySort) Less(i, j int) bool { // we need to read the indices from the indices slice, as they might have already been swapped. 
- return bytes.Compare(s.array.Value(s.indices[i]), s.array.Value(s.indices[j])) == -1 + return bytes.Compare(s.array.Value(int(s.indices[i])), s.array.Value(int(s.indices[j]))) == -1 } func (s binarySort) Swap(i, j int) { diff --git a/pqarrow/arrowutils/sort_test.go b/pqarrow/arrowutils/sort_test.go index c7c7ea001..0516edc0a 100644 --- a/pqarrow/arrowutils/sort_test.go +++ b/pqarrow/arrowutils/sort_test.go @@ -35,11 +35,11 @@ func TestSortRecord(t *testing.T) { // Sort the record by the first column - int64 { - sortedIndices, err := SortRecord(record, []int{0}) + sortedIndices, err := SortRecord(mem, record, []int{0}) require.NoError(t, err) - require.Equal(t, []int{0, 3, 1, 2}, sortedIndices) + require.Equal(t, []int64{0, 3, 1, 2}, sortedIndices.Int64Values()) - sortedByInts, err := ReorderRecord(mem, record, sortedIndices) + sortedByInts, err := ReorderRecord(record, sortedIndices) require.NoError(t, err) // check that the column got sortedIndices @@ -56,11 +56,11 @@ func TestSortRecord(t *testing.T) { // Sort the record by the second column - string { - sortedIndices, err := SortRecord(record, []int{1}) + sortedIndices, err := SortRecord(mem, record, []int{1}) require.NoError(t, err) - require.Equal(t, []int{3, 2, 1, 0}, sortedIndices) + require.Equal(t, []int64{3, 2, 1, 0}, sortedIndices.Int64Values()) - sortedByStrings, err := ReorderRecord(mem, record, sortedIndices) + sortedByStrings, err := ReorderRecord(record, sortedIndices) require.NoError(t, err) // check that the column got sortedByInts From 1c6f0f7dd60d74fb3b0bdb5c9aa61785c6da7bd9 Mon Sep 17 00:00:00 2001 From: Matthias Loibl Date: Thu, 14 Dec 2023 18:56:19 +0100 Subject: [PATCH 04/11] pqarrow/arrowutils: Add support for sorting NULL NULL always gets sorted to the back. This seems to be the default for other language implementations. It can be made configurable in the future. 
--- pqarrow/arrowutils/sort.go | 19 +++++++++++++++++++ pqarrow/arrowutils/sort_test.go | 30 +++++++++++++++++------------- 2 files changed, 36 insertions(+), 13 deletions(-) diff --git a/pqarrow/arrowutils/sort.go b/pqarrow/arrowutils/sort.go index c94db0e72..cd1f52595 100644 --- a/pqarrow/arrowutils/sort.go +++ b/pqarrow/arrowutils/sort.go @@ -66,6 +66,7 @@ func ReorderRecord(r arrow.Record, indices arrow.Array) (arrow.Record, error) { type orderedArray[T constraints.Ordered] interface { Value(int) T + IsNull(int) bool Len() int } @@ -79,6 +80,15 @@ func (s orderedSorter[T]) Len() int { } func (s orderedSorter[T]) Less(i, j int) bool { + if s.array.IsNull(int(s.indices[i])) && !s.array.IsNull(int(s.indices[j])) { + return false + } + if !s.array.IsNull(int(s.indices[i])) && s.array.IsNull(int(s.indices[j])) { + return true + } + if s.array.IsNull(int(s.indices[i])) && s.array.IsNull(int(s.indices[j])) { + return false + } return s.array.Value(int(s.indices[i])) < s.array.Value(int(s.indices[j])) } @@ -96,6 +106,15 @@ func (s binarySort) Len() int { } func (s binarySort) Less(i, j int) bool { + if s.array.IsNull(int(s.indices[i])) && !s.array.IsNull(int(s.indices[j])) { + return false + } + if !s.array.IsNull(int(s.indices[i])) && s.array.IsNull(int(s.indices[j])) { + return true + } + if s.array.IsNull(int(s.indices[i])) && s.array.IsNull(int(s.indices[j])) { + return false + } // we need to read the indices from the indices slice, as they might have already been swapped. 
return bytes.Compare(s.array.Value(int(s.indices[i])), s.array.Value(int(s.indices[j]))) == -1 } diff --git a/pqarrow/arrowutils/sort_test.go b/pqarrow/arrowutils/sort_test.go index 0516edc0a..05dbb3ceb 100644 --- a/pqarrow/arrowutils/sort_test.go +++ b/pqarrow/arrowutils/sort_test.go @@ -21,6 +21,7 @@ func TestSortRecord(t *testing.T) { mem := memory.DefaultAllocator ib := array.NewInt64Builder(mem) ib.Append(0) + ib.AppendNull() ib.Append(3) ib.Append(5) ib.Append(1) @@ -29,49 +30,52 @@ func TestSortRecord(t *testing.T) { sb.Append("d") sb.Append("c") sb.Append("b") + sb.AppendNull() sb.Append("a") - record := array.NewRecord(schema, []arrow.Array{ib.NewArray(), sb.NewArray()}, int64(4)) + record := array.NewRecord(schema, []arrow.Array{ib.NewArray(), sb.NewArray()}, int64(5)) // Sort the record by the first column - int64 { - sortedIndices, err := SortRecord(mem, record, []int{0}) + sortedIndices, err := SortRecord(mem, record, []int{record.Schema().FieldIndices("int")[0]}) require.NoError(t, err) - require.Equal(t, []int64{0, 3, 1, 2}, sortedIndices.Int64Values()) + require.Equal(t, []int64{0, 4, 2, 3, 1}, sortedIndices.Int64Values()) sortedByInts, err := ReorderRecord(record, sortedIndices) require.NoError(t, err) // check that the column got sortedIndices intCol := sortedByInts.Column(0).(*array.Int64) - require.Equal(t, []int64{0, 1, 3, 5}, intCol.Int64Values()) + require.Equal(t, []int64{0, 1, 3, 5, 0}, intCol.Int64Values()) + require.True(t, intCol.IsNull(intCol.Len()-1)) // last is NULL // make sure the other column got updated too - strings := make([]string, 4) + strings := make([]string, sortedByInts.NumRows()) stringCol := sortedByInts.Column(1).(*array.String) - for i := 0; i < 4; i++ { + for i := 0; i < int(sortedByInts.NumRows()); i++ { strings[i] = stringCol.Value(i) } - require.Equal(t, []string{"d", "a", "c", "b"}, strings) + require.Equal(t, []string{"d", "a", "b", "", "c"}, strings) } // Sort the record by the second column - string { - 
sortedIndices, err := SortRecord(mem, record, []int{1}) + sortedIndices, err := SortRecord(mem, record, []int{record.Schema().FieldIndices("string")[0]}) require.NoError(t, err) - require.Equal(t, []int64{3, 2, 1, 0}, sortedIndices.Int64Values()) + require.Equal(t, []int64{4, 2, 1, 0, 3}, sortedIndices.Int64Values()) sortedByStrings, err := ReorderRecord(record, sortedIndices) require.NoError(t, err) // check that the column got sortedByInts intCol := sortedByStrings.Column(0).(*array.Int64) - require.Equal(t, []int64{1, 5, 3, 0}, intCol.Int64Values()) + require.Equal(t, []int64{1, 3, 0, 0, 5}, intCol.Int64Values()) // make sure the other column got updated too - strings := make([]string, 4) + strings := make([]string, sortedByStrings.NumRows()) stringCol := sortedByStrings.Column(1).(*array.String) - for i := 0; i < 4; i++ { + for i := 0; i < int(sortedByStrings.NumRows()); i++ { strings[i] = stringCol.Value(i) } - require.Equal(t, []string{"a", "b", "c", "d"}, strings) + require.Equal(t, []string{"a", "b", "c", "d", ""}, strings) + require.True(t, stringCol.IsNull(stringCol.Len()-1)) // last is NULL } } From 50fbae23692a5f35c65f6fc05ec53dc793174e3e Mon Sep 17 00:00:00 2001 From: Matthias Loibl Date: Fri, 15 Dec 2023 13:03:22 +0100 Subject: [PATCH 05/11] Update pqarrow/arrowutils/sort.go Co-authored-by: Geofrey Ernest --- pqarrow/arrowutils/sort.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/pqarrow/arrowutils/sort.go b/pqarrow/arrowutils/sort.go index cd1f52595..032ef53c0 100644 --- a/pqarrow/arrowutils/sort.go +++ b/pqarrow/arrowutils/sort.go @@ -80,15 +80,12 @@ func (s orderedSorter[T]) Len() int { } func (s orderedSorter[T]) Less(i, j int) bool { - if s.array.IsNull(int(s.indices[i])) && !s.array.IsNull(int(s.indices[j])) { + if s.array.IsNull(int(s.indices[i])) { return false } - if !s.array.IsNull(int(s.indices[i])) && s.array.IsNull(int(s.indices[j])) { + if s.array.IsNull(int(s.indices[j])) { return true } - if 
s.array.IsNull(int(s.indices[i])) && s.array.IsNull(int(s.indices[j])) { - return false - } return s.array.Value(int(s.indices[i])) < s.array.Value(int(s.indices[j])) } From 016693d3f11761500fc83cb0eefce9dae4041e1c Mon Sep 17 00:00:00 2001 From: Matthias Loibl Date: Fri, 15 Dec 2023 13:03:30 +0100 Subject: [PATCH 06/11] Update pqarrow/arrowutils/sort.go Co-authored-by: Geofrey Ernest --- pqarrow/arrowutils/sort.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/pqarrow/arrowutils/sort.go b/pqarrow/arrowutils/sort.go index 032ef53c0..8636f03f4 100644 --- a/pqarrow/arrowutils/sort.go +++ b/pqarrow/arrowutils/sort.go @@ -103,15 +103,12 @@ func (s binarySort) Len() int { } func (s binarySort) Less(i, j int) bool { - if s.array.IsNull(int(s.indices[i])) && !s.array.IsNull(int(s.indices[j])) { + if s.array.IsNull(int(s.indices[i])) { return false } - if !s.array.IsNull(int(s.indices[i])) && s.array.IsNull(int(s.indices[j])) { + if s.array.IsNull(int(s.indices[j])) { return true } - if s.array.IsNull(int(s.indices[i])) && s.array.IsNull(int(s.indices[j])) { - return false - } // we need to read the indices from the indices slice, as they might have already been swapped. 
return bytes.Compare(s.array.Value(int(s.indices[i])), s.array.Value(int(s.indices[j]))) == -1 } From 4388a7171737af374b08207f70a296f573be0571 Mon Sep 17 00:00:00 2001 From: Matthias Loibl Date: Fri, 15 Dec 2023 13:03:43 +0100 Subject: [PATCH 07/11] Update pqarrow/arrowutils/sort.go Co-authored-by: Geofrey Ernest --- pqarrow/arrowutils/sort.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pqarrow/arrowutils/sort.go b/pqarrow/arrowutils/sort.go index 8636f03f4..f2b7bf0b4 100644 --- a/pqarrow/arrowutils/sort.go +++ b/pqarrow/arrowutils/sort.go @@ -64,7 +64,7 @@ func ReorderRecord(r arrow.Record, indices arrow.Array) (arrow.Record, error) { return res.(*compute.RecordDatum).Value, nil } -type orderedArray[T constraints.Ordered] interface { +type orderedArray[T int64 | float64 | string] interface { Value(int) T IsNull(int) bool Len() int From 9330d14240b0801924dab9efde1bf48213202827 Mon Sep 17 00:00:00 2001 From: Matthias Loibl Date: Fri, 15 Dec 2023 13:03:49 +0100 Subject: [PATCH 08/11] Update pqarrow/arrowutils/sort.go Co-authored-by: Geofrey Ernest --- pqarrow/arrowutils/sort.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pqarrow/arrowutils/sort.go b/pqarrow/arrowutils/sort.go index f2b7bf0b4..6b7b4ce93 100644 --- a/pqarrow/arrowutils/sort.go +++ b/pqarrow/arrowutils/sort.go @@ -70,7 +70,7 @@ type orderedArray[T int64 | float64 | string] interface { Len() int } -type orderedSorter[T constraints.Ordered] struct { +type orderedSorter[T int64 | float64 | string] struct { array orderedArray[T] indices []int64 } From a8d72be24f0cfdb86aba9e1f2781311b8806dfa1 Mon Sep 17 00:00:00 2001 From: Matthias Loibl Date: Fri, 15 Dec 2023 13:04:56 +0100 Subject: [PATCH 09/11] Update pqarrow/arrowutils/sort.go Co-authored-by: Geofrey Ernest --- pqarrow/arrowutils/sort.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pqarrow/arrowutils/sort.go b/pqarrow/arrowutils/sort.go index 6b7b4ce93..9286191e0 100644 --- a/pqarrow/arrowutils/sort.go +++ 
b/pqarrow/arrowutils/sort.go @@ -10,7 +10,6 @@ import ( "github.com/apache/arrow/go/v14/arrow/array" "github.com/apache/arrow/go/v14/arrow/compute" "github.com/apache/arrow/go/v14/arrow/memory" - "golang.org/x/exp/constraints" ) // SortRecord sorts the given record's rows by the given column. Currently only supports int64, string and binary columns. From e715049e8a5cd6ab383486d0ac3235ea9982bf55 Mon Sep 17 00:00:00 2001 From: Matthias Loibl Date: Fri, 15 Dec 2023 13:12:37 +0100 Subject: [PATCH 10/11] pqarrow/arrowutils: Remove sorting *array.Binary This isn't properly unit tested and was more of an experiment. --- pqarrow/arrowutils/sort.go | 27 --------------------------- 1 file changed, 27 deletions(-) diff --git a/pqarrow/arrowutils/sort.go b/pqarrow/arrowutils/sort.go index 9286191e0..bee2bd4c2 100644 --- a/pqarrow/arrowutils/sort.go +++ b/pqarrow/arrowutils/sort.go @@ -1,7 +1,6 @@ package arrowutils import ( - "bytes" "context" "fmt" "sort" @@ -38,8 +37,6 @@ func SortRecord(mem memory.Allocator, r arrow.Record, cols []int) (*array.Int64, sort.Sort(orderedSorter[int64]{array: c, indices: indices}) case *array.String: sort.Sort(orderedSorter[string]{array: c, indices: indices}) - case *array.Binary: - sort.Sort(binarySort{array: c, indices: indices}) default: return nil, fmt.Errorf("unsupported column type for sorting %T", c) } @@ -91,27 +88,3 @@ func (s orderedSorter[T]) Less(i, j int) bool { func (s orderedSorter[T]) Swap(i, j int) { s.indices[i], s.indices[j] = s.indices[j], s.indices[i] } - -type binarySort struct { - array *array.Binary - indices []int64 -} - -func (s binarySort) Len() int { - return s.array.Len() -} - -func (s binarySort) Less(i, j int) bool { - if s.array.IsNull(int(s.indices[i])) { - return false - } - if s.array.IsNull(int(s.indices[j])) { - return true - } - // we need to read the indices from the indices slice, as they might have already been swapped. 
- return bytes.Compare(s.array.Value(int(s.indices[i])), s.array.Value(int(s.indices[j]))) == -1 -} - -func (s binarySort) Swap(i, j int) { - s.indices[i], s.indices[j] = s.indices[j], s.indices[i] -} From ae01333990ec3c61f2f8eb78f03ce730461b6d78 Mon Sep 17 00:00:00 2001 From: Matthias Loibl Date: Fri, 15 Dec 2023 16:43:46 +0100 Subject: [PATCH 11/11] pqarrow/arrowutils: Add context and reserve indices length --- pqarrow/arrowutils/sort.go | 10 +++++++--- pqarrow/arrowutils/sort_test.go | 6 ++++-- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/pqarrow/arrowutils/sort.go b/pqarrow/arrowutils/sort.go index bee2bd4c2..e4e3f7fab 100644 --- a/pqarrow/arrowutils/sort.go +++ b/pqarrow/arrowutils/sort.go @@ -41,15 +41,19 @@ func SortRecord(mem memory.Allocator, r arrow.Record, cols []int) (*array.Int64, return nil, fmt.Errorf("unsupported column type for sorting %T", c) } - indicesBuilder.AppendValues(indices, nil) + indicesBuilder.Reserve(len(indices)) + for _, i := range indices { + indicesBuilder.Append(i) + } + return indicesBuilder.NewInt64Array(), nil } // ReorderRecord reorders the given record's rows by the given indices. // This is a wrapper around compute.Take which handles the type castings. 
-func ReorderRecord(r arrow.Record, indices arrow.Array) (arrow.Record, error) { +func ReorderRecord(ctx context.Context, r arrow.Record, indices arrow.Array) (arrow.Record, error) { res, err := compute.Take( - context.Background(), + ctx, *compute.DefaultTakeOptions(), compute.NewDatum(r), compute.NewDatum(indices), diff --git a/pqarrow/arrowutils/sort_test.go b/pqarrow/arrowutils/sort_test.go index 05dbb3ceb..8f94586cb 100644 --- a/pqarrow/arrowutils/sort_test.go +++ b/pqarrow/arrowutils/sort_test.go @@ -1,6 +1,7 @@ package arrowutils import ( + "context" "testing" "github.com/apache/arrow/go/v14/arrow" @@ -10,6 +11,7 @@ import ( ) func TestSortRecord(t *testing.T) { + ctx := context.Background() schema := arrow.NewSchema( []arrow.Field{ {Name: "int", Type: arrow.PrimitiveTypes.Int64}, @@ -41,7 +43,7 @@ func TestSortRecord(t *testing.T) { require.NoError(t, err) require.Equal(t, []int64{0, 4, 2, 3, 1}, sortedIndices.Int64Values()) - sortedByInts, err := ReorderRecord(record, sortedIndices) + sortedByInts, err := ReorderRecord(ctx, record, sortedIndices) require.NoError(t, err) // check that the column got sortedIndices @@ -63,7 +65,7 @@ func TestSortRecord(t *testing.T) { require.NoError(t, err) require.Equal(t, []int64{4, 2, 1, 0, 3}, sortedIndices.Int64Values()) - sortedByStrings, err := ReorderRecord(record, sortedIndices) + sortedByStrings, err := ReorderRecord(ctx, record, sortedIndices) require.NoError(t, err) // check that the column got sortedByInts