xref: /aosp_15_r20/external/kotlinx.coroutines/kotlinx-coroutines-core/jdk8/src/future/Future.kt (revision 7a7160fed73afa6648ef8aa100d4a336fe921d9a)

<lambda>null1 package kotlinx.coroutines.future
2 
3 import kotlinx.coroutines.*
4 import kotlinx.coroutines.CancellationException
5 import java.util.concurrent.*
6 import java.util.function.*
7 import kotlin.coroutines.*
8 
/**
 * Launches a new coroutine and exposes its outcome as a [CompletableFuture].
 * Cancelling the returned future, or completing it in any other way, cancels the running coroutine.
 *
 * The coroutine context comes from the receiver [CoroutineScope]; extra elements may be supplied through [context].
 * When that context carries neither a dispatcher nor any other [ContinuationInterceptor], [Dispatchers.Default] is used.
 * The parent job is likewise taken from the [CoroutineScope], unless [context] overrides it with its own element.
 *
 * The coroutine is scheduled for execution immediately by default; the `start` parameter selects
 * another option, see [CoroutineStart]. [CoroutineStart.LAZY] is rejected with an [IllegalArgumentException]
 * because the `CompletableFuture` framework offers no corresponding capability.
 *
 * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for the debugging facilities available to a newly created coroutine.
 *
 * @param context context elements added on top of [CoroutineScope.coroutineContext].
 * @param start coroutine start option, [CoroutineStart.DEFAULT] unless specified.
 * @param block the code the coroutine runs.
 */
public fun <T> CoroutineScope.future(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): CompletableFuture<T> {
    require(!start.isLazy) { "$start start is not supported" }
    val result = CompletableFuture<T>()
    val worker = CompletableFutureCoroutine(newCoroutineContext(context), result)
    // Register the coroutine as a handler first, so that an external completion of the future cancels it
    result.handle(worker)
    worker.start(start, worker, block)
    return result
}
43 
/**
 * Coroutine whose outcome is published into [future].
 *
 * It also implements [BiFunction] so that it can be attached to that same future as a completion
 * handler: once the future is completed, the handler cancels the coroutine.
 */
private class CompletableFutureCoroutine<T>(
    context: CoroutineContext,
    private val future: CompletableFuture<T>
) : AbstractCoroutine<T>(context, initParentJob = true, active = true), BiFunction<T?, Throwable?, Unit> {
    // Coroutine side: forward the produced value to the future.
    override fun onCompleted(value: T) {
        future.complete(value)
    }

    // Coroutine side: forward the failure or cancellation cause to the future.
    override fun onCancelled(cause: Throwable, handled: Boolean) {
        /*
         * The cause may be lost here when the failure races with an external cancellation
         * of the future. That matches the other future implementations (LF, FT, CF),
         * which also give up on such an exception.
         */
        future.completeExceptionally(cause)
    }

    // Future side: whatever the future completed with, the coroutine is cancelled; both arguments are ignored.
    override fun apply(value: T?, exception: Throwable?) {
        cancel()
    }
}
65 
/**
 * Wraps this deferred value into a [CompletableFuture].
 * Cancelling the returned future, or completing it in any other way, cancels the deferred value.
 */
public fun <T> Deferred<T>.asCompletableFuture(): CompletableFuture<T> {
    val result = CompletableFuture<T>()
    setupCancellation(result)
    invokeOnCompletion {
        try {
            // getCompleted() throws when the deferred did not complete with a value; the catch below forwards that
            val value = getCompleted()
            result.complete(value)
        } catch (failure: Throwable) {
            result.completeExceptionally(failure)
        }
    }
    return result
}
82 
/**
 * Wraps this job into a [CompletableFuture] of [Unit].
 * Cancelling the returned future, or completing it in any other way, cancels the job.
 */
public fun Job.asCompletableFuture(): CompletableFuture<Unit> {
    val result = CompletableFuture<Unit>()
    setupCancellation(result)
    invokeOnCompletion { cause ->
        // A non-null cause is passed on to the future; otherwise the future completes with Unit
        if (cause != null) {
            result.completeExceptionally(cause)
        } else {
            result.complete(Unit)
        }
    }
    return result
}
96 
/**
 * Cancels this job once [future] is completed.
 *
 * A normal completion cancels the job without a cause. An exceptional completion passes the
 * exception along as the cause, wrapping it into a [CancellationException] unless it already is one.
 */
