Channel

interface Channel<E> : SendChannel<E>, ReceiveChannel<E> (source)

Channel is a non-blocking primitive for communication between a sender (via SendChannel) and a receiver (via ReceiveChannel). Conceptually, a channel is similar to Java’s BlockingQueue, but it has suspending operations instead of blocking ones and can be closed.

Creating channels

The Channel(capacity) factory function is used to create channels of different kinds depending on the value of the capacity integer:

Buffered channels can be configured with an additional onBufferOverflow parameter. It controls the behaviour of the channel’s send function on buffer overflow:

A non-default onBufferOverflow implicitly creates a channel with at least one buffered element and is ignored for a channel with unlimited buffer. It cannot be specified for capacity = CONFLATED, which is a shortcut by itself.

Prompt cancellation guarantee

All suspending functions with channels provide prompt cancellation guarantee. If the job was cancelled while send or receive function was suspended, it will not resume successfully, but throws a CancellationException. With a single-threaded dispatcher like Dispatchers.Main this gives a guarantee that if a piece code running in this thread cancels a Job, then a coroutine running this job cannot resume successfully and continue to run, ensuring a prompt response to its cancellation.

Prompt cancellation guarantee for channel operations was added since kotlinx.coroutines version 1.4.0 and had replaced a channel-specific atomic-cancellation that was not consistent with other suspending functions. The low-level mechanics of prompt cancellation are explained in suspendCancellableCoroutine function.

Undelivered elements

As a result of a prompt cancellation guarantee, when a closeable resource (like open file or a handle to another native resource) is transferred via channel from one coroutine to another it can fail to be delivered and will be lost if either send or receive operations are cancelled in transit.

A Channel() constructor function has an onUndeliveredElement optional parameter. When onUndeliveredElement parameter is set, the corresponding function is called once for each element that was sent to the channel with the call to the send function but failed to be delivered, which can happen in the following cases:

Note, that onUndeliveredElement function is called synchronously in an arbitrary context. It should be fast, non-blocking, and should not throw exceptions. Any exception thrown by onUndeliveredElement is wrapped into an internal runtime exception which is either rethrown from the caller method or handed off to the exception handler in the current context (see CoroutineExceptionHandler) when one is available.

A typical usage for onDeliveredElement is to close a resource that is being transferred via the channel. The following code pattern guarantees that opened resources are closed even if producer, consumer, and/or channel are cancelled. Resources are never lost.

// Create the channel with onUndeliveredElement block that closes a resource
val channel = Channel<Resource>(capacity) { resource -> resource.close() }

// Producer code
val resourceToSend = openResource()
channel.send(resourceToSend)

// Consumer code
val resourceReceived = channel.receive()
try {
    // work with received resource
} finally {
    resourceReceived.close()
}

Note, that if you do any kind of work in between openResource() and channel.send(...), then you should ensure that resource gets closed in case this additional code fails.

Inherited Properties

isClosedForReceive

abstract val isClosedForReceive: Boolean

Returns true if this channel was closed by invocation of close on the SendChannel side and all previously sent items were already received. This means that calling receive will result in a ClosedReceiveChannelException. If the channel was closed because of an exception, it is considered closed, too, but is called a failed channel. All suspending attempts to receive an element from a failed channel throw the original close cause exception.

isClosedForSend

abstract val isClosedForSend: Boolean

Returns true if this channel was closed by an invocation of close. This means that calling send or offer will result in an exception.

isEmpty

abstract val isEmpty: Boolean

Returns true if the channel is empty (contains no elements), which means that an attempt to receive will suspend. This function returns false if the channel is closed for receive.

onReceive

abstract val onReceive: SelectClause1<E>

Clause for the select expression of the receive suspending function that selects with the element received from the channel. The select invocation fails with an exception if the channel is closed for receive (see close for details).

onSend

abstract val onSend: SelectClause2<E, SendChannel<E>>

Clause for the select expression of the send suspending function that selects when the element that is specified as the parameter is sent to the channel. When the clause is selected, the reference to this channel is passed into the corresponding block.

Inherited Functions

cancel

abstract fun cancel(
    cause: CancellationException? = null
): Unit

Cancels reception of remaining elements from this channel with an optional cause. This function closes the channel and removes all buffered sent elements from it.

close

abstract fun close(cause: Throwable? = null): Boolean

Closes this channel. This is an idempotent operation — subsequent invocations of this function have no effect and return false. Conceptually, its sends a special “close token” over this channel.

