1 //! Types related to the [`TaskTracker`] collection.
2 //!
3 //! See the documentation of [`TaskTracker`] for more information.
4 
5 use pin_project_lite::pin_project;
6 use std::fmt;
7 use std::future::Future;
8 use std::pin::Pin;
9 use std::sync::atomic::{AtomicUsize, Ordering};
10 use std::sync::Arc;
11 use std::task::{Context, Poll};
12 use tokio::sync::{futures::Notified, Notify};
13 
14 #[cfg(feature = "rt")]
15 use tokio::{
16     runtime::Handle,
17     task::{JoinHandle, LocalSet},
18 };
19 
20 /// A task tracker used for waiting until tasks exit.
21 ///
22 /// This is usually used together with [`CancellationToken`] to implement [graceful shutdown]. The
23 /// `CancellationToken` is used to signal to tasks that they should shut down, and the
24 /// `TaskTracker` is used to wait for them to finish shutting down.
25 ///
26 /// The `TaskTracker` will also keep track of a `closed` boolean. This is used to handle the case
27 /// where the `TaskTracker` is empty, but we don't want to shut down yet. This means that the
28 /// [`wait`] method will wait until *both* of the following happen at the same time:
29 ///
30 ///  * The `TaskTracker` must be closed using the [`close`] method.
31 ///  * The `TaskTracker` must be empty, that is, all tasks that it is tracking must have exited.
32 ///
33 /// When a call to [`wait`] returns, it is guaranteed that all tracked tasks have exited and that
34 /// the destructor of the future has finished running. However, there might be a short amount of
35 /// time where [`JoinHandle::is_finished`] returns false.
36 ///
37 /// # Comparison to `JoinSet`
38 ///
39 /// The main Tokio crate has a similar collection known as [`JoinSet`]. The `JoinSet` type has a
40 /// lot more features than `TaskTracker`, so `TaskTracker` should only be used when one of its
41 /// unique features is required:
42 ///
43 ///  1. When tasks exit, a `TaskTracker` will allow the task to immediately free its memory.
44 ///  2. By not closing the `TaskTracker`, [`wait`] will be prevented from returning even if
45 ///     the `TaskTracker` is empty.
46 ///  3. A `TaskTracker` does not require mutable access to insert tasks.
47 ///  4. A `TaskTracker` can be cloned to share it with many tasks.
48 ///
49 /// The first point is the most important one. A [`JoinSet`] keeps track of the return value of
50 /// every inserted task. This means that if the caller keeps inserting tasks and never calls
51 /// [`join_next`], then their return values will keep building up and consuming memory, _even if_
52 /// most of the tasks have already exited. This can cause the process to run out of memory. With a
53 /// `TaskTracker`, this does not happen. Once tasks exit, they are immediately removed from the
54 /// `TaskTracker`.
55 ///
56 /// # Examples
57 ///
58 /// For more examples, please see the topic page on [graceful shutdown].
59 ///
60 /// ## Spawn tasks and wait for them to exit
61 ///
62 /// This is a simple example. For this case, [`JoinSet`] should probably be used instead.
63 ///
64 /// ```
65 /// use tokio_util::task::TaskTracker;
66 ///
67 /// #[tokio::main]
68 /// async fn main() {
69 ///     let tracker = TaskTracker::new();
70 ///
71 ///     for i in 0..10 {
72 ///         tracker.spawn(async move {
73 ///             println!("Task {} is running!", i);
74 ///         });
75 ///     }
76 ///     // Once we spawned everything, we close the tracker.
77 ///     tracker.close();
78 ///
79 ///     // Wait for everything to finish.
80 ///     tracker.wait().await;
81 ///
82 ///     println!("This is printed after all of the tasks.");
83 /// }
84 /// ```
85 ///
86 /// ## Wait for tasks to exit
87 ///
88 /// This example shows the intended use-case of `TaskTracker`. It is used together with
89 /// [`CancellationToken`] to implement graceful shutdown.
90 /// ```
91 /// use tokio_util::sync::CancellationToken;
92 /// use tokio_util::task::TaskTracker;
93 /// use tokio::time::{self, Duration};
94 ///
95 /// async fn background_task(num: u64) {
96 ///     for i in 0..10 {
97 ///         time::sleep(Duration::from_millis(100*num)).await;
98 ///         println!("Background task {} in iteration {}.", num, i);
99 ///     }
100 /// }
101 ///
102 /// #[tokio::main]
103 /// # async fn _hidden() {}
104 /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
105 /// async fn main() {
106 ///     let tracker = TaskTracker::new();
107 ///     let token = CancellationToken::new();
108 ///
109 ///     for i in 0..10 {
110 ///         let token = token.clone();
111 ///         tracker.spawn(async move {
112 ///             // Use a `tokio::select!` to kill the background task if the token is
113 ///             // cancelled.
114 ///             tokio::select! {
115 ///                 () = background_task(i) => {
116 ///                     println!("Task {} exiting normally.", i);
117 ///                 },
118 ///                 () = token.cancelled() => {
119 ///                     // Do some cleanup before we really exit.
120 ///                     time::sleep(Duration::from_millis(50)).await;
121 ///                     println!("Task {} finished cleanup.", i);
122 ///                 },
123 ///             }
124 ///         });
125 ///     }
126 ///
127 ///     // Spawn a background task that will send the shutdown signal.
128 ///     {
129 ///         let tracker = tracker.clone();
130 ///         tokio::spawn(async move {
131 ///             // Normally you would use something like ctrl-c instead of
132 ///             // sleeping.
133 ///             time::sleep(Duration::from_secs(2)).await;
134 ///             tracker.close();
135 ///             token.cancel();
136 ///         });
137 ///     }
138 ///
139 ///     // Wait for all tasks to exit.
140 ///     tracker.wait().await;
141 ///
142 ///     println!("All tasks have exited now.");
143 /// }
144 /// ```
145 ///
146 /// [`CancellationToken`]: crate::sync::CancellationToken
147 /// [`JoinHandle::is_finished`]: tokio::task::JoinHandle::is_finished
148 /// [`JoinSet`]: tokio::task::JoinSet
149 /// [`close`]: Self::close
150 /// [`join_next`]: tokio::task::JoinSet::join_next
151 /// [`wait`]: Self::wait
152 /// [graceful shutdown]: https://tokio.rs/tokio/topics/shutdown
pub struct TaskTracker {
    /// State shared by every clone of this tracker and by every token created from it.
    inner: Arc<TaskTrackerInner>,
}
156 
/// Represents a task tracked by a [`TaskTracker`].
///
/// A token is created by [`TaskTracker::token`], which increments the tracker's task count.
/// Dropping the token decrements the count again.
#[must_use]
#[derive(Debug)]
pub struct TaskTrackerToken {
    /// The tracker whose task count this token contributes to.
    task_tracker: TaskTracker,
}
163 
/// The state shared between all clones of a `TaskTracker`.
struct TaskTrackerInner {
    /// Keeps track of the state.
    ///
    /// The lowest bit is whether the task tracker is closed.
    ///
    /// The rest of the bits count the number of tracked tasks. Because the count lives above the
    /// closed bit, adding or removing one task changes this value by 2.
    state: AtomicUsize,
    /// Used to wake `wait` futures when the tracker becomes both closed and empty, i.e. when the
    /// last task exits while closed, or when the tracker is closed while empty.
    on_last_exit: Notify,
}
174 
pin_project! {
    /// A future that is tracked as a task by a [`TaskTracker`].
    ///
    /// The associated [`TaskTracker`] cannot complete until this future is dropped.
    ///
    /// This future is returned by [`TaskTracker::track_future`].
    #[must_use = "futures do nothing unless polled"]
    pub struct TrackedFuture<F> {
        // The wrapped future; polling a `TrackedFuture` forwards to it.
        #[pin]
        future: F,
        // Keeps the task counted in the tracker for as long as this wrapper is alive.
        token: TaskTrackerToken,
    }
}
188 
pin_project! {
    /// A future that completes when the [`TaskTracker`] is empty and closed.
    ///
    /// This future is returned by [`TaskTracker::wait`].
    #[must_use = "futures do nothing unless polled"]
    pub struct TaskTrackerWaitFuture<'a> {
        // Notification future obtained from the tracker's `on_last_exit` when `wait` was called.
        #[pin]
        future: Notified<'a>,
        // `None` once completion has been observed: either the tracker was already closed and
        // empty when `wait` was called, or an earlier poll returned `Ready`. While this is
        // `None`, `poll` returns `Ready` without touching `future`.
        inner: Option<&'a TaskTrackerInner>,
    }
}
200 
201 impl TaskTrackerInner {
202     #[inline]
new() -> Self203     fn new() -> Self {
204         Self {
205             state: AtomicUsize::new(0),
206             on_last_exit: Notify::new(),
207         }
208     }
209 
210     #[inline]
is_closed_and_empty(&self) -> bool211     fn is_closed_and_empty(&self) -> bool {
212         // If empty and closed bit set, then we are done.
213         //
214         // The acquire load will synchronize with the release store of any previous call to
215         // `set_closed` and `drop_task`.
216         self.state.load(Ordering::Acquire) == 1
217     }
218 
219     #[inline]
set_closed(&self) -> bool220     fn set_closed(&self) -> bool {
221         // The AcqRel ordering makes the closed bit behave like a `Mutex<bool>` for synchronization
222         // purposes. We do this because it makes the return value of `TaskTracker::{close,reopen}`
223         // more meaningful for the user. Without these orderings, this assert could fail:
224         // ```
225         // // thread 1
226         // some_other_atomic.store(true, Relaxed);
227         // tracker.close();
228         //
229         // // thread 2
230         // if tracker.reopen() {
231         //     assert!(some_other_atomic.load(Relaxed));
232         // }
233         // ```
234         // However, with the AcqRel ordering, we establish a happens-before relationship from the
235         // call to `close` and the later call to `reopen` that returned true.
236         let state = self.state.fetch_or(1, Ordering::AcqRel);
237 
238         // If there are no tasks, and if it was not already closed:
239         if state == 0 {
240             self.notify_now();
241         }
242 
243         (state & 1) == 0
244     }
245 
246     #[inline]
set_open(&self) -> bool247     fn set_open(&self) -> bool {
248         // See `set_closed` regarding the AcqRel ordering.
249         let state = self.state.fetch_and(!1, Ordering::AcqRel);
250         (state & 1) == 1
251     }
252 
253     #[inline]
add_task(&self)254     fn add_task(&self) {
255         self.state.fetch_add(2, Ordering::Relaxed);
256     }
257 
258     #[inline]
drop_task(&self)259     fn drop_task(&self) {
260         let state = self.state.fetch_sub(2, Ordering::Release);
261 
262         // If this was the last task and we are closed:
263         if state == 3 {
264             self.notify_now();
265         }
266     }
267 
268     #[cold]
notify_now(&self)269     fn notify_now(&self) {
270         // Insert an acquire fence. This matters for `drop_task` but doesn't matter for
271         // `set_closed` since it already uses AcqRel.
272         //
273         // This synchronizes with the release store of any other call to `drop_task`, and with the
274         // release store in the call to `set_closed`. That ensures that everything that happened
275         // before those other calls to `drop_task` or `set_closed` will be visible after this load,
276         // and those things will also be visible to anything woken by the call to `notify_waiters`.
277         self.state.load(Ordering::Acquire);
278 
279         self.on_last_exit.notify_waiters();
280     }
281 }
282 
impl TaskTracker {
    /// Creates a new `TaskTracker`.
    ///
    /// The `TaskTracker` will start out as open.
    #[must_use]
    pub fn new() -> Self {
        Self {
            inner: Arc::new(TaskTrackerInner::new()),
        }
    }

