Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to latest mimir-prometheus #10400

Merged
merged 8 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ require (
sigs.k8s.io/yaml v1.4.0 // indirect
)

replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20250109135143-114aaaadc203
replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20250110020350-a1e2bcf4a615

// Replace memberlist with our fork which includes some fixes that haven't been
// merged upstream yet:
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1283,8 +1283,8 @@ github.com/grafana/gomemcache v0.0.0-20241016125027-0a5bcc5aef40 h1:1TeKhyS+pvzO
github.com/grafana/gomemcache v0.0.0-20241016125027-0a5bcc5aef40/go.mod h1:IGRj8oOoxwJbHBYl1+OhS9UjQR0dv6SQOep7HqmtyFU=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
github.com/grafana/mimir-prometheus v0.0.0-20250109135143-114aaaadc203 h1:gCU3GO2mZUzsLAa/JRRDJpKbYhkXy7caWnzfNqbgDig=
github.com/grafana/mimir-prometheus v0.0.0-20250109135143-114aaaadc203/go.mod h1:KfyZCeyGxf5gvl6VZbrQsd400nJjGw+ygMEtDVZKIT4=
github.com/grafana/mimir-prometheus v0.0.0-20250110020350-a1e2bcf4a615 h1:lr3wUcXU0mScCDn/4NXc0CYglZJfy5l35sOJFar9qE0=
github.com/grafana/mimir-prometheus v0.0.0-20250110020350-a1e2bcf4a615/go.mod h1:KfyZCeyGxf5gvl6VZbrQsd400nJjGw+ygMEtDVZKIT4=
github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956 h1:em1oddjXL8c1tL0iFdtVtPloq2hRPen2MJQKoAWpxu0=
github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956/go.mod h1:qtI1ogk+2JhVPIXVc6q+NHziSmy2W5GbdQZFUHADCBU=
github.com/grafana/prometheus-alertmanager v0.25.1-0.20240930132144-b5e64e81e8d3 h1:6D2gGAwyQBElSrp3E+9lSr7k8gLuP3Aiy20rweLWeBw=
Expand Down
22 changes: 22 additions & 0 deletions pkg/ruler/fixtures/rules_chain.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
groups:
  - name: chain
    rules:
      # Evaluated concurrently, no dependencies.
      - record: job:http_requests:rate1m
        expr: sum by (job)(rate(http_requests_total[1m]))
      # NOTE(review): range fixed from [1m] to [5m] to match the record name,
      # consistent with the other fixtures. Dependency structure is unchanged.
      - record: job:http_requests:rate5m
        expr: sum by (job)(rate(http_requests_total[5m]))

      # Evaluated sequentially, dependents and dependencies.
      - record: job1:http_requests:rate1m
        expr: job:http_requests:rate1m{job="job1"}
      - record: job1_cluster1:http_requests:rate1m
        expr: job1:http_requests:rate1m{cluster="cluster1"}

      # Evaluated concurrently, no dependents.
      - record: job1_cluster2:http_requests:rate1m
        expr: job1:http_requests:rate1m{cluster="cluster2"}
      - record: job1_cluster1_namespace1:http_requests:rate1m
        expr: job1_cluster1:http_requests:rate1m{namespace="namespace1"}
      - record: job1_cluster1_namespace2:http_requests:rate1m
        expr: job1_cluster1:http_requests:rate1m{namespace="namespace2"}
18 changes: 18 additions & 0 deletions pkg/ruler/fixtures/rules_indeterminates.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
groups:
  - name: indeterminate
    rules:
      # None of these rules may be evaluated in parallel: the bare matcher
      # rule at the end selects series with an open (metric-name-less)
      # matcher, so its dependencies are indeterminate and the whole group
      # must fall back to sequential evaluation.
      - record: job:http_requests:rate1m
        expr: sum by (job)(rate(http_requests_total[1m]))
      - record: job:http_requests:rate5m
        expr: sum by (job)(rate(http_requests_total[5m]))
      - record: job:http_requests:rate15m
        expr: sum by (job)(rate(http_requests_total[15m]))
      - record: job:http_requests:rate30m
        expr: sum by (job)(rate(http_requests_total[30m]))
      - record: job:http_requests:rate1h
        expr: sum by (job)(rate(http_requests_total[1h]))
      - record: job:http_requests:rate2h
        expr: sum by (job)(rate(http_requests_total[2h]))
      # Open matcher: may select any series, including ones recorded above.
      - record: matcher
        expr: '{job="job1"}'
