Package kotlinx.coroutines.flow

Flow — asynchronous cold stream of elements.

Types

AbstractFlow

abstract class AbstractFlow<T> : Flow<T>

Base class for stateful implementations of Flow. It tracks all the properties required for context preservation and throws an IllegalStateException if any of the properties are violated.

Flow

interface Flow<out T>

A cold asynchronous data stream that sequentially emits values and completes normally or with an exception.

FlowCollector

interface FlowCollector<in T>

FlowCollector is used as an intermediate or a terminal collector of the flow and represents an entity that accepts values emitted by the Flow.

Extensions for External Classes

kotlin.Array

kotlin.Function0

kotlin.IntArray

kotlin.LongArray

kotlin.SuspendFunction0

kotlin.collections.Iterable

kotlin.collections.Iterator

kotlin.ranges.IntRange

kotlin.ranges.LongRange

kotlin.sequences.Sequence

Properties

DEFAULT_CONCURRENCY

val DEFAULT_CONCURRENCY: Int

Default concurrency limit that is used by flattenMerge and flatMapMerge operators. It is 16 by default and can be changed on JVM using DEFAULT_CONCURRENCY_PROPERTY_NAME property.

DEFAULT_CONCURRENCY_PROPERTY_NAME

const val DEFAULT_CONCURRENCY_PROPERTY_NAME: String

Name of the property that defines the value of DEFAULT_CONCURRENCY.

Functions

asFlow

fun <T> BroadcastChannel<T>.asFlow(): Flow<T>

Represents the given broadcast channel as a hot flow. Every flow collector will trigger a new broadcast channel subscription.

broadcastIn

fun <T> Flow<T>.broadcastIn(
    scope: CoroutineScope,
    start: CoroutineStart = CoroutineStart.LAZY
): BroadcastChannel<T>

Creates a broadcast coroutine that collects the given flow.

buffer

fun <T> Flow<T>.buffer(capacity: Int = BUFFERED): Flow<T>

Buffers flow emissions via channel of a specified capacity and runs collector in a separate coroutine.

callbackFlow

fun <T> callbackFlow(
    block: suspend ProducerScope<T>.() -> Unit
): Flow<T>

Creates an instance of the cold Flow with elements that are sent to a SendChannel that is provided to the builder’s block of code via ProducerScope. It allows elements to be produced by the code that is running in a different context or running concurrently.

catch

fun <T> Flow<T>.catch(
    action: suspend FlowCollector<T>.(cause: Throwable) -> Unit
): Flow<T>

Catches exceptions in the flow completion and calls a specified action with the caught exception. This operator is transparent to exceptions that occur in downstream flow and does not catch exceptions that are thrown to cancel the flow.

channelFlow

fun <T> channelFlow(
    block: suspend ProducerScope<T>.() -> Unit
): Flow<T>

Creates an instance of the cold Flow with elements that are sent to a SendChannel that is provided to the builder’s block of code via ProducerScope. It allows elements to be produced by the code that is running in a different context or running concurrently. The resulting flow is cold, which means that block is called on each call of a terminal operator on the resulting flow.

collect

suspend fun Flow<*>.collect(): Unit

Terminal flow operator that collects the given flow but ignores all emitted values. If any exception occurs during collect or in the provided flow, this exception is rethrown from this method.

suspend fun <T> Flow<T>.collect(
    action: suspend (value: T) -> Unit
): Unit

Terminal flow operator that collects the given flow with a provided action. If any exception occurs during collect or in the provided flow, this exception is rethrown from this method.

collectIndexed

suspend fun <T> Flow<T>.collectIndexed(
    action: suspend (index: Int, value: T) -> Unit
): Unit

Terminal flow operator that collects the given flow with a provided action that takes the index of an element (zero-based) and the element. If any exception occurs during collect or in the provided flow, this exception is rethrown from this method.

collectLatest

suspend fun <T> Flow<T>.collectLatest(
    action: suspend (value: T) -> Unit
): Unit

Terminal flow operator that collects the given flow with a provided action. The crucial difference from collect is that when the original flow emits a new value, action block for previous value is cancelled.

combine

fun <T1, T2, R> Flow<T1>.combine(
    flow: Flow<T2>,
    transform: suspend (a: T1, b: T2) -> R
): Flow<R>
fun <T1, T2, T3, R> combine(
    flow: Flow<T1>,
    flow2: Flow<T2>,
    flow3: Flow<T3>,
    transform: suspend (T1, T2, T3) -> R
): Flow<R>
fun <T1, T2, T3, T4, R> combine(
    flow: Flow<T1>,
    flow2: Flow<T2>,
    flow3: Flow<T3>,
    flow4: Flow<T4>,
    transform: suspend (T1, T2, T3, T4) -> R
): Flow<R>
fun <T1, T2, T3, T4, T5, R> combine(
    flow: Flow<T1>,
    flow2: Flow<T2>,
    flow3: Flow<T3>,
    flow4: Flow<T4>,
    flow5: Flow<T5>,
    transform: suspend (T1, T2, T3, T4, T5) -> R
): Flow<R>
fun <T, R> combine(
    vararg flows: Flow<T>,
    transform: suspend (Array<T>) -> R
): Flow<R>
fun <T, R> combine(
    flows: Iterable<Flow<T>>,
    transform: suspend (Array<T>) -> R
): Flow<R>

Returns a Flow whose values are generated with transform function by combining the most recently emitted values by each flow.

combineLatest

fun <T1, T2, R> Flow<T1>.combineLatest(
    other: Flow<T2>,
    transform: suspend (T1, T2) -> R
): Flow<R>
fun <T1, T2, T3, R> Flow<T1>.combineLatest(
    other: Flow<T2>,
    other2: Flow<T3>,
    transform: suspend (T1, T2, T3) -> R
): Flow<R>
fun <T1, T2, T3, T4, R> Flow<T1>.combineLatest(
    other: Flow<T2>,
    other2: Flow<T3>,
    other3: Flow<T4>,
    transform: suspend (T1, T2, T3, T4) -> R
): Flow<R>
fun <T1, T2, T3, T4, T5, R> Flow<T1>.combineLatest(
    other: Flow<T2>,
    other2: Flow<T3>,
    other3: Flow<T4>,
    other4: Flow<T5>,
    transform: suspend (T1, T2, T3, T4, T5) -> R
): Flow<R>

combineTransform

fun <T1, T2, R> Flow<T1>.combineTransform(
    flow: Flow<T2>,
    transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
): Flow<R>
fun <T1, T2, T3, R> combineTransform(
    flow: Flow<T1>,
    flow2: Flow<T2>,
    flow3: Flow<T3>,
    transform: suspend FlowCollector<R>.(T1, T2, T3) -> Unit
): Flow<R>
fun <T1, T2, T3, T4, R> combineTransform(
    flow: Flow<T1>,
    flow2: Flow<T2>,
    flow3: Flow<T3>,
    flow4: Flow<T4>,
    transform: suspend FlowCollector<R>.(T1, T2, T3, T4) -> Unit
): Flow<R>
fun <T1, T2, T3, T4, T5, R> combineTransform(
    flow: Flow<T1>,
    flow2: Flow<T2>,
    flow3: Flow<T3>,
    flow4: Flow<T4>,
    flow5: Flow<T5>,
    transform: suspend FlowCollector<R>.(T1, T2, T3, T4, T5) -> Unit
): Flow<R>
fun <T, R> combineTransform(
    vararg flows: Flow<T>,
    transform: suspend FlowCollector<R>.(Array<T>) -> Unit
): Flow<R>
fun <T, R> combineTransform(
    flows: Iterable<Flow<T>>,
    transform: suspend FlowCollector<R>.(Array<T>) -> Unit
): Flow<R>

Returns a Flow whose values are generated by transform function that processes the most recently emitted values by each flow.

conflate

fun <T> Flow<T>.conflate(): Flow<T>

Conflates flow emissions via conflated channel and runs collector in a separate coroutine. The effect of this is that emitter is never suspended due to a slow collector, but collector always gets the most recent value emitted.

consumeAsFlow

fun <T> ReceiveChannel<T>.consumeAsFlow(): Flow<T>

Represents the given receive channel as a hot flow and consumes the channel on the first collection from this flow. The resulting flow can be collected just once and throws IllegalStateException when trying to collect it more than once.

count

suspend fun <T> Flow<T>.count(): Int

Returns the number of elements in this flow.

suspend fun <T> Flow<T>.count(
    predicate: suspend (T) -> Boolean
): Int

Returns the number of elements matching the given predicate.

debounce

fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T>

Returns a flow that mirrors the original flow, but filters out values that are followed by the newer values within the given timeout. The latest value is always emitted.

distinctUntilChanged

fun <T> Flow<T>.distinctUntilChanged(): Flow<T>

Returns flow where all subsequent repetitions of the same value are filtered out.

fun <T> Flow<T>.distinctUntilChanged(
    areEquivalent: (old: T, new: T) -> Boolean
): Flow<T>

Returns flow where all subsequent repetitions of the same value are filtered out, when compared with each other via the provided areEquivalent function.

distinctUntilChangedBy

fun <T, K> Flow<T>.distinctUntilChangedBy(
    keySelector: (T) -> K
): Flow<T>

Returns flow where all subsequent repetitions of the same key are filtered out, where key is extracted with keySelector function.

drop

fun <T> Flow<T>.drop(count: Int): Flow<T>

Returns a flow that ignores first count elements. Throws IllegalArgumentException if count is negative.

dropWhile

fun <T> Flow<T>.dropWhile(
    predicate: suspend (T) -> Boolean
): Flow<T>

Returns a flow containing all elements except first elements that satisfy the given predicate.

emitAll

suspend fun <T> FlowCollector<T>.emitAll(
    channel: ReceiveChannel<T>
): Unit

Emits all elements from the given channel to this flow collector and cancels (consumes) the channel afterwards. If you need to iterate over the channel without consuming it, a regular for loop should be used instead.

suspend fun <T> FlowCollector<T>.emitAll(flow: Flow<T>): Unit

Collects all the values from the given flow and emits them to the collector. It is a shorthand for flow.collect { value -> emit(value) }.

emptyFlow

fun <T> emptyFlow(): Flow<T>

Returns an empty flow.

filter

fun <T> Flow<T>.filter(
    predicate: suspend (T) -> Boolean
): Flow<T>

Returns a flow containing only values of the original flow that match the given predicate.

filterIsInstance

fun <R> Flow<*>.filterIsInstance(): Flow<R>

Returns a flow containing only values that are instances of specified type R.

filterNot

fun <T> Flow<T>.filterNot(
    predicate: suspend (T) -> Boolean
): Flow<T>

Returns a flow containing only values of the original flow that do not match the given predicate.

filterNotNull

fun <T : Any> Flow<T?>.filterNotNull(): Flow<T>

Returns a flow containing only values of the original flow that are not null.

first

suspend fun <T> Flow<T>.first(): T

The terminal operator that returns the first element emitted by the flow and then cancels flow’s collection. Throws NoSuchElementException if the flow was empty.

suspend fun <T> Flow<T>.first(
    predicate: suspend (T) -> Boolean
): T

The terminal operator that returns the first element emitted by the flow matching the given predicate and then cancels flow’s collection. Throws NoSuchElementException if the flow has not contained elements matching the predicate.

flatMapConcat

fun <T, R> Flow<T>.flatMapConcat(
    transform: suspend (value: T) -> Flow<R>
): Flow<R>

Transforms elements emitted by the original flow by applying transform, that returns another flow, and then concatenating and flattening these flows.

flatMapLatest

fun <T, R> Flow<T>.flatMapLatest(
    transform: suspend (value: T) -> Flow<R>
): Flow<R>

Returns a flow that switches to a new flow produced by transform function every time the original flow emits a value. When the original flow emits a new value, the previous flow produced by transform block is cancelled.

flatMapMerge

fun <T, R> Flow<T>.flatMapMerge(
    concurrency: Int = DEFAULT_CONCURRENCY,
    transform: suspend (value: T) -> Flow<R>
): Flow<R>

Transforms elements emitted by the original flow by applying transform, that returns another flow, and then merging and flattening these flows.

flattenConcat

fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T>

Flattens the given flow of flows into a single flow in a sequential manner, without interleaving nested flows. This method is conceptually identical to flattenMerge(concurrency = 1) but has faster implementation.

flattenMerge

fun <T> Flow<Flow<T>>.flattenMerge(
    concurrency: Int = DEFAULT_CONCURRENCY
): Flow<T>

Flattens the given flow of flows into a single flow with a concurrency limit on the number of concurrently collected flows.

flow

fun <T> flow(
    block: suspend FlowCollector<T>.() -> Unit
): Flow<T>

Creates a flow from the given suspendable block.

flowOf

fun <T> flowOf(vararg elements: T): Flow<T>

Creates a flow that produces values from the given array of elements.

fun <T> flowOf(value: T): Flow<T>

Creates flow that produces a given value.

flowOn

fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T>

Changes the context where this flow is executed to the given context. This operator is composable and affects only preceding operators that do not have their own context. This operator is context preserving: context does not leak into the downstream flow.

flowWith

fun <T, R> Flow<T>.flowWith(
    flowContext: CoroutineContext,
    bufferSize: Int = BUFFERED,
    builder: Flow<T>.() -> Flow<R>
): Flow<R>

The operator that changes the context where all transformations applied to the given flow within a builder are executed. This operator is context preserving and does not affect the context of the preceding and subsequent operations.

fold

suspend fun <T, R> Flow<T>.fold(
    initial: R,
    operation: suspend (acc: R, value: T) -> R
): R

Accumulates value starting with initial value and applying operation to current accumulator value and each element.

launchIn

fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job

Terminal flow operator that launches the collection of the given flow in the scope. It is a shorthand for scope.launch { flow.collect() }.

map

fun <T, R> Flow<T>.map(
    transform: suspend (value: T) -> R
): Flow<R>

Returns a flow containing the results of applying the given transform function to each value of the original flow.

mapLatest

fun <T, R> Flow<T>.mapLatest(
    transform: suspend (value: T) -> R
): Flow<R>

Returns a flow that emits elements from the original flow transformed by transform function. When the original flow emits a new value, computation of the transform block for previous value is cancelled.

mapNotNull

fun <T, R : Any> Flow<T>.mapNotNull(
    transform: suspend (value: T) -> R?
): Flow<R>

Returns a flow that contains only non-null results of applying the given transform function to each value of the original flow.

onCompletion

fun <T> Flow<T>.onCompletion(
    action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T>

Invokes the given action when the given flow is completed or cancelled, using the exception from the upstream (if any) as cause parameter of action.

onEach

fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T>

Returns a flow which performs the given action on each value of the original flow.

onStart

fun <T> Flow<T>.onStart(
    action: suspend FlowCollector<T>.() -> Unit
): Flow<T>

Invokes the given action when this flow starts to be collected.

produceIn

fun <T> Flow<T>.produceIn(
    scope: CoroutineScope
): ReceiveChannel<T>

Creates a produce coroutine that collects the given flow.

reduce

suspend fun <S, T : S> Flow<T>.reduce(
    operation: suspend (accumulator: S, value: T) -> S
): S

Accumulates value starting with the first element and applying operation to current accumulator value and each element. Throws UnsupportedOperationException if flow was empty.

retry

fun <T> Flow<T>.retry(
    retries: Long = Long.MAX_VALUE,
    predicate: suspend (cause: Throwable) -> Boolean = { true }
): Flow<T>

Retries collection of the given flow up to retries times when an exception that matches the given predicate occurs in the upstream flow. This operator is transparent to exceptions that occur in downstream flow and does not retry on exceptions that are thrown to cancel the flow.

fun <T> Flow<T>.retry(
    retries: Int = Int.MAX_VALUE,
    predicate: (Throwable) -> Boolean = { true }
): Flow<T>

retryWhen

fun <T> Flow<T>.retryWhen(
    predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean
): Flow<T>

Retries collection of the given flow when an exception occurs in the upstream flow and the predicate returns true. The predicate also receives an attempt number as parameter, starting from zero on the initial call. This operator is transparent to exceptions that occur in downstream flow and does not retry on exceptions that are thrown to cancel the flow.

sample

fun <T> Flow<T>.sample(periodMillis: Long): Flow<T>

Returns a flow that emits only the latest value emitted by the original flow during the given sampling period.

scan

fun <T, R> Flow<T>.scan(
    initial: R,
    operation: suspend (accumulator: R, value: T) -> R
): Flow<R>

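Folds the given flow with operation, emitting every intermediate result, including initial value. Note that initial value should be immutable (or should not be mutated) as it is shared between different collectors. For example, flowOf(1, 2, 3).scan(emptyList<Int>()) { acc, value -> acc + value }.toList() produces [], [1], [1, 2], [1, 2, 3].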

scanReduce

fun <T> Flow<T>.scanReduce(
    operation: suspend (accumulator: T, value: T) -> T
): Flow<T>

Reduces the given flow with operation, emitting every intermediate result, including initial value. The first element is taken as initial value for operation accumulator. This operator has a sibling with initial value – scan.

single

suspend fun <T> Flow<T>.single(): T

The terminal operator that awaits one and only one value to be published. Throws NoSuchElementException for empty flow and IllegalStateException for flow that contains more than one element.

singleOrNull

suspend fun <T : Any> Flow<T>.singleOrNull(): T?

The terminal operator that awaits one and only one value to be published. Throws IllegalStateException for flow that contains more than one element.

switchMap

fun <T, R> Flow<T>.switchMap(
    transform: suspend (value: T) -> Flow<R>
): Flow<R>

take

fun <T> Flow<T>.take(count: Int): Flow<T>

Returns a flow that contains first count elements. When count elements are consumed, the original flow is cancelled. Throws IllegalArgumentException if count is not positive.

takeWhile

fun <T> Flow<T>.takeWhile(
    predicate: suspend (T) -> Boolean
): Flow<T>

Returns a flow that contains first elements satisfying the given predicate.

toCollection

suspend fun <T, C : MutableCollection<in T>> Flow<T>.toCollection(
    destination: C
): C

Collects given flow into a destination

toList

suspend fun <T> Flow<T>.toList(
    destination: MutableList<T> = ArrayList()
): List<T>

Collects given flow into a destination

toSet

suspend fun <T> Flow<T>.toSet(
    destination: MutableSet<T> = LinkedHashSet()
): Set<T>

Collects given flow into a destination

transform

fun <T, R> Flow<T>.transform(
    transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R>

Applies transform function to each value of the given flow.

transformLatest

fun <T, R> Flow<T>.transformLatest(
    transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R>

Returns a flow that produces element by transform function every time the original flow emits a value. When the original flow emits a new value, the previous transform block is cancelled, thus the name transformLatest.

withIndex

fun <T> Flow<T>.withIndex(): Flow<IndexedValue<T>>

Returns a flow that wraps each element into IndexedValue, containing value and its index (starting from zero).

zip

fun <T1, T2, R> Flow<T1>.zip(
    other: Flow<T2>,
    transform: suspend (T1, T2) -> R
): Flow<R>

Zips values from the current flow (this) with other flow using provided transform function applied to each pair of values. The resulting flow completes as soon as one of the flows completes and cancel is called on the remaining flow.