<lambda>null1 @file:JvmMultifileClass
2 @file:JvmName("FlowKt")
3
4 package kotlinx.coroutines.flow
5
6 import kotlinx.coroutines.*
7 import kotlinx.coroutines.internal.*
8 import kotlin.coroutines.*
9 import kotlin.jvm.*
10 import kotlinx.coroutines.flow.internal.unsafeFlow as flow
11
/**
 * Intercepts an exception that completes the upstream flow and passes it to the specified [action].
 * This operator is *transparent* to exceptions that occur in the downstream flow, and it does not
 * catch exceptions that are thrown to cancel the flow.
 *
 * For example:
 *
 * ```
 * flow { emitData() }
 *     .map { computeOne(it) }
 *     .catch { ... } // catches exceptions in emitData and computeOne
 *     .map { computeTwo(it) }
 *     .collect { process(it) } // throws exceptions from process and computeTwo
 * ```
 *
 * Conceptually, applying the `catch` operator is similar to wrapping the code of the upstream flows with
 * `try { ... } catch (e: Throwable) { action(e) }`.
 *
 * An exception thrown by the [action] code itself proceeds downstream, where further `catch` operators
 * can catch it if needed. When a particular exception should not be handled here, rethrow it from the
 * action of the `catch` operator. For example:
 *
 * ```
 * flow.catch { e ->
 *     if (e !is IOException) throw e // rethrow all but IOException
 *     // e is IOException here
 *     ...
 * }
 * ```
 *
 * The [action] code has [FlowCollector] as a receiver, so it can [emit][FlowCollector.emit] values downstream.
 * For example, the caught exception can be replaced with some wrapper value for errors:
 *
 * ```
 * flow.catch { e -> emit(ErrorWrapperValue(e)) }
 * ```
 *
 * The [action] can also use [emitAll] to fall back on some other flow in case of an error. However, to
 * retry the original flow use the [retryWhen] operator, which can retry the flow multiple times without
 * introducing an ever-growing stack of suspending calls.
 */
public fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(cause: Throwable) -> Unit): Flow<T> =
    flow {
        // catchImpl returns null when the upstream completed without an exception; only a non-null
        // result is forwarded to the action, which runs with this collector as its receiver.
        catchImpl(this)?.let { upstreamFailure -> action(upstreamFailure) }
    }
58
/**
 * Restarts collection of the given flow, at most [retries] times, whenever the upstream flow fails with
 * an exception accepted by the given [predicate]. This operator is *transparent* to exceptions that occur
 * in the downstream flow and does not retry on exceptions that are thrown to cancel the flow.
 *
 * See [catch] for details on how exceptions are caught in flows.
 *
 * The default value of the [retries] parameter is [Long.MAX_VALUE], which effectively means to retry forever.
 * This operator is a shorthand for the following code (see [retryWhen]). Note that `attempt` is checked first,
 * so the [predicate] is not called once the given number of [retries] has been reached:
 *
 * ```
 * retryWhen { cause, attempt -> attempt < retries && predicate(cause) }
 * ```
 *
 * By default the [predicate] always returns true. The [predicate] is a suspending function,
 * so it can also be used to introduce a delay before retrying, for example:
 *
 * ```
 * flow.retry(3) { e ->
 *     // retry on any IOException but also introduce delay if retrying
 *     (e is IOException).also { if (it) delay(1000) }
 * }
 * ```
 *
 * @throws IllegalArgumentException when [retries] is not positive.
 */
