From 99daad77116ec41262fd6444bdb99e8a77eb85ab Mon Sep 17 00:00:00 2001 From: Rueian Date: Tue, 24 Dec 2024 19:30:33 +0800 Subject: [PATCH] fix: broken tx retries for cluster clients after #697 (#709) * fix: broken tx retries for cluster clients after #697 Signed-off-by: Rueian * fix: broken tx retris for cluster clients after #697 Signed-off-by: Rueian * fix: broken tx retris for cluster clients after #697 Signed-off-by: Rueian --------- Signed-off-by: Rueian --- cluster.go | 114 ++++--- cluster_test.go | 769 ++++++++++++++++++++++++++++++++++++++++++++++++ syncp.go | 4 +- 3 files changed, 852 insertions(+), 35 deletions(-) diff --git a/cluster.go b/cluster.go index 11e2b41c..5fc1c3bf 100644 --- a/cluster.go +++ b/cluster.go @@ -525,9 +525,8 @@ func (c *clusterClient) toReplica(cmd Completed) bool { return false } -func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry) { +func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry, init bool) { last := cmds.InitSlot - init := false for _, cmd := range multi { if cmd.Slot() == cmds.InitSlot { @@ -550,7 +549,7 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry) { cc = c.pslots[cmd.Slot()] } if cc == nil { - return nil + return nil, false } count.m[cc]++ } @@ -569,13 +568,13 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry) { cc = c.pslots[cmd.Slot()] } if cc == nil { // check cc == nil again in case of non-deterministic SendToReplicas. - return nil + return nil, false } re := retries.m[cc] re.commands = append(re.commands, cmd) re.cIndexes = append(re.cIndexes, i) } - return retries + return retries, init } inits := 0 @@ -589,25 +588,28 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry) { } else if init && last != cmd.Slot() { panic(panicMixCxSlot) } - p := c.pslots[cmd.Slot()] - if p == nil { - return nil + cc := c.pslots[cmd.Slot()] + if cc == nil { + return nil, false } - count.m[p]++ + count.m[cc]++ } if last == cmds.InitSlot { // if all commands have no slots, such as INFO, we pick a non-nil slot. - for i, p := range c.pslots { - if p != nil { + for i, cc := range c.pslots { + if cc != nil { last = uint16(i) - count.m[p] = inits + count.m[cc] = inits break } } if last == cmds.InitSlot { - return nil + return nil, false } + } else if init { + cc := c.pslots[last] + count.m[cc] += inits } retries = connretryp.Get(len(count.m), len(count.m)) @@ -627,25 +629,34 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry) { re.commands = append(re.commands, cmd) re.cIndexes = append(re.cIndexes, i) } - return retries + return retries, init } -func (c *clusterClient) pickMulti(ctx context.Context, multi []Completed) (*connretry, error) { - conns := c._pickMulti(multi) +func (c *clusterClient) pickMulti(ctx context.Context, multi []Completed) (*connretry, bool, error) { + conns, hasInit := c._pickMulti(multi) if conns == nil { if err := c.refresh(ctx); err != nil { - return nil, err + return nil, false, err } - if conns = c._pickMulti(multi); conns == nil { - return nil, ErrNoSlot + if conns, hasInit = c._pickMulti(multi); conns == nil { + return nil, false, ErrNoSlot } } - return conns, nil + return conns, hasInit, nil +} + +func isMulti(cmd Completed) bool { + return len(cmd.Commands()) == 1 && cmd.Commands()[0] == "MULTI" +} +func isExec(cmd Completed) bool { + return len(cmd.Commands()) == 1 && cmd.Commands()[0] == "EXEC" } func (c *clusterClient) doresultfn( - ctx context.Context, results *redisresults, retries *connretry, mu *sync.Mutex, cc conn, cIndexes []int, commands []Completed, resps []RedisResult, attempts int, + ctx context.Context, results *redisresults, retries *connretry, mu *sync.Mutex, cc conn, cIndexes []int, commands []Completed, resps []RedisResult, attempts int, hasInit bool, ) (clean bool) { + mi := -1 + ei := -1 clean = true for i, resp := range resps { clean = clean && resp.NonRedisError() == nil @@ -664,6 +675,37 @@ func (c *clusterClient) doresultfn( } else { nc = c.redirectOrNew(addr, cc, cm.Slot(), mode) } + if hasInit && ei < i { // find out if there is a transaction block or not. + for mi = i; mi >= 0 && !isMulti(commands[mi]) && !isExec(commands[mi]); mi-- { + } + for ei = i; ei < len(commands) && !isMulti(commands[ei]) && !isExec(commands[ei]); ei++ { + } + if mi >= 0 && ei < len(commands) && isMulti(commands[mi]) && isExec(commands[ei]) && resps[mi].val.string == ok { // a transaction is found. + mu.Lock() + retries.Redirects++ + nr := retries.m[nc] + if nr == nil { + nr = retryp.Get(0, len(commands)) + retries.m[nc] = nr + } + for i := mi; i <= ei; i++ { + ii := cIndexes[i] + cm := commands[i] + if mode == RedirectAsk { + nr.aIndexes = append(nr.aIndexes, ii) + nr.cAskings = append(nr.cAskings, cm) + } else { + nr.cIndexes = append(nr.cIndexes, ii) + nr.commands = append(nr.commands, cm) + } + } + mu.Unlock() + continue // the transaction has been added to the retries, go to the next cmd. + } + } + if hasInit && mi < i && i < ei && mi >= 0 && isMulti(commands[mi]) { + continue // the current cmd is in the processed transaction and has been added to the retries. + } mu.Lock() if mode != RedirectRetry { retries.Redirects++ @@ -690,17 +732,17 @@ func (c *clusterClient) doresultfn( } func (c *clusterClient) doretry( - ctx context.Context, cc conn, results *redisresults, retries *connretry, re *retry, mu *sync.Mutex, wg *sync.WaitGroup, attempts int, + ctx context.Context, cc conn, results *redisresults, retries *connretry, re *retry, mu *sync.Mutex, wg *sync.WaitGroup, attempts int, hasInit bool, ) { clean := true if len(re.commands) != 0 { resps := cc.DoMulti(ctx, re.commands...) - clean = c.doresultfn(ctx, results, retries, mu, cc, re.cIndexes, re.commands, resps.s, attempts) + clean = c.doresultfn(ctx, results, retries, mu, cc, re.cIndexes, re.commands, resps.s, attempts, hasInit) resultsp.Put(resps) } if len(re.cAskings) != 0 { resps := askingMulti(cc, ctx, re.cAskings) - clean = c.doresultfn(ctx, results, retries, mu, cc, re.aIndexes, re.cAskings, resps.s, attempts) && clean + clean = c.doresultfn(ctx, results, retries, mu, cc, re.aIndexes, re.cAskings, resps.s, attempts, hasInit) && clean resultsp.Put(resps) } if clean { @@ -714,7 +756,7 @@ func (c *clusterClient) DoMulti(ctx context.Context, multi ...Completed) []Redis return nil } - retries, err := c.pickMulti(ctx, multi) + retries, hasInit, err := c.pickMulti(ctx, multi) if err != nil { return fillErrs(len(multi), err) } @@ -742,10 +784,10 @@ retry: } for cc, re := range retries.m { delete(retries.m, cc) - go c.doretry(ctx, cc, results, retries, re, &mu, &wg, attempts) + go c.doretry(ctx, cc, results, retries, re, &mu, &wg, attempts, hasInit) } mu.Unlock() - c.doretry(ctx, cc1, results, retries, re1, &mu, &wg, attempts) + c.doretry(ctx, cc1, results, retries, re1, &mu, &wg, attempts, hasInit) wg.Wait() if len(retries.m) != 0 { @@ -753,7 +795,6 @@ retry: retries.Redirects = 0 goto retry } - if retries.RetryDelay >= 0 { c.retryHandler.WaitForRetry(ctx, retries.RetryDelay) attempts++ @@ -817,14 +858,23 @@ func (c *clusterClient) DoCache(ctx context.Context, cmd Cacheable, ttl time.Dur } func askingMulti(cc conn, ctx context.Context, multi []Completed) *redisresults { + var inTx bool commands := make([]Completed, 0, len(multi)*2) for _, cmd := range multi { - commands = append(commands, cmds.AskingCmd, cmd) + if inTx { + commands = append(commands, cmd) + inTx = !isExec(cmd) + } else { + commands = append(commands, cmds.AskingCmd, cmd) + inTx = isMulti(cmd) + } } results := resultsp.Get(0, len(multi)) resps := cc.DoMulti(ctx, commands...) - for i := 1; i < len(resps.s); i += 2 { - results.s = append(results.s, resps.s[i]) + for i, resp := range resps.s { + if commands[i] != cmds.AskingCmd { + results.s = append(results.s, resp) + } } resultsp.Put(resps) return results @@ -946,7 +996,6 @@ func (c *clusterClient) resultcachefn( if !c.retry { continue } - retryDelay = c.retryHandler.RetryDelay(attempts, Completed(cm.Cmd), resp.Error()) } else { nc = c.redirectOrNew(addr, cc, cm.Cmd.Slot(), mode) @@ -1040,7 +1089,6 @@ retry: retries.Redirects = 0 goto retry } - if retries.RetryDelay >= 0 { c.retryHandler.WaitForRetry(ctx, retries.RetryDelay) attempts++ diff --git a/cluster_test.go b/cluster_test.go index 5221648a..ee768432 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -4108,6 +4108,775 @@ func TestClusterClientErr(t *testing.T) { } }) + t.Run("slot moved DoMulti transactions", func(t *testing.T) { + var count int64 + client, err := newClusterClient( + &ClientOption{InitAddress: []string{":0"}}, + func(dst string, opt *ClientOption) conn { + return &mockConn{DoFn: func(cmd Completed) RedisResult { + return slotsMultiResp + }, DoMultiFn: func(multi ...Completed) *redisresults { + switch atomic.AddInt64(&count, 1) { + case 1: + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "1"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '-', string: "MOVED 0 :1"}, nil), + newResult(RedisMessage{typ: '-', string: "EXECABORT"}, nil), + newResult(RedisMessage{typ: '+', string: "4"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '-', string: "MOVED 0 :1"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '-', string: "EXECABORT"}, nil), + newResult(RedisMessage{typ: '+', string: "7"}, nil), + }} + case 2: + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '*', values: []RedisMessage{ + {typ: '+', string: "2"}, + {typ: '+', string: "3"}, + }}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '*', values: []RedisMessage{ + {typ: '+', string: "5"}, + {typ: '+', string: "6"}, + }}, nil), + }} + } + return nil + }} + }, + newRetryer(defaultRetryDelayFn), + ) + if err != nil { + t.Fatalf("unexpected err %v", err) + } + resps := client.DoMulti( + context.Background(), + client.B().Get().Key("1{t}").Build(), + client.B().Multi().Build(), + client.B().Get().Key("2{t}").Build(), + client.B().Get().Key("3{t}").Build(), + client.B().Exec().Build(), + client.B().Get().Key("4{t}").Build(), + client.B().Multi().Build(), + client.B().Get().Key("5{t}").Build(), + client.B().Get().Key("6{t}").Build(), + client.B().Exec().Build(), + client.B().Get().Key("7{t}").Build(), + ) + if v, err := resps[0].ToString(); err != nil || v != "1" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[1].ToString(); err != nil || v != "OK" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[2].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[3].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[4].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"2", "3"}) { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[5].ToString(); err != nil || v != "4" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[6].ToString(); err != nil || v != "OK" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[7].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[8].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[9].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"5", "6"}) { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[10].ToString(); err != nil || v != "7" { + t.Fatalf("unexpected resp %v %v", v, err) + } + }) + + t.Run("slot moved DoMulti transactions ASKING", func(t *testing.T) { + var count int64 + client, err := newClusterClient( + &ClientOption{InitAddress: []string{":0"}}, + func(dst string, opt *ClientOption) conn { + return &mockConn{DoFn: func(cmd Completed) RedisResult { + return slotsMultiResp + }, DoMultiFn: func(multi ...Completed) *redisresults { + switch atomic.AddInt64(&count, 1) { + case 1: + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "1"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '-', string: "ASK 0 :1"}, nil), + newResult(RedisMessage{typ: '-', string: "EXECABORT"}, nil), + newResult(RedisMessage{typ: '+', string: "4"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '-', string: "ASK 0 :1"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '-', string: "EXECABORT"}, nil), + newResult(RedisMessage{typ: '+', string: "7"}, nil), + }} + case 2: + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '*', values: []RedisMessage{ + {typ: '+', string: "2"}, + {typ: '+', string: "3"}, + }}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '*', values: []RedisMessage{ + {typ: '+', string: "5"}, + {typ: '+', string: "6"}, + }}, nil), + }} + } + return nil + }} + }, + newRetryer(defaultRetryDelayFn), + ) + if err != nil { + t.Fatalf("unexpected err %v", err) + } + resps := client.DoMulti( + context.Background(), + client.B().Get().Key("1{t}").Build(), + client.B().Multi().Build(), + client.B().Get().Key("2{t}").Build(), + client.B().Get().Key("3{t}").Build(), + client.B().Exec().Build(), + client.B().Get().Key("4{t}").Build(), + client.B().Multi().Build(), + client.B().Get().Key("5{t}").Build(), + client.B().Get().Key("6{t}").Build(), + client.B().Exec().Build(), + client.B().Get().Key("7{t}").Build(), + ) + if v, err := resps[0].ToString(); err != nil || v != "1" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[1].ToString(); err != nil || v != "OK" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[2].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[3].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[4].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"2", "3"}) { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[5].ToString(); err != nil || v != "4" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[6].ToString(); err != nil || v != "OK" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[7].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[8].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[9].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"5", "6"}) { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[10].ToString(); err != nil || v != "7" { + t.Fatalf("unexpected resp %v %v", v, err) + } + }) + + t.Run("slot moved DoMulti except transactions", func(t *testing.T) { + var count int64 + client, err := newClusterClient( + &ClientOption{InitAddress: []string{":0"}}, + func(dst string, opt *ClientOption) conn { + return &mockConn{DoFn: func(cmd Completed) RedisResult { + return slotsMultiResp + }, DoMultiFn: func(multi ...Completed) *redisresults { + switch atomic.AddInt64(&count, 1) { + case 1: + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '-', string: "MOVED 0 :1"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '*', values: []RedisMessage{ + {typ: '+', string: "2"}, + {typ: '+', string: "3"}, + }}, nil), + newResult(RedisMessage{typ: '-', string: "MOVED 0 :1"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '*', values: []RedisMessage{ + {typ: '+', string: "5"}, + {typ: '+', string: "6"}, + }}, nil), + newResult(RedisMessage{typ: '-', string: "MOVED 0 :1"}, nil), + }} + case 2: + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "1"}, nil), + newResult(RedisMessage{typ: '+', string: "4"}, nil), + newResult(RedisMessage{typ: '+', string: "7"}, nil), + }} + } + return nil + }} + }, + newRetryer(defaultRetryDelayFn), + ) + if err != nil { + t.Fatalf("unexpected err %v", err) + } + resps := client.DoMulti( + context.Background(), + client.B().Get().Key("1{t}").Build(), + client.B().Multi().Build(), + client.B().Get().Key("2{t}").Build(), + client.B().Get().Key("3{t}").Build(), + client.B().Exec().Build(), + client.B().Get().Key("4{t}").Build(), + client.B().Multi().Build(), + client.B().Get().Key("5{t}").Build(), + client.B().Get().Key("6{t}").Build(), + client.B().Exec().Build(), + client.B().Get().Key("7{t}").Build(), + ) + if v, err := resps[0].ToString(); err != nil || v != "1" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[1].ToString(); err != nil || v != "OK" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[2].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[3].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[4].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"2", "3"}) { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[5].ToString(); err != nil || v != "4" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[6].ToString(); err != nil || v != "OK" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[7].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[8].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[9].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"5", "6"}) { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[10].ToString(); err != nil || v != "7" { + t.Fatalf("unexpected resp %v %v", v, err) + } + }) + + t.Run("slot moved DoMulti except transactions ASKING", func(t *testing.T) { + var count int64 + client, err := newClusterClient( + &ClientOption{InitAddress: []string{":0"}}, + func(dst string, opt *ClientOption) conn { + return &mockConn{DoFn: func(cmd Completed) RedisResult { + return slotsMultiResp + }, DoMultiFn: func(multi ...Completed) *redisresults { + switch atomic.AddInt64(&count, 1) { + case 1: + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '-', string: "ASK 0 :1"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '*', values: []RedisMessage{ + {typ: '+', string: "2"}, + {typ: '+', string: "3"}, + }}, nil), + newResult(RedisMessage{typ: '-', string: "ASK 0 :1"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '*', values: []RedisMessage{ + {typ: '+', string: "5"}, + {typ: '+', string: "6"}, + }}, nil), + newResult(RedisMessage{typ: '-', string: "ASK 0 :1"}, nil), + }} + case 2: + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "1"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "4"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "7"}, nil), + }} + } + return nil + }} + }, + newRetryer(defaultRetryDelayFn), + ) + if err != nil { + t.Fatalf("unexpected err %v", err) + } + resps := client.DoMulti( + context.Background(), + client.B().Get().Key("1{t}").Build(), + client.B().Multi().Build(), + client.B().Get().Key("2{t}").Build(), + client.B().Get().Key("3{t}").Build(), + client.B().Exec().Build(), + client.B().Get().Key("4{t}").Build(), + client.B().Multi().Build(), + client.B().Get().Key("5{t}").Build(), + client.B().Get().Key("6{t}").Build(), + client.B().Exec().Build(), + client.B().Get().Key("7{t}").Build(), + ) + if v, err := resps[0].ToString(); err != nil || v != "1" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[1].ToString(); err != nil || v != "OK" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[2].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[3].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[4].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"2", "3"}) { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[5].ToString(); err != nil || v != "4" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[6].ToString(); err != nil || v != "OK" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[7].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[8].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[9].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"5", "6"}) { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[10].ToString(); err != nil || v != "7" { + t.Fatalf("unexpected resp %v %v", v, err) + } + }) + + t.Run("slot moved DoMulti transactions mixed", func(t *testing.T) { + var count int64 + client, err := newClusterClient( + &ClientOption{InitAddress: []string{":0"}}, + func(dst string, opt *ClientOption) conn { + return &mockConn{DoFn: func(cmd Completed) RedisResult { + return slotsMultiResp + }, DoMultiFn: func(multi ...Completed) *redisresults { + switch atomic.AddInt64(&count, 1) { + case 1: + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "1"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '-', string: "MOVED 0 :1"}, nil), + newResult(RedisMessage{typ: '-', string: "MOVED 0 :1"}, nil), + newResult(RedisMessage{typ: '-', string: "EXECABORT"}, nil), + newResult(RedisMessage{typ: '-', string: "MOVED 0 :1"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '-', string: "MOVED 0 :1"}, nil), + newResult(RedisMessage{typ: '-', string: "MOVED 0 :1"}, nil), + newResult(RedisMessage{typ: '-', string: "EXECABORT"}, nil), + newResult(RedisMessage{typ: '+', string: "7"}, nil), + }} + case 2: + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '*', values: []RedisMessage{ + {typ: '+', string: "2"}, + {typ: '+', string: "3"}, + }}, nil), + newResult(RedisMessage{typ: '+', string: "4"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '*', values: []RedisMessage{ + {typ: '+', string: "5"}, + {typ: '+', string: "6"}, + }}, nil), + }} + } + return nil + }} + }, + newRetryer(defaultRetryDelayFn), + ) + if err != nil { + t.Fatalf("unexpected err %v", err) + } + resps := client.DoMulti( + context.Background(), + client.B().Get().Key("1{t}").Build(), + client.B().Multi().Build(), + client.B().Get().Key("2{t}").Build(), + client.B().Get().Key("3{t}").Build(), + client.B().Exec().Build(), + client.B().Get().Key("4{t}").Build(), + client.B().Multi().Build(), + client.B().Get().Key("5{t}").Build(), + client.B().Get().Key("6{t}").Build(), + client.B().Exec().Build(), + client.B().Get().Key("7{t}").Build(), + ) + if v, err := resps[0].ToString(); err != nil || v != "1" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[1].ToString(); err != nil || v != "OK" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[2].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[3].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[4].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"2", "3"}) { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[5].ToString(); err != nil || v != "4" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[6].ToString(); err != nil || v != "OK" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[7].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[8].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[9].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"5", "6"}) { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[10].ToString(); err != nil || v != "7" { + t.Fatalf("unexpected resp %v %v", v, err) + } + }) + + t.Run("slot moved DoMulti transactions mixed ASKING", func(t *testing.T) { + var count int64 + client, err := newClusterClient( + &ClientOption{InitAddress: []string{":0"}}, + func(dst string, opt *ClientOption) conn { + return &mockConn{DoFn: func(cmd Completed) RedisResult { + return slotsMultiResp + }, DoMultiFn: func(multi ...Completed) *redisresults { + switch atomic.AddInt64(&count, 1) { + case 1: + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "1"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '-', string: "ASK 0 :1"}, nil), + newResult(RedisMessage{typ: '-', string: "ASK 0 :1"}, nil), + newResult(RedisMessage{typ: '-', string: "EXECABORT"}, nil), + newResult(RedisMessage{typ: '-', string: "ASK 0 :1"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '-', string: "ASK 0 :1"}, nil), + newResult(RedisMessage{typ: '-', string: "ASK 0 :1"}, nil), + newResult(RedisMessage{typ: '-', string: "EXECABORT"}, nil), + newResult(RedisMessage{typ: '+', string: "7"}, nil), + }} + case 2: + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '*', values: []RedisMessage{ + {typ: '+', string: "2"}, + {typ: '+', string: "3"}, + }}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "4"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '*', values: []RedisMessage{ + {typ: '+', string: "5"}, + {typ: '+', string: "6"}, + }}, nil), + }} + } + return nil + }} + }, + newRetryer(defaultRetryDelayFn), + ) + if err != nil { + t.Fatalf("unexpected err %v", err) + } + resps := client.DoMulti( + context.Background(), + client.B().Get().Key("1{t}").Build(), + client.B().Multi().Build(), + client.B().Get().Key("2{t}").Build(), + client.B().Get().Key("3{t}").Build(), + client.B().Exec().Build(), + client.B().Get().Key("4{t}").Build(), + client.B().Multi().Build(), + client.B().Get().Key("5{t}").Build(), + client.B().Get().Key("6{t}").Build(), + client.B().Exec().Build(), + client.B().Get().Key("7{t}").Build(), + ) + if v, err := resps[0].ToString(); err != nil || v != "1" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[1].ToString(); err != nil || v != "OK" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[2].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[3].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[4].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"2", "3"}) { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[5].ToString(); err != nil || v != "4" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[6].ToString(); err != nil || v != "OK" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[7].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[8].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[9].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"5", "6"}) { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[10].ToString(); err != nil || v != "7" { + t.Fatalf("unexpected resp %v %v", v, err) + } + }) + + t.Run("slot moved DoMulti transactions edge cases 1", func(t *testing.T) { + var count int64 + client, err := newClusterClient( + &ClientOption{InitAddress: []string{":0"}}, + func(dst string, opt *ClientOption) conn { + return &mockConn{DoFn: func(cmd Completed) RedisResult { + return slotsMultiResp + }, DoMultiFn: func(multi ...Completed) *redisresults { + switch atomic.AddInt64(&count, 1) { + case 1: + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "1"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '-', string: "ERR Command not allowed inside a transaction"}, nil), + newResult(RedisMessage{typ: '-', string: "MOVED 0 :1"}, nil), + newResult(RedisMessage{typ: '-', string: "MOVED 0 :1"}, nil), + newResult(RedisMessage{typ: '-', string: "EXECABORT"}, nil), + newResult(RedisMessage{typ: '-', string: "MOVED 0 :1"}, nil), + }} + case 2: + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "4"}, nil), + }} + } + return nil + }} + }, + newRetryer(defaultRetryDelayFn), + ) + if err != nil { + t.Fatalf("unexpected err %v", err) + } + resps := client.DoMulti( + context.Background(), + client.B().Get().Key("1{t}").Build(), + client.B().Multi().Build(), + client.B().Multi().Build(), // nested multi + client.B().Get().Key("2{t}").Build(), + client.B().Get().Key("3{t}").Build(), + client.B().Exec().Build(), + client.B().Get().Key("4{t}").Build(), + ) + if v, err := resps[0].ToString(); err != nil || v != "1" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[1].ToString(); err != nil || v != "OK" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if err := resps[2].Error(); err == nil || !strings.Contains(err.Error(), "Command not allowed inside a transaction") { + t.Fatalf("unexpected err %v", err) + } + if err := resps[3].Error(); err == nil || !strings.Contains(err.Error(), "MOVED 0 :1") { + t.Fatalf("unexpected err %v", err) + } + if err := resps[4].Error(); err == nil || !strings.Contains(err.Error(), "MOVED 0 :1") { + t.Fatalf("unexpected err %v", err) + } + if err := resps[5].Error(); err == nil || !strings.Contains(err.Error(), "EXECABORT") { + t.Fatalf("unexpected err %v", err) + } + if v, err := resps[6].ToString(); err != nil || v != "4" { + t.Fatalf("unexpected resp %v %v", v, err) + } + }) + + t.Run("slot moved DoMulti transactions edge cases 2", func(t *testing.T) { + var count int64 + client, err := newClusterClient( + &ClientOption{InitAddress: []string{":0"}}, + func(dst string, opt *ClientOption) conn { + return &mockConn{DoFn: func(cmd Completed) RedisResult { + return slotsMultiResp + }, DoMultiFn: func(multi ...Completed) *redisresults { + switch atomic.AddInt64(&count, 1) { + case 1: + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "1"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '-', string: "MOVED 0 :1"}, nil), + newResult(RedisMessage{typ: '-', string: "ERR Command not allowed inside a transaction"}, nil), + newResult(RedisMessage{typ: '-', string: "MOVED 0 :1"}, nil), + newResult(RedisMessage{typ: '-', string: "EXECABORT"}, nil), + newResult(RedisMessage{typ: '-', string: "MOVED 0 :1"}, nil), + }} + case 2: + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "4"}, nil), + }} + } + return nil + }} + }, + newRetryer(defaultRetryDelayFn), + ) + if err != nil { + t.Fatalf("unexpected err %v", err) + } + resps := client.DoMulti( + context.Background(), + client.B().Get().Key("1{t}").Build(), + client.B().Multi().Build(), + client.B().Get().Key("2{t}").Build(), + client.B().Multi().Build(), // nested multi + client.B().Get().Key("3{t}").Build(), + client.B().Exec().Build(), + client.B().Get().Key("4{t}").Build(), + ) + if v, err := resps[0].ToString(); err != nil || v != "1" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[1].ToString(); err != nil || v != "OK" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if err := resps[2].Error(); err == nil || !strings.Contains(err.Error(), "MOVED 0 :1") { + t.Fatalf("unexpected err %v", err) + } + if err := resps[3].Error(); err == nil || !strings.Contains(err.Error(), "Command not allowed inside a transaction") { + t.Fatalf("unexpected err %v", err) + } + if err := resps[4].Error(); err == nil || !strings.Contains(err.Error(), "MOVED 0 :1") { + t.Fatalf("unexpected err %v", err) + } + if err := resps[5].Error(); err == nil || !strings.Contains(err.Error(), "EXECABORT") { + t.Fatalf("unexpected err %v", err) + } + if v, err := resps[6].ToString(); err != nil || v != "4" { + t.Fatalf("unexpected resp %v %v", v, err) + } + }) + + t.Run("slot moved DoMulti transactions edge cases 3", func(t *testing.T) { + var count int64 + client, err := newClusterClient( + &ClientOption{InitAddress: []string{":0"}}, + func(dst string, opt *ClientOption) conn { + return &mockConn{DoFn: func(cmd Completed) RedisResult { + return slotsMultiResp + }, DoMultiFn: func(multi ...Completed) *redisresults { + switch atomic.AddInt64(&count, 1) { + case 1: + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "1"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '-', string: "MOVED 0 :1"}, nil), + newResult(RedisMessage{typ: '-', string: "ERR Command not allowed inside a transaction"}, nil), + newResult(RedisMessage{typ: '-', string: "MOVED 0 :1"}, nil), + }} + } + return nil + }} + }, + newRetryer(defaultRetryDelayFn), + ) + if err != nil { + t.Fatalf("unexpected err %v", err) + } + resps := client.DoMulti( + context.Background(), + client.B().Get().Key("1{t}").Build(), + client.B().Multi().Build(), + client.B().Get().Key("2{t}").Build(), + client.B().Multi().Build(), // nested multi + client.B().Get().Key("3{t}").Build(), + ) + if v, err := resps[0].ToString(); err != nil || v != "1" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[1].ToString(); err != nil || v != "OK" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if err := resps[2].Error(); err == nil || !strings.Contains(err.Error(), "MOVED 0 :1") { + t.Fatalf("unexpected err %v", err) + } + if err := resps[3].Error(); err == nil || !strings.Contains(err.Error(), "Command not allowed inside a transaction") { + t.Fatalf("unexpected err %v", err) + } + if err := resps[4].Error(); err == nil || !strings.Contains(err.Error(), "MOVED 0 :1") { + t.Fatalf("unexpected err %v", err) + } + }) + t.Run("slot moved DoMulti (multi)", func(t *testing.T) { var count int64 client, err := newClusterClient( diff --git a/syncp.go b/syncp.go index 77b99c1f..ff340bc7 100644 --- a/syncp.go +++ b/syncp.go @@ -221,8 +221,8 @@ func (r *conncount) ResetLen(n int) { type connretry struct { m map[conn]*retry n int + RetryDelay time.Duration // NOTE: This is not thread-safe. Redirects uint32 // NOTE: This is not thread-safe. - RetryDelay time.Duration // NOTE: It is not thread-safe. } func (r *connretry) Capacity() int { @@ -238,8 +238,8 @@ func (r *connretry) ResetLen(n int) { type connretrycache struct { m map[conn]*retrycache n int + RetryDelay time.Duration // NOTE: This is not thread-safe. Redirects uint32 // NOTE: This is not thread-safe. - RetryDelay time.Duration // NOTE: It is not thread-safe. } func (r *connretrycache) Capacity() int {