<lambda>null1 package kotlinx.coroutines.flow
2
3 import kotlinx.coroutines.*
4 import kotlinx.coroutines.internal.IgnoreJreRequirement
5 import kotlin.time.*
6
7 /**
8 * A command emitted by [SharingStarted] implementations to control the sharing coroutine in
9 * the [shareIn] and [stateIn] operators.
10 */
11 public enum class SharingCommand {
12 /**
13 * Starts sharing, launching collection of the upstream flow.
14 *
15 * Emitting this command again does not do anything. Emit [STOP] and then [START] to restart an
16 * upstream flow.
17 */
18 START,
19
20 /**
21 * Stops sharing, cancelling collection of the upstream flow.
22 */
23 STOP,
24
25 /**
26 * Stops sharing, cancelling collection of the upstream flow, and resets the [SharedFlow.replayCache]
27 * to its initial state.
28 * The [shareIn] operator calls [MutableSharedFlow.resetReplayCache];
29 * the [stateIn] operator resets the value to its original `initialValue`.
30 */
31 STOP_AND_RESET_REPLAY_CACHE
32 }
33
34 /**
35 * A strategy for starting and stopping the sharing coroutine in [shareIn] and [stateIn] operators.
36 *
37 * This functional interface provides a set of built-in strategies: [Eagerly], [Lazily], [WhileSubscribed], and
38 * supports custom strategies by implementing this interface's [command] function.
39 *
40 * For example, it is possible to define a custom strategy that starts the upstream only when the number
41 * of subscribers exceeds the given `threshold` and make it an extension on [SharingStarted.Companion] so
42 * that it looks like a built-in strategy on the use-site:
43 *
44 * ```
45 * fun SharingStarted.Companion.WhileSubscribedAtLeast(threshold: Int) =
46 * SharingStarted { subscriptionCount: StateFlow<Int> ->
47 * subscriptionCount.map { if (it >= threshold) SharingCommand.START else SharingCommand.STOP }
48 * }
49 * ```
50 *
51 * ### Commands
52 *
53 * The `SharingStarted` strategy works by emitting [commands][SharingCommand] that control upstream flow from its
54 * [`command`][command] flow implementation function. Back-to-back emissions of the same command have no effect.
55 * Only emission of a different command has effect:
56 *
57 * - [START][SharingCommand.START] — the upstream flow is started.
58 * - [STOP][SharingCommand.STOP] — the upstream flow is stopped.
59 * - [STOP_AND_RESET_REPLAY_CACHE][SharingCommand.STOP_AND_RESET_REPLAY_CACHE] —
60 * the upstream flow is stopped and the [SharedFlow.replayCache] is reset to its initial state.
61 * The [shareIn] operator calls [MutableSharedFlow.resetReplayCache];
62 * the [stateIn] operator resets the value to its original `initialValue`.
63 *
64 * Initially, the upstream flow is stopped and is in the initial state, so the emission of additional
65 * [STOP][SharingCommand.STOP] and [STOP_AND_RESET_REPLAY_CACHE][SharingCommand.STOP_AND_RESET_REPLAY_CACHE] commands will
66 * have no effect.
67 *
68 * The completion of the `command` flow normally has no effect (the upstream flow keeps running if it was running).
69 * The failure of the `command` flow cancels the sharing coroutine and the upstream flow.
70 */
71 public fun interface SharingStarted {
72 public companion object {
73 /**
74 * Sharing is started immediately and never stops.
75 */
76 public val Eagerly: SharingStarted = StartedEagerly()
77
78 /**
79 * Sharing is started when the first subscriber appears and never stops.
80 */
81 public val Lazily: SharingStarted = StartedLazily()
82
83 /**
84 * Sharing is started when the first subscriber appears, immediately stops when the last
85 * subscriber disappears (by default), keeping the replay cache forever (by default).
86 *
87 * It has the following optional parameters:
88 *
89 * - [stopTimeoutMillis] — configures a delay (in milliseconds) between the disappearance of the last
90 * subscriber and the stopping of the sharing coroutine. It defaults to zero (stop immediately).
91 * - [replayExpirationMillis] — configures a delay (in milliseconds) between the stopping of
92 * the sharing coroutine and the resetting of the replay cache (which makes the cache empty for the [shareIn] operator
93 * and resets the cached value to the original `initialValue` for the [stateIn] operator).
94 * It defaults to `Long.MAX_VALUE` (keep replay cache forever, never reset buffer).
95 * Use zero value to expire the cache immediately.
96 *
97 * This function throws [IllegalArgumentException] when either [stopTimeoutMillis] or [replayExpirationMillis]
98 * are negative.
99 */
100 @Suppress("FunctionName")
WhileSubscribednull101 public fun WhileSubscribed(
102 stopTimeoutMillis: Long = 0,
103 replayExpirationMillis: Long = Long.MAX_VALUE
104 ): SharingStarted =
105 StartedWhileSubscribed(stopTimeoutMillis, replayExpirationMillis)
106 }
107
108 /**
109 * Transforms the [subscriptionCount][MutableSharedFlow.subscriptionCount] state of the shared flow into the
110 * flow of [commands][SharingCommand] that control the sharing coroutine. See the [SharingStarted] interface
111 * documentation for details.
112 */
113 public fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand>
114 }
115
116 /**
117 * Sharing is started when the first subscriber appears, immediately stops when the last
118 * subscriber disappears (by default), keeping the replay cache forever (by default).
119 *
120 * It has the following optional parameters:
121 *
122 * - [stopTimeout] — configures a delay between the disappearance of the last
123 * subscriber and the stopping of the sharing coroutine. It defaults to zero (stop immediately).
124 * - [replayExpiration] — configures a delay between the stopping of
125 * the sharing coroutine and the resetting of the replay cache (which makes the cache empty for the [shareIn] operator
126 * and resets the cached value to the original `initialValue` for the [stateIn] operator).
127 * It defaults to [Duration.INFINITE] (keep replay cache forever, never reset buffer).
128 * Use [Duration.ZERO] value to expire the cache immediately.
129 *
130 * This function throws [IllegalArgumentException] when either [stopTimeout] or [replayExpiration]
131 * are negative.
132 */
133 @Suppress("FunctionName")
134 public fun SharingStarted.Companion.WhileSubscribed(
135 stopTimeout: Duration = Duration.ZERO,
136 replayExpiration: Duration = Duration.INFINITE
137 ): SharingStarted =
138 StartedWhileSubscribed(stopTimeout.inWholeMilliseconds, replayExpiration.inWholeMilliseconds)
139
140 // -------------------------------- implementation --------------------------------
141
142 private class StartedEagerly : SharingStarted {
143 override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> =
144 flowOf(SharingCommand.START)
145 override fun toString(): String = "SharingStarted.Eagerly"
146 }
147
148 private class StartedLazily : SharingStarted {
<lambda>null149 override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> = flow {
150 var started = false
151 subscriptionCount.collect { count ->
152 if (count > 0 && !started) {
153 started = true
154 emit(SharingCommand.START)
155 }
156 }
157 }
158
toStringnull159 override fun toString(): String = "SharingStarted.Lazily"
160 }
161
162 private class StartedWhileSubscribed(
163 private val stopTimeout: Long,
164 private val replayExpiration: Long
165 ) : SharingStarted {
166 init {
167 require(stopTimeout >= 0) { "stopTimeout($stopTimeout ms) cannot be negative" }
168 require(replayExpiration >= 0) { "replayExpiration($replayExpiration ms) cannot be negative" }
169 }
170
171 override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> = subscriptionCount
172 .transformLatest { count ->
173 if (count > 0) {
174 emit(SharingCommand.START)
175 } else {
176 delay(stopTimeout)
177 if (replayExpiration > 0) {
178 emit(SharingCommand.STOP)
179 delay(replayExpiration)
180 }
181 emit(SharingCommand.STOP_AND_RESET_REPLAY_CACHE)
182 }
183 }
184 .dropWhile { it != SharingCommand.START } // don't emit any STOP/RESET_BUFFER to start with, only START
185 .distinctUntilChanged() // just in case somebody forgets it, don't leak our multiple sending of START
186
187 @OptIn(ExperimentalStdlibApi::class)
188 override fun toString(): String {
189 val params = buildList(2) {
190 if (stopTimeout > 0) add("stopTimeout=${stopTimeout}ms")
191 if (replayExpiration < Long.MAX_VALUE) add("replayExpiration=${replayExpiration}ms")
192 }
193 return "SharingStarted.WhileSubscribed(${params.joinToString()})"
194 }
195
196 // equals & hashcode to facilitate testing, not documented in public contract
197 override fun equals(other: Any?): Boolean =
198 other is StartedWhileSubscribed &&
199 stopTimeout == other.stopTimeout &&
200 replayExpiration == other.replayExpiration
201
202 @IgnoreJreRequirement // desugared hashcode implementation
203 override fun hashCode(): Int = stopTimeout.hashCode() * 31 + replayExpiration.hashCode()
204 }
205