From a68656c049b819caed09ca253e391752d57a4dc7 Mon Sep 17 00:00:00 2001 From: Petrus Nguyen Thai Hoc Date: Wed, 30 Aug 2023 16:05:29 +0700 Subject: [PATCH 1/8] pause subscription when cancelling inner subscription, then resume --- lib/src/transformers/delay_when.dart | 20 ++++++------- lib/src/transformers/switch_map.dart | 43 ++++++++++++++++++++++++++-- lib/src/utils/forwarding_sink.dart | 30 +++++++++++++++++-- lib/src/utils/forwarding_stream.dart | 31 ++++++++++++++++++-- 4 files changed, 107 insertions(+), 17 deletions(-) diff --git a/lib/src/transformers/delay_when.dart b/lib/src/transformers/delay_when.dart index 606f52aa5..6ee9bedc4 100644 --- a/lib/src/transformers/delay_when.dart +++ b/lib/src/transformers/delay_when.dart @@ -10,7 +10,7 @@ class _DelayWhenStreamSink extends ForwardingSink { final Stream? listenDelay; final subscriptions = >[]; - StreamSubscription? subscription; + StreamSubscription? delaySubscription; var closed = false; _DelayWhenStreamSink(this.itemDelaySelector, this.listenDelay); @@ -45,8 +45,8 @@ class _DelayWhenStreamSink extends ForwardingSink { @override Future? onCancel() { - final future = subscription?.cancel(); - subscription = null; + final future = delaySubscription?.cancel(); + delaySubscription = null; if (subscriptions.isEmpty) { return future; @@ -68,16 +68,16 @@ class _DelayWhenStreamSink extends ForwardingSink { } final completer = Completer.sync(); - subscription = listenDelay!.take(1).listen( + delaySubscription = listenDelay!.take(1).listen( null, onError: (Object e, StackTrace s) { - subscription?.cancel(); - subscription = null; + delaySubscription?.cancel(); + delaySubscription = null; completer.completeError(e, s); }, onDone: () { - subscription?.cancel(); - subscription = null; + delaySubscription?.cancel(); + delaySubscription = null; completer.complete(null); }, ); @@ -86,13 +86,13 @@ class _DelayWhenStreamSink extends ForwardingSink { @override void onPause() { - subscription?.pause(); + delaySubscription?.pause(); subscriptions.pauseAll(); } @override void onResume() { - subscription?.resume(); + delaySubscription?.resume(); subscriptions.resumeAll(); } } diff --git a/lib/src/transformers/switch_map.dart b/lib/src/transformers/switch_map.dart index 6954dc695..158cc0e0c 100644 --- a/lib/src/transformers/switch_map.dart +++ b/lib/src/transformers/switch_map.dart @@ -7,6 +7,7 @@ class _SwitchMapStreamSink extends ForwardingSink { final Stream Function(S value) _mapper; StreamSubscription? _mapperSubscription; bool _inputClosed = false; + bool _isCancelled = false; _SwitchMapStreamSink(this._mapper); @@ -20,7 +21,33 @@ class _SwitchMapStreamSink extends ForwardingSink { return; } - _mapperSubscription?.cancel(); + final mapperSubscription = _mapperSubscription; + + if (mapperSubscription == null) { + listenToInner(mappedStream); + return; + } + + _mapperSubscription = null; + pause(); + mapperSubscription.cancel().onError((e, s) { + if (!_isCancelled) { + sink.addError(e, s); + } + }).whenComplete(() => resumeAndListenToInner(mappedStream)); + } + + void resumeAndListenToInner(Stream mappedStream) { + if (_isCancelled) { + return; + } + + resume(); + listenToInner(mappedStream); + } + + void listenToInner(Stream mappedStream) { + assert(_mapperSubscription == null); _mapperSubscription = mappedStream.listen( sink.add, @@ -33,6 +60,14 @@ class _SwitchMapStreamSink extends ForwardingSink { } }, ); + + // https://github.com/dart-lang/stream_transform/blob/9743578b0119de6a8badd30bb16ef15d79bd3b15/lib/src/switch.dart#L71-L74 + // If a pause happens during an _mapperSubscription.cancel, + // we still listen to the next stream when the cancel is done. + // Then we immediately pause it again here. + if (sink.isPaused) { + _mapperSubscription?.pause(); + } } @override @@ -46,7 +81,11 @@ class _SwitchMapStreamSink extends ForwardingSink { } @override - FutureOr onCancel() => _mapperSubscription?.cancel(); + FutureOr onCancel() { + _isCancelled = true; + + return _mapperSubscription?.cancel(); + } @override void onListen() {} diff --git a/lib/src/utils/forwarding_sink.dart b/lib/src/utils/forwarding_sink.dart index c973d4a0b..b0e6bcbd4 100644 --- a/lib/src/utils/forwarding_sink.dart +++ b/lib/src/utils/forwarding_sink.dart @@ -2,6 +2,12 @@ import 'dart:async'; import 'package:rxdart/src/utils/error_and_stacktrace.dart'; +/// TODO +abstract class EnhancedEventSink implements EventSink { + /// TODO + bool get isPaused; +} + /// A [Sink] that supports event hooks. /// /// This makes it suitable for certain rx transformers that need to @@ -11,14 +17,32 @@ import 'package:rxdart/src/utils/error_and_stacktrace.dart'; /// [Stream]s. See, for example, [Stream.eventTransformed] which uses /// `EventSink`s to transform events. abstract class ForwardingSink { - EventSink? _sink; + EnhancedEventSink? _sink; + StreamSubscription? _subscription; /// The output sink. - EventSink get sink => + /// @nonVirtual + EnhancedEventSink get sink => _sink ?? (throw StateError('Must call setSink(sink) before accessing!')); /// Set the output sink. - void setSink(EventSink sink) => _sink = sink; + /// @nonVirtual + void setSink(EnhancedEventSink sink) => _sink = sink; + + /// Set the upstream subscription + /// @nonVirtual + void setSubscription(StreamSubscription? subscription) => + _subscription = subscription; + + /// Pause the subscription. + /// @nonVirtual + void pause() => _subscription?.pause(); + + /// Resume the subscription. + /// @nonVirtual + void resume() => _subscription?.resume(); + + /// -------------------------------------------------------------------------- /// Handle data event void onData(T data); diff --git a/lib/src/utils/forwarding_stream.dart b/lib/src/utils/forwarding_stream.dart index 484ccd8a7..1050eca4c 100644 --- a/lib/src/utils/forwarding_stream.dart +++ b/lib/src/utils/forwarding_stream.dart @@ -38,6 +38,7 @@ Stream _forwardMulti( onError: sink.onError, onDone: sink.onDone, ); + sink.setSubscription(subscription); } final futureOrVoid = sink.onListen(); @@ -57,6 +58,8 @@ Stream _forwardMulti( final future = subscription?.cancel(); subscription = null; + sink.setSubscription(null); + return waitTwoFutures(future, sink.onCancel()); }; }, isBroadcast: true); @@ -84,6 +87,7 @@ Stream _forward( onError: sink.onError, onDone: sink.onDone, ); + sink.setSubscription(subscription); if (!stream.isBroadcast) { controller.onPause = () { @@ -97,7 +101,7 @@ Stream _forward( } } - sink.setSink(controller); + sink.setSink(_EnhancedEventSink(controller)); final futureOrVoid = sink.onListen(); if (futureOrVoid is Future) { futureOrVoid.then(listenToUpstream).onError((e, s) { @@ -115,13 +119,14 @@ Stream _forward( final future = subscription?.cancel(); subscription = null; + sink.setSubscription(null); return waitTwoFutures(future, sink.onCancel()); }; return controller.stream; } -class _MultiControllerSink implements EventSink { +class _MultiControllerSink implements EventSink, EnhancedEventSink { final MultiStreamController controller; _MultiControllerSink(this.controller); @@ -135,4 +140,26 @@ class _MultiControllerSink implements EventSink { @override void close() => controller.closeSync(); + + @override + bool get isPaused => controller.isPaused; +} + +class _EnhancedEventSink implements EnhancedEventSink { + final StreamController _controller; + + _EnhancedEventSink(this._controller); + + @override + void add(T event) => _controller.add(event); + + @override + void addError(Object error, [StackTrace? stackTrace]) => + _controller.addError(error, stackTrace); + + @override + void close() => _controller.close(); + + @override + bool get isPaused => _controller.isPaused; } From ebc87ea8361b9e8e47d5e9eebf1033048c0f2b8b Mon Sep 17 00:00:00 2001 From: Petrus Nguyen Thai Hoc Date: Tue, 24 Oct 2023 14:27:07 +0700 Subject: [PATCH 2/8] add tests --- test/transformers/switch_map_test.dart | 43 ++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/test/transformers/switch_map_test.dart b/test/transformers/switch_map_test.dart index ae6da3d05..05619e5f1 100644 --- a/test/transformers/switch_map_test.dart +++ b/test/transformers/switch_map_test.dart @@ -175,6 +175,49 @@ void main() { (s) => s.switchMap((v) => Stream.value(v)), ); }); + + test( + 'Rx.switch pauses subscription when cancelling inner subscription, then resume', + () async { + final controller = StreamController>(); + + var b = false; + + final completer1 = Completer(); + final c1 = StreamController(); + c1.add(1); + c1.add(2); + c1.onCancel = () => completer1.future.then((_) => b = true); + controller.add(c1); + + final completer2 = Completer(); + final c2 = StreamController(); + c2.add(3); + c2.add(4); + c2.onListen = () { + if (!b) { + throw Exception('should be paused'); + } + }; + c2.onCancel = () => completer2.future; + + final stream = controller.stream.switchMap((c) => c.stream); + + stream.listen( + expectAsync1( + (v) async { + print(v); + if (v == 2) { + controller.add(c2); + await Future.delayed(const Duration(milliseconds: 1000)); + completer1.complete(null); + } + }, + count: 4, + ), + ); + }, + ); } class OnSubscriptionTriggerableStream extends Stream { From 9a74a0c6b29143509679119de63c5ed26080940f Mon Sep 17 00:00:00 2001 From: Petrus Nguyen Thai Hoc Date: Tue, 24 Oct 2023 17:11:00 +0700 Subject: [PATCH 3/8] add tests --- test/transformers/switch_map_test.dart | 61 ++++++++++++++------------ 1 file changed, 34 insertions(+), 27 deletions(-) diff --git a/test/transformers/switch_map_test.dart b/test/transformers/switch_map_test.dart index 05619e5f1..30453def9 100644 --- a/test/transformers/switch_map_test.dart +++ b/test/transformers/switch_map_test.dart @@ -179,38 +179,45 @@ void main() { test( 'Rx.switch pauses subscription when cancelling inner subscription, then resume', () async { - final controller = StreamController>(); - - var b = false; - - final completer1 = Completer(); - final c1 = StreamController(); - c1.add(1); - c1.add(2); - c1.onCancel = () => completer1.future.then((_) => b = true); - controller.add(c1); - - final completer2 = Completer(); - final c2 = StreamController(); - c2.add(3); - c2.add(4); - c2.onListen = () { - if (!b) { - throw Exception('should be paused'); - } - }; - c2.onCancel = () => completer2.future; - + var controller1Cancelled = false; + final cancelCompleter1 = Completer.sync(); + final controller1 = StreamController() + ..add(0) + ..add(1) + ..onCancel = () async { + await Future.delayed(const Duration(milliseconds: 10)); + await cancelCompleter1.future; + controller1Cancelled = true; + }; + + final controller2 = StreamController() + ..add(2) + ..add(3) + ..onListen = () { + expect( + controller1Cancelled, + true, + reason: + 'controller1 should be cancelled before controller2 is listened to', + ); + }; + + final controller = StreamController>() + ..add(controller1); final stream = controller.stream.switchMap((c) => c.stream); + var expected = 0; stream.listen( expectAsync1( (v) async { - print(v); - if (v == 2) { - controller.add(c2); - await Future.delayed(const Duration(milliseconds: 1000)); - completer1.complete(null); + expect(v, expected++); + + if (v == 1) { + // switch to controller2.stream + controller.add(controller2); + + await Future.delayed(const Duration(milliseconds: 10)); + cancelCompleter1.complete(null); } }, count: 4, From 21aca1c41d256bffdcf99f32a0c6202d2fa57ff0 Mon Sep 17 00:00:00 2001 From: Petrus Nguyen Thai Hoc Date: Thu, 26 Oct 2023 17:30:42 +0700 Subject: [PATCH 4/8] add tests --- lib/src/transformers/switch_map.dart | 4 ++-- lib/src/utils/forwarding_sink.dart | 17 +++++++++++------ 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/lib/src/transformers/switch_map.dart b/lib/src/transformers/switch_map.dart index 158cc0e0c..233a8eefc 100644 --- a/lib/src/transformers/switch_map.dart +++ b/lib/src/transformers/switch_map.dart @@ -29,7 +29,7 @@ class _SwitchMapStreamSink extends ForwardingSink { } _mapperSubscription = null; - pause(); + pauseSubscription(); mapperSubscription.cancel().onError((e, s) { if (!_isCancelled) { sink.addError(e, s); @@ -42,7 +42,7 @@ class _SwitchMapStreamSink extends ForwardingSink { return; } - resume(); + resumeSubscription(); listenToInner(mappedStream); } diff --git a/lib/src/utils/forwarding_sink.dart b/lib/src/utils/forwarding_sink.dart index b0e6bcbd4..8b2765f5b 100644 --- a/lib/src/utils/forwarding_sink.dart +++ b/lib/src/utils/forwarding_sink.dart @@ -2,9 +2,9 @@ import 'dart:async'; import 'package:rxdart/src/utils/error_and_stacktrace.dart'; -/// TODO +/// A enhanced [EventSink] that allows to check if the sink is paused. abstract class EnhancedEventSink implements EventSink { - /// TODO + /// Whether the subscription would need to buffer events. bool get isPaused; } @@ -22,25 +22,30 @@ abstract class ForwardingSink { /// The output sink. /// @nonVirtual + /// @internal EnhancedEventSink get sink => _sink ?? (throw StateError('Must call setSink(sink) before accessing!')); /// Set the output sink. /// @nonVirtual + /// @internal void setSink(EnhancedEventSink sink) => _sink = sink; /// Set the upstream subscription /// @nonVirtual + /// @internal void setSubscription(StreamSubscription? subscription) => _subscription = subscription; - /// Pause the subscription. + /// -------------------------------------------------------------------------- + + /// Pause the upstream subscription. /// @nonVirtual - void pause() => _subscription?.pause(); + void pauseSubscription() => _subscription?.pause(); - /// Resume the subscription. + /// Resume the upstream subscription. /// @nonVirtual - void resume() => _subscription?.resume(); + void resumeSubscription() => _subscription?.resume(); /// -------------------------------------------------------------------------- From a3045755c32130bd8ba6cb1276ba4352e0b971cf Mon Sep 17 00:00:00 2001 From: Petrus Nguyen Thai Hoc Date: Thu, 26 Oct 2023 17:40:21 +0700 Subject: [PATCH 5/8] add tests --- test/transformers/switch_map_test.dart | 56 ++++++++++++++++++++++++-- 1 file changed, 52 insertions(+), 4 deletions(-) diff --git a/test/transformers/switch_map_test.dart b/test/transformers/switch_map_test.dart index 30453def9..b938a505e 100644 --- a/test/transformers/switch_map_test.dart +++ b/test/transformers/switch_map_test.dart @@ -177,9 +177,9 @@ void main() { }); test( - 'Rx.switch pauses subscription when cancelling inner subscription, then resume', + 'Rx.switchMap pauses subscription when cancelling inner subscription, then resume', () async { - var controller1Cancelled = false; + var isController1Cancelled = false; final cancelCompleter1 = Completer.sync(); final controller1 = StreamController() ..add(0) @@ -187,7 +187,7 @@ void main() { ..onCancel = () async { await Future.delayed(const Duration(milliseconds: 10)); await cancelCompleter1.future; - controller1Cancelled = true; + isController1Cancelled = true; }; final controller2 = StreamController() @@ -195,7 +195,7 @@ void main() { ..add(3) ..onListen = () { expect( - controller1Cancelled, + isController1Cancelled, true, reason: 'controller1 should be cancelled before controller2 is listened to', @@ -225,6 +225,54 @@ void main() { ); }, ); + + test('Rx.switchMap forwards errors from the cancel()', () { + var isController1Cancelled = false; + + final controller1 = StreamController() + ..add(0) + ..add(1) + ..onCancel = () async { + await Future.delayed(const Duration(milliseconds: 10)); + isController1Cancelled = true; + throw Exception('cancel error'); + }; + + final controller2 = StreamController() + ..add(2) + ..add(3) + ..onListen = () { + expect( + isController1Cancelled, + true, + reason: + 'controller1 should be cancelled before controller2 is listened to', + ); + }; + + final controller = StreamController>() + ..add(controller1); + final stream = controller.stream.switchMap((c) => c.stream); + + var expected = 0; + stream.listen( + expectAsync1( + (v) async { + expect(v, expected++); + + if (v == 1) { + // switch to controller2.stream + controller.add(controller2); + } + }, + count: 4, + ), + onError: expectAsync1( + (Object error) => expect(error, isException), + count: 1, + ), + ); + }); } class OnSubscriptionTriggerableStream extends Stream { From 0cc2a007caa75d3b32259f77d7de2ab6a26f0a03 Mon Sep 17 00:00:00 2001 From: Petrus Nguyen Thai Hoc Date: Thu, 26 Oct 2023 17:54:01 +0700 Subject: [PATCH 6/8] doen --- test/transformers/switch_map_test.dart | 65 ++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/test/transformers/switch_map_test.dart b/test/transformers/switch_map_test.dart index b938a505e..4c8f5ee8b 100644 --- a/test/transformers/switch_map_test.dart +++ b/test/transformers/switch_map_test.dart @@ -273,6 +273,71 @@ void main() { ), ); }); + + test( + 'Rx.switchMap pauses the next inner StreamSubscription when pausing while cancelling the previous inner Stream', + () { + var isController1Cancelled = false; + final cancelCompleter1 = Completer.sync(); + final controller1 = StreamController() + ..add(0) + ..add(1) + ..onCancel = () async { + await Future.delayed(const Duration(milliseconds: 10)); + await cancelCompleter1.future; + isController1Cancelled = true; + }; + + final controller2 = StreamController() + ..add(2) + ..add(3) + ..onListen = () { + expect( + isController1Cancelled, + true, + reason: + 'controller1 should be cancelled before controller2 is listened to', + ); + }; + + final controller = StreamController>() + ..add(controller1); + final stream = controller.stream.switchMap((c) => c.stream); + + var expected = 0; + late StreamSubscription subscription; + subscription = stream.listen( + expectAsync1( + (v) async { + expect(v, expected++); + + if (v == 1) { + // switch to controller2.stream + controller.add(controller2); + + await Future.delayed(const Duration(milliseconds: 10)); + + // pauses the subscription while cancelling the controller1 + subscription.pause(); + + // let the controller1 cancel complete + cancelCompleter1.complete(null); + + // make sure the controller2.stream is added to the controller + await pumpEventQueue(); + + // controller2.stream should be paused + expect(controller2.isPaused, true); + + // resume the subscription to continue the rest of the stream + subscription.resume(); + } + }, + count: 4, + ), + ); + }, + ); } class OnSubscriptionTriggerableStream extends Stream { From b2f83aa01d5c4d74d7b66aa816c9df9b541b0565 Mon Sep 17 00:00:00 2001 From: Petrus Nguyen Thai Hoc Date: Thu, 26 Oct 2023 17:56:15 +0700 Subject: [PATCH 7/8] DONE --- test/transformers/switch_map_test.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/transformers/switch_map_test.dart b/test/transformers/switch_map_test.dart index 4c8f5ee8b..44580fbf3 100644 --- a/test/transformers/switch_map_test.dart +++ b/test/transformers/switch_map_test.dart @@ -320,7 +320,7 @@ void main() { // pauses the subscription while cancelling the controller1 subscription.pause(); - // let the controller1 cancel complete + // let the cancellation of controller1 complete cancelCompleter1.complete(null); // make sure the controller2.stream is added to the controller From 845ae779cac49cb16ac22dde3e95cd1bb80bd234 Mon Sep 17 00:00:00 2001 From: Petrus Nguyen Thai Hoc Date: Thu, 26 Oct 2023 18:07:38 +0700 Subject: [PATCH 8/8] changelog [skip ci] --- CHANGELOG.md | 11 +++++++++++ pubspec.yaml | 2 +- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ce5a6e0cb..6ed91327e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,14 @@ +## 0.28.0-dev.1 (TBD) + +Feedback on this change appreciated as this is a dev release before 0.28.0 stable! + +### Changed + +* `switchMap`: when cancelling the previous inner subscription, + `switchMap` will pause the outer subscription and and wait for the inner subscription to be completely canceled. + It will then resume the outer subscription, and listen to the next inner Stream. + Any errors from canceling the previous inner subscription will now be forwarded to the resulting Stream. + ## 0.28.0-dev.0 (2023-07-26) Feedback on this change appreciated as this is a dev release before 0.28.0 stable! diff --git a/pubspec.yaml b/pubspec.yaml index 10793a377..e9b74dcd4 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,5 +1,5 @@ name: rxdart -version: 0.28.0-dev.0 +version: 0.28.0-dev.1 description: > RxDart is an implementation of the popular reactiveX api for asynchronous programming, leveraging the native Dart Streams api.