From 11ad065e68fc5e5249d2e186fc0e6d75359db9a3 Mon Sep 17 00:00:00 2001
From: Jeffsky
Date: Wed, 22 Jun 2022 14:53:25 +0800
Subject: [PATCH] imporve: use sync map to store handlers (#125)

---
 internal/map32/map32.go      | 154 -----------------------------------
 internal/map32/map32_test.go | 114 --------------------------
 internal/map32/types.go.go   |  15 ----
 internal/socket/duplex.go    |  33 +++-----
 4 files changed, 10 insertions(+), 306 deletions(-)
 delete mode 100644 internal/map32/map32.go
 delete mode 100644 internal/map32/map32_test.go
 delete mode 100644 internal/map32/types.go.go

diff --git a/internal/map32/map32.go b/internal/map32/map32.go
deleted file mode 100644
index 8f7fc17..0000000
--- a/internal/map32/map32.go
+++ /dev/null
@@ -1,154 +0,0 @@
-package map32
-
-import (
-	"sync"
-)
-
-// Hasher calculates the sharding offset.
-type Hasher func(key uint32, cap int) (offset int)
-
-type shard32 struct {
-	sync.Mutex
-	store map[uint32]interface{}
-}
-
-func (s *shard32) Range(fn func(uint32, interface{}) bool) bool {
-	if s == nil {
-		return true
-	}
-	s.Lock()
-	defer s.Unlock()
-
-	if s.store == nil {
-		return true
-	}
-
-	for k, v := range s.store {
-		if !fn(k, v) {
-			return false
-		}
-	}
-	return true
-}
-
-func (s *shard32) Load(key uint32) (v interface{}, ok bool) {
-	if s == nil {
-		return
-	}
-	s.Lock()
-	if s.store != nil {
-		v, ok = s.store[key]
-	}
-	s.Unlock()
-	return
-}
-
-func (s *shard32) Store(key uint32, value interface{}) {
-	if s == nil {
-		return
-	}
-	s.Lock()
-	if s.store == nil {
-		s.store = make(map[uint32]interface{})
-	}
-	s.store[key] = value
-	s.Unlock()
-}
-
-func (s *shard32) Delete(key uint32) {
-	if s == nil {
-		return
-	}
-	s.Lock()
-	if s.store != nil {
-		delete(s.store, key)
-	}
-	s.Unlock()
-}
-
-type map32 struct {
-	h   Hasher
-	cap int
-	s   []shard32
-}
-
-func (m *map32) Destroy() {
-	m.cap = 0
-	m.s = nil
-}
-
-func (m *map32) Range(fn func(uint32, interface{}) bool) {
-	for i := 0; i < len(m.s); i++ {
-		next := &m.s[i]
-		if !next.Range(fn) {
-			break
-		}
-	}
-}
-
-func (m *map32) Load(key uint32) (interface{}, bool) {
-	return m.shard(key).Load(key)
-}
-
-func (m *map32) Store(key uint32, value interface{}) {
-	m.shard(key).Store(key, value)
-}
-
-func (m *map32) Delete(key uint32) {
-	m.shard(key).Delete(key)
-}
-
-func (m *map32) shard(k uint32) *shard32 {
-	if m.cap == 0 {
-		return nil
-	}
-	var offset int
-	if m.h == nil {
-		offset = int(k) % m.cap
-	} else {
-		offset = m.h(k, m.cap)
-	}
-
-	if offset < 0 || offset >= m.cap {
-		return nil
-	}
-	return &m.s[offset]
-}
-
-type config struct {
-	cap int
-	h   Hasher
-}
-
-// Option configs the Map32.
-type Option func(*config)
-
-// WithCap sets cap for Map32.
-func WithCap(cap int) Option {
-	return func(c *config) {
-		c.cap = cap
-	}
-}
-
-// WithHasher sets Hasher for Map32.
-func WithHasher(h Hasher) Option {
-	return func(c *config) {
-		c.h = h
-	}
-}
-
-// New creates a new Map32 with input options.
-func New(options ...Option) Map32 {
-	c := config{
-		cap: 32,
-	}
-	for _, it := range options {
-		it(&c)
-	}
-	s := make([]shard32, c.cap)
-	return &map32{
-		h:   c.h,
-		cap: c.cap,
-		s:   s,
-	}
-}
diff --git a/internal/map32/map32_test.go b/internal/map32/map32_test.go
deleted file mode 100644
index 0cff34e..0000000
--- a/internal/map32/map32_test.go
+++ /dev/null
@@ -1,114 +0,0 @@
-package map32
-
-import (
-	"math/rand"
-	"sync"
-	"testing"
-	"time"
-
-	"github.com/stretchr/testify/assert"
-)
-
-func init() {
-	rand.Seed(time.Now().UnixNano())
-}
-
-func TestMap32_Destroy(t *testing.T) {
-	m := New()
-	const cnt = 10000
-	for i := 0; i < cnt; i++ {
-		m.Store(uint32(i), time.Now())
-	}
-
-	m.Destroy()
-
-	_, ok := m.Load(1)
-	assert.False(t, ok)
-}
-
-func TestNil(t *testing.T) {
-	var s *shard32
-	s.Load(1)
-	s.Delete(1)
-	s.Range(func(u uint32, i interface{}) bool {
-		return true
-	})
-	s.Store(1, 1)
-}
-
-func TestEmpty(t *testing.T) {
-	m := New(WithCap(8), WithHasher(func(key uint32, cap int) (offset int) {
-		offset = int(key) % cap
-		return
-	}))
-
-	m.Load(1)
-	m.Range(func(u uint32, i interface{}) bool {
-		return true
-	})
-	m.Delete(1)
-	m.Store(1, 1)
-	m.Load(1)
-}
-
-func TestAll(t *testing.T) {
-	m := New(WithCap(16))
-	const cnt = 100000
-	var wg sync.WaitGroup
-	wg.Add(cnt)
-	for i := 0; i < cnt; i++ {
-		go func(n uint32) {
-			m.Store(n, struct{}{})
-			wg.Done()
-		}(uint32(i))
-	}
-	wg.Wait()
-
-	wg.Add(cnt)
-	for i := 0; i < cnt; i++ {
-		go func(n uint32) {
-			defer wg.Done()
-			_, ok := m.Load(n)
-			assert.True(t, ok)
-		}(uint32(i))
-	}
-	wg.Wait()
-
-	var n int
-	m.Range(func(u uint32, i interface{}) bool {
-		n++
-		return true
-	})
-	assert.Equal(t, cnt, n)
-
-	wg.Add(cnt)
-	for i := 0; i < cnt; i++ {
-		go func(n uint32) {
-			defer wg.Done()
-			m.Delete(n)
-		}(uint32(i))
-	}
-	wg.Wait()
-	_, ok := m.Load(1)
-	assert.False(t, ok)
-
-	_, ok = m.Load(cnt - 1)
-	assert.False(t, ok)
-
-}
-
-func BenchmarkMap32(b *testing.B) {
-	m := New()
-	const cnt = 1000000
-	empty := struct{}{}
-	for i := 0; i < cnt; i++ {
-		m.Store(uint32(i), empty)
-	}
-
-	b.ResetTimer()
-	b.RunParallel(func(pb *testing.PB) {
-		for pb.Next() {
-			m.Delete(uint32(rand.Intn(cnt)))
-		}
-	})
-}
diff --git a/internal/map32/types.go.go b/internal/map32/types.go.go
deleted file mode 100644
index 99ff9a2..0000000
--- a/internal/map32/types.go.go
+++ /dev/null
@@ -1,15 +0,0 @@
-package map32
-
-// Map32 is a safe map for uint32 key.
-type Map32 interface {
-	// Destroy destroy current map.
-	Destroy()
-	// Range visits all items.
-	Range(fn func(uint32, interface{}) bool)
-	// Load loads the value of key.
-	Load(key uint32) (v interface{}, ok bool)
-	// Store stores the key and value.
-	Store(key uint32, value interface{})
-	// Delete deletes the key.
-	Delete(key uint32)
-}
diff --git a/internal/socket/duplex.go b/internal/socket/duplex.go
index 5e06075..369d231 100644
--- a/internal/socket/duplex.go
+++ b/internal/socket/duplex.go
@@ -18,7 +18,6 @@ import (
 	"github.com/rsocket/rsocket-go/internal/bytesconv"
 	"github.com/rsocket/rsocket-go/internal/common"
 	"github.com/rsocket/rsocket-go/internal/fragmentation"
-	"github.com/rsocket/rsocket-go/internal/map32"
 	"github.com/rsocket/rsocket-go/internal/misc"
 	"github.com/rsocket/rsocket-go/internal/queue"
 	"github.com/rsocket/rsocket-go/lease"
@@ -60,10 +59,10 @@ type DuplexConnection struct {
 	sndQueue     chan core.WriteableFrame
 	sndBacklog   []core.WriteableFrame
 	responder    Responder
-	messages     map32.Map32 // key=streamID, value=callback
+	messages     sync.Map // key=streamID, value=callback
 	sids         StreamID
 	mtu          int
-	fragments    map32.Map32 // key=streamID, value=Joiner
+	fragments    sync.Map // key=streamID, value=Joiner
 	writeDone    chan struct{}
 	keepaliver   *Keepaliver
 	cond         sync.Cond
@@ -162,11 +161,10 @@ func (dc *DuplexConnection) destroyTransport() {
 }
 
 func (dc *DuplexConnection) destroyHandler(err error) {
-	defer dc.messages.Destroy()
 	// TODO: optimize callback map
 	var callbacks []callback
-	dc.messages.Range(func(sid uint32, v interface{}) bool {
-		callbacks = append(callbacks, v.(callback))
+	dc.messages.Range(func(_, value interface{}) bool {
+		callbacks = append(callbacks, value.(callback))
 		return true
 	})
 	for _, next := range callbacks {
@@ -175,11 +173,10 @@ func (dc *DuplexConnection) destroyFragment() {
-	dc.fragments.Range(func(u uint32, i interface{}) bool {
+	dc.fragments.Range(func(_, i interface{}) bool {
 		common.TryRelease(i)
 		return true
 	})
-	dc.fragments.Destroy()
 }
 
 func (dc *DuplexConnection) destroySndQueue() {
@@ -1360,21 +1357,11 @@ func newDuplexConnection(ctx context.Context, reqSche, resSche scheduler.Schedul
 		leases:     leases,
 		sndQueue:   make(chan core.WriteableFrame, _outChanSize),
 		mtu:        mtu,
-		messages: map32.New(map32.WithCap(32), map32.WithHasher(func(key uint32, _ int) int {
-			var n int
-			if key&1 == 0 {
-				n = int(key) >> 1
-			} else {
-				n = (int(key)-1)>>1 + 1
-			}
-			return n & 31
-		})),
-		sids:       sids,
-		fragments:  map32.New(map32.WithCap(1)),
-		counter:    core.NewTrafficCounter(),
-		keepaliver: ka,
-		closed:     atomic.NewBool(false),
-		ready:      atomic.NewBool(false),
+		sids:       sids,
+		counter:    core.NewTrafficCounter(),
+		keepaliver: ka,
+		closed:     atomic.NewBool(false),
+		ready:      atomic.NewBool(false),
 	}
 	c.cond.L = &c.locker
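
The patch above swaps the custom sharded map32 container for the standard library's sync.Map: stream-ID-keyed handlers are stored and looked up through Store/Load/Range/Delete, values come back as interface{} and must be type-asserted, and the explicit Destroy step disappears because entries are simply deleted or left to the garbage collector. The sketch below illustrates that pattern in isolation; it is not code from rsocket-go, and the registry and handler types, the register/lookup/unregister/closeAll methods, and the error text are all illustrative assumptions.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// handler is an illustrative stand-in for the per-stream callback the
// connection keeps for each stream ID; the real callback type differs.
type handler func(err error)

// registry sketches the sync.Map usage adopted by the patch: keys are
// uint32 stream IDs, values are handlers, and no Destroy method exists.
type registry struct {
	m sync.Map // key=streamID (uint32), value=handler
}

// register stores the handler for a stream.
func (r *registry) register(sid uint32, h handler) {
	r.m.Store(sid, h)
}

// lookup loads the handler for a stream; the stored value must be
// type-asserted back to the concrete handler type.
func (r *registry) lookup(sid uint32) (handler, bool) {
	v, ok := r.m.Load(sid)
	if !ok {
		return nil, false
	}
	return v.(handler), true
}

// unregister removes a finished stream.
func (r *registry) unregister(sid uint32) {
	r.m.Delete(sid)
}

// closeAll mirrors the destroyHandler shape in the patch: collect every
// callback during Range, then invoke them outside the iteration.
func (r *registry) closeAll(err error) {
	var hs []handler
	r.m.Range(func(_, value interface{}) bool {
		hs = append(hs, value.(handler))
		return true
	})
	for _, h := range hs {
		h(err)
	}
}

func main() {
	r := &registry{}
	r.register(1, func(err error) { fmt.Println("stream 1 finished:", err) })
	if h, ok := r.lookup(1); ok {
		h(nil) // stream 1 finished: <nil>
	}
	r.closeAll(errors.New("connection closed"))
	r.unregister(1)
}

As a general note on the trade-off rather than a claim about this repository's benchmarks: sync.Map is documented to work best when keys are written once and read many times, or when different goroutines touch disjoint sets of keys, which fits a per-stream handler table; the cost relative to the removed map32 is interface{} keys and values, so every read needs a type assertion and the uint32-only type safety of the old interface is lost.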