Skip to content

Commit

Permalink
Revert "Fix query concurrency (#861)" (#885)
Browse files Browse the repository at this point in the history
This reverts commit 7697384.
  • Loading branch information
brancz authored May 30, 2024
1 parent d04bda2 commit 7269cf6
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 97 deletions.
119 changes: 51 additions & 68 deletions query/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,83 +12,66 @@ import (
"github.com/polarsignals/frostdb/dynparquet"
schemapb "github.com/polarsignals/frostdb/gen/proto/go/frostdb/schema/v1alpha1"
"github.com/polarsignals/frostdb/query/logicalplan"
"github.com/polarsignals/frostdb/query/physicalplan"
)

func TestUniqueAggregation(t *testing.T) {
tests := map[string]struct {
execOptions []physicalplan.Option
}{
"no concurrency": {
execOptions: []physicalplan.Option{
physicalplan.WithConcurrency(1),
},
},
"default": {
execOptions: []physicalplan.Option{},
},
}
for name, test := range tests {
t.Run(name, func(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
defer mem.AssertSize(t, 0)
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
defer mem.AssertSize(t, 0)

schema, err := dynparquet.SchemaFromDefinition(&schemapb.Schema{
Name: "test",
Columns: []*schemapb.Column{{
Name: "example",
StorageLayout: &schemapb.StorageLayout{
Type: schemapb.StorageLayout_TYPE_INT64,
},
}, {
Name: "timestamp",
StorageLayout: &schemapb.StorageLayout{
Type: schemapb.StorageLayout_TYPE_INT64,
},
}},
})
require.NoError(t, err)
schema, err := dynparquet.SchemaFromDefinition(&schemapb.Schema{
Name: "test",
Columns: []*schemapb.Column{{
Name: "example",
StorageLayout: &schemapb.StorageLayout{
Type: schemapb.StorageLayout_TYPE_INT64,
},
}, {
Name: "timestamp",
StorageLayout: &schemapb.StorageLayout{
Type: schemapb.StorageLayout_TYPE_INT64,
},
}},
})
require.NoError(t, err)

rb := array.NewRecordBuilder(mem, arrow.NewSchema([]arrow.Field{{
Name: "example",
Type: arrow.PrimitiveTypes.Int64,
}, {
Name: "timestamp",
Type: arrow.PrimitiveTypes.Int64,
}}, nil))
defer rb.Release()
rb := array.NewRecordBuilder(mem, arrow.NewSchema([]arrow.Field{{
Name: "example",
Type: arrow.PrimitiveTypes.Int64,
}, {
Name: "timestamp",
Type: arrow.PrimitiveTypes.Int64,
}}, nil))
defer rb.Release()

rb.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
rb.Field(1).(*array.Int64Builder).AppendValues([]int64{1, 1, 3}, nil)
rb.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
rb.Field(1).(*array.Int64Builder).AppendValues([]int64{1, 1, 3}, nil)

r := rb.NewRecord()
defer r.Release()
r := rb.NewRecord()
defer r.Release()

ran := false
err = NewEngine(mem, &FakeTableProvider{
Tables: map[string]logicalplan.TableReader{
"test": &FakeTableReader{
FrostdbSchema: schema,
Records: []arrow.Record{r},
},
},
}, WithPhysicalplanOptions(test.execOptions...)).ScanTable("test").
Aggregate(
[]*logicalplan.AggregationFunction{logicalplan.Unique(logicalplan.Col("example"))},
[]logicalplan.Expr{logicalplan.Col("timestamp")},
).
Execute(context.Background(), func(ctx context.Context, r arrow.Record) error {
require.Equal(t, []int64{1, 3}, r.Column(0).(*array.Int64).Int64Values())
require.True(t, r.Column(1).(*array.Int64).IsNull(0))
require.True(t, r.Column(1).(*array.Int64).IsValid(1))
require.Equal(t, int64(3), r.Column(1).(*array.Int64).Value(1))
ran = true
return nil
})
require.NoError(t, err)
require.True(t, ran)
ran := false
err = NewEngine(mem, &FakeTableProvider{
Tables: map[string]logicalplan.TableReader{
"test": &FakeTableReader{
FrostdbSchema: schema,
Records: []arrow.Record{r},
},
},
}).ScanTable("test").
Aggregate(
[]*logicalplan.AggregationFunction{logicalplan.Unique(logicalplan.Col("example"))},
[]logicalplan.Expr{logicalplan.Col("timestamp")},
).
Execute(context.Background(), func(ctx context.Context, r arrow.Record) error {
require.Equal(t, []int64{1, 3}, r.Column(0).(*array.Int64).Int64Values())
require.True(t, r.Column(1).(*array.Int64).IsNull(0))
require.True(t, r.Column(1).(*array.Int64).IsValid(1))
require.Equal(t, int64(3), r.Column(1).(*array.Int64).Value(1))
ran = true
return nil
})
}
require.NoError(t, err)
require.True(t, ran)
}

func TestAndAggregation(t *testing.T) {
Expand Down
47 changes: 18 additions & 29 deletions query/physicalplan/physicalplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import (
"github.com/polarsignals/frostdb/recovery"
)

var defaultConcurrency = runtime.GOMAXPROCS(0)
// TODO: Make this smarter.
var concurrencyHardcoded = runtime.GOMAXPROCS(0)

type PhysicalPlan interface {
Callback(ctx context.Context, r arrow.Record) error
Expand Down Expand Up @@ -254,47 +255,34 @@ func (p *noopOperator) Draw() *Diagram {
return p.next.Draw()
}

type ExecOptions struct {
type execOptions struct {
orderedAggregations bool
overrideInput []PhysicalPlan
readMode logicalplan.ReadMode
concurrency int
}

func NewExecOptions() ExecOptions {
return ExecOptions{
concurrency: defaultConcurrency,
}
}

type Option func(o *ExecOptions)
type Option func(o *execOptions)

func WithReadMode(m logicalplan.ReadMode) Option {
return func(o *ExecOptions) {
return func(o *execOptions) {
o.readMode = m
}
}

func WithOrderedAggregations() Option {
return func(o *ExecOptions) {
return func(o *execOptions) {
o.orderedAggregations = true
}
}

// WithOverrideInput can be used to provide an input stage on top of which the
// Build function can build the physical plan.
func WithOverrideInput(input []PhysicalPlan) Option {
return func(o *ExecOptions) {
return func(o *execOptions) {
o.overrideInput = input
}
}

func WithConcurrency(concurrency int) Option {
return func(o *ExecOptions) {
o.concurrency = concurrency
}
}

func Build(
ctx context.Context,
pool memory.Allocator,
Expand All @@ -306,7 +294,7 @@ func Build(
_, span := tracer.Start(ctx, "PhysicalPlan/Build")
defer span.End()

execOpts := NewExecOptions()
execOpts := execOptions{}
for _, o := range options {
o(&execOpts)
}
Expand All @@ -330,7 +318,7 @@ func Build(
// Create noop operators since we don't know what to push the scan
// results to. In a following node visit, these noops will have
// SetNext called on them and push to the correct operator.
plans := make([]PhysicalPlan, execOpts.concurrency)
plans := make([]PhysicalPlan, concurrencyHardcoded)
for i := range plans {
plans[i] = &noopOperator{}
}
Expand All @@ -345,7 +333,7 @@ func Build(
// Create noop operators since we don't know what to push the scan
// results to. In a following node visit, these noops will have
// SetNext called on them and push to the correct operator.
plans := make([]PhysicalPlan, execOpts.concurrency)
plans := make([]PhysicalPlan, concurrencyHardcoded)
for i := range plans {
plans[i] = &noopOperator{}
}
Expand Down Expand Up @@ -447,12 +435,13 @@ func Build(
ordered = false
}
var sync PhysicalPlan
// These aggregate operators need to be synchronized.
// NOTE: that in the case of concurrency 1 we still add a syncronizer because the Aggregation operator expects a final aggregation to be performed.
if ordered && len(plan.Aggregation.GroupExprs) > 0 {
sync = NewOrderedSynchronizer(pool, len(prev), plan.Aggregation.GroupExprs)
} else {
sync = Synchronize(len(prev))
if len(prev) > 1 {
// These aggregate operators need to be synchronized.
if ordered && len(plan.Aggregation.GroupExprs) > 0 {
sync = NewOrderedSynchronizer(pool, len(prev), plan.Aggregation.GroupExprs)
} else {
sync = Synchronize(len(prev))
}
}
seed := maphash.MakeSeed()
for i := 0; i < len(prev); i++ {
Expand Down Expand Up @@ -511,7 +500,7 @@ func Build(
}

func shouldPlanOrderedAggregate(
execOpts ExecOptions, oInfo *planOrderingInfo, agg *logicalplan.Aggregation,
execOpts execOptions, oInfo *planOrderingInfo, agg *logicalplan.Aggregation,
) (bool, error) {
if !execOpts.orderedAggregations {
// Ordered aggregations disabled.
Expand Down

0 comments on commit 7269cf6

Please sign in to comment.