Skip to content

Commit

Permalink
Merge branch 'main' into update-fake-gcs-server
Browse files Browse the repository at this point in the history
  • Loading branch information
paul1r authored Jan 10, 2025
2 parents 2f86e30 + b272573 commit 4225e40
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 12 deletions.
28 changes: 28 additions & 0 deletions pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ func queryFunc(evaluator Evaluator, checker readyChecker, userID string, logger
return nil, errNotReady
}

// Extract rule details
ruleName := detail.Name
ruleType := detail.Kind

// Add rule details to context
ctx = AddRuleDetailsToContext(ctx, ruleName, ruleType)
res, err := evaluator.Eval(ctx, qs, t)

if err != nil {
Expand Down Expand Up @@ -357,3 +363,25 @@ type noopRuleDependencyController struct{}
// AnalyseRules intentionally does nothing with the rules it is given.
func (*noopRuleDependencyController) AnalyseRules(_ []rules.Rule) {}

// contextKey is a private type for the context keys declared below. Using a
// dedicated type keeps these keys from colliding with keys set elsewhere.
type contextKey string

// Keys under which the rule name and the rule type are stored in a context.
const (
	ruleNameKey contextKey = "rule_name"
	ruleTypeKey contextKey = "rule_type"
)

// AddRuleDetailsToContext returns a context derived from ctx that carries
// ruleName and ruleType. The values can be read back with
// GetRuleDetailsFromContext.
func AddRuleDetailsToContext(ctx context.Context, ruleName string, ruleType string) context.Context {
	withName := context.WithValue(ctx, ruleNameKey, ruleName)
	return context.WithValue(withName, ruleTypeKey, ruleType)
}

// GetRuleDetailsFromContext returns the rule name and rule type stored in ctx,
// in that order. A value that is absent (or not a string) is returned as the
// empty string.
func GetRuleDetailsFromContext(ctx context.Context) (string, string) {
	var name, kind string
	if v, ok := ctx.Value(ruleNameKey).(string); ok {
		name = v
	}
	if v, ok := ctx.Value(ruleTypeKey).(string); ok {
		kind = v
	}
	return name, kind
}
16 changes: 16 additions & 0 deletions pkg/ruler/compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,19 @@ type fakeChecker struct{}
// isReady always reports true, whatever argument it is given.
func (fakeChecker) isReady(string) bool {
	return true
}

// TestAddAndGetRuleDetailsFromContext checks that the rule name and type stored
// with AddRuleDetailsToContext are returned unchanged by
// GetRuleDetailsFromContext.
func TestAddAndGetRuleDetailsFromContext(t *testing.T) {
	const (
		wantName = "test_rule"
		wantType = "test_type"
	)

	// Store the details, then read them straight back.
	ctx := AddRuleDetailsToContext(context.Background(), wantName, wantType)
	gotName, gotType := GetRuleDetailsFromContext(ctx)

	assert.Equal(t, wantName, gotName, "Expected rule name to match")
	assert.Equal(t, wantType, gotType, "Expected rule type to match")
}
14 changes: 13 additions & 1 deletion pkg/ruler/evaluator_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,36 @@ package ruler
import (
"context"
"fmt"
"os"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql"
"github.com/grafana/loki/v3/pkg/logqlmodel"
"github.com/grafana/loki/v3/pkg/util"
)

const EvalModeLocal = "local"

// LocalEvaluator bundles a LogQL engine with the loggers used alongside it.
type LocalEvaluator struct {
// engine is the LogQL engine this evaluator was constructed with.
engine *logql.Engine
// logger is the general-purpose logger handed to the constructor.
logger log.Logger

// insightsLogger is a separate logger reserved for insights log lines.
// We don't want/need to log all the additional context, such as
// caller=spanlogger.go:116 component=ruler evaluation_mode=remote method=ruler.remoteEvaluation.Query
// in insights logs, so a new logger is created instead of reusing logger.
insightsLogger log.Logger
}

func NewLocalEvaluator(engine *logql.Engine, logger log.Logger) (*LocalEvaluator, error) {
if engine == nil {
return nil, fmt.Errorf("given engine is nil")
}

return &LocalEvaluator{engine: engine, logger: logger}, nil
return &LocalEvaluator{engine: engine, logger: logger, insightsLogger: log.NewLogfmtLogger(os.Stderr)}, nil
}

func (l *LocalEvaluator) Eval(ctx context.Context, qs string, now time.Time) (*logqlmodel.Result, error) {
Expand All @@ -49,5 +57,9 @@ func (l *LocalEvaluator) Eval(ctx context.Context, qs string, now time.Time) (*l
return nil, err
}

// Retrieve rule details from context
ruleName, ruleType := GetRuleDetailsFromContext(ctx)

level.Info(l.insightsLogger).Log("msg", "request timings", "insight", "true", "source", "loki_ruler", "rule_name", ruleName, "rule_type", ruleType, "total", res.Statistics.Summary.ExecTime, "total_bytes", res.Statistics.Summary.TotalBytesProcessed, "query_hash", util.HashedQuery(qs))
return &res, nil
}
39 changes: 28 additions & 11 deletions pkg/ruler/evaluator_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"net/http"
"net/textproto"
"net/url"
"os"
"strconv"
"time"

Expand Down Expand Up @@ -72,15 +73,21 @@ type RemoteEvaluator struct {
overrides RulesLimits
logger log.Logger

// we don't want/need to log all the additional context, such as
// caller=spanlogger.go:116 component=ruler evaluation_mode=remote method=ruler.remoteEvaluation.Query
// in insights logs, so create a new logger
insightsLogger log.Logger

metrics *metrics
}

func NewRemoteEvaluator(client httpgrpc.HTTPClient, overrides RulesLimits, logger log.Logger, registerer prometheus.Registerer) (*RemoteEvaluator, error) {
return &RemoteEvaluator{
client: client,
overrides: overrides,
logger: logger,
metrics: newMetrics(registerer),
client: client,
overrides: overrides,
logger: logger,
insightsLogger: log.NewLogfmtLogger(os.Stderr),
metrics: newMetrics(registerer),
}, nil
}

Expand Down Expand Up @@ -220,6 +227,11 @@ func (r *RemoteEvaluator) query(ctx context.Context, orgID, query string, ts tim
body := []byte(args.Encode())
hash := util.HashedQuery(query)

// Retrieve rule details from context
ruleName, ruleType := GetRuleDetailsFromContext(ctx)

// Construct the X-Query-Tags header value
queryTags := fmt.Sprintf("source=ruler,rule_name=%s,rule_type=%s", ruleName, ruleType)
req := httpgrpc.HTTPRequest{
Method: http.MethodPost,
Url: queryEndpointPath,
Expand All @@ -228,7 +240,7 @@ func (r *RemoteEvaluator) query(ctx context.Context, orgID, query string, ts tim
{Key: textproto.CanonicalMIMEHeaderKey("User-Agent"), Values: []string{userAgent}},
{Key: textproto.CanonicalMIMEHeaderKey("Content-Type"), Values: []string{mimeTypeFormPost}},
{Key: textproto.CanonicalMIMEHeaderKey("Content-Length"), Values: []string{strconv.Itoa(len(body))}},
{Key: textproto.CanonicalMIMEHeaderKey(string(httpreq.QueryTagsHTTPHeader)), Values: []string{"source=ruler"}},
{Key: textproto.CanonicalMIMEHeaderKey(string(httpreq.QueryTagsHTTPHeader)), Values: []string{queryTags}},
{Key: textproto.CanonicalMIMEHeaderKey(user.OrgIDHeaderName), Values: []string{orgID}},
},
}
Expand All @@ -242,12 +254,12 @@ func (r *RemoteEvaluator) query(ctx context.Context, orgID, query string, ts tim
instrument.ObserveWithExemplar(ctx, r.metrics.responseSizeBytes.WithLabelValues(orgID), float64(len(resp.Body)))
}

log := log.With(logger, "query_hash", hash, "query", query, "instant", ts, "response_time", time.Since(start).String())
logger = log.With(logger, "query_hash", hash, "query", query, "instant", ts, "response_time", time.Since(start).String())

if err != nil {
r.metrics.failedEvals.WithLabelValues("error", orgID).Inc()

level.Warn(log).Log("msg", "failed to evaluate rule", "err", err)
level.Warn(logger).Log("msg", "failed to evaluate rule", "err", err)
return nil, fmt.Errorf("rule evaluation failed: %w", err)
}

Expand All @@ -261,22 +273,27 @@ func (r *RemoteEvaluator) query(ctx context.Context, orgID, query string, ts tim
r.metrics.failedEvals.WithLabelValues("upstream_error", orgID).Inc()

respBod, _ := io.ReadAll(limitedBody)
level.Warn(log).Log("msg", "rule evaluation failed with non-2xx response", "response_code", resp.Code, "response_body", respBod)
level.Warn(logger).Log("msg", "rule evaluation failed with non-2xx response", "response_code", resp.Code, "response_body", respBod)
return nil, fmt.Errorf("unsuccessful/unexpected response - status code %d", resp.Code)
}

maxSize := r.overrides.RulerRemoteEvaluationMaxResponseSize(orgID)
if maxSize > 0 && int64(len(fullBody)) >= maxSize {
r.metrics.failedEvals.WithLabelValues("max_size", orgID).Inc()

level.Error(log).Log("msg", "rule evaluation exceeded max size", "max_size", maxSize, "response_size", len(fullBody))
level.Error(logger).Log("msg", "rule evaluation exceeded max size", "max_size", maxSize, "response_size", len(fullBody))
return nil, fmt.Errorf("%d bytes exceeds response size limit of %d (defined by ruler_remote_evaluation_max_response_size)", len(resp.Body), maxSize)
}

level.Debug(log).Log("msg", "rule evaluation succeeded")
level.Debug(logger).Log("msg", "rule evaluation succeeded")
r.metrics.successfulEvals.WithLabelValues(orgID).Inc()

return r.decodeResponse(ctx, resp, orgID)
dr, err := r.decodeResponse(ctx, resp, orgID)
if err != nil {
return nil, err
}
level.Info(r.insightsLogger).Log("msg", "request timings", "insight", "true", "source", "loki_ruler", "rule_name", ruleName, "rule_type", ruleType, "total", dr.Statistics.Summary.ExecTime, "total_bytes", dr.Statistics.Summary.TotalBytesProcessed, "query_hash", util.HashedQuery(query))
return dr, err
}

func (r *RemoteEvaluator) decodeResponse(ctx context.Context, resp *httpgrpc.HTTPResponse, orgID string) (*logqlmodel.Result, error) {
Expand Down
10 changes: 10 additions & 0 deletions pkg/ruler/evaluator_remote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func TestRemoteEvalQueryTimeout(t *testing.T) {

ctx := context.Background()
ctx = user.InjectOrgID(ctx, "test")
ctx = AddRuleDetailsToContext(ctx, "test_rule_name", "test_rule_type")

_, err = ev.Eval(ctx, "sum(rate({foo=\"bar\"}[5m]))", time.Now())
require.Error(t, err)
Expand Down Expand Up @@ -98,6 +99,7 @@ func TestRemoteEvalMaxResponseSize(t *testing.T) {

ctx := context.Background()
ctx = user.InjectOrgID(ctx, "test")
ctx = AddRuleDetailsToContext(ctx, "test_rule_name", "test_rule_type")

_, err = ev.Eval(ctx, "sum(rate({foo=\"bar\"}[5m]))", time.Now())
require.Error(t, err)
Expand Down Expand Up @@ -146,6 +148,7 @@ func TestRemoteEvalScalar(t *testing.T) {

ctx := context.Background()
ctx = user.InjectOrgID(ctx, "test")
ctx = AddRuleDetailsToContext(ctx, "test_rule_name", "test_rule_type")

res, err := ev.Eval(ctx, "19", now)
require.NoError(t, err)
Expand Down Expand Up @@ -189,6 +192,7 @@ func TestRemoteEvalEmptyScalarResponse(t *testing.T) {

ctx := context.Background()
ctx = user.InjectOrgID(ctx, "test")
ctx = AddRuleDetailsToContext(ctx, "test_rule_name", "test_rule_type")

res, err := ev.Eval(ctx, "sum(rate({foo=\"bar\"}[5m]))", time.Now())
require.NoError(t, err)
Expand Down Expand Up @@ -247,6 +251,7 @@ func TestRemoteEvalVectorResponse(t *testing.T) {

ctx := context.Background()
ctx = user.InjectOrgID(ctx, "test")
ctx = AddRuleDetailsToContext(ctx, "test_rule_name", "test_rule_type")

res, err := ev.Eval(ctx, "sum(rate({foo=\"bar\"}[5m]))", now)
require.NoError(t, err)
Expand Down Expand Up @@ -294,6 +299,7 @@ func TestRemoteEvalEmptyVectorResponse(t *testing.T) {

ctx := context.Background()
ctx = user.InjectOrgID(ctx, "test")
ctx = AddRuleDetailsToContext(ctx, "test_rule_name", "test_rule_type")

_, err = ev.Eval(ctx, "sum(rate({foo=\"bar\"}[5m]))", time.Now())
require.NoError(t, err)
Expand All @@ -317,6 +323,7 @@ func TestRemoteEvalErrorResponse(t *testing.T) {

ctx := context.Background()
ctx = user.InjectOrgID(ctx, "test")
ctx = AddRuleDetailsToContext(ctx, "test_rule_name", "test_rule_type")

_, err = ev.Eval(ctx, "sum(rate({foo=\"bar\"}[5m]))", time.Now())
require.ErrorContains(t, err, "rule evaluation failed")
Expand All @@ -343,6 +350,7 @@ func TestRemoteEvalNon2xxResponse(t *testing.T) {

ctx := context.Background()
ctx = user.InjectOrgID(ctx, "test")
ctx = AddRuleDetailsToContext(ctx, "test_rule_name", "test_rule_type")

_, err = ev.Eval(ctx, "sum(rate({foo=\"bar\"}[5m]))", time.Now())
require.ErrorContains(t, err, fmt.Sprintf("unsuccessful/unexpected response - status code %d", httpErr))
Expand All @@ -367,6 +375,7 @@ func TestRemoteEvalNonJSONResponse(t *testing.T) {

ctx := context.Background()
ctx = user.InjectOrgID(ctx, "test")
ctx = AddRuleDetailsToContext(ctx, "test_rule_name", "test_rule_type")

_, err = ev.Eval(ctx, "sum(rate({foo=\"bar\"}[5m]))", time.Now())
require.ErrorContains(t, err, "unexpected body encoding, not valid JSON")
Expand Down Expand Up @@ -406,6 +415,7 @@ func TestRemoteEvalUnsupportedResultResponse(t *testing.T) {

ctx := context.Background()
ctx = user.InjectOrgID(ctx, "test")
ctx = AddRuleDetailsToContext(ctx, "test_rule_name", "test_rule_type")

_, err = ev.Eval(ctx, "sum(rate({foo=\"bar\"}[5m]))", time.Now())
require.ErrorContains(t, err, fmt.Sprintf("unsupported result type: %q", loghttp.ResultTypeStream))
Expand Down

0 comments on commit 4225e40

Please sign in to comment.