xref: /aosp_15_r20/external/kotlinx.coroutines/kotlinx-coroutines-core/common/src/flow/SharingStarted.kt (revision 7a7160fed73afa6648ef8aa100d4a336fe921d9a)

<lambda>null1 package kotlinx.coroutines.flow
2 
3 import kotlinx.coroutines.*
4 import kotlinx.coroutines.internal.IgnoreJreRequirement
5 import kotlin.time.*
6 
/**
 * The set of commands that a [SharingStarted] strategy can emit in order to drive the sharing
 * coroutine of the [shareIn] and [stateIn] operators.
 */
public enum class SharingCommand {
    /**
     * Begin sharing: collection of the upstream flow is launched.
     *
     * Repeating this command has no effect. To restart the upstream flow,
     * emit [STOP] first and then [START].
     */
    START,

    /**
     * End sharing: collection of the upstream flow is cancelled.
     */
    STOP,

    /**
     * End sharing (cancelling collection of the upstream flow) and additionally bring the
     * [SharedFlow.replayCache] back to its initial state.
     * For [shareIn] this is done by calling [MutableSharedFlow.resetReplayCache];
     * for [stateIn] the value is reset to its original `initialValue`.
     */
    STOP_AND_RESET_REPLAY_CACHE,
}
33 
34 /**
35  * A strategy for starting and stopping the sharing coroutine in [shareIn] and [stateIn] operators.
36  *
37  * This functional interface provides a set of built-in strategies: [Eagerly], [Lazily], [WhileSubscribed], and
38  * supports custom strategies by implementing this interface's [command] function.
39  *
40  * For example, it is possible to define a custom strategy that starts the upstream only when the number
41  * of subscribers exceeds the given `threshold` and make it an extension on [SharingStarted.Companion] so
42  * that it looks like a built-in strategy on the use-site:
43  *
44  * ```
45  * fun SharingStarted.Companion.WhileSubscribedAtLeast(threshold: Int) =
46  *     SharingStarted { subscriptionCount: StateFlow<Int> ->
47  *         subscriptionCount.map { if (it >= threshold) SharingCommand.START else SharingCommand.STOP }
48  *     }
49  * ```
50  *
51  * ### Commands
52  *
53  * The `SharingStarted` strategy works by emitting [commands][SharingCommand] that control upstream flow from its
54  * [`command`][command] flow implementation function. Back-to-back emissions of the same command have no effect.
55  * Only emission of a different command has effect:
56  *
57  * - [START][SharingCommand.START] &mdash; the upstream flow is started.
58  * - [STOP][SharingCommand.STOP] &mdash; the upstream flow is stopped.
59  * - [STOP_AND_RESET_REPLAY_CACHE][SharingCommand.STOP_AND_RESET_REPLAY_CACHE] &mdash;
60  *   the upstream flow is stopped and the [SharedFlow.replayCache] is reset to its initial state.
61  *   The [shareIn] operator calls [MutableSharedFlow.resetReplayCache];
62  *   the [stateIn] operator resets the value to its original `initialValue`.
63  *
64  * Initially, the upstream flow is stopped and is in the initial state, so the emission of additional
65  * [STOP][SharingCommand.STOP] and [STOP_AND_RESET_REPLAY_CACHE][SharingCommand.STOP_AND_RESET_REPLAY_CACHE] commands will
66  * have no effect.
67  *
 * Normal completion of the `command` flow has no effect (the upstream flow keeps running if it was running).
69  * The failure of the `command` flow cancels the sharing coroutine and the upstream flow.
70  */
public fun interface SharingStarted {
    public companion object {
        /**
         * Strategy that starts sharing right away and never stops it.
         */
        public val Eagerly: SharingStarted = StartedEagerly()

        /**
         * Strategy that starts sharing once the first subscriber appears and never stops it afterwards.
         */
        public val Lazily: SharingStarted = StartedLazily()

        /**
         * Strategy that starts sharing when the first subscriber appears and stops it when the last
         * subscriber disappears. With the default arguments the stop is immediate and the replay cache
         * is kept forever.
         *
         * Optional parameters:
         *
         * - [stopTimeoutMillis] &mdash; how long (in milliseconds) to wait after the last subscriber
         *   disappears before the sharing coroutine is stopped. The default is zero (stop immediately).
         * - [replayExpirationMillis] &mdash; how long (in milliseconds) to wait after the sharing
         *   coroutine is stopped before the replay cache is reset (for the [shareIn] operator the cache
         *   becomes empty; for the [stateIn] operator the cached value goes back to the original
         *   `initialValue`). The default is `Long.MAX_VALUE` (the replay cache is kept forever and never
         *   reset). Pass zero to expire the cache immediately.
         *
         * @throws IllegalArgumentException if [stopTimeoutMillis] or [replayExpirationMillis] is negative.
         */
        @Suppress("FunctionName")
        public fun WhileSubscribed(
            stopTimeoutMillis: Long = 0,
            replayExpirationMillis: Long = Long.MAX_VALUE
        ): SharingStarted {
            return StartedWhileSubscribed(
                stopTimeout = stopTimeoutMillis,
                replayExpiration = replayExpirationMillis
            )
        }
    }

    /**
     * Maps the [subscriptionCount][MutableSharedFlow.subscriptionCount] state of the shared flow to a
     * flow of [commands][SharingCommand] that control the sharing coroutine. The [SharingStarted]
     * interface documentation describes how the commands are interpreted.
     */
    public fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand>
}
115 
/**
 * Strategy that starts sharing when the first subscriber appears and stops it when the last
 * subscriber disappears. With the default arguments the stop is immediate and the replay cache
 * is kept forever.
 *
 * Optional parameters:
 *
 * - [stopTimeout] &mdash; how long to wait after the last subscriber disappears before the sharing
 *   coroutine is stopped. The default is zero (stop immediately).
 * - [replayExpiration] &mdash; how long to wait after the sharing coroutine is stopped before the
 *   replay cache is reset (for the [shareIn] operator the cache becomes empty; for the [stateIn]
 *   operator the cached value goes back to the original `initialValue`).
 *   The default is [Duration.INFINITE] (the replay cache is kept forever and never reset).
 *   Pass [Duration.ZERO] to expire the cache immediately.
 *
 * Both durations are converted with [Duration.inWholeMilliseconds] before they are used.
 * NOTE(review): a positive sub-millisecond duration presumably becomes zero after this conversion;
 * confirm against the `kotlin.time` documentation if that precision matters.
 *
 * @throws IllegalArgumentException if [stopTimeout] or [replayExpiration] is negative.
 */
