diff --git a/CHANGELOG.md b/CHANGELOG.md index ce5a6e0c..6ed91327 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,14 @@ +## 0.28.0-dev.1 (TBD) + +Feedback on this change appreciated as this is a dev release before 0.28.0 stable! + +### Changed + +* `switchMap`: when cancelling the previous inner subscription, + `switchMap` will pause the outer subscription and and wait for the inner subscription to be completely canceled. + It will then resume the outer subscription, and listen to the next inner Stream. + Any errors from canceling the previous inner subscription will now be forwarded to the resulting Stream. + ## 0.28.0-dev.0 (2023-07-26) Feedback on this change appreciated as this is a dev release before 0.28.0 stable! diff --git a/lib/src/transformers/delay_when.dart b/lib/src/transformers/delay_when.dart index 606f52aa..6ee9bedc 100644 --- a/lib/src/transformers/delay_when.dart +++ b/lib/src/transformers/delay_when.dart @@ -10,7 +10,7 @@ class _DelayWhenStreamSink extends ForwardingSink { final Stream? listenDelay; final subscriptions = >[]; - StreamSubscription? subscription; + StreamSubscription? delaySubscription; var closed = false; _DelayWhenStreamSink(this.itemDelaySelector, this.listenDelay); @@ -45,8 +45,8 @@ class _DelayWhenStreamSink extends ForwardingSink { @override Future? onCancel() { - final future = subscription?.cancel(); - subscription = null; + final future = delaySubscription?.cancel(); + delaySubscription = null; if (subscriptions.isEmpty) { return future; @@ -68,16 +68,16 @@ class _DelayWhenStreamSink extends ForwardingSink { } final completer = Completer.sync(); - subscription = listenDelay!.take(1).listen( + delaySubscription = listenDelay!.take(1).listen( null, onError: (Object e, StackTrace s) { - subscription?.cancel(); - subscription = null; + delaySubscription?.cancel(); + delaySubscription = null; completer.completeError(e, s); }, onDone: () { - subscription?.cancel(); - subscription = null; + delaySubscription?.cancel(); + delaySubscription = null; completer.complete(null); }, ); @@ -86,13 +86,13 @@ class _DelayWhenStreamSink extends ForwardingSink { @override void onPause() { - subscription?.pause(); + delaySubscription?.pause(); subscriptions.pauseAll(); } @override void onResume() { - subscription?.resume(); + delaySubscription?.resume(); subscriptions.resumeAll(); } } diff --git a/lib/src/transformers/switch_map.dart b/lib/src/transformers/switch_map.dart index 6954dc69..233a8eef 100644 --- a/lib/src/transformers/switch_map.dart +++ b/lib/src/transformers/switch_map.dart @@ -7,6 +7,7 @@ class _SwitchMapStreamSink extends ForwardingSink { final Stream Function(S value) _mapper; StreamSubscription? _mapperSubscription; bool _inputClosed = false; + bool _isCancelled = false; _SwitchMapStreamSink(this._mapper); @@ -20,7 +21,33 @@ class _SwitchMapStreamSink extends ForwardingSink { return; } - _mapperSubscription?.cancel(); + final mapperSubscription = _mapperSubscription; + + if (mapperSubscription == null) { + listenToInner(mappedStream); + return; + } + + _mapperSubscription = null; + pauseSubscription(); + mapperSubscription.cancel().onError((e, s) { + if (!_isCancelled) { + sink.addError(e, s); + } + }).whenComplete(() => resumeAndListenToInner(mappedStream)); + } + + void resumeAndListenToInner(Stream mappedStream) { + if (_isCancelled) { + return; + } + + resumeSubscription(); + listenToInner(mappedStream); + } + + void listenToInner(Stream mappedStream) { + assert(_mapperSubscription == null); _mapperSubscription = mappedStream.listen( sink.add, @@ -33,6 +60,14 @@ class _SwitchMapStreamSink extends ForwardingSink { } }, ); + + // https://github.com/dart-lang/stream_transform/blob/9743578b0119de6a8badd30bb16ef15d79bd3b15/lib/src/switch.dart#L71-L74 + // If a pause happens during an _mapperSubscription.cancel, + // we still listen to the next stream when the cancel is done. + // Then we immediately pause it again here. + if (sink.isPaused) { + _mapperSubscription?.pause(); + } } @override @@ -46,7 +81,11 @@ class _SwitchMapStreamSink extends ForwardingSink { } @override - FutureOr onCancel() => _mapperSubscription?.cancel(); + FutureOr onCancel() { + _isCancelled = true; + + return _mapperSubscription?.cancel(); + } @override void onListen() {} diff --git a/lib/src/utils/forwarding_sink.dart b/lib/src/utils/forwarding_sink.dart index c973d4a0..8b2765f5 100644 --- a/lib/src/utils/forwarding_sink.dart +++ b/lib/src/utils/forwarding_sink.dart @@ -2,6 +2,12 @@ import 'dart:async'; import 'package:rxdart/src/utils/error_and_stacktrace.dart'; +/// A enhanced [EventSink] that allows to check if the sink is paused. +abstract class EnhancedEventSink implements EventSink { + /// Whether the subscription would need to buffer events. + bool get isPaused; +} + /// A [Sink] that supports event hooks. /// /// This makes it suitable for certain rx transformers that need to @@ -11,14 +17,37 @@ import 'package:rxdart/src/utils/error_and_stacktrace.dart'; /// [Stream]s. See, for example, [Stream.eventTransformed] which uses /// `EventSink`s to transform events. abstract class ForwardingSink { - EventSink? _sink; + EnhancedEventSink? _sink; + StreamSubscription? _subscription; /// The output sink. - EventSink get sink => + /// @nonVirtual + /// @internal + EnhancedEventSink get sink => _sink ?? (throw StateError('Must call setSink(sink) before accessing!')); /// Set the output sink. - void setSink(EventSink sink) => _sink = sink; + /// @nonVirtual + /// @internal + void setSink(EnhancedEventSink sink) => _sink = sink; + + /// Set the upstream subscription + /// @nonVirtual + /// @internal + void setSubscription(StreamSubscription? subscription) => + _subscription = subscription; + + /// -------------------------------------------------------------------------- + + /// Pause the upstream subscription. + /// @nonVirtual + void pauseSubscription() => _subscription?.pause(); + + /// Resume the upstream subscription. + /// @nonVirtual + void resumeSubscription() => _subscription?.resume(); + + /// -------------------------------------------------------------------------- /// Handle data event void onData(T data); diff --git a/lib/src/utils/forwarding_stream.dart b/lib/src/utils/forwarding_stream.dart index 484ccd8a..1050eca4 100644 --- a/lib/src/utils/forwarding_stream.dart +++ b/lib/src/utils/forwarding_stream.dart @@ -38,6 +38,7 @@ Stream _forwardMulti( onError: sink.onError, onDone: sink.onDone, ); + sink.setSubscription(subscription); } final futureOrVoid = sink.onListen(); @@ -57,6 +58,8 @@ Stream _forwardMulti( final future = subscription?.cancel(); subscription = null; + sink.setSubscription(null); + return waitTwoFutures(future, sink.onCancel()); }; }, isBroadcast: true); @@ -84,6 +87,7 @@ Stream _forward( onError: sink.onError, onDone: sink.onDone, ); + sink.setSubscription(subscription); if (!stream.isBroadcast) { controller.onPause = () { @@ -97,7 +101,7 @@ Stream _forward( } } - sink.setSink(controller); + sink.setSink(_EnhancedEventSink(controller)); final futureOrVoid = sink.onListen(); if (futureOrVoid is Future) { futureOrVoid.then(listenToUpstream).onError((e, s) { @@ -115,13 +119,14 @@ Stream _forward( final future = subscription?.cancel(); subscription = null; + sink.setSubscription(null); return waitTwoFutures(future, sink.onCancel()); }; return controller.stream; } -class _MultiControllerSink implements EventSink { +class _MultiControllerSink implements EventSink, EnhancedEventSink { final MultiStreamController controller; _MultiControllerSink(this.controller); @@ -135,4 +140,26 @@ class _MultiControllerSink implements EventSink { @override void close() => controller.closeSync(); + + @override + bool get isPaused => controller.isPaused; +} + +class _EnhancedEventSink implements EnhancedEventSink { + final StreamController _controller; + + _EnhancedEventSink(this._controller); + + @override + void add(T event) => _controller.add(event); + + @override + void addError(Object error, [StackTrace? stackTrace]) => + _controller.addError(error, stackTrace); + + @override + void close() => _controller.close(); + + @override + bool get isPaused => _controller.isPaused; } diff --git a/pubspec.yaml b/pubspec.yaml index 10793a37..e9b74dcd 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,5 +1,5 @@ name: rxdart -version: 0.28.0-dev.0 +version: 0.28.0-dev.1 description: > RxDart is an implementation of the popular reactiveX api for asynchronous programming, leveraging the native Dart Streams api. diff --git a/test/transformers/switch_map_test.dart b/test/transformers/switch_map_test.dart index ae6da3d0..44580fbf 100644 --- a/test/transformers/switch_map_test.dart +++ b/test/transformers/switch_map_test.dart @@ -175,6 +175,169 @@ void main() { (s) => s.switchMap((v) => Stream.value(v)), ); }); + + test( + 'Rx.switchMap pauses subscription when cancelling inner subscription, then resume', + () async { + var isController1Cancelled = false; + final cancelCompleter1 = Completer.sync(); + final controller1 = StreamController() + ..add(0) + ..add(1) + ..onCancel = () async { + await Future.delayed(const Duration(milliseconds: 10)); + await cancelCompleter1.future; + isController1Cancelled = true; + }; + + final controller2 = StreamController() + ..add(2) + ..add(3) + ..onListen = () { + expect( + isController1Cancelled, + true, + reason: + 'controller1 should be cancelled before controller2 is listened to', + ); + }; + + final controller = StreamController>() + ..add(controller1); + final stream = controller.stream.switchMap((c) => c.stream); + + var expected = 0; + stream.listen( + expectAsync1( + (v) async { + expect(v, expected++); + + if (v == 1) { + // switch to controller2.stream + controller.add(controller2); + + await Future.delayed(const Duration(milliseconds: 10)); + cancelCompleter1.complete(null); + } + }, + count: 4, + ), + ); + }, + ); + + test('Rx.switchMap forwards errors from the cancel()', () { + var isController1Cancelled = false; + + final controller1 = StreamController() + ..add(0) + ..add(1) + ..onCancel = () async { + await Future.delayed(const Duration(milliseconds: 10)); + isController1Cancelled = true; + throw Exception('cancel error'); + }; + + final controller2 = StreamController() + ..add(2) + ..add(3) + ..onListen = () { + expect( + isController1Cancelled, + true, + reason: + 'controller1 should be cancelled before controller2 is listened to', + ); + }; + + final controller = StreamController>() + ..add(controller1); + final stream = controller.stream.switchMap((c) => c.stream); + + var expected = 0; + stream.listen( + expectAsync1( + (v) async { + expect(v, expected++); + + if (v == 1) { + // switch to controller2.stream + controller.add(controller2); + } + }, + count: 4, + ), + onError: expectAsync1( + (Object error) => expect(error, isException), + count: 1, + ), + ); + }); + + test( + 'Rx.switchMap pauses the next inner StreamSubscription when pausing while cancelling the previous inner Stream', + () { + var isController1Cancelled = false; + final cancelCompleter1 = Completer.sync(); + final controller1 = StreamController() + ..add(0) + ..add(1) + ..onCancel = () async { + await Future.delayed(const Duration(milliseconds: 10)); + await cancelCompleter1.future; + isController1Cancelled = true; + }; + + final controller2 = StreamController() + ..add(2) + ..add(3) + ..onListen = () { + expect( + isController1Cancelled, + true, + reason: + 'controller1 should be cancelled before controller2 is listened to', + ); + }; + + final controller = StreamController>() + ..add(controller1); + final stream = controller.stream.switchMap((c) => c.stream); + + var expected = 0; + late StreamSubscription subscription; + subscription = stream.listen( + expectAsync1( + (v) async { + expect(v, expected++); + + if (v == 1) { + // switch to controller2.stream + controller.add(controller2); + + await Future.delayed(const Duration(milliseconds: 10)); + + // pauses the subscription while cancelling the controller1 + subscription.pause(); + + // let the cancellation of controller1 complete + cancelCompleter1.complete(null); + + // make sure the controller2.stream is added to the controller + await pumpEventQueue(); + + // controller2.stream should be paused + expect(controller2.isPaused, true); + + // resume the subscription to continue the rest of the stream + subscription.resume(); + } + }, + count: 4, + ), + ); + }, + ); } class OnSubscriptionTriggerableStream extends Stream {