Skip to content

Commit

Permalink
Add NodeReplacer for BinaryExpr where it is a Literal and
Browse files Browse the repository at this point in the history
VectorSelector/AggregateExpr

If the query is something like `foo>1`, prior to this patch we unwrap it,
send `foo` down to each downstream, and then do the comparison
(binaryExpr) in promxy. This is non-ideal, as we are sending back some data
that could have been dropped at the remote end.

Although this is a great performance optimization, we will need to be
careful when adding cases to this. For BinaryExprs to be sent to
each servergroup in a valid way, the query must be "local" to a given
servergroup. For example, `sum(foo)>1` would *not* be safe to send
downstream, as `sum(foo)` requires adding across a variety of
servergroups to determine the correct value.

Fixes #673
  • Loading branch information
jacksontj committed Sep 10, 2024
1 parent ca42886 commit b0f6fb8
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 1 deletion.
129 changes: 128 additions & 1 deletion pkg/proxystorage/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,11 @@ func (p *ProxyStorage) NodeReplacer(ctx context.Context, s *parser.EvalStmt, nod
return ok
}

isBinaryExpr := func(node parser.Node) bool {
_, ok := node.(*parser.BinaryExpr)
return ok
}

isVectorSelector := func(node parser.Node) bool {
_, ok := node.(*parser.VectorSelector)
return ok
Expand Down Expand Up @@ -355,7 +360,7 @@ func (p *ProxyStorage) NodeReplacer(ctx context.Context, s *parser.EvalStmt, nod

if aggFinder.Found > 0 {
// If there was a single agg and that was us, then we're okay
if !((isAgg(node) || isSubQuery(node)) && aggFinder.Found == 1) {
if !((isAgg(node) || isSubQuery(node) || isBinaryExpr(node)) && aggFinder.Found == 1) {
return nil, nil
}
}
Expand Down Expand Up @@ -399,6 +404,13 @@ func (p *ProxyStorage) NodeReplacer(ctx context.Context, s *parser.EvalStmt, nod
// Some AggregateExprs can be composed (meaning they are "reentrant"). If the aggregation op
// is reentrant/composable then we'll do so, otherwise we let it fall through to normal query mechanisms
case *parser.AggregateExpr:
// If the vector selector already has the data we can skip
if vs, ok := n.Expr.(*parser.VectorSelector); ok {
if vs.UnexpandedSeriesSet != nil {
return nil, nil
}
}

logrus.Debugf("AggregateExpr %v %s", n, n.Op)

var result model.Value
Expand Down Expand Up @@ -733,6 +745,121 @@ func (p *ProxyStorage) NodeReplacer(ctx context.Context, s *parser.EvalStmt, nod
return n, nil
}

// BinaryExprs *can* be sent untouched to downstreams assuming there is no actual interaction between LHS/RHS
// these are relatively rare -- as things like `sum(foo) > 2` would *not* be viable as `sum(foo)` could
// potentially require multiple servergroups to generate the correct response.
// From inspection there are only 3 specific types where this sort of replacement is "safe" (assuming one side is a literal)
// `VectorSelector`
// `AggregateExpr` (Max, Min, TopK, BottomK only -- and only if re-combined)
case *parser.BinaryExpr:
logrus.Debugf("BinaryExpr: %v", n)

// vectorBinaryExpr will send the node as a query to the downstream and return an expanded VectorSelector
vectorBinaryExpr := func(vs *parser.VectorSelector) (parser.Node, error) {
logrus.Debugf("BinaryExpr (VectorSelector + Literal): %v", n)
removeOffsetFn()

var result model.Value
var warnings v1.Warnings
var err error
if s.Interval > 0 {
vs.LookbackDelta = s.Interval - time.Duration(1)
result, warnings, err = state.client.QueryRange(ctx, n.String(), v1.Range{
Start: s.Start.Add(-offset),
End: s.End.Add(-offset),
Step: s.Interval,
})
} else {
result, warnings, err = state.client.Query(ctx, n.String(), s.Start.Add(-offset))
}

if err != nil {
return nil, err
}

iterators := promclient.IteratorsForValue(result)
series := make([]storage.Series, len(iterators))
for i, iterator := range iterators {
series[i] = &proxyquerier.Series{iterator}
}

ret := &parser.VectorSelector{OriginalOffset: offset}
if s.Interval > 0 {
ret.LookbackDelta = s.Interval - time.Duration(1)
}
ret.UnexpandedSeriesSet = proxyquerier.NewSeriesSet(series, promhttputil.WarningsConvert(warnings), err)
return ret, nil
}

// aggregateBinaryExpr will send the node as a query to the downstream and
// replace the aggregate expr with the resulting data. This will cause the aggregation
// (min, max, topk, bottomk) to be re-run against the expression.
aggregateBinaryExpr := func(agg *parser.AggregateExpr) error {
logrus.Debugf("BinaryExpr (AggregateExpr + Literal): %v", n)

removeOffsetFn()

var (
result model.Value
warnings v1.Warnings
err error
)

if s.Interval > 0 {
result, warnings, err = state.client.QueryRange(ctx, n.String(), v1.Range{
Start: s.Start.Add(-offset),
End: s.End.Add(-offset),
Step: s.Interval,
})
} else {
result, warnings, err = state.client.Query(ctx, n.String(), s.Start.Add(-offset))
}
if err != nil {
return err
}

iterators := promclient.IteratorsForValue(result)

series := make([]storage.Series, len(iterators))
for i, iterator := range iterators {
series[i] = &proxyquerier.Series{iterator}
}

ret := &parser.VectorSelector{OriginalOffset: offset}
if s.Interval > 0 {
ret.LookbackDelta = s.Interval - time.Duration(1)
}
ret.UnexpandedSeriesSet = proxyquerier.NewSeriesSet(series, promhttputil.WarningsConvert(warnings), err)

agg.Expr = ret
return nil
}

// Only valid if the other side is either `NumberLiteral` or `StringLiteral`
this := n.LHS
other := n.RHS
literal := ExprIsLiteral(UnwrapExpr(this))
if !literal {
this = n.RHS
other = n.LHS
literal = ExprIsLiteral(UnwrapExpr(this))
}
// If one side is a literal, let's check
if literal {
switch otherTyped := other.(type) {
case *parser.VectorSelector:
return vectorBinaryExpr(otherTyped)
case *parser.AggregateExpr:
switch otherTyped.Op {
case parser.MIN, parser.MAX, parser.TOPK, parser.BOTTOMK:
if err := aggregateBinaryExpr(otherTyped); err != nil {
return nil, err
}
return n, nil
}
}
}

default:
logrus.Debugf("default %v %s", n, reflect.TypeOf(n))

Expand Down
18 changes: 18 additions & 0 deletions pkg/proxystorage/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,21 @@ func PreserveLabel(expr parser.Expr, srcLabel string, dstLabel string) (relabelE
relabelExpress, _ = parser.ParseExpr(fmt.Sprintf("label_replace(%s,`%s`,`$1`,`%s`,`(.*)`)", expr.String(), dstLabel, srcLabel))
return relabelExpress
}

// UnwrapExpr strips one level of *parser.StepInvariantExpr wrapping from expr.
// If expr is a *parser.StepInvariantExpr, the expression it wraps is returned;
// any other expression is returned unchanged. Only a single layer is removed.
func UnwrapExpr(expr parser.Expr) parser.Expr {
	if wrapped, ok := expr.(*parser.StepInvariantExpr); ok {
		return wrapped.Expr
	}
	return expr
}

// ExprIsLiteral reports whether expr is a literal node, i.e. either a
// *parser.StringLiteral or a *parser.NumberLiteral. No unwrapping is done
// here; wrapped literals must be unwrapped by the caller first.
func ExprIsLiteral(expr parser.Expr) bool {
	if _, isString := expr.(*parser.StringLiteral); isString {
		return true
	}
	_, isNumber := expr.(*parser.NumberLiteral)
	return isNumber
}

0 comments on commit b0f6fb8

Please sign in to comment.