    /// Waits until this `TaskTracker` is both closed and empty.
    ///
    /// If the `TaskTracker` is already closed and empty when this method is called, then it
    /// returns immediately.
    ///
    /// The `wait` future is resistant against [ABA problems][aba]. That is, if the `TaskTracker`
    /// becomes both closed and empty for a short amount of time, then it is guaranteed that all
    /// `wait` futures that were created before the short time interval will trigger, even if they
    /// are not polled during that short time interval.
    ///
    /// # Cancel safety
    ///
    /// This method is cancel safe.
    ///
    /// However, the resistance against [ABA problems][aba] is lost when using `wait` as the
    /// condition in a `tokio::select!` loop.
    ///
    /// [aba]: https://en.wikipedia.org/wiki/ABA_problem
    #[inline]
    pub fn wait(&self) -> TaskTrackerWaitFuture<'_> {
        TaskTrackerWaitFuture {
            // The `Notified` future is created before the state is checked below.
            // NOTE(review): presumably so that a `notify_waiters` call racing with the check is
            // not missed -- confirm against the `Notify` documentation.
            future: self.inner.on_last_exit.notified(),
            // `None` marks the future as already complete; see `TaskTrackerWaitFuture::poll`.
            inner: if self.inner.is_closed_and_empty() {
                None
            } else {
                Some(&self.inner)
            },
        }
    }

    /// Close this `TaskTracker`.
    ///
    /// This allows [`wait`] futures to complete. It does not prevent you from spawning new tasks.
    ///
    /// Returns `true` if this closed the `TaskTracker`, or `false` if it was already closed.
    ///
    /// [`wait`]: Self::wait
    #[inline]
    pub fn close(&self) -> bool {
        self.inner.set_closed()
    }

    /// Reopen this `TaskTracker`.
    ///
    /// This prevents [`wait`] futures from completing even if the `TaskTracker` is empty.
    ///
    /// Returns `true` if this reopened the `TaskTracker`, or `false` if it was already open.
    ///
    /// [`wait`]: Self::wait
    #[inline]
    pub fn reopen(&self) -> bool {
        self.inner.set_open()
    }

    /// Returns `true` if this `TaskTracker` is [closed](Self::close).
    #[inline]
    #[must_use]
    pub fn is_closed(&self) -> bool {
        // Bit 0 of the state is the closed flag.
        (self.inner.state.load(Ordering::Acquire) & 1) != 0
    }

    /// Returns the number of tasks tracked by this `TaskTracker`.
    #[inline]
    #[must_use]
    pub fn len(&self) -> usize {
        // Shift out the closed flag; the remaining bits are the task count.
        self.inner.state.load(Ordering::Acquire) >> 1
    }

    /// Returns `true` if there are no tasks in this `TaskTracker`.
    #[inline]
    #[must_use]
    pub fn is_empty(&self) -> bool {
        // 0 is "open, no tasks" and 1 is "closed, no tasks"; any task makes the state >= 2.
        self.inner.state.load(Ordering::Acquire) <= 1
    }

    /// Spawn the provided future on the current Tokio runtime, and track it in this `TaskTracker`.
    ///
    /// This is equivalent to `tokio::spawn(tracker.track_future(task))`.
    #[inline]
    #[track_caller]
    #[cfg(feature = "rt")]
    #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
    pub fn spawn<F>(&self, task: F) -> JoinHandle<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        tokio::task::spawn(self.track_future(task))
    }

    /// Spawn the provided future on the provided Tokio runtime, and track it in this `TaskTracker`.
    ///
    /// This is equivalent to `handle.spawn(tracker.track_future(task))`.
    #[inline]
    #[track_caller]
    #[cfg(feature = "rt")]
    #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
    pub fn spawn_on<F>(&self, task: F, handle: &Handle) -> JoinHandle<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        handle.spawn(self.track_future(task))
    }

    /// Spawn the provided future on the current [`LocalSet`], and track it in this `TaskTracker`.
    ///
    /// This is equivalent to `tokio::task::spawn_local(tracker.track_future(task))`.
    ///
    /// [`LocalSet`]: tokio::task::LocalSet
    #[inline]
    #[track_caller]
    #[cfg(feature = "rt")]
    #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
    pub fn spawn_local<F>(&self, task: F) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
        F::Output: 'static,
    {
        tokio::task::spawn_local(self.track_future(task))
    }

    /// Spawn the provided future on the provided [`LocalSet`], and track it in this `TaskTracker`.
    ///
    /// This is equivalent to `local_set.spawn_local(tracker.track_future(task))`.
    ///
    /// [`LocalSet`]: tokio::task::LocalSet
    #[inline]
    #[track_caller]
    #[cfg(feature = "rt")]
    #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
    pub fn spawn_local_on<F>(&self, task: F, local_set: &LocalSet) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
        F::Output: 'static,
    {
        local_set.spawn_local(self.track_future(task))
    }

    /// Spawn the provided blocking task on the current Tokio runtime, and track it in this `TaskTracker`.
    ///
    /// This is similar to `tokio::task::spawn_blocking(task)`, except that a
    /// [`TaskTrackerToken`] is created before spawning and dropped after the closure returns.
    #[inline]
    #[track_caller]
    #[cfg(feature = "rt")]
    #[cfg(not(target_family = "wasm"))]
    #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
    pub fn spawn_blocking<F, T>(&self, task: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,
    {
        // The token is created here, before spawning, so the task is counted from this point on.
        let token = self.token();
        tokio::task::spawn_blocking(move || {
            let res = task();
            // Release the token only after the closure has produced its result.
            drop(token);
            res
        })
    }

    /// Spawn the provided blocking task on the provided Tokio runtime, and track it in this `TaskTracker`.
    ///
    /// This is similar to `handle.spawn_blocking(task)`, except that a [`TaskTrackerToken`] is
    /// created before spawning and dropped after the closure returns.
    #[inline]
    #[track_caller]
    #[cfg(feature = "rt")]
    #[cfg(not(target_family = "wasm"))]
    #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
    pub fn spawn_blocking_on<F, T>(&self, task: F, handle: &Handle) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,
    {
        // The token is created here, before spawning, so the task is counted from this point on.
        let token = self.token();
        handle.spawn_blocking(move || {
            let res = task();
            // Release the token only after the closure has produced its result.
            drop(token);
            res
        })
    }

    /// Track the provided future.
    ///
    /// The returned [`TrackedFuture`] will count as a task tracked by this collection, and will
    /// prevent calls to [`wait`] from returning until the task is dropped.
    ///
    /// The task is removed from the collection when it is dropped, not when [`poll`] returns
    /// [`Poll::Ready`].
    ///
    /// # Examples
    ///
    /// Track a future spawned with [`tokio::spawn`].
    ///
    /// ```
    /// # async fn my_async_fn() {}
    /// use tokio_util::task::TaskTracker;
    ///
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() {
    /// let tracker = TaskTracker::new();
    ///
    /// tokio::spawn(tracker.track_future(my_async_fn()));
    /// # }
    /// ```
    ///
    /// Track a future spawned on a [`JoinSet`].
    ///
    /// ```
    /// # async fn my_async_fn() {}
    /// use tokio::task::JoinSet;
    /// use tokio_util::task::TaskTracker;
    ///
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() {
    /// let tracker = TaskTracker::new();
    /// let mut join_set = JoinSet::new();
    ///
    /// join_set.spawn(tracker.track_future(my_async_fn()));
    /// # }
    /// ```
    ///
    /// [`JoinSet`]: tokio::task::JoinSet
    /// [`Poll::Ready`]: std::task::Poll::Ready
    /// [`poll`]: std::future::Future::poll
    /// [`wait`]: Self::wait
    #[inline]
    pub fn track_future<F: Future>(&self, future: F) -> TrackedFuture<F> {
        TrackedFuture {
            future,
            token: self.token(),
        }
    }

    /// Creates a [`TaskTrackerToken`] representing a task tracked by this `TaskTracker`.
    ///
    /// This token is a lower-level utility than the spawn methods. Each token is considered to
    /// correspond to a task. As long as the token exists, the `TaskTracker` cannot complete.
    /// Furthermore, the count returned by the [`len`] method will include the tokens in the count.
    ///
    /// Dropping the token indicates to the `TaskTracker` that the task has exited.
    ///
    /// [`len`]: TaskTracker::len
    #[inline]
    pub fn token(&self) -> TaskTrackerToken {
        // Count the task first; the matching decrement happens when the token is dropped.
        self.inner.add_task();
        TaskTrackerToken {
            task_tracker: self.clone(),
        }
    }

    /// Returns `true` if both task trackers correspond to the same set of tasks.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_util::task::TaskTracker;
    ///
    /// let tracker_1 = TaskTracker::new();
    /// let tracker_2 = TaskTracker::new();
    /// let tracker_1_clone = tracker_1.clone();
    ///
    /// assert!(TaskTracker::ptr_eq(&tracker_1, &tracker_1_clone));
    /// assert!(!TaskTracker::ptr_eq(&tracker_1, &tracker_2));
    /// ```
    #[inline]
    #[must_use]
    pub fn ptr_eq(left: &TaskTracker, right: &TaskTracker) -> bool {
        Arc::ptr_eq(&left.inner, &right.inner)
    }
}
565 
566 impl Default for TaskTracker {
567     /// Creates a new `TaskTracker`.
568     ///
569     /// The `TaskTracker` will start out as open.
570     #[inline]
default() -> TaskTracker571     fn default() -> TaskTracker {
572         TaskTracker::new()
573     }
574 }
575 
576 impl Clone for TaskTracker {
577     /// Returns a new `TaskTracker` that tracks the same set of tasks.
578     ///
579     /// Since the new `TaskTracker` shares the same set of tasks, changes to one set are visible in
580     /// all other clones.
581     ///
582     /// # Examples
583     ///
584     /// ```
585     /// use tokio_util::task::TaskTracker;
586     ///
587     /// #[tokio::main]
588     /// # async fn _hidden() {}
589     /// # #[tokio::main(flavor = "current_thread")]
590     /// async fn main() {
591     ///     let tracker = TaskTracker::new();
592     ///     let cloned = tracker.clone();
593     ///
594     ///     // Spawns on `tracker` are visible in `cloned`.
595     ///     tracker.spawn(std::future::pending::<()>());
596     ///     assert_eq!(cloned.len(), 1);
597     ///
598     ///     // Spawns on `cloned` are visible in `tracker`.
599     ///     cloned.spawn(std::future::pending::<()>());
600     ///     assert_eq!(tracker.len(), 2);
601     ///
602     ///     // Calling `close` is visible to `cloned`.
603     ///     tracker.close();
604     ///     assert!(cloned.is_closed());
605     ///
606     ///     // Calling `reopen` is visible to `tracker`.
607     ///     cloned.reopen();
608     ///     assert!(!tracker.is_closed());
609     /// }
610     /// ```
611     #[inline]
clone(&self) -> TaskTracker612     fn clone(&self) -> TaskTracker {
613         Self {
614             inner: self.inner.clone(),
615         }
616     }
617 }
618 
debug_inner(inner: &TaskTrackerInner, f: &mut fmt::Formatter<'_>) -> fmt::Result619 fn debug_inner(inner: &TaskTrackerInner, f: &mut fmt::Formatter<'_>) -> fmt::Result {
620     let state = inner.state.load(Ordering::Acquire);
621     let is_closed = (state & 1) != 0;
622     let len = state >> 1;
623 
624     f.debug_struct("TaskTracker")
625         .field("len", &len)
626         .field("is_closed", &is_closed)
627         .field("inner", &(inner as *const TaskTrackerInner))
628         .finish()
629 }
630 
631 impl fmt::Debug for TaskTracker {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result632     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
633         debug_inner(&self.inner, f)
634     }
635 }
636 
impl TaskTrackerToken {
    /// Returns the [`TaskTracker`] that this token is associated with.
    ///
    /// This only borrows the tracker stored in the token; no new task is registered.
    #[inline]
    #[must_use]
    pub fn task_tracker(&self) -> &TaskTracker {
        &self.task_tracker
    }
}
645 
646 impl Clone for TaskTrackerToken {
647     /// Returns a new `TaskTrackerToken` associated with the same [`TaskTracker`].
648     ///
649     /// This is equivalent to `token.task_tracker().token()`.
650     #[inline]
clone(&self) -> TaskTrackerToken651     fn clone(&self) -> TaskTrackerToken {
652         self.task_tracker.token()
653     }
654 }
655 
impl Drop for TaskTrackerToken {
    /// Dropping the token indicates to the [`TaskTracker`] that the task has exited.
    #[inline]
    fn drop(&mut self) {
        // Undo the `add_task` performed when the token was created. If this was the last task of
        // a closed tracker, `drop_task` also wakes the pending `wait` futures.
        self.task_tracker.inner.drop_task();
    }
}
663 
664 impl<F: Future> Future for TrackedFuture<F> {
665     type Output = F::Output;
666 
667     #[inline]
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output>668     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
669         self.project().future.poll(cx)
670     }
671 }
672 
673 impl<F: fmt::Debug> fmt::Debug for TrackedFuture<F> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result674     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
675         f.debug_struct("TrackedFuture")
676             .field("future", &self.future)
677             .field("task_tracker", self.token.task_tracker())
678             .finish()
679     }
680 }
681 
682 impl<'a> Future for TaskTrackerWaitFuture<'a> {
683     type Output = ();
684 
685     #[inline]
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>686     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
687         let me = self.project();
688 
689         let inner = match me.inner.as_ref() {
690             None => return Poll::Ready(()),
691             Some(inner) => inner,
692         };
693 
694         let ready = inner.is_closed_and_empty() || me.future.poll(cx).is_ready();
695         if ready {
696             *me.inner = None;
697             Poll::Ready(())
698         } else {
699             Poll::Pending
700         }
701     }
702 }
703 
704 impl<'a> fmt::Debug for TaskTrackerWaitFuture<'a> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result705     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
706         struct Helper<'a>(&'a TaskTrackerInner);
707 
708         impl fmt::Debug for Helper<'_> {
709             fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
710                 debug_inner(self.0, f)
711             }
712         }
713 
714         f.debug_struct("TaskTrackerWaitFuture")
715             .field("future", &self.future)
716             .field("task_tracker", &self.inner.map(Helper))
717             .finish()
718     }
719 }
720