Flow

interface Flow<out T>

An asynchronous data stream that sequentially emits values and completes normally or with an exception.

Intermediate operators on the flow such as map, filter, take, zip, etc. are functions that are applied to the upstream flow or flows and return a downstream flow to which further operators can be applied. Intermediate operations do not execute any code in the flow and are not suspending functions themselves. They only set up a chain of operations for future execution and return quickly. This is known as a cold flow property.
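The cold property can be observed by recording when the upstream block actually runs. The following is a minimal sketch, assuming kotlinx-coroutines-core is on the classpath; the function name coldChainDemo and the log list are hypothetical and exist only for illustration.

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records the order of events to show when upstream code runs.
fun coldChainDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()

    // Building the chain only wires the operators together; no lambda runs yet.
    val doubled = flow {
        log += "upstream started"
        emit(1)
        emit(2)
    }.map { it * 2 }

    log += "chain built"

    // The terminal operator toList triggers execution of the whole chain.
    val values = doubled.toList()
    log += "collected $values"
    log
}
```

Calling coldChainDemo() should yield "chain built" before "upstream started", because the flow body is not entered until toList is called.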

Terminal operators on the flow are either suspending functions such as collect, single, reduce, toList, etc. or the launchIn operator, which starts collection of the flow in the given scope. They are applied to the upstream flow and trigger execution of all operations. Execution of the flow is also called collecting the flow and is always performed in a suspending manner without actual blocking. Terminal operators complete normally or exceptionally depending on successful or failed execution of all the flow operations in the upstream. The most basic terminal operator is collect, for example:

try {
    flow.collect { value ->
        println("Received $value")
    }
} catch (e: Exception) {
    println("The flow has thrown an exception: $e")
}
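The non-suspending terminal operator launchIn can be sketched in the same spirit. This is an illustrative example, assuming kotlinx-coroutines-core is available; launchInDemo is a hypothetical name. It pairs launchIn with onEach, since launchIn itself takes no action.

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

// Hypothetical helper: collects a flow in a scope without suspending the caller.
fun launchInDemo(): List<Int> = runBlocking {
    val received = mutableListOf<Int>()

    // launchIn returns a Job immediately; collection runs in the given scope.
    val job = flowOf(1, 2, 3)
        .onEach { received += it }
        .launchIn(this)

    // Wait for the collection to finish before reading the result.
    job.join()
    received
}
```

The returned Job can also be cancelled to stop collection early.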

By default, flows are sequential and all flow operations are executed sequentially in the same coroutine, with an exception for a few operations specifically designed to introduce concurrency into flow execution such as buffer and flatMapMerge. See their documentation for details.
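The sequential default can be made visible by logging emissions and collections. The sketch below assumes kotlinx-coroutines-core; sequentialDemo is a hypothetical name. It does not use buffer or flatMapMerge, so each emit call suspends until the collector has processed the value.

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical helper: shows that emitter and collector alternate in one coroutine.
fun sequentialDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()

    flow {
        for (i in 1..2) {
            log += "emit $i"
            emit(i) // returns only after the collector block below has finished
        }
    }.collect { value ->
        log += "collect $value"
    }

    log
}
```

Inserting buffer() between the builder and collect would let the emitter run ahead of the collector, so the strict alternation seen here would no longer be guaranteed.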

The Flow interface does not carry information about whether a flow is a cold stream that can be collected repeatedly and triggers execution of the same code every time it is collected, or a hot stream that emits different values from the same running source on each collection. Usually flows represent cold streams, but there is a SharedFlow subtype that represents hot streams. In addition to that, any flow can be turned into a hot one by the stateIn and shareIn operators, or by converting the flow into a hot channel via the produceIn operator.
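The difference between the two kinds of stream can be sketched as follows, assuming kotlinx-coroutines-core; coldVersusHotDemo is a hypothetical name. The cold flow re-runs its body for every collection, while the MutableStateFlow simply exposes whatever value it currently holds.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical helper: contrasts a cold flow with a hot state flow.
fun coldVersusHotDemo(): List<Int> = runBlocking {
    var executions = 0

    // Cold: the body runs again on every collection.
    val cold = flow {
        executions++
        emit(executions)
    }

    // Hot: the value lives in the flow itself, independent of collectors.
    val hot = MutableStateFlow(0)
    hot.value = 42

    listOf(cold.first(), cold.first(), hot.first())
}
```

The two calls to cold.first() observe different values because each one starts a fresh execution of the flow body.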

Flow builders

There are the following basic ways to create a flow:

  • flowOf(...) functions to create a flow from a fixed set of values.

  • asFlow() extension functions on various types to convert them into flows.

  • flow { ... } builder function to construct arbitrary flows from sequential calls to emit function.

  • channelFlow { ... } builder function to construct arbitrary flows from potentially concurrent calls to the send function.

  • MutableStateFlow and MutableSharedFlow define the corresponding constructor functions to create a hot flow that can be directly updated.
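The builders listed above can be compared side by side. This is a minimal sketch, assuming kotlinx-coroutines-core; builderDemo and the map keys are hypothetical names chosen for the illustration.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: creates a flow with each basic builder and collects it.
fun builderDemo(): Map<String, List<Int>> = runBlocking {
    val fixed = flowOf(1, 2, 3)                                   // fixed set of values
    val converted = listOf(1, 2, 3).asFlow()                      // conversion from a collection
    val built = flow { for (i in 1..3) emit(i) }                  // sequential emit calls
    val viaChannel = channelFlow<Int> { for (i in 1..3) send(i) } // send may be called concurrently
    val state = MutableStateFlow(0)                               // hot flow, updated directly
    state.value = 3

    mapOf(
        "flowOf" to fixed.toList(),
        "asFlow" to converted.toList(),
        "flow" to built.toList(),
        "channelFlow" to viaChannel.toList(),
        // A StateFlow never completes, so read only its current value.
        "MutableStateFlow" to listOf(state.first())
    )
}
```

The four cold builders produce the same three values here; the state flow entry contains only the latest value assigned to it.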

Flow constraints

All implementations of the Flow interface must adhere to two key properties described in detail below:

  • Context preservation.

  • Exception transparency.

These properties ensure the ability to perform local reasoning about the code with flows and modularize the code in such a way that upstream flow emitters can be developed separately from downstream flow collectors. A user of a flow does not need to be aware of implementation details of the upstream flows it uses.

Context preservation

The flow has a context preservation property: it encapsulates its own execution context and never propagates or leaks it downstream, thus making reasoning about the execution context of particular transformations or terminal operations trivial.

There is only one way to change the context of a flow: the flowOn operator that changes the upstream context ("everything above the flowOn operator"). For additional information refer to its documentation.

This reasoning can be demonstrated in practice:

val flowA = flowOf(1, 2, 3)
    .map { it + 1 } // Will be executed in ctxA
    .flowOn(ctxA) // Changes the upstream context: flowOf and map

// Now we have a context-preserving flow: it is executed somewhere but this information is encapsulated in the flow itself

val filtered = flowA // ctxA is encapsulated in flowA
    .filter { it == 3 } // Pure operator without a context yet

withContext(Dispatchers.Main) {
    // All non-encapsulated operators will be executed in Main: filter and single
    val result = filtered.single()
    myUi.text = result
}
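A runnable variant of this reasoning can use CoroutineName instead of real dispatchers, which makes the context visible without depending on thread names. The sketch assumes kotlinx-coroutines-core; contextPreservationDemo, currentName, and the names "upstream" and "collector" are hypothetical.

```kotlin
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

// Hypothetical helper: reads the CoroutineName of the calling coroutine context.
private suspend fun currentName(): String? =
    currentCoroutineContext()[CoroutineName]?.name

// Hypothetical helper: logs which context each stage of the chain runs in.
fun contextPreservationDemo(): List<String> = runBlocking(CoroutineName("collector")) {
    val log = mutableListOf<String>()

    flowOf(1)
        .map { log += "map in ${currentName()}"; it + 1 }       // above flowOn
        .flowOn(CoroutineName("upstream"))                      // affects only the upstream
        .filter { log += "filter in ${currentName()}"; true }   // below flowOn
        .collect { log += "collect in ${currentName()}" }

    log
}
```

Only map sees the "upstream" name; filter and collect run in the collector's context, which is the behaviour the flowA example describes.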

From the implementation point of view, it means that all flow implementations should only emit from the same coroutine. This constraint is efficiently enforced by the default flow builder. The flow builder should be used if the flow implementation does not start any coroutines. Its implementation prevents most of the development mistakes:

val myFlow = flow {
    // GlobalScope.launch { // is prohibited
    // launch(Dispatchers.IO) { // is prohibited
    // withContext(CoroutineName("myFlow")) // is prohibited
    emit(1) // OK
    coroutineScope {
        emit(2) // OK -- still the same coroutine
    }
}

Use channelFlow if the collection and emission of a flow are to be separated into multiple coroutines. It encapsulates all the context preservation work and allows you to focus on your domain-specific problem, rather than invariant implementation details. It is possible to use any combination of coroutine builders from within channelFlow.
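As an illustration, the following sketch emits from two child coroutines inside channelFlow. It assumes kotlinx-coroutines-core; concurrentProducersDemo is a hypothetical name, and the result is sorted because the arrival order of concurrent sends is not fixed.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: emits from several coroutines, which the flow builder forbids.
fun concurrentProducersDemo(): List<Int> = runBlocking {
    channelFlow<Int> {
        // Each launch is a child of the channelFlow scope; send is safe to call concurrently.
        launch { send(1) }
        launch(Dispatchers.Default) { send(2) }
        // The flow completes once the block and all its children have finished.
    }.toList().sorted()
}
```

The same body written with the flow builder and emit would fail at runtime, because the emissions would come from coroutines other than the collecting one.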

If you are looking for performance and are sure that no concurrent emits and context jumps will happen, the flow builder can be used alongside a coroutineScope or supervisorScope instead:

  • A scoped primitive should be used to provide a CoroutineScope.

  • Changing the context of emission is prohibited, no matter whether it is withContext(ctx) or a builder argument (e.g. launch(ctx)).

  • Collecting another flow from a separate context is allowed, but it has the same effect as applying the flowOn operator to that flow, which is more efficient.

Exception transparency

Flow implementations never catch or handle exceptions that occur in downstream flows. From the implementation standpoint it means that calls to emit and emitAll shall never be wrapped into try { ... } catch { ... } blocks. Exception handling in flows shall be performed with the catch operator, which is designed to catch only exceptions coming from upstream flows while passing through all downstream exceptions. Similarly, terminal operators like collect throw any unhandled exceptions that occur in their code or in upstream flows, for example:

flow { emitData() }
    .map { computeOne(it) }
    .catch { ... } // catches exceptions in emitData and computeOne
    .map { computeTwo(it) }
    .collect { process(it) } // throws exceptions from process and computeTwo

The same reasoning can be applied to the onCompletion operator that is a declarative replacement for the finally block.
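Both halves of the rule can be exercised in a small sketch, assuming kotlinx-coroutines-core; exceptionTransparencyDemo and the log messages are hypothetical. The first chain fails upstream of catch, the second fails inside the collector.

```kotlin
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.runBlocking

// Hypothetical helper: shows what catch and onCompletion do and do not observe.
fun exceptionTransparencyDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()

    // Upstream failure: handled by catch, so the flow below it completes normally.
    flow {
        emit(1)
        throw IllegalStateException("upstream failure")
    }
        .catch { e -> log += "caught ${e.message}" }
        .onCompletion { cause -> log += "completed, cause=$cause" }
        .collect { log += "received $it" }

    // Downstream failure: catch does not intercept it, so collect rethrows.
    try {
        flowOf(1)
            .catch { log += "not reached" }
            .collect { error("downstream failure") }
    } catch (e: IllegalStateException) {
        log += "collector threw ${e.message}"
    }

    log
}
```

Because onCompletion sits below catch in the first chain, it sees a null cause: from its point of view the upstream finished without an error.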

Failure to adhere to the exception transparency requirement can lead to strange behaviors which make it hard to reason about the code because an exception in the collect { ... } could be somehow "caught" by an upstream flow, limiting the ability of local reasoning about the code.

Flow machinery enforces exception transparency at runtime and throws IllegalStateException on any attempt to emit a value if an exception was thrown on a previous attempt.

Reactive streams

Flow is Reactive Streams compliant. You can safely interoperate with reactive streams using Flow.asPublisher and Publisher.asFlow from the kotlinx-coroutines-reactive module.

Not stable for inheritance

The Flow interface is not stable for inheritance in 3rd party libraries, as new methods might be added to this interface in the future, but is stable for use. Use the flow { ... } builder function to create an implementation.

Functions

collect
common
abstract suspend fun collect(collector: FlowCollector<T>)
Accepts the given collector and emits values into it.

Inheritors

AbstractFlow
SharedFlow

Extensions

broadcastIn
common
fun <T> Flow<T>.broadcastIn(scope: CoroutineScope, start: CoroutineStart = CoroutineStart.LAZY): BroadcastChannel<T>

Deprecated

This API is deprecated.
buffer
common
fun <T> Flow<T>.buffer(capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow<T>
Buffers flow emissions via channel of a specified capacity and runs collector in a separate coroutine.
fun <T> Flow<T>.buffer(capacity: Int = BUFFERED): Flow<T>
cache
common
fun <T> Flow<T>.cache(): Flow<T>
cancellable
common
fun <T> Flow<T>.cancellable(): Flow<T>
Returns a flow which checks cancellation status on each emission and throws the corresponding cancellation cause if flow collector was cancelled.
catch
common
fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(cause: Throwable) -> Unit): Flow<T>
Catches exceptions in the flow completion and calls a specified action with the caught exception.
collect
common
suspend fun Flow<*>.collect()
Terminal flow operator that collects the given flow but ignores all emitted values.
inline suspend fun <T> Flow<T>.collect(crossinline action: suspend (T) -> Unit)
Terminal flow operator that collects the given flow with a provided action.
collectIndexed
common
inline suspend fun <T> Flow<T>.collectIndexed(crossinline action: suspend (index: Int, T) -> Unit)
Terminal flow operator that collects the given flow with a provided action that takes the index of an element (zero-based) and the element.
collectLatest
common
suspend fun <T> Flow<T>.collectLatest(action: suspend (T) -> Unit)
Terminal flow operator that collects the given flow with a provided action.
combine
common
@JvmName(name = "flowCombine")
fun <T1, T2, R> Flow<T1>.combine(flow: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R>
Returns a Flow whose values are generated with transform function by combining the most recently emitted values by each flow.
combineLatest
common
fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R>
fun <T1, T2, T3, R> Flow<T1>.combineLatest(other: Flow<T2>, other2: Flow<T3>, transform: suspend (T1, T2, T3) -> R): Flow<R>
fun <T1, T2, T3, T4, R> Flow<T1>.combineLatest(other: Flow<T2>, other2: Flow<T3>, other3: Flow<T4>, transform: suspend (T1, T2, T3, T4) -> R): Flow<R>
fun <T1, T2, T3, T4, T5, R> Flow<T1>.combineLatest(other: Flow<T2>, other2: Flow<T3>, other3: Flow<T4>, other4: Flow<T5>, transform: suspend (T1, T2, T3, T4, T5) -> R): Flow<R>
combineTransform
common
@JvmName(name = "flowCombineTransform")
fun <T1, T2, R> Flow<T1>.combineTransform(flow: Flow<T2>, transform: suspend FlowCollector<R>.(T1, T2) -> Unit): Flow<R>
Returns a Flow whose values are generated by the transform function, which processes the most recently emitted values of each flow.
conflate
common
fun <T> Flow<T>.conflate(): Flow<T>
Conflates flow emissions via conflated channel and runs collector in a separate coroutine.
count
common
suspend fun <T> Flow<T>.count(): Int
Returns the number of elements in this flow.
suspend fun <T> Flow<T>.count(predicate: suspend (T) -> Boolean): Int
Returns the number of elements matching the given predicate.
debounce
common
fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T>
Returns a flow that mirrors the original flow, but filters out values that are followed by the newer values within the given timeout.
fun <T> Flow<T>.debounce(timeoutMillis: (T) -> Long): Flow<T>
Returns a flow that mirrors the original flow, but filters out values that are followed by the newer values within the given timeout.
fun <T> Flow<T>.debounce(timeout: Duration): Flow<T>
Returns a flow that mirrors the original flow, but filters out values that are followed by the newer values within the given timeout.
@JvmName(name = "debounceDuration")
fun <T> Flow<T>.debounce(timeout: (T) -> Duration): Flow<T>
Returns a flow that mirrors the original flow, but filters out values that are followed by the newer values within the given timeout.
distinctUntilChanged
common
fun <T> Flow<T>.distinctUntilChanged(): Flow<T>
Returns flow where all subsequent repetitions of the same value are filtered out.
fun <T> Flow<T>.distinctUntilChanged(areEquivalent: (T, T) -> Boolean): Flow<T>
Returns flow where all subsequent repetitions of the same value are filtered out, when compared with each other via the provided areEquivalent function.
distinctUntilChangedBy
common
fun <T, K> Flow<T>.distinctUntilChangedBy(keySelector: (T) -> K): Flow<T>
Returns flow where all subsequent repetitions of the same key are filtered out, where key is extracted with keySelector function.
drop
common
fun <T> Flow<T>.drop(count: Int): Flow<T>
Returns a flow that ignores first count elements.
dropWhile
common
fun <T> Flow<T>.dropWhile(predicate: suspend (T) -> Boolean): Flow<T>
Returns a flow containing all elements except first elements that satisfy the given predicate.
filter
common
inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T>
Returns a flow containing only values of the original flow that match the given predicate.
filterIsInstance
common
inline fun <R> Flow<*>.filterIsInstance(): Flow<R>
Returns a flow containing only values that are instances of specified type R.
filterNot
common
inline fun <T> Flow<T>.filterNot(crossinline predicate: suspend (T) -> Boolean): Flow<T>
Returns a flow containing only values of the original flow that do not match the given predicate.
filterNotNull
common
fun <T : Any> Flow<T?>.filterNotNull(): Flow<T>
Returns a flow containing only values of the original flow that are not null.
first
common
suspend fun <T> Flow<T>.first(): T
The terminal operator that returns the first element emitted by the flow and then cancels flow's collection.
suspend fun <T> Flow<T>.first(predicate: suspend (T) -> Boolean): T
The terminal operator that returns the first element emitted by the flow matching the given predicate and then cancels flow's collection.
firstOrNull
common
suspend fun <T> Flow<T>.firstOrNull(): T?
The terminal operator that returns the first element emitted by the flow and then cancels flow's collection.
suspend fun <T> Flow<T>.firstOrNull(predicate: suspend (T) -> Boolean): T?
The terminal operator that returns the first element emitted by the flow matching the given predicate and then cancels flow's collection.
flatMapConcat
common
fun <T, R> Flow<T>.flatMapConcat(transform: suspend (T) -> Flow<R>): Flow<R>
Transforms elements emitted by the original flow by applying transform, that returns another flow, and then concatenating and flattening these flows.
flatMapLatest
common
inline fun <T, R> Flow<T>.flatMapLatest(crossinline transform: suspend (T) -> Flow<R>): Flow<R>
Returns a flow that switches to a new flow produced by transform function every time the original flow emits a value.
flatMapMerge
common
fun <T, R> Flow<T>.flatMapMerge(concurrency: Int = DEFAULT_CONCURRENCY, transform: suspend (T) -> Flow<R>): Flow<R>
Transforms elements emitted by the original flow by applying transform, that returns another flow, and then merging and flattening these flows.
flattenConcat
common
fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T>
Flattens the given flow of flows into a single flow in a sequential manner, without interleaving nested flows.
flattenMerge
common
fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY): Flow<T>
Flattens the given flow of flows into a single flow with a concurrency limit on the number of concurrently collected flows.
flowOn
common
fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T>
Changes the context where this flow is executed to the given context.
fold
common
inline suspend fun <T, R> Flow<T>.fold(initial: R, crossinline operation: suspend (R, T) -> R): R
Accumulates a value, starting with the initial value and applying operation to the current accumulator value and each element.
last
common
suspend fun <T> Flow<T>.last(): T
The terminal operator that returns the last element emitted by the flow.
lastOrNull
common
suspend fun <T> Flow<T>.lastOrNull(): T?
The terminal operator that returns the last element emitted by the flow or null if the flow was empty.
launchIn
common
fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job
Terminal flow operator that launches the collection of the given flow in the scope.
map
common
inline fun <T, R> Flow<T>.map(crossinline transform: suspend (T) -> R): Flow<R>
Returns a flow containing the results of applying the given transform function to each value of the original flow.
mapLatest
common
fun <T, R> Flow<T>.mapLatest(transform: suspend (T) -> R): Flow<R>
Returns a flow that emits elements from the original flow transformed by transform function.
mapNotNull
common
inline fun <T, R : Any> Flow<T>.mapNotNull(crossinline transform: suspend (T) -> R?): Flow<R>
Returns a flow that contains only non-null results of applying the given transform function to each value of the original flow.
onCompletion
common
fun <T> Flow<T>.onCompletion(action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit): Flow<T>
Returns a flow that invokes the given action after the flow is completed or cancelled, passing the cancellation exception or failure as cause parameter of action.
onEach
common
fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T>
Returns a flow that invokes the given action before each value of the upstream flow is emitted downstream.
onEmpty
common
fun <T> Flow<T>.onEmpty(action: suspend FlowCollector<T>.() -> Unit): Flow<T>
Invokes the given action when this flow completes without emitting any elements.
onStart
common
fun <T> Flow<T>.onStart(action: suspend FlowCollector<T>.() -> Unit): Flow<T>
Returns a flow that invokes the given action before this flow starts to be collected.
produceIn
common
fun <T> Flow<T>.produceIn(scope: CoroutineScope): ReceiveChannel<T>
Creates a produce coroutine that collects the given flow.
publish
common
fun <T> Flow<T>.publish(): Flow<T>
fun <T> Flow<T>.publish(bufferSize: Int): Flow<T>
reduce
common
suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (S, T) -> S): S
Accumulates a value, starting with the first element and applying operation to the current accumulator value and each element.
replay
common
fun <T> Flow<T>.replay(): Flow<T>
fun <T> Flow<T>.replay(bufferSize: Int): Flow<T>
retry
common
fun <T> Flow<T>.retry(retries: Long = Long.MAX_VALUE, predicate: suspend (cause: Throwable) -> Boolean = { true }): Flow<T>
Retries collection of the given flow up to retries times when an exception that matches the given predicate occurs in the upstream flow.
fun <T> Flow<T>.retry(retries: Int = Int.MAX_VALUE, predicate: (Throwable) -> Boolean = { true }): Flow<T>
retryWhen
common
fun <T> Flow<T>.retryWhen(predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean): Flow<T>
Retries collection of the given flow when an exception occurs in the upstream flow and the predicate returns true.
runningFold
common
fun <T, R> Flow<T>.runningFold(initial: R, operation: suspend (R, T) -> R): Flow<R>
Folds the given flow with operation, emitting every intermediate result, including initial value.
runningReduce
common
fun <T> Flow<T>.runningReduce(operation: suspend (T, T) -> T): Flow<T>
Reduces the given flow with operation, emitting every intermediate result, including the initial value, which is the first element of the flow.
sample
common
fun <T> Flow<T>.sample(periodMillis: Long): Flow<T>
Returns a flow that emits only the latest value emitted by the original flow during the given sampling period.
fun <T> Flow<T>.sample(period: Duration): Flow<T>
Returns a flow that emits only the latest value emitted by the original flow during the given sampling period.
scan
common
fun <T, R> Flow<T>.scan(initial: R, operation: suspend (R, T) -> R): Flow<R>
Folds the given flow with operation, emitting every intermediate result, including initial value.
scanReduce
common
fun <T> Flow<T>.scanReduce(operation: suspend (T, T) -> T): Flow<T>
shareIn
common
fun <T> Flow<T>.shareIn(scope: CoroutineScope, started: SharingStarted, replay: Int = 0): SharedFlow<T>
Converts a cold Flow into a hot SharedFlow that is started in the given coroutine scope, sharing emissions from a single running instance of the upstream flow with multiple downstream subscribers, and replaying a specified number of replay values to new subscribers.
single
common
suspend fun <T> Flow<T>.single(): T
The terminal operator that awaits one and only one value to be emitted.
singleOrNull
common
suspend fun <T> Flow<T>.singleOrNull(): T?
The terminal operator that awaits one and only one value to be emitted.
stateIn
common
fun <T> Flow<T>.stateIn(scope: CoroutineScope, started: SharingStarted, initialValue: T): StateFlow<T>
Converts a cold Flow into a hot StateFlow that is started in the given coroutine scope, sharing the most recently emitted value from a single running instance of the upstream flow with multiple downstream subscribers.
suspend fun <T> Flow<T>.stateIn(scope: CoroutineScope): StateFlow<T>
Starts the upstream flow in a given scope, suspends until the first value is emitted, and returns a hot StateFlow of future emissions, sharing the most recently emitted value from this running instance of the upstream flow with multiple downstream subscribers.
switchMap
common
fun <T, R> Flow<T>.switchMap(transform: suspend (T) -> Flow<R>): Flow<R>
take
common
fun <T> Flow<T>.take(count: Int): Flow<T>
Returns a flow that contains first count elements.
takeWhile
common
fun <T> Flow<T>.takeWhile(predicate: suspend (T) -> Boolean): Flow<T>
Returns a flow that contains first elements satisfying the given predicate.
toCollection
common
suspend fun <T, C : MutableCollection<in T>> Flow<T>.toCollection(destination: C): C
Collects given flow into a destination
toList
common
suspend fun <T> Flow<T>.toList(destination: MutableList<T> = ArrayList()): List<T>
Collects given flow into a destination
toSet
common
suspend fun <T> Flow<T>.toSet(destination: MutableSet<T> = LinkedHashSet()): Set<T>
Collects given flow into a destination
transform
common
inline fun <T, R> Flow<T>.transform(crossinline transform: suspend FlowCollector<R>.(T) -> Unit): Flow<R>
Applies transform function to each value of the given flow.
transformLatest
common
fun <T, R> Flow<T>.transformLatest(transform: suspend FlowCollector<R>.(T) -> Unit): Flow<R>
Returns a flow that produces elements with the transform function every time the original flow emits a value.
transformWhile
common
fun <T, R> Flow<T>.transformWhile(transform: suspend FlowCollector<R>.(T) -> Boolean): Flow<R>
Applies transform function to each value of the given flow while this function returns true.
withIndex
common
fun <T> Flow<T>.withIndex(): Flow<IndexedValue<T>>
Returns a flow that wraps each element into IndexedValue, containing value and its index (starting from zero).
zip
common
fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R>
Zips values from the current flow (this) with other flow using provided transform function applied to each pair of values.
