Skip to content

Commit

Permalink
Avoid CoW while processing the next state when HTTP2FrameDecoder deco…
Browse files Browse the repository at this point in the history
…des (#438)

Motivation:

CoWs appear when switching over the current state of the parser and holding it while also modifying it - inside `processNExtState()`. Following `append(bytes: ByteBuffer)`'s pattern, we should use a function that sets the state to an intermediary one with no associated data, before making the transformations.

Modifications:
- created the `avoidParserCoW()` helper function for the throwing functions
- used it in the switch cases inside `processNextState()`

Result:

CoW will be avoided when changing the state of the HTTP2FrameDecoder when decoding, so we won't have unnecessary heap allocations.
  • Loading branch information
stefanadranca authored Apr 17, 2024
1 parent a0c580b commit c6afe04
Showing 1 changed file with 168 additions and 64 deletions.
232 changes: 168 additions & 64 deletions Sources/NIOHTTP2/HTTP2FrameParser.swift
Original file line number Diff line number Diff line change
Expand Up @@ -781,60 +781,117 @@ struct HTTP2FrameDecoder {

}

fileprivate enum ProcessAccumulatingResult {
case `continue`
case needMoreData
case parseFrame(header: FrameHeader, payloadBytes: ByteBuffer, paddingBytes: Int?)
}

private mutating func processNextState() throws -> ParseResult {
switch self.state {
case .awaitingClientMagic(var state):
guard let newState = try state.process() else {
return .needMoreData
return try self.avoidingParserCoW { newState in
let result = Result<ParseResult, Error> {
guard let processResult = try state.process() else {
newState = .awaitingClientMagic(state)
return .needMoreData
}
newState = .accumulatingFrameHeader(processResult)
return .continue
}
switch result {
case let .success(parseResult):
return parseResult
case let .failure(error):
newState = .awaitingClientMagic(state)
throw error
}
}

self.state = .accumulatingFrameHeader(newState)
return .continue


case .initialized:
// no bytes, no frame
return .needMoreData

case .accumulatingFrameHeader(var state):
guard let targetState = try state.process(maxFrameSize: self.maxFrameSize, maxHeaderListSize: self.headerDecoder.maxHeaderListSize) else {
return .needMoreData
let maxFrameSize = self.maxFrameSize
let maxHeaderListSize = self.headerDecoder.maxHeaderListSize
return try self.avoidingParserCoW { newState in
let result = Result<ParseResult, Error> {
guard let targetState = try state.process(maxFrameSize: maxFrameSize, maxHeaderListSize: maxHeaderListSize) else {
newState = .accumulatingFrameHeader(state)
return .needMoreData
}

newState = .init(targetState)
return .continue
}

switch result {
case let .success(processResult):
return processResult
case let .failure(error):
newState = .accumulatingFrameHeader(state)
throw error
}
}

self.state = .init(targetState)
return .continue


case .awaitingPaddingLengthByte(var state):
guard let targetState = try state.process() else {
return .needMoreData
return try self.avoidingParserCoW { newState in
let result = Result<ParseResult, Error> {
guard let targetState = try state.process() else {
newState = .awaitingPaddingLengthByte(state)
return .needMoreData
}

newState = .init(targetState)
return .continue
}
switch result {
case let .success(processResult):
return processResult
case let .failure(error):
newState = .awaitingPaddingLengthByte(state)
throw error
}
}

self.state = .init(targetState)
return .continue



case .accumulatingPayload(var state):
guard let processResult = try state.process() else {
return .needMoreData
let accumulatingResult: ProcessAccumulatingResult = try self.avoidingParserCoW { newState in
let result = Result<ProcessAccumulatingResult, Error> {
guard let processResult = try state.process() else {
newState = .accumulatingPayload(state)
return .needMoreData
}

switch processResult {
case .accumulateHeaderBlockFragments(let nextState):
newState = .accumulatingHeaderBlockFragments(nextState)
return .continue
case .parseFrame(header: let header, payloadBytes: let payloadBytes, paddingBytes: let paddingBytes, nextState: let nextState):
// Save off the state first, because if the frame payload parse throws we want to be able to skip over the frame.
newState = .accumulatingFrameHeader(nextState)
return .parseFrame(header: header, payloadBytes: payloadBytes, paddingBytes: paddingBytes)
}
}

switch result {
case let .success(processAccumulatingResult):
return processAccumulatingResult
case let .failure(error):
newState = .accumulatingPayload(state)
throw error
}
}

switch processResult {
case .accumulateHeaderBlockFragments(let state):
self.state = .accumulatingHeaderBlockFragments(state)
return .continue

case .parseFrame(header: let header, payloadBytes: var payloadBytes, paddingBytes: let paddingBytes, nextState: let nextState):
// Save off the state first, because if the frame payload parse throws we want to be able to skip over the frame.
self.state = .accumulatingFrameHeader(nextState)


switch accumulatingResult {
case .parseFrame(header: let header, payloadBytes: var payloadBytes, paddingBytes: let paddingBytes):
// an entire frame's data, including HEADERS/PUSH_PROMISE with the END_HEADERS flag set
// this may legitimately return nil if we ignore the frame
let result = try self.readFrame(withHeader: header, from: &payloadBytes, paddingBytes: paddingBytes)

if payloadBytes.readableBytes > 0 {
// Enforce that we consumed all the bytes.
throw InternalError.codecError(code: .frameSizeError)
}

// if we got a frame, return it. If not that means we consumed and ignored a frame, so we
// should go round again.
// We cannot emit DATA frames from here, so the flow controlled length is always 0.
Expand All @@ -843,38 +900,70 @@ struct HTTP2FrameDecoder {
} else {
return .continue
}
case .continue:
return .continue
case .needMoreData:
return .needMoreData
}

case .simulatingDataFrames(var state):
guard let processResult = try state.process() else {
return .needMoreData
return try self.avoidingParserCoW { newState in
let result = Result<ParseResult, Error> {
guard let processResult = try state.process() else {
newState = .simulatingDataFrames(state)
return .needMoreData
}
newState = .init(processResult.nextState)
return .frame(processResult.frame, flowControlledLength: processResult.flowControlledLength)
}
switch result {
case let .success(processResult):
return processResult
case let .failure(error):
newState = .simulatingDataFrames(state)
throw error
}
}

self.state = .init(processResult.nextState)
return .frame(processResult.frame, flowControlledLength: processResult.flowControlledLength)

case .strippingTrailingPadding(var state):
guard let nextState = state.process() else {
return .needMoreData
return self.avoidingParserCoW { newState in
guard let nextState = state.process() else {
newState = .strippingTrailingPadding(state)
return .needMoreData
}

newState = .init(nextState)
return .continue
}

self.state = .init(nextState)
return .continue

case .accumulatingContinuationPayload(var state):
guard let processResult = try state.process() else {
return .needMoreData
let accumulatingResult: ProcessAccumulatingResult = try self.avoidingParserCoW { newState in
let result = Result<ProcessAccumulatingResult, Error> {
guard let processResult = try state.process() else {
newState = .accumulatingContinuationPayload(state)
return .needMoreData
}
switch processResult {
case .accumulateContinuationHeader(let nextState):
newState = .accumulatingHeaderBlockFragments(nextState)
return .continue
case .parseFrame(header: let header, payloadBytes: let payloadBytes, paddingBytes: let paddingBytes, nextState: let nextState):
// Save off the state first, because if the frame payload parse throws we want to be able to skip over the frame.
newState = .accumulatingFrameHeader(nextState)
return .parseFrame(header: header, payloadBytes: payloadBytes, paddingBytes: paddingBytes)
}
}
switch result {
case let .success(processAccumulatingResult):
return processAccumulatingResult
case let .failure(error):
newState = .accumulatingContinuationPayload(state)
throw error
}
}

switch processResult {
case .accumulateContinuationHeader(let state):
self.state = .accumulatingHeaderBlockFragments(state)
return .continue

case .parseFrame(header: let header, payloadBytes: var payloadBytes, paddingBytes: let paddingBytes, nextState: let nextState):
// Save off the state first, because if the frame payload parse throws we want to be able to skip over the frame.
self.state = .accumulatingFrameHeader(nextState)


switch accumulatingResult {
case .parseFrame(header: let header, payloadBytes: var payloadBytes, paddingBytes: let paddingBytes):
// an entire frame's data, including HEADERS/PUSH_PROMISE with the END_HEADERS flag set
// this may legitimately return nil if we ignore the frame
let result = try self.readFrame(withHeader: header, from: &payloadBytes, paddingBytes: paddingBytes)
Expand All @@ -883,7 +972,6 @@ struct HTTP2FrameDecoder {
// Enforce that we consumed all the bytes.
throw InternalError.codecError(code: .frameSizeError)
}

// if we got a frame, return it. If not that means we consumed and ignored a frame, so we
// should go round again.
// We cannot emit DATA frames from here, so the flow controlled length is always 0.
Expand All @@ -892,16 +980,32 @@ struct HTTP2FrameDecoder {
} else {
return .continue
}
case .continue:
return .continue
case .needMoreData:
return .needMoreData
}

case .accumulatingHeaderBlockFragments(var state):
guard let processResult = try state.process(maxHeaderListSize: self.headerDecoder.maxHeaderListSize) else {
return .needMoreData
let maxHeaderListSize = self.headerDecoder.maxHeaderListSize
return try self.avoidingParserCoW { newState in
let result = Result<ParseResult, Error> {
guard let processResult = try state.process(maxHeaderListSize: maxHeaderListSize) else {
newState = .accumulatingHeaderBlockFragments(state)
return .needMoreData
}
newState = .accumulatingContinuationPayload(processResult)
return .continue
}
switch result {
case let .success(processResult):
return processResult
case let .failure(error):
newState = .accumulatingHeaderBlockFragments(state)
throw error
}
}

self.state = .accumulatingContinuationPayload(processResult)
return .continue


case .appending:
preconditionFailure("Attempting to process in appending state")
}
Expand Down Expand Up @@ -1346,13 +1450,13 @@ extension HTTP2FrameDecoder {
/// Sadly, because it's generic and has a closure, we need to force it to be inlined at all call sites, which is
/// not ideal.
@inline(__always)
private mutating func avoidingParserCoW<ReturnType>(_ body: (inout ParserState) -> ReturnType) -> ReturnType {
private mutating func avoidingParserCoW<ReturnType>(_ body: (inout ParserState) throws -> ReturnType) rethrows -> ReturnType {
self.state = .appending
defer {
assert(!self.isAppending)
}

return body(&self.state)
return try body(&self.state)
}

private var isAppending: Bool {
Expand Down

0 comments on commit c6afe04

Please sign in to comment.