channelFlow

@ExperimentalCoroutinesApi fun <T> channelFlow(
    block: suspend ProducerScope<T>.() -> Unit
): Flow<T>
(source)

Creates an instance of the cold Flow with elements that are sent to a SendChannel that is provided to the builder’s block of code via ProducerScope. It allows elements to be produced by the code that is running in a different context or running concurrently. The resulting flow is cold, which means that block is called on each call of a terminal operator on the resulting flow.

This builder ensures thread-safety and context preservation, thus the provided ProducerScope can be used concurrently from different contexts. The resulting flow completes as soon as the code in the block and all its children complete. Use awaitClose as the last statement to keep it running. For more detailed example please refer to callbackFlow documentation.

A channel with default buffer size is used. Use buffer operator on the resulting flow to specify a value other than default and to control what happens when data is produced faster than it is consumed, that is to control backpressure behavior.

Adjacent applications of channelFlow, flowOn, buffer, produceIn, and broadcastIn are always fused so that only one properly configured channel is used for execution.

Examples of usage:

fun <T> Flow<T>.merge(other: Flow<T>): Flow<T> = channelFlow {
    // collect from one coroutine and send it
    launch {
        collect { send(it) }
    }
    // collect and send from this coroutine, too, concurrently
    other.collect { send(it) }
}

fun <T> contextualFlow(): Flow<T> = channelFlow {
    // send from one coroutine
    launch(Dispatchers.IO) {
        send(computeIoValue())
    }
    // send from another coroutine, concurrently
    launch(Dispatchers.Default) {
        send(computeCpuValue())
    }
}