xref: /aosp_15_r20/external/kotlinx.coroutines/kotlinx-coroutines-core/common/src/channels/Channel.kt (revision 7a7160fed73afa6648ef8aa100d4a336fe921d9a)
1 @file:Suppress("FunctionName")
2 
3 package kotlinx.coroutines.channels
4 
5 import kotlinx.coroutines.*
6 import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
7 import kotlinx.coroutines.channels.Channel.Factory.CHANNEL_DEFAULT_CAPACITY
8 import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
9 import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS
10 import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
11 import kotlinx.coroutines.internal.*
12 import kotlinx.coroutines.selects.*
13 import kotlin.contracts.*
14 import kotlin.internal.*
15 import kotlin.jvm.*
16 
/**
 * The sending side of a [Channel].
 */
public interface SendChannel<in E> {
    /**
     * `true` once this channel has been closed through [close] or its receiving side has been
     * [cancelled][ReceiveChannel.cancel]. From that moment on, a call to [send] results in an exception.
     *
     * A `false` value gives no guarantee that a subsequent [send] succeeds, because the channel may be
     * closed concurrently right after the check. Prefer [trySend] in such scenarios.
     *
     * @see SendChannel.trySend
     * @see SendChannel.close
     * @see ReceiveChannel.cancel
     */
    @DelicateCoroutinesApi
    public val isClosedForSend: Boolean

    /**
     * Sends [element] to this channel. The caller is suspended while the channel's buffer is full or does not
     * exist; an exception is thrown if the channel [is closed for `send`][isClosedForSend] (see [close] for details).
     *
     * [Closing][close] the channel _after_ this function has suspended does not abort the suspended [send]:
     * closing is conceptually the same as sending a special "close token" over the channel.
     * Elements travel through the channel in first-in first-out order, so the element being sent
     * is delivered to receivers before the close token.
     *
     * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled while the
     * function is waiting, it immediately resumes with [CancellationException].
     * There is a **prompt cancellation guarantee**: [CancellationException] is thrown even when [send] managed
     * to send the element but was cancelled while suspended. See [suspendCancellableCoroutine] for low-level details.
     *
     * As a consequence of the prompt cancellation guarantee, an exception does not always mean that the element
     * was not delivered. See the "Undelivered elements" section of the [Channel] documentation for details on
     * handling undelivered elements.
     *
     * Cancellation is not checked when this function does not suspend.
     * Use [yield] or [CoroutineScope.isActive] to check for cancellation periodically in tight loops if needed.
     *
     * This function can be used in [select] invocations through the [onSend] clause.
     * Use [trySend] to attempt sending to this channel without waiting.
     */
    public suspend fun send(element: E)

    /**
     * [select] clause counterpart of the suspending [send] function. It is selected when the element given
     * as the clause parameter has been sent to the channel; the corresponding block then receives
     * a reference to this channel.
     *
     * The [select] invocation fails with an exception if the channel [is closed for `send`][isClosedForSend]
     * (see [close] for details).
     */
    public val onSend: SelectClause2<E, SendChannel<E>>

    /**
     * Adds [element] to this channel immediately when doing so does not violate the channel's capacity
     * restrictions, and returns a successful result. Otherwise a failed or closed result is returned.
     * This is the synchronous variant of [send]: it backs off in the situations where `send` would suspend or throw.
     *
     * A non-successful result guarantees that the element was not delivered to the consumer, and
     * the `onUndeliveredElement` handler installed for this channel is not called for it.
     * See the "Undelivered elements" section of the [Channel] documentation for details on handling
     * undelivered elements.
     */
    public fun trySend(element: E): ChannelResult<Unit>

    /**
     * Closes this channel.
     * The operation is idempotent &mdash; repeated invocations have no effect and return `false`.
     * Conceptually, closing sends a special "close token" over this channel.
     *
     * [isClosedForSend] starts returning `true` immediately after this function is invoked.
     * [isClosedForReceive][ReceiveChannel.isClosedForReceive] on the [ReceiveChannel] side, however,
     * starts returning `true` only after all previously sent elements have been received.
     *
     * A channel closed without a [cause] throws [ClosedSendChannelException] on attempts to [send]
     * and [ClosedReceiveChannelException] on attempts to [receive][ReceiveChannel.receive].
     * A channel closed with a non-null [cause] is called a _failed_ channel; attempts to send to or
     * receive from a failed channel throw the specified [cause] exception.
     */
    public fun close(cause: Throwable? = null): Boolean