invokeOnClose

abstract fun invokeOnClose(
    handler: (cause: Throwable?) -> Unit
): Unit

Registers a handler which is synchronously invoked once the channel is closed or the receiving side of this channel is cancelled. Only one handler can be attached to a channel during its lifetime. The handler is invoked when isClosedForSend starts to return true. If the channel is closed already, the handler is invoked immediately.

iterator

abstract operator fun iterator(): ChannelIterator<E>

Returns a new iterator to receive elements from this channel using a for loop. Iteration completes normally when the channel is closed for receive without a cause and throws the original close cause exception if the channel has failed.

offer

abstract fun offer(element: E): Boolean

Immediately adds the specified element to this channel, if this doesn’t violate its capacity restrictions, and returns true. Otherwise, just returns false. This is a synchronous variant of send which backs off in situations when send suspends.

poll

abstract fun poll(): E?

Retrieves and removes an element from this channel if its not empty, or returns null if the channel is empty or is is closed for receive without a cause. It throws the original close cause exception if the channel has failed.

receive

abstract suspend fun receive(): E

Retrieves and removes an element from this channel if it’s not empty, or suspends the caller while the channel is empty, or throws a ClosedReceiveChannelException if the channel is closed for receive. If the channel was closed because of an exception, it is called a failed channel and this function will throw the original close cause exception.

send

abstract suspend fun send(element: E): Unit

Sends the specified element to this channel, suspending the caller while the buffer of this channel is full or if it does not exist, or throws an exception if the channel is closed for send (see close for details).

Companion Object Properties

BUFFERED

const val BUFFERED: Int

Requests a buffered channel with the default buffer capacity in the Channel(...) factory function. The default capacity for a channel that suspends on overflow is 64 and can be overridden by setting DEFAULT_BUFFER_PROPERTY_NAME on JVM. For non-suspending channels, a buffer of capacity 1 is used.

CONFLATED

const val CONFLATED: Int

Requests a conflated channel in the Channel(...) factory function. This is a shortcut to creating a channel with onBufferOverflow = DROP_OLDEST.

DEFAULT_BUFFER_PROPERTY_NAME

const val DEFAULT_BUFFER_PROPERTY_NAME: String

Name of the property that defines the default channel capacity when BUFFERED is used as parameter in Channel(...) factory function.

RENDEZVOUS

const val RENDEZVOUS: Int

Requests a rendezvous channel in the Channel(...) factory function — a channel that does not have a buffer.

UNLIMITED

const val UNLIMITED: Int

Requests a channel with an unlimited capacity buffer in the Channel(...) factory function.

Extension Functions

all

suspend fun <E> ReceiveChannel<E>.all(
    predicate: (E) -> Boolean
): Boolean

Returns true if all elements match the given predicate.

any

suspend fun <E> ReceiveChannel<E>.any(): Boolean

Returns true if channel has at least one element.

suspend fun <E> ReceiveChannel<E>.any(
    predicate: (E) -> Boolean
): Boolean

Returns true if at least one element matches the given predicate.

associate

suspend fun <E, K, V> ReceiveChannel<E>.associate(
    transform: (E) -> Pair<K, V>
): Map<K, V>

Returns a Map containing key-value pairs provided by transform function applied to elements of the given channel.

associateBy

suspend fun <E, K> ReceiveChannel<E>.associateBy(
    keySelector: (E) -> K
): Map<K, E>

Returns a Map containing the elements from the given channel indexed by the key returned from keySelector function applied to each element.

suspend fun <E, K, V> ReceiveChannel<E>.associateBy(
    keySelector: (E) -> K,
    valueTransform: (E) -> V
): Map<K, V>

Returns a Map containing the values provided by valueTransform and indexed by keySelector functions applied to elements of the given channel.

associateByTo

suspend fun <E, K, M : MutableMap<in K, in E>> ReceiveChannel<E>.associateByTo(
    destination: M,
    keySelector: (E) -> K
): M

Populates and returns the destination mutable map with key-value pairs, where key is provided by the keySelector function applied to each element of the given channel and value is the element itself.

suspend fun <E, K, V, M : MutableMap<in K, in V>> ReceiveChannel<E>.associateByTo(
    destination: M,
    keySelector: (E) -> K,
    valueTransform: (E) -> V
): M

Populates and returns the destination mutable map with key-value pairs, where key is provided by the keySelector function and and value is provided by the valueTransform function applied to elements of the given channel.

associateTo

