buffer

@ExperimentalCoroutinesApi fun <T> Flow<T>.buffer(
    capacity: Int = BUFFERED
): Flow<T>

Buffers flow emissions via a channel of the specified capacity and runs the collector in a separate coroutine.

Normally, flows are sequential: the code of all operators is executed in the same coroutine. For example, consider the following code using the onEach and collect operators:

flowOf("A", "B", "C")
    .onEach  { println("1$it") }
    .collect { println("2$it") }

It is going to be executed in the following order by the coroutine Q that calls this code:

Q : -->-- [1A] -- [2A] -- [1B] -- [2B] -- [1C] -- [2C] -->--

So if the operators' code takes considerable time to execute, the total execution time is the sum of the execution times of all operators.
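The sequential order described above can be checked with a small sketch. The helper name sequentialTrace and the trace list are hypothetical and exist only for this illustration; the flow itself is the one from the example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: records the order in which the two operators run.
fun sequentialTrace(): List<String> = runBlocking {
    val trace = mutableListOf<String>()
    flowOf("A", "B", "C")
        .onEach { trace += "1$it" }   // runs in the collecting coroutine
        .collect { trace += "2$it" }  // runs in the same coroutine, right after onEach
    trace
}

fun main() {
    // Matches the diagram above: [1A, 2A, 1B, 2B, 1C, 2C]
    println(sequentialTrace())
}
```

Because there is only one coroutine, each element passes through onEach and collect before the next element is produced.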

The buffer operator creates a separate coroutine during execution for the flow it applies to. Consider the following code:

flowOf("A", "B", "C")
    .onEach  { println("1$it") }
    .buffer()  // <--------------- buffer between onEach and collect
    .collect { println("2$it") }

It will use two coroutines for execution of the code. A coroutine Q that calls this code is going to execute collect, and the code before buffer will be executed in a separate new coroutine P concurrently with Q:

P : -->-- [1A] -- [1B] -- [1C] ---------->--  // flowOf(...).onEach { ... }

                      |
                      | channel               // buffer()
                      V

Q : -->---------- [2A] -- [2B] -- [2C] -->--  // collect

When the operators' code takes time to execute, this decreases the total execution time of the flow. A channel is used between the coroutines to send elements emitted by coroutine P to coroutine Q. If the code before the buffer operator (in coroutine P) is faster than the code after it (in coroutine Q), the channel will become full at some point and will suspend the producer coroutine P until the consumer coroutine Q catches up. The capacity parameter defines the size of this buffer.
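A minimal sketch of the two-coroutine behavior, assuming a deliberately slow consumer (the 50 ms delay is an arbitrary choice). The helper name bufferedTrace is hypothetical. The exact interleaving of producer and consumer events depends on the capacity and on scheduling, so the sketch prints the trace rather than promising a particular order.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

// Hypothetical helper: records events from both sides of buffer().
fun bufferedTrace(capacity: Int): List<String> = runBlocking {
    // Both coroutines run on the runBlocking thread, so a plain list is enough here.
    val trace = mutableListOf<String>()
    flowOf("A", "B", "C")
        .onEach { trace += "1$it" }   // producer side (coroutine P)
        .buffer(capacity)             // channel between P and Q
        .collect {
            delay(50)                 // assumption: simulate a slow consumer
            trace += "2$it"           // consumer side (coroutine Q)
        }
    trace
}

fun main() {
    println(bufferedTrace(1))                 // small explicit buffer
    println(bufferedTrace(Channel.BUFFERED))  // default buffer size
}
```

Whatever the interleaving, each side sees the elements in their original order, and an element is always produced before it is collected.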

Operator fusion

Adjacent applications of channelFlow, flowOn, buffer, produceIn, and broadcastIn are always fused so that only one properly configured channel is used for execution.

An explicitly specified buffer capacity takes precedence over buffer() or buffer(Channel.BUFFERED) calls, which effectively request a buffer of any size. Multiple requests with a specified buffer size produce a buffer with the sum of the requested buffer sizes.
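As an illustration, the following sketch chains adjacent buffer and flowOn calls. The helper name fusedPipeline is hypothetical. Fusion is an internal optimization and is not directly observable from the result, so the comments only restate what the text above says about it.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: a pipeline whose adjacent operators are candidates for fusion.
fun fusedPipeline(): List<Int> = runBlocking {
    (1..5).asFlow()
        .buffer(2)                    // explicit size request
        .buffer(3)                    // adjacent explicit request; per the text, the sizes are summed
        .flowOn(Dispatchers.Default)  // adjacent flowOn; per the text, fused with the buffers
        .toList()
}

fun main() {
    println(fusedPipeline())
}
```

The collected elements and their order are the same as without the buffer calls; only the way they are handed between coroutines changes.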

Conceptual implementation

The actual implementation of buffer is not trivial due to the fusing, but conceptually it is equivalent to the following code, which uses the produce coroutine builder to produce a channel and the consumeEach extension to consume it:

fun <T> Flow<T>.buffer(capacity: Int = BUFFERED): Flow<T> = flow {
    coroutineScope { // limit the scope of concurrent producer coroutine
        val channel = produce(capacity = capacity) {
            collect { send(it) } // send all to channel
        }
        // emit all received values
        channel.consumeEach { emit(it) }
    }
}
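To experiment with the conceptual version, it can be compiled under a different name so that it does not clash with the library operator. In the sketch below, naiveBuffer is a hypothetical name and the explicit this@naiveBuffer qualifier is added only for readability. It performs no fusion and is not the library's implementation.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

// Hypothetical name, chosen to avoid clashing with the library's buffer operator.
fun <T> Flow<T>.naiveBuffer(capacity: Int = Channel.BUFFERED): Flow<T> = flow {
    coroutineScope { // limit the scope of the concurrent producer coroutine
        val channel = produce(capacity = capacity) {
            // collect the upstream flow and send every value to the channel
            this@naiveBuffer.collect { value -> send(value) }
        }
        // emit all received values downstream
        channel.consumeEach { value -> emit(value) }
    }
}

fun main() = runBlocking {
    println(flowOf(1, 2, 3).naiveBuffer(capacity = 1).toList())
}
```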

Conflation

Using this function with a capacity of Channel.CONFLATED is also available as a shortcut via the conflate operator. See its documentation for details.
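A minimal sketch of conflation, assuming a producer that is faster than the consumer (the delays are arbitrary). The helper name conflatedValues is hypothetical. Which intermediate values are skipped depends on timing, so no exact output is claimed.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

// Hypothetical helper: a fast producer feeding a slow consumer through a conflated buffer.
fun conflatedValues(): List<Int> = runBlocking {
    (1..10).asFlow()
        .onEach { delay(10) }        // assumption: fast producer
        .buffer(Channel.CONFLATED)   // per the text, the same as calling conflate()
        .onEach { delay(100) }       // assumption: slow consumer
        .toList()
}

fun main() {
    // Typically prints fewer than ten values; the most recent value, 10, is always last.
    println(conflatedValues())
}
```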

Parameters

capacity - type/capacity of the buffer between coroutines. Allowed values are the same as in the Channel(...) factory function: BUFFERED (by default), CONFLATED, RENDEZVOUS, UNLIMITED, or a non-negative value indicating an explicitly requested size.