Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pqarrow/arrowutils: Add SortRecord and ReorderRecord #628

Merged
merged 12 commits into from
Dec 15, 2023
Merged
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ require (
)

require (
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/benbjohnson/immutable v0.4.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:RGWPOewvKIROun94nF7v2cua9qP+thov/7M50KEoeSU=
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk=
github.com/RoaringBitmap/roaring v0.9.4 h1:ckvZSX5gwCRaJYBNe7syNawCU5oruY9gQmjXlp4riwo=
github.com/RoaringBitmap/roaring v0.9.4/go.mod h1:icnadbWcNyfEHlYdr+tDlOTih1Bf/h+rzPpv4sbomAA=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
Expand Down Expand Up @@ -79,8 +81,6 @@ github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
github.com/parquet-go/parquet-go v0.19.1-0.20231129084429-9010539a4f7a h1:NxS5GxNgZa5nJeLjJFidbzhwn+YuhdV5pXHtOw7VKB8=
github.com/parquet-go/parquet-go v0.19.1-0.20231129084429-9010539a4f7a/go.mod h1:4YfUo8TkoGoqwzhA/joZKZ8f77wSMShOLHESY4Ys0bY=
github.com/parquet-go/parquet-go v0.20.0 h1:a6tV5XudF893P1FMuyp01zSReXbBelquKQgRxBgJ29w=
github.com/parquet-go/parquet-go v0.20.0/go.mod h1:4YfUo8TkoGoqwzhA/joZKZ8f77wSMShOLHESY4Ys0bY=
github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ=
Expand Down
94 changes: 94 additions & 0 deletions pqarrow/arrowutils/sort.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package arrowutils

import (
"context"
"fmt"
"sort"

"github.com/apache/arrow/go/v14/arrow"
"github.com/apache/arrow/go/v14/arrow/array"
"github.com/apache/arrow/go/v14/arrow/compute"
"github.com/apache/arrow/go/v14/arrow/memory"
)

// SortRecord sorts the given record's rows by the given column. Currently only supports int64, string and binary columns.
func SortRecord(mem memory.Allocator, r arrow.Record, cols []int) (*array.Int64, error) {
if len(cols) > 1 {
return nil, fmt.Errorf("sorting by multiple columns isn't implemented yet")
}
indicesBuilder := array.NewInt64Builder(mem)

if r.NumRows() == 0 {
return indicesBuilder.NewInt64Array(), nil
}
if r.NumRows() == 1 {
indicesBuilder.Append(0)
return indicesBuilder.NewInt64Array(), nil
}

indices := make([]int64, r.NumRows())
metalmatze marked this conversation as resolved.
Show resolved Hide resolved
// populate indices
for i := range indices {
indices[i] = int64(i)
}

switch c := r.Column(cols[0]).(type) {
case *array.Int64:
sort.Sort(orderedSorter[int64]{array: c, indices: indices})
case *array.String:
sort.Sort(orderedSorter[string]{array: c, indices: indices})
default:
return nil, fmt.Errorf("unsupported column type for sorting %T", c)
}

indicesBuilder.Reserve(len(indices))
for _, i := range indices {
indicesBuilder.Append(i)
}

return indicesBuilder.NewInt64Array(), nil
}

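// ReorderRecord returns a new record containing the given record's rows in
// the order given by indices, e.g. the indices returned by SortRecord.
// This is a wrapper around compute.Take which wraps the record and the indices
// into datums and unwraps the resulting record.
// NOTE: according to the review discussion, compute.Take has no kernel for
// dictionary columns yet, so records with dictionary columns may not be supported.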
func ReorderRecord(ctx context.Context, r arrow.Record, indices arrow.Array) (arrow.Record, error) {
res, err := compute.Take(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome. I also wanted to note that this will work the majority of the time. We support dictionaries, and there is no dictionary kernel for Take yet.

We need to optimise

  • If the record has no dictionary column, use compute.Take on the record.
  • If there is a dictionary column. Use compute.Take on individual non dictionary columns and fall back to manual taking for the dictionary and then assembling the record (preferably concurrently).

This can be done in a separate PR. Unless I'm mistaken, the PR just adds these functions, but there is no expected call site for them yet. (A lot of changes will be needed for this to be used in my logictest PR.)

I'm on mobile now, so I can't write what is needed for the logictest to use these functions.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, that's nasty and a really good comment!
We should make sure this is supported in follow-up PRs!

ctx,
*compute.DefaultTakeOptions(),
compute.NewDatum(r),
compute.NewDatum(indices),
)
if err != nil {
return nil, err
}
return res.(*compute.RecordDatum).Value, nil
}

// orderedArray is the read-only view of a column that orderedSorter needs:
// access to the value at a position, whether that position is null, and the
// number of positions.
type orderedArray[T int64 | float64 | string] interface {
	Value(int) T
	IsNull(int) bool
	Len() int
}

// orderedSorter implements sort.Interface over indices, a slice of positions
// into array. Sorting permutes indices only; array is never modified. Positions
// are ordered ascending by the value they refer to, with null positions last.
type orderedSorter[T int64 | float64 | string] struct {
	array   orderedArray[T]
	indices []int64
}

// Len returns the number of positions being sorted. It is the length of
// indices rather than of the array, because Less and Swap index into indices;
// reporting the array's length would make sort.Sort read past the end of
// indices whenever the two lengths differ.
func (s orderedSorter[T]) Len() int {
	return len(s.indices)
}

// Less reports whether the value at position indices[i] sorts before the value
// at position indices[j]. A null never sorts before anything, and any non-null
// value sorts before a null, which places nulls at the end.
func (s orderedSorter[T]) Less(i, j int) bool {
	left, right := int(s.indices[i]), int(s.indices[j])
	if s.array.IsNull(left) {
		return false
	}
	if s.array.IsNull(right) {
		return true
	}
	return s.array.Value(left) < s.array.Value(right)
}

// Swap exchanges the positions stored at i and j.
func (s orderedSorter[T]) Swap(i, j int) {
	s.indices[i], s.indices[j] = s.indices[j], s.indices[i]
}
83 changes: 83 additions & 0 deletions pqarrow/arrowutils/sort_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package arrowutils

import (
"context"
"testing"

"github.com/apache/arrow/go/v14/arrow"
"github.com/apache/arrow/go/v14/arrow/array"
"github.com/apache/arrow/go/v14/arrow/memory"
"github.com/stretchr/testify/require"
)

func TestSortRecord(t *testing.T) {
ctx := context.Background()
schema := arrow.NewSchema(
[]arrow.Field{
{Name: "int", Type: arrow.PrimitiveTypes.Int64},
{Name: "string", Type: arrow.BinaryTypes.String},
},
nil,
)

mem := memory.DefaultAllocator
ib := array.NewInt64Builder(mem)
ib.Append(0)
ib.AppendNull()
ib.Append(3)
ib.Append(5)
ib.Append(1)

sb := array.NewStringBuilder(mem)
sb.Append("d")
sb.Append("c")
sb.Append("b")
sb.AppendNull()
sb.Append("a")

record := array.NewRecord(schema, []arrow.Array{ib.NewArray(), sb.NewArray()}, int64(5))

// Sort the record by the first column - int64
{
sortedIndices, err := SortRecord(mem, record, []int{record.Schema().FieldIndices("int")[0]})
require.NoError(t, err)
require.Equal(t, []int64{0, 4, 2, 3, 1}, sortedIndices.Int64Values())

sortedByInts, err := ReorderRecord(ctx, record, sortedIndices)
require.NoError(t, err)

// check that the column got sortedIndices
intCol := sortedByInts.Column(0).(*array.Int64)
require.Equal(t, []int64{0, 1, 3, 5, 0}, intCol.Int64Values())
require.True(t, intCol.IsNull(intCol.Len()-1)) // last is NULL
// make sure the other column got updated too
strings := make([]string, sortedByInts.NumRows())
stringCol := sortedByInts.Column(1).(*array.String)
for i := 0; i < int(sortedByInts.NumRows()); i++ {
strings[i] = stringCol.Value(i)
}
require.Equal(t, []string{"d", "a", "b", "", "c"}, strings)
}

// Sort the record by the second column - string
{
sortedIndices, err := SortRecord(mem, record, []int{record.Schema().FieldIndices("string")[0]})
require.NoError(t, err)
require.Equal(t, []int64{4, 2, 1, 0, 3}, sortedIndices.Int64Values())

sortedByStrings, err := ReorderRecord(ctx, record, sortedIndices)
require.NoError(t, err)

// check that the column got sortedByInts
intCol := sortedByStrings.Column(0).(*array.Int64)
require.Equal(t, []int64{1, 3, 0, 0, 5}, intCol.Int64Values())
// make sure the other column got updated too
strings := make([]string, sortedByStrings.NumRows())
stringCol := sortedByStrings.Column(1).(*array.String)
for i := 0; i < int(sortedByStrings.NumRows()); i++ {
strings[i] = stringCol.Value(i)
}
require.Equal(t, []string{"a", "b", "c", "d", ""}, strings)
require.True(t, stringCol.IsNull(stringCol.Len()-1)) // last is NULL
}
}
Loading