kotlinx.rpc 0.2.1 Help

Features

Send and receive Flows

You can send and receive Kotlin Flows in RPC methods. That includes Flow, SharedFlow, and StateFlow, but NOT their mutable versions. Flows can be nested and can be included in other classes, like this:

@Serializable data class StreamResult { @Contextual val innerFlow: StateFlow<Int> } interface MyService : RPC { suspend fun sendStream(stream: Flow<Flow<Int>>): Flow<StreamResult> }

Note that flows that are declared in classes (like in StreamResult) require Contextual annotation.

To use flows in your code - you need to use special streamScoped function that will provide your flows with their lifetime:

interface MyService { suspend fun sendFlow(flow: Flow<Int>) } val myService = rpcClient.withService<MyService>() streamScoped { val flow = flow { repeat(10) { i -> emit(i) } } myService.sendFlow(flow) }

In that case all your flows, including incoming and outgoing, will work until the streamScoped function completes. After that, all streams that are still live will be closed.

You can have multiple RPC call and flows inside the streamScoped function, even from different services.

On server side, you can use invokeOnStreamScopeCompletion handler inside your methods to execute code after streamScoped on client side has closed. It might be useful to clean resources, for example:

override suspend fun hotFlow(): StateFlow<Int> { val state = MutableStateFlow(-1) incomingHotFlowJob = launch { repeat(Int.MAX_VALUE) { value -> state.value = value hotFlowMirror.first { it == value } } } invokeOnStreamScopeCompletion { incomingHotFlowJob.cancel() } return state }

Note that this API is experimental and may be removed in future releases.

Service fields

Our protocol provides you with an ability to declare service fields:

interface MyService : RPC { val plainFlow: Flow<Int> val sharedFlow: SharedFlow<Int> val stateFlow: StateFlow<Int> } // ### Server code ### class MyServiceImpl(override val coroutineContext: CoroutineContext) : MyService { override val plainFlow: Flow<Int> = flow { emit(1) } override val sharedFlow: SharedFlow<Int> = MutableSharedFlow(replay = 1) override val stateFlow: StateFlow<Int> = MutableStateFlow(value = 1) }

Field declarations are only supported for these three types: Flow, SharedFlow and StateFlow.

You don't need to use streamScoped function to work with streams in fields.

To learn more about the limitations of such declarations, see Field declarations in services.

Field declarations in services

Fields are supported in the in-house RPC protocol, but the support comes with its limitations. There always will be a considerable time gap between the initial access to a field and the moment information about this field arrives from a server. This makes it hard to provide good uniform API for all possible field types, so now will limit supported types to Flow, SharedFlow and StateFlow (excluding mutable versions of them). To work with these fields, you may use additional provided APIs:

Firstly, we define two possible states of a flow field: uninitialized and initialized. Before the first information about this flow has arrived from a server, the flow is in uninitialized state. In this state, if you access any of its fields (replayCache for SharedFlow and StateFlow, and value for StateFlow) you will get a UninitializedRPCFieldException. If you call a suspend collect method on them, execution will suspend until the state is initialized and then the actual collect method will be executed. The same ability to suspend execution until the state is initialized can be achieved by using awaitFieldInitialization function:

interface MyService : RPC { val flow: StateFlow<Int> } // ### Somewhere in client code ### val myService: MyService = rpcClient.withService<MyService>() val value = myService.flow.value // throws UninitializedRPCFieldException val value = myService.awaitFieldInitialization { flow }.value // OK // or val value = myService.awaitFieldInitialization().flow.value // OK // or val firstFive = myService.flow.take(5).toList() // OK

Secondly, we provide you with an instrument to make initialization faster. By default, all fields are lazy. By adding @RPCEagerField annotation, you can change this behavior, so that fields will be initialized when the service in created (when withService method is called):

interface MyService : RPC { val lazyFlow: Flow<Int> // initialized on first access @RPCEagerField val eagerFlow: Flow<Int> // initialized on service creation }
Last modified: 13 June 2024