Skip to content

Commit

Permalink
perf: use dedicated connections for DoMulti() with 2000+ commands by …
Browse files Browse the repository at this point in the history
…default (#631)

* perf: use dedicated connections for DoMulti() with 2000+ commands by default

Signed-off-by: Rueian <[email protected]>

* perf: use dedicated connections for DoMulti() with 2000+ commands by default

Signed-off-by: Rueian <[email protected]>

---------

Signed-off-by: Rueian <[email protected]>
  • Loading branch information
rueian authored Sep 5, 2024
1 parent f119f68 commit 5135acd
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 0 deletions.
5 changes: 5 additions & 0 deletions mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type mux struct {
sc []*singleconnect
mu []sync.Mutex
maxp int
maxm int
}

func makeMux(dst string, option *ClientOption, dialFn dialFn) *mux {
Expand Down Expand Up @@ -88,6 +89,7 @@ func newMux(dst string, option *ClientOption, init, dead wire, wireFn wireFn, wi
mu: make([]sync.Mutex, multiplex),
sc: make([]*singleconnect, multiplex),
maxp: runtime.GOMAXPROCS(0),
maxm: option.BlockingPipeline,
}
m.clhks.Store(emptyclhks)
for i := 0; i < len(m.wire); i++ {
Expand Down Expand Up @@ -207,6 +209,9 @@ func (m *mux) Do(ctx context.Context, cmd Completed) (resp RedisResult) {
}

func (m *mux) DoMulti(ctx context.Context, multi ...Completed) (resp *redisresults) {
if len(multi) >= m.maxm && m.maxm > 0 {
goto block // use a dedicated connection if the pipeline is too large
}
for _, cmd := range multi {
if cmd.IsBlock() {
goto block
Expand Down
56 changes: 56 additions & 0 deletions mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ func setupMuxWithOption(wires []*mockWire, option *ClientOption) (conn *mux, che
count++
return wires[count]
}
if option.BlockingPipeline == 0 {
option.BlockingPipeline = DefaultBlockingPipeline
}
return newMux("", option, (*mockWire)(nil), (*mockWire)(nil), wfn, wfn), func(t *testing.T) {
if count != len(wires)-1 {
t.Fatalf("there is %d remaining unused wires", len(wires)-count-1)
Expand Down Expand Up @@ -695,6 +698,59 @@ func TestMuxDelegation(t *testing.T) {
wg.Wait()
})

t.Run("multiple long pipeline", func(t *testing.T) {
blocked := make(chan struct{})
responses := make(chan RedisResult)

m, checkClean := setupMux([]*mockWire{
{
// leave first wire for pipeline calls
},
{
DoMultiFn: func(cmd ...Completed) *redisresults {
blocked <- struct{}{}
return &redisresults{s: []RedisResult{<-responses}}
},
},
{
DoMultiFn: func(cmd ...Completed) *redisresults {
blocked <- struct{}{}
return &redisresults{s: []RedisResult{<-responses}}
},
},
})
defer checkClean(t)
defer m.Close()
if err := m.Dial(); err != nil {
t.Fatalf("unexpected dial error %v", err)
}

wg := sync.WaitGroup{}
wg.Add(2)
for i := 0; i < 2; i++ {
go func() {
pipeline := make(Commands, DefaultBlockingPipeline)
for i := 0; i < len(pipeline); i++ {
pipeline[i] = cmds.NewCompleted([]string{"SET"})
}
if val, err := m.DoMulti(context.Background(), pipeline...).s[0].ToString(); err != nil {
t.Errorf("unexpected error %v", err)
} else if val != "BLOCK_COMMANDS_RESPONSE" {
t.Errorf("unexpected response %v", val)
} else {
wg.Done()
}
}()
}
for i := 0; i < 2; i++ {
<-blocked
}
for i := 0; i < 2; i++ {
responses <- newResult(RedisMessage{typ: '+', string: "BLOCK_COMMANDS_RESPONSE"}, nil)
}
wg.Wait()
})

t.Run("multi blocking no recycle the wire if err", func(t *testing.T) {
closed := false
m, checkClean := setupMux([]*mockWire{
Expand Down
7 changes: 7 additions & 0 deletions rueidis.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ const (
DefaultRingScale = 10
// DefaultPoolSize is the default value of ClientOption.BlockingPoolSize
DefaultPoolSize = 1000
// DefaultBlockingPipeline is the default value of ClientOption.BlockingPipeline
DefaultBlockingPipeline = 2000
// DefaultDialTimeout is the default value of ClientOption.Dialer.Timeout
DefaultDialTimeout = 5 * time.Second
// DefaultTCPKeepAlive is the default value of ClientOption.Dialer.KeepAlive
Expand Down Expand Up @@ -132,6 +134,8 @@ type ClientOption struct {
// BlockingPoolSize is the size of the connection pool shared by blocking commands (ex BLPOP, XREAD with BLOCK).
// The default is DefaultPoolSize.
BlockingPoolSize int
// BlockingPipeline is the threshold of a pipeline that will be treated as blocking commands when exceeding it.
BlockingPipeline int

// PipelineMultiplex determines how many tcp connections used to pipeline commands to one redis instance.
// The default for single and sentinel clients is 2, which means 4 connections (2^2).
Expand Down Expand Up @@ -336,6 +340,9 @@ func NewClient(option ClientOption) (client Client, err error) {
if option.ConnWriteTimeout == 0 {
option.ConnWriteTimeout = option.Dialer.KeepAlive * 10
}
if option.BlockingPipeline == 0 {
option.BlockingPipeline = DefaultBlockingPipeline
}
if option.ShuffleInit {
util.Shuffle(len(option.InitAddress), func(i, j int) {
option.InitAddress[i], option.InitAddress[j] = option.InitAddress[j], option.InitAddress[i]
Expand Down

0 comments on commit 5135acd

Please sign in to comment.