private fun Job.setupCancellation(future: CompletableFuture<*>) {
    future.handle { _, exception ->
        val cause: CancellationException? = when (exception) {
            null -> null
            is CancellationException -> exception
            else -> CancellationException("CompletableFuture was completed exceptionally", exception)
        }
        cancel(cause)
    }
}
104 
/**
 * Exposes this [CompletionStage] as a [Deferred].
 *
 * When the resulting deferred is cancelled, the [CompletableFuture] behind this [CompletionStage]
 * (see [CompletionStage.toCompletableFuture]) is cancelled as well.
 */
@Suppress("DeferredIsResult")
public fun <T> CompletionStage<T>.asDeferred(): Deferred<T> {
    val source = toCompletableFuture() // the future backing this stage
    // Fast path: the future is already done, so return an already completed deferred
    if (source.isDone) {
        return try {
            @Suppress("UNCHECKED_CAST")
            CompletableDeferred(source.get() as T)
        } catch (failure: Throwable) {
            // get() wraps the failure into ExecutionException; report the original cause when there is one
            val cause = (failure as? ExecutionException)?.cause ?: failure
            val failed = CompletableDeferred<T>()
            failed.completeExceptionally(cause)
            failed
        }
    }
    // Slow path: complete the deferred from the stage's completion handler
    val deferred = CompletableDeferred<T>()
    handle { value, exception ->
        try {
            if (exception != null) {
                // Exceptional completion: unwrap CompletionException to stay consistent with the fast path,
                // where CompletableFuture.get() does the unwrapping itself
                deferred.completeExceptionally((exception as? CompletionException)?.cause ?: exception)
            } else {
                // Normal completion
                deferred.complete(value)
            }
        } catch (e: Throwable) {
            // Reached only when the internals of Deferred threw while it was being completed
            handleCoroutineException(EmptyCoroutineContext, e)
        }
    }
    deferred.cancelFutureOnCompletion(source)
    return deferred
}
144 
/**
 * Suspends until this [CompletionStage] completes, without blocking a thread.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled while this function is waiting, it stops waiting
 * for the completion stage and immediately resumes with [CancellationException][kotlinx.coroutines.CancellationException].
 *
 * The function targets one-shot futures: on coroutine cancellation the [CompletableFuture] behind this
 * [CompletionStage] (see [CompletionStage.toCompletableFuture]) is cancelled too.
 * Use `stage.asDeferred().await()` instead when cancelling the given stage is undesired.
 */
public suspend fun <T> CompletionStage<T>.await(): T {
    val source = toCompletableFuture() // the future backing this stage
    // Fast path: an already completed future is read directly, without suspending
    if (source.isDone) {
        return try {
            @Suppress("UNCHECKED_CAST", "BlockingMethodInNonBlockingContext")
            source.get() as T
        } catch (failure: ExecutionException) {
            // Rethrow the original cause rather than the ExecutionException wrapper
            throw failure.cause ?: failure
        }
    }
    // Slow path: suspend until the stage's handler resumes the continuation
    return suspendCancellableCoroutine { continuation: CancellableContinuation<T> ->
        val handler = ContinuationHandler(continuation)
        handle(handler)
        continuation.invokeOnCancellation {
            source.cancel(false)
            handler.cont = null // drop the reference to the continuation to aid GC
        }
    }
}
177 
/**
 * Completion handler that resumes [cont] with the outcome of a completion stage.
 *
 * [cont] is a volatile, nullable field so that the reference can be cleared from outside;
 * once it is `null`, [apply] does nothing.
 */
private class ContinuationHandler<T>(
    @Volatile @JvmField var cont: Continuation<T>?
) : BiFunction<T?, Throwable?, Unit> {
    @Suppress("UNCHECKED_CAST")
    override fun apply(result: T?, exception: Throwable?) {
        // Read the volatile field once; nothing to resume when it has been cleared
        val target = cont ?: return
        if (exception != null) {
            // Exceptional completion: unwrap CompletionException so that only the original exception is propagated
            // and .await() presents a consistent view of the result
            target.resumeWithException((exception as? CompletionException)?.cause ?: exception)
            return
        }
        // Normal completion
        target.resume(result as T)
    }
}
193