xref: /aosp_15_r20/external/kotlinx.coroutines/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt (revision 7a7160fed73afa6648ef8aa100d4a336fe921d9a)

<lambda>null1 @file:JvmMultifileClass
2 @file:JvmName("FlowKt")
3 
4 package kotlinx.coroutines.flow
5 
6 import kotlinx.coroutines.*
7 import kotlinx.coroutines.internal.*
8 import kotlin.coroutines.*
9 import kotlin.jvm.*
10 import kotlinx.coroutines.flow.internal.unsafeFlow as flow
11 
12 /**
13  * Catches exceptions in the flow completion and calls a specified [action] with
14  * the caught exception. This operator is *transparent* to exceptions that occur
15  * in downstream flow and does not catch exceptions that are thrown to cancel the flow.
16  *
17  * For example:
18  *
19  * ```
20  * flow { emitData() }
21  *     .map { computeOne(it) }
22  *     .catch { ... } // catches exceptions in emitData and computeOne
23  *     .map { computeTwo(it) }
24  *     .collect { process(it) } // throws exceptions from process and computeTwo
25  * ```
26  *
27  * Conceptually, the action of `catch` operator is similar to wrapping the code of upstream flows with
28  * `try { ... } catch (e: Throwable) { action(e) }`.
29  *
30  * Any exception in the [action] code itself proceeds downstream where it can be
31  * caught by further `catch` operators if needed. If a particular exception does not need to be
32  * caught it can be rethrown from the action of `catch` operator. For example:
33  *
34  * ```
35  * flow.catch { e ->
36  *     if (e !is IOException) throw e // rethrow all but IOException
37  *     // e is IOException here
38  *     ...
39  * }
40  * ```
41  *
42  * The [action] code has [FlowCollector] as a receiver and can [emit][FlowCollector.emit] values downstream.
43  * For example, caught exception can be replaced with some wrapper value for errors:
44  *
45  * ```
46  * flow.catch { e -> emit(ErrorWrapperValue(e)) }
47  * ```
48  *
49  * The [action] can also use [emitAll] to fallback on some other flow in case of an error. However, to
50  * retry an original flow use [retryWhen] operator that can retry the flow multiple times without
51  * introducing ever-growing stack of suspending calls.
52  */
53 public fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(cause: Throwable) -> Unit): Flow<T> =
54     flow {
55         val exception = catchImpl(this)
56         if (exception != null) action(exception)
57     }
58 
59 /**
60  * Retries collection of the given flow up to [retries] times when an exception that matches the
61  * given [predicate] occurs in the upstream flow. This operator is *transparent* to exceptions that occur
62  * in downstream flow and does not retry on exceptions that are thrown to cancel the flow.
63  *
64  * See [catch] for details on how exceptions are caught in flows.
65  *
66  * The default value of [retries] parameter is [Long.MAX_VALUE]. This value effectively means to retry forever.
67  * This operator is a shorthand for the following code (see [retryWhen]). Note that `attempt` is checked first
68  * and [predicate] is not called when it reaches the given number of [retries]:
69  *
70  * ```
71  * retryWhen { cause, attempt -> attempt < retries && predicate(cause) }
72  * ```
73  *
74  * The [predicate] parameter is always true by default. The [predicate] is a suspending function,
75  * so it can be also used to introduce delay before retry, for example:
76  *
77  * ```
78  * flow.retry(3) { e ->
79  *     // retry on any IOException but also introduce delay if retrying
80  *     (e is IOException).also { if (it) delay(1000) }
81  * }
82  * ```
83  *
84  * @throws IllegalArgumentException when [retries] is not positive.
85  */
retrynull86 public fun <T> Flow<T>.retry(
87     retries: Long = Long.MAX_VALUE,
88     predicate: suspend (cause: Throwable) -> Boolean = { true }
89 ): Flow<T> {
<lambda>null90     require(retries > 0) { "Expected positive amount of retries, but had $retries" }
causenull91     return retryWhen { cause, attempt -> attempt < retries && predicate(cause) }
92 }
93 
94 /**
95  * Retries collection of the given flow when an exception occurs in the upstream flow and the
96  * [predicate] returns true. The predicate also receives an `attempt` number as parameter,
97  * starting from zero on the initial call. This operator is *transparent* to exceptions that occur
98  * in downstream flow and does not retry on exceptions that are thrown to cancel the flow.
99  *
100  * For example, the following call retries the flow forever if the error is caused by `IOException`, but
101  * stops after 3 retries on any other exception:
102  *
103  * ```
104  * flow.retryWhen { cause, attempt -> cause is IOException || attempt < 3 }
105  * ```
106  *
107  * To implement a simple retry logic with a limit on the number of retries use [retry] operator.
108  *
109  * Similarly to [catch] operator, the [predicate] code has [FlowCollector] as a receiver and can
110  * [emit][FlowCollector.emit] values downstream.
111  * The [predicate] is a suspending function, so it can be used to introduce delay before retry, for example:
112  *
113  * ```
114  * flow.retryWhen { cause, attempt ->
115  *     if (cause is IOException) {    // retry on IOException
116  *         emit(RetryWrapperValue(e))
117  *         delay(1000)                // delay for one second before retry
118  *         true
119  *     } else {                       // do not retry otherwise
120  *         false
121  *     }
122  * }
123  * ```
124  *
125  * See [catch] for more details.
126  */
retryWhennull127 public fun <T> Flow<T>.retryWhen(predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean): Flow<T> =
128     flow {
129         var attempt = 0L
130         var shallRetry: Boolean
131         do {
132             shallRetry = false
133             val cause = catchImpl(this)
134             if (cause != null) {
135                 if (predicate(cause, attempt)) {
136                     shallRetry = true
137                     attempt++
138                 } else {
139                     throw cause
140                 }
141             }
142         } while (shallRetry)
143     }
144 
145 // Return exception from upstream or null
146 @Suppress("NAME_SHADOWING")
catchImplnull147 internal suspend fun <T> Flow<T>.catchImpl(
148     collector: FlowCollector<T>
149 ): Throwable? {
150     var fromDownstream: Throwable? = null
151     try {
152         collect {
153             try {
154                 collector.emit(it)
155             } catch (e: Throwable) {
156                 fromDownstream = e
157                 throw e
158             }
159         }
160     } catch (e: Throwable) {
161         // Otherwise, smartcast is impossible
162         val fromDownstream = fromDownstream
163         /*
164          * First check ensures that we catch an original exception, not one rethrown by an operator.
165          * Seconds check ignores cancellation causes, they cannot be caught.
166          */
167         if (e.isSameExceptionAs(fromDownstream) || e.isCancellationCause(coroutineContext)) {
168             throw e // Rethrow exceptions from downstream and cancellation causes
169         } else {
170             /*
171              * The exception came from the upstream [semi-] independently.
172              * For pure failures, when the downstream functions normally, we handle the exception as intended.
173              * But if the downstream has failed prior to or concurrently
174              * with the upstream, we forcefully rethrow it, preserving the contextual information and ensuring  that it's not lost.
175              */
176             if (fromDownstream == null) {
177                 return e
178             }
179             /*
180              * We consider the upstream exception as the superseding one when both upstream and downstream
181              * fail, suppressing the downstream exception, and operating similarly to `finally` block with
182              * the useful addition of adding the original downstream exception to suppressed ones.
183              *
184              * That's important for the following scenarios:
185              * ```
186              * flow {
187              *     val resource = ...
188              *     try {
189              *         ... emit as well ...
190              *     } finally {
191              *          resource.close() // Throws in the shutdown sequence when 'collect' already has thrown an exception
192              *     }
193              * }.catch { } // or retry
194              * .collect { ... }
195              * ```
196              * when *the downstream* throws.
197              */
198             if (e is CancellationException) {
199                 fromDownstream.addSuppressed(e)
200                 throw fromDownstream
201             } else {
202                 e.addSuppressed(fromDownstream)
203                 throw e
204             }
205         }
206     }
207     return null
208 }
209 
isCancellationCausenull210 private fun Throwable.isCancellationCause(coroutineContext: CoroutineContext): Boolean {
211     val job = coroutineContext[Job]
212     if (job == null || !job.isCancelled) return false
213     return isSameExceptionAs(job.getCancellationException())
214 }
215 
isSameExceptionAsnull216 private fun Throwable.isSameExceptionAs(other: Throwable?): Boolean =
217     other != null && unwrap(other) == unwrap(this)
218 
219 
220