SharedFlow

interface SharedFlow<out T> : Flow<T>

A hot Flow that shares emitted values among all its collectors in a broadcast fashion, so that all collectors get all emitted values. A shared flow is called hot because its active instance exists independently of the presence of collectors. This is opposed to a regular Flow, such as defined by the flow { ... } function, which is cold and is started separately for each collector.

A shared flow never completes. A call to Flow.collect on a shared flow never completes normally, and neither does a coroutine started by the Flow.launchIn function. An active collector of a shared flow is called a subscriber.

A subscriber of a shared flow can be cancelled. This usually happens when the scope in which the coroutine is running is cancelled. A subscriber to a shared flow is always cancellable, and checks for cancellation before each emission. Note that most terminal operators, such as Flow.toList, also never complete when applied to a shared flow, but flow-truncating operators such as Flow.take and Flow.takeWhile can be used on a shared flow to turn it into a completing one.
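
The truncation described above can be sketched as follows. This is a minimal illustration that assumes kotlinx.coroutines is on the classpath; the function name firstTwoReplayed is hypothetical.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: collects the first two values seen by a new subscriber.
fun firstTwoReplayed(): List<Int> = runBlocking {
    val shared = MutableSharedFlow<Int>(replay = 3)
    // With a replay cache and no subscribers, tryEmit stores the values without suspending.
    shared.tryEmit(1)
    shared.tryEmit(2)
    shared.tryEmit(3)
    // shared.toList() would never return; take(2) completes after two values.
    shared.take(2).toList()
}
```

Here the new subscriber is served from the replay cache, so the call returns once two values have been taken.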

A mutable shared flow is created using the MutableSharedFlow(...) constructor function. Its state can be updated by emitting values to it and performing other operations. See the MutableSharedFlow documentation for details.

SharedFlow is useful for broadcasting events that happen inside an application to subscribers that can come and go. For example, the following class encapsulates an event bus that distributes events to all subscribers in a rendezvous manner, suspending until all subscribers process each event:

class EventBus {
    private val _events = MutableSharedFlow<Event>() // private mutable shared flow
    val events = _events.asSharedFlow() // publicly exposed as read-only shared flow

    suspend fun produceEvent(event: Event) {
        _events.emit(event) // suspends until all subscribers receive it
    }
}
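
A possible way to exercise such an event bus is sketched below. The Event data class and the eventBusDemo function are assumptions made for illustration; the subscriber is started undispatched so that it is registered before the first event is produced.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical event type; the class above leaves Event undefined.
data class Event(val name: String)

class EventBus {
    private val _events = MutableSharedFlow<Event>()
    val events = _events.asSharedFlow()

    suspend fun produceEvent(event: Event) {
        _events.emit(event)
    }
}

// Hypothetical demo: one subscriber receives two events and then stops.
fun eventBusDemo(): List<Event> = runBlocking {
    val bus = EventBus()
    // UNDISPATCHED runs the subscriber up to its first suspension, so it is subscribed here.
    val received = async(start = CoroutineStart.UNDISPATCHED) {
        bus.events.take(2).toList()
    }
    bus.produceEvent(Event("started"))
    bus.produceEvent(Event("stopped"))
    received.await()
}
```

Events produced before any subscriber exists would be lost, which is why the subscriber is started first in this sketch.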

As an alternative to the above usage with the MutableSharedFlow(...) constructor function, any cold Flow can be converted to a shared flow using the shareIn operator.
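
A minimal sketch of the shareIn conversion follows. The function name, the counter, and the choice of SharingStarted.Lazily with replay = 3 are illustrative assumptions; the point is that the cold upstream runs once even though two subscribers collect from it.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical demo: returns what two subscribers saw and how often the upstream started.
fun collectTwiceFromShared(): Triple<List<Int>, List<Int>, Int> = runBlocking {
    var upstreamStarts = 0
    val cold = flow {
        upstreamStarts++ // counts executions of the cold flow body
        emit(1)
        emit(2)
        emit(3)
    }
    // A separate scope owns the sharing coroutine so it can be cancelled explicitly.
    val sharingScope = CoroutineScope(coroutineContext + Job())
    val shared = cold.shareIn(sharingScope, SharingStarted.Lazily, replay = 3)

    val first = shared.take(3).toList()  // the first subscriber triggers the upstream
    val second = shared.take(3).toList() // the second subscriber is served from the replay cache
    sharingScope.cancel()
    Triple(first, second, upstreamStarts)
}
```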

There is a specialized implementation of shared flow for the case where the most recent state value needs to be shared. See StateFlow for details.

Replay cache and buffer

A shared flow keeps a specific number of the most recent values in its replay cache. Every new subscriber first gets the values from the replay cache and then gets new emitted values. The maximum size of the replay cache is specified when the shared flow is created by the replay parameter. A snapshot of the current replay cache is available via the replayCache property and it can be reset with the MutableSharedFlow.resetReplayCache function.
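
The replay cache behavior can be observed with a short sketch such as the one below, assuming a replay size of 2. The function name replayDemo is hypothetical.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical demo: returns the cache snapshot, what a new subscriber sees, and the cache after reset.
fun replayDemo(): List<List<String>> = runBlocking {
    val shared = MutableSharedFlow<String>(replay = 2)
    shared.tryEmit("a")
    shared.tryEmit("b")
    shared.tryEmit("c") // "a" falls out of the cache because replay = 2

    val snapshot = shared.replayCache                 // the two most recent values
    val seenByNewSubscriber = shared.take(2).toList() // a new subscriber gets the cached values first
    shared.resetReplayCache()
    val afterReset = shared.replayCache               // empty after the reset

    listOf(snapshot, seenByNewSubscriber, afterReset)
}
```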

A replay cache also provides a buffer for emissions to the shared flow, allowing slow subscribers to get values from the buffer without suspending emitters. The buffer space determines how far slow subscribers can lag behind the fast ones. When creating a shared flow, additional buffer capacity beyond replay can be reserved using the extraBufferCapacity parameter.

A shared flow with a buffer can be configured to avoid suspension of emitters on buffer overflow using the onBufferOverflow parameter, which is equal to one of the entries of the BufferOverflow enum. When a strategy other than SUSPEND is configured, emissions to the shared flow never suspend.
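
The effect of the overflow strategy can be sketched with tryEmit, which reports whether an emission would have had to suspend. The function name tryEmitResults and the buffer size of 1 are assumptions for illustration; the subscriber is registered but deliberately given no chance to consume values between the calls.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.*

// Hypothetical demo: results of three tryEmit calls against a one-slot buffer and a stalled subscriber.
fun tryEmitResults(strategy: BufferOverflow): List<Boolean> = runBlocking {
    val shared = MutableSharedFlow<Int>(
        extraBufferCapacity = 1,
        onBufferOverflow = strategy
    )
    // UNDISPATCHED registers the subscriber immediately; it then waits for values.
    val subscriber = launch(start = CoroutineStart.UNDISPATCHED) { shared.collect { } }
    // There is no suspension point here, so the subscriber cannot run between these calls.
    val results = (1..3).map { shared.tryEmit(it) }
    subscriber.cancel()
    results
}
```

With BufferOverflow.SUSPEND only the first call fits into the buffer, while DROP_OLDEST and DROP_LATEST accept every call by discarding a value instead.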

Unbuffered shared flow

The default implementation of a shared flow, created with the MutableSharedFlow() constructor function without parameters, has neither a replay cache nor an additional buffer. An emit call to such a shared flow suspends until all subscribers receive the emitted value and returns immediately if there are no subscribers. Thus, a tryEmit call succeeds and returns true only if there are no subscribers (in which case the emitted value is immediately lost).
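
The tryEmit behavior of an unbuffered shared flow can be checked with a sketch like this one. The function name is hypothetical, and the subscriber is started undispatched so that it is registered before the second call.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical demo: tryEmit results without and with a registered subscriber.
fun unbufferedTryEmit(): Pair<Boolean, Boolean> = runBlocking {
    val shared = MutableSharedFlow<Int>() // no replay, no extra buffer

    // No subscribers: the call succeeds and the value is lost.
    val withoutSubscribers = shared.tryEmit(1)

    // Register a subscriber that is now waiting for a value.
    val subscriber = launch(start = CoroutineStart.UNDISPATCHED) { shared.collect { } }

    // A subscriber exists and there is nowhere to buffer the value, so tryEmit fails.
    val withSubscriber = shared.tryEmit(2)

    subscriber.cancel()
    withoutSubscribers to withSubscriber
}
```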

SharedFlow vs BroadcastChannel

Conceptually shared flow is similar to BroadcastChannel and is designed to completely replace BroadcastChannel in the future. It has the following important differences:

To migrate BroadcastChannel usage to SharedFlow, start by replacing usages of the BroadcastChannel(capacity) constructor with MutableSharedFlow(0, extraBufferCapacity=capacity) (broadcast channel does not replay values to new subscribers). Replace send and offer calls with emit and tryEmit, and convert subscribers’ code to flow operators.
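
The shared-flow side of such a migration might look like the sketch below. The capacity of 4, the message type, and the function name are assumptions; the BroadcastChannel calls appear only in comments for comparison.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical demo of the migrated code path.
fun migrationDemo(): List<String> = runBlocking {
    // Before: val messages = BroadcastChannel<String>(4)
    val messages = MutableSharedFlow<String>(replay = 0, extraBufferCapacity = 4)

    // Before: a loop over a subscription channel; now a flow operator chain.
    val received = async(start = CoroutineStart.UNDISPATCHED) {
        messages.take(2).toList()
    }

    messages.emit("first")            // replaces send
    check(messages.tryEmit("second")) // replaces offer; succeeds because the buffer has room
    received.await()
}
```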

Concurrency

All methods of shared flow are thread-safe and can be safely invoked from concurrent coroutines without external synchronization.

Operator fusion

Application of flowOn, buffer with RENDEZVOUS capacity, or cancellable operators to a shared flow has no effect.

Implementation notes

The shared flow implementation uses a lock to ensure thread-safety, but suspending collector and emitter coroutines are resumed outside of this lock to avoid deadlocks when using unconfined coroutines. Adding new subscribers has O(1) amortized cost, but emitting has O(N) cost, where N is the number of subscribers.

Not stable for inheritance

The SharedFlow interface is not stable for inheritance in third-party libraries, as new methods might be added to this interface in the future, but is stable for use. Use the MutableSharedFlow(replay, ...) constructor function to create an implementation.

Properties

replayCache

abstract val replayCache: List<T>

A snapshot of the replay cache.

Inherited Functions

collect

abstract suspend fun collect(
    collector: FlowCollector<T>
): Unit

Accepts the given collector and emits values into it. This method should never be implemented or used directly.

Extension Functions

broadcastIn

fun <T> Flow<T>.broadcastIn(
    scope: CoroutineScope,
    start: CoroutineStart = CoroutineStart.LAZY
): BroadcastChannel<T>

Creates a broadcast coroutine that collects the given flow.

buffer

fun <T> Flow<T>.buffer(
    capacity: Int = BUFFERED,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): Flow<T>

Buffers flow emissions via channel of a specified capacity and runs collector in a separate coroutine.

fun <T> Flow<T>.buffer(capacity: Int = BUFFERED): Flow<T>

cache

fun <T> Flow<T>.cache(): Flow<T>

cancellable

fun <T> SharedFlow<T>.cancellable(): Flow<T>

Applying cancellable to a SharedFlow has no effect. See the SharedFlow documentation on Operator Fusion.

catch

fun <T> SharedFlow<T>.catch(
    action: suspend FlowCollector<T>.(cause: Throwable) -> Unit
): Flow<T>
fun <T> Flow<T>.catch(
    action: suspend FlowCollector<T>.(cause: Throwable) -> Unit
): Flow<T>

Catches exceptions in the flow completion and calls a specified action with the caught exception. This operator is transparent to exceptions that occur in downstream flow and does not catch exceptions that are thrown to cancel the flow.

collect

suspend fun Flow<*>.collect(): Unit

Terminal flow operator that collects the given flow but ignores all emitted values. If any exception occurs during collect or in the provided flow, this exception is rethrown from this method.

suspend fun <T> Flow<T>.collect(
    action: suspend (value: T) -> Unit
): Unit

Terminal flow operator that collects the given flow with a provided action. If any exception occurs during collect or in the provided flow, this exception is rethrown from this method.

collectIndexed

suspend fun <T> Flow<T>.collectIndexed(
    action: suspend (index: Int, value: T) -> Unit
): Unit

Terminal flow operator that collects the given flow with a provided action that takes the index of an element (zero-based) and the element. If any exception occurs during collect or in the provided flow, this exception is rethrown from this method.

collectLatest

suspend fun <T> Flow<T>.collectLatest(
    action: suspend (value: T) -> Unit
): Unit

Terminal flow operator that collects the given flow with a provided action. The crucial difference from collect is that when the original flow emits a new value, action block for previous value is cancelled.

combine

fun <T1, T2, R> Flow<T1>.combine(
    flow: Flow<T2>,
    transform: suspend (a: T1, b: T2) -> R
): Flow<R>

Returns a Flow whose values are generated with transform function by combining the most recently emitted values by each flow.

combineLatest

fun <T1, T2, R> Flow<T1>.combineLatest(
    other: Flow<T2>,
    transform: suspend (T1, T2) -> R
): Flow<R>
fun <T1, T2, T3, R> Flow<T1>.combineLatest(
    other: Flow<T2>,
    other2: Flow<T3>,
    transform: suspend (T1, T2, T3) -> R
): Flow<R>
fun <T1, T2, T3, T4, R> Flow<T1>.combineLatest(
    other: Flow<T2>,
    other2: Flow<T3>,
    other3: Flow<T4>,
    transform: suspend (T1, T2, T3, T4) -> R
): Flow<R>
fun <T1, T2, T3, T4, T5, R> Flow<T1>.combineLatest(
    other: Flow<T2>,
    other2: Flow<T3>,
    other3: Flow<T4>,
    other4: Flow<T5>,
    transform: suspend (T1, T2, T3, T4, T5) -> R
): Flow<R>

combineTransform

fun <T1, T2, R> Flow<T1>.combineTransform(
    flow: Flow<T2>,
    transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
): Flow<R>

Returns a Flow whose values are generated by the transform function that processes the most recently emitted values by each flow.

conflate

fun <T> Flow<T>.conflate(): Flow<T>

Conflates flow emissions via conflated channel and runs collector in a separate coroutine. The effect of this is that emitter is never suspended due to a slow collector, but collector always gets the most recent value emitted.

count

suspend fun <T> SharedFlow<T>.count(): Int
suspend fun <T> Flow<T>.count(
    predicate: suspend (T) -> Boolean
): Int

Returns the number of elements matching the given predicate.

debounce

fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T>
fun <T> Flow<T>.debounce(timeoutMillis: (T) -> Long): Flow<T>

Returns a flow that mirrors the original flow, but filters out values that are followed by the newer values within the given timeout. The latest value is always emitted.

fun <T> Flow<T>.debounce(timeout: Duration): Flow<T>
fun <T> Flow<T>.debounce(
    timeout: (T) -> Duration
): Flow<T>

Returns a flow that mirrors the original flow, but filters out values that are followed by the newer values within the given timeout. The latest value is always emitted.

distinctUntilChanged

fun <T> Flow<T>.distinctUntilChanged(): Flow<T>

Returns flow where all subsequent repetitions of the same value are filtered out.

fun <T> Flow<T>.distinctUntilChanged(
    areEquivalent: (old: T, new: T) -> Boolean
): Flow<T>

Returns flow where all subsequent repetitions of the same value are filtered out, when compared with each other via the provided areEquivalent function.

distinctUntilChangedBy

fun <T, K> Flow<T>.distinctUntilChangedBy(
    keySelector: (T) -> K
): Flow<T>

Returns flow where all subsequent repetitions of the same key are filtered out, where key is extracted with keySelector function.

drop

fun <T> Flow<T>.drop(count: Int): Flow<T>

Returns a flow that ignores first count elements. Throws IllegalArgumentException if count is negative.

dropWhile

fun <T> Flow<T>.dropWhile(
    predicate: suspend (T) -> Boolean
): Flow<T>

Returns a flow containing all elements except first elements that satisfy the given predicate.

filter

fun <T> Flow<T>.filter(
    predicate: suspend (T) -> Boolean
): Flow<T>

Returns a flow containing only values of the original flow that match the given predicate.

filterIsInstance

fun <R> Flow<*>.filterIsInstance(): Flow<R>

Returns a flow containing only values that are instances of specified type R.

filterNot

fun <T> Flow<T>.filterNot(
    predicate: suspend (T) -> Boolean
): Flow<T>

Returns a flow containing only values of the original flow that do not match the given predicate.

filterNotNull

fun <T : Any> Flow<T?>.filterNotNull(): Flow<T>

Returns a flow containing only values of the original flow that are not null.

first

suspend fun <T> Flow<T>.first(): T

The terminal operator that returns the first element emitted by the flow and then cancels flow’s collection. Throws NoSuchElementException if the flow was empty.

suspend fun <T> Flow<T>.first(
    predicate: suspend (T) -> Boolean
): T

The terminal operator that returns the first element emitted by the flow matching the given predicate and then cancels flow’s collection. Throws NoSuchElementException if the flow has not contained elements matching the predicate.

firstOrNull

suspend fun <T> Flow<T>.firstOrNull(): T?

The terminal operator that returns the first element emitted by the flow and then cancels flow’s collection. Returns null if the flow was empty.

suspend fun <T> Flow<T>.firstOrNull(
    predicate: suspend (T) -> Boolean
): T?

The terminal operator that returns the first element emitted by the flow matching the given predicate and then cancels flow’s collection. Returns null if the flow did not contain an element matching the predicate.

flatMapConcat

fun <T, R> Flow<T>.flatMapConcat(
    transform: suspend (value: T) -> Flow<R>
): Flow<R>

Transforms elements emitted by the original flow by applying transform, that returns another flow, and then concatenating and flattening these flows.

flatMapLatest

fun <T, R> Flow<T>.flatMapLatest(
    transform: suspend (value: T) -> Flow<R>
): Flow<R>

Returns a flow that switches to a new flow produced by transform function every time the original flow emits a value. When the original flow emits a new value, the previous flow produced by transform block is cancelled.

flatMapMerge

fun <T, R> Flow<T>.flatMapMerge(
    concurrency: Int = DEFAULT_CONCURRENCY,
    transform: suspend (value: T) -> Flow<R>
): Flow<R>

Transforms elements emitted by the original flow by applying transform, that returns another flow, and then merging and flattening these flows.

flowOn

fun <T> SharedFlow<T>.flowOn(
    context: CoroutineContext
): Flow<T>

Applying flowOn to SharedFlow has no effect. See the SharedFlow documentation on Operator Fusion.

flowWith

fun <T, R> Flow<T>.flowWith(
    flowContext: CoroutineContext,
    bufferSize: Int = BUFFERED,
    builder: Flow<T>.() -> Flow<R>
): Flow<R>

The operator that changes the context where all transformations applied to the given flow within a builder are executed. This operator is context preserving and does not affect the context of the preceding and subsequent operations.

fold

suspend fun <T, R> Flow<T>.fold(
    initial: R,
    operation: suspend (acc: R, value: T) -> R
): R

Accumulates value starting with initial value and applying operation to the current accumulator value and each element.

launchIn

fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job

Terminal flow operator that launches the collection of the given flow in the scope. It is a shorthand for scope.launch { flow.collect() }.

map

fun <T, R> Flow<T>.map(
    transform: suspend (value: T) -> R
): Flow<R>

Returns a flow containing the results of applying the given transform function to each value of the original flow.

mapLatest

fun <T, R> Flow<T>.mapLatest(
    transform: suspend (value: T) -> R
): Flow<R>

Returns a flow that emits elements from the original flow transformed by transform function. When the original flow emits a new value, computation of the transform block for previous value is cancelled.

mapNotNull

fun <T, R : Any> Flow<T>.mapNotNull(
    transform: suspend (value: T) -> R?
): Flow<R>

Returns a flow that contains only non-null results of applying the given transform function to each value of the original flow.

onCompletion

fun <T> Flow<T>.onCompletion(
    action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T>

Returns a flow that invokes the given action after the flow is completed or cancelled, passing the cancellation exception or failure as cause parameter of action.

onEach

fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T>

Returns a flow that invokes the given action before each value of the upstream flow is emitted downstream.

onEmpty

fun <T> Flow<T>.onEmpty(
    action: suspend FlowCollector<T>.() -> Unit
): Flow<T>

Invokes the given action when this flow completes without emitting any elements. The receiver of the action is FlowCollector, so onEmpty can emit additional elements.
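
A minimal sketch of onEmpty emitting fallback elements is shown below; the function name is hypothetical. Because a shared flow never completes, this operator is mostly relevant to cold flows or to shared flows truncated with operators such as take.

```kotlin
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.onEmpty
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical demo: an empty flow is replaced by two fallback values.
fun onEmptyFallback(): List<Int> = runBlocking {
    emptyFlow<Int>()
        .onEmpty {
            emit(1)
            emit(2)
        }
        .toList()
}
```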

onStart

fun <T> Flow<T>.onStart(
    action: suspend FlowCollector<T>.() -> Unit
): Flow<T>

Returns a flow that invokes the given action before this flow starts to be collected.

onSubscription

fun <T> SharedFlow<T>.onSubscription(
    action: suspend FlowCollector<T>.() -> Unit
): SharedFlow<T>

Returns a flow that invokes the given action after this shared flow starts to be collected (after the subscription is registered).
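
One way to picture onSubscription is the sketch below, where the action emits a marker value to the new subscriber before any shared value arrives. The function name and the string values are illustrative assumptions.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical demo: the subscriber first sees the value emitted by onSubscription.
fun onSubscriptionDemo(): List<String> = runBlocking {
    val shared = MutableSharedFlow<String>()
    val received = async(start = CoroutineStart.UNDISPATCHED) {
        shared
            .onSubscription { emit("subscribed") } // runs once the subscription is registered
            .take(2)
            .toList()
    }
    shared.emit("event") // delivered because the subscriber is already registered
    received.await()
}
```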

produceIn

fun <T> Flow<T>.produceIn(
    scope: CoroutineScope
): ReceiveChannel<T>

Creates a produce coroutine that collects the given flow.

publish

fun <T> Flow<T>.publish(): Flow<T>
fun <T> Flow<T>.publish(bufferSize: Int): Flow<T>

reduce

suspend fun <S, T : S> Flow<T>.reduce(
    operation: suspend (accumulator: S, value: T) -> S
): S

Accumulates value starting with the first element and applying operation to current accumulator value and each element. Throws NoSuchElementException if flow was empty.

replay

fun <T> Flow<T>.replay(): Flow<T>
fun <T> Flow<T>.replay(bufferSize: Int): Flow<T>

retry

fun <T> SharedFlow<T>.retry(
    retries: Long = Long.MAX_VALUE,
    predicate: suspend (cause: Throwable) -> Boolean = { true }
): Flow<T>
fun <T> Flow<T>.retry(
    retries: Int = Int.MAX_VALUE,
    predicate: (Throwable) -> Boolean = { true }
): Flow<T>

retryWhen

fun <T> SharedFlow<T>.retryWhen(
    predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean
): Flow<T>
fun <T> Flow<T>.retryWhen(
    predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean
): Flow<T>

Retries collection of the given flow when an exception occurs in the upstream flow and the predicate returns true. The predicate also receives an attempt number as parameter, starting from zero on the initial call. This operator is transparent to exceptions that occur in downstream flow and does not retry on exceptions that are thrown to cancel the flow.

runningReduce

fun <T> Flow<T>.runningReduce(
    operation: suspend (accumulator: T, value: T) -> T
): Flow<T>

Reduces the given flow with operation, emitting every intermediate result, including initial value. The first element is taken as initial value for operation accumulator. This operator has a sibling with initial value – scan.
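
As a small illustration, a running sum over a finite flow could be written like this (the function name is hypothetical):

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.runningReduce
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical demo: emits every intermediate sum, starting with the first element.
fun runningSums(): List<Int> = runBlocking {
    flowOf(1, 2, 3, 4)
        .runningReduce { accumulator, value -> accumulator + value }
        .toList()
}
```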

sample

fun <T> Flow<T>.sample(periodMillis: Long): Flow<T>

Returns a flow that emits only the latest value emitted by the original flow during the given sampling period.

fun <T> Flow<T>.sample(period: Duration): Flow<T>

Returns a flow that emits only the latest value emitted by the original flow during the given sampling period.

scan

fun <T, R> Flow<T>.scan(
    initial: R,
    operation: suspend (accumulator: R, value: T) -> R
): Flow<R>

Folds the given flow with operation, emitting every intermediate result, including initial value. Note that initial value should be immutable (or should not be mutated) as it is shared between different collectors.
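
A short sketch of scan with an immutable initial value follows; the function name is hypothetical. Each step creates a new list rather than mutating the accumulator.

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.scan
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical demo: accumulates elements into growing immutable lists.
fun runningLists(): List<List<Int>> = runBlocking {
    flowOf(1, 2, 3)
        .scan(emptyList<Int>()) { accumulator, value -> accumulator + value }
        .toList()
}
```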

scanReduce

fun <T> Flow<T>.scanReduce(
    operation: suspend (accumulator: T, value: T) -> T
): Flow<T>

shareIn

fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow<T>

Converts a cold Flow into a hot SharedFlow that is started in the given coroutine scope, sharing emissions from a single running instance of the upstream flow with multiple downstream subscribers, and replaying a specified number of replay values to new subscribers. See the SharedFlow documentation for the general concepts of shared flows.

single

suspend fun <T> Flow<T>.single(): T

The terminal operator that awaits one and only one value to be emitted. Throws NoSuchElementException for empty flow and IllegalStateException for flow that contains more than one element.

singleOrNull

suspend fun <T> Flow<T>.singleOrNull(): T?

The terminal operator that awaits for one and only one value to be emitted. Returns the single value or null, if the flow was empty or emitted more than one value.

stateIn

fun <T> Flow<T>.stateIn(
    scope: CoroutineScope,
    started: SharingStarted,
    initialValue: T
): StateFlow<T>

Converts a cold Flow into a hot StateFlow that is started in the given coroutine scope, sharing the most recently emitted value from a single running instance of the upstream flow with multiple downstream subscribers. See the StateFlow documentation for the general concepts of state flows.

suspend fun <T> Flow<T>.stateIn(
    scope: CoroutineScope
): StateFlow<T>

Starts the upstream flow in a given scope, suspends until the first value is emitted, and returns a hot StateFlow of future emissions, sharing the most recently emitted value from this running instance of the upstream flow with multiple downstream subscribers. See the StateFlow documentation for the general concepts of state flows.

switchMap

fun <T, R> Flow<T>.switchMap(
    transform: suspend (value: T) -> Flow<R>
): Flow<R>

take

fun <T> Flow<T>.take(count: Int): Flow<T>

Returns a flow that contains first count elements. When count elements are consumed, the original flow is cancelled. Throws IllegalArgumentException if count is not positive.

takeWhile

fun <T> Flow<T>.takeWhile(
    predicate: suspend (T) -> Boolean
): Flow<T>

Returns a flow that contains first elements satisfying the given predicate.

toCollection

suspend fun <T, C : MutableCollection<in T>> Flow<T>.toCollection(
    destination: C
): C

Collects the given flow into a destination.

toList

suspend fun <T> SharedFlow<T>.toList(): List<T>
suspend fun <T> Flow<T>.toList(
    destination: MutableList<T> = ArrayList()
): List<T>

Collects the given flow into a destination.

toSet

suspend fun <T> SharedFlow<T>.toSet(): Set<T>
suspend fun <T> Flow<T>.toSet(
    destination: MutableSet<T> = LinkedHashSet()
): Set<T>

Collects the given flow into a destination.

transform

fun <T, R> Flow<T>.transform(
    transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R>

Applies transform function to each value of the given flow.

transformLatest

fun <T, R> Flow<T>.transformLatest(
    transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R>

Returns a flow that produces elements by the transform function every time the original flow emits a value. When the original flow emits a new value, the previous transform block is cancelled, thus the name transformLatest.

transformWhile

fun <T, R> Flow<T>.transformWhile(
    transform: suspend FlowCollector<R>.(value: T) -> Boolean
): Flow<R>

Applies transform function to each value of the given flow while this function returns true.
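
A minimal sketch of transformWhile is shown below; the function name and the threshold are illustrative. Unlike takeWhile, the element that makes the function return false can still be emitted before the flow stops.

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transformWhile
import kotlinx.coroutines.runBlocking

// Hypothetical demo: emits each value and stops after the first value that is not below 3.
fun upToAndIncludingThree(): List<Int> = runBlocking {
    flowOf(1, 2, 3, 4, 5)
        .transformWhile { value ->
            emit(value) // the terminating element is emitted as well
            value < 3   // returning false ends the flow
        }
        .toList()
}
```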

withIndex

fun <T> Flow<T>.withIndex(): Flow<IndexedValue<T>>

Returns a flow that wraps each element into IndexedValue, containing value and its index (starting from zero).

zip

fun <T1, T2, R> Flow<T1>.zip(
    other: Flow<T2>,
    transform: suspend (T1, T2) -> R
): Flow<R>

Zips values from the current flow (this) with other flow using provided transform function applied to each pair of values. The resulting flow completes as soon as one of the flows completes and cancel is called on the remaining flow.
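
For illustration, zipping two finite flows of different lengths might look like this (the function name is hypothetical); the result ends when the shorter flow completes.

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking

// Hypothetical demo: pairs numbers with letters until one of the flows runs out.
fun zippedLabels(): List<String> = runBlocking {
    flowOf(1, 2, 3)
        .zip(flowOf("a", "b")) { number, letter -> "$number$letter" }
        .toList()
}
```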

Inheritors

MutableSharedFlow

interface MutableSharedFlow<T> : 
    SharedFlow<T>,
    FlowCollector<T>

A mutable SharedFlow that provides functions to emit values to the flow. An instance of MutableSharedFlow with the given configuration parameters can be created using MutableSharedFlow(...) constructor function.

StateFlow

interface StateFlow<out T> : SharedFlow<T>

A SharedFlow that represents a read-only state with a single updatable data value that emits updates to the value to its collectors. A state flow is a hot flow because its active instance exists independently of the presence of collectors. Its current value can be retrieved via the value property.