Skip to content

Commit

Permalink
feat: first working iteration, yay!
Browse files Browse the repository at this point in the history
  • Loading branch information
trevorwhitney committed Jan 8, 2025
1 parent b467e22 commit cb8a807
Show file tree
Hide file tree
Showing 7 changed files with 20 additions and 54 deletions.
16 changes: 2 additions & 14 deletions pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"hash"
"hash/crc32"
"io"
"strconv"
"time"
"unsafe"

Expand Down Expand Up @@ -1368,7 +1367,7 @@ func (hb *headBlock) MultiExtractorSampleIterator(

setQueryReferencedStructuredMetadata := false
for _, e := range hb.entries {
for i, extractor := range extractors {
for _, extractor := range extractors {
stats.AddHeadChunkBytes(int64(len(e.s)))
value, lbls, ok := extractor.ProcessString(e.t, e.s, e.structuredMetadata...)
if !ok {
Expand All @@ -1379,18 +1378,7 @@ func (hb *headBlock) MultiExtractorSampleIterator(
s *logproto.Series
)

streamLbls := lbls.Stream()
streamLbls = append(streamLbls, labels.Label{
Name: "__variant__",
Value: strconv.FormatInt(int64(i), 10),
})

builder := log.NewBaseLabelsBuilder().ForLabels(streamLbls, streamLbls.Hash())
builder.Add(log.StructuredMetadataLabel, lbls.StructuredMetadata()...)
builder.Add(log.ParsedLabel, lbls.Parsed()...)
newLbls := builder.LabelsResult()

lblStr := newLbls.String()
lblStr := lbls.String()
baseHash := extractor.BaseLabels().Hash()
if s, found = series[lblStr]; !found {
s = &logproto.Series{
Expand Down
16 changes: 2 additions & 14 deletions pkg/chunkenc/unordered.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"io"
"math"
"strconv"
"time"

"github.com/Workiva/go-datastructures/rangetree"
Expand Down Expand Up @@ -395,7 +394,7 @@ func (hb *unorderedHeadBlock) MultiExtractorSampleIterator(
func(statsCtx *stats.Context, ts int64, line string, structuredMetadataSymbols symbols) error {
structuredMetadata = hb.symbolizer.Lookup(structuredMetadataSymbols, structuredMetadata)

for i, extractor := range extractor {
for _, extractor := range extractor {
value, lbls, ok := extractor.ProcessString(ts, line, structuredMetadata...)
if !ok {
return nil
Expand All @@ -405,18 +404,7 @@ func (hb *unorderedHeadBlock) MultiExtractorSampleIterator(
s *logproto.Series
)

streamLbls := lbls.Stream()
streamLbls = append(streamLbls, labels.Label{
Name: "__variant__",
Value: strconv.FormatInt(int64(i), 10),
})

builder := log.NewBaseLabelsBuilder().ForLabels(streamLbls, streamLbls.Hash())
builder.Add(log.StructuredMetadataLabel, lbls.StructuredMetadata()...)
builder.Add(log.ParsedLabel, lbls.Parsed()...)
newLbls := builder.LabelsResult()

lblStr := newLbls.String()
lblStr := lbls.String()
s, found = series[lblStr]
if !found {
baseHash := extractor.BaseLabels().Hash()
Expand Down
16 changes: 2 additions & 14 deletions pkg/chunkenc/variants.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package chunkenc
import (
"context"
"sort"
"strconv"

"github.com/cespare/xxhash/v2"
"github.com/grafana/loki/v3/pkg/compression"
Expand Down Expand Up @@ -51,24 +50,13 @@ func (e *multiExtractorSampleBufferedIterator) Next() bool {
for e.bufferedIterator.Next() {
e.stats.AddPostFilterLines(1)

for i, extractor := range e.extractors {
for _, extractor := range e.extractors {
val, lbls, ok := extractor.Process(e.currTs, e.currLine, e.currStructuredMetadata...)
if !ok {
continue
}

streamLbls := lbls.Stream()
streamLbls = append(streamLbls, labels.Label{
Name: "__variant__",
Value: strconv.FormatInt(int64(i), 10),
})

builder := log.NewBaseLabelsBuilder().ForLabels(streamLbls, streamLbls.Hash())
builder.Add(log.StructuredMetadataLabel, lbls.StructuredMetadata()...)
builder.Add(log.ParsedLabel, lbls.Parsed()...)
e.currLabels = append(e.currLabels, builder.LabelsResult())

// TODO: is it enough to add __variant__ to result labels? Do the base labels need it too?
e.currLabels = append(e.currLabels, lbls)
e.currBaseLabels = append(e.currBaseLabels, extractor.BaseLabels())
e.cur = append(e.cur, logproto.Sample{
Value: val,
Expand Down
1 change: 0 additions & 1 deletion pkg/logql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,6 @@ func (q *query) evalVariant(
}
defer util.LogErrorWithContext(ctx, "closing VariantsExpr", stepEvaluator.Close)

//TODO: this never returns
next, _, r := stepEvaluator.Next()
if stepEvaluator.Error() != nil {
return nil, stepEvaluator.Error()
Expand Down
7 changes: 4 additions & 3 deletions pkg/logql/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1498,15 +1498,16 @@ func (it *bufferedVariantsIterator) Next(index int) bool {
// If not, keep popping samples from the underlying iterator
for it.iter.Next() {
sample := it.iter.At()
variantIndex := it.getVariantIndex(it.iter.Labels())
labels := it.iter.Labels()
variantIndex := it.getVariantIndex(labels)
if variantIndex == -1 {
it.err = fmt.Errorf("variant label not found in %s", it.iter.Labels())
it.err = fmt.Errorf("variant label not found in %s", labels)
return false
}

currentSample := sampleWithLabelsAndStreamHash{
sample: sample,
labels: it.iter.Labels(),
labels: labels,
streamHash: it.iter.StreamHash(),
}

Expand Down
16 changes: 9 additions & 7 deletions pkg/storage/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ func (it *sampleBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, thro
it.ctx,
from,
through,
streamExtractors,
streamExtractors,
nextChunk,
)
if err != nil {
Expand Down Expand Up @@ -715,14 +715,16 @@ func (it *multiExtractorSampleIterator) buildIterators(
for _, chunks := range chks {
if len(chunks) != 0 && len(chunks[0]) != 0 {
extractors := make([]log.StreamSampleExtractor, 0, len(it.extractors))
for _, extractor := range it.extractors {
for i, extractor := range it.extractors {
ext := extractor.ForStream(
labels.NewBuilder(chunks[0][0].Chunk.Metric).
Del(labels.MetricName).
Labels(),
)
ext = log.NewVariantsStreamSampleExtractorWrapper(i, ext)
extractors = append(
extractors,
extractor.ForStream(
labels.NewBuilder(chunks[0][0].Chunk.Metric).
Del(labels.MetricName).
Labels(),
),
ext,
)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (s *LokiStore) SelectVariants(
chunkFilterer = s.chunkFilterer.ForRequest(ctx)
}

return newSampleBatchIterator(
return newMultiExtractorSampleBatchIterator(
ctx,
s.schemaCfg,
s.chunkMetrics,
Expand Down

0 comments on commit cb8a807

Please sign in to comment.