assert on buffered records
Signed-off-by: gotjosh <[email protected]>
gotjosh committed Nov 11, 2024
1 parent e14bba5 commit 753ba3b
Showing 1 changed file with 71 additions and 9 deletions.
80 changes: 71 additions & 9 deletions pkg/storage/ingest/reader_test.go
@@ -1799,15 +1799,35 @@ func TestPartitionReader_ShouldNotBufferRecordsInTheKafkaClientWhenDone(t *testi
partitionID = 1
)

concurrencyVariants := map[string][]readerTestCfgOpt{
"without concurrency": {withStartupConcurrency(0), withOngoingConcurrency(0)},
"with startup concurrency": {withStartupConcurrency(2), withOngoingConcurrency(0)},
"with startup and ongoing concurrency (same settings)": {withStartupConcurrency(2), withOngoingConcurrency(2)},
"with startup and ongoing concurrency (different settings)": {withStartupConcurrency(2), withOngoingConcurrency(4)},
tc := map[string]struct {
concurrencyVariant []readerTestCfgOpt
expectedBufferedRecords int
expectedBufferedRecordsFromClient int
}{
"without concurrency": {
concurrencyVariant: []readerTestCfgOpt{withStartupConcurrency(0), withOngoingConcurrency(0)},
expectedBufferedRecords: 1,
expectedBufferedRecordsFromClient: 1,
},
"with startup concurrency": {
concurrencyVariant: []readerTestCfgOpt{withStartupConcurrency(2), withOngoingConcurrency(0)},
expectedBufferedRecords: 1,
expectedBufferedRecordsFromClient: 1,
},
"with startup and ongoing concurrency": {
concurrencyVariant: []readerTestCfgOpt{withStartupConcurrency(2), withOngoingConcurrency(2)},
expectedBufferedRecords: 1,
expectedBufferedRecordsFromClient: 0,
},
"with startup and ongoing concurrency (different settings)": {
concurrencyVariant: []readerTestCfgOpt{withStartupConcurrency(2), withOngoingConcurrency(4)},
expectedBufferedRecords: 1,
expectedBufferedRecordsFromClient: 0,
},
}

for concurrencyName, concurrencyVariant := range concurrencyVariants {
concurrencyVariant := concurrencyVariant
for concurrencyName, tt := range tc {
concurrencyVariant := tt.concurrencyVariant

t.Run(concurrencyName, func(t *testing.T) {
t.Parallel()
@@ -1817,12 +1837,30 @@ func TestPartitionReader_ShouldNotBufferRecordsInTheKafkaClientWhenDone(t *testi
_, clusterAddr = testkafka.CreateCluster(t, partitionID+1, topicName)
consumedRecordsMx sync.Mutex
consumedRecords []string
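// blocked, while set, makes the consumer spin without consuming, so newly produced records stay buffered.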
blocked = atomic.NewBool(false)
)

consumer := consumerFunc(func(_ context.Context, records []record) error {
if blocked.Load() {
blockedTicker := time.NewTicker(100 * time.Millisecond)
defer blockedTicker.Stop()

// Create the overall timeout once, outside the loop: a time.After inside the select
// would be re-created on every ticker fire, so the deadline would never elapse.
timeout := time.After(3 * time.Second)

outer:
for {
select {
case <-blockedTicker.C:
if !blocked.Load() {
break outer
}
case <-timeout:
// This is effectively a test failure: the consumer was never unblocked in time.
t.Log("failed to finish unblocking the consumer in time")
return nil
}
}
}

consumedRecordsMx.Lock()
defer consumedRecordsMx.Unlock()

for _, r := range records {
consumedRecords = append(consumedRecords, string(r.content))
}
@@ -1888,11 +1926,35 @@ func TestPartitionReader_ShouldNotBufferRecordsInTheKafkaClientWhenDone(t *testi
`), "cortex_ingest_storage_reader_last_consumed_offset", "cortex_ingest_storage_reader_buffered_fetch_records_total", "cortex_ingest_storage_reader_buffered_fetched_records")
})

// Produce more records after the reader has started.
// Now we want to assert that, when the reader does have records buffered, the metrics correctly reflect the current state.
// First, make the consumer block on the next consumption.
blocked.Store(true)

// Now, produce more records after the reader has started.
produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-3"))
produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-4"))
t.Log("produced 2 records")

// Now, we expect to have some records buffered.
test.Poll(t, time.Second, nil, func() interface{} {
return promtest.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(`
# HELP cortex_ingest_storage_reader_last_consumed_offset The last offset successfully consumed by the partition reader. Set to -1 if not offset has been consumed yet.
# TYPE cortex_ingest_storage_reader_last_consumed_offset gauge
cortex_ingest_storage_reader_last_consumed_offset{partition="1"} 1
# HELP cortex_ingest_storage_reader_buffered_fetch_records_total Total number of records buffered within the client ready to be consumed
# TYPE cortex_ingest_storage_reader_buffered_fetch_records_total gauge
cortex_ingest_storage_reader_buffered_fetch_records_total{component="partition-reader"} %d
# HELP cortex_ingest_storage_reader_buffered_fetched_records The number of records fetched from Kafka by both concurrent fetchers and the kafka client but not yet processed.
# TYPE cortex_ingest_storage_reader_buffered_fetched_records gauge
cortex_ingest_storage_reader_buffered_fetched_records %d
`, tt.expectedBufferedRecordsFromClient, tt.expectedBufferedRecords)), "cortex_ingest_storage_reader_last_consumed_offset", "cortex_ingest_storage_reader_buffered_fetch_records_total", "cortex_ingest_storage_reader_buffered_fetched_records")
})

// With that assertion done, we can unblock record consumption.
blocked.Store(false)

// We expect the reader to consume subsequent records too.
test.Poll(t, time.Second, []string{"record-1", "record-2", "record-3", "record-4"}, func() interface{} {
consumedRecordsMx.Lock()