diff --git a/pkg/storegateway/expanded_replication.go b/pkg/storegateway/expanded_replication.go index 552fc8d13a..7d883f3deb 100644 --- a/pkg/storegateway/expanded_replication.go +++ b/pkg/storegateway/expanded_replication.go @@ -91,17 +91,20 @@ type MaxTimeExpandedReplication struct { now func() time.Time } -func (e *MaxTimeExpandedReplication) isMaxTimeInWindow(b ReplicatedBlock, now time.Time) bool { +func (e *MaxTimeExpandedReplication) EligibleForSync(b ReplicatedBlock) bool { + now := e.now() maxTimeDelta := now.Sub(b.GetMaxTime()) return maxTimeDelta <= e.maxTime } -func (e *MaxTimeExpandedReplication) EligibleForSync(b ReplicatedBlock) bool { - return e.isMaxTimeInWindow(b, e.now()) -} - func (e *MaxTimeExpandedReplication) EligibleForQuerying(b QueryableReplicatedBlock) bool { now := e.now() uploadedDelta := now.Sub(b.GetUploadedAt()) - return uploadedDelta > e.gracePeriod && e.isMaxTimeInWindow(b, now) + maxTimeDelta := now.Sub(b.GetMaxTime()) + // To be eligible for querying a block must: + // * Have been uploaded more than `gracePeriod` ago since we need to allow store-gateways + // to sync recently uploaded blocks. + // * Have a max time within `maxTime-gracePeriod` since we need to allow store-gateways to + // sync blocks that have recently become eligible for expanded replication. + return uploadedDelta > e.gracePeriod && maxTimeDelta <= (e.maxTime-e.gracePeriod) } diff --git a/pkg/storegateway/expanded_replication_test.go b/pkg/storegateway/expanded_replication_test.go index 1749e95b22..b8c90f661f 100644 --- a/pkg/storegateway/expanded_replication_test.go +++ b/pkg/storegateway/expanded_replication_test.go @@ -45,17 +45,26 @@ func TestMaxTimeExpandedReplication(t *testing.T) { }, "max time on boundary": { block: bucketindex.Block{ - MinTime: now.Add(-25 * time.Hour).UnixMilli(), - MaxTime: now.Add(-13 * time.Hour).UnixMilli(), + MinTime: now.Add(-49 * time.Hour).UnixMilli(), + MaxTime: now.Add(-25 * time.Hour).UnixMilli(), + UploadedAt: now.Add(-6 * time.Hour).Unix(), + }, + expectedSync: true, + expectedQuery: false, + }, + "max time on boundary including grace period": { + block: bucketindex.Block{ + MinTime: now.Add(-49 * time.Hour).UnixMilli(), + MaxTime: now.Add(-(24*time.Hour + 15*time.Minute)).UnixMilli(), UploadedAt: now.Add(-6 * time.Hour).Unix(), }, expectedSync: true, expectedQuery: true, }, - "max time on boundary recent upload": { + "max time on boundary including grace period recent upload": { block: bucketindex.Block{ - MinTime: now.Add(-25 * time.Hour).UnixMilli(), - MaxTime: now.Add(-13 * time.Hour).UnixMilli(), + MinTime: now.Add(-49 * time.Hour).UnixMilli(), + MaxTime: now.Add(-(24*time.Hour + 15*time.Minute)).UnixMilli(), UploadedAt: now.Add(-30 * time.Minute).Unix(), }, expectedSync: true, @@ -77,8 +86,8 @@ func TestMaxTimeExpandedReplication(t *testing.T) { canSync := replication.EligibleForSync(&tc.block) canQuery := replication.EligibleForQuerying(&tc.block) - require.Equal(t, tc.expectedSync, canSync, "expected to be able to sync block %+v using %+v", tc.block, replication) - require.Equal(t, tc.expectedQuery, canQuery, "expected to be able to query block %+v using %+v", tc.block, replication) + require.Equal(t, tc.expectedSync, canSync, "expected to be able/not-able to sync block %+v using %+v", tc.block, replication) + require.Equal(t, tc.expectedQuery, canQuery, "expected to be able/not-able to query block %+v using %+v", tc.block, replication) }) } }