diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index 181c92453b7af..ebe368f5aeece 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -8,7 +8,6 @@ import ( "hash" "hash/crc32" "io" - "strconv" "time" "unsafe" @@ -1368,7 +1367,7 @@ func (hb *headBlock) MultiExtractorSampleIterator( setQueryReferencedStructuredMetadata := false for _, e := range hb.entries { - for i, extractor := range extractors { + for _, extractor := range extractors { stats.AddHeadChunkBytes(int64(len(e.s))) value, lbls, ok := extractor.ProcessString(e.t, e.s, e.structuredMetadata...) if !ok { @@ -1379,18 +1378,7 @@ func (hb *headBlock) MultiExtractorSampleIterator( s *logproto.Series ) - streamLbls := lbls.Stream() - streamLbls = append(streamLbls, labels.Label{ - Name: "__variant__", - Value: strconv.FormatInt(int64(i), 10), - }) - - builder := log.NewBaseLabelsBuilder().ForLabels(streamLbls, streamLbls.Hash()) - builder.Add(log.StructuredMetadataLabel, lbls.StructuredMetadata()...) - builder.Add(log.ParsedLabel, lbls.Parsed()...) - newLbls := builder.LabelsResult() - - lblStr := newLbls.String() + lblStr := lbls.String() baseHash := extractor.BaseLabels().Hash() if s, found = series[lblStr]; !found { s = &logproto.Series{ diff --git a/pkg/chunkenc/unordered.go b/pkg/chunkenc/unordered.go index d67a56673f1f0..03b42c6ef0e32 100644 --- a/pkg/chunkenc/unordered.go +++ b/pkg/chunkenc/unordered.go @@ -7,7 +7,6 @@ import ( "fmt" "io" "math" - "strconv" "time" "github.com/Workiva/go-datastructures/rangetree" @@ -395,7 +394,7 @@ func (hb *unorderedHeadBlock) MultiExtractorSampleIterator( func(statsCtx *stats.Context, ts int64, line string, structuredMetadataSymbols symbols) error { structuredMetadata = hb.symbolizer.Lookup(structuredMetadataSymbols, structuredMetadata) - for i, extractor := range extractor { + for _, extractor := range extractor { value, lbls, ok := extractor.ProcessString(ts, line, structuredMetadata...) if !ok { return nil @@ -405,18 +404,7 @@ func (hb *unorderedHeadBlock) MultiExtractorSampleIterator( s *logproto.Series ) - streamLbls := lbls.Stream() - streamLbls = append(streamLbls, labels.Label{ - Name: "__variant__", - Value: strconv.FormatInt(int64(i), 10), - }) - - builder := log.NewBaseLabelsBuilder().ForLabels(streamLbls, streamLbls.Hash()) - builder.Add(log.StructuredMetadataLabel, lbls.StructuredMetadata()...) - builder.Add(log.ParsedLabel, lbls.Parsed()...) - newLbls := builder.LabelsResult() - - lblStr := newLbls.String() + lblStr := lbls.String() s, found = series[lblStr] if !found { baseHash := extractor.BaseLabels().Hash() diff --git a/pkg/chunkenc/variants.go b/pkg/chunkenc/variants.go index 7a834c546eeb1..bc2446c7546d7 100644 --- a/pkg/chunkenc/variants.go +++ b/pkg/chunkenc/variants.go @@ -3,7 +3,6 @@ package chunkenc import ( "context" "sort" - "strconv" "github.com/cespare/xxhash/v2" "github.com/grafana/loki/v3/pkg/compression" @@ -51,24 +50,13 @@ func (e *multiExtractorSampleBufferedIterator) Next() bool { for e.bufferedIterator.Next() { e.stats.AddPostFilterLines(1) - for i, extractor := range e.extractors { + for _, extractor := range e.extractors { val, lbls, ok := extractor.Process(e.currTs, e.currLine, e.currStructuredMetadata...) if !ok { continue } - streamLbls := lbls.Stream() - streamLbls = append(streamLbls, labels.Label{ - Name: "__variant__", - Value: strconv.FormatInt(int64(i), 10), - }) - - builder := log.NewBaseLabelsBuilder().ForLabels(streamLbls, streamLbls.Hash()) - builder.Add(log.StructuredMetadataLabel, lbls.StructuredMetadata()...) - builder.Add(log.ParsedLabel, lbls.Parsed()...) - e.currLabels = append(e.currLabels, builder.LabelsResult()) - - // TODO: is it enough to add __variant__ to result labels? Do the base labels need it too? + e.currLabels = append(e.currLabels, lbls) e.currBaseLabels = append(e.currBaseLabels, extractor.BaseLabels()) e.cur = append(e.cur, logproto.Sample{ Value: val, diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index 3c307f5c8fb50..af0d2b3b0dedc 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -658,7 +658,6 @@ func (q *query) evalVariant( } defer util.LogErrorWithContext(ctx, "closing VariantsExpr", stepEvaluator.Close) - //TODO: this never returns next, _, r := stepEvaluator.Next() if stepEvaluator.Error() != nil { return nil, stepEvaluator.Error() diff --git a/pkg/logql/evaluator.go b/pkg/logql/evaluator.go index 13a58dafdb71d..77f6db9516f4f 100644 --- a/pkg/logql/evaluator.go +++ b/pkg/logql/evaluator.go @@ -1498,15 +1498,16 @@ func (it *bufferedVariantsIterator) Next(index int) bool { // If not, keep popping samples from the underlying iterator for it.iter.Next() { sample := it.iter.At() - variantIndex := it.getVariantIndex(it.iter.Labels()) + labels := it.iter.Labels() + variantIndex := it.getVariantIndex(labels) if variantIndex == -1 { - it.err = fmt.Errorf("variant label not found in %s", it.iter.Labels()) + it.err = fmt.Errorf("variant label not found in %s", labels) return false } currentSample := sampleWithLabelsAndStreamHash{ sample: sample, - labels: it.iter.Labels(), + labels: labels, streamHash: it.iter.StreamHash(), } diff --git a/pkg/storage/batch.go b/pkg/storage/batch.go index f04ccf91289c4..9fa09f371ec88 100644 --- a/pkg/storage/batch.go +++ b/pkg/storage/batch.go @@ -610,7 +610,7 @@ func (it *sampleBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, thro it.ctx, from, through, - streamExtractors, + streamExtractors, nextChunk, ) if err != nil { @@ -715,14 +715,16 @@ func (it *multiExtractorSampleIterator) buildIterators( for _, chunks := range chks { if len(chunks) != 0 && len(chunks[0]) != 0 { extractors := make([]log.StreamSampleExtractor, 0, len(it.extractors)) - for _, extractor := range it.extractors { + for i, extractor := range it.extractors { + ext := extractor.ForStream( + labels.NewBuilder(chunks[0][0].Chunk.Metric). + Del(labels.MetricName). + Labels(), + ) + ext = log.NewVariantsStreamSampleExtractorWrapper(i, ext) extractors = append( extractors, - extractor.ForStream( - labels.NewBuilder(chunks[0][0].Chunk.Metric). - Del(labels.MetricName). - Labels(), - ), + ext, ) } diff --git a/pkg/storage/store.go b/pkg/storage/store.go index abd32dc21f8fe..5672a9a65a59f 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -165,7 +165,7 @@ func (s *LokiStore) SelectVariants( chunkFilterer = s.chunkFilterer.ForRequest(ctx) } - return newSampleBatchIterator( + return newMultiExtractorSampleBatchIterator( ctx, s.schemaCfg, s.chunkMetrics,