suspend fun <E, K, V, M : MutableMap<in K, in V>> ReceiveChannel<E>.associateTo(
    destination: M,
    transform: (E) -> Pair<K, V>
): M

Populates and returns the destination mutable map with key-value pairs provided by transform function applied to each element of the given channel.

broadcast

fun <E> ReceiveChannel<E>.broadcast(
    capacity: Int = 1,
    start: CoroutineStart = CoroutineStart.LAZY
): BroadcastChannel<E>

Broadcasts all elements of the channel. This function consumes all elements of the original ReceiveChannel.

consume

fun <E, R> ReceiveChannel<E>.consume(
    block: ReceiveChannel<E>.() -> R
): R

Makes sure that the given block consumes all elements from the given channel by always invoking cancel after the execution of the block.

consumeAsFlow

fun <T> ReceiveChannel<T>.consumeAsFlow(): Flow<T>

Represents the given receive channel as a hot flow and consumes the channel on the first collection from this flow. The resulting flow can be collected just once and throws IllegalStateException when trying to collect it more than once.

consumeEach

suspend fun <E> ReceiveChannel<E>.consumeEach(
    action: (E) -> Unit
): Unit

Performs the given action for each received element and cancels the channel after the execution of the block. If you need to iterate over the channel without consuming it, a regular for loop should be used instead.

consumeEachIndexed

suspend fun <E> ReceiveChannel<E>.consumeEachIndexed(
    action: (IndexedValue<E>) -> Unit
): Unit

Performs the given action for each received element.

consumes

Returns a CompletionHandler that invokes cancel on the ReceiveChannel with the corresponding cause. See also ReceiveChannel.consume.

count

suspend fun <E> ReceiveChannel<E>.count(): Int

Returns the number of elements in this channel.

suspend fun <E> ReceiveChannel<E>.count(
    predicate: (E) -> Boolean
): Int

Returns the number of elements matching the given predicate.

distinct

fun <E> ReceiveChannel<E>.distinct(): ReceiveChannel<E>

Returns a channel containing only distinct elements from the given channel.

distinctBy

fun <E, K> ReceiveChannel<E>.distinctBy(
    context: CoroutineContext = Dispatchers.Unconfined,
    selector: suspend (E) -> K
): ReceiveChannel<E>

Returns a channel containing only elements from the given channel having distinct keys returned by the given selector function.

drop

fun <E> ReceiveChannel<E>.drop(
    n: Int,
    context: CoroutineContext = Dispatchers.Unconfined
): ReceiveChannel<E>

Returns a channel containing all elements except first n elements.

dropWhile

fun <E> ReceiveChannel<E>.dropWhile(
    context: CoroutineContext = Dispatchers.Unconfined,
    predicate: suspend (E) -> Boolean
): ReceiveChannel<E>

Returns a channel containing all elements except first elements that satisfy the given predicate.

elementAt

suspend fun <E> ReceiveChannel<E>.elementAt(index: Int): E

Returns an element at the given index or throws an IndexOutOfBoundsException if the index is out of bounds of this channel.

elementAtOrElse

suspend fun <E> ReceiveChannel<E>.elementAtOrElse(
    index: Int,
    defaultValue: (Int) -> E
): E

Returns an element at the given index or the result of calling the defaultValue function if the index is out of bounds of this channel.

elementAtOrNull

suspend fun <E> ReceiveChannel<E>.elementAtOrNull(
    index: Int
): E?

Returns an element at the given index or null if the index is out of bounds of this channel.

filter

fun <E> ReceiveChannel<E>.filter(
    context: CoroutineContext = Dispatchers.Unconfined,
    predicate: suspend (E) -> Boolean
): ReceiveChannel<E>

Returns a channel containing only elements matching the given predicate.

filterIndexed

fun <E> ReceiveChannel<E>.filterIndexed(
    context: CoroutineContext = Dispatchers.Unconfined,
    predicate: suspend (index: Int, E) -> Boolean
): ReceiveChannel<E>

Returns a channel containing only elements matching the given predicate.

filterIndexedTo

suspend fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterIndexedTo(
    destination: C,
    predicate: (index: Int, E) -> Boolean
): C

Appends all elements matching the given predicate to the given destination.

filterNot

fun <E> ReceiveChannel<E>.filterNot(
    context: CoroutineContext = Dispatchers.Unconfined,
    predicate: suspend (E) -> Boolean
): ReceiveChannel<E>

Returns a channel containing all elements not matching the given predicate.

filterNotNull

