Extensions for org.reactivestreams.Publisher

asFlow

fun <T : Any> Publisher<T>.asFlow(batchSize: Int): Flow<T>
fun <T : Any> Publisher<T>.asFlow(): Flow<T>

Transforms the given reactive Publisher into Flow. Use buffer operator on the resulting flow to specify the size of the backpressure. More precisely, it specifies the value of the subscription’s request. buffer default capacity is used by default.

asFlowDeprecated

fun <T : Any> Publisher<T>.asFlowDeprecated(): Flow<T>

awaitFirst

suspend fun <T> Publisher<T>.awaitFirst(): T

Awaits for the first value from the given publisher without blocking a thread and returns the resulting value or throws the corresponding exception if this publisher had produced error.

awaitFirstOrDefault

suspend fun <T> Publisher<T>.awaitFirstOrDefault(
    default: T
): T

Awaits for the first value from the given observable or the default value if none is emitted without blocking a thread and returns the resulting value or throws the corresponding exception if this observable had produced error.

awaitFirstOrElse

suspend fun <T> Publisher<T>.awaitFirstOrElse(
    defaultValue: () -> T
): T

Awaits for the first value from the given observable or call defaultValue to get a value if none is emitted without blocking a thread and returns the resulting value or throws the corresponding exception if this observable had produced error.

awaitFirstOrNull

suspend fun <T> Publisher<T>.awaitFirstOrNull(): T?

Awaits for the first value from the given observable or null value if none is emitted without blocking a thread and returns the resulting value or throws the corresponding exception if this observable had produced error.

awaitLast

suspend fun <T> Publisher<T>.awaitLast(): T

Awaits for the last value from the given publisher without blocking a thread and returns the resulting value or throws the corresponding exception if this publisher had produced error.

awaitSingle

suspend fun <T> Publisher<T>.awaitSingle(): T

Awaits for the single value from the given publisher without blocking a thread and returns the resulting value or throws the corresponding exception if this publisher had produced error.

collect

suspend fun <T> Publisher<T>.collect(
    action: (T) -> Unit
): Unit

Subscribes to this Publisher and performs the specified action for each received element. Cancels subscription if any exception happens during collect.

consumeEach

suspend fun <T> Publisher<T>.consumeEach(
    action: (T) -> Unit
): Unit

openSubscription

fun <T> Publisher<T>.openSubscription(
    request: Int = 1
): ReceiveChannel<T>

Subscribes to this Publisher and returns a channel to receive elements emitted by it. The resulting channel shall be cancelled to unsubscribe from this publisher.