diff --git a/Makefile b/Makefile index 2495150f2..bd17e1569 100644 --- a/Makefile +++ b/Makefile @@ -4,3 +4,6 @@ test: .PHONY: gen/proto gen/proto: buf generate + +lint: + golangci-lint --timeout=5m run --fix \ No newline at end of file diff --git a/dynparquet/example.go b/dynparquet/example.go index 0fd6ac135..55335d1d1 100644 --- a/dynparquet/example.go +++ b/dynparquet/example.go @@ -306,6 +306,21 @@ func SampleDefinition() *schemapb.Schema { } } +// SampleDefinitionWithFloat adds a float column to the SampleDefinition to be able to test +// aggregations with float values. +func SampleDefinitionWithFloat() *schemapb.Schema { + sample := SampleDefinition() + sample.Columns = append(sample.Columns, &schemapb.Column{ + Name: "floatvalue", + StorageLayout: &schemapb.StorageLayout{ + Type: schemapb.StorageLayout_TYPE_DOUBLE, + Nullable: true, + }, + Dynamic: false, + }) + return sample +} + func NewSampleSchema() *Schema { s, err := SchemaFromDefinition(SampleDefinition()) if err != nil { diff --git a/examples/aggregations/aggregations.go b/examples/aggregations/aggregations.go index 3dc63d12a..b997bfe28 100644 --- a/examples/aggregations/aggregations.go +++ b/examples/aggregations/aggregations.go @@ -37,7 +37,7 @@ func main() { type WeatherRecord struct { City interface{} Day string - Snowfall int64 + Snowfall float64 } type CityInProvince struct { @@ -58,16 +58,16 @@ func main() { WeatherRecord{Day: "Mon", Snowfall: 20, City: montreal}, WeatherRecord{Day: "Tue", Snowfall: 00, City: montreal}, WeatherRecord{Day: "Wed", Snowfall: 30, City: montreal}, - WeatherRecord{Day: "Thu", Snowfall: 25, City: montreal}, + WeatherRecord{Day: "Thu", Snowfall: 25.1, City: montreal}, WeatherRecord{Day: "Fri", Snowfall: 10, City: montreal}, WeatherRecord{Day: "Mon", Snowfall: 15, City: toronto}, WeatherRecord{Day: "Tue", Snowfall: 25, City: toronto}, WeatherRecord{Day: "Wed", Snowfall: 30, City: toronto}, WeatherRecord{Day: "Thu", Snowfall: 00, City: toronto}, WeatherRecord{Day: "Fri", Snowfall: 05, City: 
toronto}, - WeatherRecord{Day: "Mon", Snowfall: 40, City: minneapolis}, + WeatherRecord{Day: "Mon", Snowfall: 40.8, City: minneapolis}, WeatherRecord{Day: "Tue", Snowfall: 15, City: minneapolis}, - WeatherRecord{Day: "Wed", Snowfall: 32, City: minneapolis}, + WeatherRecord{Day: "Wed", Snowfall: 32.3, City: minneapolis}, WeatherRecord{Day: "Thu", Snowfall: 10, City: minneapolis}, WeatherRecord{Day: "Fri", Snowfall: 12, City: minneapolis}, ) @@ -130,7 +130,7 @@ func aggregationSchema() *schemapb.Schema { { Name: "snowfall", StorageLayout: &schemapb.StorageLayout{ - Type: schemapb.StorageLayout_TYPE_INT64, + Type: schemapb.StorageLayout_TYPE_DOUBLE, }, Dynamic: false, }, diff --git a/logictest/logic_test.go b/logictest/logic_test.go index 6f7db3283..1433a8e69 100644 --- a/logictest/logic_test.go +++ b/logictest/logic_test.go @@ -39,7 +39,7 @@ func (db frostDB) ScanTable(name string) query.Builder { } var schemas = map[string]*schemapb.Schema{ - "default": dynparquet.SampleDefinition(), + "default": dynparquet.SampleDefinitionWithFloat(), "simple_bool": { Name: "simple_bool", Columns: []*schemapb.Column{{ diff --git a/logictest/runner.go b/logictest/runner.go index 6c29a22fa..294acf38c 100644 --- a/logictest/runner.go +++ b/logictest/runner.go @@ -221,7 +221,15 @@ func (r *Runner) handleInsert(ctx context.Context, c *datadriven.TestData) (stri if err != nil { return "", fmt.Errorf("insert: %w", err) } - rows[i] = append(rows[i], parquet.ValueOf(v).Level(0, 0, colIdx)) + if col.StorageLayout.Optional() { + if parquet.ValueOf(v).IsNull() { + rows[i] = append(rows[i], parquet.ValueOf(v).Level(0, 0, colIdx)) + } else { + rows[i] = append(rows[i], parquet.ValueOf(v).Level(0, 1, colIdx)) + } + } else { + rows[i] = append(rows[i], parquet.ValueOf(v).Level(0, 0, colIdx)) + } colIdx++ continue } @@ -251,7 +259,6 @@ func (r *Runner) handleInsert(ctx context.Context, c *datadriven.TestData) (stri if _, err := buf.WriteRows(rows); err != nil { return "", fmt.Errorf("insert: %w", 
err) } - buf.Sort() // TODO: https://github.com/polarsignals/frostdb/issues/548 Should just build the arrow record directly. @@ -286,6 +293,12 @@ func stringToValue(t parquet.Type, stringValue string) (any, error) { return nil, fmt.Errorf("unexpected error converting %s to int: %w", stringValue, err) } return intValue, nil + case parquet.Double: + floatValue, err := strconv.ParseFloat(stringValue, 64) + if err != nil { + return nil, fmt.Errorf("unexpected error converting %s to float: %w", stringValue, err) + } + return floatValue, nil case parquet.Boolean: switch stringValue { case "true": @@ -401,6 +414,14 @@ func arrayToStringVals(a arrow.Array) ([]string, error) { } result[i] = strconv.Itoa(int(col.Value(i))) } + case *array.Float64: + for i := range result { + if col.IsNull(i) { + result[i] = nullString + continue + } + result[i] = fmt.Sprintf("%f", float64(col.Value(i))) + } case *array.Boolean: for i := range result { if col.IsNull(i) { diff --git a/logictest/testdata/exec/aggregate/aggregate b/logictest/testdata/exec/aggregate/aggregate index 33163e81e..87c799d9e 100644 --- a/logictest/testdata/exec/aggregate/aggregate +++ b/logictest/testdata/exec/aggregate/aggregate @@ -1,16 +1,16 @@ createtable schema=default ---- -insert cols=(labels.label1, labels.label2, labels.label3, labels.label4, stacktrace, timestamp, value) -value1 value2 null null stack1 1 1 -value2 value2 value3 null stack1 2 2 -value3 value2 null value4 stack1 3 3 +insert cols=(labels.label1, labels.label2, labels.label3, labels.label4, stacktrace, timestamp, value, floatvalue) +value1 value2 null null stack1 1 1 1.1 +value2 value2 value3 null stack1 2 2 2.2 +value3 value2 null value4 stack1 3 3 3.3 ---- -insert cols=(labels.label1, labels.label2, labels.label3, labels.label4, stacktrace, timestamp, value) -value4 value2 null null stack1 4 4 -value5 value2 value3 null stack1 5 5 -value6 value2 null value4 stack1 6 6 +insert cols=(labels.label1, labels.label2, labels.label3, labels.label4, 
stacktrace, timestamp, value, floatvalue) +value4 value2 null null stack1 4 4 4.4 +value5 value2 value3 null stack1 5 5 5.5 +value6 value2 null value4 stack1 6 6 6.6 ---- exec @@ -18,11 +18,26 @@ select sum(value) as value_sum group by labels.label2 ---- value2 21 +exec +select sum(floatvalue) as float_value_sum group by labels.label2 +---- +value2 23.100000 + exec select max(value) as value_max group by labels.label2 ---- value2 6 +exec +select max(floatvalue) as value_max group by labels.label2 +---- +value2 6.600000 + +exec +select min(floatvalue) as value_min group by labels.label2 +---- +value2 1.100000 + exec select count(value) as value_count group by labels.label2 ---- @@ -33,6 +48,11 @@ select avg(value) as value_avg group by labels.label2 ---- value2 3 +exec +select avg(floatvalue) as value_avg group by labels.label2 +---- +value2 3.850000 + exec select avg(value) as value_avg group by labels.label4 ---- @@ -44,6 +64,11 @@ select sum(value), count(value) group by stacktrace ---- stack1 21 6 +exec +select sum(floatvalue), count(floatvalue) group by stacktrace +---- +stack1 23.100000 6 + exec select sum(value) as value_sum, count(value) as value_count group by stacktrace ---- @@ -54,6 +79,11 @@ select sum(value), count(value), min(value), max(value) group by labels.label2 ---- value2 21 6 1 6 +exec +select sum(floatvalue), count(floatvalue), min(floatvalue), max(floatvalue) group by labels.label2 +---- +value2 23.100000 6 1.100000 6.600000 + exec unordered select sum(value) as value_sum where timestamp >= 1 group by labels.label1 ---- diff --git a/pqarrow/builder/utils.go b/pqarrow/builder/utils.go index 197bd238b..a50d2cdc4 100644 --- a/pqarrow/builder/utils.go +++ b/pqarrow/builder/utils.go @@ -65,6 +65,8 @@ func AppendValue(cb ColumnBuilder, arr arrow.Array, i int) error { b.AppendSingle(arr.(*array.Boolean).Value(i)) case *array.Int64Builder: b.Append(arr.(*array.Int64).Value(i)) + case *array.Float64Builder: + b.Append(arr.(*array.Float64).Value(i)) 
case *array.StringBuilder: b.Append(arr.(*array.String).Value(i)) case *array.BinaryBuilder: diff --git a/query/logicalplan/validate.go b/query/logicalplan/validate.go index 2bc4b32f6..4149adda5 100644 --- a/query/logicalplan/validate.go +++ b/query/logicalplan/validate.go @@ -224,7 +224,8 @@ func ValidateAggregationExpr(plan *LogicalPlan) *ExprValidationError { // check that the column type can be aggregated by the function type columnType := column.StorageLayout.Type() aggFuncExpr := aggFuncFinder.result.(*AggregationFunction) - if columnType.LogicalType().UTF8 != nil { + logicalType := columnType.LogicalType() + if logicalType != nil && logicalType.UTF8 != nil { switch aggFuncExpr.Func { case AggFuncSum: return &ExprValidationError{ diff --git a/query/physicalplan/aggregate.go b/query/physicalplan/aggregate.go index 19cb7ade2..67f3876c8 100644 --- a/query/physicalplan/aggregate.go +++ b/query/physicalplan/aggregate.go @@ -97,30 +97,15 @@ func Aggregate( func chooseAggregationFunction( aggFunc logicalplan.AggFunc, - dataType arrow.DataType, + _ arrow.DataType, ) (AggregationFunction, error) { switch aggFunc { case logicalplan.AggFuncSum: - switch dataType.ID() { - case arrow.INT64: - return &Int64SumAggregation{}, nil - default: - return nil, fmt.Errorf("unsupported sum of type: %s", dataType.Name()) - } + return &SumAggregation{}, nil case logicalplan.AggFuncMin: - switch dataType.ID() { - case arrow.INT64: - return &Int64MinAggregation{}, nil - default: - return nil, fmt.Errorf("unsupported min of type: %s", dataType.Name()) - } + return &MinAggregation{}, nil case logicalplan.AggFuncMax: - switch dataType.ID() { - case arrow.INT64: - return &Int64MaxAggregation{}, nil - default: - return nil, fmt.Errorf("unsupported max of type: %s", dataType.Name()) - } + return &MaxAggregation{}, nil case logicalplan.AggFuncCount: return &CountAggregation{}, nil default: @@ -672,11 +657,11 @@ func (a *HashAggregate) finishAggregate(ctx context.Context, aggIdx int, aggrega 
return nil } -type Int64SumAggregation struct{} +type SumAggregation struct{} -var ErrUnsupportedSumType = errors.New("unsupported type for sum aggregation, expected int64") +var ErrUnsupportedSumType = errors.New("unsupported type for sum aggregation, expected int64 or float64") -func (a *Int64SumAggregation) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error) { +func (a *SumAggregation) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error) { if len(arrs) == 0 { return array.NewInt64Builder(pool).NewArray(), nil } @@ -685,6 +670,8 @@ func (a *Int64SumAggregation) Aggregate(pool memory.Allocator, arrs []arrow.Arra switch typ { case arrow.INT64: return sumInt64arrays(pool, arrs), nil + case arrow.FLOAT64: + return sumFloat64arrays(pool, arrs), nil default: return nil, fmt.Errorf("sum array of %s: %w", typ, ErrUnsupportedSumType) } @@ -704,11 +691,25 @@ func sumInt64array(arr *array.Int64) int64 { return math.Int64.Sum(arr) } -var ErrUnsupportedMinType = errors.New("unsupported type for max aggregation, expected int64") +func sumFloat64arrays(pool memory.Allocator, arrs []arrow.Array) arrow.Array { + res := array.NewFloat64Builder(pool) + defer res.Release() + for _, arr := range arrs { + res.Append(sumFloat64array(arr.(*array.Float64))) + } + + return res.NewArray() +} + +func sumFloat64array(arr *array.Float64) float64 { + return math.Float64.Sum(arr) +} -type Int64MinAggregation struct{} +var ErrUnsupportedMinType = errors.New("unsupported type for min aggregation, expected int64 or float64") -func (a *Int64MinAggregation) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error) { +type MinAggregation struct{} + +func (a *MinAggregation) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error) { if len(arrs) == 0 { return array.NewInt64Builder(pool).NewArray(), nil } @@ -717,6 +718,8 @@ func (a *Int64MinAggregation) Aggregate(pool memory.Allocator, arrs []arrow.Arra switch typ { case 
arrow.INT64: return minInt64arrays(pool, arrs), nil + case arrow.FLOAT64: + return minFloat64arrays(pool, arrs), nil default: return nil, fmt.Errorf("min array of %s: %w", typ, ErrUnsupportedMinType) } @@ -752,11 +755,39 @@ func minInt64array(arr *array.Int64) int64 { return min } -type Int64MaxAggregation struct{} +func minFloat64arrays(pool memory.Allocator, arrs []arrow.Array) arrow.Array { + res := array.NewFloat64Builder(pool) + defer res.Release() + for _, arr := range arrs { + if arr.Len() == 0 { + res.AppendNull() + continue + } + res.Append(minFloat64array(arr.(*array.Float64))) + } + + return res.NewArray() +} + +// Same as minInt64array but for Float64. +func minFloat64array(arr *array.Float64) float64 { + // Note that the zero-length check must be performed before calling this + // function. + vals := arr.Float64Values() + min := vals[0] + for _, v := range vals { + if v < min { + min = v + } + } + return min +} + +type MaxAggregation struct{} -var ErrUnsupportedMaxType = errors.New("unsupported type for max aggregation, expected int64") +var ErrUnsupportedMaxType = errors.New("unsupported type for max aggregation, expected int64 or float64") -func (a *Int64MaxAggregation) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error) { +func (a *MaxAggregation) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error) { if len(arrs) == 0 { return array.NewInt64Builder(pool).NewArray(), nil } @@ -765,6 +796,8 @@ func (a *Int64MaxAggregation) Aggregate(pool memory.Allocator, arrs []arrow.Arra switch typ { case arrow.INT64: return maxInt64arrays(pool, arrs), nil + case arrow.FLOAT64: + return maxFloat64arrays(pool, arrs), nil default: return nil, fmt.Errorf("max array of %s: %w", typ, ErrUnsupportedMaxType) } @@ -800,6 +833,33 @@ func maxInt64array(arr *array.Int64) int64 { return max } +func maxFloat64arrays(pool memory.Allocator, arrs []arrow.Array) arrow.Array { + res := array.NewFloat64Builder(pool) + defer res.Release() + 
for _, arr := range arrs { + if arr.Len() == 0 { + res.AppendNull() + continue + } + res.Append(maxFloat64array(arr.(*array.Float64))) + } + + return res.NewArray() +} + +func maxFloat64array(arr *array.Float64) float64 { + // Note that the zero-length check must be performed before calling this + // function. + vals := arr.Float64Values() + max := vals[0] + for _, v := range vals { + if v > max { + max = v + } + } + return max +} + type CountAggregation struct{} func (a *CountAggregation) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error) { @@ -831,7 +891,7 @@ func runAggregation(finalStage bool, fn logicalplan.AggFunc, pool memory.Allocat if _, ok := aggFunc.(*CountAggregation); ok && finalStage { // The final stage of aggregation needs to sum up all the counts of the // previous steps, instead of counting the previous counts. - return (&Int64SumAggregation{}).Aggregate(pool, arrs) + return (&SumAggregation{}).Aggregate(pool, arrs) } return aggFunc.Aggregate(pool, arrs) } diff --git a/query/physicalplan/physicalplan.go b/query/physicalplan/physicalplan.go index aecff2a6b..3dfec0ac2 100644 --- a/query/physicalplan/physicalplan.go +++ b/query/physicalplan/physicalplan.go @@ -164,11 +164,7 @@ func (s *TableScan) Execute(ctx context.Context, pool memory.Allocator) error { })) } - if err := errg.Wait(); err != nil { - return err - } - - return nil + return errg.Wait() } type SchemaScan struct { diff --git a/query/physicalplan/project.go b/query/physicalplan/project.go index cfa24d4fb..f924dfb07 100644 --- a/query/physicalplan/project.go +++ b/query/physicalplan/project.go @@ -341,11 +341,22 @@ func (a *averageProjection) Project(mem memory.Allocator, r arrow.Record) ([]arr } // Add the field and column for the projected average aggregation. 
- fields = append(fields, arrow.Field{ - Name: resultName, - Type: &arrow.Int64Type{}, - }) - columns = append(columns, avgInt64arrays(mem, sums, counts)) + switch sums.DataType().ID() { + case arrow.INT64: + fields = append(fields, arrow.Field{ + Name: resultName, + Type: &arrow.Int64Type{}, + }) + columns = append(columns, avgInt64arrays(mem, sums, counts)) + case arrow.FLOAT64: + fields = append(fields, arrow.Field{ + Name: resultName, + Type: &arrow.Float64Type{}, + }) + columns = append(columns, avgFloat64arrays(mem, sums, counts)) + default: + return nil, nil, fmt.Errorf("datatype %s is not supported for average projection", sums.DataType().ID()) + } return fields, columns, nil } @@ -363,6 +374,19 @@ func avgInt64arrays(pool memory.Allocator, sums, counts arrow.Array) arrow.Array return res.NewArray() } +func avgFloat64arrays(pool memory.Allocator, sums, counts arrow.Array) arrow.Array { + sumsFloats := sums.(*array.Float64) + countsInts := counts.(*array.Int64) + + res := array.NewFloat64Builder(pool) + defer res.Release() + for i := 0; i < sumsFloats.Len(); i++ { + res.Append(sumsFloats.Value(i) / float64(countsInts.Value(i))) + } + + return res.NewArray() +} + type allProjection struct{} func (a allProjection) Name() string { return "all" }