Skip to content

Commit

Permalink
Apply grace period to max time _and_ upload time.
Browse files Browse the repository at this point in the history
Signed-off-by: Nick Pillitteri <[email protected]>
  • Loading branch information
56quarters committed Jan 10, 2025
1 parent e42e7c9 commit 450dda1
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 13 deletions.
15 changes: 9 additions & 6 deletions pkg/storegateway/expanded_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,17 +91,20 @@ type MaxTimeExpandedReplication struct {
now func() time.Time
}

func (e *MaxTimeExpandedReplication) isMaxTimeInWindow(b ReplicatedBlock, now time.Time) bool {
func (e *MaxTimeExpandedReplication) EligibleForSync(b ReplicatedBlock) bool {
now := e.now()
maxTimeDelta := now.Sub(b.GetMaxTime())
return maxTimeDelta <= e.maxTime
}

func (e *MaxTimeExpandedReplication) EligibleForSync(b ReplicatedBlock) bool {
return e.isMaxTimeInWindow(b, e.now())
}

func (e *MaxTimeExpandedReplication) EligibleForQuerying(b QueryableReplicatedBlock) bool {
now := e.now()
uploadedDelta := now.Sub(b.GetUploadedAt())
return uploadedDelta > e.gracePeriod && e.isMaxTimeInWindow(b, now)
maxTimeDelta := now.Sub(b.GetMaxTime())
// To be eligible for querying a block must:
// * Have been uploaded more than `gracePeriod` ago since we need to allow store-gateways
// to sync recently uploaded blocks.
// * Have a max time within `maxTime-gracePeriod` since we need to allow store-gateways to
// sync blocks that have recently become eligible for expanded replication.
return uploadedDelta > e.gracePeriod && maxTimeDelta <= (e.maxTime-e.gracePeriod)
}
23 changes: 16 additions & 7 deletions pkg/storegateway/expanded_replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,26 @@ func TestMaxTimeExpandedReplication(t *testing.T) {
},
"max time on boundary": {
block: bucketindex.Block{
MinTime: now.Add(-25 * time.Hour).UnixMilli(),
MaxTime: now.Add(-13 * time.Hour).UnixMilli(),
MinTime: now.Add(-49 * time.Hour).UnixMilli(),
MaxTime: now.Add(-25 * time.Hour).UnixMilli(),
UploadedAt: now.Add(-6 * time.Hour).Unix(),
},
expectedSync: true,
expectedQuery: false,
},
"max time on boundary including grace period": {
block: bucketindex.Block{
MinTime: now.Add(-49 * time.Hour).UnixMilli(),
MaxTime: now.Add(-(24*time.Hour + 15*time.Minute)).UnixMilli(),
UploadedAt: now.Add(-6 * time.Hour).Unix(),
},
expectedSync: true,
expectedQuery: true,
},
"max time on boundary recent upload": {
"max time on boundary including grace period recent upload": {
block: bucketindex.Block{
MinTime: now.Add(-25 * time.Hour).UnixMilli(),
MaxTime: now.Add(-13 * time.Hour).UnixMilli(),
MinTime: now.Add(-49 * time.Hour).UnixMilli(),
MaxTime: now.Add(-(24*time.Hour + 15*time.Minute)).UnixMilli(),
UploadedAt: now.Add(-30 * time.Minute).Unix(),
},
expectedSync: true,
Expand All @@ -77,8 +86,8 @@ func TestMaxTimeExpandedReplication(t *testing.T) {
canSync := replication.EligibleForSync(&tc.block)
canQuery := replication.EligibleForQuerying(&tc.block)

require.Equal(t, tc.expectedSync, canSync, "expected to be able to sync block %+v using %+v", tc.block, replication)
require.Equal(t, tc.expectedQuery, canQuery, "expected to be able to query block %+v using %+v", tc.block, replication)
require.Equal(t, tc.expectedSync, canSync, "expected to be able/not-able to sync block %+v using %+v", tc.block, replication)
require.Equal(t, tc.expectedQuery, canQuery, "expected to be able/not-able to query block %+v using %+v", tc.block, replication)
})
}
}

0 comments on commit 450dda1

Please sign in to comment.