fun <E : Any> ReceiveChannel<E?>.filterNotNull(): ReceiveChannel<E>

Returns a channel containing all elements that are not null.

filterNotNullTo

suspend fun <E : Any, C : MutableCollection<in E>> ReceiveChannel<E?>.filterNotNullTo(
    destination: C
): C

Appends all elements that are not null to the given destination.

filterNotTo

suspend fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterNotTo(
    destination: C,
    predicate: (E) -> Boolean
): C

Appends all elements not matching the given predicate to the given destination.

filterTo

suspend fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterTo(
    destination: C,
    predicate: (E) -> Boolean
): C

Appends all elements matching the given predicate to the given destination.

find

suspend fun <E> ReceiveChannel<E>.find(
    predicate: (E) -> Boolean
): E?

Returns the first element matching the given predicate, or null if no such element was found.

findLast

suspend fun <E> ReceiveChannel<E>.findLast(
    predicate: (E) -> Boolean
): E?

Returns the last element matching the given predicate, or null if no such element was found.

first

suspend fun <E> ReceiveChannel<E>.first(): E

Returns first element.

suspend fun <E> ReceiveChannel<E>.first(
    predicate: (E) -> Boolean
): E

Returns the first element matching the given predicate.

firstOrNull

suspend fun <E> ReceiveChannel<E>.firstOrNull(): E?

Returns the first element, or null if the channel is empty.

suspend fun <E> ReceiveChannel<E>.firstOrNull(
    predicate: (E) -> Boolean
): E?

Returns the first element matching the given predicate, or null if element was not found.

flatMap

fun <E, R> ReceiveChannel<E>.flatMap(
    context: CoroutineContext = Dispatchers.Unconfined,
    transform: suspend (E) -> ReceiveChannel<R>
): ReceiveChannel<R>

Returns a single channel of all elements from results of transform function being invoked on each element of original channel.

fold

suspend fun <E, R> ReceiveChannel<E>.fold(
    initial: R,
    operation: (acc: R, E) -> R
): R

Accumulates value starting with initial value and applying operation from left to right to current accumulator value and each element.

foldIndexed

suspend fun <E, R> ReceiveChannel<E>.foldIndexed(
    initial: R,
    operation: (index: Int, acc: R, E) -> R
): R

Accumulates value starting with initial value and applying operation from left to right to current accumulator value and each element with its index in the original channel.

groupBy

suspend fun <E, K> ReceiveChannel<E>.groupBy(
    keySelector: (E) -> K
): Map<K, List<E>>

Groups elements of the original channel by the key returned by the given keySelector function applied to each element and returns a map where each group key is associated with a list of corresponding elements.

suspend fun <E, K, V> ReceiveChannel<E>.groupBy(
    keySelector: (E) -> K,
    valueTransform: (E) -> V
): Map<K, List<V>>

Groups values returned by the valueTransform function applied to each element of the original channel by the key returned by the given keySelector function applied to the element and returns a map where each group key is associated with a list of corresponding values.

groupByTo

suspend fun <E, K, M : MutableMap<in K, MutableList<E>>> ReceiveChannel<E>.groupByTo(
    destination: M,
    keySelector: (E) -> K
): M

Groups elements of the original channel by the key returned by the given keySelector function applied to each element and puts to the destination map each group key associated with a list of corresponding elements.

suspend fun <E, K, V, M : MutableMap<in K, MutableList<V>>> ReceiveChannel<E>.groupByTo(
    destination: M,
    keySelector: (E) -> K,
    valueTransform: (E) -> V
): M

Groups values returned by the valueTransform function applied to each element of the original channel by the key returned by the given keySelector function applied to the element and puts to the destination map each group key associated with a list of corresponding values.

indexOf

suspend fun <E> ReceiveChannel<E>.indexOf(element: E): Int

Returns first index of element, or -1 if the channel does not contain element.

indexOfFirst

suspend fun <E> ReceiveChannel<E>.indexOfFirst(
    predicate: (E) -> Boolean
): Int

Returns index of the first element matching the given predicate, or -1 if the channel does not contain such element.

indexOfLast

suspend fun <E> ReceiveChannel<E>.indexOfLast(
    predicate: (E) -> Boolean
): Int

Returns index of the last element matching the given predicate, or -1 if the channel does not contain such element.

last

suspend fun <E> ReceiveChannel<E>.last(): E

Returns the last element.

suspend fun <E> ReceiveChannel<E>.last(
    predicate: (E) -> Boolean
): E