public fun <T> Flow<T>.retry(
    retries: Long = Long.MAX_VALUE,
    predicate: suspend (cause: Throwable) -> Boolean = { true }
): Flow<T> {
    require(retries > 0) { "Expected positive amount of retries, but had $retries" }
    return retryWhen { failure, attemptIndex ->
        // The attempt budget is consulted first, so the predicate is never invoked once it is exhausted.
        if (attemptIndex < retries) predicate(failure) else false
    }
}
93
/**
 * Restarts collection of the given flow when the upstream flow fails with an exception and the
 * [predicate] returns true. The predicate also receives an `attempt` number as a parameter,
 * starting from zero on the initial call. This operator is *transparent* to exceptions that occur
 * in the downstream flow and does not retry on exceptions that are thrown to cancel the flow.
 *
 * For example, the following call retries the flow forever if the error is caused by `IOException`, but
 * stops after 3 retries on any other exception:
 *
 * ```
 * flow.retryWhen { cause, attempt -> cause is IOException || attempt < 3 }
 * ```
 *
 * To implement simple retry logic with a limit on the number of retries use the [retry] operator.
 *
 * Similarly to the [catch] operator, the [predicate] code has [FlowCollector] as a receiver and can
 * [emit][FlowCollector.emit] values downstream.
 * The [predicate] is a suspending function, so it can be used to introduce a delay before retrying, for example:
 *
 * ```
 * flow.retryWhen { cause, attempt ->
 *     if (cause is IOException) {    // retry on IOException
 *         emit(RetryWrapperValue(cause))
 *         delay(1000)                // delay for one second before retry
 *         true
 *     } else {                       // do not retry otherwise
 *         false
 *     }
 * }
 * ```
 *
 * See [catch] for more details.
 */
public fun <T> Flow<T>.retryWhen(predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean): Flow<T> =
    flow {
        // Number of retries already granted; passed to the predicate and starts at zero.
        var attempt = 0L
        while (true) {
            // A null result means the upstream completed without an exception: nothing left to retry.
            val cause = catchImpl(this) ?: break
            // A rejected failure is rethrown as-is and terminates this flow.
            if (!predicate(cause, attempt)) throw cause
            attempt++
        }
    }
144
// Collects the receiver flow into [collector]; returns the exception raised by the upstream, or null
// when the collection completed without one. Downstream exceptions and cancellation causes are rethrown.
internal suspend fun <T> Flow<T>.catchImpl(
    collector: FlowCollector<T>
): Throwable? {
    // Records the exception thrown by the downstream collector while it was handling an emission, if any
    var downstreamFailure: Throwable? = null
    try {
        collect { value ->
            try {
                collector.emit(value)
            } catch (thrown: Throwable) {
                downstreamFailure = thrown
                throw thrown
            }
        }
    } catch (e: Throwable) {
        // Read the captured variable into an immutable local, otherwise smartcast is impossible
        val downstream = downstreamFailure
        /*
         * The first check ensures that we catch an original exception, not one rethrown by an operator.
         * The second check ignores cancellation causes, they cannot be caught.
         * Both kinds are rethrown unchanged.
         */
        if (e.isSameExceptionAs(downstream) || e.isCancellationCause(coroutineContext)) throw e
        /*
         * From here on, the exception came from the upstream [semi-] independently.
         * For pure failures, when the downstream functions normally, we handle the exception as intended
         * by returning it to the caller.
         */
        if (downstream == null) return e
        /*
         * The downstream has failed prior to or concurrently with the upstream, so we forcefully rethrow,
         * preserving the contextual information and ensuring that neither exception is lost.
         *
         * When both sides fail, the upstream exception is considered the superseding one: it is thrown with
         * the original downstream exception added to its suppressed ones, similarly to a `finally` block.
         *
         * That's important for the following scenario, when *the downstream* throws:
         * ```
         * flow {
         *     val resource = ...
         *     try {
         *         ... emit as well ...
         *     } finally {
         *          resource.close() // Throws in the shutdown sequence when 'collect' already has thrown an exception
         *     }
         * }.catch { } // or retry
         * .collect { ... }
         * ```
         *
         * The one exception to that ordering: if the upstream exception is a CancellationException, the
         * downstream exception is the one thrown and the cancellation is attached to it as suppressed.
         */
        if (e is CancellationException) {
            downstream.addSuppressed(e)
            throw downstream
        }
        e.addSuppressed(downstream)
        throw e
    }
    return null
}
209
// Tells whether this exception is the cancellation exception of the (cancelled) Job found in [coroutineContext].
// Yields false when the context has no Job or when that Job is not cancelled.
private fun Throwable.isCancellationCause(coroutineContext: CoroutineContext): Boolean {
    val contextJob = coroutineContext[Job] ?: return false
    // The cancellation exception is only queried once the job is known to be cancelled
    return contextJob.isCancelled && isSameExceptionAs(contextJob.getCancellationException())
}
215
// Compares this exception with [other] after passing both through `unwrap`; a null [other] never matches.
private fun Throwable.isSameExceptionAs(other: Throwable?): Boolean {
    if (other == null) return false
    return unwrap(other) == unwrap(this)
}
218
219
220