Skip to content

Commit

Permalink
fix: gracefully handle nested sources
Browse files Browse the repository at this point in the history
  • Loading branch information
nimatrueway committed Aug 24, 2024
1 parent 3015ba3 commit 97a3f5a
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 4 deletions.
9 changes: 5 additions & 4 deletions core/src/main/scala/ox/channels/SourceOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -398,13 +398,15 @@ trait SourceOps[+T] { outer: Source[T] =>
*/
def flatten[U](using Ox, StageCapacity, T <:< Source[U]): Source[U] = {
val c2 = StageCapacity.newChannel[U]
case class Nested(child: Source[U])

forkPropagate(c2) {
var pool = List[Source[T] | Source[U]](this)
val childStream = this.mapAsView(Nested(_))
var pool = List[Source[Nested] | Source[U]](childStream)
repeatWhile {
selectOrClosed(pool) match {
case ChannelClosed.Done =>
// TODO: best to remove the specific channel that signalled to be Done
// TODO: optimization idea: find a way to remove the specific channel that signalled to be Done
pool = pool.filterNot(_.isClosedForReceiveDetail.contains(ChannelClosed.Done))
if pool.isEmpty then
c2.doneOrClosed()
Expand All @@ -413,8 +415,7 @@ trait SourceOps[+T] { outer: Source[T] =>
case ChannelClosed.Error(e) =>
c2.errorOrClosed(e)
false
// TODO: we might go too deep and pull from non immediate children of the parent source
case t: Source[U] @unchecked =>
case Nested(t) =>
pool = t :: pool
true
case r: U @unchecked =>
Expand Down
7 changes: 7 additions & 0 deletions core/src/test/scala/ox/channels/SourceOpsFlattenTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ class SourceOpsFlattenTest extends AnyFlatSpec with Matchers with OptionValues {
}
}

it should "handle nested sources" in {
supervised {
val source = Source.fromValues(Source.fromValues(Source.fromValues(10)))
source.flatten.toList.map(_.toList) should contain theSameElementsAs List(List(10))
}
}

it should "pipe elements realtime" in {
supervised {
val source = Channel.bufferedDefault[Source[Int]]
Expand Down

0 comments on commit 97a3f5a

Please sign in to comment.