Returns the last element matching the given predicate.

lastIndexOf

suspend fun <E> ReceiveChannel<E>.lastIndexOf(
    element: E
): Int

Returns last index of element, or -1 if the channel does not contain element.

lastOrNull

suspend fun <E> ReceiveChannel<E>.lastOrNull(): E?

Returns the last element, or null if the channel is empty.

suspend fun <E> ReceiveChannel<E>.lastOrNull(
    predicate: (E) -> Boolean
): E?

Returns the last element matching the given predicate, or null if no such element was found.

map

fun <E, R> ReceiveChannel<E>.map(
    context: CoroutineContext = Dispatchers.Unconfined,
    transform: suspend (E) -> R
): ReceiveChannel<R>

Returns a channel containing the results of applying the given transform function to each element in the original channel.

mapIndexed

fun <E, R> ReceiveChannel<E>.mapIndexed(
    context: CoroutineContext = Dispatchers.Unconfined,
    transform: suspend (index: Int, E) -> R
): ReceiveChannel<R>

Returns a channel containing the results of applying the given transform function to each element and its index in the original channel.

mapIndexedNotNull

fun <E, R : Any> ReceiveChannel<E>.mapIndexedNotNull(
    context: CoroutineContext = Dispatchers.Unconfined,
    transform: suspend (index: Int, E) -> R?
): ReceiveChannel<R>

Returns a channel containing only the non-null results of applying the given transform function to each element and its index in the original channel.

mapIndexedNotNullTo

suspend fun <E, R : Any, C : MutableCollection<in R>> ReceiveChannel<E>.mapIndexedNotNullTo(
    destination: C,
    transform: (index: Int, E) -> R?
): C

Applies the given transform function to each element and its index in the original channel and appends only the non-null results to the given destination.

mapIndexedTo

suspend fun <E, R, C : MutableCollection<in R>> ReceiveChannel<E>.mapIndexedTo(
    destination: C,
    transform: (index: Int, E) -> R
): C

Applies the given transform function to each element and its index in the original channel and appends the results to the given destination.

mapNotNull

fun <E, R : Any> ReceiveChannel<E>.mapNotNull(
    context: CoroutineContext = Dispatchers.Unconfined,
    transform: suspend (E) -> R?
): ReceiveChannel<R>

Returns a channel containing only the non-null results of applying the given transform function to each element in the original channel.

mapNotNullTo

suspend fun <E, R : Any, C : MutableCollection<in R>> ReceiveChannel<E>.mapNotNullTo(
    destination: C,
    transform: (E) -> R?
): C

Applies the given transform function to each element in the original channel and appends only the non-null results to the given destination.

mapTo

suspend fun <E, R, C : MutableCollection<in R>> ReceiveChannel<E>.mapTo(
    destination: C,
    transform: (E) -> R
): C

Applies the given transform function to each element of the original channel and appends the results to the given destination.

maxBy

suspend fun <E, R : Comparable<R>> ReceiveChannel<E>.maxBy(
    selector: (E) -> R
): E?

Returns the first element yielding the largest value of the given function or null if there are no elements.

maxWith

suspend fun <E> ReceiveChannel<E>.maxWith(
    comparator: <ERROR CLASS><in E>
): E?

Returns the first element having the largest value according to the provided comparator or null if there are no elements.

minBy

suspend fun <E, R : Comparable<R>> ReceiveChannel<E>.minBy(
    selector: (E) -> R
): E?

Returns the first element yielding the smallest value of the given function or null if there are no elements.

minWith

suspend fun <E> ReceiveChannel<E>.minWith(
    comparator: <ERROR CLASS><in E>
): E?

Returns the first element having the smallest value according to the provided comparator or null if there are no elements.

none

suspend fun <E> ReceiveChannel<E>.none(): Boolean

Returns true if the channel has no elements.

suspend fun <E> ReceiveChannel<E>.none(
    predicate: (E) -> Boolean
): Boolean

Returns true if no elements match the given predicate.

onReceiveOrNull

fun <E : Any> ReceiveChannel<E>.onReceiveOrNull(): SelectClause1<E?>

Clause for select expression of receiveOrNull suspending function that selects with the element that is received from the channel or selects with null if the channel isClosedForReceive without cause. The select invocation fails with the original close cause exception if the channel has failed.

partition

suspend fun <E> ReceiveChannel<E>.partition(
    predicate: (E) -> Boolean
): Pair<List<E>, List<E>>

