Skip to content

Commit

Permalink
wip: working when a full stream selector is provided
Browse files Browse the repository at this point in the history
  • Loading branch information
trevorwhitney committed Jan 8, 2025
1 parent 05d325a commit b467e22
Show file tree
Hide file tree
Showing 13 changed files with 599 additions and 545 deletions.
9 changes: 5 additions & 4 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1691,7 +1691,7 @@ func (i *Ingester) QueryVariants(req *logproto.VariantsQueryRequest, queryServer
}
if sp != nil {
sp.LogKV("event", "finished instance query variants",
"selector", req.Selector,
"query", req.GetQuery,
"variants", req.Variants,
"start", req.Start,
"end", req.End)
Expand All @@ -1701,11 +1701,12 @@ func (i *Ingester) QueryVariants(req *logproto.VariantsQueryRequest, queryServer
storeReq := logql.SelectVariantsParams{VariantsQueryRequest: &logproto.VariantsQueryRequest{
Start: start,
End: end,
Selector: req.Selector,
Query: req.Query,
LogRange: req.LogRange,
Variants: req.Variants,
Shards: req.Shards,
Deletes: req.Deletes,
Plan: req.Plan,
// Deletes: req.Deletes,
Plan: req.Plan,
}}
storeItr, err := i.store.SelectVariants(ctx, storeReq)
if err != nil {
Expand Down
14 changes: 11 additions & 3 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,8 +619,12 @@ func (i *instance) queryVariants(
shard,
func(stream *stream) error {
streamSampleExtractors := make([]log.StreamSampleExtractor, 0, len(extractors))
for _, e := range extractors {
streamSampleExtractors = append(streamSampleExtractors, e.ForStream(stream.labels))
for i, e := range extractors {
ext := log.NewVariantsStreamSampleExtractorWrapper(
i,
e.ForStream(stream.labels),
)
streamSampleExtractors = append(streamSampleExtractors, ext)
}
iter, err := stream.SampleIterator(
ctx,
Expand Down Expand Up @@ -1169,7 +1173,11 @@ func sendBatches(ctx context.Context, i iter.EntryIterator, queryServer QuerierQ
return nil
}

func sendSampleBatches(ctx context.Context, it iter.SampleIterator, queryServer logproto.Querier_QuerySampleServer) error {
func sendSampleBatches(
ctx context.Context,
it iter.SampleIterator,
queryServer logproto.Querier_QuerySampleServer,
) error {
sp := opentracing.SpanFromContext(ctx)

stats := stats.FromContext(ctx)
Expand Down
475 changes: 266 additions & 209 deletions pkg/logproto/logproto.pb.go

Large diffs are not rendered by default.

21 changes: 11 additions & 10 deletions pkg/logproto/logproto.proto
Original file line number Diff line number Diff line change
Expand Up @@ -508,24 +508,25 @@ message DetectedLabel {


message VariantsQueryRequest {
string selector = 1; // mark as reserved once we've fully migrated to plan.
repeated string variants = 2; // mark as reserved once we've fully migrated to plan.
uint32 limit = 3; // for log lines
google.protobuf.Timestamp start = 4 [
string query = 1; // mark as reserved once we've fully migrated to plan.
string logRange = 2;
repeated string variants = 3;
uint32 limit = 4; // for log lines
google.protobuf.Timestamp start = 5 [
(gogoproto.stdtime) = true,
(gogoproto.nullable) = false
];
google.protobuf.Timestamp end = 5 [
google.protobuf.Timestamp end = 6 [
(gogoproto.stdtime) = true,
(gogoproto.nullable) = false
];
Direction direction = 6; // for log lines
repeated string shards = 7 [(gogoproto.jsontag) = "shards,omitempty"];
repeated Delete deletes = 8;
Plan plan = 9 [(gogoproto.customtype) = "github.com/grafana/loki/v3/pkg/querier/plan.QueryPlan"];
Direction direction = 7; // for log lines
repeated string shards = 8 [(gogoproto.jsontag) = "shards,omitempty"];
repeated Delete deletes = 9;
Plan plan = 10 [(gogoproto.customtype) = "github.com/grafana/loki/v3/pkg/querier/plan.QueryPlan"];
// If populated, these represent the chunk references that the querier should
// use to fetch the data, plus any other chunks reported by ingesters.
ChunkRefGroup storeChunks = 10 [(gogoproto.jsontag) = "storeChunks"];
ChunkRefGroup storeChunks = 11 [(gogoproto.jsontag) = "storeChunks"];
}

message VariantsQueryResponse {
Expand Down
26 changes: 20 additions & 6 deletions pkg/logql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,14 +688,28 @@ type SelectVariantsParams struct {
}

func (s SelectVariantsParams) Expr() (syntax.VariantsExpr, error) {
if s.VariantsQueryRequest.Plan == nil {
return nil, errors.New("query plan is empty")
if s.Plan != nil && s.Plan.AST != nil {
expr, ok := s.Plan.AST.(syntax.VariantsExpr)
if !ok {
return nil, errors.New("invalid variants expression")
}
return expr, nil
}
expr, ok := s.VariantsQueryRequest.Plan.AST.(syntax.VariantsExpr)
if !ok {
return nil, errors.New("only sample expression supported")

if s.Query == "" {
return nil, errors.New("invalid variants expression")
}

expr, err := syntax.ParseExpr(s.Query)
if err != nil {
return nil, err
}
switch e := expr.(type) {
case syntax.VariantsExpr:
return e, nil
default:
return nil, errors.New("invalid variants expression")
}
return expr, nil
}

func (s SelectVariantsParams) LogSelector() (syntax.LogSelectorExpr, error) {
Expand Down
8 changes: 5 additions & 3 deletions pkg/logql/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,9 +429,9 @@ func (ev *DefaultEvaluator) NewVariantsStepEvaluator(
// extend startTs backwards by step
Start: q.Start().Add(-logRange.Interval).Add(-logRange.Offset),
// add leap nanosecond to endTs to include lines exactly at endTs. range iterators work on start exclusive, end inclusive ranges
End: q.End().Add(-logRange.Offset).Add(time.Nanosecond),
// intentionally send the vector for reducing labels.
Selector: logRange.String(),
End: q.End().Add(-logRange.Offset).Add(time.Nanosecond),
Query: expr.String(),
LogRange: logRange.String(),
Variants: variants,
Shards: q.Shards(),
Plan: &plan.QueryPlan{
Expand Down Expand Up @@ -1444,6 +1444,8 @@ func (ev *DefaultEvaluator) newVariantsEvaluator(
return nil, err
}

e.Grouping.Groups = append(e.Grouping.Groups, "__variant__")

sort.Strings(e.Grouping.Groups)
variantEvaluator = &VectorAggEvaluator{
nextEvaluator: rangeEvaluator,
Expand Down
49 changes: 49 additions & 0 deletions pkg/logql/log/metrics_extraction.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,3 +320,52 @@ func convertBytes(v string) (float64, error) {
}
return float64(b), nil
}

type variantsStreamSampleExtractorWrapper struct {
StreamSampleExtractor
index int
}

func NewVariantsStreamSampleExtractorWrapper(
index int,
extractor StreamSampleExtractor,
) StreamSampleExtractor {
return &variantsStreamSampleExtractorWrapper{
StreamSampleExtractor: extractor,
index: index,
}
}

func (v *variantsStreamSampleExtractorWrapper) BaseLabels() LabelsResult {
return appendVariantLabel(v.StreamSampleExtractor.BaseLabels(), v.index)
}

func (v *variantsStreamSampleExtractorWrapper) Process(
ts int64,
line []byte,
structuredMetadata ...labels.Label,
) (float64, LabelsResult, bool) {
n, lbls, ok := v.StreamSampleExtractor.Process(ts, line, structuredMetadata...)
return n, appendVariantLabel(lbls, v.index), ok
}

func (v *variantsStreamSampleExtractorWrapper) ProcessString(
ts int64,
line string,
structuredMetadata ...labels.Label,
) (float64, LabelsResult, bool) {
n, lbls, ok := v.StreamSampleExtractor.ProcessString(ts, line, structuredMetadata...)
return n, appendVariantLabel(lbls, v.index), ok
}

func appendVariantLabel(lbls LabelsResult, variantIndex int) LabelsResult {
newLbls := lbls.Labels()
newLbls = append(newLbls, labels.Label{
Name: "__variant__",
Value: strconv.Itoa(variantIndex),
})
builder := NewBaseLabelsBuilder().ForLabels(newLbls, newLbls.Hash())
builder.Add(StructuredMetadataLabel, lbls.StructuredMetadata()...)
builder.Add(ParsedLabel, lbls.Parsed()...)
return builder.LabelsResult()
}
43 changes: 15 additions & 28 deletions pkg/logql/syntax/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -2447,20 +2447,27 @@ type VariantsExpr interface {
SetVariant(i int, e SampleExpr) error
Interval() time.Duration
Offset() time.Duration
IncludeLogs(bool)
ShouldIncludeLogs() bool
Extractors() ([]SampleExtractor, error)
Expr
}

type MultiVariantExpr struct {
logRange *LogRange
variants []SampleExpr
includeLogs bool
err error
logRange *LogRange
variants []SampleExpr
err error
implicit
}

func NewMultiVariantExpr(
logRange *LogRange,
variants []SampleExpr,
) *MultiVariantExpr {
return &MultiVariantExpr{
logRange: logRange,
variants: variants,
}
}

func (m *MultiVariantExpr) LogRange() *LogRange {
return m.logRange
}
Expand Down Expand Up @@ -2489,14 +2496,6 @@ func (m *MultiVariantExpr) AddVariant(v SampleExpr) {
m.variants = append(m.variants, v)
}

func (m *MultiVariantExpr) IncludeLogs(include bool) {
m.includeLogs = include
}

func (m *MultiVariantExpr) ShouldIncludeLogs() bool {
return m.includeLogs
}

func (m *MultiVariantExpr) SetVariant(i int, v SampleExpr) error {
if i >= len(m.variants) {
return fmt.Errorf("variant index out of range")
Expand Down Expand Up @@ -2541,12 +2540,6 @@ func (m *MultiVariantExpr) String() string {
sb.WriteString(m.logRange.String())
sb.WriteString(")")

if !m.ShouldIncludeLogs() {
sb.WriteString(Without)
sb.WriteString(" ")
sb.WriteString(Logs)
}

return sb.String()
}

Expand Down Expand Up @@ -2599,13 +2592,7 @@ func (m *MultiVariantExpr) Extractors() ([]log.SampleExtractor, error) {

func newVariantsExpr(variants []SampleExpr, logRange *LogRange) VariantsExpr {
return &MultiVariantExpr{
variants: variants,
logRange: logRange,
includeLogs: true,
variants: variants,
logRange: logRange,
}
}

func newLoglessVariantsExpr(v VariantsExpr) VariantsExpr {
v.IncludeLogs(false)
return v
}
1 change: 0 additions & 1 deletion pkg/logql/syntax/expr.y
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,6 @@ grouping:

variantsExpr:
variantsOp OPEN_PARENTHESIS variantsList CLOSE_PARENTHESIS variantsOf OPEN_PARENTHESIS variantsRangeExpr CLOSE_PARENTHESIS { $$ = newVariantsExpr($3, $7) }
| variantsExpr WITHOUT LOGS { $$ = newLoglessVariantsExpr($1) }
;

variantsOp:
Expand Down
Loading

0 comments on commit b467e22

Please sign in to comment.