From 05fe77fd083936392534c81f609ca9ad7e39011a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-No=C3=ABl=20Moyne?= Date: Wed, 11 Oct 2023 12:26:19 -0700 Subject: [PATCH] Backport #4592 to 2.9 (#4651) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Backport #4592 to 2.9.23 --------- Signed-off-by: Jean-Noël Moyne --- server/jetstream_test.go | 2 ++ server/norace_test.go | 3 +- server/stream.go | 62 ++++++++++++++++++++++++++++------------ 3 files changed, 47 insertions(+), 20 deletions(-) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 51ad130ae4d..69e723b7aad 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -16773,6 +16773,8 @@ func TestJetStreamWorkQueueSourceRestart(t *testing.T) { sub, err := js.PullSubscribe("foo", "dur", nats.BindStream("TEST")) require_NoError(t, err) + time.Sleep(100 * time.Millisecond) + ci, err := js.ConsumerInfo("TEST", "dur") require_NoError(t, err) require_True(t, ci.NumPending == uint64(sent)) diff --git a/server/norace_test.go b/server/norace_test.go index 473576bd137..19da6a9fac2 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -1873,7 +1873,6 @@ func TestNoRaceJetStreamSuperClusterSources(t *testing.T) { msg := fmt.Sprintf("R-MSG-%d", i+1) for _, sname := range []string{"foo", "bar", "baz"} { m := nats.NewMsg(sname) - m.Header.Set(nats.MsgIdHdr, sname+"-"+msg) m.Data = []byte(msg) if _, err := js.PublishMsg(m); err != nil { t.Errorf("Unexpected publish error: %v", err) @@ -1890,7 +1889,7 @@ func TestNoRaceJetStreamSuperClusterSources(t *testing.T) { sc.clusterForName("C3").waitOnStreamLeader("$G", "MS2") <-doneCh - checkFor(t, 15*time.Second, 100*time.Millisecond, func() error { + checkFor(t, 15*time.Second, time.Second, func() error { si, err := js2.StreamInfo("MS2") if err != nil { return err diff --git a/server/stream.go b/server/stream.go index fb792845e89..68b7f4d57d8 100644 --- a/server/stream.go +++ b/server/stream.go @@ -217,7 +217,8 @@ type stream struct { mirror *sourceInfo // Sources - sources map[string]*sourceInfo + sources map[string]*sourceInfo + sourcesConsumerSetup *time.Timer // Indicates we have direct consumers. directs int @@ -681,6 +682,11 @@ func (mset *stream) setLeader(isLeader bool) error { return err } } else { + // cancel timer to create the source consumers if not fired yet + if mset.sourcesConsumerSetup != nil { + mset.sourcesConsumerSetup.Stop() + mset.sourcesConsumerSetup = nil + } // Stop responding to sync requests. mset.stopClusterSubs() // Unsubscribe from direct stream. @@ -2211,7 +2217,7 @@ func (mset *stream) scheduleSetupMirrorConsumerRetryAsap() { } // To make *sure* that the next request will not fail, add a bit of buffer // and some randomness. - next += time.Duration(rand.Intn(50)) + 10*time.Millisecond + next += time.Duration(rand.Intn(int(10*time.Millisecond))) + 10*time.Millisecond time.AfterFunc(next, func() { mset.mu.Lock() mset.setupMirrorConsumer() @@ -2530,7 +2536,7 @@ func (mset *stream) scheduleSetSourceConsumerRetryAsap(si *sourceInfo, seq uint6 } // To make *sure* that the next request will not fail, add a bit of buffer // and some randomness. - next += time.Duration(rand.Intn(50)) + 10*time.Millisecond + next += time.Duration(rand.Intn(int(10*time.Millisecond))) + 10*time.Millisecond mset.scheduleSetSourceConsumerRetry(si.iname, seq, next, startTime) } @@ -3025,15 +3031,8 @@ func (mset *stream) setStartingSequenceForSource(sname string) { } // Lock should be held. -// This will do a reverse scan on startup or leader election -// searching for the starting sequence number. -// This can be slow in degenerative cases. -// Lock should be held. -func (mset *stream) startingSequenceForSources() { - if len(mset.cfg.Sources) == 0 { - return - } - // Always reset here. +// Resets the SourceInfo for all the sources +func (mset *stream) resetSourceInfo() { mset.sources = make(map[string]*sourceInfo) for _, ssi := range mset.cfg.Sources { @@ -3043,6 +3042,20 @@ func (mset *stream) startingSequenceForSources() { si := &sourceInfo{name: ssi.Name, iname: ssi.iname} mset.sources[ssi.iname] = si } +} + +// Lock should be held. +// This will do a reverse scan on startup or leader election +// searching for the starting sequence number. +// This can be slow in degenerative cases. +// Lock should be held. +func (mset *stream) startingSequenceForSources() { + if len(mset.cfg.Sources) == 0 { + return + } + + // Always reset here. + mset.resetSourceInfo() var state StreamState mset.store.FastState(&state) @@ -3113,6 +3126,11 @@ func (mset *stream) setupSourceConsumers() error { } } + // If we are no longer the leader, give up + if !mset.isLeader() { + return nil + } + mset.startingSequenceForSources() // Setup our consumers at the proper starting position. @@ -3138,13 +3156,21 @@ func (mset *stream) subscribeToStream() error { } // Check if we need to setup mirroring. if mset.cfg.Mirror != nil { - if err := mset.setupMirrorConsumer(); err != nil { - return err - } + // setup the initial mirror sourceInfo + mset.mirror = &sourceInfo{name: mset.cfg.Mirror.Name} + + // delay the actual mirror consumer creation for after a delay + mset.scheduleSetupMirrorConsumerRetryAsap() } else if len(mset.cfg.Sources) > 0 { - if err := mset.setupSourceConsumers(); err != nil { - return err - } + // Setup the initial source infos for the sources + mset.resetSourceInfo() + // Delay the actual source consumer(s) creation(s) for after a delay + + mset.sourcesConsumerSetup = time.AfterFunc(time.Duration(rand.Intn(int(10*time.Millisecond)))+10*time.Millisecond, func() { + mset.mu.Lock() + mset.setupSourceConsumers() + mset.mu.Unlock() + }) } // Check for direct get access. // We spin up followers for clustered streams in monitorStream().