diff --git a/server/events.go b/server/events.go index 1928e64d74..7c891b423d 100644 --- a/server/events.go +++ b/server/events.go @@ -1947,7 +1947,9 @@ type ServerAPIResponse struct { compress compressionType } -// Specialized response types for unmarshalling. +// Specialized response types for unmarshalling. These structures are not +// used in the server code and only there for users of the Z endpoints to +// unmarshal the data without having to create these structs in their code // ServerAPIConnzResponse is the response type connz type ServerAPIConnzResponse struct { @@ -1956,6 +1958,69 @@ type ServerAPIConnzResponse struct { Error *ApiError `json:"error,omitempty"` } +// ServerAPIRoutezResponse is the response type for routez +type ServerAPIRoutezResponse struct { + Server *ServerInfo `json:"server"` + Data *Routez `json:"data,omitempty"` + Error *ApiError `json:"error,omitempty"` +} + +// ServerAPIGatewayzResponse is the response type for gatewayz +type ServerAPIGatewayzResponse struct { + Server *ServerInfo `json:"server"` + Data *Gatewayz `json:"data,omitempty"` + Error *ApiError `json:"error,omitempty"` +} + +// ServerAPIJszResponse is the response type for jsz +type ServerAPIJszResponse struct { + Server *ServerInfo `json:"server"` + Data *JSInfo `json:"data,omitempty"` + Error *ApiError `json:"error,omitempty"` +} + +// ServerAPIHealthzResponse is the response type for healthz +type ServerAPIHealthzResponse struct { + Server *ServerInfo `json:"server"` + Data *HealthStatus `json:"data,omitempty"` + Error *ApiError `json:"error,omitempty"` +} + +// ServerAPIVarzResponse is the response type for varz +type ServerAPIVarzResponse struct { + Server *ServerInfo `json:"server"` + Data *Varz `json:"data,omitempty"` + Error *ApiError `json:"error,omitempty"` +} + +// ServerAPISubszResponse is the response type for subsz +type ServerAPISubszResponse struct { + Server *ServerInfo `json:"server"` + Data *Subsz `json:"data,omitempty"` + Error *ApiError `json:"error,omitempty"` +} + +// ServerAPILeafzResponse is the response type for leafz +type ServerAPILeafzResponse struct { + Server *ServerInfo `json:"server"` + Data *Leafz `json:"data,omitempty"` + Error *ApiError `json:"error,omitempty"` +} + +// ServerAPIAccountzResponse is the response type for accountz +type ServerAPIAccountzResponse struct { + Server *ServerInfo `json:"server"` + Data *Accountz `json:"data,omitempty"` + Error *ApiError `json:"error,omitempty"` +} + +// ServerAPIExpvarzResponse is the response type for expvarz +type ServerAPIExpvarzResponse struct { + Server *ServerInfo `json:"server"` + Data *ExpvarzStatus `json:"data,omitempty"` + Error *ApiError `json:"error,omitempty"` +} + // statszReq is a request for us to respond with current statsz. func (s *Server) statszReq(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) { if !s.EventsEnabled() { diff --git a/server/jetstream.go b/server/jetstream.go index 02920e76a4..2e606e6a6f 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -2107,7 +2107,7 @@ func (js *jetStream) wouldExceedLimits(storeType StorageType, sz int) bool { } else { total, max = &js.storeUsed, js.config.MaxStore } - return atomic.LoadInt64(total) > (max + int64(sz)) + return (atomic.LoadInt64(total) + int64(sz)) > max } func (js *jetStream) limitsExceeded(storeType StorageType) bool { diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 78cd23d6cb..c38d44be49 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -22981,3 +22981,19 @@ func TestJetStreamMemoryPurgeClearsSubjectsState(t *testing.T) { require_NoError(t, err) require_Len(t, len(si.State.Subjects), 0) } + +func TestJetStreamWouldExceedLimits(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + js := s.getJetStream() + require_NotNil(t, js) + + // Storing exactly up to the limit should work. + require_False(t, js.wouldExceedLimits(MemoryStorage, int(js.config.MaxMemory))) + require_False(t, js.wouldExceedLimits(FileStorage, int(js.config.MaxStore))) + + // Storing one more than the max should exceed limits. + require_True(t, js.wouldExceedLimits(MemoryStorage, int(js.config.MaxMemory)+1)) + require_True(t, js.wouldExceedLimits(FileStorage, int(js.config.MaxStore)+1)) +} diff --git a/server/raft_helpers_test.go b/server/raft_helpers_test.go index 662a6e9460..a69e4f22bb 100644 --- a/server/raft_helpers_test.go +++ b/server/raft_helpers_test.go @@ -28,6 +28,7 @@ import ( type stateMachine interface { server() *Server node() RaftNode + waitGroup() *sync.WaitGroup // This will call forward as needed so can be called on any node. propose(data []byte) // When entries have been committed and can be applied. @@ -157,9 +158,13 @@ func (c *cluster) createRaftGroupWithPeers(name string, servers []*Server, smf s // Driver program for the state machine. // Should be run in its own go routine. func smLoop(sm stateMachine) { - s, n := sm.server(), sm.node() + s, n, wg := sm.server(), sm.node(), sm.waitGroup() qch, lch, aq := n.QuitC(), n.LeadChangeC(), n.ApplyQ() + // Wait group used to allow waiting until we exit from here. + wg.Add(1) + defer wg.Done() + for { select { case <-s.quitCh: @@ -185,6 +190,7 @@ type stateAdder struct { sync.Mutex s *Server n RaftNode + wg sync.WaitGroup cfg *RaftConfig sum int64 lch chan bool @@ -196,12 +202,19 @@ func (a *stateAdder) server() *Server { defer a.Unlock() return a.s } + func (a *stateAdder) node() RaftNode { a.Lock() defer a.Unlock() return a.n } +func (a *stateAdder) waitGroup() *sync.WaitGroup { + a.Lock() + defer a.Unlock() + return &a.wg +} + func (a *stateAdder) propose(data []byte) { a.Lock() defer a.Unlock() @@ -243,10 +256,10 @@ func (a *stateAdder) proposeDelta(delta int64) { // Stop the group. func (a *stateAdder) stop() { - a.Lock() - defer a.Unlock() - a.n.Stop() - a.n.WaitForStop() + n, wg := a.node(), a.waitGroup() + n.Stop() + n.WaitForStop() + wg.Wait() } // Restart the group diff --git a/server/raft_test.go b/server/raft_test.go index df028fc548..46ea536a58 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -36,11 +36,11 @@ func TestNRGSimple(t *testing.T) { rg := c.createRaftGroup("TEST", 3, newStateAdder) rg.waitOnLeader() // Do several state transitions. - rg.randomMember().(*stateAdder).proposeDelta(11) - rg.randomMember().(*stateAdder).proposeDelta(11) - rg.randomMember().(*stateAdder).proposeDelta(-22) + rg.randomMember().(*stateAdder).proposeDelta(22) + rg.randomMember().(*stateAdder).proposeDelta(-11) + rg.randomMember().(*stateAdder).proposeDelta(-10) // Wait for all members to have the correct state. - rg.waitOnTotal(t, 0) + rg.waitOnTotal(t, 1) } func TestNRGSnapshotAndRestart(t *testing.T) { diff --git a/server/websocket.go b/server/websocket.go index 49881b2c1e..69e6e1a9a7 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -67,7 +67,6 @@ const ( wsCloseStatusProtocolError = 1002 wsCloseStatusUnsupportedData = 1003 wsCloseStatusNoStatusReceived = 1005 - wsCloseStatusAbnormalClosure = 1006 wsCloseStatusInvalidPayloadData = 1007 wsCloseStatusPolicyViolation = 1008 wsCloseStatusMessageTooBig = 1009 @@ -458,9 +457,21 @@ func (c *client) wsHandleControlFrame(r *wsReadInfo, frameType wsOpCode, nc io.R } } } - clm := wsCreateCloseMessage(status, body) + // If the status indicates that nothing was received, then we don't + // send anything back. + // From https://datatracker.ietf.org/doc/html/rfc6455#section-7.4 + // it says that code 1005 is a reserved value and MUST NOT be set as a + // status code in a Close control frame by an endpoint. It is + // designated for use in applications expecting a status code to indicate + // that no status code was actually present. + var clm []byte + if status != wsCloseStatusNoStatusReceived { + clm = wsCreateCloseMessage(status, body) + } c.wsEnqueueControlMessage(wsCloseMessage, clm) - nbPoolPut(clm) // wsEnqueueControlMessage has taken a copy. + if len(clm) > 0 { + nbPoolPut(clm) // wsEnqueueControlMessage has taken a copy. + } // Return io.EOF so that readLoop will close the connection as ClientClosed // after processing pending buffers. return pos, io.EOF @@ -647,10 +658,11 @@ func (c *client) wsEnqueueCloseMessage(reason ClosedState) { status = wsCloseStatusProtocolError case MaxPayloadExceeded: status = wsCloseStatusMessageTooBig - case ServerShutdown: + case WriteError, ReadError, StaleConnection, ServerShutdown: + // We used to have WriteError, ReadError and StaleConnection result in + // code 1006, which the spec says that it must not be used to set the + // status in the close message. So using this one instead. status = wsCloseStatusGoingAway - case WriteError, ReadError, StaleConnection: - status = wsCloseStatusAbnormalClosure default: status = wsCloseStatusInternalSrvError } diff --git a/server/websocket_test.go b/server/websocket_test.go index 368a9fe8d6..e4fa70e75a 100644 --- a/server/websocket_test.go +++ b/server/websocket_test.go @@ -647,19 +647,24 @@ func TestWSReadPongFrame(t *testing.T) { func TestWSReadCloseFrame(t *testing.T) { for _, test := range []struct { - name string - payload []byte + name string + noStatus bool + payload []byte }{ - {"without payload", nil}, - {"with payload", []byte("optional payload")}, + {"status without payload", false, nil}, + {"status with payload", false, []byte("optional payload")}, + {"no status no payload", true, nil}, } { t.Run(test.name, func(t *testing.T) { c, ri, tr := testWSSetupForRead() - // a close message has a status in 2 bytes + optional payload - payload := make([]byte, 2+len(test.payload)) - binary.BigEndian.PutUint16(payload[:2], wsCloseStatusNormalClosure) - if len(test.payload) > 0 { - copy(payload[2:], test.payload) + var payload []byte + if !test.noStatus { + // a close message has a status in 2 bytes + optional payload + payload = make([]byte, 2+len(test.payload)) + binary.BigEndian.PutUint16(payload[:2], wsCloseStatusNormalClosure) + if len(test.payload) > 0 { + copy(payload[2:], test.payload) + } } close := testWSCreateClientMsg(wsCloseMessage, 1, true, false, payload) // Have a normal frame prior to close to make sure that wsRead returns @@ -685,7 +690,11 @@ func TestWSReadCloseFrame(t *testing.T) { if n := len(nb); n == 0 { t.Fatalf("Expected buffers, got %v", n) } - if expected := 2 + 2 + len(test.payload); expected != len(nb[0]) { + if test.noStatus { + if expected := 2; expected != len(nb[0]) { + t.Fatalf("Expected buffer to be %v bytes long, got %v", expected, len(nb[0])) + } + } else if expected := 2 + 2 + len(test.payload); expected != len(nb[0]) { t.Fatalf("Expected buffer to be %v bytes long, got %v", expected, len(nb[0])) } b := nb[0][0] @@ -695,12 +704,14 @@ func TestWSReadCloseFrame(t *testing.T) { if b&byte(wsCloseMessage) == 0 { t.Fatalf("Should have been a CLOSE, it wasn't: %v", b) } - if status := binary.BigEndian.Uint16(nb[0][2:4]); status != wsCloseStatusNormalClosure { - t.Fatalf("Expected status to be %v, got %v", wsCloseStatusNormalClosure, status) - } - if len(test.payload) > 0 { - if !bytes.Equal(nb[0][4:], test.payload) { - t.Fatalf("Unexpected content: %s", nb[0][4:]) + if !test.noStatus { + if status := binary.BigEndian.Uint16(nb[0][2:4]); status != wsCloseStatusNormalClosure { + t.Fatalf("Expected status to be %v, got %v", wsCloseStatusNormalClosure, status) + } + if len(test.payload) > 0 { + if !bytes.Equal(nb[0][4:], test.payload) { + t.Fatalf("Unexpected content: %s", nb[0][4:]) + } } } }) @@ -778,7 +789,6 @@ func TestWSCloseFrameWithPartialOrInvalid(t *testing.T) { // Now test close with invalid status size (1 instead of 2 bytes) c, ri, tr = testWSSetupForRead() - payload[0] = 100 binary.BigEndian.PutUint16(payload, wsCloseStatusNormalClosure) closeMsg = testWSCreateClientMsg(wsCloseMessage, 1, true, false, payload[:1]) @@ -795,14 +805,15 @@ func TestWSCloseFrameWithPartialOrInvalid(t *testing.T) { if n := len(bufs); n != 0 { t.Fatalf("Unexpected buffer returned: %v", n) } - // A CLOSE should have been queued with the payload of the original close message. + // Since no status was received, the server will send a close frame without + // status code nor payload. c.mu.Lock() nb, _ = c.collapsePtoNB() c.mu.Unlock() if n := len(nb); n == 0 { t.Fatalf("Expected buffers, got %v", n) } - if expected := 2 + 2; expected != len(nb[0]) { + if expected := 2; expected != len(nb[0]) { t.Fatalf("Expected buffer to be %v bytes long, got %v", expected, len(nb[0])) } b = nb[0][0] @@ -812,13 +823,6 @@ func TestWSCloseFrameWithPartialOrInvalid(t *testing.T) { if b&byte(wsCloseMessage) == 0 { t.Fatalf("Should have been a CLOSE, it wasn't: %v", b) } - // Since satus was not valid, we should get wsCloseStatusNoStatusReceived - if status := binary.BigEndian.Uint16(nb[0][2:4]); status != wsCloseStatusNoStatusReceived { - t.Fatalf("Expected status to be %v, got %v", wsCloseStatusNoStatusReceived, status) - } - if len(nb[0][:]) != 4 { - t.Fatalf("Unexpected content: %s", nb[0][2:]) - } } func TestWSReadGetErrors(t *testing.T) { @@ -1015,9 +1019,9 @@ func TestWSEnqueueCloseMsg(t *testing.T) { {BadClientProtocolVersion, wsCloseStatusProtocolError}, {MaxPayloadExceeded, wsCloseStatusMessageTooBig}, {ServerShutdown, wsCloseStatusGoingAway}, - {WriteError, wsCloseStatusAbnormalClosure}, - {ReadError, wsCloseStatusAbnormalClosure}, - {StaleConnection, wsCloseStatusAbnormalClosure}, + {WriteError, wsCloseStatusGoingAway}, + {ReadError, wsCloseStatusGoingAway}, + {StaleConnection, wsCloseStatusGoingAway}, {ClosedState(254), wsCloseStatusInternalSrvError}, } { t.Run(test.reason.String(), func(t *testing.T) {