shareIn

fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow<T>

Converts a cold Flow into a hot SharedFlow that is started in the given coroutine scope, sharing emissions from a single running instance of the upstream flow with multiple downstream subscribers, and replaying the number of values specified by the replay parameter to new subscribers. See the SharedFlow documentation for the general concepts of shared flows.

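The starting of the sharing coroutine is controlled by the started parameter. The built-in options are SharingStarted.Eagerly, SharingStarted.Lazily, and SharingStarted.WhileSubscribed; a custom strategy can be supplied by implementing the SharingStarted interface.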
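As a rough sketch of how the built-in strategies are passed to shareIn, the helper below shares the same cold flow three times, once per strategy. The helper name shareWithEachStrategy and the 5-second timeout are illustrative assumptions, not part of the library.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.shareIn

// Hypothetical helper: one shared view of `source` per built-in strategy.
fun <T> shareWithEachStrategy(source: Flow<T>, scope: CoroutineScope): List<SharedFlow<T>> = listOf(
    // Upstream is started immediately, even before the first subscriber appears.
    source.shareIn(scope, SharingStarted.Eagerly),
    // Upstream is started when the first subscriber appears and is then kept running.
    source.shareIn(scope, SharingStarted.Lazily),
    // Upstream is started with the first subscriber and stopped 5 seconds
    // (an arbitrary value chosen for this sketch) after the last one disappears.
    source.shareIn(scope, SharingStarted.WhileSubscribed(stopTimeoutMillis = 5_000)),
)
```

In each case the sharing coroutine lives in scope, so cancelling scope stops sharing regardless of the strategy.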

The shareIn operator is useful when a cold flow is expensive to create and/or to maintain, but multiple subscribers need to collect its values. For example, consider a flow of messages coming from a backend over an expensive network connection that takes a long time to establish. Conceptually, it might be implemented like this:

val backendMessages: Flow<Message> = flow {
    connectToBackend() // takes a lot of time
    try {
        while (true) {
            emit(receiveMessageFromBackend())
        }
    } finally {
        disconnectFromBackend()
    }
}

If this flow is directly used in the application, then every time it is collected a fresh connection is established, and it will take a while before messages start flowing. However, we can share a single connection and establish it eagerly like this:

val messages: SharedFlow<Message> = backendMessages.shareIn(scope, SharingStarted.Eagerly)

Now a single connection is shared between all collectors of messages, and there is a chance that the connection is already established by the time it is needed.
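The sharing of a single upstream instance can be checked with a small self-contained sketch. Here the backend connection is replaced by a counter that is incremented each time the upstream body starts; the function name countUpstreamStarts and the tick-producing flow are assumptions made for illustration.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Runs two collectors against one shared flow and reports how many times
// the upstream body was started.
fun countUpstreamStarts(): Int = runBlocking {
    val starts = AtomicInteger(0)
    // Stand-in for backendMessages: "connecting" is just a counter increment.
    val upstream = flow {
        starts.incrementAndGet()
        var i = 0
        while (true) {
            emit(i++)
            delay(10)
        }
    }
    // A separate Job so the never-ending sharing coroutine can be cancelled explicitly.
    val scope = CoroutineScope(coroutineContext + Job())
    val shared = upstream.shareIn(scope, SharingStarted.Eagerly)

    // Two independent subscribers, each taking three values.
    val first = async { shared.take(3).toList() }
    val second = async { shared.take(3).toList() }
    first.await()
    second.await()

    scope.cancel()
    starts.get()
}

fun main() {
    println(countUpstreamStarts()) // prints 1
}
```

Collecting upstream directly from the two coroutines instead would start its body twice.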

Upstream completion and error handling

Normal completion of the upstream flow has no effect on subscribers, and the sharing coroutine continues to run. If a strategy like SharingStarted.WhileSubscribed is used, then the upstream can be restarted. If a special action on upstream completion is needed, then an onCompletion operator can be used before the shareIn operator to emit a special value in this case, like this:

backendMessages
    .onCompletion { cause -> if (cause == null) emit(UpstreamHasCompletedMessage) }
    .shareIn(scope, SharingStarted.Eagerly)
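A runnable variant of the snippet above, assuming a two-element upstream and a plain string as the completion marker (both are stand-ins, not library values). SharingStarted.Lazily is used here instead of Eagerly so that the single subscriber in the sketch does not miss values emitted before it subscribes.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Collects the two upstream values plus the marker emitted on normal completion.
fun collectWithCompletionMarker(): List<String> = runBlocking {
    val scope = CoroutineScope(coroutineContext + Job())
    val shared = flowOf("a", "b")
        // cause is null only when the upstream completed normally.
        .onCompletion { cause -> if (cause == null) emit("<completed>") }
        // Lazily waits for the first subscriber before starting the upstream.
        .shareIn(scope, SharingStarted.Lazily)

    // The shared flow itself never completes, so the subscriber must stop on its own.
    val received = shared.take(3).toList()
    scope.cancel()
    received
}

fun main() {
    println(collectWithCompletionMarker()) // prints [a, b, <completed>]
}
```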

Any exception in the upstream flow terminates the sharing coroutine without affecting any of the subscribers, and will be handled by the scope in which the sharing coroutine is launched. Custom exception handling can be configured by using the catch or retry operators before the shareIn operator. For example, to retry connection on any IOException with 1 second delay between attempts, use:

val messages = backendMessages
    .retry { e ->
        val shallRetry = e is IOException // other exceptions are bugs - handle them
        if (shallRetry) delay(1000)
        shallRetry
    }
    .shareIn(scope, SharingStarted.Eagerly)
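To see the retry behaviour end to end, the following sketch replaces the backend with a flow that fails twice with IOException before emitting. The flaky flow, the 10 ms delay, and the function name firstMessageAfterRetries are assumptions chosen to keep the example short.

```kotlin
import java.io.IOException
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Returns the first value a subscriber sees once the simulated connection succeeds.
fun firstMessageAfterRetries(): String = runBlocking {
    var attempts = 0
    // Stand-in for backendMessages: the first two attempts fail.
    val flaky = flow<String> {
        attempts++
        if (attempts < 3) throw IOException("simulated connection failure")
        emit("connected on attempt $attempts")
        awaitCancellation() // keep the simulated connection open
    }
    val scope = CoroutineScope(coroutineContext + Job())
    val shared = flaky
        .retry { e ->
            val shallRetry = e is IOException // anything else propagates to the scope
            if (shallRetry) delay(10)
            shallRetry
        }
        // replay = 1 so a subscriber arriving after the emission still sees it.
        .shareIn(scope, SharingStarted.Eagerly, replay = 1)

    val message = shared.first()
    scope.cancel()
    message
}

fun main() {
    println(firstMessageAfterRetries()) // prints connected on attempt 3
}
```

Because retry sits before shareIn, the failures are handled inside the sharing coroutine and subscribers never observe them.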

Initial value

When a special initial value is needed to signal to subscribers that the upstream is still loading the data, use the onStart operator on the upstream flow. For example:

backendMessages
    .onStart { emit(UpstreamIsStartingMessage) }
    .shareIn(scope, SharingStarted.Eagerly, 1) // replay one most recent message
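The effect of combining onStart with replay = 1 can be sketched with an upstream that never emits anything of its own. The string "starting" stands in for UpstreamIsStartingMessage, and firstValueSeenBySubscriber is a hypothetical name.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Returns what a subscriber receives while the upstream is still "loading".
fun firstValueSeenBySubscriber(): String = runBlocking {
    val scope = CoroutineScope(coroutineContext + Job())
    // Stand-in upstream that stays silent until it is cancelled.
    val shared = flow<String> { awaitCancellation() }
        .onStart { emit("starting") }
        .shareIn(scope, SharingStarted.Eagerly, replay = 1) // replay one most recent message

    // Whether the subscriber arrives before or after the emission,
    // it receives the initial value, either live or from the replay cache.
    val value = shared.first()
    scope.cancel()
    value
}

fun main() {
    println(firstValueSeenBySubscriber()) // prints starting
}
```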

Buffering and conflation

The shareIn operator runs the upstream flow in a separate coroutine, and buffers emissions from upstream as explained in the buffer operator’s description, using a buffer of replay size or the default (whichever is larger). This default buffering can be overridden with an explicit buffer configuration by preceding the shareIn call with buffer or conflate.
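A minimal sketch of such explicit configurations follows. The helper name bufferingVariants, the capacity of 16, and the choice of SharingStarted.Lazily are illustrative assumptions; the comments summarise the expected effect of each combination rather than quoting the library's contract.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.shareIn

// Hypothetical helper: three ways to override the default buffering.
fun <T> bufferingVariants(source: Flow<T>, scope: CoroutineScope): List<SharedFlow<T>> = listOf(
    // No buffer and no replay: the upstream emitter suspends until
    // all subscribers have processed each value.
    source.buffer(0).shareIn(scope, SharingStarted.Lazily, replay = 0),
    // Replay of 1 plus 16 extra buffered values before the emitter suspends.
    source.buffer(16).shareIn(scope, SharingStarted.Lazily, replay = 1),
    // Conflation: the emitter never suspends; slow subscribers skip older values.
    source.conflate().shareIn(scope, SharingStarted.Lazily, replay = 1),
)
```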

Operator fusion

Application of flowOn, buffer with RENDEZVOUS capacity, or cancellable operators to the resulting shared flow has no effect.

Exceptions

This function throws IllegalArgumentException on unsupported values of parameters or combinations thereof.

Parameters

scope - the coroutine scope in which sharing is started.

started - the strategy that controls when sharing is started and stopped.

replay - the number of values replayed to new subscribers (cannot be negative, defaults to zero).