From 0efcb7456645820aa4d5a2f5754e1458750ab41f Mon Sep 17 00:00:00 2001 From: Wojciech M Date: Wed, 29 Dec 2021 23:16:50 +0100 Subject: [PATCH] Add Flowables.combineLatest() and Flowables.zip() versions that take bufferSize and delayError parameters --- .../kotlin/io/reactivex/rxkotlin/Flowables.kt | 302 ++++++++++++++++++ 1 file changed, 302 insertions(+) diff --git a/src/main/kotlin/io/reactivex/rxkotlin/Flowables.kt b/src/main/kotlin/io/reactivex/rxkotlin/Flowables.kt index 1de4fa8..21809b8 100644 --- a/src/main/kotlin/io/reactivex/rxkotlin/Flowables.kt +++ b/src/main/kotlin/io/reactivex/rxkotlin/Flowables.kt @@ -10,6 +10,7 @@ import io.reactivex.annotations.BackpressureSupport import io.reactivex.annotations.CheckReturnValue import io.reactivex.annotations.SchedulerSupport import io.reactivex.functions.* +import io.reactivex.internal.functions.Functions import org.reactivestreams.Publisher @@ -25,6 +26,18 @@ object Flowables { ): Flowable = Flowable.combineLatest(source1, source2, BiFunction { t1, t2 -> combineFunction(t1, t2) }) + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + fun combineLatest( + source1: Flowable, + source2: Flowable, + bufferSize: Int, + combineFunction: (T1, T2) -> R + ): Flowable = Flowable.combineLatest(listOf(source1, source2), + Functions.toFunction(combineFunction), bufferSize) + + /** * Emits `Pair` */ @@ -37,6 +50,18 @@ object Flowables { ): Flowable> = Flowable.combineLatest(source1, source2, BiFunction> { t1, t2 -> t1 to t2 }) + /** + * Emits `Pair` + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + fun combineLatest( + source1: Flowable, + source2: Flowable, + bufferSize: Int + ): Flowable> = combineLatest(source1, source2, bufferSize, ::Pair) + @CheckReturnValue @BackpressureSupport(BackpressureKind.FULL) @@ -49,6 +74,19 @@ object Flowables { ): Flowable = Flowable.combineLatest(source1, source2, source3, Function3 { t1: T1, t2: T2, t3: T3 -> combineFunction(t1, t2, t3) }) + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + fun combineLatest( + source1: Flowable, + source2: Flowable, + source3: Flowable, + bufferSize: Int, + combineFunction: (T1, T2, T3) -> R + ): Flowable = Flowable.combineLatest(listOf(source1, source2, source3), + Functions.toFunction(combineFunction), bufferSize) + + /** * Emits `Triple` */ @@ -62,6 +100,20 @@ object Flowables { ): Flowable> = Flowable.combineLatest(source1, source2, source3, Function3> { t1, t2, t3 -> Triple(t1, t2, t3) }) + /** + * Emits `Triple` + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + fun combineLatest( + source1: Flowable, + source2: Flowable, + source3: Flowable, + bufferSize: Int + ): Flowable> = + combineLatest(source1, source2, source3, bufferSize, ::Triple) + @CheckReturnValue @BackpressureSupport(BackpressureKind.FULL) @@ -75,6 +127,19 @@ object Flowables { ): Flowable = Flowable.combineLatest(source1, source2, source3, source4, Function4 { t1: T1, t2: T2, t3: T3, t4: T4 -> combineFunction(t1, t2, t3, t4) }) + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + fun combineLatest( + source1: Flowable, + source2: Flowable, + source3: Flowable, + source4: Flowable, + bufferSize: Int, + combineFunction: (T1, T2, T3, T4) -> R + ): Flowable = Flowable.combineLatest(listOf(source1, source2, source3, source4), + Functions.toFunction(combineFunction), bufferSize) + @CheckReturnValue @BackpressureSupport(BackpressureKind.FULL) @@ -89,6 +154,20 @@ object Flowables { ): Flowable = Flowable.combineLatest(source1, source2, source3, source4, source5, Function5 { t1: T1, t2: T2, t3: T3, t4: T4, t5: T5 -> combineFunction(t1, t2, t3, t4, t5) }) + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + fun combineLatest( + source1: Flowable, + source2: Flowable, + source3: Flowable, + source4: Flowable, + source5: Flowable, + bufferSize: Int, + combineFunction: (T1, T2, T3, T4, T5) -> R + ): Flowable = Flowable.combineLatest(listOf(source1, source2, source3, source4, source5), + Functions.toFunction(combineFunction), bufferSize) + @CheckReturnValue @BackpressureSupport(BackpressureKind.FULL) @@ -104,6 +183,41 @@ object Flowables { ): Flowable = Flowable.combineLatest(source1, source2, source3, source4, source5, source6, Function6 { t1: T1, t2: T2, t3: T3, t4: T4, t5: T5, t6: T6 -> combineFunction(t1, t2, t3, t4, t5, t6) }) + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + fun combineLatest( + source1: Flowable, + source2: Flowable, + source3: Flowable, + source4: Flowable, + source5: Flowable, + source6: Flowable, + bufferSize: Int, + combineFunction: (T1, T2, T3, T4, T5, T6) -> R + ): Flowable = Flowable.combineLatest(listOf(source1, source2, source3, source4, source5, source6), + Functions.toFunction(combineFunction), bufferSize) + + + @Deprecated("New type inference algorithm in Kotlin 1.4 makes this method obsolete. Method will be removed in future RxKotlin release.", + replaceWith = ReplaceWith("Flowable.combineLatest(source1, source2, source3, source4, source5, source6, source7, combineFunction)", "io.reactivex.Flowable"), + level = DeprecationLevel.WARNING) + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + fun combineLatest( + source1: Flowable, + source2: Flowable, + source3: Flowable, + source4: Flowable, + source5: Flowable, + source6: Flowable, + source7: Flowable, + bufferSize: Int, + combineFunction: (T1, T2, T3, T4, T5, T6, T7) -> R + ): Flowable = Flowable.combineLatest(listOf(source1, source2, source3, source4, source5, source6, source7), + Functions.toFunction(combineFunction), bufferSize) + @CheckReturnValue @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) @@ -139,6 +253,27 @@ object Flowables { @CheckReturnValue @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) + fun combineLatest( + source1: Flowable, + source2: Flowable, + source3: Flowable, + source4: Flowable, + source5: Flowable, + source6: Flowable, + source7: Flowable, + source8: Flowable, + bufferSize: Int, + combineFunction: (T1, T2, T3, T4, T5, T6, T7, T8) -> R + ): Flowable = Flowable.combineLatest(listOf(source1, source2, source3, source4, source5, source6, source7, source8), + Functions.toFunction(combineFunction), bufferSize) + + + @Deprecated("New type inference algorithm in Kotlin 1.4 makes this method obsolete. Method will be removed in future RxKotlin release.", + replaceWith = ReplaceWith("Flowable.combineLatest(source1, source2, source3, source4, source5, source6, source7, source8, source9, combineFunction)", "io.reactivex.Flowable"), + level = DeprecationLevel.WARNING) + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) inline fun combineLatest( source1: Flowable, source2: Flowable, @@ -153,6 +288,24 @@ object Flowables { ): Flowable = Flowable.combineLatest(source1, source2, source3, source4, source5, source6, source7, source8, source9, Function9 { t1: T1, t2: T2, t3: T3, t4: T4, t5: T5, t6: T6, t7: T7, t8: T8, t9: T9 -> combineFunction(t1, t2, t3, t4, t5, t6, t7, t8, t9) }) + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + fun combineLatest( + source1: Flowable, + source2: Flowable, + source3: Flowable, + source4: Flowable, + source5: Flowable, + source6: Flowable, + source7: Flowable, + source8: Flowable, + source9: Flowable, + bufferSize: Int, + combineFunction: (T1, T2, T3, T4, T5, T6, T7, T8, T9) -> R + ): Flowable = Flowable.combineLatest(listOf(source1, source2, source3, source4, source5, source6, source7, source8, source9), + Functions.toFunction(combineFunction), bufferSize) + @CheckReturnValue @BackpressureSupport(BackpressureKind.SPECIAL) @@ -173,6 +326,19 @@ object Flowables { ): Flowable = Flowable.zip(source1, source2, BiFunction { t1, t2 -> combineFunction(t1, t2) }) + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + inline fun zip( + source1: Flowable, + source2: Flowable, + delayError: Boolean = false, + bufferSize: Int = Flowable.bufferSize(), + crossinline combineFunction: (T1, T2) -> R + ): Flowable = Flowable.zip(source1, source2, + BiFunction { t1, t2 -> combineFunction(t1, t2) }, delayError, bufferSize) + + /** * Emits `Pair` */ @@ -182,6 +348,19 @@ object Flowables { fun zip(source1: Flowable, source2: Flowable): Flowable> = Flowable.zip(source1, source2, BiFunction> { t1, t2 -> t1 to t2 }) + /** + * Emits `Pair` + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + fun zip( + source1: Flowable, + source2: Flowable, + delayError: Boolean = false, + bufferSize: Int = Flowable.bufferSize() + ): Flowable> = zip(source1, source2, delayError, bufferSize, ::Pair) + @CheckReturnValue @BackpressureSupport(BackpressureKind.FULL) @@ -194,6 +373,16 @@ object Flowables { ): Flowable = Flowable.zip(source1, source2, source3, Function3 { t1: T1, t2: T2, t3: T3 -> combineFunction(t1, t2, t3) }) + fun zip( + source1: Flowable, + source2: Flowable, + source3: Flowable, + delayError: Boolean = false, + bufferSize: Int = Flowable.bufferSize(), + combineFunction: (T1, T2, T3) -> R + ): Flowable = Flowable.zipIterable(listOf(source1, source2, source3), + Functions.toFunction(combineFunction), delayError, bufferSize) + /** * Emits `Triple` */ @@ -207,6 +396,24 @@ object Flowables { ): Flowable> = Flowable.zip(source1, source2, source3, Function3> { t1, t2, t3 -> Triple(t1, t2, t3) }) + /** + * Emits `Triple` + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + fun zip( + source1: Flowable, + source2: Flowable, + source3: Flowable, + delayError: Boolean = false, + bufferSize: Int = Flowable.bufferSize() + ): Flowable> = zip(source1, source2, source3, delayError, bufferSize, ::Triple) + + + @Deprecated("New type inference algorithm in Kotlin 1.4 makes this method obsolete. Method will be removed in future RxKotlin release.", + replaceWith = ReplaceWith("Flowable.zip(source1, source2, source3, source4, combineFunction)", "io.reactivex.Flowable"), + level = DeprecationLevel.WARNING) @CheckReturnValue @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) @@ -222,6 +429,24 @@ object Flowables { @CheckReturnValue @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) + fun zip( + source1: Flowable, + source2: Flowable, + source3: Flowable, + source4: Flowable, + delayError: Boolean = false, + bufferSize: Int = Flowable.bufferSize(), + combineFunction: (T1, T2, T3, T4) -> R + ): Flowable = Flowable.zipIterable(listOf(source1, source2, source3, source4), + Functions.toFunction(combineFunction), delayError, bufferSize) + + + @Deprecated("New type inference algorithm in Kotlin 1.4 makes this method obsolete. Method will be removed in future RxKotlin release.", + replaceWith = ReplaceWith("Flowable.zip(source1, source2, source3, source4, source5, combineFunction)", "io.reactivex.Flowable"), + level = DeprecationLevel.WARNING) + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) inline fun zip( source1: Flowable, source2: Flowable, source3: Flowable, source4: Flowable, @@ -229,6 +454,16 @@ object Flowables { ): Flowable = Flowable.zip(source1, source2, source3, source4, source5, Function5 { t1: T1, t2: T2, t3: T3, t4: T4, t5: T5 -> combineFunction(t1, t2, t3, t4, t5) }) + fun zip( + source1: Flowable, source2: Flowable, + source3: Flowable, source4: Flowable, + source5: Flowable, + delayError: Boolean = false, + bufferSize: Int = Flowable.bufferSize(), + combineFunction: (T1, T2, T3, T4, T5) -> R + ): Flowable = Flowable.zipIterable(listOf(source1, source2, source3, source4, source5), + Functions.toFunction(combineFunction), delayError, bufferSize) + @CheckReturnValue @BackpressureSupport(BackpressureKind.FULL) @@ -243,6 +478,23 @@ object Flowables { @CheckReturnValue @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) + fun zip( + source1: Flowable, source2: Flowable, + source3: Flowable, source4: Flowable, + source5: Flowable, source6: Flowable, + delayError: Boolean = false, + bufferSize: Int = Flowable.bufferSize(), + combineFunction: (T1, T2, T3, T4, T5, T6) -> R + ): Flowable = Flowable.zipIterable(listOf(source1, source2, source3, source4, source5, source6), + Functions.toFunction(combineFunction), delayError, bufferSize) + + + @Deprecated("New type inference algorithm in Kotlin 1.4 makes this method obsolete. Method will be removed in future RxKotlin release.", + replaceWith = ReplaceWith("Flowable.zip(source1, source2, source3, source4, source5, source6, source7, combineFunction)", "io.reactivex.Flowable"), + level = DeprecationLevel.WARNING) + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) inline fun zip( source1: Flowable, source2: Flowable, source3: Flowable, source4: Flowable, @@ -251,6 +503,20 @@ object Flowables { ): Flowable = Flowable.zip(source1, source2, source3, source4, source5, source6, source7, Function7 { t1: T1, t2: T2, t3: T3, t4: T4, t5: T5, t6: T6, t7: T7 -> combineFunction(t1, t2, t3, t4, t5, t6, t7) }) + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + fun zip( + source1: Flowable, source2: Flowable, + source3: Flowable, source4: Flowable, + source5: Flowable, source6: Flowable, + source7: Flowable, + delayError: Boolean = false, + bufferSize: Int = Flowable.bufferSize(), + combineFunction: (T1, T2, T3, T4, T5, T6, T7) -> R + ): Flowable = Flowable.zipIterable(listOf(source1, source2, source3, source4, source5, source6, source7), + Functions.toFunction(combineFunction), delayError, bufferSize) + @CheckReturnValue @BackpressureSupport(BackpressureKind.FULL) @@ -267,6 +533,23 @@ object Flowables { @CheckReturnValue @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) + fun zip( + source1: Flowable, source2: Flowable, + source3: Flowable, source4: Flowable, + source5: Flowable, source6: Flowable, + source7: Flowable, source8: Flowable, + delayError: Boolean = false, + bufferSize: Int = Flowable.bufferSize(), + combineFunction: (T1, T2, T3, T4, T5, T6, T7, T8) -> R + ): Flowable = Flowable.zipIterable(listOf(source1, source2, source3, source4, source5, source6, source7, source8), + Functions.toFunction(combineFunction), delayError, bufferSize) + + @Deprecated("New type inference algorithm in Kotlin 1.4 makes this method obsolete. Method will be removed in future RxKotlin release.", + replaceWith = ReplaceWith("Flowable.zip(source1, source2, source3, source4, source5, source6, source7, source8, source9, combineFunction)", "io.reactivex.Flowable"), + level = DeprecationLevel.WARNING) + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) inline fun zip( source1: Flowable, source2: Flowable, @@ -281,6 +564,25 @@ object Flowables { ): Flowable = Flowable.zip(source1, source2, source3, source4, source5, source6, source7, source8, source9, Function9 { t1: T1, t2: T2, t3: T3, t4: T4, t5: T5, t6: T6, t7: T7, t8: T8, t9: T9 -> combineFunction(t1, t2, t3, t4, t5, t6, t7, t8, t9) }) + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + fun zip( + source1: Flowable, + source2: Flowable, + source3: Flowable, + source4: Flowable, + source5: Flowable, + source6: Flowable, + source7: Flowable, + source8: Flowable, + source9: Flowable, + delayError: Boolean = false, + bufferSize: Int = Flowable.bufferSize(), + combineFunction: (T1, T2, T3, T4, T5, T6, T7, T8, T9) -> R + ): Flowable = Flowable.zipIterable(listOf(source1, source2, source3, source4, source5, source6, source7, source8, source9), + Functions.toFunction(combineFunction), delayError, bufferSize) + } /**