From 651251746e9c1442af020bfc490874ecd0c4d8e1 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Tue, 27 Aug 2024 22:09:09 -0700 Subject: [PATCH] Add NodeReplacer for BinaryExpr where it is a Literal and VectorSelector/AggregateExpr If the query is something like `foo>1` prior to this patch we unwrap and send down to each downstream `foo` and then do the comparison (binaryExpr) in promxy. This is non-ideal as we are sending some data back that could have been dropped at the remote end. Although this is a great performance optimization we will need to be careful when adding cases to this. For BinaryExprs to be sent to each servergroup in a `valid` way the query must be "local" to a given servergroup. For example; `sum(foo)>1` would *not* be safe to send downstream as `sum(foo)` requires adding across a variety of servergroups to determine the correct value. Fixes #673 --- cmd/promxy/demo_robust.conf | 6 -- pkg/proxystorage/proxy.go | 129 +++++++++++++++++++++++++++++++++++- pkg/proxystorage/util.go | 18 +++++ 3 files changed, 146 insertions(+), 7 deletions(-) diff --git a/cmd/promxy/demo_robust.conf b/cmd/promxy/demo_robust.conf index 052976418..effb1334c 100644 --- a/cmd/promxy/demo_robust.conf +++ b/cmd/promxy/demo_robust.conf @@ -6,12 +6,6 @@ global: external_labels: source: promxy -# Rule files specifies a list of globs. Rules and alerts are read from -# all matching files. -rule_files: -- "*rule" - - ## ### Promxy configuration ## diff --git a/pkg/proxystorage/proxy.go b/pkg/proxystorage/proxy.go index 7027abe30..bb52609f8 100644 --- a/pkg/proxystorage/proxy.go +++ b/pkg/proxystorage/proxy.go @@ -321,6 +321,11 @@ func (p *ProxyStorage) NodeReplacer(ctx context.Context, s *parser.EvalStmt, nod return ok } + isBinaryExpr := func(node parser.Node) bool { + _, ok := node.(*parser.BinaryExpr) + return ok + } + isVectorSelector := func(node parser.Node) bool { _, ok := node.(*parser.VectorSelector) return ok @@ -355,7 +360,7 @@ func (p *ProxyStorage) NodeReplacer(ctx context.Context, s *parser.EvalStmt, nod if aggFinder.Found > 0 { // If there was a single agg and that was us, then we're okay - if !((isAgg(node) || isSubQuery(node)) && aggFinder.Found == 1) { + if !((isAgg(node) || isSubQuery(node) || isBinaryExpr(node)) && aggFinder.Found == 1) { return nil, nil } } @@ -399,6 +404,13 @@ func (p *ProxyStorage) NodeReplacer(ctx context.Context, s *parser.EvalStmt, nod // Some AggregateExprs can be composed (meaning they are "reentrant". If the aggregation op // is reentrant/composable then we'll do so, otherwise we let it fall through to normal query mechanisms case *parser.AggregateExpr: + // If the vector selector already has the data we can skip + if vs, ok := n.Expr.(*parser.VectorSelector); ok { + if vs.UnexpandedSeriesSet != nil { + return nil, nil + } + } + logrus.Debugf("AggregateExpr %v %s", n, n.Op) var result model.Value @@ -733,6 +745,121 @@ func (p *ProxyStorage) NodeReplacer(ctx context.Context, s *parser.EvalStmt, nod return n, nil } + // BinaryExprs *can* be sent untouched to downstreams assuming there is no actual interaction between LHS/RHS + // these are relatively rare -- as things like `sum(foo) > 2` would *not* be viable as `sum(foo)` could + // potentially require multiple servergroups to generate the correct response. + // From inspection there are only 3 specific types where this sort of replacement is "safe" (assuming one side is a literal) + // `VectorSector` + // `AggregateExpr` (Max, Min, TopK, BottomK only -- and only if re-combined) + case *parser.BinaryExpr: + logrus.Debugf("BinaryExpr: %v", n) + + // vectorBinaryExpr will send the node as a query to the downstream and return an expanded VectorSelector + vectorBinaryExpr := func(vs *parser.VectorSelector) (parser.Node, error) { + logrus.Debugf("BinaryExpr (VectorSelector + Literal): %v", n) + removeOffsetFn() + + var result model.Value + var warnings v1.Warnings + var err error + if s.Interval > 0 { + vs.LookbackDelta = s.Interval - time.Duration(1) + result, warnings, err = state.client.QueryRange(ctx, n.String(), v1.Range{ + Start: s.Start.Add(-offset), + End: s.End.Add(-offset), + Step: s.Interval, + }) + } else { + result, warnings, err = state.client.Query(ctx, n.String(), s.Start.Add(-offset)) + } + + if err != nil { + return nil, err + } + + iterators := promclient.IteratorsForValue(result) + series := make([]storage.Series, len(iterators)) + for i, iterator := range iterators { + series[i] = &proxyquerier.Series{iterator} + } + + ret := &parser.VectorSelector{OriginalOffset: offset} + if s.Interval > 0 { + ret.LookbackDelta = s.Interval - time.Duration(1) + } + ret.UnexpandedSeriesSet = proxyquerier.NewSeriesSet(series, promhttputil.WarningsConvert(warnings), err) + return ret, nil + } + + // aggregateBinaryExpr will send the node as a query to the downstream and + // replace the aggregate expr with the resulting data. This will cause the aggregation + // (min, max, topk, bottomk) to be re-run against the expression. + aggregateBinaryExpr := func(agg *parser.AggregateExpr) error { + logrus.Debugf("BinaryExpr (AggregateExpr + Literal): %v", n) + + removeOffsetFn() + + var ( + result model.Value + warnings v1.Warnings + err error + ) + + if s.Interval > 0 { + result, warnings, err = state.client.QueryRange(ctx, n.String(), v1.Range{ + Start: s.Start.Add(-offset), + End: s.End.Add(-offset), + Step: s.Interval, + }) + } else { + result, warnings, err = state.client.Query(ctx, n.String(), s.Start.Add(-offset)) + } + if err != nil { + return err + } + + iterators := promclient.IteratorsForValue(result) + + series := make([]storage.Series, len(iterators)) + for i, iterator := range iterators { + series[i] = &proxyquerier.Series{iterator} + } + + ret := &parser.VectorSelector{OriginalOffset: offset} + if s.Interval > 0 { + ret.LookbackDelta = s.Interval - time.Duration(1) + } + ret.UnexpandedSeriesSet = proxyquerier.NewSeriesSet(series, promhttputil.WarningsConvert(warnings), err) + + agg.Expr = ret + return nil + } + + // Only valid if the other side is either `NumberLiteral` or `StringLiteral` + this := n.LHS + other := n.RHS + literal := ExprIsLiteral(UnwrapExpr(this)) + if !literal { + this = n.RHS + other = n.LHS + literal = ExprIsLiteral(UnwrapExpr(this)) + } + // If one side is a literal lets check + if literal { + switch otherTyped := other.(type) { + case *parser.VectorSelector: + return vectorBinaryExpr(otherTyped) + case *parser.AggregateExpr: + switch otherTyped.Op { + case parser.MIN, parser.MAX, parser.TOPK, parser.BOTTOMK: + if err := aggregateBinaryExpr(otherTyped); err != nil { + return nil, err + } + return n, nil + } + } + } + default: logrus.Debugf("default %v %s", n, reflect.TypeOf(n)) diff --git a/pkg/proxystorage/util.go b/pkg/proxystorage/util.go index 3d1c76f24..ac538764f 100644 --- a/pkg/proxystorage/util.go +++ b/pkg/proxystorage/util.go @@ -119,3 +119,21 @@ func PreserveLabel(expr parser.Expr, srcLabel string, dstLabel string) (relabelE relabelExpress, _ = parser.ParseExpr(fmt.Sprintf("label_replace(%s,`%s`,`$1`,`%s`,`(.*)`)", expr.String(), dstLabel, srcLabel)) return relabelExpress } + +func UnwrapExpr(expr parser.Expr) parser.Expr { + switch e := expr.(type) { + case *parser.StepInvariantExpr: + return e.Expr + } + return expr +} + +func ExprIsLiteral(expr parser.Expr) bool { + switch expr.(type) { + case *parser.StringLiteral: + return true + case *parser.NumberLiteral: + return true + } + return false +}