Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix DemandBuffer Data Races #3514

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 39 additions & 28 deletions Sources/ComposableArchitecture/Internal/Create.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,28 +36,25 @@ final class DemandBuffer<S: Subscriber>: @unchecked Sendable {

func buffer(value: S.Input) -> Subscribers.Demand {
lock.lock()
defer { lock.unlock() }

precondition(
self.completion == nil, "How could a completed publisher sent values?! Beats me 🤷‍♂️")
self.completion == nil, "How could a completed publisher send values?! Beats me 🤷‍♂️")

switch demandState.requested {
case .unlimited:
if demandState.requested == .unlimited {
lock.unlock()
return subscriber.receive(value)
default:
} else {
buffer.append(value)
lock.unlock()
return flush()
}
}

func complete(completion: Subscribers.Completion<S.Failure>) {
lock.lock()
defer { lock.unlock() }

precondition(
self.completion == nil, "Completion have already occurred, which is quite awkward 🥺")

self.completion == nil, "Completion has already occurred, which is quite awkward 🥺")
self.completion = completion
lock.unlock()
_ = flush()
}

Expand All @@ -66,39 +63,53 @@ final class DemandBuffer<S: Subscriber>: @unchecked Sendable {
}

private func flush(adding newDemand: Subscribers.Demand? = nil) -> Subscribers.Demand {
self.lock.sync {
var sentDemand = Subscribers.Demand.none
var completionToSend: Subscribers.Completion<S.Failure>?

if let newDemand = newDemand {
demandState.requested += newDemand
}
lock.lock()
if let newDemand = newDemand {
demandState.requested += newDemand
}
lock.unlock()

// If buffer isn't ready for flushing, return immediately
guard demandState.requested > 0 || newDemand == Subscribers.Demand.none else { return .none }
var loop = true
while loop {
var valueToSend: S.Input?

while !buffer.isEmpty && demandState.processed < demandState.requested {
demandState.requested += subscriber.receive(buffer.remove(at: 0))
lock.lock()
if !buffer.isEmpty && demandState.processed < demandState.requested {
valueToSend = buffer.remove(at: 0)
demandState.processed += 1
}

if let completion = completion {
// Completion event was already sent
sentDemand += 1
} else if let completion = completion {
buffer = []
demandState = .init()
self.completion = nil
subscriber.receive(completion: completion)
return .none
completionToSend = completion
loop = false
} else {
loop = false
}
lock.unlock()

if let value = valueToSend {
let additionalDemand = subscriber.receive(value)
lock.lock()
demandState.requested += additionalDemand
lock.unlock()
}
}

let sentDemand = demandState.requested - demandState.sent
demandState.sent += sentDemand
return sentDemand
if let completion = completionToSend {
subscriber.receive(completion: completion)
}

return sentDemand
}

struct Demand {
var processed: Subscribers.Demand = .none
var requested: Subscribers.Demand = .none
var sent: Subscribers.Demand = .none
}
}

Expand Down
170 changes: 170 additions & 0 deletions Tests/ComposableArchitectureTests/DemandBufferTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
#if DEBUG
@preconcurrency import Combine
@testable @preconcurrency import ComposableArchitecture
import XCTest

final class DemandBufferTests: BaseTCATestCase {
func testConcurrentSend() async throws {
let values = LockIsolated<Set<Int>>([])

let effect = AnyPublisher<Int, Never>.create { subscriber in
Task.detached { @Sendable in
for index in 0...1_000 {
subscriber.send(index)
}
subscriber.send(completion: .finished)
}
return AnyCancellable {}
}

let cancellable = effect.sink { value in
values.withValue {
_ = $0.insert(value)
}
}

try await Task.sleep(nanoseconds: NSEC_PER_SEC)

XCTAssertEqual(values.value, Set(0...1_000))

_ = cancellable
}

func testConcurrentDemandAndSend() async throws {
let values = LockIsolated<Set<Int>>([])
let subscriberLock = LockIsolated<Void>(())

let effectSubscriber = LockIsolated<Effect<Int>.Subscriber?>(nil)
let effect = AnyPublisher<Int, Never>.create { subscriber in
effectSubscriber.setValue(subscriber)
return AnyCancellable {}
}

let cancellable = effect.sink { value in
values.withValue {
_ = $0.insert(value)
}
}

await withTaskGroup(of: Void.self) { group in
for index in 0..<1_000 {
group.addTask { @Sendable in
effectSubscriber.value?.send(index)
}
group.addTask { @Sendable in
subscriberLock.withValue { _ in
_ = (effectSubscriber.value as? any Subscription)?.request(.max(1))
}
}
}
}

try await Task.sleep(nanoseconds: NSEC_PER_SEC)

XCTAssertEqual(values.value, Set(0..<1_000))

_ = cancellable
}

func testReentrantSubscriber() async throws {
let values = LockIsolated<Set<Int>>([])
let effectSubscriber = LockIsolated<Effect<Int>.Subscriber?>(nil)

let effect = AnyPublisher<Int, Never>.create { subscriber in
effectSubscriber.setValue(subscriber)
return AnyCancellable {}
}

let cancellable = effect.sink { value in
values.withValue {
_ = $0.insert(value)
}
if value < 1_000 {
Task { @MainActor in
effectSubscriber.value?.send(value + 1_000)
}
}
}

Task.detached { @Sendable in
for index in 0..<1_000 {
effectSubscriber.value?.send(index)
}
}

try await Task.sleep(nanoseconds: NSEC_PER_SEC)

XCTAssertEqual(values.value, Set(0..<2_000))

_ = cancellable
}

func testNoDeadlockOnReentrantSend() {
let values = LockIsolated<Set<Int>>([])
let expectation = XCTestExpectation(description: "Test should not deadlock")

let effectSubscriber = LockIsolated<Effect<Int>.Subscriber?>(nil)
let effect = AnyPublisher<Int, Never>.create { subscriber in
effectSubscriber.withValue { $0 = subscriber }
return AnyCancellable {}
}

let cancellable = effect.sink { value in
values.withValue {
_ = $0.insert(value)
}
// Prevent infinite recursion by limiting re-entrant calls
if value == 0 {
effectSubscriber.withValue { $0?.send(value + 1_000) }
} else {
expectation.fulfill()
}
}

// Ensure that 'effectSubscriber' is set before we use it
effectSubscriber.withValue { subscriber in
XCTAssertNotNil(subscriber)
subscriber?.send(0)
}

// Wait for the test to complete
wait(for: [expectation], timeout: 1.0)

XCTAssertEqual(values.value, Set([0, 1_000]))

_ = cancellable
}

func testConcurrentSendAndCompletion() {
let values = LockIsolated<Set<Int>>([])
let expectation = XCTestExpectation(description: "All values received")

let effect = AnyPublisher<Int, Never>.create { subscriber in
// Concurrently send values
DispatchQueue.concurrentPerform(iterations: 1000) { index in
subscriber.send(index)
}
// Send completion
DispatchQueue.global().async {
subscriber.send(completion: .finished)
}
return AnyCancellable {}
}

let cancellable = effect.sink(
receiveCompletion: { _ in expectation.fulfill() },
receiveValue: { value in
values.withValue {
_ = $0.insert(value)
}
}
)

wait(for: [expectation], timeout: 5.0)

XCTAssertEqual(values.value.count, 1000)

Check failure on line 165 in Tests/ComposableArchitectureTests/DemandBufferTests.swift

View workflow job for this annotation

GitHub Actions / xcodebuild (15) (test, MACOS, 15.4)

testConcurrentSendAndCompletion, XCTAssertEqual failed: ("999") is not equal to ("1000")

_ = cancellable
}
}
#endif
Loading