callbackFlow

@ExperimentalCoroutinesApi inline fun <T> callbackFlow(
    noinline block: suspend ProducerScope<T>.() -> Unit
): Flow<T>

Creates an instance of a cold Flow whose elements are sent to a SendChannel that is provided to the builder's block of code via ProducerScope. It allows elements to be produced by code that runs in a different context or concurrently.

The resulting flow is cold, which means that block is called every time a terminal operator is applied to the resulting flow.
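As a minimal sketch of what "cold" means here, the snippet below counts how often the block runs. The names `blockRuns` and `countedFlow` are hypothetical and exist only for this illustration. The snippet assumes `kotlinx-coroutines-core` on the JVM classpath.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical counter, used only to observe how many times the block is executed.
val blockRuns = AtomicInteger(0)

// Hypothetical flow: emits two values, then closes its channel.
fun countedFlow(): Flow<Int> = callbackFlow {
    blockRuns.incrementAndGet()
    send(1)
    send(2)
    close()       // no more elements will be sent
    awaitClose()  // returns immediately because the channel is already closed
}

fun main() = runBlocking<Unit> {
    val flow = countedFlow()
    println(blockRuns.get())  // prints 0: creating the flow does not run the block
    flow.toList()
    flow.toList()
    println(blockRuns.get())  // prints 2: one run of the block per terminal operator
}
```

Creating the flow has no side effects. Registration with an underlying API would happen once per collector, not once per flow instance.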

This builder ensures thread-safety and context preservation, so the provided ProducerScope can be used from any context, for example from a callback-based API. The resulting flow completes as soon as the code in the block and all its children complete. Use awaitClose as the last statement to keep it running. The argument of awaitClose is called either when the flow consumer cancels the flow collection or when the callback-based API invokes SendChannel.close manually.
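The cleanup role of awaitClose can be sketched with a consumer that stops early. `ReplaySource` and `valuesOf` are hypothetical names invented for this illustration, not part of any library. The listener uses `trySend`, which assumes kotlinx.coroutines 1.5 or later; on the version documented on this page, `offer` plays that role.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical callback-based API: replays three values to every new listener.
class ReplaySource {
    private val listeners = mutableListOf<(Int) -> Unit>()
    val listenerCount: Int get() = listeners.size

    fun register(listener: (Int) -> Unit) {
        listeners += listener
        for (value in 1..3) listener(value)
    }

    fun unregister(listener: (Int) -> Unit) {
        listeners -= listener
    }
}

fun valuesOf(source: ReplaySource): Flow<Int> = callbackFlow {
    // Assumption: trySend requires kotlinx.coroutines 1.5+ (older versions use offer).
    val listener: (Int) -> Unit = { value -> trySend(value) }
    source.register(listener)
    // Keeps the flow running; the lambda runs once the channel is closed or cancelled.
    awaitClose { source.unregister(listener) }
}

fun main() = runBlocking<Unit> {
    val source = ReplaySource()
    val firstTwo = valuesOf(source).take(2).toList()
    println(firstTwo)              // prints [1, 2]
    println(source.listenerCount)  // prints 0: take(2) cancelled the collection and the listener was removed
}
```

Here the consumer ends the collection through `take(2)`, and the lambda passed to awaitClose unregisters the listener before `toList` returns.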

A channel with the default buffer size is used. Use the buffer operator on the resulting flow to specify a non-default capacity and to control what happens when data is produced faster than it is consumed, that is, to control backpressure behavior.
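A small sketch of how the buffer operator changes the channel behind the flow. The function `burst` is a hypothetical producer that sends five values in a row. For the conflated case only the last received value is shown, since the number of intermediate values that get through depends on timing.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical producer: sends five values back to back, then closes the channel.
fun burst(): Flow<Int> = callbackFlow {
    for (value in 1..5) send(value)
    close()
    awaitClose()
}

fun main() = runBlocking<Unit> {
    // Unlimited buffer: every value is kept until the collector receives it.
    val all = burst().buffer(Channel.UNLIMITED).toList()
    println(all)               // prints [1, 2, 3, 4, 5]

    // Conflated buffer: older undelivered values are replaced by newer ones.
    val conflated = burst().buffer(Channel.CONFLATED).toList()
    println(conflated.last())  // prints 5
}
```

Which capacity to choose depends on whether the consumer needs every element (unlimited, at the cost of memory) or only the most recent one (conflated).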

Adjacent applications of callbackFlow, flowOn, buffer, produceIn, and broadcastIn are always fused so that only one properly configured channel is used for execution.

Example of usage:

fun <T> flowFrom(api: CallbackBasedApi<T>): Flow<T> = callbackFlow {
    val callback = object : Callback<T> { // implementation of some callback interface (placeholder type)
        override fun onNextValue(value: T) {
            // Note: offer drops value when buffer is full
            // Use either buffer(Channel.CONFLATED) or buffer(Channel.UNLIMITED) to avoid overfill
            offer(value)
        }
        override fun onApiError(cause: Throwable) {
            cancel(CancellationException("API Error", cause))
        }
        override fun onCompleted() = channel.close()
    }
    api.register(callback)
    // Suspend until onCompleted or onApiError ends the flow, or until the collector cancels it
    awaitClose { api.unregister(callback) }
}