Skip to content

Commit

Permalink
Cherry-picks for 2.10.24-RC.3 (#6268)
Browse files Browse the repository at this point in the history
Includes the following:

- #6260
- #6249
- #6264
- #6265
- #6266

Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
neilalexander authored Dec 16, 2024
2 parents f2a78cf + 34bb983 commit 4e7b79b
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 46 deletions.
67 changes: 66 additions & 1 deletion server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -1947,7 +1947,9 @@ type ServerAPIResponse struct {
compress compressionType
}

// Specialized response types for unmarshalling.
// Specialized response types for unmarshalling. These structures are not
// used in the server code and only there for users of the Z endpoints to
// unmarshal the data without having to create these structs in their code

// ServerAPIConnzResponse is the response type connz
type ServerAPIConnzResponse struct {
Expand All @@ -1956,6 +1958,69 @@ type ServerAPIConnzResponse struct {
Error *ApiError `json:"error,omitempty"`
}

// ServerAPIRoutezResponse is the response type for routez
type ServerAPIRoutezResponse struct {
Server *ServerInfo `json:"server"`
Data *Routez `json:"data,omitempty"`
Error *ApiError `json:"error,omitempty"`
}

// ServerAPIGatewayzResponse is the response type for gatewayz
type ServerAPIGatewayzResponse struct {
Server *ServerInfo `json:"server"`
Data *Gatewayz `json:"data,omitempty"`
Error *ApiError `json:"error,omitempty"`
}

// ServerAPIJszResponse is the response type for jsz
type ServerAPIJszResponse struct {
Server *ServerInfo `json:"server"`
Data *JSInfo `json:"data,omitempty"`
Error *ApiError `json:"error,omitempty"`
}

// ServerAPIHealthzResponse is the response type for healthz
type ServerAPIHealthzResponse struct {
Server *ServerInfo `json:"server"`
Data *HealthStatus `json:"data,omitempty"`
Error *ApiError `json:"error,omitempty"`
}

// ServerAPIVarzResponse is the response type for varz
type ServerAPIVarzResponse struct {
Server *ServerInfo `json:"server"`
Data *Varz `json:"data,omitempty"`
Error *ApiError `json:"error,omitempty"`
}

// ServerAPISubszResponse is the response type for subsz
type ServerAPISubszResponse struct {
Server *ServerInfo `json:"server"`
Data *Subsz `json:"data,omitempty"`
Error *ApiError `json:"error,omitempty"`
}

// ServerAPILeafzResponse is the response type for leafz
type ServerAPILeafzResponse struct {
Server *ServerInfo `json:"server"`
Data *Leafz `json:"data,omitempty"`
Error *ApiError `json:"error,omitempty"`
}

// ServerAPIAccountzResponse is the response type for accountz
type ServerAPIAccountzResponse struct {
Server *ServerInfo `json:"server"`
Data *Accountz `json:"data,omitempty"`
Error *ApiError `json:"error,omitempty"`
}

// ServerAPIExpvarzResponse is the response type for expvarz
type ServerAPIExpvarzResponse struct {
Server *ServerInfo `json:"server"`
Data *ExpvarzStatus `json:"data,omitempty"`
Error *ApiError `json:"error,omitempty"`
}

// statszReq is a request for us to respond with current statsz.
func (s *Server) statszReq(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
if !s.EventsEnabled() {
Expand Down
2 changes: 1 addition & 1 deletion server/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2107,7 +2107,7 @@ func (js *jetStream) wouldExceedLimits(storeType StorageType, sz int) bool {
} else {
total, max = &js.storeUsed, js.config.MaxStore
}
return atomic.LoadInt64(total) > (max + int64(sz))
return (atomic.LoadInt64(total) + int64(sz)) > max
}

func (js *jetStream) limitsExceeded(storeType StorageType) bool {
Expand Down
16 changes: 16 additions & 0 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22981,3 +22981,19 @@ func TestJetStreamMemoryPurgeClearsSubjectsState(t *testing.T) {
require_NoError(t, err)
require_Len(t, len(si.State.Subjects), 0)
}

func TestJetStreamWouldExceedLimits(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

js := s.getJetStream()
require_NotNil(t, js)

// Storing exactly up to the limit should work.
require_False(t, js.wouldExceedLimits(MemoryStorage, int(js.config.MaxMemory)))
require_False(t, js.wouldExceedLimits(FileStorage, int(js.config.MaxStore)))

// Storing one more than the max should exceed limits.
require_True(t, js.wouldExceedLimits(MemoryStorage, int(js.config.MaxMemory)+1))
require_True(t, js.wouldExceedLimits(FileStorage, int(js.config.MaxStore)+1))
}
23 changes: 18 additions & 5 deletions server/raft_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
type stateMachine interface {
server() *Server
node() RaftNode
waitGroup() *sync.WaitGroup
// This will call forward as needed so can be called on any node.
propose(data []byte)
// When entries have been committed and can be applied.
Expand Down Expand Up @@ -157,9 +158,13 @@ func (c *cluster) createRaftGroupWithPeers(name string, servers []*Server, smf s
// Driver program for the state machine.
// Should be run in its own go routine.
func smLoop(sm stateMachine) {
s, n := sm.server(), sm.node()
s, n, wg := sm.server(), sm.node(), sm.waitGroup()
qch, lch, aq := n.QuitC(), n.LeadChangeC(), n.ApplyQ()

// Wait group used to allow waiting until we exit from here.
wg.Add(1)
defer wg.Done()

for {
select {
case <-s.quitCh:
Expand All @@ -185,6 +190,7 @@ type stateAdder struct {
sync.Mutex
s *Server
n RaftNode
wg sync.WaitGroup
cfg *RaftConfig
sum int64
lch chan bool
Expand All @@ -196,12 +202,19 @@ func (a *stateAdder) server() *Server {
defer a.Unlock()
return a.s
}

func (a *stateAdder) node() RaftNode {
a.Lock()
defer a.Unlock()
return a.n
}

func (a *stateAdder) waitGroup() *sync.WaitGroup {
a.Lock()
defer a.Unlock()
return &a.wg
}

func (a *stateAdder) propose(data []byte) {
a.Lock()
defer a.Unlock()
Expand Down Expand Up @@ -243,10 +256,10 @@ func (a *stateAdder) proposeDelta(delta int64) {

// Stop the group.
func (a *stateAdder) stop() {
a.Lock()
defer a.Unlock()
a.n.Stop()
a.n.WaitForStop()
n, wg := a.node(), a.waitGroup()
n.Stop()
n.WaitForStop()
wg.Wait()
}

// Restart the group
Expand Down
8 changes: 4 additions & 4 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ func TestNRGSimple(t *testing.T) {
rg := c.createRaftGroup("TEST", 3, newStateAdder)
rg.waitOnLeader()
// Do several state transitions.
rg.randomMember().(*stateAdder).proposeDelta(11)
rg.randomMember().(*stateAdder).proposeDelta(11)
rg.randomMember().(*stateAdder).proposeDelta(-22)
rg.randomMember().(*stateAdder).proposeDelta(22)
rg.randomMember().(*stateAdder).proposeDelta(-11)
rg.randomMember().(*stateAdder).proposeDelta(-10)
// Wait for all members to have the correct state.
rg.waitOnTotal(t, 0)
rg.waitOnTotal(t, 1)
}

func TestNRGSnapshotAndRestart(t *testing.T) {
Expand Down
24 changes: 18 additions & 6 deletions server/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ const (
wsCloseStatusProtocolError = 1002
wsCloseStatusUnsupportedData = 1003
wsCloseStatusNoStatusReceived = 1005
wsCloseStatusAbnormalClosure = 1006
wsCloseStatusInvalidPayloadData = 1007
wsCloseStatusPolicyViolation = 1008
wsCloseStatusMessageTooBig = 1009
Expand Down Expand Up @@ -458,9 +457,21 @@ func (c *client) wsHandleControlFrame(r *wsReadInfo, frameType wsOpCode, nc io.R
}
}
}
clm := wsCreateCloseMessage(status, body)
// If the status indicates that nothing was received, then we don't
// send anything back.
// From https://datatracker.ietf.org/doc/html/rfc6455#section-7.4
// it says that code 1005 is a reserved value and MUST NOT be set as a
// status code in a Close control frame by an endpoint. It is
// designated for use in applications expecting a status code to indicate
// that no status code was actually present.
var clm []byte
if status != wsCloseStatusNoStatusReceived {
clm = wsCreateCloseMessage(status, body)
}
c.wsEnqueueControlMessage(wsCloseMessage, clm)
nbPoolPut(clm) // wsEnqueueControlMessage has taken a copy.
if len(clm) > 0 {
nbPoolPut(clm) // wsEnqueueControlMessage has taken a copy.
}
// Return io.EOF so that readLoop will close the connection as ClientClosed
// after processing pending buffers.
return pos, io.EOF
Expand Down Expand Up @@ -647,10 +658,11 @@ func (c *client) wsEnqueueCloseMessage(reason ClosedState) {
status = wsCloseStatusProtocolError
case MaxPayloadExceeded:
status = wsCloseStatusMessageTooBig
case ServerShutdown:
case WriteError, ReadError, StaleConnection, ServerShutdown:
// We used to have WriteError, ReadError and StaleConnection result in
// code 1006, which the spec says that it must not be used to set the
// status in the close message. So using this one instead.
status = wsCloseStatusGoingAway
case WriteError, ReadError, StaleConnection:
status = wsCloseStatusAbnormalClosure
default:
status = wsCloseStatusInternalSrvError
}
Expand Down
62 changes: 33 additions & 29 deletions server/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,19 +647,24 @@ func TestWSReadPongFrame(t *testing.T) {

func TestWSReadCloseFrame(t *testing.T) {
for _, test := range []struct {
name string
payload []byte
name string
noStatus bool
payload []byte
}{
{"without payload", nil},
{"with payload", []byte("optional payload")},
{"status without payload", false, nil},
{"status with payload", false, []byte("optional payload")},
{"no status no payload", true, nil},
} {
t.Run(test.name, func(t *testing.T) {
c, ri, tr := testWSSetupForRead()
// a close message has a status in 2 bytes + optional payload
payload := make([]byte, 2+len(test.payload))
binary.BigEndian.PutUint16(payload[:2], wsCloseStatusNormalClosure)
if len(test.payload) > 0 {
copy(payload[2:], test.payload)
var payload []byte
if !test.noStatus {
// a close message has a status in 2 bytes + optional payload
payload = make([]byte, 2+len(test.payload))
binary.BigEndian.PutUint16(payload[:2], wsCloseStatusNormalClosure)
if len(test.payload) > 0 {
copy(payload[2:], test.payload)
}
}
close := testWSCreateClientMsg(wsCloseMessage, 1, true, false, payload)
// Have a normal frame prior to close to make sure that wsRead returns
Expand All @@ -685,7 +690,11 @@ func TestWSReadCloseFrame(t *testing.T) {
if n := len(nb); n == 0 {
t.Fatalf("Expected buffers, got %v", n)
}
if expected := 2 + 2 + len(test.payload); expected != len(nb[0]) {
if test.noStatus {
if expected := 2; expected != len(nb[0]) {
t.Fatalf("Expected buffer to be %v bytes long, got %v", expected, len(nb[0]))
}
} else if expected := 2 + 2 + len(test.payload); expected != len(nb[0]) {
t.Fatalf("Expected buffer to be %v bytes long, got %v", expected, len(nb[0]))
}
b := nb[0][0]
Expand All @@ -695,12 +704,14 @@ func TestWSReadCloseFrame(t *testing.T) {
if b&byte(wsCloseMessage) == 0 {
t.Fatalf("Should have been a CLOSE, it wasn't: %v", b)
}
if status := binary.BigEndian.Uint16(nb[0][2:4]); status != wsCloseStatusNormalClosure {
t.Fatalf("Expected status to be %v, got %v", wsCloseStatusNormalClosure, status)
}
if len(test.payload) > 0 {
if !bytes.Equal(nb[0][4:], test.payload) {
t.Fatalf("Unexpected content: %s", nb[0][4:])
if !test.noStatus {
if status := binary.BigEndian.Uint16(nb[0][2:4]); status != wsCloseStatusNormalClosure {
t.Fatalf("Expected status to be %v, got %v", wsCloseStatusNormalClosure, status)
}
if len(test.payload) > 0 {
if !bytes.Equal(nb[0][4:], test.payload) {
t.Fatalf("Unexpected content: %s", nb[0][4:])
}
}
}
})
Expand Down Expand Up @@ -778,7 +789,6 @@ func TestWSCloseFrameWithPartialOrInvalid(t *testing.T) {

// Now test close with invalid status size (1 instead of 2 bytes)
c, ri, tr = testWSSetupForRead()
payload[0] = 100
binary.BigEndian.PutUint16(payload, wsCloseStatusNormalClosure)
closeMsg = testWSCreateClientMsg(wsCloseMessage, 1, true, false, payload[:1])

Expand All @@ -795,14 +805,15 @@ func TestWSCloseFrameWithPartialOrInvalid(t *testing.T) {
if n := len(bufs); n != 0 {
t.Fatalf("Unexpected buffer returned: %v", n)
}
// A CLOSE should have been queued with the payload of the original close message.
// Since no status was received, the server will send a close frame without
// status code nor payload.
c.mu.Lock()
nb, _ = c.collapsePtoNB()
c.mu.Unlock()
if n := len(nb); n == 0 {
t.Fatalf("Expected buffers, got %v", n)
}
if expected := 2 + 2; expected != len(nb[0]) {
if expected := 2; expected != len(nb[0]) {
t.Fatalf("Expected buffer to be %v bytes long, got %v", expected, len(nb[0]))
}
b = nb[0][0]
Expand All @@ -812,13 +823,6 @@ func TestWSCloseFrameWithPartialOrInvalid(t *testing.T) {
if b&byte(wsCloseMessage) == 0 {
t.Fatalf("Should have been a CLOSE, it wasn't: %v", b)
}
// Since satus was not valid, we should get wsCloseStatusNoStatusReceived
if status := binary.BigEndian.Uint16(nb[0][2:4]); status != wsCloseStatusNoStatusReceived {
t.Fatalf("Expected status to be %v, got %v", wsCloseStatusNoStatusReceived, status)
}
if len(nb[0][:]) != 4 {
t.Fatalf("Unexpected content: %s", nb[0][2:])
}
}

func TestWSReadGetErrors(t *testing.T) {
Expand Down Expand Up @@ -1015,9 +1019,9 @@ func TestWSEnqueueCloseMsg(t *testing.T) {
{BadClientProtocolVersion, wsCloseStatusProtocolError},
{MaxPayloadExceeded, wsCloseStatusMessageTooBig},
{ServerShutdown, wsCloseStatusGoingAway},
{WriteError, wsCloseStatusAbnormalClosure},
{ReadError, wsCloseStatusAbnormalClosure},
{StaleConnection, wsCloseStatusAbnormalClosure},
{WriteError, wsCloseStatusGoingAway},
{ReadError, wsCloseStatusGoingAway},
{StaleConnection, wsCloseStatusGoingAway},
{ClosedState(254), wsCloseStatusInternalSrvError},
} {
t.Run(test.reason.String(), func(t *testing.T) {
Expand Down

0 comments on commit 4e7b79b

Please sign in to comment.