15 changes: 15 additions & 0 deletions pkg/ruler/fixtures/rules_multiple_independent.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
groups:
  - name: independents
    rules:
      # All rules read only the raw http_requests_total series and produce
      # distinct outputs, so none depends on another: every rule is eligible
      # for concurrent evaluation.
      - record: job:http_requests:rate1m
        expr: sum by (job)(rate(http_requests_total[1m]))
      - record: job:http_requests:rate5m
        expr: sum by (job)(rate(http_requests_total[5m]))
      - record: job:http_requests:rate15m
        expr: sum by (job)(rate(http_requests_total[15m]))
      - record: job:http_requests:rate30m
        expr: sum by (job)(rate(http_requests_total[30m]))
      - record: job:http_requests:rate1h
        expr: sum by (job)(rate(http_requests_total[1h]))
      - record: job:http_requests:rate2h
        expr: sum by (job)(rate(http_requests_total[2h]))
245 changes: 245 additions & 0 deletions pkg/ruler/fixtures/rules_topological_sort_needed.json

Large diffs are not rendered by default.

107 changes: 89 additions & 18 deletions pkg/ruler/rule_concurrency.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/rules"
Expand Down Expand Up @@ -115,6 +116,7 @@ func NewMultiTenantConcurrencyController(logger log.Logger, maxGlobalConcurrency
// NewTenantConcurrencyControllerFor returns a new rules.RuleConcurrencyController to use for the input tenantID.
func (c *MultiTenantConcurrencyController) NewTenantConcurrencyControllerFor(tenantID string) rules.RuleConcurrencyController {
return &TenantConcurrencyController{
logger: log.With(c.logger, "tenant", tenantID),
slotsInUse: c.metrics.SlotsInUse.WithLabelValues(tenantID),
attemptsStartedTotal: c.metrics.AttemptsStartedTotal.WithLabelValues(tenantID),
attemptsIncompleteTotal: c.metrics.AttemptsIncompleteTotal.WithLabelValues(tenantID),
Expand All @@ -132,6 +134,7 @@ func (c *MultiTenantConcurrencyController) NewTenantConcurrencyControllerFor(ten
// TenantConcurrencyController is a concurrency controller that limits the number of concurrent rule evaluations per tenant.
// It also takes into account the global concurrency limit.
type TenantConcurrencyController struct {
logger log.Logger
tenantID string
thresholdRuleConcurrency float64 // Percentage of the rule interval at which we consider the rule group at risk of missing its evaluation.

Expand All @@ -155,19 +158,7 @@ func (c *TenantConcurrencyController) Done(_ context.Context) {
}

// Allow tries to acquire a slot from the concurrency controller.
func (c *TenantConcurrencyController) Allow(_ context.Context, group *rules.Group, rule rules.Rule) bool {
// To allow a rule to be executed concurrently, we need 3 conditions:
// 1. The rule group must be at risk of missing its evaluation.
// 2. The rule must not have any rules that depend on it.
// 3. The rule itself must not depend on any other rules.
if !c.isGroupAtRisk(group) {
return false
}

if !isRuleIndependent(rule) {
return false
}

func (c *TenantConcurrencyController) Allow(_ context.Context, _ *rules.Group, _ rules.Rule) bool {
// Next, try to acquire a global concurrency slot.
c.attemptsStartedTotal.Inc()
if !c.globalConcurrency.TryAcquire(1) {
Expand All @@ -187,6 +178,78 @@ func (c *TenantConcurrencyController) Allow(_ context.Context, group *rules.Grou
return false
}

func (c *TenantConcurrencyController) SplitGroupIntoBatches(_ context.Context, g *rules.Group) []rules.ConcurrentRules {
julienduchesne marked this conversation as resolved.
Show resolved Hide resolved
if !c.isGroupAtRisk(g) {
// If the group is not at risk, we can evaluate the rules sequentially.
return sequentialRules(g)
}

logger := log.With(c.logger, "group", g.Name())

type ruleInfo struct {
julienduchesne marked this conversation as resolved.
Show resolved Hide resolved
ruleIdx int
unevaluatedDependencies map[rules.Rule]struct{}
}
remainingRules := make(map[rules.Rule]ruleInfo)
firstBatch := rules.ConcurrentRules{}
julienduchesne marked this conversation as resolved.
Show resolved Hide resolved
for i, r := range g.Rules() {
if r.NoDependencyRules() {
firstBatch = append(firstBatch, i)
continue
}
// Initialize the rule info with the rule's dependencies.
// Use a copy of the dependencies to avoid mutating the rule.
info := ruleInfo{ruleIdx: i, unevaluatedDependencies: map[rules.Rule]struct{}{}}
for _, dep := range r.DependencyRules() {
info.unevaluatedDependencies[dep] = struct{}{}
}
remainingRules[r] = info
}
if len(firstBatch) == 0 {
// There are no rules without dependencies.
// Fall back to sequential evaluation.
level.Info(logger).Log("msg", "No rules without dependencies found, falling back to sequential rule evaluation. This may be due to indeterminate rule dependencies.")
return sequentialRules(g)
}
order := []rules.ConcurrentRules{firstBatch}
julienduchesne marked this conversation as resolved.
Show resolved Hide resolved

// Build the order of rules to evaluate based on dependencies.
for len(remainingRules) > 0 {
previousBatch := order[len(order)-1]
// Remove the batch's rules from the dependencies of its dependents.
for _, idx := range previousBatch {
rule := g.Rules()[idx]
for _, dependent := range rule.DependentRules() {
dependentInfo := remainingRules[dependent]
delete(dependentInfo.unevaluatedDependencies, rule)
}
}

var batch rules.ConcurrentRules
// Find rules that have no remaining dependencies.
for name, info := range remainingRules {
if len(info.unevaluatedDependencies) == 0 {
batch = append(batch, info.ruleIdx)
delete(remainingRules, name)
}
}

if len(batch) == 0 {
// There is a cycle in the rules' dependencies.
// We can't evaluate them concurrently.
// Fall back to sequential evaluation.
level.Warn(logger).Log("msg", "Cyclic rule dependencies detected, falling back to sequential rule evaluation")
return sequentialRules(g)
}

order = append(order, batch)
}

level.Info(logger).Log("msg", "Batched rules into concurrent blocks", "rules", len(g.Rules()), "batches", len(order))
julienduchesne marked this conversation as resolved.
Show resolved Hide resolved

return order
}

// isGroupAtRisk checks if the rule group's last evaluation time is within the risk threshold.
func (c *TenantConcurrencyController) isGroupAtRisk(group *rules.Group) bool {
interval := group.Interval().Seconds()
Expand All @@ -205,11 +268,6 @@ func (c *TenantConcurrencyController) isGroupAtRisk(group *rules.Group) bool {
return false
}

// isRuleIndependent checks if the rule is independent of other rules.
func isRuleIndependent(rule rules.Rule) bool {
return rule.NoDependentRules() && rule.NoDependencyRules()
}

// NoopMultiTenantConcurrencyController is a concurrency controller that does not allow for concurrency.
type NoopMultiTenantConcurrencyController struct{}

Expand All @@ -221,6 +279,19 @@ func (n *NoopMultiTenantConcurrencyController) NewTenantConcurrencyControllerFor
// NoopTenantConcurrencyController is a per-tenant controller that disables
// concurrency: Allow never grants a slot, so rules evaluate sequentially.
type NoopTenantConcurrencyController struct{}

// Done releases a previously acquired slot; it is a no-op here because Allow
// never grants one.
func (n *NoopTenantConcurrencyController) Done(_ context.Context) {}
// SplitGroupIntoBatches places every rule of the group in its own
// single-rule batch, yielding a strictly sequential evaluation order.
func (n *NoopTenantConcurrencyController) SplitGroupIntoBatches(_ context.Context, g *rules.Group) []rules.ConcurrentRules {
	return sequentialRules(g)
}

// Allow always denies concurrent evaluation of the given rule.
func (n *NoopTenantConcurrencyController) Allow(_ context.Context, _ *rules.Group, _ rules.Rule) bool {
	return false
}

// sequentialRules returns an evaluation order in which every rule of the
// group forms its own single-element batch, i.e. the group is evaluated
// strictly sequentially in its original rule order.
func sequentialRules(g *rules.Group) []rules.ConcurrentRules {
	rs := g.Rules()
	order := make([]rules.ConcurrentRules, 0, len(rs))
	for i := range rs {
		order = append(order, rules.ConcurrentRules{i})
	}
	return order
}
Loading
Loading