Skip to content

Commit

Permalink
Rule Concurrency: Prevent flapping of concurrency (#10189)
Browse files Browse the repository at this point in the history
* Rule Concurrency: Prevent flapping of concurrency

Iterates on #8146

The `isGroupAtRisk` function only uses the group's last evaluation time as a metric
However, if the concurrency of the group causes the group's eval time to lower to less than the threshold, this will flap between enabling concurrency and disabling it on every run

In this PR, a condition is added to also sum up the last evaluation time of each rule to compare against the threshold

* Linting

* Use the new `evaluationRuleTimeSum` field from the group

* Linting

* Add changelog + metric

* Apply suggestions from code review

Co-authored-by: gotjosh <[email protected]>

* Unrevert crypto

* Fix typo in changelog

---------

Co-authored-by: gotjosh <[email protected]>
  • Loading branch information
julienduchesne and gotjosh committed Dec 17, 2024
1 parent 8136212 commit 8fc97dd
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 35 deletions.
9 changes: 9 additions & 0 deletions pkg/ruler/manager_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type ManagerMetrics struct {
GroupInterval *prometheus.Desc
GroupLastEvalTime *prometheus.Desc
GroupLastDuration *prometheus.Desc
GroupLastRuleDurationSum *prometheus.Desc
GroupLastRestoreDuration *prometheus.Desc
GroupRules *prometheus.Desc
GroupLastEvalSamples *prometheus.Desc
Expand Down Expand Up @@ -89,6 +90,12 @@ func NewManagerMetrics(logger log.Logger) *ManagerMetrics {
[]string{"user", "rule_group"},
nil,
),
GroupLastRuleDurationSum: prometheus.NewDesc(
"cortex_prometheus_rule_group_last_rule_duration_sum_seconds",
"The sum of time in seconds it took to evaluate each rule in the group regardless of concurrency. This should be higher than the group duration if rules are evaluated concurrently.",
[]string{"user", "rule_group"},
nil,
),
GroupLastRestoreDuration: prometheus.NewDesc(
"cortex_prometheus_rule_group_last_restore_duration_seconds",
"The duration of the last alert rules alerts restoration using the `ALERTS_FOR_STATE` series across all rule groups.",
Expand Down Expand Up @@ -131,6 +138,7 @@ func (m *ManagerMetrics) Describe(out chan<- *prometheus.Desc) {
out <- m.GroupInterval
out <- m.GroupLastEvalTime
out <- m.GroupLastDuration
out <- m.GroupLastRuleDurationSum
out <- m.GroupLastRestoreDuration
out <- m.GroupRules
out <- m.GroupLastEvalSamples
Expand All @@ -156,6 +164,7 @@ func (m *ManagerMetrics) Collect(out chan<- prometheus.Metric) {
data.SendSumOfGaugesPerTenant(out, m.GroupInterval, "prometheus_rule_group_interval_seconds", dskit_metrics.WithLabels("rule_group"))
data.SendSumOfGaugesPerTenant(out, m.GroupLastEvalTime, "prometheus_rule_group_last_evaluation_timestamp_seconds", dskit_metrics.WithLabels("rule_group"))
data.SendSumOfGaugesPerTenant(out, m.GroupLastDuration, "prometheus_rule_group_last_duration_seconds", dskit_metrics.WithLabels("rule_group"))
data.SendSumOfGaugesPerTenant(out, m.GroupLastRuleDurationSum, "prometheus_rule_group_last_rule_duration_sum_seconds", dskit_metrics.WithLabels("rule_group"))
data.SendSumOfGaugesPerTenant(out, m.GroupRules, "prometheus_rule_group_rules", dskit_metrics.WithLabels("rule_group"))
data.SendSumOfGaugesPerTenant(out, m.GroupLastEvalSamples, "prometheus_rule_group_last_evaluation_samples", dskit_metrics.WithLabels("rule_group"))
}
14 changes: 12 additions & 2 deletions pkg/ruler/rule_concurrency.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,19 @@ func (c *TenantConcurrencyController) Allow(_ context.Context, group *rules.Grou
// isGroupAtRisk checks if the rule group's last evaluation time is within the risk threshold.
func (c *TenantConcurrencyController) isGroupAtRisk(group *rules.Group) bool {
interval := group.Interval().Seconds()
lastEvaluation := group.GetEvaluationTime().Seconds()
runtimeThreshold := interval * c.thresholdRuleConcurrency / 100

return lastEvaluation >= interval*c.thresholdRuleConcurrency/100
// If the group evaluation time is greater than the threshold, the group is at risk.
if group.GetEvaluationTime().Seconds() >= runtimeThreshold {
return true
}

// If the total rule evaluation time is greater than the threshold, the group is at risk.
if group.GetRuleEvaluationTimeSum().Seconds() >= runtimeThreshold {
return true
}

return false
}

// isRuleIndependent checks if the rule is independent of other rules.
Expand Down
113 changes: 80 additions & 33 deletions pkg/ruler/rule_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@ package ruler
import (
"bytes"
"context"
"fmt"
"testing"
"time"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/util/teststorage"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"golang.org/x/sync/semaphore"
Expand Down Expand Up @@ -264,11 +267,51 @@ func TestIsRuleIndependent(t *testing.T) {
}

func TestGroupAtRisk(t *testing.T) {
exp, err := parser.ParseExpr("vector(1)")
require.NoError(t, err)
rule1 := rules.NewRecordingRule("test", exp, labels.Labels{})
rule1.SetNoDependencyRules(true)
rule1.SetNoDependentRules(true)
createAndEvalTestGroup := func(interval time.Duration, evalConcurrently bool) *rules.Group {
st := teststorage.New(t)
defer st.Close()

// Create 100 rules that all take 1ms to evaluate.
var createdRules []rules.Rule
ruleCt := 100
ruleWaitTime := 1 * time.Millisecond
for i := 0; i < ruleCt; i++ {
q, err := parser.ParseExpr("vector(1)")
require.NoError(t, err)
rule := rules.NewRecordingRule(fmt.Sprintf("test_rule%d", i), q, labels.Labels{})
rule.SetNoDependencyRules(true)
rule.SetNoDependentRules(true)
createdRules = append(createdRules, rule)
}

// Create the group and evaluate it
opts := rules.GroupOptions{
Interval: interval,
Opts: &rules.ManagerOptions{
Appendable: st,
QueryFunc: func(_ context.Context, _ string, _ time.Time) (promql.Vector, error) {
time.Sleep(ruleWaitTime)
return promql.Vector{}, nil
},
},
Rules: createdRules,
}
if evalConcurrently {
opts.Opts.RuleConcurrencyController = &allowAllConcurrencyController{}
}
g := rules.NewGroup(opts)
rules.DefaultEvalIterationFunc(context.Background(), g, time.Now())

// Sanity check that we're actually running the rules concurrently.
// The group should take less time than the sum of all rules if we're running them concurrently, more otherwise.
if evalConcurrently {
require.Less(t, g.GetEvaluationTime(), time.Duration(ruleCt)*ruleWaitTime)
} else {
require.Greater(t, g.GetEvaluationTime(), time.Duration(ruleCt)*ruleWaitTime)
}

return g
}

m := newMultiTenantConcurrencyControllerMetrics(prometheus.NewPedanticRegistry())
controller := &TenantConcurrencyController{
Expand All @@ -284,44 +327,48 @@ func TestGroupAtRisk(t *testing.T) {
}

tc := map[string]struct {
group *rules.Group
expected bool
groupInterval time.Duration
evalConcurrently bool
expected bool
}{
"group last evaluation greater than interval": {
group: func() *rules.Group {
g := rules.NewGroup(rules.GroupOptions{
Interval: -1 * time.Minute,
Opts: &rules.ManagerOptions{},
})
return g
}(),
expected: true,
// Total runtime: 100x1ms ~ 100ms (run sequentially), > 1ms -> Not at risk
groupInterval: 1 * time.Millisecond,
evalConcurrently: false,
expected: true,
},
"group last evaluation less than interval": {
group: func() *rules.Group {
g := rules.NewGroup(rules.GroupOptions{
Interval: 1 * time.Minute,
Opts: &rules.ManagerOptions{},
})
return g
}(),
expected: false,
// Total runtime: 100x1ms ~ 100ms (run sequentially), < 1s -> Not at risk
groupInterval: 1 * time.Second,
evalConcurrently: false,
expected: false,
},
"group last evaluation exactly at concurrency trigger threshold": {
group: func() *rules.Group {
g := rules.NewGroup(rules.GroupOptions{
Interval: 0 * time.Minute,
Opts: &rules.ManagerOptions{},
})
return g
}(),
expected: true,
"group total rule evaluation duration of last evaluation greater than threshold": {
// Total runtime: 100x1ms ~ 100ms, > 50ms -> Group isn't at risk for its runtime, but it is for the sum of all rules.
groupInterval: 50 * time.Millisecond,
evalConcurrently: true,
expected: true,
},
"group total rule evaluation duration of last evaluation less than threshold": {
// Total runtime: 100x1ms ~ 100ms, < 1s -> Not at risk
groupInterval: 1 * time.Second,
evalConcurrently: true,
expected: false,
},
}

for name, tt := range tc {
t.Run(name, func(t *testing.T) {
require.Equal(t, tt.expected, controller.isGroupAtRisk(tt.group))
group := createAndEvalTestGroup(tt.groupInterval, tt.evalConcurrently)
require.Equal(t, tt.expected, controller.isGroupAtRisk(group))
})
}
}

type allowAllConcurrencyController struct{}

func (a *allowAllConcurrencyController) Allow(_ context.Context, _ *rules.Group, _ rules.Rule) bool {
return true
}

func (a *allowAllConcurrencyController) Done(_ context.Context) {}

0 comments on commit 8fc97dd

Please sign in to comment.