//! Tokio context-aware future utilities.
//!
//! This module includes utilities for integrating Tokio with other runtimes
//! by allowing the context to be attached to futures. This allows spawning
//! futures on other executors while still using Tokio to drive them. This
//! can be useful if you need to use a Tokio-based library in an executor/runtime
//! that does not provide a Tokio context.
8  
9  use pin_project_lite::pin_project;
10  use std::{
11      future::Future,
12      pin::Pin,
13      task::{Context, Poll},
14  };
15  use tokio::runtime::{Handle, Runtime};
16  
pin_project! {
    /// `TokioContext` allows running futures that must be inside Tokio's
    /// context on a non-Tokio runtime.
    ///
    /// It contains a [`Handle`] to the runtime. A handle to the runtime can be
    /// obtained by calling the [`Runtime::handle()`] method.
    ///
    /// Note that the `TokioContext` wrapper only works if the `Runtime` it is
    /// connected to has not yet been destroyed. You must keep the `Runtime`
    /// alive until the future has finished executing.
    ///
    /// **Warning:** If `TokioContext` is used together with a [current thread]
    /// runtime, that runtime must be inside a call to `block_on` for the
    /// wrapped future to work. For this reason, it is recommended to use a
    /// [multi thread] runtime, even if you configure it to only spawn one
    /// worker thread.
    ///
    /// # Examples
    ///
    /// This example creates two runtimes, but only [enables time] on one of
    /// them. It then uses [`RuntimeExt`] to attach the context of the runtime
    /// with the timer enabled to a [`sleep`] future, and executes that future
    /// on the runtime with timing disabled.
    ///
    /// ```
    /// use tokio::time::{sleep, Duration};
    /// use tokio_util::context::RuntimeExt;
    ///
    /// // This runtime has timers enabled.
    /// let rt = tokio::runtime::Builder::new_multi_thread()
    ///     .enable_all()
    ///     .build()
    ///     .unwrap();
    ///
    /// // This runtime has timers disabled.
    /// let rt2 = tokio::runtime::Builder::new_multi_thread()
    ///     .build()
    ///     .unwrap();
    ///
    /// // Wrap the sleep future in the context of rt.
    /// let fut = rt.wrap(async { sleep(Duration::from_millis(2)).await });
    ///
    /// // Execute the future on rt2.
    /// rt2.block_on(fut);
    /// ```
    ///
    /// [`Handle`]: struct@tokio::runtime::Handle
    /// [`Runtime::handle()`]: fn@tokio::runtime::Runtime::handle
    /// [`RuntimeExt`]: trait@crate::context::RuntimeExt
    /// [`sleep`]: fn@tokio::time::sleep
    /// [current thread]: fn@tokio::runtime::Builder::new_current_thread
    /// [enables time]: fn@tokio::runtime::Builder::enable_time
    /// [multi thread]: fn@tokio::runtime::Builder::new_multi_thread
    pub struct TokioContext<F> {
        // The wrapped future. Marked `#[pin]` so `poll` can obtain a pinned
        // reference to it through the generated projection.
        #[pin]
        inner: F,
        // Handle of the runtime whose context is entered around every poll of
        // `inner`.
        handle: Handle,
    }
}
75  
76  impl<F> TokioContext<F> {
77      /// Associate the provided future with the context of the runtime behind
78      /// the provided `Handle`.
79      ///
80      /// This constructor uses a `'static` lifetime to opt-out of checking that
81      /// the runtime still exists.
82      ///
83      /// # Examples
84      ///
85      /// This is the same as the example above, but uses the `new` constructor
86      /// rather than [`RuntimeExt::wrap`].
87      ///
88      /// [`RuntimeExt::wrap`]: fn@RuntimeExt::wrap
89      ///
90      /// ```
91      /// use tokio::time::{sleep, Duration};
92      /// use tokio_util::context::TokioContext;
93      ///
94      /// // This runtime has timers enabled.
95      /// let rt = tokio::runtime::Builder::new_multi_thread()
96      ///     .enable_all()
97      ///     .build()
98      ///     .unwrap();
99      ///
100      /// // This runtime has timers disabled.
101      /// let rt2 = tokio::runtime::Builder::new_multi_thread()
102      ///     .build()
103      ///     .unwrap();
104      ///
105      /// let fut = TokioContext::new(
106      ///     async { sleep(Duration::from_millis(2)).await },
107      ///     rt.handle().clone(),
108      /// );
109      ///
110      /// // Execute the future on rt2.
111      /// rt2.block_on(fut);
112      /// ```
new(future: F, handle: Handle) -> TokioContext<F>113      pub fn new(future: F, handle: Handle) -> TokioContext<F> {
114          TokioContext {
115              inner: future,
116              handle,
117          }
118      }
119  
120      /// Obtain a reference to the handle inside this `TokioContext`.
handle(&self) -> &Handle121      pub fn handle(&self) -> &Handle {
122          &self.handle
123      }
124  
125      /// Remove the association between the Tokio runtime and the wrapped future.
into_inner(self) -> F126      pub fn into_inner(self) -> F {
127          self.inner
128      }
129  }
130  
131  impl<F: Future> Future for TokioContext<F> {
132      type Output = F::Output;
133  
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>134      fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
135          let me = self.project();
136          let handle = me.handle;
137          let fut = me.inner;
138  
139          let _enter = handle.enter();
140          fut.poll(cx)
141      }
142  }
143  
/// Extension trait that simplifies bundling a `Handle` with a `Future`.
///
/// This module implements it for [`Runtime`], so a future can be wrapped
/// directly with `rt.wrap(fut)`.
pub trait RuntimeExt {
    /// Create a [`TokioContext`] that wraps the provided future and runs it in
    /// this runtime's context.
    ///
    /// `fut` is the future to wrap. The returned [`TokioContext`] is itself a
    /// future with the same output as `fut`.
    ///
    /// # Examples
    ///
    /// This example creates two runtimes, but only [enables time] on one of
    /// them. It then uses the context of the runtime with the timer enabled to
    /// execute a [`sleep`] future on the runtime with timing disabled.
    ///
    /// ```
    /// use tokio::time::{sleep, Duration};
    /// use tokio_util::context::RuntimeExt;
    ///
    /// // This runtime has timers enabled.
    /// let rt = tokio::runtime::Builder::new_multi_thread()
    ///     .enable_all()
    ///     .build()
    ///     .unwrap();
    ///
    /// // This runtime has timers disabled.
    /// let rt2 = tokio::runtime::Builder::new_multi_thread()
    ///     .build()
    ///     .unwrap();
    ///
    /// // Wrap the sleep future in the context of rt.
    /// let fut = rt.wrap(async { sleep(Duration::from_millis(2)).await });
    ///
    /// // Execute the future on rt2.
    /// rt2.block_on(fut);
    /// ```
    ///
    /// [`TokioContext`]: struct@crate::context::TokioContext
    /// [`sleep`]: fn@tokio::time::sleep
    /// [enables time]: fn@tokio::runtime::Builder::enable_time
    fn wrap<F: Future>(&self, fut: F) -> TokioContext<F>;
}
182  
183  impl RuntimeExt for Runtime {
wrap<F: Future>(&self, fut: F) -> TokioContext<F>184      fn wrap<F: Future>(&self, fut: F) -> TokioContext<F> {
185          TokioContext {
186              inner: fut,
187              handle: self.handle().clone(),
188          }
189      }
190  }
191