<lambda>null1 package kotlinx.coroutines.future
2
3 import kotlinx.coroutines.*
4 import kotlinx.coroutines.CancellationException
5 import java.util.concurrent.*
6 import java.util.function.*
7 import kotlin.coroutines.*
8
/**
 * Launches a coroutine in this [CoroutineScope] and exposes its outcome as a [CompletableFuture].
 * Cancelling the returned future, or completing it in any other way, cancels the running coroutine.
 *
 * The coroutine context is taken from the receiver [CoroutineScope]; elements passed in [context] are added to it.
 * When the resulting context has neither a dispatcher nor any other [ContinuationInterceptor],
 * [Dispatchers.Default] is used. The parent job also comes from the [CoroutineScope], unless [context]
 * carries a job element that overrides it.
 *
 * By default the coroutine is scheduled for execution immediately; [start] selects another [CoroutineStart] mode.
 * [CoroutineStart.LAZY] is not supported, because the `CompletableFuture` framework offers no corresponding
 * capability, and is rejected with [IllegalArgumentException].
 *
 * Debugging facilities available to the newly created coroutine are described in
 * [newCoroutineContext][CoroutineScope.newCoroutineContext].
 *
 * @param context context elements added to [CoroutineScope.coroutineContext] for the new coroutine.
 * @param start coroutine start option; defaults to [CoroutineStart.DEFAULT].
 * @param block the code the coroutine runs.
 * @throws IllegalArgumentException if [start] is a lazy start option.
 */
public fun <T> CoroutineScope.future(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): CompletableFuture<T> {
    require(!start.isLazy) { "$start start is not supported" }
    val effectiveContext = newCoroutineContext(context)
    return CompletableFuture<T>().also { promise ->
        val worker = CompletableFutureCoroutine(effectiveContext, promise)
        // Cancels the coroutine if the future gets completed externally; registered before the coroutine is started.
        promise.handle(worker)
        worker.start(start, worker, block)
    }
}
43
/**
 * Coroutine that forwards its outcome to [sink] and also acts as a [BiFunction] completion handler
 * for a future: when invoked as a handler it cancels itself.
 */
private class CompletableFutureCoroutine<T>(
    context: CoroutineContext,
    private val sink: CompletableFuture<T>
) : AbstractCoroutine<T>(context, initParentJob = true, active = true), BiFunction<T?, Throwable?, Unit> {
    /** Completes [sink] with the [value] passed to this callback. */
    override fun onCompleted(value: T) {
        sink.complete(value)
    }

    /** Completes [sink] exceptionally with the [cause] passed to this callback. */
    override fun onCancelled(cause: Throwable, handled: Boolean) {
        /*
         * The cause may be lost here when the failure races with an external cancellation
         * of the future. That matches the behaviour of the other future implementations
         * (LF, FT, CF), so such an exception is deliberately given up on.
         */
        sink.completeExceptionally(cause)
    }

    /** Future handler entry point: cancels this coroutine, ignoring both [value] and [exception]. */
    override fun apply(value: T?, exception: Throwable?) {
        cancel()
    }
}
65
/**
 * Wraps this deferred value into a [CompletableFuture].
 * Cancelling the returned future, or completing it in any other way, cancels the deferred value.
 */
public fun <T> Deferred<T>.asCompletableFuture(): CompletableFuture<T> =
    CompletableFuture<T>().also { bridge ->
        setupCancellation(bridge)
        invokeOnCompletion {
            // getCompleted() either yields the value or throws; whatever it throws becomes the future's failure.
            try {
                bridge.complete(getCompleted())
            } catch (failure: Throwable) {
                bridge.completeExceptionally(failure)
            }
        }
    }
82
/**
 * Wraps this job into a [CompletableFuture] of [Unit].
 * Cancelling the returned future, or completing it in any other way, cancels the job.
 */
public fun Job.asCompletableFuture(): CompletableFuture<Unit> =
    CompletableFuture<Unit>().also { bridge ->
        setupCancellation(bridge)
        invokeOnCompletion { failure ->
            // A non-null cause becomes the future's failure; otherwise the future completes with Unit.
            if (failure !== null) {
                bridge.completeExceptionally(failure)
            } else {
                bridge.complete(Unit)
            }
        }
    }
96
/**
 * Registers a handler on [future] that cancels this job once the future is completed.
 *
 * The cancellation cause is `null` for a normal completion, the future's exception itself when it already
 * is a [CancellationException], and otherwise a new [CancellationException] wrapping that exception.
 */
