buffer

fun <T> Flow<T>.buffer(capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow<T>

Buffers flow emissions via a channel of the specified capacity and runs the collector in a separate coroutine.

Normally, flows are sequential: the code of all operators is executed in the same coroutine. For example, consider the following code using the onEach and collect operators:

flowOf("A", "B", "C")
    .onEach { println("1$it") }
    .collect { println("2$it") }

It is going to be executed in the following order by the coroutine Q that calls this code:

Q : -->-- [1A] -- [2A] -- [1B] -- [2B] -- [1C] -- [2C] -->--

So if the operator's code takes considerable time to execute, then the total execution time is going to be the sum of execution times for all operators.

The buffer operator creates a separate coroutine during execution for the flow it applies to. Consider the following code:

flowOf("A", "B", "C")
    .onEach { println("1$it") }
    .buffer() // <--------------- buffer between onEach and collect
    .collect { println("2$it") }

It will use two coroutines for execution of the code. A coroutine Q that calls this code is going to execute collect, and the code before buffer will be executed in a separate new coroutine P concurrently with Q:

P : -->-- [1A] -- [1B] -- [1C] ---------->--  // flowOf(...).onEach { ... }

                      |
                      | channel               // buffer()
                      V

Q : -->---------- [2A] -- [2B] -- [2C] -->--  // collect

When the operator's code takes some time to execute, this decreases the total execution time of the flow. A channel is used between the coroutines to send elements emitted by the coroutine P to the coroutine Q. If the code before the buffer operator (in the coroutine P) is faster than the code after the buffer operator (in the coroutine Q), then this channel will become full at some point and will suspend the producer coroutine P until the consumer coroutine Q catches up. The capacity parameter defines the size of this buffer.
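The effect on total execution time can be observed with a small timing sketch. The helper names slowStages and timeCollect, and the 100 ms delays, are assumptions made for illustration; exact timings depend on the machine.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical upstream: five elements, each taking about 100 ms to produce.
fun slowStages(): Flow<Int> = (1..5).asFlow().onEach { delay(100) }

// Collects the flow with a collector that also takes about 100 ms per element
// and returns the elapsed wall-clock time in milliseconds.
fun timeCollect(flow: Flow<Int>): Long = measureTimeMillis {
    runBlocking { flow.collect { delay(100) } }
}

fun main() {
    // Sequential: producing and collecting alternate in one coroutine,
    // so the per-element times add up.
    val sequential = timeCollect(slowStages())
    // Buffered: the upstream runs in its own coroutine and overlaps with the collector.
    val buffered = timeCollect(slowStages().buffer())
    println("sequential: $sequential ms, buffered: $buffered ms")
}
```

With these assumed delays, the buffered run should finish noticeably sooner, because the producer keeps working while the collector is busy.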

Buffer overflow

By default, the emitter is suspended when the buffer overflows, to let the collector catch up. This strategy can be overridden with an optional onBufferOverflow parameter so that the emitter is never suspended. In this case, on buffer overflow either the oldest value in the buffer is dropped with the DROP_OLDEST strategy and the latest emitted value is added to the buffer, or the latest value that is being emitted is dropped with the DROP_LATEST strategy, keeping the buffer intact. To implement either of the custom strategies, a buffer of at least one element is used.
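The three strategies can be compared with a minimal sketch: a fast emitter, a buffer of two elements, and a slow collector. The helper name collectWithOverflow and the concrete capacity and delay are assumptions for illustration. Which intermediate values survive depends on scheduling, so the comments describe only what the strategies guarantee in this single-threaded setup.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: emits 1..5 without any delay through a buffer of
// capacity 2 and collects the surviving values with a slow consumer.
fun collectWithOverflow(strategy: BufferOverflow): List<Int> = runBlocking {
    (1..5).asFlow()
        .buffer(capacity = 2, onBufferOverflow = strategy)
        .onEach { delay(50) } // slow consumer
        .toList()
}

fun main() {
    // SUSPEND: the emitter waits for the collector, so nothing is lost.
    println("SUSPEND:     ${collectWithOverflow(BufferOverflow.SUSPEND)}")
    // DROP_OLDEST: older buffered values are discarded; the most recent value is kept.
    println("DROP_OLDEST: ${collectWithOverflow(BufferOverflow.DROP_OLDEST)}")
    // DROP_LATEST: values emitted while the buffer is full are discarded.
    println("DROP_LATEST: ${collectWithOverflow(BufferOverflow.DROP_LATEST)}")
}
```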

Operator fusion

Adjacent applications of channelFlow, flowOn, buffer, and produceIn are always fused so that only one properly configured channel is used for execution.

An explicitly specified buffer capacity takes precedence over buffer() or buffer(Channel.BUFFERED) calls, which effectively request a buffer of any size. Multiple requests with a specified buffer size produce a buffer with the sum of the requested buffer sizes.

A buffer call with a non-default value of the onBufferOverflow parameter overrides all immediately preceding buffering operators, because it never suspends its upstream, and thus no upstream buffer would ever be used.
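Fusion is not directly visible in the API, but its effect on capacity can be probed indirectly. The sketch below counts how many emissions complete while the collector is stalled; the helper name emittedWhileCollectorStalled and the 200 ms timeout are assumptions for illustration. The absolute count is an implementation detail. The point is only that configurations described above as equivalent should yield the same count.

```kotlin
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical probe: returns how many emit calls completed before the
// upstream was suspended by a full buffer.
fun emittedWhileCollectorStalled(configure: (Flow<Int>) -> Flow<Int>): Int = runBlocking {
    var emitted = 0
    val upstream = flow {
        var i = 0
        while (true) {
            emit(i++) // suspends once the channel is full
            emitted++
        }
    }
    withTimeoutOrNull(200) {
        // Take the first value, then stall until the timeout cancels the collection.
        configure(upstream).collect { awaitCancellation() }
    }
    emitted
}

fun main() {
    // Two explicit sizes are fused into a single buffer of their sum.
    println(emittedWhileCollectorStalled { it.buffer(2).buffer(3) })
    println(emittedWhileCollectorStalled { it.buffer(5) })
    // An explicit size takes precedence over a default-sized request.
    println(emittedWhileCollectorStalled { it.buffer().buffer(3) })
    println(emittedWhileCollectorStalled { it.buffer(3) })
}
```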

Conceptual implementation

The actual implementation of buffer is not trivial due to the fusing, but conceptually its basic implementation is equivalent to the following code, which uses the produce coroutine builder to produce a channel and the consumeEach extension to consume it:

fun <T> Flow<T>.buffer(capacity: Int = BUFFERED): Flow<T> = flow {
    coroutineScope { // limit the scope of concurrent producer coroutine
        val channel = produce(capacity = capacity) {
            collect { send(it) } // send all to channel
        }
        // emit all received values
        channel.consumeEach { emit(it) }
    }
}
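For experimentation, the conceptual code above can be turned into a self-contained sketch. The name simpleBuffer is hypothetical and chosen to avoid clashing with the real operator. This variant has no fusion and no overflow strategies, so it is not a substitute for buffer.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical, simplified stand-in for buffer: no fusion, SUSPEND on overflow only.
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> Flow<T>.simpleBuffer(capacity: Int = Channel.BUFFERED): Flow<T> = flow {
    coroutineScope { // limits the lifetime of the concurrent producer coroutine
        val channel = produce<T>(capacity = capacity) {
            // Collect the upstream flow in the producer coroutine.
            this@simpleBuffer.collect { send(it) }
        }
        // Re-emit everything received from the channel in the collector's coroutine.
        for (value in channel) emit(value)
    }
}

fun main() = runBlocking {
    // Elements pass through the channel in their original order.
    println(flowOf(1, 2, 3).simpleBuffer(capacity = 2).toList())
}
```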

Conflation

Usage of this function with capacity of Channel.CONFLATED is a shortcut to buffer(onBufferOverflow = BufferOverflow.DROP_OLDEST), and is available via a separate conflate operator. See its documentation for details.
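A short sketch can check that the two spellings behave alike. The helper name collectSlowly and the 50 ms delay are assumptions for illustration. With a fast emitter and a slow collector, both variants are expected to skip intermediate values and still deliver the most recent one.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: pushes 1..5 (emitted without delay) through the given
// operator and collects the result with a slow consumer.
fun collectSlowly(operator: (Flow<Int>) -> Flow<Int>): List<Int> = runBlocking {
    operator((1..5).asFlow())
        .onEach { delay(50) } // slow consumer
        .toList()
}

fun main() {
    val viaCapacity = collectSlowly { it.buffer(Channel.CONFLATED) }
    val viaConflate = collectSlowly { it.conflate() }
    println("buffer(CONFLATED): $viaCapacity, conflate(): $viaConflate")
}
```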

Parameters

capacity

type/capacity of the buffer between coroutines. Allowed values are the same as in Channel(...) factory function: BUFFERED (by default), CONFLATED, RENDEZVOUS, UNLIMITED or a non-negative value indicating an explicitly requested size.

onBufferOverflow

configures an action on buffer overflow (optional, defaults to SUSPEND, supported only when capacity >= 0 or capacity == Channel.BUFFERED, implicitly creates a channel with at least one buffered element).


fun <T> Flow<T>.buffer(capacity: Int = BUFFERED): Flow<T>