Skip to content

Commit d533848

Browse files
committed
Various improvements in combine implementation
1 parent 5b56221 commit d533848

File tree

8 files changed

+32
-33
lines changed

8 files changed

+32
-33
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -891,7 +891,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
891891
public static final fun first (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
892892
public static final fun flatMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
893893
public static final fun flatMapConcat (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
894-
public static final fun flatMapLatest (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
894+
public static final fun flatMapLatest (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
895895
public static final fun flatMapMerge (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
896896
public static synthetic fun flatMapMerge$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
897897
public static final fun flatten (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
@@ -908,7 +908,6 @@ public final class kotlinx/coroutines/flow/FlowKt {
908908
public static synthetic fun flowViaChannel$default (ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
909909
public static final fun flowWith (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
910910
public static synthetic fun flowWith$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
911-
public static final fun flowZip (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
912911
public static final fun fold (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
913912
public static final fun forEach (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)V
914913
public static final fun getDEFAULT_CONCURRENCY ()I
@@ -982,7 +981,7 @@ public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/cor
982981
}
983982

984983
public final class kotlinx/coroutines/flow/internal/CombineKt {
985-
public static final fun combine ([Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
984+
public static final fun combineInternal (Lkotlinx/coroutines/flow/FlowCollector;[Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
986985
}
987986

988987
public final class kotlinx/coroutines/flow/internal/FlowExceptions_commonKt {

kotlinx-coroutines-core/common/src/flow/Migration.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,4 +437,4 @@ public fun <T> Flow<T>.delayEach(timeMillis: Long): Flow<T> = onEach { delay(tim
437437
message = "Flow analogues of 'switchMap' are 'transformLatest', 'flatMapLatest' and 'mapLatest'",
438438
replaceWith = ReplaceWith("this.flatMapLatest(transform)")
439439
)
440-
public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): Flow<R> = noImpl()
440+
public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): Flow<R> = flatMapLatest(transform)

kotlinx-coroutines-core/common/src/flow/internal/Combine.kt

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@ package kotlinx.coroutines.flow.internal
88
import kotlinx.coroutines.*
99
import kotlinx.coroutines.channels.*
1010
import kotlinx.coroutines.flow.*
11-
import kotlinx.coroutines.internal.Symbol
11+
import kotlinx.coroutines.internal.*
1212
import kotlinx.coroutines.selects.*
1313

1414
internal fun getNull(): Symbol = NULL // Workaround for JS BE bug
1515

16-
internal suspend inline fun <T1, T2, R> FlowCollector<R>.combineTransformInternal(
16+
internal suspend fun <T1, T2, R> FlowCollector<R>.combineTransformInternal(
1717
first: Flow<T1>, second: Flow<T2>,
18-
crossinline transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
18+
transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
1919
) {
2020
coroutineScope {
2121
val firstChannel = asFairChannel(first)
@@ -45,11 +45,11 @@ internal suspend inline fun <T1, T2, R> FlowCollector<R>.combineTransformInterna
4545
}
4646

4747
@PublishedApi
48-
internal fun <T, R> combine(
49-
vararg flows: Flow<T>,
48+
internal suspend fun <R, T> FlowCollector<R>.combineInternal(
49+
flows: Array<out Flow<T>>,
5050
arrayFactory: () -> Array<T?>,
5151
transform: suspend FlowCollector<R>.(Array<T>) -> Unit
52-
): Flow<R> = flow {
52+
) {
5353
coroutineScope {
5454
val size = flows.size
5555
val channels =

kotlinx-coroutines-core/common/src/flow/internal/Merge.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ internal class ChannelFlowTransformLatest<T, R>(
2020
ChannelFlowTransformLatest(transform, flow, context, capacity)
2121

2222
override suspend fun flowCollect(collector: FlowCollector<R>) {
23+
assert { collector is SendingCollector } // So cancellation behaviour is not leaking into the downstream
2324
flowScope {
2425
var previousFlow: Job? = null
2526
flow.collect { value ->

kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import kotlin.jvm.*
2424
* generic function that may transform emitted element, skip it or emit it multiple times.
2525
*
2626
* This operator can be used as a building block for other operators, for example:
27+
*
2728
* ```
2829
* fun Flow<Int>.skipOddAndDuplicateEven(): Flow<Int> = transform { value ->
2930
* if (value % 2 == 0) { // Emit only even values, but twice

kotlinx-coroutines-core/common/src/flow/operators/Merge.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,8 @@ public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY
120120
* ```
121121
* produces `a b b_last`.
122122
*
123-
* This operator is [buffered][buffer] by default and size of its output buffer can be changed by applying subsequent [buffer] operator.
123+
* This operator is [buffered][buffer] by default
124+
* and size of its output buffer can be changed by applying subsequent [buffer] operator.
124125
*/
125126
@ExperimentalCoroutinesApi
126127
public fun <T, R> Flow<T>.transformLatest(@BuilderInference transform: suspend FlowCollector<R>.(value: T) -> Unit): Flow<R> =
@@ -149,7 +150,7 @@ public fun <T, R> Flow<T>.transformLatest(@BuilderInference transform: suspend F
149150
* This operator is [buffered][buffer] by default and size of its output buffer can be changed by applying subsequent [buffer] operator.
150151
*/
151152
@ExperimentalCoroutinesApi
152-
public fun <T, R> Flow<T>.flatMapLatest(@BuilderInference transform: (value: T) -> Flow<R>): Flow<R> =
153+
public inline fun <T, R> Flow<T>.flatMapLatest(@BuilderInference crossinline transform: suspend (value: T) -> Flow<R>): Flow<R> =
153154
transformLatest { emitAll(transform(it)) }
154155

155156
/**

kotlinx-coroutines-core/common/src/flow/operators/Zip.kt

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,9 @@ public inline fun <T1, T2, T3, T4, T5, R> combineTransform(
242242
public inline fun <reified T, R> combine(
243243
vararg flows: Flow<T>,
244244
crossinline transform: suspend (Array<T>) -> R
245-
): Flow<R> = combine(*flows, arrayFactory = { arrayOfNulls(flows.size) }, transform = { emit(transform(it)) })
245+
): Flow<R> = flow {
246+
combineInternal(flows, { arrayOfNulls(flows.size) }, { emit(transform(it)) })
247+
}
246248

247249
/**
248250
* Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
@@ -254,7 +256,9 @@ public inline fun <reified T, R> combine(
254256
public inline fun <reified T, R> combineTransform(
255257
vararg flows: Flow<T>,
256258
@BuilderInference crossinline transform: suspend FlowCollector<R>.(Array<T>) -> Unit
257-
): Flow<R> = combine(*flows, arrayFactory = { arrayOfNulls(flows.size) }, transform = { transform(it) })
259+
): Flow<R> = safeFlow {
260+
combineInternal(flows, { arrayOfNulls(flows.size) }, { transform(it) })
261+
}
258262

259263
/**
260264
* Returns a [Flow] whose values are generated with [transform] function by combining
@@ -266,7 +270,12 @@ public inline fun <reified T, R> combine(
266270
crossinline transform: suspend (Array<T>) -> R
267271
): Flow<R> {
268272
val flowArray = flows.toList().toTypedArray()
269-
return combine(*flowArray, arrayFactory = { arrayOfNulls(flowArray.size) }, transform = { emit(transform(it)) })
273+
return flow {
274+
combineInternal(
275+
flowArray,
276+
arrayFactory = { arrayOfNulls(flowArray.size) },
277+
transform = { emit(transform(it)) })
278+
}
270279
}
271280

272281
/**
@@ -281,7 +290,9 @@ public inline fun <reified T, R> combineTransform(
281290
@BuilderInference crossinline transform: suspend FlowCollector<R>.(Array<T>) -> Unit
282291
): Flow<R> {
283292
val flowArray = flows.toList().toTypedArray()
284-
return combine(*flowArray, arrayFactory = { arrayOfNulls(flowArray.size) }, transform = { transform(it) })
293+
return safeFlow {
294+
combineInternal(flowArray, { arrayOfNulls(flowArray.size) }, { transform(it) })
295+
}
285296
}
286297

287298
/**
@@ -297,22 +308,5 @@ public inline fun <reified T, R> combineTransform(
297308
* }
298309
* ```
299310
*/
300-
@JvmName("flowZip")
301311
@ExperimentalCoroutinesApi
302312
public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = zipImpl(this, other, transform)
303-
304-
/**
305-
* Zips values from the current flow (`this`) with [other] flow using provided [transform] function applied to each pair of values.
306-
* The resulting flow completes as soon as one of the flows completes and cancel is called on the remaining flow.
307-
*
308-
* It can be demonstrated with the following example:
309-
* ```
310-
* val flow = flowOf(1, 2, 3).delayEach(10)
311-
* val flow2 = flowOf("a", "b", "c", "d").delayEach(15)
312-
* flow.zip(flow2) { i, s -> i.toString() + s }.collect {
313-
* println(it) // Will print "1a 2b 3c"
314-
* }
315-
* ```
316-
*/
317-
@ExperimentalCoroutinesApi
318-
public fun <T1, T2, R> zip(flow: Flow<T1>, flow2: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = zipImpl(flow, flow2, transform)

kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public suspend fun Flow<*>.collect() = collect(NopCollector)
3535
*
3636
* This operator is usually used with [onEach], [onCompletion] and [catch] operators to process all emitted values
3737
* handle an exception that might occur in the upstream flow or during processing, for example:
38+
*
3839
* ```
3940
* flow
4041
* .onEach { value -> updateUi(value) }
@@ -90,7 +91,9 @@ public suspend inline fun <T> Flow<T>.collectIndexed(crossinline action: suspend
9091
* Terminal flow operator that collects the given flow with a provided [action].
9192
* The crucial difference from [collect] is that when the original flow emits a new value, [action] block for previous
9293
* value is cancelled.
94+
*
9395
* It can be demonstrated by the following example:
96+
*
9497
* ```
9598
* flow {
9699
* emit(1)

0 commit comments

Comments
 (0)