Skip to content

Commit

Permalink
feat: initial work for aggregations with float64 (#619)
Browse files Browse the repository at this point in the history
* feat: initial work for aggregations with float64

* add remaining aggregations

* finish up tests

* fixes from lint and review
  • Loading branch information
garrensmith authored Dec 12, 2023
1 parent 9e1c181 commit 1afca81
Show file tree
Hide file tree
Showing 11 changed files with 208 additions and 56 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@ test:
.PHONY: gen/proto
gen/proto:
buf generate

lint:
golangci-lint --timeout=5m run --fix
15 changes: 15 additions & 0 deletions dynparquet/example.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,21 @@ func SampleDefinition() *schemapb.Schema {
}
}

// Adds a float column to the SampleDefinition to be able to test
// aggregations with float values.
func SampleDefinitionWithFloat() *schemapb.Schema {
sample := SampleDefinition()
sample.Columns = append(sample.Columns, &schemapb.Column{
Name: "floatvalue",
StorageLayout: &schemapb.StorageLayout{
Type: schemapb.StorageLayout_TYPE_DOUBLE,
Nullable: true,
},
Dynamic: false,
})
return sample
}

func NewSampleSchema() *Schema {
s, err := SchemaFromDefinition(SampleDefinition())
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions examples/aggregations/aggregations.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func main() {
type WeatherRecord struct {
City interface{}
Day string
Snowfall int64
Snowfall float64
}

type CityInProvince struct {
Expand All @@ -58,16 +58,16 @@ func main() {
WeatherRecord{Day: "Mon", Snowfall: 20, City: montreal},
WeatherRecord{Day: "Tue", Snowfall: 00, City: montreal},
WeatherRecord{Day: "Wed", Snowfall: 30, City: montreal},
WeatherRecord{Day: "Thu", Snowfall: 25, City: montreal},
WeatherRecord{Day: "Thu", Snowfall: 25.1, City: montreal},
WeatherRecord{Day: "Fri", Snowfall: 10, City: montreal},
WeatherRecord{Day: "Mon", Snowfall: 15, City: toronto},
WeatherRecord{Day: "Tue", Snowfall: 25, City: toronto},
WeatherRecord{Day: "Wed", Snowfall: 30, City: toronto},
WeatherRecord{Day: "Thu", Snowfall: 00, City: toronto},
WeatherRecord{Day: "Fri", Snowfall: 05, City: toronto},
WeatherRecord{Day: "Mon", Snowfall: 40, City: minneapolis},
WeatherRecord{Day: "Mon", Snowfall: 40.8, City: minneapolis},
WeatherRecord{Day: "Tue", Snowfall: 15, City: minneapolis},
WeatherRecord{Day: "Wed", Snowfall: 32, City: minneapolis},
WeatherRecord{Day: "Wed", Snowfall: 32.3, City: minneapolis},
WeatherRecord{Day: "Thu", Snowfall: 10, City: minneapolis},
WeatherRecord{Day: "Fri", Snowfall: 12, City: minneapolis},
)
Expand Down Expand Up @@ -130,7 +130,7 @@ func aggregationSchema() *schemapb.Schema {
{
Name: "snowfall",
StorageLayout: &schemapb.StorageLayout{
Type: schemapb.StorageLayout_TYPE_INT64,
Type: schemapb.StorageLayout_TYPE_DOUBLE,
},
Dynamic: false,
},
Expand Down
2 changes: 1 addition & 1 deletion logictest/logic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (db frostDB) ScanTable(name string) query.Builder {
}

var schemas = map[string]*schemapb.Schema{
"default": dynparquet.SampleDefinition(),
"default": dynparquet.SampleDefinitionWithFloat(),
"simple_bool": {
Name: "simple_bool",
Columns: []*schemapb.Column{{
Expand Down
25 changes: 23 additions & 2 deletions logictest/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,15 @@ func (r *Runner) handleInsert(ctx context.Context, c *datadriven.TestData) (stri
if err != nil {
return "", fmt.Errorf("insert: %w", err)
}
rows[i] = append(rows[i], parquet.ValueOf(v).Level(0, 0, colIdx))
if col.StorageLayout.Optional() {
if parquet.ValueOf(v).IsNull() {
rows[i] = append(rows[i], parquet.ValueOf(v).Level(0, 0, colIdx))
} else {
rows[i] = append(rows[i], parquet.ValueOf(v).Level(0, 1, colIdx))
}
} else {
rows[i] = append(rows[i], parquet.ValueOf(v).Level(0, 0, colIdx))
}
colIdx++
continue
}
Expand Down Expand Up @@ -251,7 +259,6 @@ func (r *Runner) handleInsert(ctx context.Context, c *datadriven.TestData) (stri
if _, err := buf.WriteRows(rows); err != nil {
return "", fmt.Errorf("insert: %w", err)
}

buf.Sort()

// TODO: https://github.com/polarsignals/frostdb/issues/548 Should just build the arrow record directly.
Expand Down Expand Up @@ -286,6 +293,12 @@ func stringToValue(t parquet.Type, stringValue string) (any, error) {
return nil, fmt.Errorf("unexpected error converting %s to int: %w", stringValue, err)
}
return intValue, nil
case parquet.Double:
floatValue, err := strconv.ParseFloat(stringValue, 64)
if err != nil {
return nil, fmt.Errorf("unexpected error converting %s to float: %w", stringValue, err)
}
return floatValue, nil
case parquet.Boolean:
switch stringValue {
case "true":
Expand Down Expand Up @@ -401,6 +414,14 @@ func arrayToStringVals(a arrow.Array) ([]string, error) {
}
result[i] = strconv.Itoa(int(col.Value(i)))
}
case *array.Float64:
for i := range result {
if col.IsNull(i) {
result[i] = nullString
continue
}
result[i] = fmt.Sprintf("%f", float64(col.Value(i)))
}
case *array.Boolean:
for i := range result {
if col.IsNull(i) {
Expand Down
46 changes: 38 additions & 8 deletions logictest/testdata/exec/aggregate/aggregate
Original file line number Diff line number Diff line change
@@ -1,28 +1,43 @@
createtable schema=default
----

insert cols=(labels.label1, labels.label2, labels.label3, labels.label4, stacktrace, timestamp, value)
value1 value2 null null stack1 1 1
value2 value2 value3 null stack1 2 2
value3 value2 null value4 stack1 3 3
insert cols=(labels.label1, labels.label2, labels.label3, labels.label4, stacktrace, timestamp, value, floatvalue)
value1 value2 null null stack1 1 1 1.1
value2 value2 value3 null stack1 2 2 2.2
value3 value2 null value4 stack1 3 3 3.3
----

insert cols=(labels.label1, labels.label2, labels.label3, labels.label4, stacktrace, timestamp, value)
value4 value2 null null stack1 4 4
value5 value2 value3 null stack1 5 5
value6 value2 null value4 stack1 6 6
insert cols=(labels.label1, labels.label2, labels.label3, labels.label4, stacktrace, timestamp, value, floatvalue)
value4 value2 null null stack1 4 4 4.4
value5 value2 value3 null stack1 5 5 5.5
value6 value2 null value4 stack1 6 6 6.6
----

exec
select sum(value) as value_sum group by labels.label2
----
value2 21

exec
select sum(floatvalue) as float_value_sum group by labels.label2
----
value2 23.100000

exec
select max(value) as value_max group by labels.label2
----
value2 6

exec
select max(floatvalue) as value_max group by labels.label2
----
value2 6.600000

exec
select min(floatvalue) as value_min group by labels.label2
----
value2 1.100000

exec
select count(value) as value_count group by labels.label2
----
Expand All @@ -33,6 +48,11 @@ select avg(value) as value_avg group by labels.label2
----
value2 3

exec
select avg(floatvalue) as value_avg group by labels.label2
----
value2 3.850000

exec
select avg(value) as value_avg group by labels.label4
----
Expand All @@ -44,6 +64,11 @@ select sum(value), count(value) group by stacktrace
----
stack1 21 6

exec
select sum(floatvalue), count(floatvalue) group by stacktrace
----
stack1 23.100000 6

exec
select sum(value) as value_sum, count(value) as value_count group by stacktrace
----
Expand All @@ -54,6 +79,11 @@ select sum(value), count(value), min(value), max(value) group by labels.label2
----
value2 21 6 1 6

exec
select sum(floatvalue), count(floatvalue), min(floatvalue), max(floatvalue) group by labels.label2
----
value2 23.100000 6 1.100000 6.600000

exec unordered
select sum(value) as value_sum where timestamp >= 1 group by labels.label1
----
Expand Down
2 changes: 2 additions & 0 deletions pqarrow/builder/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ func AppendValue(cb ColumnBuilder, arr arrow.Array, i int) error {
b.AppendSingle(arr.(*array.Boolean).Value(i))
case *array.Int64Builder:
b.Append(arr.(*array.Int64).Value(i))
case *array.Float64Builder:
b.Append(arr.(*array.Float64).Value(i))
case *array.StringBuilder:
b.Append(arr.(*array.String).Value(i))
case *array.BinaryBuilder:
Expand Down
3 changes: 2 additions & 1 deletion query/logicalplan/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,8 @@ func ValidateAggregationExpr(plan *LogicalPlan) *ExprValidationError {
// check that the column type can be aggregated by the function type
columnType := column.StorageLayout.Type()
aggFuncExpr := aggFuncFinder.result.(*AggregationFunction)
if columnType.LogicalType().UTF8 != nil {
logicalType := columnType.LogicalType()
if logicalType != nil && logicalType.UTF8 != nil {
switch aggFuncExpr.Func {
case AggFuncSum:
return &ExprValidationError{
Expand Down
Loading

0 comments on commit 1afca81

Please sign in to comment.