Use atomic functions for locking / unlocking the client and fetcher.
Signed-off-by: gotjosh <[email protected]>
gotjosh committed Nov 11, 2024
1 parent 1702196 commit e14bba5
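
The change itself is mechanical: every direct read or write of r.client and r.fetcher is funneled through a getter or setter that holds a single fetchingMtx for just that one field access. As a standalone sketch of the pattern, where the guarded type, its methods, and main are illustrative stand-ins rather than code from this repository:

    package main

    import (
        "fmt"
        "sync"
    )

    // guarded wraps a value so that every read or write takes a mutex,
    // mirroring PartitionReader's setClient/getClient and setFetcher/getFetcher.
    type guarded[T any] struct {
        mtx sync.Mutex
        val T
    }

    func (g *guarded[T]) set(v T) {
        g.mtx.Lock()
        defer g.mtx.Unlock()
        g.val = v
    }

    func (g *guarded[T]) get() T {
        g.mtx.Lock()
        defer g.mtx.Unlock()
        return g.val
    }

    func main() {
        var fetcher guarded[string]
        var wg sync.WaitGroup
        wg.Add(2)
        // One goroutine swaps the value while another reads it; the accessors
        // make each individual access atomic with respect to the other.
        go func() { defer wg.Done(); fetcher.set("concurrent-fetcher") }()
        go func() { defer wg.Done(); _ = fetcher.get() }()
        wg.Wait()
        fmt.Println(fetcher.get())
    }
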
Showing 1 changed file with 61 additions and 40 deletions.
pkg/storage/ingest/reader.go
@@ -80,10 +80,10 @@ type PartitionReader struct {
     consumerGroup                         string
     concurrentFetchersMinBytesMaxWaitTime time.Duration
 
-    client *kgo.Client
-
-    fetcherMtx sync.Mutex
-    fetcher    fetcher
+    // fetchingMtx protects both the client and the fetcher, but never both at the same time.
+    fetchingMtx sync.Mutex
+    client      *kgo.Client
+    fetcher     fetcher
 
     newConsumer consumerFactory
     metrics     readerMetrics
@@ -136,15 +136,15 @@ func (r *PartitionReader) Update(_ context.Context, _, _ int) {
 }
 
 func (r *PartitionReader) BufferedRecords() int64 {
-    r.fetcherMtx.Lock()
-    defer r.fetcherMtx.Unlock()
+    f := r.getFetcher()
     var fcount, ccount int64
-    if r.fetcher != nil && r.fetcher != r {
-        fcount = r.fetcher.BufferedRecords()
+    if f != nil && f != r {
+        fcount = f.BufferedRecords()
     }
 
-    if r.client != nil {
-        ccount = r.client.BufferedFetchRecords()
+    c := r.getClient()
+    if c != nil {
+        ccount = c.BufferedFetchRecords()
     }
 
     return fcount + ccount
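
A note on the shape of this rewrite: when concurrent fetch is disabled, the reader registers itself as the fetcher (see r.setFetcher(r) below), so without the f != r guard this method would call back into itself; in that mode the buffered records live in the franz-go client and are already counted by BufferedFetchRecords. Copying the guarded fields into locals (f, c) also means the potentially slow counting calls run without holding fetchingMtx.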
@@ -173,14 +173,15 @@ func (r *PartitionReader) start(ctx context.Context) (returnErr error) {
     }
 
     // Create a Kafka client without configuring any partition to consume (it will be done later).
-    r.client, err = NewKafkaReaderClient(r.kafkaCfg, r.metrics.kprom, r.logger)
+    client, err := NewKafkaReaderClient(r.kafkaCfg, r.metrics.kprom, r.logger)
     if err != nil {
         return errors.Wrap(err, "creating kafka reader client")
     }
+    r.setClient(client)
 
-    r.committer = newPartitionCommitter(r.kafkaCfg, kadm.NewClient(r.client), r.partitionID, r.consumerGroup, r.logger, r.reg)
+    r.committer = newPartitionCommitter(r.kafkaCfg, kadm.NewClient(r.getClient()), r.partitionID, r.consumerGroup, r.logger, r.reg)
 
-    offsetsClient := newPartitionOffsetClient(r.client, r.kafkaCfg.Topic, r.reg, r.logger)
+    offsetsClient := newPartitionOffsetClient(r.getClient(), r.kafkaCfg.Topic, r.reg, r.logger)
 
     // It's ok to have the start offset slightly outdated.
     // We only need this offset accurate if we fall behind or if we start and the log gets truncated from beneath us.
@@ -213,30 +214,26 @@ func (r *PartitionReader) start(ctx context.Context) (returnErr error) {
         //
         // To make it happen, we do pause the fetching first and then we configure consumption. The consumption
         // will be kept paused until the explicit ResumeFetchPartitions() is called.
-        r.client.PauseFetchPartitions(map[string][]int32{
+        r.getClient().PauseFetchPartitions(map[string][]int32{
             r.kafkaCfg.Topic: {r.partitionID},
         })
-        r.client.AddConsumePartitions(map[string]map[int32]kgo.Offset{
+        r.getClient().AddConsumePartitions(map[string]map[int32]kgo.Offset{
            r.kafkaCfg.Topic: {r.partitionID: kgo.NewOffset().At(startOffset)},
         })
 
-        f, err := newConcurrentFetchers(ctx, r.client, r.logger, r.kafkaCfg.Topic, r.partitionID, startOffset, r.kafkaCfg.StartupFetchConcurrency, r.kafkaCfg.StartupRecordsPerFetch, r.kafkaCfg.UseCompressedBytesAsFetchMaxBytes, r.concurrentFetchersMinBytesMaxWaitTime, offsetsClient, startOffsetReader, &r.metrics)
+        f, err := newConcurrentFetchers(ctx, r.getClient(), r.logger, r.kafkaCfg.Topic, r.partitionID, startOffset, r.kafkaCfg.StartupFetchConcurrency, r.kafkaCfg.StartupRecordsPerFetch, r.kafkaCfg.UseCompressedBytesAsFetchMaxBytes, r.concurrentFetchersMinBytesMaxWaitTime, offsetsClient, startOffsetReader, &r.metrics)
         if err != nil {
             return errors.Wrap(err, "creating concurrent fetchers during startup")
         }
-        r.fetcherMtx.Lock()
-        r.fetcher = f
-        r.fetcherMtx.Unlock()
+        r.setFetcher(f)
     } else {
         // When concurrent fetch is disabled we read records directly from the Kafka client, so we want it
         // to consume the partition.
-        r.client.AddConsumePartitions(map[string]map[int32]kgo.Offset{
+        r.getClient().AddConsumePartitions(map[string]map[int32]kgo.Offset{
             r.kafkaCfg.Topic: {r.partitionID: kgo.NewOffset().At(startOffset)},
         })
 
-        r.fetcherMtx.Lock()
-        r.fetcher = r
-        r.fetcherMtx.Unlock()
+        r.setFetcher(r)
     }
 
     // Enforce the max consumer lag (if enabled).
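
The pause-then-configure sequence above also works against a bare franz-go client, which is what the reader creates (no consume options at construction, partitions added later). A minimal, self-contained sketch, assuming a broker on localhost:9092 and a topic "example" with partition 0, neither of which comes from the commit:

    package main

    import (
        "context"
        "fmt"
        "log"
        "time"

        "github.com/twmb/franz-go/pkg/kgo"
    )

    func main() {
        // Assumed for illustration: a broker on localhost:9092 and a
        // topic "example" with partition 0.
        client, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
        if err != nil {
            log.Fatal(err)
        }
        defer client.Close()

        // Pause fetching first, then add the partition: records are not
        // buffered until fetching is explicitly resumed, which is the
        // guarantee the reader's startup relies on.
        client.PauseFetchPartitions(map[string][]int32{"example": {0}})
        client.AddConsumePartitions(map[string]map[int32]kgo.Offset{
            "example": {0: kgo.NewOffset().At(0)},
        })

        // ... later, once the consumer is ready to read:
        client.ResumeFetchPartitions(map[string][]int32{"example": {0}})

        ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
        defer cancel()
        fetches := client.PollFetches(ctx)
        fmt.Println("polled:", fetches.NumRecords(), "buffered:", client.BufferedFetchRecords())
    }
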
@@ -266,12 +263,14 @@ func (r *PartitionReader) stopDependencies() error {
         }
     }
 
-    if r.fetcher != nil {
-        r.fetcher.Stop()
+    f := r.getFetcher()
+    if f != nil {
+        f.Stop()
     }
 
-    if r.client != nil {
-        r.client.Close()
+    c := r.getClient()
+    if c != nil {
+        c.Close()
     }
 
     return nil
@@ -294,42 +293,40 @@ func (r *PartitionReader) run(ctx context.Context) error {
 // switchToOngoingFetcher switches to the configured ongoing fetcher. This function could be
 // called multiple times.
 func (r *PartitionReader) switchToOngoingFetcher(ctx context.Context) {
-    r.fetcherMtx.Lock()
-    defer r.fetcherMtx.Unlock()
-
     if r.kafkaCfg.StartupFetchConcurrency == r.kafkaCfg.OngoingFetchConcurrency && r.kafkaCfg.StartupRecordsPerFetch == r.kafkaCfg.OngoingRecordsPerFetch {
         // we're already using the same settings, no need to switch
         return
     }
 
     if r.kafkaCfg.StartupFetchConcurrency > 0 && r.kafkaCfg.OngoingFetchConcurrency > 0 {
+
         // No need to switch the fetcher, just update the records per fetch.
-        r.fetcher.Update(ctx, r.kafkaCfg.OngoingFetchConcurrency, r.kafkaCfg.OngoingRecordsPerFetch)
+        r.getFetcher().Update(ctx, r.kafkaCfg.OngoingFetchConcurrency, r.kafkaCfg.OngoingRecordsPerFetch)
         return
     }
 
     if r.kafkaCfg.StartupFetchConcurrency > 0 && r.kafkaCfg.OngoingFetchConcurrency == 0 {
-        if r.fetcher == r {
+        if r.getFetcher() == r {
             // This method has been called before, no need to switch the fetcher.
             return
         }
 
         level.Info(r.logger).Log("msg", "partition reader is switching to non-concurrent fetcher")
 
         // Stop the current fetcher before replacing it.
-        r.fetcher.Stop()
+        r.getFetcher().Stop()
 
         // We need to switch to franz-go for ongoing fetches.
         // If we've already fetched some records, we should discard them from franz-go and start from the last consumed offset.
-        r.fetcher = r
+        r.setFetcher(r)
 
         lastConsumed := r.consumedOffsetWatcher.LastConsumedOffset()
         if lastConsumed == -1 {
             // We haven't consumed any records yet with the other fetcher.
             //
             // The franz-go client is initialized to start consuming from the same place as the other fetcher.
             // We can just use the client, but we have to resume the fetching because it was previously paused.
-            r.client.ResumeFetchPartitions(map[string][]int32{
+            r.getClient().ResumeFetchPartitions(map[string][]int32{
                 r.kafkaCfg.Topic: {r.partitionID},
             })
             return
@@ -339,21 +336,21 @@ func (r *PartitionReader) switchToOngoingFetcher(ctx context.Context) {
         // from a clean setup and have the guarantee that we're not going to read any previously buffered record,
         // we do remove the partition consumption (this clears the buffer), then we resume the fetching and finally
         // we add the consumption back.
-        r.client.RemoveConsumePartitions(map[string][]int32{
+        r.getClient().RemoveConsumePartitions(map[string][]int32{
            r.kafkaCfg.Topic: {r.partitionID},
         })
-        r.client.ResumeFetchPartitions(map[string][]int32{
+        r.getClient().ResumeFetchPartitions(map[string][]int32{
            r.kafkaCfg.Topic: {r.partitionID},
         })
-        r.client.AddConsumePartitions(map[string]map[int32]kgo.Offset{
+        r.getClient().AddConsumePartitions(map[string]map[int32]kgo.Offset{
            // Resume from the next unconsumed offset.
            r.kafkaCfg.Topic: {r.partitionID: kgo.NewOffset().At(lastConsumed + 1)},
         })
     }
 }
 
 func (r *PartitionReader) processNextFetches(ctx context.Context, delayObserver prometheus.Observer) error {
-    fetches, fetchCtx := r.fetcher.PollFetches(ctx)
+    fetches, fetchCtx := r.getFetcher().PollFetches(ctx)
     // Propagate the fetching span to consuming the records.
     ctx = opentracing.ContextWithSpan(ctx, opentracing.SpanFromContext(fetchCtx))
     r.recordFetchesMetrics(fetches, delayObserver)
@@ -825,11 +822,35 @@ func (r *PartitionReader) waitReadConsistency(ctx context.Context, withOffset bo
     return err
 }
 
+func (r *PartitionReader) setClient(c *kgo.Client) {
+    r.fetchingMtx.Lock()
+    defer r.fetchingMtx.Unlock()
+    r.client = c
+}
+
+func (r *PartitionReader) getClient() *kgo.Client {
+    r.fetchingMtx.Lock()
+    defer r.fetchingMtx.Unlock()
+    return r.client
+}
+
+func (r *PartitionReader) setFetcher(f fetcher) {
+    r.fetchingMtx.Lock()
+    defer r.fetchingMtx.Unlock()
+    r.fetcher = f
+}
+
+func (r *PartitionReader) getFetcher() fetcher {
+    r.fetchingMtx.Lock()
+    defer r.fetchingMtx.Unlock()
+    return r.fetcher
+}
+
 func (r *PartitionReader) PollFetches(ctx context.Context) (result kgo.Fetches, fetchContext context.Context) {
     defer func(start time.Time) {
         r.metrics.fetchWaitDuration.Observe(time.Since(start).Seconds())
     }(time.Now())
-    return r.client.PollFetches(ctx), ctx
+    return r.getClient().PollFetches(ctx), ctx
 }
 
 type partitionCommitter struct {
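
A note on the design choice: the accessors hold fetchingMtx only while the field itself is read or written, so a chained call such as r.getClient().PauseFetchPartitions(...) performs the Kafka operation outside the lock. Critical sections stay tiny and cannot block on slow network calls, but compound read-then-call sequences are not atomic; as the new struct comment says, the mutex guards each field individually, and exclusion across the whole startup/switch path presumably comes from the service's lifecycle ordering rather than from this lock.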