Support nullable base types for (*Build[T])
Before, we had a `null` tag that set a schema column to nullable. However, the
only base types allowed were `int64|float64|bool|string`.

This raised the question: what is `null`? For a `nullable` field of type
`int64`, do we interpret `0` as `null`? Probably not. We were offering an
option for something that was impossible to represent.

Fortunately, we can safely represent null values with our type system: the
pointer types `*int64|*float64|*bool|*string` define nullable base types.

This commit adds support for both `int64|float64|bool|string` and
`*int64|*float64|*bool|*string` as base types. This works with dynamic
columns too.

With this change the `null` tag is redundant, so it was removed. Fields of
type `*int64|*float64|*bool|*string` will automatically generate a nullable
schema column.
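
For illustration, here is a minimal sketch of the new behavior. The `Sample`
type is invented for this example and the import paths/versions are
assumptions; `NewBuild`, `Append`, `NewRecord`, and `Release` are the APIs
touched by the diff below.

```go
package main

import (
	"github.com/apache/arrow/go/v14/arrow/memory" // version is an assumption
	"github.com/polarsignals/frostdb/dynparquet"
)

func main() {
	// Sample is a hypothetical T: pointer base types map to nullable columns,
	// and a nil pointer is appended as a null value.
	type Sample struct {
		Count  *int64             // nullable int64 column
		Name   *string            // nullable string column
		Labels map[string]*string // nullable dynamic string columns
	}

	b := dynparquet.NewBuild[Sample](memory.NewGoAllocator())
	defer b.Release()

	count, name := int64(1), "a"
	// First value: every column is null. Second value: concrete values.
	if err := b.Append(Sample{}, Sample{Count: &count, Name: &name}); err != nil {
		panic(err)
	}
	r := b.NewRecord()
	defer r.Release()
}
```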

NOTE: I also discovered a bug: when `T` has a dynamic column, appending
multiple `T` values without any dynamic column followed by a `T` with a
dynamic column caused a panic. The issue was how we adjusted dynamic columns
to match the current `.Append` row state. The fix is included in this commit
because I needed this behavior in tests.
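
For concreteness, a hedged sketch of the append sequence that used to panic,
reusing `Sample` and `b` from the sketch above:

```go
// Rows without the dynamic column come first, then a row introduces it.
// Before this fix, aligning the new dynamic column with the rows that were
// already appended panicked.
one := "1"
if err := b.Append(Sample{}, Sample{}); err != nil { // no dynamic columns yet
	panic(err)
}
if err := b.Append(Sample{Labels: map[string]*string{"one": &one}}); err != nil {
	panic(err)
}
```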
gernest committed Dec 17, 2023
1 parent ee6970e commit a76e291
Showing 2 changed files with 153 additions and 56 deletions.
171 changes: 115 additions & 56 deletions dynparquet/record_builder.go
@@ -50,7 +50,6 @@ const (
// desc | Sorts in descending order. Use desc(n) where n is an integer for the sort order.
// lz4_raw | LZ4_RAW compression.
// pre_hash | Prehash the column before storing it.
// null | Nullable column.
// null_first | When used with asc, nulls are smallest; with desc, nulls are largest.
// zstd | ZSTD compression.
// rle_dict | Dictionary run-length encoding.
@@ -104,7 +103,7 @@ func NewBuild[T any](mem memory.Allocator) *Build[T] {
typ arrow.DataType
dictionary bool
preHash bool
null bool
nullable bool
sortColumn bool
nullFirst bool
sortOrder int
@@ -118,8 +117,6 @@ func NewBuild[T any](mem memory.Allocator) *Build[T] {
if tag != "" {
walkTag(tag, func(key, value string) {
switch key {
case "null":
null = true
case "null_first":
nullFirst = true
case "asc", "desc":
@@ -159,7 +156,6 @@ func NewBuild[T any](mem memory.Allocator) *Build[T] {
fr := &fieldRecord{
name: name,
preHash: preHash,
nullable: null,
sort: sortColumn,
sortOrder: sortOrder,
nullFirst: nullFirst,
@@ -169,6 +165,7 @@ func NewBuild[T any](mem memory.Allocator) *Build[T] {
}
fty := f.Type
for fty.Kind() == reflect.Ptr {
nullable = true
fty = fty.Elem()
}
switch fty.Kind() {
@@ -177,7 +174,12 @@ func NewBuild[T any](mem memory.Allocator) *Build[T] {
fr.typ = styp
fr.dynamic = true
fr.nullable = true
fr.build = newMapFieldBuilder(newFieldFunc(typ, mem, name))
fr.build = newMapFieldBuilder(newFieldFunc(typ, mem, name,
// Pointer base types need to be properly handled even for dynamic columns,
// so map[string]string and map[string]*string should both work the same.
fty.Elem().Kind() == reflect.Ptr),
newRowsBeforeFunc(i, b.numRowsBefore),
)
case reflect.Slice:
switch {
case isUUIDSlice(fty):
@@ -195,7 +197,8 @@ func NewBuild[T any](mem memory.Allocator) *Build[T] {
case reflect.Int64, reflect.Float64, reflect.Bool, reflect.String:
typ, styp = baseType(fty, dictionary)
fr.typ = styp
fr.build = newFieldBuild(typ, mem, name, null)
fr.nullable = nullable
fr.build = newFieldBuild(typ, mem, name, nullable)
default:
panic("frostdb/dynschema: " + fty.String() + " is npt supported")
}
@@ -204,6 +207,39 @@ func NewBuild[T any](mem memory.Allocator) *Build[T] {
return b
}

// For dynamic columns we need to know the current row counts so we can pad
// columns with nulls to match the record row count.
//
// This handles the case where a series of T values without any dynamic
// columns is followed by a T with dynamic columns.
func (b *Build[T]) numRowsBefore(fieldIdx int) int {
for i := 0; i < len(b.fields) && i != fieldIdx; i++ {
before := i < fieldIdx
f := b.fields[i]
if f.dynamic {
// If there are dynamic columns before/after fieldIdx, we can stop looking
// once we find one whose columns were appended to.
if size := f.build.Len(); size != 0 {
if before {
// The field has already been processed. Adjust the size because we care
// about the row count before the current T is appended.
size--
}
return size
}
continue
}
size := b.fields[i].build.Len()
if before {
// The field has already been processed. Adjust the size because we care
// about the row count before the current T is appended.
size--
}
return size
}
return 0
}

func (b *Build[T]) Append(values ...T) error {
for _, value := range values {
v := reflect.ValueOf(value)
@@ -289,23 +325,31 @@ type fieldBuilder interface {
}

type mapFieldBuilder struct {
newField func(string) fieldBuilder
columns map[string]fieldBuilder
seen map[string]struct{}
keys []string
newField func(string) fieldBuilder
rowsBefore func() int
columns map[string]fieldBuilder
seen map[string]struct{}
keys []string
}

func newFieldFunc(dt arrow.DataType, mem memory.Allocator, name string) func(string) fieldBuilder {
func newFieldFunc(dt arrow.DataType, mem memory.Allocator, name string, nullable bool) func(string) fieldBuilder {
return func(s string) fieldBuilder {
return newFieldBuild(dt, mem, name+"."+s, true)
return newFieldBuild(dt, mem, name+"."+s, nullable)
}
}

func newMapFieldBuilder(newField func(string) fieldBuilder) *mapFieldBuilder {
func newRowsBeforeFunc(i int, f func(int) int) func() int {
return func() int {
return f(i)
}
}

func newMapFieldBuilder(newField func(string) fieldBuilder, rowsBefore func() int) *mapFieldBuilder {
return &mapFieldBuilder{
newField: newField,
columns: make(map[string]fieldBuilder),
seen: make(map[string]struct{}),
newField: newField,
rowsBefore: rowsBefore,
columns: make(map[string]fieldBuilder),
seen: make(map[string]struct{}),
}
}

@@ -358,16 +402,6 @@ func (m *mapFieldBuilder) Release() {
}

func (m *mapFieldBuilder) Append(v reflect.Value) error {
switch v.Kind() {
case reflect.Map:
return m.appendMap(v)
case reflect.Slice:
return m.appendSlice(v)
}
return nil
}

func (m *mapFieldBuilder) appendMap(v reflect.Value) error {
if v.IsNil() || v.Len() == 0 {
for _, v := range m.columns {
v.AppendNull()
Expand All @@ -377,6 +411,11 @@ func (m *mapFieldBuilder) appendMap(v reflect.Value) error {
clear(m.seen)
keys := v.MapKeys()
size := m.Len()
if size == 0 {
// We may not have seen any dynamic columns yet even though other columns
// were already appended.
size = m.rowsBefore()
}
for _, key := range keys {
name := key.Interface().(string)
m.seen[name] = struct{}{}
Expand All @@ -396,35 +435,6 @@ func (m *mapFieldBuilder) appendMap(v reflect.Value) error {
return nil
}

func (m *mapFieldBuilder) appendSlice(v reflect.Value) error {
if v.IsNil() || v.Len() == 0 {
for _, v := range m.columns {
v.AppendNull()
}
return nil
}
clear(m.seen)
size := m.Len()
for n := 0; n < v.Len(); n++ {
e := v.Index(n)
name := ToSnakeCase(e.Field(0).Interface().(string))
m.seen[name] = struct{}{}
err := m.get(name, size).Append(e.Field(1))
if err != nil {
return err
}
}
for k, v := range m.columns {
_, ok := m.seen[k]
if !ok {
// All record columns must have the same length. Set columns not present in v
// to null
v.AppendNull()
}
}
return nil
}

func (m *mapFieldBuilder) Len() int {
for _, v := range m.columns {
return v.Len()
@@ -501,34 +511,83 @@ func newFieldBuild(dt arrow.DataType, mem memory.Allocator, name string, nullable
switch e := b.(type) {
case *array.Int64Builder:
f.buildFunc = func(v reflect.Value) error {
if nullable {
if v.IsNil() {
e.AppendNull()
return nil
}
v = v.Elem()
}
e.Append(v.Int())
return nil
}
case *array.Int64DictionaryBuilder:
f.buildFunc = func(v reflect.Value) error {
if nullable {
if v.IsNil() {
e.AppendNull()
return nil
}
v = v.Elem()
}
return e.Append(v.Int())
}
case *array.Float64Builder:
f.buildFunc = func(v reflect.Value) error {
if nullable {
if v.IsNil() {
e.AppendNull()
return nil
}
v = v.Elem()
}
e.Append(v.Float())
return nil
}
case *array.Float64DictionaryBuilder:
f.buildFunc = func(v reflect.Value) error {
if nullable {
if v.IsNil() {
e.AppendNull()
return nil
}
v = v.Elem()
}
return e.Append(v.Float())
}
case *array.BooleanBuilder:
f.buildFunc = func(v reflect.Value) error {
if nullable {
if v.IsNil() {
e.AppendNull()
return nil
}
v = v.Elem()
}
e.Append(v.Bool())
return nil
}
case *array.StringBuilder:
f.buildFunc = func(v reflect.Value) error {
if nullable {
if v.IsNil() {
e.AppendNull()
return nil
}
v = v.Elem()
}
e.Append(v.Interface().(string))
return nil
}
case *array.BinaryDictionaryBuilder:
f.buildFunc = func(v reflect.Value) error {
if nullable {
if v.IsNil() {
e.AppendNull()
return nil
}
v = v.Elem()
}
return e.AppendString(v.Interface().(string))
}
case *array.ListBuilder:
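All of the typed `buildFunc` closures in `newFieldBuild` above follow the same
nil-handling shape. Here it is as a standalone sketch (`deref` is an invented
helper, not part of this diff):

```go
package main

import (
	"fmt"
	"reflect"
)

// deref mirrors the shared nullable path: a nil pointer means "append an
// Arrow null"; otherwise the pointer is dereferenced and handled exactly
// like the plain base type.
func deref(v reflect.Value, nullable bool) (val reflect.Value, isNull bool) {
	if nullable {
		if v.IsNil() {
			return reflect.Value{}, true // caller calls AppendNull()
		}
		return v.Elem(), false
	}
	return v, false
}

func main() {
	var p *int64
	_, isNull := deref(reflect.ValueOf(p), true)
	fmt.Println(isNull) // true

	n := int64(7)
	v, _ := deref(reflect.ValueOf(&n), true)
	fmt.Println(v.Int()) // 7
}
```
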
38 changes: 38 additions & 0 deletions dynparquet/record_builder_test.go
@@ -132,6 +132,44 @@ func TestBuild(t *testing.T) {
})
}

func TestBuild_pointer_base_types(t *testing.T) {
type PointerBase struct {
Int *int64
Double *float64
String *string
Dynamic map[string]*string
}

b := NewBuild[PointerBase](memory.NewGoAllocator())
defer b.Release()

err := b.Append(
PointerBase{},
PointerBase{
Int: point[int64](1),
Double: point[float64](1),
String: point[string]("1"),
Dynamic: map[string]*string{
"one": point[string]("1"),
},
},
)
require.Nil(t, err)
r := b.NewRecord()
defer r.Release()

want := `[{"double":null,"dynamic.one":null,"int":null,"string":null}
,{"double":1,"dynamic.one":"1","int":1,"string":"1"}
]`

got, err := r.MarshalJSON()
require.Nil(t, err)
require.JSONEq(t, want, string(got))
}

func point[T any](t T) *T {
return &t
}

func BenchmarkBuild_Append_Then_NewRecord(b *testing.B) {
// The way the record builder is used consists of calling Append followed by
// NewRecord.
Expand Down
