From 97a3f5ae51eb1c5779f0b780e586e1c7ee298740 Mon Sep 17 00:00:00 2001 From: Naeim Taheri Date: Sat, 24 Aug 2024 12:11:18 -0700 Subject: [PATCH] fix: gracefully handle nested sources --- core/src/main/scala/ox/channels/SourceOps.scala | 9 +++++---- .../test/scala/ox/channels/SourceOpsFlattenTest.scala | 7 +++++++ 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/ox/channels/SourceOps.scala b/core/src/main/scala/ox/channels/SourceOps.scala index 7ee1b5cb..f51a4b45 100644 --- a/core/src/main/scala/ox/channels/SourceOps.scala +++ b/core/src/main/scala/ox/channels/SourceOps.scala @@ -398,13 +398,15 @@ trait SourceOps[+T] { outer: Source[T] => */ def flatten[U](using Ox, StageCapacity, T <:< Source[U]): Source[U] = { val c2 = StageCapacity.newChannel[U] + case class Nested(child: Source[U]) forkPropagate(c2) { - var pool = List[Source[T] | Source[U]](this) + val childStream = this.mapAsView(Nested(_)) + var pool = List[Source[Nested] | Source[U]](childStream) repeatWhile { selectOrClosed(pool) match { case ChannelClosed.Done => - // TODO: best to remove the specific channel that signalled to be Done + // TODO: optimization idea: find a way to remove the specific channel that signalled to be Done pool = pool.filterNot(_.isClosedForReceiveDetail.contains(ChannelClosed.Done)) if pool.isEmpty then c2.doneOrClosed() @@ -413,8 +415,7 @@ trait SourceOps[+T] { outer: Source[T] => case ChannelClosed.Error(e) => c2.errorOrClosed(e) false - // TODO: we might go too deep and pull from non immediate children of the parent source - case t: Source[U] @unchecked => + case Nested(t) => pool = t :: pool true case r: U @unchecked => diff --git a/core/src/test/scala/ox/channels/SourceOpsFlattenTest.scala b/core/src/test/scala/ox/channels/SourceOpsFlattenTest.scala index a77431c8..55bd656d 100644 --- a/core/src/test/scala/ox/channels/SourceOpsFlattenTest.scala +++ b/core/src/test/scala/ox/channels/SourceOpsFlattenTest.scala @@ -35,6 +35,13 @@ class SourceOpsFlattenTest extends AnyFlatSpec with Matchers with OptionValues { } } + it should "handle nested sources" in { + supervised { + val source = Source.fromValues(Source.fromValues(Source.fromValues(10))) + source.flatten.toList.map(_.toList) should contain theSameElementsAs List(List(10)) + } + } + it should "pipe elements realtime" in { supervised { val source = Channel.bufferedDefault[Source[Int]]