Skip to content

Commit

Permalink
UsageTracker: throttle cleanup
Browse files Browse the repository at this point in the history
If we clean up all of a tenant's shards in a row, we may end up in a
situation where a TrackSeries request has to wait on each one of the
cleanup calls, which causes a horrible p99.

This switches to cleaning up the Nth shard of each tenant first (before
moving on to shard N+1), and adds an optional minimum interval between
shards to cover the case where there's only one tenant.

Signed-off-by: Oleg Zaytsev <[email protected]>
  • Loading branch information
colega committed Dec 9, 2024
1 parent 8061f7f commit 8949d85
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 27 deletions.
2 changes: 1 addition & 1 deletion pkg/usagetracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (t *UsageTracker) run(ctx context.Context) error {
for {
select {
case now := <-ticker.C:
t.store.cleanup(now)
t.store.cleanup(now, time.Minute/shards/2)
case <-ctx.Done():
return nil
case err := <-t.subservicesWatcher.Chan():
Expand Down
25 changes: 14 additions & 11 deletions pkg/usagetracker/tracker_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (t *trackerStore) getOrCreateTenant(tenantID string, limit uint64) *tracked
return tenant
}

func (t *trackerStore) cleanup(now time.Time) {
func (t *trackerStore) cleanup(now time.Time, minIntervalBetweenShards time.Duration) {
watermark := clock.ToMinutes(now.Add(-t.idleTimeout))

// We will work on a copy of tenants.
Expand All @@ -205,23 +205,26 @@ func (t *trackerStore) cleanup(now time.Time) {
t.mtx.RUnlock()

var deletionCandidates []string
done := make(chan struct{}, shards)
for tenantID, tenant := range tenantsClone {
for _, shard := range tenant.shards {
done := make(chan struct{})
for s := 0; s < shards; s++ {
t0 := time.Now() // We're using this for timing purposes, not for any logic.
for tenantID, tenant := range tenantsClone {
shard := tenant.shards[s]
shard.Events() <- tenantshard.Cleanup(
watermark,
tenant.series,
done,
)
}

// TODO: do we need this done? so far it's only added for testing purposes.
for i := 0; i < shards; i++ {
<-done
}

if tenant.series.Load() == 0 {
deletionCandidates = append(deletionCandidates, tenantID)
// On last shard, check tenant series.
if s == shards-1 && tenant.series.Load() == 0 {
deletionCandidates = append(deletionCandidates, tenantID)
}
}
elapsed := time.Since(t0)
if s < shards-1 && elapsed < minIntervalBetweenShards {
time.Sleep(minIntervalBetweenShards - elapsed)
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/usagetracker/tracker_store_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func benckmarkTrackerStoreTrackSeries(b *testing.B, seriesRefs []uint64, seriesP
go func() {
defer wg.Done()
for cleanupTime := range cleanup {
t.cleanup(cleanupTime)
t.cleanup(cleanupTime, 10*time.Microsecond)
}
}()
}
Expand Down
28 changes: 14 additions & 14 deletions pkg/usagetracker/tracker_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestTrackerStore_HappyCase(t *testing.T) {
}
{
// Cleanup now is a noop, nothing expired yet.
tracker.cleanup(now)
tracker.cleanup(now, 0)
require.Equal(t, map[string]uint64{testUser1: 3}, tracker.seriesCounts())
}
{
Expand All @@ -61,7 +61,7 @@ func TestTrackerStore_HappyCase(t *testing.T) {
now = now.Add(idleTimeout / 2)
{
// Cleanup again will remove series 1.
tracker.cleanup(now)
tracker.cleanup(now, 0)
require.Equal(t, map[string]uint64{testUser1: 2}, tracker.seriesCounts())
}
{
Expand Down Expand Up @@ -137,16 +137,16 @@ func TestTrackerStore_CreatedSeriesCommunication(t *testing.T) {
}
now = now.Add(idleTimeout / 4)
{
tracker1.cleanup(now)
tracker2.cleanup(now)
tracker1.cleanup(now, 0)
tracker2.cleanup(now, 0)
// Only last 2 series remaining
require.Equal(t, map[string]uint64{testUser1: 2}, tracker1.seriesCounts())
require.Equal(t, map[string]uint64{testUser1: 2}, tracker2.seriesCounts())
}
now = now.Add(idleTimeout / 4)
{
tracker1.cleanup(now)
tracker2.cleanup(now)
tracker1.cleanup(now, 0)
tracker2.cleanup(now, 0)
// Only last 1 series remaining (the one that we pushed to tracker2)
// This tests that creation event uses the correct creation timestamp.
require.Equal(t, map[string]uint64{testUser1: 1}, tracker1.seriesCounts())
Expand All @@ -172,7 +172,7 @@ func TestTrackerStore_Snapshot(t *testing.T) {
require.Empty(t, rejected)
require.NoError(t, err)

tracker1.cleanup(now)
tracker1.cleanup(now, 0)
now = now.Add(time.Minute)
}

Expand Down Expand Up @@ -234,7 +234,7 @@ func TestTrackerStore_Cleanup_OffByOneError(t *testing.T) {
require.Empty(t, rejected)
require.NoError(t, err)

tracker.cleanup(now)
tracker.cleanup(now, 0)

// There should be exactly 1 series, the other one expired.
require.Equal(t, map[string]uint64{testUser1: 1}, tracker.seriesCounts())
Expand Down Expand Up @@ -269,7 +269,7 @@ func TestTrackerStore_Cleanup_Tenants(t *testing.T) {
require.Empty(t, rejected)

now = now.Add(defaultIdleTimeout / 2)
tracker.cleanup(now)
tracker.cleanup(now, 0)
// Tenant 1 is deleted.
// testUser2 still has series 1 and 2, with no data in shard3
require.Equal(t, map[string]uint64{testUser2: 2}, tracker.seriesCounts())
Expand All @@ -285,7 +285,7 @@ func TestTrackerStore_Cleanup_Tenants(t *testing.T) {

now = now.Add(defaultIdleTimeout / 2)

tracker.cleanup(now)
tracker.cleanup(now, 0)
// testUser2 is deleted.
require.Empty(t, tracker.seriesCounts())
for i := 0; i < shards; i++ {
Expand Down Expand Up @@ -327,7 +327,7 @@ func TestTrackerStore_Cleanup_Concurrency(t *testing.T) {
for createdSeries.count.Load() < 100*maxSeriesRange {
// Keep increasing the timestamp every time.
nowUnixMinutes.Inc()
tracker.cleanup(now())
tracker.cleanup(now(), 0)
cleanups++
}
close(done)
Expand All @@ -336,7 +336,7 @@ func TestTrackerStore_Cleanup_Concurrency(t *testing.T) {

// Wait an idle period then cleanup.
nowUnixMinutes.Add(idleTimeoutMinutes)
tracker.cleanup(now())
tracker.cleanup(now(), 0)
// Tracker should be empty nowUnixMinutes.
// If it's not, then we did something wrong.

Expand Down Expand Up @@ -390,7 +390,7 @@ func TestTrackerStore_PrometheusCollector(t *testing.T) {
require.Empty(t, rejected)

now = now.Add(defaultIdleTimeout / 2)
tracker.cleanup(now)
tracker.cleanup(now, 0)

// Tenant 1 is deleted.
require.NoError(t, testutil.CollectAndCompare(reg, strings.NewReader(`
Expand All @@ -401,7 +401,7 @@ func TestTrackerStore_PrometheusCollector(t *testing.T) {

now = now.Add(defaultIdleTimeout / 2)

tracker.cleanup(now)
tracker.cleanup(now, 0)
// testUser2 is deleted.
require.NoError(t, testutil.CollectAndCompare(reg, strings.NewReader(`
# HELP cortex_usage_tracker_active_series Number of active series tracker for each user.
Expand Down

0 comments on commit 8949d85

Please sign in to comment.