channelFlow

fun <T> channelFlow(block: suspend ProducerScope<T>.() -> Unit): Flow<T>

Creates an instance of a coldFlow with elements that are sent to a SendChannel provided to the builder's block of code via ProducerScope. It allows elements to be produced by code that is running in a different context or concurrently. The resulting flow is cold, which means that block is called every time a terminal operator is applied to the resulting flow.

This builder ensures thread-safety and context preservation, thus the provided ProducerScope can be used concurrently from different contexts. The resulting flow completes as soon as the code in the block and all its children completes. Use awaitClose as the last statement to keep it running. A more detailed example is provided in the documentation of callbackFlow.

A channel with the default buffer size is used. Use the buffer operator on the resulting flow to specify a user-defined value and to control what happens when data is produced faster than consumed, i.e. to control the back-pressure behavior.

Adjacent applications of channelFlow, flowOn, buffer, and produceIn are always fused so that only one properly configured channel is used for execution.

Examples of usage:

fun <T> Flow<T>.merge(other: Flow<T>): Flow<T> = channelFlow {
// collect from one coroutine and send it
launch {
collect { send(it) }
}
// collect and send from this coroutine, too, concurrently
other.collect { send(it) }
}

fun <T> contextualFlow(): Flow<T> = channelFlow {
// send from one coroutine
launch(Dispatchers.IO) {
send(computeIoValue())
}
// send from another coroutine, concurrently
launch(Dispatchers.Default) {
send(computeCpuValue())
}
}

Sources

common source
Link copied to clipboard