From e14bba531244269d51dde0c0fc0fc47776219516 Mon Sep 17 00:00:00 2001
From: gotjosh
Date: Mon, 11 Nov 2024 09:46:34 +0000
Subject: [PATCH] Use atomic functions for locking / unlocking the client and fetcher.

Signed-off-by: gotjosh
---
 pkg/storage/ingest/reader.go | 101 +++++++++++++++++++++--------------
 1 file changed, 61 insertions(+), 40 deletions(-)

diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go
index 08941746203..ca8ef6149b9 100644
--- a/pkg/storage/ingest/reader.go
+++ b/pkg/storage/ingest/reader.go
@@ -80,10 +80,10 @@ type PartitionReader struct {
 	consumerGroup                         string
 	concurrentFetchersMinBytesMaxWaitTime time.Duration
 
-	client *kgo.Client
-
-	fetcherMtx sync.Mutex
-	fetcher    fetcher
+	// fetchingMtx protects both the client and the fetcher but never at the same time.
+	fetchingMtx sync.Mutex
+	client      *kgo.Client
+	fetcher     fetcher
 
 	newConsumer consumerFactory
 	metrics     readerMetrics
@@ -136,15 +136,15 @@ func (r *PartitionReader) Update(_ context.Context, _, _ int) {
 }
 
 func (r *PartitionReader) BufferedRecords() int64 {
-	r.fetcherMtx.Lock()
-	defer r.fetcherMtx.Unlock()
+	f := r.getFetcher()
 
 	var fcount, ccount int64
-	if r.fetcher != nil && r.fetcher != r {
-		fcount = r.fetcher.BufferedRecords()
+	if f != nil && f != r {
+		fcount = f.BufferedRecords()
 	}
-	if r.client != nil {
-		ccount = r.client.BufferedFetchRecords()
+	c := r.getClient()
+	if c != nil {
+		ccount = c.BufferedFetchRecords()
 	}
 
 	return fcount + ccount
@@ -173,14 +173,15 @@ func (r *PartitionReader) start(ctx context.Context) (returnErr error) {
 	}
 
 	// Create a Kafka client without configuring any partition to consume (it will be done later).
-	r.client, err = NewKafkaReaderClient(r.kafkaCfg, r.metrics.kprom, r.logger)
+	client, err := NewKafkaReaderClient(r.kafkaCfg, r.metrics.kprom, r.logger)
 	if err != nil {
 		return errors.Wrap(err, "creating kafka reader client")
 	}
+	r.setClient(client)
 
-	r.committer = newPartitionCommitter(r.kafkaCfg, kadm.NewClient(r.client), r.partitionID, r.consumerGroup, r.logger, r.reg)
+	r.committer = newPartitionCommitter(r.kafkaCfg, kadm.NewClient(r.getClient()), r.partitionID, r.consumerGroup, r.logger, r.reg)
 
-	offsetsClient := newPartitionOffsetClient(r.client, r.kafkaCfg.Topic, r.reg, r.logger)
+	offsetsClient := newPartitionOffsetClient(r.getClient(), r.kafkaCfg.Topic, r.reg, r.logger)
 
 	// It's ok to have the start offset slightly outdated.
 	// We only need this offset accurate if we fall behind or if we start and the log gets truncated from beneath us.
@@ -213,30 +214,26 @@ func (r *PartitionReader) start(ctx context.Context) (returnErr error) {
 		//
 		// To make it happen, we do pause the fetching first and then we configure consumption. The consumption
 		// will be kept paused until the explicit ResumeFetchPartitions() is called.
-		r.client.PauseFetchPartitions(map[string][]int32{
+		r.getClient().PauseFetchPartitions(map[string][]int32{
 			r.kafkaCfg.Topic: {r.partitionID},
 		})
-		r.client.AddConsumePartitions(map[string]map[int32]kgo.Offset{
+		r.getClient().AddConsumePartitions(map[string]map[int32]kgo.Offset{
 			r.kafkaCfg.Topic: {r.partitionID: kgo.NewOffset().At(startOffset)},
 		})
 
-		f, err := newConcurrentFetchers(ctx, r.client, r.logger, r.kafkaCfg.Topic, r.partitionID, startOffset, r.kafkaCfg.StartupFetchConcurrency, r.kafkaCfg.StartupRecordsPerFetch, r.kafkaCfg.UseCompressedBytesAsFetchMaxBytes, r.concurrentFetchersMinBytesMaxWaitTime, offsetsClient, startOffsetReader, &r.metrics)
+		f, err := newConcurrentFetchers(ctx, r.getClient(), r.logger, r.kafkaCfg.Topic, r.partitionID, startOffset, r.kafkaCfg.StartupFetchConcurrency, r.kafkaCfg.StartupRecordsPerFetch, r.kafkaCfg.UseCompressedBytesAsFetchMaxBytes, r.concurrentFetchersMinBytesMaxWaitTime, offsetsClient, startOffsetReader, &r.metrics)
 		if err != nil {
 			return errors.Wrap(err, "creating concurrent fetchers during startup")
 		}
 
-		r.fetcherMtx.Lock()
-		r.fetcher = f
-		r.fetcherMtx.Unlock()
+		r.setFetcher(f)
 	} else {
 		// When concurrent fetch is disabled we read records directly from the Kafka client, so we want it
 		// to consume the partition.
-		r.client.AddConsumePartitions(map[string]map[int32]kgo.Offset{
+		r.getClient().AddConsumePartitions(map[string]map[int32]kgo.Offset{
 			r.kafkaCfg.Topic: {r.partitionID: kgo.NewOffset().At(startOffset)},
 		})
-		r.fetcherMtx.Lock()
-		r.fetcher = r
-		r.fetcherMtx.Unlock()
+		r.setFetcher(r)
 	}
 
 	// Enforce the max consumer lag (if enabled).
@@ -266,12 +263,14 @@ func (r *PartitionReader) stopDependencies() error {
 		}
 	}
 
-	if r.fetcher != nil {
-		r.fetcher.Stop()
+	f := r.getFetcher()
+	if f != nil {
+		f.Stop()
 	}
 
-	if r.client != nil {
-		r.client.Close()
+	c := r.getClient()
+	if c != nil {
+		c.Close()
 	}
 
 	return nil
@@ -294,22 +293,20 @@ func (r *PartitionReader) run(ctx context.Context) error {
 // switchToOngoingFetcher switches to the configured ongoing fetcher. This function could be
 // called multiple times.
 func (r *PartitionReader) switchToOngoingFetcher(ctx context.Context) {
-	r.fetcherMtx.Lock()
-	defer r.fetcherMtx.Unlock()
-
 	if r.kafkaCfg.StartupFetchConcurrency == r.kafkaCfg.OngoingFetchConcurrency && r.kafkaCfg.StartupRecordsPerFetch == r.kafkaCfg.OngoingRecordsPerFetch {
 		// we're already using the same settings, no need to switch
 		return
 	}
 
 	if r.kafkaCfg.StartupFetchConcurrency > 0 && r.kafkaCfg.OngoingFetchConcurrency > 0 {
+		// No need to switch the fetcher, just update the records per fetch.
-		r.fetcher.Update(ctx, r.kafkaCfg.OngoingFetchConcurrency, r.kafkaCfg.OngoingRecordsPerFetch)
+		r.getFetcher().Update(ctx, r.kafkaCfg.OngoingFetchConcurrency, r.kafkaCfg.OngoingRecordsPerFetch)
 		return
 	}
 
 	if r.kafkaCfg.StartupFetchConcurrency > 0 && r.kafkaCfg.OngoingFetchConcurrency == 0 {
-		if r.fetcher == r {
+		if r.getFetcher() == r {
 			// This method has been called before, no need to switch the fetcher.
 			return
 		}
@@ -317,11 +314,11 @@ func (r *PartitionReader) switchToOngoingFetcher(ctx context.Context) {
 		level.Info(r.logger).Log("msg", "partition reader is switching to non-concurrent fetcher")
 
 		// Stop the current fetcher before replacing it.
-		r.fetcher.Stop()
+		r.getFetcher().Stop()
 
 		// We need to switch to franz-go for ongoing fetches.
 		// If we've already fetched some records, we should discard them from franz-go and start from the last consumed offset.
-		r.fetcher = r
+		r.setFetcher(r)
 
 		lastConsumed := r.consumedOffsetWatcher.LastConsumedOffset()
 		if lastConsumed == -1 {
@@ -329,7 +326,7 @@ func (r *PartitionReader) switchToOngoingFetcher(ctx context.Context) {
 			//
 			// The franz-go client is initialized to start consuming from the same place as the other fetcher.
 			// We can just use the client, but we have to resume the fetching because it was previously paused.
-			r.client.ResumeFetchPartitions(map[string][]int32{
+			r.getClient().ResumeFetchPartitions(map[string][]int32{
 				r.kafkaCfg.Topic: {r.partitionID},
 			})
 			return
@@ -339,13 +336,13 @@ func (r *PartitionReader) switchToOngoingFetcher(ctx context.Context) {
 		// from a clean setup and have the guarantee that we're not going to read any previously buffered record,
 		// we do remove the partition consumption (this clears the buffer), then we resume the fetching and finally
 		// we add the consumption back.
-		r.client.RemoveConsumePartitions(map[string][]int32{
+		r.getClient().RemoveConsumePartitions(map[string][]int32{
 			r.kafkaCfg.Topic: {r.partitionID},
 		})
-		r.client.ResumeFetchPartitions(map[string][]int32{
+		r.getClient().ResumeFetchPartitions(map[string][]int32{
 			r.kafkaCfg.Topic: {r.partitionID},
 		})
-		r.client.AddConsumePartitions(map[string]map[int32]kgo.Offset{
+		r.getClient().AddConsumePartitions(map[string]map[int32]kgo.Offset{
 			// Resume from the next unconsumed offset.
 			r.kafkaCfg.Topic: {r.partitionID: kgo.NewOffset().At(lastConsumed + 1)},
 		})
@@ -353,7 +350,7 @@
 }
 
 func (r *PartitionReader) processNextFetches(ctx context.Context, delayObserver prometheus.Observer) error {
-	fetches, fetchCtx := r.fetcher.PollFetches(ctx)
+	fetches, fetchCtx := r.getFetcher().PollFetches(ctx)
 	// Propagate the fetching span to consuming the records.
 	ctx = opentracing.ContextWithSpan(ctx, opentracing.SpanFromContext(fetchCtx))
 	r.recordFetchesMetrics(fetches, delayObserver)
@@ -825,11 +822,35 @@ func (r *PartitionReader) waitReadConsistency(ctx context.Context, withOffset bo
 	return err
 }
 
+func (r *PartitionReader) setClient(c *kgo.Client) {
+	r.fetchingMtx.Lock()
+	defer r.fetchingMtx.Unlock()
+	r.client = c
+}
+
+func (r *PartitionReader) getClient() *kgo.Client {
+	r.fetchingMtx.Lock()
+	defer r.fetchingMtx.Unlock()
+	return r.client
+}
+
+func (r *PartitionReader) setFetcher(f fetcher) {
+	r.fetchingMtx.Lock()
+	defer r.fetchingMtx.Unlock()
+	r.fetcher = f
+}
+
+func (r *PartitionReader) getFetcher() fetcher {
+	r.fetchingMtx.Lock()
+	defer r.fetchingMtx.Unlock()
+	return r.fetcher
+}
+
 func (r *PartitionReader) PollFetches(ctx context.Context) (result kgo.Fetches, fetchContext context.Context) {
 	defer func(start time.Time) {
 		r.metrics.fetchWaitDuration.Observe(time.Since(start).Seconds())
 	}(time.Now())
-	return r.client.PollFetches(ctx), ctx
+	return r.getClient().PollFetches(ctx), ctx
 }
 
 type partitionCommitter struct {
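
The pattern the patch introduces can be seen in isolation in the sketch below (not part of the patch): a single mutex guards the fields that get swapped at runtime, and every access goes through a small getter or setter so the lock is held only for the field read or write, never across a poll, stop, or close. The reader and staticFetcher types and the bufferedRecords helper are simplified stand-ins invented for this illustration; only the getFetcher/setFetcher shape mirrors the actual change to PartitionReader.

package main

import (
	"fmt"
	"sync"
)

// fetcher is a stand-in for the fetcher interface the reader swaps at runtime.
type fetcher interface {
	BufferedRecords() int64
}

// staticFetcher is a trivial fetcher used only for this illustration.
type staticFetcher struct{ buffered int64 }

func (s *staticFetcher) BufferedRecords() int64 { return s.buffered }

// reader mirrors the shape of the change: one mutex guards the swappable field,
// and all access goes through accessors so callers never touch the field directly.
type reader struct {
	fetchingMtx sync.Mutex
	fetcher     fetcher
}

func (r *reader) setFetcher(f fetcher) {
	r.fetchingMtx.Lock()
	defer r.fetchingMtx.Unlock()
	r.fetcher = f
}

func (r *reader) getFetcher() fetcher {
	r.fetchingMtx.Lock()
	defer r.fetchingMtx.Unlock()
	return r.fetcher
}

// bufferedRecords holds the lock only long enough to read the field; the
// BufferedRecords call itself happens outside the critical section.
func (r *reader) bufferedRecords() int64 {
	if f := r.getFetcher(); f != nil {
		return f.BufferedRecords()
	}
	return 0
}

func main() {
	r := &reader{}
	fmt.Println(r.bufferedRecords()) // 0: no fetcher configured yet

	r.setFetcher(&staticFetcher{buffered: 42})
	fmt.Println(r.bufferedRecords()) // 42
}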