    /**
     * Registers a [handler] that is invoked synchronously once the channel is [closed][close]
     * or its receiving side is [cancelled][ReceiveChannel.cancel].
     * At most one handler can be attached to a channel during its lifetime.
     * The `handler` is invoked when [isClosedForSend] starts to return `true`;
     * if the channel is already closed, the handler is invoked immediately.
     *
     * Meaning of the `cause` passed to the handler:
     * - `null` if the channel was closed normally without the corresponding argument.
     * - An instance of [CancellationException] if the channel was cancelled normally without the corresponding argument.
     * - The cause of `close` or `cancel` otherwise.
     *
     * ### Execution context and exception safety
     *
     * The [handler] runs as part of the closing or cancelling operation, and only after the channel has reached
     * its final state. Consequently, the channel is still successfully closed or cancelled even if the handler
     * throws an exception or hangs.
     * Unhandled exceptions from [handler] are propagated to the caller of the closing or cancelling operation.
     *
     * Example of usage:
     * ```
     * val events = Channel<Event>(UNLIMITED)
     * callbackBasedApi.registerCallback { event ->
     *   events.trySend(event)
     *       .onClosed { /* channel is already closed, but the callback hasn't stopped yet */ }
     * }
     *
     * val uiUpdater = uiScope.launch(Dispatchers.Main) {
     *    events.consume { /* handle events */ }
     * }
     * // Stop the callback after the channel is closed or cancelled
     * events.invokeOnClose { callbackBasedApi.stop() }
     * ```
     *
     * **Stability note.** This function is a stable part of the API, with the single exception that
     * an [IllegalStateException] is thrown when multiple handlers are registered.
     * This restriction could be lifted in the future.
     *
     * @throws UnsupportedOperationException if the underlying channel does not support [invokeOnClose].
     * Implementation note: currently, [invokeOnClose] is unsupported only by Rx-like integrations
     *
     * @throws IllegalStateException if another handler was already registered
     */
    public fun invokeOnClose(handler: (cause: Throwable?) -> Unit)

    /**
     * **Deprecated** offer method.
     *
     * Deprecated in favour of [trySend].
     * It turned out to be the most error-prone method of the Channel API:
     *
     * - The `Boolean` return type creates a false sense of security, implying that `false`
     *   is returned instead of an exception being thrown.
     * - It was used mostly from non-suspending APIs where CancellationException triggered
     *   internal failures in the application (the most common source of bugs).
     * - Because of the signature and explicit `if (ch.offer(...))` checks, such an error was easy to
     *   overlook during code review.
     * - Its name was not aligned with the rest of the API and tried to mimic Java's queue instead.
     *
     * **NB** Automatic migration is a best-effort aid, but code that relied on the exception handling
     * has to be removed or adjusted.
     * The complete replacement has a more verbose form:
     * ```
     * channel.trySend(element)
     *     .onClosed { throw it ?: ClosedSendChannelException("Channel was closed normally") }
     *     .isSuccess
     * ```
     *
     * See https://github.com/Kotlin/kotlinx.coroutines/issues/974 for more context.
     *
     * @suppress **Deprecated**.
     */
    @Deprecated(
        level = DeprecationLevel.ERROR,
        message = "Deprecated in the favour of 'trySend' method",
        replaceWith = ReplaceWith("trySend(element).isSuccess")
    ) // Warning since 1.5.0, error since 1.6.0, not hidden until 1.8+ because API is quite widespread
    public fun offer(element: E): Boolean {
        val outcome = trySend(element)
        if (outcome.isSuccess) return true
        // No exception recorded: report `false`; otherwise rethrow the recorded exception.
        val failure = outcome.exceptionOrNull() ?: return false
        throw recoverStackTrace(failure)
    }
}
179 
/**
 * The receiving side of a [Channel].
 */
public interface ReceiveChannel<out E> {
    /**
     * `true` if this channel was closed by an invocation of [close][SendChannel.close] on the [SendChannel]
     * side and all previously sent items have already been received, or if the receiving side was
     * [cancelled][ReceiveChannel.cancel].
     *
     * In that state, calling [receive] results in a [ClosedReceiveChannelException] or a corresponding
     * cancellation cause. A channel closed because of an exception is considered closed as well, but is called
     * a _failed_ channel. All suspending attempts to receive an element from a failed channel throw the original
     * [close][SendChannel.close] cause exception.
     *
     * A `false` value gives no guarantee that a subsequent [receive] succeeds, because the channel may be
     * closed concurrently right after the check. Prefer [receiveCatching] in such scenarios.
     *
     * @see ReceiveChannel.receiveCatching
     * @see ReceiveChannel.cancel
     * @see SendChannel.close
     */
    @DelicateCoroutinesApi
    public val isClosedForReceive: Boolean

    /**
     * `true` if the channel is empty (contains no elements), meaning that an attempt to [receive] will suspend.
     * This property is `false` if the channel [is closed for `receive`][isClosedForReceive].
     */
    @ExperimentalCoroutinesApi
    public val isEmpty: Boolean

    /**
     * Retrieves and removes an element from this channel if it is not empty, suspends the caller while the
     * channel is empty, or throws a [ClosedReceiveChannelException] if the channel
     * [is closed for `receive`][isClosedForReceive].
     * A channel closed because of an exception is called a _failed_ channel, and this function
     * throws the original [close][SendChannel.close] cause exception for it.
     *
     * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled while the
     * function is waiting, it immediately resumes with [CancellationException].
     * There is a **prompt cancellation guarantee**: if the job was cancelled while this function was suspended,
     * it does not resume successfully. [CancellationException] is thrown even when [receive] managed to retrieve
     * the element from the channel, in which case the element is not delivered.
     * See [suspendCancellableCoroutine] for low-level details.
     *
     * As a consequence of the prompt cancellation guarantee, some values retrieved from the channel can be lost.
     * See the "Undelivered elements" section of the [Channel] documentation for details on handling
     * undelivered elements.
     *
     * Cancellation is not checked when this function does not suspend.
     * Use [yield] or [CoroutineScope.isActive] to check for cancellation periodically in tight loops if needed.
     *
     * This function can be used in [select] invocations through the [onReceive] clause.
     * Use [tryReceive] to attempt receiving from this channel without waiting.
     */
    public suspend fun receive(): E

    /**
     * [select] clause counterpart of the suspending [receive] function; it selects with the element
     * received from the channel.
     * The [select] invocation fails with an exception if the channel
     * [is closed for `receive`][isClosedForReceive] (see [close][SendChannel.close] for details).
     */
    public val onReceive: SelectClause1<E>

    /**
     * Retrieves and removes an element from this channel if it is not empty, or suspends the caller while the
     * channel is empty. The returned [ChannelResult] holds either the successfully retrieved element
     * or the close cause if the channel was closed. The close cause may be `null` if the channel was closed normally.
     * The result cannot be [failed][ChannelResult.isFailure] without being [closed][ChannelResult.isClosed].
     *
     * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled while the
     * function is waiting, it immediately resumes with [CancellationException].
     * There is a **prompt cancellation guarantee**: [CancellationException] is thrown even when [receiveCatching]
     * managed to retrieve the element from the channel but was cancelled while suspended.
     * See [suspendCancellableCoroutine] for low-level details.
     *
     * As a consequence of the prompt cancellation guarantee, some values retrieved from the channel can be lost.
     * See the "Undelivered elements" section of the [Channel] documentation for details on handling
     * undelivered elements.
     *
     * Cancellation is not checked when this function does not suspend.
     * Use [yield] or [CoroutineScope.isActive] to check for cancellation periodically in tight loops if needed.
     *
     * This function can be used in [select] invocations through the [onReceiveCatching] clause.
     * Use [tryReceive] to attempt receiving from this channel without waiting.
     */
    public suspend fun receiveCatching(): ChannelResult<E>

    /**
     * [select] clause counterpart of the suspending [receiveCatching] function; it selects with a [ChannelResult]
     * holding either the value received from the channel or the close cause if the channel
     * [is closed for `receive`][isClosedForReceive].
     */
    public val onReceiveCatching: SelectClause1<ChannelResult<E>>

    /**
     * Retrieves and removes an element from this channel without suspending. Returns a
     * [successful][ChannelResult.success] result if the channel is not empty, a [failed][ChannelResult.failed]
     * result if the channel is empty, and a [closed][ChannelResult.closed] result if the channel is closed.
     */
    public fun tryReceive(): ChannelResult<E>

    /**
     * Returns a new iterator for receiving elements from this channel with a `for` loop.
     * Iteration completes normally when the channel [is closed for `receive`][isClosedForReceive] without a cause,
     * and throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
     */
    public operator fun iterator(): ChannelIterator<E>

    /**
     * Cancels reception of the remaining elements from this channel with an optional [cause].
     * This function closes the channel and removes all buffered sent elements from it.
     *
     * A cause can carry an error message or other details about the cancellation reason
     * for debugging purposes.
     * If no cause is specified, an instance of [CancellationException] with a
     * default message is created to [close][SendChannel.close] the channel.
     *
     * Immediately after this function is invoked, [isClosedForReceive] and
     * [isClosedForSend][SendChannel.isClosedForSend] on the [SendChannel] side start returning `true`.
     * Any attempt to send to or receive from this channel leads to a [CancellationException].
     */
    public fun cancel(cause: CancellationException? = null)

    /**
     * @suppress This method implements old version of JVM ABI. Use [cancel].
     */
    @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
    public fun cancel(): Unit = cancel(null)

    /**
     * @suppress This method has bad semantics when cause is not a [CancellationException]. Use [cancel].
     */
    @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
    public fun cancel(cause: Throwable? = null): Boolean

    /**
     * **Deprecated** poll method.
     *
     * Deprecated in favour of [tryReceive].
     * It turned out to be an error-prone method of the Channel API:
     *
     * - The nullable return type creates a false sense of security, implying that `null`
     *   is returned instead of an exception being thrown.
     * - It was used mostly from non-suspending APIs where CancellationException triggered
     *   internal failures in the application (the most common source of bugs).
     * - Its name was not aligned with the rest of the API and tried to mimic Java's queue instead.
     *
     * See https://github.com/Kotlin/kotlinx.coroutines/issues/974 for more context.
     *
     * ### Replacement note
     *
     * The replacement `tryReceive().getOrNull()` is a default that ignores all close exceptions and
     * proceeds with `null`, while `poll` throws an exception if the channel was closed with an exception.
     * The replacement with exactly the 'poll' semantics is
     * `tryReceive().onClosed { if (it != null) throw it }.getOrNull()`
     *
     * @suppress **Deprecated**.
     */
    @Deprecated(
        level = DeprecationLevel.ERROR,
        message = "Deprecated in the favour of 'tryReceive'. " +
            "Please note that the provided replacement does not rethrow channel's close cause as 'poll' did, " +
            "for the precise replacement please refer to the 'poll' documentation",
        replaceWith = ReplaceWith("tryReceive().getOrNull()")
    ) // Warning since 1.5.0, error since 1.6.0, not hidden until 1.8+ because API is quite widespread
    public fun poll(): E? {
        val outcome = tryReceive()
        if (outcome.isSuccess) return outcome.getOrThrow()
        // No exception recorded: report `null`; otherwise rethrow the recorded exception.
        val failure = outcome.exceptionOrNull() ?: return null
        throw recoverStackTrace(failure)
    }

    /**
     * Deprecated since 1.3.0; no longer recommended to use
     * or to implement in subclasses.
     *
     * It had the following pitfalls:
     * - It did not allow distinguishing 'null' as "closed channel" from "null as a value"
     * - It threw if the channel had failed even though its signature may suggest it returns 'null'
     * - It did not really belong to the core channel API and can be exposed as an extension instead.
     *
     * ### Replacement note
     *
     * The replacement `receiveCatching().getOrNull()` is a safe default that ignores all close exceptions and
     * proceeds with `null`, while `receiveOrNull` throws an exception if the channel was closed with an exception.
     * The replacement with exactly the `receiveOrNull` semantics is
     * `receiveCatching().onClosed { if (it != null) throw it }.getOrNull()`.
     *
     * @suppress **Deprecated**
     */
    @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
    @LowPriorityInOverloadResolution
    @Deprecated(
        message = "Deprecated in favor of 'receiveCatching'. " +
            "Please note that the provided replacement does not rethrow channel's close cause as 'receiveOrNull' did, " +
            "for the detailed replacement please refer to the 'receiveOrNull' documentation",
        level = DeprecationLevel.ERROR,
        replaceWith = ReplaceWith("receiveCatching().getOrNull()")
    ) // Warning since 1.3.0, error in 1.5.0, cannot be hidden due to deprecated extensions
    public suspend fun receiveOrNull(): E? {
        return receiveCatching().getOrNull()
    }

    /**
     * Deprecated since 1.3.0; no longer recommended to use
     * or to implement in subclasses.
     * See [receiveOrNull] documentation.
     *
     * @suppress **Deprecated**: in favor of onReceiveCatching extension.
     */
    @Suppress("DEPRECATION_ERROR")
    @Deprecated(
        message = "Deprecated in favor of onReceiveCatching extension",
        level = DeprecationLevel.ERROR,
        replaceWith = ReplaceWith("onReceiveCatching")
    ) // Warning since 1.3.0, error in 1.5.0, will be hidden or removed in 1.7.0
    public val onReceiveOrNull: SelectClause1<E?>
        get() {
            // The default getter delegates to the BufferedChannel implementation of this clause.
            return (this as BufferedChannel<E>).onReceiveOrNull
        }
}
397 
/**
 * A discriminated union of channel operation result.
 * It encapsulates the successful or failed result of a channel operation or a failed operation to a closed channel with
 * an optional cause.
 *
 * The successful result represents a successful operation with a value of type [T], for example,
 * the result of [Channel.receiveCatching] operation or a successfully sent element as a result of [Channel.trySend].
 *
 * The failed result represents a failed operation attempt to a channel, but it doesn't necessarily indicate that the channel is failed.
 * E.g. when the channel is full, [Channel.trySend] returns failed result, but the channel itself is not in the failed state.
 *
 * The closed result represents an operation attempt to a closed channel and also implies that the operation has failed.
 * It is guaranteed that if the result is _closed_, then the target channel is either [closed for send][Channel.isClosedForSend]
 * or is [closed for receive][Channel.isClosedForReceive] depending on whether the failed operation was sending or receiving.
 *
 * Representation: [holder] is either the successful value itself, or a [Failed] marker
 * (a [Closed] instance, which is a kind of [Failed], when the channel was closed).
 */
@JvmInline
public value class ChannelResult<out T>
@PublishedApi internal constructor(@PublishedApi internal val holder: Any?) {
    /**
     * Returns `true` if this instance represents a successful
     * operation outcome.
     *
     * In this case [isFailure] and [isClosed] return `false`.
     */
    public val isSuccess: Boolean get() = holder !is Failed

    /**
     * Returns `true` if this instance represents unsuccessful operation.
     *
     * In this case [isSuccess] returns false, but it does not imply
     * that the channel is failed or closed.
     *
     * Example of a failed operation without an exception and channel being closed
     * is [Channel.trySend] attempt to a channel that is full.
     */
    public val isFailure: Boolean get() = holder is Failed

    /**
     * Returns `true` if this instance represents unsuccessful operation
     * to a closed or cancelled channel.
     *
     * In this case [isSuccess] returns `false`, [isFailure] returns `true`, but it does not imply
     * that [exceptionOrNull] returns non-null value.
     *
     * It can happen if the channel was [closed][Channel.close] normally without an exception.
     */
    public val isClosed: Boolean get() = holder is Closed

    /**
     * Returns the encapsulated value if this instance represents success or `null` if it represents
     * a failed (including closed) result.
     */
    @Suppress("UNCHECKED_CAST")
    public fun getOrNull(): T? = if (holder !is Failed) holder as T else null

    /**
     * Returns the encapsulated value if this instance represents success.
     *
     * @throws Throwable the close cause, if this result is closed with a non-null cause.
     * @throws IllegalStateException if this result is failed, or closed without a cause.
     */
    public fun getOrThrow(): T {
        @Suppress("UNCHECKED_CAST")
        if (holder !is Failed) return holder as T
        if (holder is Closed && holder.cause != null) throw holder.cause
        error("Trying to call 'getOrThrow' on a failed channel result: $holder")
    }

    /**
     * Returns the close cause if this instance represents an operation on a channel that was closed
     * with a cause. Returns `null` in every other case: success, a failure that is not due to closing,
     * or a channel closed without a cause.
     */
    public fun exceptionOrNull(): Throwable? = (holder as? Closed)?.cause

    /** Marker stored in [holder] for any unsuccessful result. */
    internal open class Failed {
        override fun toString(): String = "Failed"
    }

    /** Marker stored in [holder] for an operation on a closed channel; [cause] is `null` for a normal close. */
    internal class Closed(@JvmField val cause: Throwable?): Failed() {
        override fun equals(other: Any?): Boolean = other is Closed && cause == other.cause
        override fun hashCode(): Int = cause.hashCode()
        override fun toString(): String = "Closed($cause)"
    }

    @InternalCoroutinesApi
    public companion object {
        // Shared marker instance used by every non-closed failure.
        private val failed = Failed()

        /** Wraps [value] as a successful result. */
        @InternalCoroutinesApi
        public fun <E> success(value: E): ChannelResult<E> =
            ChannelResult(value)

        /** Creates a failed result that is not related to the channel being closed. */
        @InternalCoroutinesApi
        public fun <E> failure(): ChannelResult<E> =
            ChannelResult(failed)

        /** Creates a closed result with an optional close [cause]. */
        @InternalCoroutinesApi
        public fun <E> closed(cause: Throwable?): ChannelResult<E> =
            ChannelResult(Closed(cause))
    }

    /**
     * Returns `Failed` or `Closed(cause)` for unsuccessful results and `Value(value)` for successful ones.
     */
    public override fun toString(): String =
        when (holder) {
            // Covers both plain failures and closed results, so a failure is never rendered as "Value(Failed)".
            is Failed -> holder.toString()
            else -> "Value($holder)"
        }
}
501 
/**
 * Unwraps this result: yields the encapsulated value when the result is a [success][ChannelResult.isSuccess],
 * otherwise yields whatever [onFailure] returns. [onFailure] receives [ChannelResult.exceptionOrNull],
 * and is invoked for failed as well as closed results.
 */
@OptIn(ExperimentalContracts::class)
public inline fun <T> ChannelResult<T>.getOrElse(onFailure: (exception: Throwable?) -> T): T {
    contract {
        callsInPlace(onFailure, InvocationKind.AT_MOST_ONCE)
    }
    // Unsuccessful result: delegate to the fallback.
    if (holder is ChannelResult.Failed) return onFailure(exceptionOrNull())
    @Suppress("UNCHECKED_CAST")
    return holder as T
}
515 
/**
 * Invokes [action] with the encapsulated value when this instance represents a
 * [success][ChannelResult.isSuccess]; does nothing otherwise.
 * The original `ChannelResult` is returned unchanged in both cases.
 */
@OptIn(ExperimentalContracts::class)
public inline fun <T> ChannelResult<T>.onSuccess(action: (value: T) -> Unit): ChannelResult<T> {
    contract {
        callsInPlace(action, InvocationKind.AT_MOST_ONCE)
    }
    // Nothing to do for failed or closed results.
    if (holder is ChannelResult.Failed) return this
    @Suppress("UNCHECKED_CAST")
    action(holder as T)
    return this
}
529 
/**
 * Invokes [action] when this instance represents a [failure][ChannelResult.isFailure]; does nothing otherwise.
 * The [action] receives the result of [ChannelResult.exceptionOrNull].
 *
 * The original `ChannelResult` is returned unchanged in both cases.
 */
@OptIn(ExperimentalContracts::class)
public inline fun <T> ChannelResult<T>.onFailure(action: (exception: Throwable?) -> Unit): ChannelResult<T> {
    contract {
        callsInPlace(action, InvocationKind.AT_MOST_ONCE)
    }
    // Successful results are passed through untouched.
    if (holder !is ChannelResult.Failed) return this
    action(exceptionOrNull())
    return this
}
544 
/**
 * Invokes [action] when this instance represents a [failure][ChannelResult.isFailure]
 * caused by the channel being [closed][Channel.close]; does nothing otherwise.
 * The [action] receives the result of [ChannelResult.exceptionOrNull].
 * When the action is invoked, the channel is guaranteed to be either [closed for send][Channel.isClosedForSend]
 * or [closed for receive][Channel.isClosedForReceive], depending on the failed operation.
 *
 * The original `ChannelResult` is returned unchanged in both cases.
 */
@OptIn(ExperimentalContracts::class)
public inline fun <T> ChannelResult<T>.onClosed(action: (exception: Throwable?) -> Unit): ChannelResult<T> {
    contract {
        callsInPlace(action, InvocationKind.AT_MOST_ONCE)
    }
    // Only closed results trigger the action; successes and plain failures pass through.
    if (holder !is ChannelResult.Closed) return this
    action(exceptionOrNull())
    return this
}
562 
/**
 * An iterator over the elements of a [ReceiveChannel].
 *
 * Instances of this interface are *not thread-safe*: a single iterator must not be used
 * from concurrent coroutines.
 */
public interface ChannelIterator<out E> {
    /**
     * Suspends the caller while this channel is empty and then reports whether another element is available:
     *
     * - returns `true` if the channel has more elements;
     * - returns `false` if the channel [is closed for `receive`][ReceiveChannel.isClosedForReceive] without a cause;
     * - throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
     *
     * The element is retrieved and removed from the channel by this call and is kept for the subsequent
     * invocation of [next].
     *
     * This suspending function is cancellable: when the [Job] of the current coroutine is cancelled while
     * the function is waiting, it immediately resumes with [CancellationException].
     * There is a **prompt cancellation guarantee**: even if [hasNext] already retrieved an element from the channel
     * during its operation, [CancellationException] is thrown when the coroutine was cancelled while suspended.
     * See [suspendCancellableCoroutine] for low-level details.
     *
     * A consequence of the prompt cancellation guarantee is that some values retrieved from the channel can be lost.
     * The "Undelivered elements" section of the [Channel] documentation explains how to handle such elements.
     *
     * Note that cancellation is not checked when this function does not suspend.
     * In tight loops, use [yield] or [CoroutineScope.isActive] to check for cancellation periodically if needed.
     */
    public suspend operator fun hasNext(): Boolean

    @Deprecated(message = "Since 1.3.0, binary compatibility with versions <= 1.2.x", level = DeprecationLevel.HIDDEN)
    @Suppress("INAPPLICABLE_JVM_NAME")
    @JvmName("next")
    public suspend fun next0(): E {
        // Prior to 1.3.0 it was legal (and shown in code samples) to call "next()" without a preceding "hasNext()".
        // To stay fully binary compatible with code compiled back then, this bridge performs the "hasNext" check
        // itself before handing out the element.
        return if (hasNext()) next() else throw ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE)
    }

    /**
     * Returns the element that a preceding call to [hasNext] removed from the channel.
     * Throws an [IllegalStateException] if [hasNext] was not invoked beforehand.
     *
     * Always pair this method with [hasNext]:
     * ```
     * while (iterator.hasNext()) {
     *     val element = iterator.next()
     *     // ... handle element ...
     * }
     * ```
     *
     * If the channel [is closed for `receive`][ReceiveChannel.isClosedForReceive] without a cause,
     * this method throws a [ClosedReceiveChannelException].
     * If the channel has _failed_, it throws the original [close][SendChannel.close] cause exception.
     */
    public operator fun next(): E
}
619 
/**
 * A channel is a non-blocking primitive that lets a sender (via [SendChannel]) communicate with
 * a receiver (via [ReceiveChannel]).
 * Conceptually, it resembles Java's [BlockingQueue][java.util.concurrent.BlockingQueue], except that its
 * operations suspend instead of blocking and the channel can be [closed][SendChannel.close].
 *
 * ### Creating channels
 *
 * Channels are created with the `Channel(capacity)` factory function. The kind of channel that is created
 * depends on the value of the `capacity` integer:
 *
 * - `capacity` is 0 &mdash; a _rendezvous_ channel is created.
 *   Such a channel has no buffer at all. An element is handed from the sender to the receiver only when
 *   [send] and [receive] invocations meet in time (rendezvous): [send] suspends until another coroutine
 *   invokes [receive], and [receive] suspends until another coroutine invokes [send].
 *
 * - `capacity` is [Channel.UNLIMITED] &mdash; a channel with an effectively unlimited buffer is created.
 *   Such a channel has a linked-list buffer of unlimited capacity (limited only by available memory).
 *   [Sending][send] to this channel never suspends, and [trySend] always succeeds.
 *
 * - `capacity` is [Channel.CONFLATED] &mdash; a _conflated_ channel is created.
 *   Such a channel buffers at most one element and conflates all subsequent `send` and `trySend` invocations,
 *   so the receiver always gets the last element sent.
 *   Elements sent back-to-back are conflated &mdash; only the last sent element is received,
 *   while the previously sent elements **are lost**.
 *   [Sending][send] to this channel never suspends, and [trySend] always succeeds.
 *
 * - `capacity` is positive but less than [UNLIMITED] &mdash; an array-based channel with the specified
 *   capacity is created.
 *   Such a channel has an array buffer of a fixed `capacity`.
 *   [Sending][send] suspends only when the buffer is full, and [receiving][receive] suspends only when
 *   the buffer is empty.
 *
 * Buffered channels accept an additional [`onBufferOverflow`][BufferOverflow] parameter that controls how
 * the channel's [send][Channel.send] function behaves on buffer overflow:
 *
 * - [SUSPEND][BufferOverflow.SUSPEND] &mdash; the default; `send` suspends on buffer overflow until
 *   free space appears in the buffer.
 * - [DROP_OLDEST][BufferOverflow.DROP_OLDEST] &mdash; `send` does not suspend; the latest value is added
 *   to the buffer and the oldest one is dropped from it.
 *   A channel with `capacity = 1` and `onBufferOverflow = DROP_OLDEST` is a _conflated_ channel.
 * - [DROP_LATEST][BufferOverflow.DROP_LATEST] &mdash; `send` does not suspend; the value that is being sent
 *   is dropped and the buffer contents stay intact.
 *
 * A non-default `onBufferOverflow` implicitly creates a channel with at least one buffered element.
 * It is ignored for a channel with an unlimited buffer and cannot be specified for `capacity = CONFLATED`,
 * which is a shortcut by itself.
 *
 * ### Prompt cancellation guarantee
 *
 * All suspending functions of channels provide a **prompt cancellation guarantee**.
 * If the job was cancelled while a send or receive function was suspended, the function does not resume
 * successfully, even if it has already changed the channel's state; it throws a [CancellationException] instead.
 * With a single-threaded [dispatcher][CoroutineDispatcher] like [Dispatchers.Main], this guarantees that
 * the coroutine promptly reacts to the cancellation of its [Job] and does not resume its execution.
 *
 * > The **prompt cancellation guarantee** for channel operations was added in `kotlinx.coroutines` version `1.4.0`
 * > and replaced a channel-specific atomic cancellation that was not consistent with other suspending functions.
 * > The low-level mechanics of prompt cancellation are explained in the [suspendCancellableCoroutine] function.
 *
 * ### Undelivered elements
 *
 * Because of the prompt cancellation guarantee, a closeable resource (like an open file or a handle to another
 * native resource) that is transferred via a channel from one coroutine to another can fail to be delivered
 * and will be lost if the receiving operation is cancelled in transit.
 *
 * The `Channel()` constructor function has an optional `onUndeliveredElement` parameter.
 * When this parameter is set, the given function is called once for each element that was sent to the channel
 * with a call to the [send][SendChannel.send] function but failed to be delivered.
 * This can happen in the following cases:
 *
 * - The [send][SendChannel.send] operation throws an exception because it was cancelled before it had a chance
 *   to actually send the element, or because the channel was [closed][SendChannel.close] or
 *   [cancelled][ReceiveChannel.cancel].
 * - The [receive][ReceiveChannel.receive], [receiveOrNull][ReceiveChannel.receiveOrNull], or
 *   [hasNext][ChannelIterator.hasNext] operation throws an exception after it had retrieved the element from
 *   the channel but was cancelled before the code following the receive call resumed.
 * - The channel was [cancelled][ReceiveChannel.cancel]; in this case `onUndeliveredElement` is called on every
 *   element remaining in the channel's buffer.
 *
 * Note that the `onUndeliveredElement` function is called synchronously in an arbitrary context.
 * It should be fast, non-blocking, and should not throw exceptions. Any exception thrown by `onUndeliveredElement`
 * is wrapped into an internal runtime exception, which is either rethrown from the caller method or handed off
 * to the exception handler in the current context (see [CoroutineExceptionHandler]) when one is available.
 *
 * `onUndeliveredElement` is typically used to close a resource that is being transferred via the channel.
 * The following code pattern guarantees that opened resources are closed even if the producer, the consumer,
 * and/or the channel are cancelled. Resources are never lost.
 *
 * ```
 * // Create the channel with onUndeliveredElement block that closes a resource
 * val channel = Channel<Resource>(capacity) { resource -> resource.close() }
 *
 * // Producer code
 * val resourceToSend = openResource()
 * channel.send(resourceToSend)
 *
 * // Consumer code
 * val resourceReceived = channel.receive()
 * try {
 *     // work with received resource
 * } finally {
 *     resourceReceived.close()
 * }
 * ```
 *
 * > Note that if any work is done between `openResource()` and `channel.send(...)`, you should
 * > make sure that the resource gets closed in case this additional code fails.
 */
public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
    /**
     * Constants accepted by the `Channel()` channel factory function.
     */
    public companion object Factory {
        /**
         * Passed to the `Channel(...)` factory function to request a channel with an unlimited capacity buffer.
         */
        public const val UNLIMITED: Int = Int.MAX_VALUE

        /**
         * Passed to the `Channel(...)` factory function to request a rendezvous channel &mdash;
         * a channel that does not have a buffer.
         */
        public const val RENDEZVOUS: Int = 0

        /**
         * Passed to the `Channel(...)` factory function to request a conflated channel. This is a shortcut to
         * creating a channel with [`onBufferOverflow = DROP_OLDEST`][BufferOverflow.DROP_OLDEST].
         */
        public const val CONFLATED: Int = -1

        /**
         * Passed to the `Channel(...)` factory function to request a buffered channel with the default
         * buffer capacity.
         * For a channel that [suspends][BufferOverflow.SUSPEND] on overflow the default capacity
         * is 64; on JVM it can be overridden by setting [DEFAULT_BUFFER_PROPERTY_NAME].
         * Non-suspending channels use a buffer of capacity 1.
         */
        public const val BUFFERED: Int = -2

        // Internal-only marker value; it is not a valid argument for Channel(...).
        internal const val OPTIONAL_CHANNEL = -3

        /**
         * The name of the property that defines the default channel capacity used when
         * [BUFFERED] is passed as the parameter of the `Channel(...)` factory function.
         */
        public const val DEFAULT_BUFFER_PROPERTY_NAME: String = "kotlinx.coroutines.channels.defaultBuffer"

        // Capacity used for BUFFERED channels, resolved through systemProp with 64 as the documented default.
        // NOTE(review): the trailing `1` and `UNLIMITED - 1` arguments presumably bound the accepted value —
        // confirm against the systemProp declaration.
        internal val CHANNEL_DEFAULT_CAPACITY = systemProp(DEFAULT_BUFFER_PROPERTY_NAME, 64, 1, UNLIMITED - 1)
    }
}
768 
/**
 * Creates a channel with the given buffer capacity; by default the channel has no buffer.
 * The [Channel] interface documentation describes the available channel kinds in detail.
 *
 * @param capacity either a positive channel capacity or one of the constants defined in [Channel.Factory].
 * @param onBufferOverflow the action to take on buffer overflow. Optional; the default is
 *   a [suspending][BufferOverflow.SUSPEND] attempt to [send][Channel.send] a value.
 *   It is supported only when `capacity >= 0` or `capacity == Channel.BUFFERED`, and
 *   implicitly creates a channel with at least one buffered element.
 * @param onUndeliveredElement an optional function that is invoked when an element was sent but was not
 *   delivered to the consumer. See the "Undelivered elements" section in the [Channel] documentation.
 * @throws IllegalArgumentException when [capacity] < -2, or when [capacity] is [CONFLATED] and
 *   [onBufferOverflow] is not [BufferOverflow.SUSPEND]
 */
public fun <E> Channel(
    capacity: Int = RENDEZVOUS,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> {
    val suspendOnOverflow = onBufferOverflow == BufferOverflow.SUSPEND
    return when {
        capacity == CONFLATED -> {
            // CONFLATED is itself a shortcut for DROP_OLDEST, so an explicit overflow strategy is rejected.
            require(suspendOnOverflow) {
                "CONFLATED capacity cannot be used with non-default onBufferOverflow"
            }
            ConflatedBufferedChannel(1, BufferOverflow.DROP_OLDEST, onUndeliveredElement)
        }
        // onBufferOverflow is ignored here: the channel has a buffer, but it never overflows.
        capacity == UNLIMITED -> BufferedChannel(UNLIMITED, onUndeliveredElement)
        suspendOnOverflow -> {
            // BUFFERED stands for the default capacity; every other value (RENDEZVOUS included) is used as is.
            val bufferSize = if (capacity == BUFFERED) CHANNEL_DEFAULT_CAPACITY else capacity
            BufferedChannel(bufferSize, onUndeliveredElement)
        }
        else -> {
            // A non-suspending strategy needs a buffer, so RENDEZVOUS and BUFFERED both map to a single slot.
            val bufferSize = if (capacity == RENDEZVOUS || capacity == BUFFERED) 1 else capacity
            ConflatedBufferedChannel(bufferSize, onBufferOverflow, onUndeliveredElement)
        }
    }
}
810 
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.4.0, binary compatibility with earlier versions")
public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> {
    // Single-argument signature retained only for binary compatibility; it forwards the capacity
    // to the `Channel(...)` factory and leaves the remaining parameters at their defaults.
    return Channel(capacity)
}
813 
/**
 * Thrown on an attempt to [send][SendChannel.send] to a channel that
 * [is closed for `send`][SendChannel.isClosedForSend] and was closed without a cause.
 * When the channel has _failed_, send attempts rethrow the original [close][SendChannel.close] cause
 * exception instead.
 *
 * This exception extends [IllegalStateException]: conceptually, closing the channel and not sending
 * anything to it afterwards is the sender's responsibility, so an attempt to send to a closed channel
 * points to a logical error in the sender's code.
 */
public class ClosedSendChannelException(message: String?) : IllegalStateException(message)
824 
/**
 * Thrown on an attempt to [receive][ReceiveChannel.receive] from a channel that
 * [is closed for `receive`][ReceiveChannel.isClosedForReceive] and was closed without a cause.
 * When the channel has _failed_, receive attempts rethrow the original [close][SendChannel.close] cause
 * exception instead.
 *
 * This exception extends [NoSuchElementException] for consistency with plain collections.
 */
public class ClosedReceiveChannelException(message: String?) : NoSuchElementException(message)
833