Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

switchMap: pause the outer subscription when cancelling the inner subscription, then resuming #737

Merged
merged 8 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
## 0.28.0-dev.1 (TBD)

Feedback on this change appreciated as this is a dev release before 0.28.0 stable!

### Changed

* `switchMap`: when cancelling the previous inner subscription,
`switchMap` will pause the outer subscription and and wait for the inner subscription to be completely canceled.
It will then resume the outer subscription, and listen to the next inner Stream.
Any errors from canceling the previous inner subscription will now be forwarded to the resulting Stream.

## 0.28.0-dev.0 (2023-07-26)

Feedback on this change appreciated as this is a dev release before 0.28.0 stable!
Expand Down
20 changes: 10 additions & 10 deletions lib/src/transformers/delay_when.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class _DelayWhenStreamSink<T> extends ForwardingSink<T, T> {
final Stream<void>? listenDelay;

final subscriptions = <StreamSubscription<void>>[];
StreamSubscription<void>? subscription;
StreamSubscription<void>? delaySubscription;
var closed = false;

_DelayWhenStreamSink(this.itemDelaySelector, this.listenDelay);
Expand Down Expand Up @@ -45,8 +45,8 @@ class _DelayWhenStreamSink<T> extends ForwardingSink<T, T> {

@override
Future<void>? onCancel() {
final future = subscription?.cancel();
subscription = null;
final future = delaySubscription?.cancel();
delaySubscription = null;

if (subscriptions.isEmpty) {
return future;
Expand All @@ -68,16 +68,16 @@ class _DelayWhenStreamSink<T> extends ForwardingSink<T, T> {
}

final completer = Completer<void>.sync();
subscription = listenDelay!.take(1).listen(
delaySubscription = listenDelay!.take(1).listen(
null,
onError: (Object e, StackTrace s) {
subscription?.cancel();
subscription = null;
delaySubscription?.cancel();
delaySubscription = null;
completer.completeError(e, s);
},
onDone: () {
subscription?.cancel();
subscription = null;
delaySubscription?.cancel();
delaySubscription = null;
completer.complete(null);
},
);
Expand All @@ -86,13 +86,13 @@ class _DelayWhenStreamSink<T> extends ForwardingSink<T, T> {

@override
void onPause() {
subscription?.pause();
delaySubscription?.pause();
subscriptions.pauseAll();
}

@override
void onResume() {
subscription?.resume();
delaySubscription?.resume();
subscriptions.resumeAll();
}
}
Expand Down
43 changes: 41 additions & 2 deletions lib/src/transformers/switch_map.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ class _SwitchMapStreamSink<S, T> extends ForwardingSink<S, T> {
final Stream<T> Function(S value) _mapper;
StreamSubscription<T>? _mapperSubscription;
bool _inputClosed = false;
bool _isCancelled = false;

_SwitchMapStreamSink(this._mapper);

Expand All @@ -20,7 +21,33 @@ class _SwitchMapStreamSink<S, T> extends ForwardingSink<S, T> {
return;
}

_mapperSubscription?.cancel();
final mapperSubscription = _mapperSubscription;

if (mapperSubscription == null) {
listenToInner(mappedStream);
return;
}

_mapperSubscription = null;
pauseSubscription();
mapperSubscription.cancel().onError<Object>((e, s) {
if (!_isCancelled) {
sink.addError(e, s);
}
}).whenComplete(() => resumeAndListenToInner(mappedStream));
}

void resumeAndListenToInner(Stream<T> mappedStream) {
if (_isCancelled) {
return;
}

resumeSubscription();
listenToInner(mappedStream);
}

void listenToInner(Stream<T> mappedStream) {
assert(_mapperSubscription == null);

_mapperSubscription = mappedStream.listen(
sink.add,
Expand All @@ -33,6 +60,14 @@ class _SwitchMapStreamSink<S, T> extends ForwardingSink<S, T> {
}
},
);

// https://github.com/dart-lang/stream_transform/blob/9743578b0119de6a8badd30bb16ef15d79bd3b15/lib/src/switch.dart#L71-L74
// If a pause happens during an _mapperSubscription.cancel,
// we still listen to the next stream when the cancel is done.
// Then we immediately pause it again here.
if (sink.isPaused) {
_mapperSubscription?.pause();
}
}

@override
Expand All @@ -46,7 +81,11 @@ class _SwitchMapStreamSink<S, T> extends ForwardingSink<S, T> {
}

@override
FutureOr<void> onCancel() => _mapperSubscription?.cancel();
FutureOr<void> onCancel() {
_isCancelled = true;

return _mapperSubscription?.cancel();
}

@override
void onListen() {}
Expand Down
35 changes: 32 additions & 3 deletions lib/src/utils/forwarding_sink.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@ import 'dart:async';

import 'package:rxdart/src/utils/error_and_stacktrace.dart';

/// A enhanced [EventSink] that allows to check if the sink is paused.
abstract class EnhancedEventSink<T> implements EventSink<T> {
/// Whether the subscription would need to buffer events.
bool get isPaused;
}

/// A [Sink] that supports event hooks.
///
/// This makes it suitable for certain rx transformers that need to
Expand All @@ -11,14 +17,37 @@ import 'package:rxdart/src/utils/error_and_stacktrace.dart';
/// [Stream]s. See, for example, [Stream.eventTransformed] which uses
/// `EventSink`s to transform events.
abstract class ForwardingSink<T, R> {
EventSink<R>? _sink;
EnhancedEventSink<R>? _sink;
StreamSubscription<T>? _subscription;

/// The output sink.
EventSink<R> get sink =>
/// @nonVirtual
/// @internal
EnhancedEventSink<R> get sink =>
_sink ?? (throw StateError('Must call setSink(sink) before accessing!'));

/// Set the output sink.
void setSink(EventSink<R> sink) => _sink = sink;
/// @nonVirtual
/// @internal
void setSink(EnhancedEventSink<R> sink) => _sink = sink;

/// Set the upstream subscription
/// @nonVirtual
/// @internal
void setSubscription(StreamSubscription<T>? subscription) =>
_subscription = subscription;

/// --------------------------------------------------------------------------

/// Pause the upstream subscription.
/// @nonVirtual
void pauseSubscription() => _subscription?.pause();

/// Resume the upstream subscription.
/// @nonVirtual
void resumeSubscription() => _subscription?.resume();

/// --------------------------------------------------------------------------

/// Handle data event
void onData(T data);
Expand Down
31 changes: 29 additions & 2 deletions lib/src/utils/forwarding_stream.dart
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ Stream<R> _forwardMulti<T, R>(
onError: sink.onError,
onDone: sink.onDone,
);
sink.setSubscription(subscription);
}

final futureOrVoid = sink.onListen();
Expand All @@ -57,6 +58,8 @@ Stream<R> _forwardMulti<T, R>(

final future = subscription?.cancel();
subscription = null;
sink.setSubscription(null);

return waitTwoFutures(future, sink.onCancel());
};
}, isBroadcast: true);
Expand Down Expand Up @@ -84,6 +87,7 @@ Stream<R> _forward<T, R>(
onError: sink.onError,
onDone: sink.onDone,
);
sink.setSubscription(subscription);

if (!stream.isBroadcast) {
controller.onPause = () {
Expand All @@ -97,7 +101,7 @@ Stream<R> _forward<T, R>(
}
}

sink.setSink(controller);
sink.setSink(_EnhancedEventSink(controller));
final futureOrVoid = sink.onListen();
if (futureOrVoid is Future<void>) {
futureOrVoid.then(listenToUpstream).onError<Object>((e, s) {
Expand All @@ -115,13 +119,14 @@ Stream<R> _forward<T, R>(

final future = subscription?.cancel();
subscription = null;
sink.setSubscription(null);

return waitTwoFutures(future, sink.onCancel());
};
return controller.stream;
}

class _MultiControllerSink<T> implements EventSink<T> {
class _MultiControllerSink<T> implements EventSink<T>, EnhancedEventSink<T> {
final MultiStreamController<T> controller;

_MultiControllerSink(this.controller);
Expand All @@ -135,4 +140,26 @@ class _MultiControllerSink<T> implements EventSink<T> {

@override
void close() => controller.closeSync();

@override
bool get isPaused => controller.isPaused;
}

class _EnhancedEventSink<T> implements EnhancedEventSink<T> {
final StreamController<T> _controller;

_EnhancedEventSink(this._controller);

@override
void add(T event) => _controller.add(event);

@override
void addError(Object error, [StackTrace? stackTrace]) =>
_controller.addError(error, stackTrace);

@override
void close() => _controller.close();

@override
bool get isPaused => _controller.isPaused;
}
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: rxdart
version: 0.28.0-dev.0
version: 0.28.0-dev.1
description: >
RxDart is an implementation of the popular reactiveX api for asynchronous
programming, leveraging the native Dart Streams api.
Expand Down
Loading