Skip to content

Commit

Permalink
Use singleflight in localCache.load (milvus-io#21606)
Browse files Browse the repository at this point in the history
Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Jan 10, 2023
1 parent 3b41a69 commit c550427
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 12 deletions.
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ linters:
- gosimple
- gosec
- revive
- gocritic
# - gocritic

linters-settings:
misspell:
Expand Down
34 changes: 23 additions & 11 deletions internal/util/cache/local_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ package cache

import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"go.uber.org/zap"
"golang.org/x/sync/singleflight"

"github.com/milvus-io/milvus/internal/log"
)
Expand Down Expand Up @@ -55,6 +57,7 @@ type localCache[K comparable, V any] struct {
onInsertion Func[K, V]
onRemoval Func[K, V]

singleflight singleflight.Group
loader LoaderFunc[K, V]
getPreLoadData GetPreLoadDataFunc[K, V]

Expand Down Expand Up @@ -345,20 +348,29 @@ func (c *localCache[K, V]) load(k K) (v V, err error) {
var ret V
return ret, errors.New("cache loader function must be set")
}
// TODO: Poll the value instead when the entry is loading.
start := currentTime()
v, err = c.loader(k)
now := currentTime()
loadTime := now.Sub(start)

// use singleflight here
val, err, _ := c.singleflight.Do(fmt.Sprintf("%v", k), func() (any, error) {
start := currentTime()
v, err := c.loader(k)
now := currentTime()
loadTime := now.Sub(start)
if err != nil {
c.stats.RecordLoadError(loadTime)
return v, err
}
c.stats.RecordLoadSuccess(loadTime)
en := newEntry(k, v, sum(k))
c.setEntryWriteTime(en, now)
c.setEntryAccessTime(en, now)
c.sendEvent(eventWrite, en)

return v, err
})
if err != nil {
c.stats.RecordLoadError(loadTime)
return v, err
}
c.stats.RecordLoadSuccess(loadTime)
en := newEntry(k, v, sum(k))
c.setEntryWriteTime(en, now)
c.setEntryAccessTime(en, now)
c.sendEvent(eventWrite, en)
v = val.(V)
return v, nil
}

Expand Down

0 comments on commit c550427

Please sign in to comment.