diff --git a/server/filestore.go b/server/filestore.go index f50b54abee..c5920587da 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -2616,6 +2616,10 @@ func (fs *fileStore) numFilteredPendingWithLast(filter string, last bool, ss *Si // Always reset. ss.First, ss.Last, ss.Msgs = 0, 0, 0 + if filter == _EMPTY_ { + filter = fwcs + } + // We do need to figure out the first and last sequences. wc := subjectHasWildcard(filter) start, stop := uint32(math.MaxUint32), uint32(0) @@ -7832,6 +7836,17 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) { ss.Msgs-- + // Only one left. + if ss.Msgs == 1 { + if seq == ss.Last { + ss.Last = ss.First + } else { + ss.First = ss.Last + } + ss.firstNeedsUpdate = false + return + } + // We can lazily calculate the first sequence when needed. ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate } @@ -7857,12 +7872,8 @@ func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si startSlot = 0 } - fseq := startSeq + 1 - if mbFseq := atomic.LoadUint64(&mb.first.seq); fseq < mbFseq { - fseq = mbFseq - } var le = binary.LittleEndian - for slot := startSlot; slot < len(mb.cache.idx); slot++ { + for slot, fseq := startSlot, atomic.LoadUint64(&mb.first.seq); slot < len(mb.cache.idx); slot++ { bi := mb.cache.idx[slot] &^ hbit if bi == dbit { // delete marker so skip. diff --git a/server/memstore.go b/server/memstore.go index c3cb7d0b66..e2ca1cae29 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -1009,9 +1009,8 @@ func (ms *memStore) Compact(seq uint64) (uint64, error) { if sm := ms.msgs[seq]; sm != nil { bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg) purged++ - ms.removeSeqPerSubject(sm.subj, seq) - // Must delete message after updating per-subject info, to be consistent with file store. delete(ms.msgs, seq) + ms.removeSeqPerSubject(sm.subj, seq) } } if purged > ms.state.Msgs { @@ -1099,9 +1098,8 @@ func (ms *memStore) Truncate(seq uint64) error { if sm := ms.msgs[i]; sm != nil { purged++ bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg) - ms.removeSeqPerSubject(sm.subj, i) - // Must delete message after updating per-subject info, to be consistent with file store. delete(ms.msgs, i) + ms.removeSeqPerSubject(sm.subj, i) } } // Reset last. @@ -1362,8 +1360,17 @@ func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) { } ss.Msgs-- - // We can lazily calculate the first sequence when needed. - ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate + // If we know we only have 1 msg left don't need to search for next first. + if ss.Msgs == 1 { + if seq == ss.Last { + ss.Last = ss.First + } else { + ss.First = ss.Last + } + ss.firstNeedsUpdate = false + } else { + ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate + } } // Will recalculate the first sequence for this subject in this block. @@ -1396,6 +1403,7 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool { ss = memStoreMsgSize(sm.subj, sm.hdr, sm.msg) + delete(ms.msgs, seq) if ms.state.Msgs > 0 { ms.state.Msgs-- if ss > ms.state.Bytes { @@ -1420,8 +1428,6 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool { // Remove any per subject tracking. ms.removeSeqPerSubject(sm.subj, seq) - // Must delete message after updating per-subject info, to be consistent with file store. - delete(ms.msgs, seq) if ms.scb != nil { // We do not want to hold any locks here. diff --git a/server/store_test.go b/server/store_test.go index 19d4c0251f..e447017829 100644 --- a/server/store_test.go +++ b/server/store_test.go @@ -142,100 +142,6 @@ func TestStoreDeleteRange(t *testing.T) { require_Equal(t, num, 1) } -func TestStoreSubjectStateConsistency(t *testing.T) { - testAllStoreAllPermutations( - t, false, - StreamConfig{Name: "TEST", Subjects: []string{"foo"}}, - func(t *testing.T, fs StreamStore) { - getSubjectState := func() SimpleState { - t.Helper() - ss := fs.SubjectsState("foo") - return ss["foo"] - } - - // Publish an initial batch of messages. - for i := 0; i < 4; i++ { - _, _, err := fs.StoreMsg("foo", nil, nil) - require_NoError(t, err) - } - - // Expect 4 msgs, with first=1, last=4. - ss := getSubjectState() - require_Equal(t, ss.Msgs, 4) - require_Equal(t, ss.First, 1) - require_Equal(t, ss.Last, 4) - - // Remove first message, ss.First is lazy so will only mark ss.firstNeedsUpdate. - removed, err := fs.RemoveMsg(1) - require_NoError(t, err) - require_True(t, removed) - - // Will update first, so corrects to seq 2. - ss = getSubjectState() - require_Equal(t, ss.Msgs, 3) - require_Equal(t, ss.First, 2) - require_Equal(t, ss.Last, 4) - - // Remove last message. - removed, err = fs.RemoveMsg(4) - require_NoError(t, err) - require_True(t, removed) - - // ss.Last is lazy, just like ss.First, but it's not recalculated. Only total msg count decreases. - ss = getSubjectState() - require_Equal(t, ss.Msgs, 2) - require_Equal(t, ss.First, 2) - require_Equal(t, ss.Last, 4) - - // Remove first message again. - removed, err = fs.RemoveMsg(2) - require_NoError(t, err) - require_True(t, removed) - - // Since we only have one message left, must update ss.First and set ss.Last to equal. - ss = getSubjectState() - require_Equal(t, ss.Msgs, 1) - require_Equal(t, ss.First, 3) - require_Equal(t, ss.Last, 3) - - // Publish some more messages so we can test another scenario. - for i := 0; i < 3; i++ { - _, _, err := fs.StoreMsg("foo", nil, nil) - require_NoError(t, err) - } - - // Just check the state is complete again. - ss = getSubjectState() - require_Equal(t, ss.Msgs, 4) - require_Equal(t, ss.First, 3) - require_Equal(t, ss.Last, 7) - - // Remove last sequence, ss.Last is lazy so doesn't get updated. - removed, err = fs.RemoveMsg(7) - require_NoError(t, err) - require_True(t, removed) - - // Remove first sequence, ss.First is lazy so doesn't get updated. - removed, err = fs.RemoveMsg(3) - require_NoError(t, err) - require_True(t, removed) - - // Remove (now) first sequence, but because ss.First is lazy we first need to recalculate - // to know seq 5 became ss.First. And since we're removing seq 5 we need to recalculate ss.First - // yet again, since ss.Last is lazy and is not correct. - removed, err = fs.RemoveMsg(5) - require_NoError(t, err) - require_True(t, removed) - - // ss.First should equal ss.Last, last should have been updated now. - ss = getSubjectState() - require_Equal(t, ss.Msgs, 1) - require_Equal(t, ss.First, 6) - require_Equal(t, ss.Last, 6) - }, - ) -} - func TestStoreMaxMsgsPerUpdateBug(t *testing.T) { config := func() StreamConfig { return StreamConfig{Name: "TEST", Subjects: []string{"foo"}, MaxMsgsPer: 0}