Skip to content

Commit

Permalink
Make HTTP2StreamChannel.isActive thread safe (#199)
Browse files Browse the repository at this point in the history
Motivation:

All public parts of the Channel protocol, such as isActive, are required
to be thread safe. Unfortunately, we missed a spot with the isActive
bool on HTTP2StreamChannel.

Modifications:

Back the HTTP2StreamChannel.isActive bool with an atomic, and set it on
state change.

Result:

Thread-safety for public readers of channel activity
  • Loading branch information
Lukasa authored May 11, 2020
1 parent 0ca9705 commit 5f665cd
Showing 1 changed file with 26 additions and 9 deletions.
35 changes: 26 additions & 9 deletions Sources/NIOHTTP2/HTTP2StreamChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ final class HTTP2StreamChannel: Channel, ChannelCore {
self.streamID = streamID
self.multiplexer = multiplexer
self.windowManager = InboundWindowManager(targetSize: Int32(targetWindowSize))
self._isActiveAtomic = .makeAtomic(value: false)
self._isWritable = .makeAtomic(value: true)
self.state = .idle
self.writabilityManager = StreamChannelFlowController(highWatermark: outboundBytesHighWatermark,
Expand Down Expand Up @@ -192,7 +193,7 @@ final class HTTP2StreamChannel: Channel, ChannelCore {
return
}

self.state.activate()
self.modifyingState { $0.activate() }
self.pipeline.fireChannelActive()
if self.autoRead {
self.read0()
Expand All @@ -207,7 +208,7 @@ final class HTTP2StreamChannel: Channel, ChannelCore {
self.parent?.writeAndFlush(resetFrame, promise: nil)
return
}
self.state.networkActive()
self.modifyingState { $0.networkActive() }

if self.writabilityManager.isWritable != self._isWritable.load() {
// We have probably delayed telling the user that this channel isn't writable, but we should do
Expand Down Expand Up @@ -307,10 +308,16 @@ final class HTTP2StreamChannel: Channel, ChannelCore {

private let _isWritable: NIOAtomic<Bool>

public var isActive: Bool {
private var _isActive: Bool {
return self.state == .active || self.state == .closing || self.state == .localActive
}

public var isActive: Bool {
return self._isActiveAtomic.load()
}

private let _isActiveAtomic: NIOAtomic<Bool>

public var _channelCore: ChannelCore {
return self
}
Expand Down Expand Up @@ -398,7 +405,7 @@ final class HTTP2StreamChannel: Channel, ChannelCore {
public func flush0() {
self.pendingWrites.mark()

if self.isActive {
if self._isActive {
self.deliverPendingWrites()
}
}
Expand Down Expand Up @@ -470,7 +477,7 @@ final class HTTP2StreamChannel: Channel, ChannelCore {
return
}

self.state.beginClosing()
self.modifyingState { $0.beginClosing() }
let resetFrame = HTTP2Frame(streamID: self.streamID, payload: .rstStream(.cancel))
self.receiveOutboundFrame(resetFrame, promise: nil)
self.multiplexer.childChannelFlush()
Expand All @@ -480,7 +487,7 @@ final class HTTP2StreamChannel: Channel, ChannelCore {
guard self.state != .closed else {
return
}
self.state.completeClosing()
self.modifyingState { $0.completeClosing() }
self.dropPendingReads()
self.failPendingWrites(error: ChannelError.eof)
if let promise = self.pendingClosePromise {
Expand All @@ -500,7 +507,7 @@ final class HTTP2StreamChannel: Channel, ChannelCore {
guard self.state != .closed else {
return
}
self.state.completeClosing()
self.modifyingState { $0.completeClosing() }
self.dropPendingReads()
self.failPendingWrites(error: error)
if let promise = self.pendingClosePromise {
Expand All @@ -524,7 +531,7 @@ final class HTTP2StreamChannel: Channel, ChannelCore {
}

// If we're not active, we will hold on to these reads.
guard self.isActive else {
guard self._isActive else {
return
}

Expand Down Expand Up @@ -556,7 +563,7 @@ private extension HTTP2StreamChannel {

/// Deliver all pending reads to the channel.
private func deliverPendingReads() {
assert(self.isActive)
assert(self._isActive)
while self.pendingReads.count > 0 {
let frame = self.pendingReads.removeFirst()

Expand Down Expand Up @@ -705,6 +712,16 @@ internal extension HTTP2StreamChannel {
}
}

extension HTTP2StreamChannel {
// A helper function used to ensure that state modification leads to changes in the channel active atomic.
private func modifyingState<ReturnType>(_ closure: (inout StreamChannelState) throws -> ReturnType) rethrows -> ReturnType {
defer {
self._isActiveAtomic.store(self._isActive)
}
return try closure(&self.state)
}
}

extension HTTP2Frame {
/// A shorthand heuristic for how many bytes we assume a frame consumes on the wire.
///
Expand Down

0 comments on commit 5f665cd

Please sign in to comment.