Splits the original channel into pair of lists, where first list contains elements for which predicate yielded true, while second list contains elements for which predicate yielded false.

receiveAsFlow

fun <T> ReceiveChannel<T>.receiveAsFlow(): Flow<T>

Represents the given receive channel as a hot flow and receives from the channel in fan-out fashion every time this flow is collected. One element will be emitted to one collector only.

receiveOrNull

suspend fun <E : Any> ReceiveChannel<E>.receiveOrNull(): E?

Retrieves and removes the element from this channel suspending the caller while this channel isEmpty or returns null if the channel is closed.

reduce

suspend fun <S, E : S> ReceiveChannel<E>.reduce(
    operation: (acc: S, E) -> S
): S

Accumulates value starting with the first element and applying operation from left to right to current accumulator value and each element.

reduceIndexed

suspend fun <S, E : S> ReceiveChannel<E>.reduceIndexed(
    operation: (index: Int, acc: S, E) -> S
): S

Accumulates value starting with the first element and applying operation from left to right to current accumulator value and each element with its index in the original channel.

requireNoNulls

fun <E : Any> ReceiveChannel<E?>.requireNoNulls(): ReceiveChannel<E>

Returns an original collection containing all the non-null elements, throwing an IllegalArgumentException if there are any null elements.

single

suspend fun <E> ReceiveChannel<E>.single(): E

Returns the single element, or throws an exception if the channel is empty or has more than one element.

suspend fun <E> ReceiveChannel<E>.single(
    predicate: (E) -> Boolean
): E

Returns the single element matching the given predicate, or throws exception if there is no or more than one matching element.

singleOrNull

suspend fun <E> ReceiveChannel<E>.singleOrNull(): E?

Returns single element, or null if the channel is empty or has more than one element.

suspend fun <E> ReceiveChannel<E>.singleOrNull(
    predicate: (E) -> Boolean
): E?

Returns the single element matching the given predicate, or null if element was not found or more than one element was found.

sumBy

suspend fun <E> ReceiveChannel<E>.sumBy(
    selector: (E) -> Int
): Int

Returns the sum of all values produced by selector function applied to each element in the channel.

sumByDouble

suspend fun <E> ReceiveChannel<E>.sumByDouble(
    selector: (E) -> Double
): Double

Returns the sum of all values produced by selector function applied to each element in the channel.

take

fun <E> ReceiveChannel<E>.take(
    n: Int,
    context: CoroutineContext = Dispatchers.Unconfined
): ReceiveChannel<E>

Returns a channel containing first n elements.

takeWhile

fun <E> ReceiveChannel<E>.takeWhile(
    context: CoroutineContext = Dispatchers.Unconfined,
    predicate: suspend (E) -> Boolean
): ReceiveChannel<E>

Returns a channel containing first elements satisfying the given predicate.

toChannel

suspend fun <E, C : SendChannel<E>> ReceiveChannel<E>.toChannel(
    destination: C
): C

Send each element of the original channel and appends the results to the given destination.

toCollection

suspend fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.toCollection(
    destination: C
): C

Appends all elements to the given destination collection.

toList

suspend fun <E> ReceiveChannel<E>.toList(): List<E>

Returns a List containing all elements.

toMutableList

suspend fun <E> ReceiveChannel<E>.toMutableList(): MutableList<E>

Returns a MutableList filled with all elements of this channel.

toMutableSet

suspend fun <E> ReceiveChannel<E>.toMutableSet(): MutableSet<E>

Returns a mutable set containing all distinct elements from the given channel.

toSet

suspend fun <E> ReceiveChannel<E>.toSet(): Set<E>

Returns a Set of all elements.

withIndex

fun <E> ReceiveChannel<E>.withIndex(
    context: CoroutineContext = Dispatchers.Unconfined
): ReceiveChannel<IndexedValue<E>>

Returns a channel of IndexedValue for each element of the original channel.

zip

infix fun <E, R> ReceiveChannel<E>.zip(
    other: ReceiveChannel<R>
): ReceiveChannel<Pair<E, R>>

Returns a channel of pairs built from elements of both channels with same indexes. Resulting channel has length of shortest input channel.

fun <E, R, V> ReceiveChannel<E>.zip(
    other: ReceiveChannel<R>,
    context: CoroutineContext = Dispatchers.Unconfined,
    transform: (a: E, b: R) -> V
): ReceiveChannel<V>

Returns a channel of values built from elements of both collections with same indexes using provided transform. Resulting channel has length of shortest input channels.