@Suppress("FunctionName")
public fun SharingStarted.Companion.WhileSubscribed(
    stopTimeout: Duration = Duration.ZERO,
    replayExpiration: Duration = Duration.INFINITE
): SharingStarted {
    val stopTimeoutMillis = stopTimeout.inWholeMilliseconds
    val replayExpirationMillis = replayExpiration.inWholeMilliseconds
    return StartedWhileSubscribed(stopTimeoutMillis, replayExpirationMillis)
}
139 
140 // -------------------------------- implementation --------------------------------
141 
/**
 * Strategy whose command flow consists of a single [SharingCommand.START], regardless of the
 * subscription count.
 */
private class StartedEagerly : SharingStarted {
    override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> {
        // The subscription count is deliberately ignored.
        return flowOf(SharingCommand.START)
    }

    override fun toString(): String {
        return "SharingStarted.Eagerly"
    }
}
147 
/**
 * Strategy that emits [SharingCommand.START] the first time the subscription count is positive and
 * emits nothing else afterwards.
 */
private class StartedLazily : SharingStarted {
    override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> = flow {
        var alreadyStarted = false
        subscriptionCount.collect { subscribers ->
            // Nothing to do once START has been emitted, or while there are no subscribers yet.
            if (alreadyStarted || subscribers <= 0) return@collect
            alreadyStarted = true
            emit(SharingCommand.START)
        }
    }

    override fun toString(): String {
        return "SharingStarted.Lazily"
    }
}
161 
/**
 * Strategy that starts sharing while there are subscribers and stops it, after optional delays,
 * once there are none.
 *
 * @param stopTimeout delay in milliseconds between the subscription count dropping to zero and the
 *   stop command being emitted.
 * @param replayExpiration delay in milliseconds between [SharingCommand.STOP] and
 *   [SharingCommand.STOP_AND_RESET_REPLAY_CACHE]; when it is zero, [SharingCommand.STOP] is skipped.
 * @throws IllegalArgumentException if either parameter is negative.
 */
private class StartedWhileSubscribed(
    private val stopTimeout: Long,
    private val replayExpiration: Long
) : SharingStarted {
    init {
        require(stopTimeout >= 0) { "stopTimeout($stopTimeout ms) cannot be negative" }
        require(replayExpiration >= 0) { "replayExpiration($replayExpiration ms) cannot be negative" }
    }

    override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> {
        // Per the transformLatest contract, a new subscription count cancels the block that is still
        // running for the previous count, so pending delays are abandoned when subscribers come back.
        val rawCommands: Flow<SharingCommand> = subscriptionCount.transformLatest { subscribers ->
            if (subscribers > 0) {
                emit(SharingCommand.START)
                return@transformLatest
            }
            // No subscribers: wait out the stop timeout before stopping.
            delay(stopTimeout)
            if (replayExpiration > 0) {
                emit(SharingCommand.STOP)
                delay(replayExpiration)
            }
            emit(SharingCommand.STOP_AND_RESET_REPLAY_CACHE)
        }
        return rawCommands
            // Swallow any leading STOP / STOP_AND_RESET_REPLAY_CACHE: the first command let through is START.
            .dropWhile { it != SharingCommand.START }
            // Guard against leaking repeated START commands downstream.
            .distinctUntilChanged()
    }

    override fun toString(): String {
        // Only parameters that differ from "stop immediately / keep the cache forever" are printed.
        val shownParams = listOfNotNull(
            if (stopTimeout > 0) "stopTimeout=${stopTimeout}ms" else null,
            if (replayExpiration < Long.MAX_VALUE) "replayExpiration=${replayExpiration}ms" else null
        )
        return "SharingStarted.WhileSubscribed(${shownParams.joinToString()})"
    }

    // equals & hashCode exist to facilitate testing; they are not part of the documented public contract
    override fun equals(other: Any?): Boolean {
        if (other !is StartedWhileSubscribed) return false
        return stopTimeout == other.stopTimeout && replayExpiration == other.replayExpiration
    }

    @IgnoreJreRequirement // desugared hashcode implementation
    override fun hashCode(): Int {
        return 31 * stopTimeout.hashCode() + replayExpiration.hashCode()
    }
}
205