Skip to content

Commit

Permalink
FlowOps: sample operator (#254)
Browse files Browse the repository at this point in the history
corem authored Dec 16, 2024
1 parent 23d12d5 commit 875a605
Showing 5 changed files with 153 additions and 0 deletions.
41 changes: 41 additions & 0 deletions core/src/main/scala/ox/flow/FlowOps.scala
Original file line number Diff line number Diff line change
@@ -73,6 +73,19 @@ class FlowOps[+T]:
def filter(f: T => Boolean): Flow[T] = Flow.usingEmitInline: emit =>
last.run(FlowEmit.fromInline(t => if f(t) then emit.apply(t)))

/** Emits only every nth element emitted by this flow.
*
* @param n
* The interval between two emitted elements.
*/
def sample(n: Int): Flow[T] = Flow.usingEmitInline: emit =>
var sampleCounter = 0
last.run(
FlowEmit.fromInline: t =>
sampleCounter += 1
if n != 0 && sampleCounter % n == 0 then emit(t)
)

/** Applies the given mapping function `f` to each element emitted by this flow, for which the function is defined, and emits the result.
* If `f` is not defined at an element, the element will be skipped.
*
@@ -82,6 +95,23 @@ class FlowOps[+T]:
def collect[U](f: PartialFunction[T, U]): Flow[U] = Flow.usingEmitInline: emit =>
last.run(FlowEmit.fromInline(t => if f.isDefinedAt(t) then emit.apply(f(t))))

/** Transforms the elements of the flow by applying an accumulation function to each element, producing a new value at each step. The
* resulting flow contains the accumulated values at each point in the original flow.
*
* @param initial
* The initial value to start the accumulation.
* @param f
* The accumulation function that is applied to each element of the flow.
*/
def scan[V](initial: V)(f: (V, T) => V): Flow[V] = Flow.usingEmitInline: emit =>
emit(initial)
var accumulator = initial
last.run(
FlowEmit.fromInline: t =>
accumulator = f(accumulator, t)
emit(accumulator)
)

/** Applies the given effectful function `f` to each element emitted by this flow. The returned flow emits the elements unchanged. If `f`
* throws an exceptions, the flow fails and propagates the exception.
*/
@@ -441,6 +471,17 @@ class FlowOps[+T]:
emit((t, otherDefault)); true
)

/** Combines each element from this and the index of the element (starting at 0).
*/
def zipWithIndex: Flow[(T, Long)] = Flow.usingEmitInline: emit =>
var index = 0L
last.run(
FlowEmit.fromInline: t =>
val zipped = (t, index)
index += 1
emit(zipped)
)

/** Emits a given number of elements (determined byc `segmentSize`) from this flow to the returned flow, then emits the same number of
* elements from the `other` flow and repeats. The order of elements in both flows is preserved.
*
30 changes: 30 additions & 0 deletions core/src/test/scala/ox/flow/FlowOpsFilterTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package ox.flow

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*

class FlowOpsFilterTest extends AnyFlatSpec with Matchers:
behavior of "filter"

it should "not filter anything from the empty flow" in:
val c = Flow.empty[Int]
val s = c.filter(_ % 2 == 0)
s.runToList() shouldBe List.empty

it should "filter out everything if no element meets 'f'" in:
val c = Flow.fromValues(1 to 10: _*)
val s = c.filter(_ => false)
s.runToList() shouldBe List.empty

it should "not filter anything if all the elements meet 'f'" in:
val c = Flow.fromValues(1 to 10: _*)
val s = c.filter(_ => true)
s.runToList() shouldBe (1 to 10)

it should "filter out elements that don't meet 'f'" in:
val c = Flow.fromValues(1 to 10: _*)
val s = c.filter(_ % 2 == 0)
s.runToList() shouldBe (2 to 10 by 2)

end FlowOpsFilterTest
32 changes: 32 additions & 0 deletions core/src/test/scala/ox/flow/FlowOpsSampleTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package ox.flow

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*

class FlowOpsSampleTest extends AnyFlatSpec with Matchers:
behavior of "sample"

it should "not sample anything from an empty flow" in:
val c = Flow.empty[Int]
val s = c.sample(5)
s.runToList() shouldBe List.empty

it should "not sample anything when 'n == 0'" in:
val c = Flow.fromValues(1 to 10: _*)
val s = c.sample(0)
s.runToList() shouldBe List.empty

it should "sample every element of the flow when 'n == 1'" in:
val c = Flow.fromValues(1 to 10: _*)
val n = 1
val s = c.sample(n)
s.runToList() shouldBe (n to 10 by n)

it should "sample every nth element of the flow" in:
val c = Flow.fromValues(1 to 10: _*)
val n = 3
val s = c.sample(n)
s.runToList() shouldBe (n to 10 by n)

end FlowOpsSampleTest
30 changes: 30 additions & 0 deletions core/src/test/scala/ox/flow/FlowOpsScanTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package ox.flow

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*

class FlowOpsScanTest extends AnyFlatSpec with Matchers:
behavior of "scan"

it should "scan the empty flow" in:
val flow: Flow[Int] = Flow.empty
val scannedFlow = flow.scan(0)((acc, el) => acc + el)
scannedFlow.runToList() shouldBe List(0)

it should "scan a flow of summed Int" in:
val flow = Flow.fromValues(1 to 10: _*)
val scannedFlow = flow.scan(0)((acc, el) => acc + el)
scannedFlow.runToList() shouldBe List(0, 1, 3, 6, 10, 15, 21, 28, 36, 45, 55)

it should "scan a flow of multiplied Int" in:
val flow = Flow.fromValues(1 to 10: _*)
val scannedFlow = flow.scan(1)((acc, el) => acc * el)
scannedFlow.runToList() shouldBe List(1, 1, 2, 6, 24, 120, 720, 5040, 40320, 362880, 3628800)

it should "scan a flow of concatenated String" in:
val flow = Flow.fromValues("f", "l", "o", "w")
val scannedFlow = flow.scan("my")((acc, el) => acc + el)
scannedFlow.runToList() shouldBe List("my", "myf", "myfl", "myflo", "myflow")

end FlowOpsScanTest
20 changes: 20 additions & 0 deletions core/src/test/scala/ox/flow/FlowOpsZipWithIndexTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package ox.flow

import org.scalatest.concurrent.Eventually
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class FlowOpsZipWithIndexTest extends AnyFlatSpec with Matchers with Eventually:
behavior of "zipWithIndex"

it should "not zip anything from an empty flow" in:
val c = Flow.empty[Int]
val s = c.zipWithIndex
s.runToList() shouldBe List.empty

it should "zip flow with index" in:
val c = Flow.fromValues(1 to 5: _*)
val s = c.zipWithIndex
s.runToList() shouldBe List((1, 0), (2, 1), (3, 2), (4, 3), (5, 4))

end FlowOpsZipWithIndexTest

0 comments on commit 875a605

Please sign in to comment.