conflate

@ExperimentalCoroutinesApi fun <T> Flow<T>.conflate(): Flow<T>

Conflates flow emissions via a conflated channel and runs the collector in a separate coroutine. As a result, the emitter is never suspended because of a slow collector, and the collector always gets the most recent value emitted.

For example, consider a flow that emits the integers from 1 to 30 with a 100 ms delay between them:

val flow = flow {
    for (i in 1..30) {
        delay(100)
        emit(i)
    }
}

Applying the conflate() operator to it lets a collector that delays 1 second on each element receive only the integers 1, 10, 20, and 30:

val result = flow.conflate().onEach { delay(1000) }.toList()
assertEquals(listOf(1, 10, 20, 30), result)

Note that the conflate operator is a shortcut for buffer with a capacity of Channel.CONFLATED.
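The equivalence can be illustrated with a minimal sketch that collects the same upstream once through conflate() and once through buffer(Channel.CONFLATED). The names numbers and collectSlowly are hypothetical helpers introduced only for this illustration, and the 50 ms collector delay is an arbitrary choice. On library versions where these operators are still marked @ExperimentalCoroutinesApi, the compiler may additionally ask for an opt-in.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical upstream: emits 1..5 back to back, with no delay between emissions.
fun numbers(): Flow<Int> = flow {
    for (i in 1..5) emit(i)
}

// Hypothetical helper: collects with a collector that spends 50 ms on each element.
fun collectSlowly(source: Flow<Int>): List<Int> = runBlocking {
    source.onEach { delay(50) }.toList()
}

fun main() {
    val viaConflate = collectSlowly(numbers().conflate())
    val viaBuffer = collectSlowly(numbers().buffer(Channel.CONFLATED))

    // Both spellings request a conflated channel, so a slow collector may skip
    // intermediate values, but the most recent value (5) is still delivered.
    println("conflate():                $viaConflate")
    println("buffer(Channel.CONFLATED): $viaBuffer")
}
```

Which intermediate values are skipped depends on scheduling, so the sketch prints the results rather than asserting an exact list. The one property it relies on is that the last emitted value reaches the collector.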

Operator fusion

Adjacent applications of conflate/buffer, channelFlow, flowOn, produceIn, and broadcastIn are always fused so that only one properly configured channel is used for execution. Conflation takes precedence over buffer() calls with any other capacity.
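As a rough illustration of the precedence rule, the sketch below applies buffer(10) and then conflate() to the same upstream. The helper name bufferedThenConflated is hypothetical, and the timings are arbitrary. Fusion is an internal detail that this output cannot prove directly; the sketch only shows that the resulting flow still behaves as a conflated one, delivering the latest value to a slow collector.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: buffer(10) immediately followed by conflate().
// Per the fusion rule, conflation is expected to win over the capacity of 10.
fun bufferedThenConflated(): List<Int> = runBlocking {
    flow {
        for (i in 1..5) emit(i) // fast emitter, no delay between values
    }
        .buffer(10)             // adjacent buffering operator
        .conflate()             // conflation takes precedence
        .onEach { delay(50) }   // slow collector
        .toList()
}

fun main() {
    val result = bufferedThenConflated()
    // The list ends with 5, the most recent value; earlier values may be skipped.
    println(result)
}
```

If the capacity of 10 were honored instead, all five values would fit in the buffer and the collector would be expected to see every one of them.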