From 2a4e113d30178febc0c973fb42f9cc3071b8a7f2 Mon Sep 17 00:00:00 2001 From: Iwan Budi Kusnanto Date: Mon, 1 Apr 2019 21:32:27 +0700 Subject: [PATCH] Bcache now handles key expiration. (#24) - expiration will be handled by bcache, previous version which basically do nothing with expiration - passive key deletion, data deleted when Get found that the key is expired. No GC-like mechanism at this phase, could be added later if needed - deletion delay: add delay before actually delete the key, it is used to handle temporary network connection issue, which prevent data syncing between nodes - specify ttl instead of expirationTimestamp --- README.md | 34 ++++--------------- bcache.go | 70 +++++++++++++++++++++------------------ bcache_test.go | 88 ++++++++++++++++++++++++++------------------------ cache.go | 42 ++++++++++++++++-------- config.go | 14 ++++++++ message.go | 8 +++-- peer.go | 17 ++++++---- peer_test.go | 12 +++---- 8 files changed, 154 insertions(+), 131 deletions(-) diff --git a/README.md b/README.md index cb2f053..2d585d4 100644 --- a/README.md +++ b/README.md @@ -27,31 +27,11 @@ A Go Library to create distributed in-memory cache inside your app. Only need to specify one or few nodes as bootstrap nodes, and all nodes will find each other using gossip protocol -2. When there is cache `set`, the event will be propagated to all of the nodes. +2. When there is cache `Set` and `Delete`, the event will be propagated to all of the nodes. -So, all of the nodes will have synced data. +So, all of the nodes will eventually have synced data. -## Expiration - -Although this library doesn't invalidate the keys when it reachs the expiration time, -the expiration timestamp will be used in these ways: - -(1) On `Set`: -- as a way to decide which value is the newer when doing data synchronization among nodes -- set timestamp expiration - -(2) On `Get`: -- the expiration timestamp could be used to check whether the key has been expired - -(3) On `Delete`: -- to decide which operation is the lastes when doing syncronization, for example: - - `Delete` with timestamp 3 and `Set` with timestamp 4 -> `Set` is the latest, so the `Delete` is ignored - -So, it is **mandatory** to set the expiration time and the delta from current time must be the same -between `Set` and `Delete`. -We can also use [UnixNano](https://golang.org/pkg/time/#Time.UnixNano) for better precission than `Unix`. - ## Cache filling @@ -79,7 +59,7 @@ bc, err := New(Config{ if err != nil { log.Fatalf("failed to create cache: %v", err) } -bc.Set("my_key", "my_val",12345) +bc.Set("my_key", "my_val",86400) ``` In server 2 @@ -94,7 +74,7 @@ bc, err := New(Config{ if err != nil { log.Fatalf("failed to create cache: %v", err) } -bc.Set("my_key2", "my_val2", 12345) +bc.Set("my_key2", "my_val2", 86400) ``` In server 3 @@ -109,7 +89,7 @@ bc, err := New(Config{ if err != nil { log.Fatalf("failed to create cache: %v", err) } -val, exp, exists := bc.Get("my_key2") +val, exists := bc.Get("my_key2") ``` ### GetWithFiller example @@ -124,12 +104,12 @@ c, err := New(Config{ if err != nil { log.Fatalf("failed to create cache: %v", err) } -val, exp,err := bc.GetWithFiller("my_key2",func(key string) (string, int64, error) { +val, exp,err := bc.GetWithFiller("my_key2",func(key string) (string, error) { // get value from database ..... // return value, 0, nil -}) +}, 86400) ``` ## Credits diff --git a/bcache.go b/bcache.go index c125b85..9301b5f 100644 --- a/bcache.go +++ b/bcache.go @@ -4,6 +4,7 @@ import ( "errors" "net" "strconv" + "time" "github.com/weaveworks/mesh" "golang.org/x/sync/singleflight" @@ -22,10 +23,11 @@ var ( // Bcache represents bcache struct type Bcache struct { - peer *peer - router *mesh.Router - logger Logger - flight singleflight.Group + peer *peer + router *mesh.Router + logger Logger + flight singleflight.Group + deletionDelay time.Duration } // New creates new bcache from the given config @@ -89,40 +91,45 @@ func New(cfg Config) (*Bcache, error) { router.ConnectionMaker.InitiateConnections(cfg.Peers, true) return &Bcache{ - peer: peer, - router: router, - logger: logger, + peer: peer, + router: router, + logger: logger, + deletionDelay: time.Duration(cfg.DeletionDelay) * time.Second, }, nil } -// Set sets value for the given key. -// -// expiredTimestamp could be used in these way: -// -// - unix timestamp when this key will be expired -// - as a way to decide which value is the newer when doing data synchronization among nodes -func (b *Bcache) Set(key, val string, expiredTimestamp int64) { - b.peer.Set(key, val, expiredTimestamp) +// Set sets value for the given key with the given ttl in second. +// if ttl <= 0, the key will expired instantly +func (b *Bcache) Set(key, val string, ttl int) { + if ttl <= 0 { + b.Delete(key) + return + } + b.set(key, val, ttl) +} + +func (b *Bcache) set(key, val string, ttl int) int64 { + expired := time.Now().Add(time.Duration(ttl) * time.Second).UnixNano() + b.peer.Set(key, val, expired) + return expired } // Get gets value for the given key. // -// It returns the value, expiration timestamp, and true if the key exists -func (b *Bcache) Get(key string) (string, int64, bool) { +// It returns the value and true if the key exists +func (b *Bcache) Get(key string) (string, bool) { return b.peer.Get(key) } // Delete the given key. // -// The given timestamp is used to decide which operation is the lastes when doing syncronization. -// -// For example: `Delete` with timestamp 3 and `Set` with timestamp 4 -> `Set` is the latest, so the `Delete` is ignored -func (b *Bcache) Delete(key string, expiredTimestamp int64) { - b.peer.Delete(key, expiredTimestamp) +func (b *Bcache) Delete(key string) { + deleteTs := time.Now().Add(b.deletionDelay).UnixNano() + b.peer.Delete(key, deleteTs) } // Filler defines func to be called when the given key is not exists -type Filler func(key string) (val string, expired int64, err error) +type Filler func(key string) (val string, err error) // GetWithFiller gets value for the given key and fill the cache // if the given key is not exists. @@ -134,27 +141,26 @@ type Filler func(key string) (val string, expired int64, err error) // // // It useful to avoid cache stampede to the underlying database -func (b *Bcache) GetWithFiller(key string, filler Filler) (string, int64, error) { +func (b *Bcache) GetWithFiller(key string, filler Filler, ttl int) (string, error) { if filler == nil { - return "", 0, ErrNilFiller + return "", ErrNilFiller } // get value from cache - val, exp, ok := b.Get(key) + val, ok := b.Get(key) if ok { - return val, exp, nil + return val, nil } // construct singleflight filler flightFn := func() (interface{}, error) { - val, expired, err := filler(key) + val, err := filler(key) if err != nil { b.logger.Errorf("filler failed: %v", err) return nil, err } - // set the key if filler OK - b.peer.Set(key, val, expired) + expired := b.set(key, val, ttl) return value{ value: val, @@ -167,12 +173,12 @@ func (b *Bcache) GetWithFiller(key string, filler Filler) (string, int64, error) return flightFn() }) if err != nil { - return "", 0, err + return "", err } // return the value value := valueIf.(value) - return value.value, value.expired, nil + return value.value, nil } // Close closes the cache, free all the resource diff --git a/bcache_test.go b/bcache_test.go index d7b2107..1ff7905 100644 --- a/bcache_test.go +++ b/bcache_test.go @@ -20,37 +20,40 @@ func TestIntegration(t *testing.T) { numKeys = 50 ) var ( - keys []string - expiredIn = 10 * time.Minute - waitDur = 3 * time.Second + keys []string + ttl = 60 + waitDur = 3 * time.Second ) b1, err := New(Config{ - PeerID: 1, - ListenAddr: "127.0.0.1:12345", - Peers: nil, - MaxKeys: 1000, - Logger: &nopLogger{}, + PeerID: 1, + ListenAddr: "127.0.0.1:12345", + Peers: nil, + MaxKeys: 1000, + Logger: &nopLogger{}, + DeletionDelay: 1, }) require.NoError(t, err) defer b1.Close() b2, err := New(Config{ - PeerID: 2, - ListenAddr: "127.0.0.1:12346", - Peers: []string{"127.0.0.1:12345"}, - MaxKeys: 1000, - Logger: &nopLogger{}, + PeerID: 2, + ListenAddr: "127.0.0.1:12346", + Peers: []string{"127.0.0.1:12345"}, + MaxKeys: 1000, + Logger: &nopLogger{}, + DeletionDelay: 1, }) require.NoError(t, err) defer b2.Close() b3, err := New(Config{ - PeerID: 3, - ListenAddr: "127.0.0.1:12356", - Peers: []string{"127.0.0.1:12345"}, - MaxKeys: 1000, - Logger: &nopLogger{}, + PeerID: 3, + ListenAddr: "127.0.0.1:12356", + Peers: []string{"127.0.0.1:12345"}, + MaxKeys: 1000, + Logger: &nopLogger{}, + DeletionDelay: 1, }) require.NoError(t, err) defer b3.Close() @@ -62,58 +65,58 @@ func TestIntegration(t *testing.T) { // ---------- set from b1, check in b2 & b3 ----------- for _, key := range keys { - b1.Set(key, val1, time.Now().Add(expiredIn).UnixNano()) + b1.Set(key, val1, ttl) } // wait for it to propagate and check from b2 b3 time.Sleep(waitDur) for _, key := range keys { - get, exp, ok := b2.Get(key) + get, ok := b2.Get(key) require.True(t, ok) - require.NotZero(t, exp) require.Equal(t, val1, get) - get, exp, ok = b3.Get(key) + get, ok = b3.Get(key) require.True(t, ok) - require.NotZero(t, exp) require.Equal(t, val1, get) } // ----------- set from b2, check in b1 & b3 -------------- for _, key := range keys { - b2.Set(key, val2, time.Now().Add(expiredIn).UnixNano()) + b2.Set(key, val2, ttl) } // wait for it to propagate and check from b1 & b3 time.Sleep(waitDur) for _, key := range keys { - get, exp, ok := b1.Get(key) + get, ok := b1.Get(key) require.True(t, ok) - require.NotZero(t, exp) require.Equal(t, val2, get) - get, exp, ok = b3.Get(key) + get, ok = b3.Get(key) require.True(t, ok) - require.NotZero(t, exp) require.Equal(t, val2, get) } // ------ delete from b1, and check b2 & b3 ---------- for _, key := range keys { - b1.Delete(key, time.Now().Add(expiredIn).UnixNano()) + b1.Delete(key) } - // wait for it to propagate and check from b2 + + // wait for it to propagate and check from b2 & b3 time.Sleep(waitDur) for _, key := range keys { - _, _, exists := b2.Get(key) + _, exists := b1.Get(key) + require.False(t, exists) + + _, exists = b2.Get(key) require.False(t, exists) - _, _, exists = b3.Get(key) + _, exists = b3.Get(key) require.False(t, exists) } @@ -125,8 +128,8 @@ func TestJoinLater(t *testing.T) { numKeys = 15 ) var ( - keyvals = make(map[string]string) - expiredIn = 10 * time.Minute + keyvals = make(map[string]string) + ttl = 60 ) for i := 0; i < numKeys; i++ { k := fmt.Sprintf("key_%d", i) @@ -147,7 +150,7 @@ func TestJoinLater(t *testing.T) { // set values for k, v := range keyvals { - b1.Set(k, v, time.Now().Add(expiredIn).Unix()) + b1.Set(k, v, ttl) } b2, err := New(Config{ @@ -165,7 +168,7 @@ func TestJoinLater(t *testing.T) { // check we could get it from b2 for k, v := range keyvals { - got, _, ok := b2.Get(k) + got, ok := b2.Get(k) require.True(t, ok) require.Equal(t, v, got) } @@ -174,7 +177,7 @@ func TestJoinLater(t *testing.T) { func TestFiller(t *testing.T) { var ( errFillerFailed = errors.New("filler failed") - expiredIn = 10 * time.Minute + ttl = 600 ) testCases := []struct { @@ -192,8 +195,8 @@ func TestFiller(t *testing.T) { }, { name: "valid filler", - filler: func(key string) (string, int64, error) { - return key, time.Now().Add(expiredIn).Unix(), nil + filler: func(key string) (string, error) { + return key, nil }, key: "valid", err: nil, @@ -201,8 +204,8 @@ func TestFiller(t *testing.T) { }, { name: "failed filler", - filler: func(key string) (string, int64, error) { - return "", time.Now().Add(expiredIn).Unix(), errFillerFailed + filler: func(key string) (string, error) { + return "", errFillerFailed }, key: "failed", err: errFillerFailed, @@ -220,13 +223,12 @@ func TestFiller(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - val, exp, err := bc.GetWithFiller(tc.key, tc.filler) + val, err := bc.GetWithFiller(tc.key, tc.filler, ttl) require.Equal(t, tc.err, err) if tc.err != nil { return } require.Equal(t, tc.key, val) - require.NotZero(t, exp) }) } } diff --git a/cache.go b/cache.go index 6f8e167..600e616 100644 --- a/cache.go +++ b/cache.go @@ -2,7 +2,7 @@ package bcache import ( "sync" - //"time" + "time" "github.com/hashicorp/golang-lru" "github.com/weaveworks/mesh" @@ -28,12 +28,12 @@ func newCache(maxKeys int) (*cache, error) { // value represent cache value type value struct { value string - expired int64 - deleted bool + expired int64 // expiration timestamp of the value + deleted int64 // deletion timestamp of the value } // Set sets the value of a cache -func (c *cache) Set(key, val string, expiredTimestamp int64, deleted bool) { +func (c *cache) Set(key, val string, expiredTimestamp, deleted int64) { c.cc.Add(key, value{ value: val, expired: expiredTimestamp, @@ -44,14 +44,14 @@ func (c *cache) Set(key, val string, expiredTimestamp int64, deleted bool) { // Delete del the value of a cache. // returns true if the key exists in cache, false otherwise -func (c *cache) Delete(key string, expiredTimestamp int64) bool { +func (c *cache) Delete(key string, deleteTimestamp int64) (string, int64, bool) { val, ok := c.get(key) if !ok { - return false + return "", 0, false } - c.Set(key, val.value, expiredTimestamp, true) + c.Set(key, val.value, val.expired, deleteTimestamp) - return true + return val.value, val.expired, true } // Get gets cache value of the given key @@ -65,12 +65,23 @@ func (c *cache) get(key string) (*value, bool) { } // Get gets cache value of the given key -func (c *cache) Get(key string) (string, int64, bool) { +func (c *cache) Get(key string) (string, bool) { val, ok := c.get(key) - if !ok || val.deleted { - return "", 0, false + if !ok { + return "", false } - return val.value, val.expired, true + + now := time.Now().UnixNano() + + if now >= val.expired || (now >= val.deleted && val.deleted > 0) { + // delete the key if: + // - expired + // - deleted + c.cc.Remove(key) + return "", false + } + + return val.value, val.deleted <= 0 } func (c *cache) Messages() *message { @@ -118,8 +129,11 @@ func (c *cache) mergeChange(msg *message) (delta mesh.GossipData, changedKey int var existingKeys []string for _, e := range msg.Entries { cacheVal, ok := c.get(e.Key) - if ok && cacheVal.expired >= e.Expired { - // the key already exists and has bigger expiration value + if ok && cacheVal.expired >= e.Expired && cacheVal.deleted == e.Deleted { + // no changes: + // - key already exists + // - has bigger expiration value + // - has same deleted val existingKeys = append(existingKeys, e.Key) continue } diff --git a/config.go b/config.go index bb31042..ababdfb 100644 --- a/config.go +++ b/config.go @@ -2,6 +2,10 @@ package bcache import "github.com/weaveworks/mesh" +const ( + defaultDeletionDelay = 100 // default deletion delay : 100 seconds +) + // Config represents bcache configuration type Config struct { // PeerID is unique ID of this bcache @@ -23,6 +27,12 @@ type Config struct { // Logger to be used // leave it nil to use default logger which do nothing Logger Logger + + // DeletionDelay adds delay before actually delete the key, + // it is used to handle temporary network connection issue, + // which could prevent data syncing between nodes. + // Leave it to 0 make it use default value: 100 seconds. + DeletionDelay int } func (c *Config) setDefault() error { @@ -41,6 +51,10 @@ func (c *Config) setDefault() error { c.PeerID = uint64(pName) } + if c.DeletionDelay <= 0 { + c.DeletionDelay = defaultDeletionDelay + } + // if logger is nil, create default nopLogger if c.Logger == nil { c.Logger = &nopLogger{} diff --git a/message.go b/message.go index a48da17..f8354af 100644 --- a/message.go +++ b/message.go @@ -24,7 +24,7 @@ type entry struct { Key string Val string Expired int64 - Deleted bool + Deleted int64 } func newMessage(peerID mesh.PeerName, numEntries int) *message { @@ -47,7 +47,7 @@ func newMessageFromBuf(b []byte) (*message, error) { return &m, err } -func (m *message) add(key, val string, expired int64, deleted bool) { +func (m *message) add(key, val string, expired, deleted int64) { m.mux.Lock() m.Entries[key] = entry{ Key: key, @@ -82,6 +82,10 @@ func (m *message) mergeComplete(other *message) mesh.GossipData { for k, v := range other.Entries { existing, ok := m.Entries[k] + + // merge + // - the key not exists in + // - has less expiration time if !ok || existing.Expired < v.Expired { m.Entries[k] = v } diff --git a/peer.go b/peer.go index f8249a1..8ec20b1 100644 --- a/peer.go +++ b/peer.go @@ -105,11 +105,11 @@ func (p *peer) Set(key, val string, expiredTimestamp int64) { defer close(c) // set our cache - p.cc.Set(key, val, expiredTimestamp, false) + p.cc.Set(key, val, expiredTimestamp, 0) // construct & send the message m := newMessage(p.name, 1) - m.add(key, val, expiredTimestamp, false) + m.add(key, val, expiredTimestamp, 0) p.broadcast(m) } @@ -117,7 +117,7 @@ func (p *peer) Set(key, val string, expiredTimestamp int64) { <-c // wait for it to be finished } -func (p *peer) Delete(key string, expiredTimestamp int64) bool { +func (p *peer) Delete(key string, deleteTimestamp int64) bool { var ( c = make(chan struct{}) exist bool @@ -126,12 +126,15 @@ func (p *peer) Delete(key string, expiredTimestamp int64) bool { p.actionCh <- func() { defer close(c) - // set our cache - exist = p.cc.Delete(key, expiredTimestamp) + // delete from our cache + val, expired, exist := p.cc.Delete(key, deleteTimestamp) + if !exist { + return + } // construct & send the message m := newMessage(p.name, 1) - m.add(key, "", expiredTimestamp, true) + m.add(key, val, expired, deleteTimestamp) p.broadcast(m) } @@ -140,7 +143,7 @@ func (p *peer) Delete(key string, expiredTimestamp int64) bool { return exist } -func (p *peer) Get(key string) (string, int64, bool) { +func (p *peer) Get(key string) (string, bool) { return p.cc.Get(key) } diff --git a/peer_test.go b/peer_test.go index 200c616..1d22fe0 100644 --- a/peer_test.go +++ b/peer_test.go @@ -129,7 +129,7 @@ func TestPeerOnGossip(t *testing.T) { "key1": { Key: "key1", Expired: 2, - Deleted: true, + Deleted: 1, }, }, delta: map[string]entry{ @@ -137,7 +137,7 @@ func TestPeerOnGossip(t *testing.T) { Key: "key1", Val: "", Expired: 2, - Deleted: true, + Deleted: 1, }, }, }, @@ -294,7 +294,7 @@ func TestPeerOnGossipBroadcast(t *testing.T) { "key1": { Key: "key1", Expired: 2, - Deleted: true, + Deleted: 1, }, }, delta: map[string]entry{ @@ -302,7 +302,7 @@ func TestPeerOnGossipBroadcast(t *testing.T) { Key: "key1", Val: "", Expired: 2, - Deleted: true, + Deleted: 1, }, }, }, @@ -472,7 +472,7 @@ func TestPeerOnGossipUnicast(t *testing.T) { "key1": { Key: "key1", Expired: 2, - Deleted: true, + Deleted: 1, }, }, complete: map[string]entry{ @@ -480,7 +480,7 @@ func TestPeerOnGossipUnicast(t *testing.T) { Key: "key1", Val: "", Expired: 2, - Deleted: true, + Deleted: 1, }, }, },