diff --git a/pkg/ruler/manager_metrics.go b/pkg/ruler/manager_metrics.go index 9f5a3953fea..784b07340f3 100644 --- a/pkg/ruler/manager_metrics.go +++ b/pkg/ruler/manager_metrics.go @@ -25,6 +25,7 @@ type ManagerMetrics struct { GroupInterval *prometheus.Desc GroupLastEvalTime *prometheus.Desc GroupLastDuration *prometheus.Desc + GroupLastRuleDurationSum *prometheus.Desc GroupLastRestoreDuration *prometheus.Desc GroupRules *prometheus.Desc GroupLastEvalSamples *prometheus.Desc @@ -89,6 +90,12 @@ func NewManagerMetrics(logger log.Logger) *ManagerMetrics { []string{"user", "rule_group"}, nil, ), + GroupLastRuleDurationSum: prometheus.NewDesc( + "cortex_prometheus_rule_group_last_rule_duration_sum_seconds", + "The sum of time in seconds it took to evaluate each rule in the group regardless of concurrency. This should be higher than the group duration if rules are evaluated concurrently.", + []string{"user", "rule_group"}, + nil, + ), GroupLastRestoreDuration: prometheus.NewDesc( "cortex_prometheus_rule_group_last_restore_duration_seconds", "The duration of the last alert rules alerts restoration using the `ALERTS_FOR_STATE` series across all rule groups.", @@ -131,6 +138,7 @@ func (m *ManagerMetrics) Describe(out chan<- *prometheus.Desc) { out <- m.GroupInterval out <- m.GroupLastEvalTime out <- m.GroupLastDuration + out <- m.GroupLastRuleDurationSum out <- m.GroupLastRestoreDuration out <- m.GroupRules out <- m.GroupLastEvalSamples @@ -156,6 +164,7 @@ func (m *ManagerMetrics) Collect(out chan<- prometheus.Metric) { data.SendSumOfGaugesPerTenant(out, m.GroupInterval, "prometheus_rule_group_interval_seconds", dskit_metrics.WithLabels("rule_group")) data.SendSumOfGaugesPerTenant(out, m.GroupLastEvalTime, "prometheus_rule_group_last_evaluation_timestamp_seconds", dskit_metrics.WithLabels("rule_group")) data.SendSumOfGaugesPerTenant(out, m.GroupLastDuration, "prometheus_rule_group_last_duration_seconds", dskit_metrics.WithLabels("rule_group")) + 
data.SendSumOfGaugesPerTenant(out, m.GroupLastRuleDurationSum, "prometheus_rule_group_last_rule_duration_sum_seconds", dskit_metrics.WithLabels("rule_group")) data.SendSumOfGaugesPerTenant(out, m.GroupRules, "prometheus_rule_group_rules", dskit_metrics.WithLabels("rule_group")) data.SendSumOfGaugesPerTenant(out, m.GroupLastEvalSamples, "prometheus_rule_group_last_evaluation_samples", dskit_metrics.WithLabels("rule_group")) } diff --git a/pkg/ruler/rule_concurrency.go b/pkg/ruler/rule_concurrency.go index 964ea9a3269..ba4a506c2a5 100644 --- a/pkg/ruler/rule_concurrency.go +++ b/pkg/ruler/rule_concurrency.go @@ -190,9 +190,19 @@ func (c *TenantConcurrencyController) Allow(_ context.Context, group *rules.Grou // isGroupAtRisk checks if the rule group's last evaluation time is within the risk threshold. func (c *TenantConcurrencyController) isGroupAtRisk(group *rules.Group) bool { interval := group.Interval().Seconds() - lastEvaluation := group.GetEvaluationTime().Seconds() + runtimeThreshold := interval * c.thresholdRuleConcurrency / 100 - return lastEvaluation >= interval*c.thresholdRuleConcurrency/100 + // If the group evaluation time is greater than the threshold, the group is at risk. + if group.GetEvaluationTime().Seconds() >= runtimeThreshold { + return true + } + + // If the total rule evaluation time is greater than the threshold, the group is at risk. + if group.GetRuleEvaluationTimeSum().Seconds() >= runtimeThreshold { + return true + } + + return false } // isRuleIndependent checks if the rule is independent of other rules. 
diff --git a/pkg/ruler/rule_concurrency_test.go b/pkg/ruler/rule_concurrency_test.go index ac953d2c4af..64be33715cc 100644 --- a/pkg/ruler/rule_concurrency_test.go +++ b/pkg/ruler/rule_concurrency_test.go @@ -5,6 +5,7 @@ package ruler import ( "bytes" "context" + "fmt" "testing" "time" @@ -12,8 +13,10 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/rules" + "github.com/prometheus/prometheus/util/teststorage" "github.com/stretchr/testify/require" "go.uber.org/atomic" "golang.org/x/sync/semaphore" @@ -264,11 +267,51 @@ func TestIsRuleIndependent(t *testing.T) { } func TestGroupAtRisk(t *testing.T) { - exp, err := parser.ParseExpr("vector(1)") - require.NoError(t, err) - rule1 := rules.NewRecordingRule("test", exp, labels.Labels{}) - rule1.SetNoDependencyRules(true) - rule1.SetNoDependentRules(true) + createAndEvalTestGroup := func(interval time.Duration, evalConcurrently bool) *rules.Group { + st := teststorage.New(t) + defer st.Close() + + // Create 100 rules that all take 1ms to evaluate. 
+ var createdRules []rules.Rule + ruleCt := 100 + ruleWaitTime := 1 * time.Millisecond + for i := 0; i < ruleCt; i++ { + q, err := parser.ParseExpr("vector(1)") + require.NoError(t, err) + rule := rules.NewRecordingRule(fmt.Sprintf("test_rule%d", i), q, labels.Labels{}) + rule.SetNoDependencyRules(true) + rule.SetNoDependentRules(true) + createdRules = append(createdRules, rule) + } + + // Create the group and evaluate it + opts := rules.GroupOptions{ + Interval: interval, + Opts: &rules.ManagerOptions{ + Appendable: st, + QueryFunc: func(_ context.Context, _ string, _ time.Time) (promql.Vector, error) { + time.Sleep(ruleWaitTime) + return promql.Vector{}, nil + }, + }, + Rules: createdRules, + } + if evalConcurrently { + opts.Opts.RuleConcurrencyController = &allowAllConcurrencyController{} + } + g := rules.NewGroup(opts) + rules.DefaultEvalIterationFunc(context.Background(), g, time.Now()) + + // Sanity check that we're actually running the rules concurrently. + // The group should take less time than the sum of all rules if we're running them concurrently, more otherwise. 
+ if evalConcurrently { + require.Less(t, g.GetEvaluationTime(), time.Duration(ruleCt)*ruleWaitTime) + } else { + require.Greater(t, g.GetEvaluationTime(), time.Duration(ruleCt)*ruleWaitTime) + } + + return g + } m := newMultiTenantConcurrencyControllerMetrics(prometheus.NewPedanticRegistry()) controller := &TenantConcurrencyController{ @@ -284,44 +327,48 @@ func TestGroupAtRisk(t *testing.T) { } tc := map[string]struct { - group *rules.Group - expected bool + groupInterval time.Duration + evalConcurrently bool + expected bool }{ "group last evaluation greater than interval": { - group: func() *rules.Group { - g := rules.NewGroup(rules.GroupOptions{ - Interval: -1 * time.Minute, - Opts: &rules.ManagerOptions{}, - }) - return g - }(), - expected: true, + // Total runtime: 100x1ms ~ 100ms (run sequentially), > 1ms -> At risk + groupInterval: 1 * time.Millisecond, + evalConcurrently: false, + expected: true, }, "group last evaluation less than interval": { - group: func() *rules.Group { - g := rules.NewGroup(rules.GroupOptions{ - Interval: 1 * time.Minute, - Opts: &rules.ManagerOptions{}, - }) - return g - }(), - expected: false, + // Total runtime: 100x1ms ~ 100ms (run sequentially), < 1s -> Not at risk + groupInterval: 1 * time.Second, + evalConcurrently: false, + expected: false, }, - "group last evaluation exactly at concurrency trigger threshold": { - group: func() *rules.Group { - g := rules.NewGroup(rules.GroupOptions{ - Interval: 0 * time.Minute, - Opts: &rules.ManagerOptions{}, - }) - return g - }(), - expected: true, + "group total rule evaluation duration of last evaluation greater than threshold": { + // Total runtime: 100x1ms ~ 100ms, > 50ms -> Group isn't at risk for its runtime, but it is for the sum of all rules. 
+ groupInterval: 50 * time.Millisecond, + evalConcurrently: true, + expected: true, + }, + "group total rule evaluation duration of last evaluation less than threshold": { + // Total runtime: 100x1ms ~ 100ms, < 1s -> Not at risk + groupInterval: 1 * time.Second, + evalConcurrently: true, + expected: false, }, } for name, tt := range tc { t.Run(name, func(t *testing.T) { - require.Equal(t, tt.expected, controller.isGroupAtRisk(tt.group)) + group := createAndEvalTestGroup(tt.groupInterval, tt.evalConcurrently) + require.Equal(t, tt.expected, controller.isGroupAtRisk(group)) }) } } + +type allowAllConcurrencyController struct{} + +func (a *allowAllConcurrencyController) Allow(_ context.Context, _ *rules.Group, _ rules.Rule) bool { + return true +} + +func (a *allowAllConcurrencyController) Done(_ context.Context) {}