private fun Job.setupCancellation(future: CompletableFuture<*>) {
    future.handle { _, exception ->
        val reason: CancellationException? = when (exception) {
            null -> null
            is CancellationException -> exception
            else -> CancellationException("CompletableFuture was completed exceptionally", exception)
        }
        cancel(reason)
    }
}
104
/**
 * Wraps this [CompletionStage] into a [Deferred].
 *
 * Cancelling the returned deferred cancels the [CompletableFuture] that corresponds to this
 * [CompletionStage] (see [CompletionStage.toCompletableFuture]).
 */
@Suppress("DeferredIsResult")
public fun <T> CompletionStage<T>.asDeferred(): Deferred<T> {
    val source = toCompletableFuture() // the future backing this stage
    if (source.isDone) {
        // Fast path: the outcome is already known, so hand back an already-completed deferred.
        return try {
            @Suppress("UNCHECKED_CAST")
            CompletableDeferred(source.get() as T)
        } catch (failure: Throwable) {
            // get() wraps the failure into ExecutionException; report the original cause when there is one.
            val root = if (failure is ExecutionException) (failure.cause ?: failure) else failure
            CompletableDeferred<T>().apply { completeExceptionally(root) }
        }
    }
    val deferred = CompletableDeferred<T>()
    handle { value, exception ->
        try {
            if (exception != null) {
                // Exceptional completion: unwrap CompletionException so the outcome matches the fast path,
                // where CompletableFuture.get() has already done the unwrapping.
                deferred.completeExceptionally((exception as? CompletionException)?.cause ?: exception)
            } else {
                // Normal completion.
                deferred.complete(value)
            }
        } catch (internal: Throwable) {
            // Reached iff the internals of Deferred threw an exception while it was being completed.
            handleCoroutineException(EmptyCoroutineContext, internal)
        }
    }
    deferred.cancelFutureOnCompletion(source)
    return deferred
}
144
/**
 * Suspends until this [CompletionStage] completes, without blocking a thread, and returns its result.
 * A failure of the stage is rethrown with the `ExecutionException`/`CompletionException` wrapper removed
 * whenever the wrapper carries a cause.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled while this function is waiting, it stops waiting for the
 * completion stage and immediately resumes with [CancellationException][kotlinx.coroutines.CancellationException].
 *
 * The function is intended for one-shot futures: on coroutine cancellation the [CompletableFuture] that
 * corresponds to this [CompletionStage] (see [CompletionStage.toCompletableFuture]) is cancelled as well.
 * If cancelling the given stage is undesired, use `stage.asDeferred().await()` instead.
 */
public suspend fun <T> CompletionStage<T>.await(): T {
    val target = toCompletableFuture() // the future backing this stage
    // Fast path: an already completed future yields its outcome without suspending.
    if (target.isDone) {
        try {
            @Suppress("UNCHECKED_CAST", "BlockingMethodInNonBlockingContext")
            return target.get() as T
        } catch (wrapped: ExecutionException) {
            // Surface the original cause rather than the ExecutionException wrapper.
            throw wrapped.cause ?: wrapped
        }
    }
    // Slow path: suspend and let a completion handler resume the continuation.
    return suspendCancellableCoroutine { continuation: CancellableContinuation<T> ->
        val handler = ContinuationHandler(continuation)
        handle(handler)
        continuation.invokeOnCancellation {
            target.cancel(false)
            handler.cont = null // drop the reference to the continuation to aid GC
        }
    }
}
177
/**
 * [BiFunction] completion handler that resumes [cont] with the outcome it is invoked with.
 *
 * [cont] is a mutable volatile field: once it has been set to `null`, [apply] does nothing.
 */
private class ContinuationHandler<T>(
    @Volatile @JvmField var cont: Continuation<T>?
) : BiFunction<T?, Throwable?, Unit> {
    @Suppress("UNCHECKED_CAST")
    override fun apply(result: T?, exception: Throwable?) {
        // Read the volatile field exactly once; null means there is nothing to resume.
        val target = cont ?: return
        if (exception != null) {
            // Exceptional completion: unwrap CompletionException so that only the original exception
            // is propagated and .await() presents a consistent view of the failure.
            val original = (exception as? CompletionException)?.cause ?: exception
            target.resumeWithException(original)
            return
        }
        // Normal completion.
        target.resume(result as T)
    }
}
193