diff --git a/core/src/main/scala/ox/flow/FlowOps.scala b/core/src/main/scala/ox/flow/FlowOps.scala index 1c8e04d4..64b5b6c8 100644 --- a/core/src/main/scala/ox/flow/FlowOps.scala +++ b/core/src/main/scala/ox/flow/FlowOps.scala @@ -73,6 +73,19 @@ class FlowOps[+T]: def filter(f: T => Boolean): Flow[T] = Flow.usingEmitInline: emit => last.run(FlowEmit.fromInline(t => if f(t) then emit.apply(t))) + /** Emits only every nth element emitted by this flow. + * + * @param n + * The interval between two emitted elements. + */ + def sample(n: Int): Flow[T] = Flow.usingEmitInline: emit => + var sampleCounter = 0 + last.run( + FlowEmit.fromInline: t => + sampleCounter += 1 + if n != 0 && sampleCounter % n == 0 then emit(t) + ) + /** Applies the given mapping function `f` to each element emitted by this flow, for which the function is defined, and emits the result. * If `f` is not defined at an element, the element will be skipped. * @@ -82,6 +95,23 @@ class FlowOps[+T]: def collect[U](f: PartialFunction[T, U]): Flow[U] = Flow.usingEmitInline: emit => last.run(FlowEmit.fromInline(t => if f.isDefinedAt(t) then emit.apply(f(t)))) + /** Transforms the elements of the flow by applying an accumulation function to each element, producing a new value at each step. The + * resulting flow contains the accumulated values at each point in the original flow. + * + * @param initial + * The initial value to start the accumulation. + * @param f + * The accumulation function that is applied to each element of the flow. + */ + def scan[V](initial: V)(f: (V, T) => V): Flow[V] = Flow.usingEmitInline: emit => + emit(initial) + var accumulator = initial + last.run( + FlowEmit.fromInline: t => + accumulator = f(accumulator, t) + emit(accumulator) + ) + /** Applies the given effectful function `f` to each element emitted by this flow. The returned flow emits the elements unchanged. If `f` * throws an exceptions, the flow fails and propagates the exception. */ @@ -441,6 +471,17 @@ class FlowOps[+T]: emit((t, otherDefault)); true ) + /** Combines each element from this and the index of the element (starting at 0). + */ + def zipWithIndex: Flow[(T, Long)] = Flow.usingEmitInline: emit => + var index = 0L + last.run( + FlowEmit.fromInline: t => + val zipped = (t, index) + index += 1 + emit(zipped) + ) + /** Emits a given number of elements (determined byc `segmentSize`) from this flow to the returned flow, then emits the same number of * elements from the `other` flow and repeats. The order of elements in both flows is preserved. * diff --git a/core/src/test/scala/ox/flow/FlowOpsFilterTest.scala b/core/src/test/scala/ox/flow/FlowOpsFilterTest.scala new file mode 100644 index 00000000..a76d2bd7 --- /dev/null +++ b/core/src/test/scala/ox/flow/FlowOpsFilterTest.scala @@ -0,0 +1,30 @@ +package ox.flow + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import ox.* + +class FlowOpsFilterTest extends AnyFlatSpec with Matchers: + behavior of "filter" + + it should "not filter anything from the empty flow" in: + val c = Flow.empty[Int] + val s = c.filter(_ % 2 == 0) + s.runToList() shouldBe List.empty + + it should "filter out everything if no element meets 'f'" in: + val c = Flow.fromValues(1 to 10: _*) + val s = c.filter(_ => false) + s.runToList() shouldBe List.empty + + it should "not filter anything if all the elements meet 'f'" in: + val c = Flow.fromValues(1 to 10: _*) + val s = c.filter(_ => true) + s.runToList() shouldBe (1 to 10) + + it should "filter out elements that don't meet 'f'" in: + val c = Flow.fromValues(1 to 10: _*) + val s = c.filter(_ % 2 == 0) + s.runToList() shouldBe (2 to 10 by 2) + +end FlowOpsFilterTest diff --git a/core/src/test/scala/ox/flow/FlowOpsSampleTest.scala b/core/src/test/scala/ox/flow/FlowOpsSampleTest.scala new file mode 100644 index 00000000..d711c55b --- /dev/null +++ b/core/src/test/scala/ox/flow/FlowOpsSampleTest.scala @@ -0,0 +1,32 @@ +package ox.flow + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import ox.* + +class FlowOpsSampleTest extends AnyFlatSpec with Matchers: + behavior of "sample" + + it should "not sample anything from an empty flow" in: + val c = Flow.empty[Int] + val s = c.sample(5) + s.runToList() shouldBe List.empty + + it should "not sample anything when 'n == 0'" in: + val c = Flow.fromValues(1 to 10: _*) + val s = c.sample(0) + s.runToList() shouldBe List.empty + + it should "sample every element of the flow when 'n == 1'" in: + val c = Flow.fromValues(1 to 10: _*) + val n = 1 + val s = c.sample(n) + s.runToList() shouldBe (n to 10 by n) + + it should "sample every nth element of the flow" in: + val c = Flow.fromValues(1 to 10: _*) + val n = 3 + val s = c.sample(n) + s.runToList() shouldBe (n to 10 by n) + +end FlowOpsSampleTest diff --git a/core/src/test/scala/ox/flow/FlowOpsScanTest.scala b/core/src/test/scala/ox/flow/FlowOpsScanTest.scala new file mode 100644 index 00000000..50a9e6ce --- /dev/null +++ b/core/src/test/scala/ox/flow/FlowOpsScanTest.scala @@ -0,0 +1,30 @@ +package ox.flow + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import ox.* + +class FlowOpsScanTest extends AnyFlatSpec with Matchers: + behavior of "scan" + + it should "scan the empty flow" in: + val flow: Flow[Int] = Flow.empty + val scannedFlow = flow.scan(0)((acc, el) => acc + el) + scannedFlow.runToList() shouldBe List(0) + + it should "scan a flow of summed Int" in: + val flow = Flow.fromValues(1 to 10: _*) + val scannedFlow = flow.scan(0)((acc, el) => acc + el) + scannedFlow.runToList() shouldBe List(0, 1, 3, 6, 10, 15, 21, 28, 36, 45, 55) + + it should "scan a flow of multiplied Int" in: + val flow = Flow.fromValues(1 to 10: _*) + val scannedFlow = flow.scan(1)((acc, el) => acc * el) + scannedFlow.runToList() shouldBe List(1, 1, 2, 6, 24, 120, 720, 5040, 40320, 362880, 3628800) + + it should "scan a flow of concatenated String" in: + val flow = Flow.fromValues("f", "l", "o", "w") + val scannedFlow = flow.scan("my")((acc, el) => acc + el) + scannedFlow.runToList() shouldBe List("my", "myf", "myfl", "myflo", "myflow") + +end FlowOpsScanTest diff --git a/core/src/test/scala/ox/flow/FlowOpsZipWithIndexTest.scala b/core/src/test/scala/ox/flow/FlowOpsZipWithIndexTest.scala new file mode 100644 index 00000000..079f40fd --- /dev/null +++ b/core/src/test/scala/ox/flow/FlowOpsZipWithIndexTest.scala @@ -0,0 +1,20 @@ +package ox.flow + +import org.scalatest.concurrent.Eventually +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class FlowOpsZipWithIndexTest extends AnyFlatSpec with Matchers with Eventually: + behavior of "zipWithIndex" + + it should "not zip anything from an empty flow" in: + val c = Flow.empty[Int] + val s = c.zipWithIndex + s.runToList() shouldBe List.empty + + it should "zip flow with index" in: + val c = Flow.fromValues(1 to 5: _*) + val s = c.zipWithIndex + s.runToList() shouldBe List((1, 0), (2, 1), (3, 2), (4, 3), (5, 4)) + +end FlowOpsZipWithIndexTest