1 //! Types related to the [`TaskTracker`] collection.
2 //!
3 //! See the documentation of [`TaskTracker`] for more information.
4
5 use pin_project_lite::pin_project;
6 use std::fmt;
7 use std::future::Future;
8 use std::pin::Pin;
9 use std::sync::atomic::{AtomicUsize, Ordering};
10 use std::sync::Arc;
11 use std::task::{Context, Poll};
12 use tokio::sync::{futures::Notified, Notify};
13
14 #[cfg(feature = "rt")]
15 use tokio::{
16 runtime::Handle,
17 task::{JoinHandle, LocalSet},
18 };
19
20 /// A task tracker used for waiting until tasks exit.
21 ///
22 /// This is usually used together with [`CancellationToken`] to implement [graceful shutdown]. The
23 /// `CancellationToken` is used to signal to tasks that they should shut down, and the
24 /// `TaskTracker` is used to wait for them to finish shutting down.
25 ///
26 /// The `TaskTracker` will also keep track of a `closed` boolean. This is used to handle the case
27 /// where the `TaskTracker` is empty, but we don't want to shut down yet. This means that the
28 /// [`wait`] method will wait until *both* of the following happen at the same time:
29 ///
30 /// * The `TaskTracker` must be closed using the [`close`] method.
31 /// * The `TaskTracker` must be empty, that is, all tasks that it is tracking must have exited.
32 ///
33 /// When a call to [`wait`] returns, it is guaranteed that all tracked tasks have exited and that
34 /// the destructor of the future has finished running. However, there might be a short amount of
35 /// time where [`JoinHandle::is_finished`] returns false.
36 ///
37 /// # Comparison to `JoinSet`
38 ///
39 /// The main Tokio crate has a similar collection known as [`JoinSet`]. The `JoinSet` type has a
40 /// lot more features than `TaskTracker`, so `TaskTracker` should only be used when one of its
41 /// unique features is required:
42 ///
43 /// 1. When tasks exit, a `TaskTracker` will allow the task to immediately free its memory.
44 /// 2. By not closing the `TaskTracker`, [`wait`] will be prevented from returning even if
45 /// the `TaskTracker` is empty.
46 /// 3. A `TaskTracker` does not require mutable access to insert tasks.
47 /// 4. A `TaskTracker` can be cloned to share it with many tasks.
48 ///
49 /// The first point is the most important one. A [`JoinSet`] keeps track of the return value of
50 /// every inserted task. This means that if the caller keeps inserting tasks and never calls
51 /// [`join_next`], then their return values will keep building up and consuming memory, _even if_
52 /// most of the tasks have already exited. This can cause the process to run out of memory. With a
53 /// `TaskTracker`, this does not happen. Once tasks exit, they are immediately removed from the
54 /// `TaskTracker`.
55 ///
56 /// # Examples
57 ///
58 /// For more examples, please see the topic page on [graceful shutdown].
59 ///
60 /// ## Spawn tasks and wait for them to exit
61 ///
62 /// This is a simple example. For this case, [`JoinSet`] should probably be used instead.
63 ///
64 /// ```
65 /// use tokio_util::task::TaskTracker;
66 ///
67 /// #[tokio::main]
68 /// async fn main() {
69 /// let tracker = TaskTracker::new();
70 ///
71 /// for i in 0..10 {
72 /// tracker.spawn(async move {
73 /// println!("Task {} is running!", i);
74 /// });
75 /// }
76 /// // Once we spawned everything, we close the tracker.
77 /// tracker.close();
78 ///
79 /// // Wait for everything to finish.
80 /// tracker.wait().await;
81 ///
82 /// println!("This is printed after all of the tasks.");
83 /// }
84 /// ```
85 ///
86 /// ## Wait for tasks to exit
87 ///
88 /// This example shows the intended use-case of `TaskTracker`. It is used together with
89 /// [`CancellationToken`] to implement graceful shutdown.
90 /// ```
91 /// use tokio_util::sync::CancellationToken;
92 /// use tokio_util::task::TaskTracker;
93 /// use tokio::time::{self, Duration};
94 ///
95 /// async fn background_task(num: u64) {
96 /// for i in 0..10 {
97 /// time::sleep(Duration::from_millis(100*num)).await;
98 /// println!("Background task {} in iteration {}.", num, i);
99 /// }
100 /// }
101 ///
102 /// #[tokio::main]
103 /// # async fn _hidden() {}
104 /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
105 /// async fn main() {
106 /// let tracker = TaskTracker::new();
107 /// let token = CancellationToken::new();
108 ///
109 /// for i in 0..10 {
110 /// let token = token.clone();
111 /// tracker.spawn(async move {
112 /// // Use a `tokio::select!` to kill the background task if the token is
113 /// // cancelled.
114 /// tokio::select! {
115 /// () = background_task(i) => {
116 /// println!("Task {} exiting normally.", i);
117 /// },
118 /// () = token.cancelled() => {
119 /// // Do some cleanup before we really exit.
120 /// time::sleep(Duration::from_millis(50)).await;
121 /// println!("Task {} finished cleanup.", i);
122 /// },
123 /// }
124 /// });
125 /// }
126 ///
127 /// // Spawn a background task that will send the shutdown signal.
128 /// {
129 /// let tracker = tracker.clone();
130 /// tokio::spawn(async move {
131 /// // Normally you would use something like ctrl-c instead of
132 /// // sleeping.
133 /// time::sleep(Duration::from_secs(2)).await;
134 /// tracker.close();
135 /// token.cancel();
136 /// });
137 /// }
138 ///
139 /// // Wait for all tasks to exit.
140 /// tracker.wait().await;
141 ///
142 /// println!("All tasks have exited now.");
143 /// }
144 /// ```
145 ///
146 /// [`CancellationToken`]: crate::sync::CancellationToken
147 /// [`JoinHandle::is_finished`]: tokio::task::JoinHandle::is_finished
148 /// [`JoinSet`]: tokio::task::JoinSet
149 /// [`close`]: Self::close
150 /// [`join_next`]: tokio::task::JoinSet::join_next
151 /// [`wait`]: Self::wait
152 /// [graceful shutdown]: https://tokio.rs/tokio/topics/shutdown
153 pub struct TaskTracker {
154 inner: Arc<TaskTrackerInner>,
155 }
156
157 /// Represents a task tracked by a [`TaskTracker`].
158 #[must_use]
159 #[derive(Debug)]
160 pub struct TaskTrackerToken {
161 task_tracker: TaskTracker,
162 }
163
164 struct TaskTrackerInner {
165 /// Keeps track of the state.
166 ///
167 /// The lowest bit is whether the task tracker is closed.
168 ///
169 /// The rest of the bits count the number of tracked tasks.
170 state: AtomicUsize,
171 /// Used to notify when the last task exits.
172 on_last_exit: Notify,
173 }
174
175 pin_project! {
176 /// A future that is tracked as a task by a [`TaskTracker`].
177 ///
178 /// The associated [`TaskTracker`] cannot complete until this future is dropped.
179 ///
180 /// This future is returned by [`TaskTracker::track_future`].
181 #[must_use = "futures do nothing unless polled"]
182 pub struct TrackedFuture<F> {
183 #[pin]
184 future: F,
185 token: TaskTrackerToken,
186 }
187 }
188
189 pin_project! {
190 /// A future that completes when the [`TaskTracker`] is empty and closed.
191 ///
192 /// This future is returned by [`TaskTracker::wait`].
193 #[must_use = "futures do nothing unless polled"]
194 pub struct TaskTrackerWaitFuture<'a> {
195 #[pin]
196 future: Notified<'a>,
197 inner: Option<&'a TaskTrackerInner>,
198 }
199 }
200
201 impl TaskTrackerInner {
202 #[inline]
new() -> Self203 fn new() -> Self {
204 Self {
205 state: AtomicUsize::new(0),
206 on_last_exit: Notify::new(),
207 }
208 }
209
210 #[inline]
is_closed_and_empty(&self) -> bool211 fn is_closed_and_empty(&self) -> bool {
212 // If empty and closed bit set, then we are done.
213 //
214 // The acquire load will synchronize with the release store of any previous call to
215 // `set_closed` and `drop_task`.
216 self.state.load(Ordering::Acquire) == 1
217 }
218
219 #[inline]
set_closed(&self) -> bool220 fn set_closed(&self) -> bool {
221 // The AcqRel ordering makes the closed bit behave like a `Mutex<bool>` for synchronization
222 // purposes. We do this because it makes the return value of `TaskTracker::{close,reopen}`
223 // more meaningful for the user. Without these orderings, this assert could fail:
224 // ```
225 // // thread 1
226 // some_other_atomic.store(true, Relaxed);
227 // tracker.close();
228 //
229 // // thread 2
230 // if tracker.reopen() {
231 // assert!(some_other_atomic.load(Relaxed));
232 // }
233 // ```
234 // However, with the AcqRel ordering, we establish a happens-before relationship from the
235 // call to `close` and the later call to `reopen` that returned true.
236 let state = self.state.fetch_or(1, Ordering::AcqRel);
237
238 // If there are no tasks, and if it was not already closed:
239 if state == 0 {
240 self.notify_now();
241 }
242
243 (state & 1) == 0
244 }
245
246 #[inline]
set_open(&self) -> bool247 fn set_open(&self) -> bool {
248 // See `set_closed` regarding the AcqRel ordering.
249 let state = self.state.fetch_and(!1, Ordering::AcqRel);
250 (state & 1) == 1
251 }
252
253 #[inline]
add_task(&self)254 fn add_task(&self) {
255 self.state.fetch_add(2, Ordering::Relaxed);
256 }
257
258 #[inline]
drop_task(&self)259 fn drop_task(&self) {
260 let state = self.state.fetch_sub(2, Ordering::Release);
261
262 // If this was the last task and we are closed:
263 if state == 3 {
264 self.notify_now();
265 }
266 }
267
268 #[cold]
notify_now(&self)269 fn notify_now(&self) {
270 // Insert an acquire fence. This matters for `drop_task` but doesn't matter for
271 // `set_closed` since it already uses AcqRel.
272 //
273 // This synchronizes with the release store of any other call to `drop_task`, and with the
274 // release store in the call to `set_closed`. That ensures that everything that happened
275 // before those other calls to `drop_task` or `set_closed` will be visible after this load,
276 // and those things will also be visible to anything woken by the call to `notify_waiters`.
277 self.state.load(Ordering::Acquire);
278
279 self.on_last_exit.notify_waiters();
280 }
281 }
282
283 impl TaskTracker {
284 /// Creates a new `TaskTracker`.
285 ///
286 /// The `TaskTracker` will start out as open.
287 #[must_use]
new() -> Self288 pub fn new() -> Self {
289 Self {
290 inner: Arc::new(TaskTrackerInner::new()),
291 }
292 }
293
294 /// Waits until this `TaskTracker` is both closed and empty.
295 ///
296 /// If the `TaskTracker` is already closed and empty when this method is called, then it
297 /// returns immediately.
298 ///
299 /// The `wait` future is resistant against [ABA problems][aba]. That is, if the `TaskTracker`
300 /// becomes both closed and empty for a short amount of time, then it is guarantee that all
301 /// `wait` futures that were created before the short time interval will trigger, even if they
302 /// are not polled during that short time interval.
303 ///
304 /// # Cancel safety
305 ///
306 /// This method is cancel safe.
307 ///
308 /// However, the resistance against [ABA problems][aba] is lost when using `wait` as the
309 /// condition in a `tokio::select!` loop.
310 ///
311 /// [aba]: https://en.wikipedia.org/wiki/ABA_problem
312 #[inline]
wait(&self) -> TaskTrackerWaitFuture<'_>313 pub fn wait(&self) -> TaskTrackerWaitFuture<'_> {
314 TaskTrackerWaitFuture {
315 future: self.inner.on_last_exit.notified(),
316 inner: if self.inner.is_closed_and_empty() {
317 None
318 } else {
319 Some(&self.inner)
320 },
321 }
322 }
323
324 /// Close this `TaskTracker`.
325 ///
326 /// This allows [`wait`] futures to complete. It does not prevent you from spawning new tasks.
327 ///
328 /// Returns `true` if this closed the `TaskTracker`, or `false` if it was already closed.
329 ///
330 /// [`wait`]: Self::wait
331 #[inline]
close(&self) -> bool332 pub fn close(&self) -> bool {
333 self.inner.set_closed()
334 }
335
336 /// Reopen this `TaskTracker`.
337 ///
338 /// This prevents [`wait`] futures from completing even if the `TaskTracker` is empty.
339 ///
340 /// Returns `true` if this reopened the `TaskTracker`, or `false` if it was already open.
341 ///
342 /// [`wait`]: Self::wait
343 #[inline]
reopen(&self) -> bool344 pub fn reopen(&self) -> bool {
345 self.inner.set_open()
346 }
347
348 /// Returns `true` if this `TaskTracker` is [closed](Self::close).
349 #[inline]
350 #[must_use]
is_closed(&self) -> bool351 pub fn is_closed(&self) -> bool {
352 (self.inner.state.load(Ordering::Acquire) & 1) != 0
353 }
354
355 /// Returns the number of tasks tracked by this `TaskTracker`.
356 #[inline]
357 #[must_use]
len(&self) -> usize358 pub fn len(&self) -> usize {
359 self.inner.state.load(Ordering::Acquire) >> 1
360 }
361
362 /// Returns `true` if there are no tasks in this `TaskTracker`.
363 #[inline]
364 #[must_use]
is_empty(&self) -> bool365 pub fn is_empty(&self) -> bool {
366 self.inner.state.load(Ordering::Acquire) <= 1
367 }
368
369 /// Spawn the provided future on the current Tokio runtime, and track it in this `TaskTracker`.
370 ///
371 /// This is equivalent to `tokio::spawn(tracker.track_future(task))`.
372 #[inline]
373 #[track_caller]
374 #[cfg(feature = "rt")]
375 #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
spawn<F>(&self, task: F) -> JoinHandle<F::Output> where F: Future + Send + 'static, F::Output: Send + 'static,376 pub fn spawn<F>(&self, task: F) -> JoinHandle<F::Output>
377 where
378 F: Future + Send + 'static,
379 F::Output: Send + 'static,
380 {
381 tokio::task::spawn(self.track_future(task))
382 }
383
384 /// Spawn the provided future on the provided Tokio runtime, and track it in this `TaskTracker`.
385 ///
386 /// This is equivalent to `handle.spawn(tracker.track_future(task))`.
387 #[inline]
388 #[track_caller]
389 #[cfg(feature = "rt")]
390 #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
spawn_on<F>(&self, task: F, handle: &Handle) -> JoinHandle<F::Output> where F: Future + Send + 'static, F::Output: Send + 'static,391 pub fn spawn_on<F>(&self, task: F, handle: &Handle) -> JoinHandle<F::Output>
392 where
393 F: Future + Send + 'static,
394 F::Output: Send + 'static,
395 {
396 handle.spawn(self.track_future(task))
397 }
398
399 /// Spawn the provided future on the current [`LocalSet`], and track it in this `TaskTracker`.
400 ///
401 /// This is equivalent to `tokio::task::spawn_local(tracker.track_future(task))`.
402 ///
403 /// [`LocalSet`]: tokio::task::LocalSet
404 #[inline]
405 #[track_caller]
406 #[cfg(feature = "rt")]
407 #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
spawn_local<F>(&self, task: F) -> JoinHandle<F::Output> where F: Future + 'static, F::Output: 'static,408 pub fn spawn_local<F>(&self, task: F) -> JoinHandle<F::Output>
409 where
410 F: Future + 'static,
411 F::Output: 'static,
412 {
413 tokio::task::spawn_local(self.track_future(task))
414 }
415
416 /// Spawn the provided future on the provided [`LocalSet`], and track it in this `TaskTracker`.
417 ///
418 /// This is equivalent to `local_set.spawn_local(tracker.track_future(task))`.
419 ///
420 /// [`LocalSet`]: tokio::task::LocalSet
421 #[inline]
422 #[track_caller]
423 #[cfg(feature = "rt")]
424 #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
spawn_local_on<F>(&self, task: F, local_set: &LocalSet) -> JoinHandle<F::Output> where F: Future + 'static, F::Output: 'static,425 pub fn spawn_local_on<F>(&self, task: F, local_set: &LocalSet) -> JoinHandle<F::Output>
426 where
427 F: Future + 'static,
428 F::Output: 'static,
429 {
430 local_set.spawn_local(self.track_future(task))
431 }
432
433 /// Spawn the provided blocking task on the current Tokio runtime, and track it in this `TaskTracker`.
434 ///
435 /// This is equivalent to `tokio::task::spawn_blocking(tracker.track_future(task))`.
436 #[inline]
437 #[track_caller]
438 #[cfg(feature = "rt")]
439 #[cfg(not(target_family = "wasm"))]
440 #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
spawn_blocking<F, T>(&self, task: F) -> JoinHandle<T> where F: FnOnce() -> T, F: Send + 'static, T: Send + 'static,441 pub fn spawn_blocking<F, T>(&self, task: F) -> JoinHandle<T>
442 where
443 F: FnOnce() -> T,
444 F: Send + 'static,
445 T: Send + 'static,
446 {
447 let token = self.token();
448 tokio::task::spawn_blocking(move || {
449 let res = task();
450 drop(token);
451 res
452 })
453 }
454
455 /// Spawn the provided blocking task on the provided Tokio runtime, and track it in this `TaskTracker`.
456 ///
457 /// This is equivalent to `handle.spawn_blocking(tracker.track_future(task))`.
458 #[inline]
459 #[track_caller]
460 #[cfg(feature = "rt")]
461 #[cfg(not(target_family = "wasm"))]
462 #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
spawn_blocking_on<F, T>(&self, task: F, handle: &Handle) -> JoinHandle<T> where F: FnOnce() -> T, F: Send + 'static, T: Send + 'static,463 pub fn spawn_blocking_on<F, T>(&self, task: F, handle: &Handle) -> JoinHandle<T>
464 where
465 F: FnOnce() -> T,
466 F: Send + 'static,
467 T: Send + 'static,
468 {
469 let token = self.token();
470 handle.spawn_blocking(move || {
471 let res = task();
472 drop(token);
473 res
474 })
475 }
476
477 /// Track the provided future.
478 ///
479 /// The returned [`TrackedFuture`] will count as a task tracked by this collection, and will
480 /// prevent calls to [`wait`] from returning until the task is dropped.
481 ///
482 /// The task is removed from the collection when it is dropped, not when [`poll`] returns
483 /// [`Poll::Ready`].
484 ///
485 /// # Examples
486 ///
487 /// Track a future spawned with [`tokio::spawn`].
488 ///
489 /// ```
490 /// # async fn my_async_fn() {}
491 /// use tokio_util::task::TaskTracker;
492 ///
493 /// # #[tokio::main(flavor = "current_thread")]
494 /// # async fn main() {
495 /// let tracker = TaskTracker::new();
496 ///
497 /// tokio::spawn(tracker.track_future(my_async_fn()));
498 /// # }
499 /// ```
500 ///
501 /// Track a future spawned on a [`JoinSet`].
502 /// ```
503 /// # async fn my_async_fn() {}
504 /// use tokio::task::JoinSet;
505 /// use tokio_util::task::TaskTracker;
506 ///
507 /// # #[tokio::main(flavor = "current_thread")]
508 /// # async fn main() {
509 /// let tracker = TaskTracker::new();
510 /// let mut join_set = JoinSet::new();
511 ///
512 /// join_set.spawn(tracker.track_future(my_async_fn()));
513 /// # }
514 /// ```
515 ///
516 /// [`JoinSet`]: tokio::task::JoinSet
517 /// [`Poll::Pending`]: std::task::Poll::Pending
518 /// [`poll`]: std::future::Future::poll
519 /// [`wait`]: Self::wait
520 #[inline]
track_future<F: Future>(&self, future: F) -> TrackedFuture<F>521 pub fn track_future<F: Future>(&self, future: F) -> TrackedFuture<F> {
522 TrackedFuture {
523 future,
524 token: self.token(),
525 }
526 }
527
528 /// Creates a [`TaskTrackerToken`] representing a task tracked by this `TaskTracker`.
529 ///
530 /// This token is a lower-level utility than the spawn methods. Each token is considered to
531 /// correspond to a task. As long as the token exists, the `TaskTracker` cannot complete.
532 /// Furthermore, the count returned by the [`len`] method will include the tokens in the count.
533 ///
534 /// Dropping the token indicates to the `TaskTracker` that the task has exited.
535 ///
536 /// [`len`]: TaskTracker::len
537 #[inline]
token(&self) -> TaskTrackerToken538 pub fn token(&self) -> TaskTrackerToken {
539 self.inner.add_task();
540 TaskTrackerToken {
541 task_tracker: self.clone(),
542 }
543 }
544
545 /// Returns `true` if both task trackers correspond to the same set of tasks.
546 ///
547 /// # Examples
548 ///
549 /// ```
550 /// use tokio_util::task::TaskTracker;
551 ///
552 /// let tracker_1 = TaskTracker::new();
553 /// let tracker_2 = TaskTracker::new();
554 /// let tracker_1_clone = tracker_1.clone();
555 ///
556 /// assert!(TaskTracker::ptr_eq(&tracker_1, &tracker_1_clone));
557 /// assert!(!TaskTracker::ptr_eq(&tracker_1, &tracker_2));
558 /// ```
559 #[inline]
560 #[must_use]
ptr_eq(left: &TaskTracker, right: &TaskTracker) -> bool561 pub fn ptr_eq(left: &TaskTracker, right: &TaskTracker) -> bool {
562 Arc::ptr_eq(&left.inner, &right.inner)
563 }
564 }
565
566 impl Default for TaskTracker {
567 /// Creates a new `TaskTracker`.
568 ///
569 /// The `TaskTracker` will start out as open.
570 #[inline]
default() -> TaskTracker571 fn default() -> TaskTracker {
572 TaskTracker::new()
573 }
574 }
575
576 impl Clone for TaskTracker {
577 /// Returns a new `TaskTracker` that tracks the same set of tasks.
578 ///
579 /// Since the new `TaskTracker` shares the same set of tasks, changes to one set are visible in
580 /// all other clones.
581 ///
582 /// # Examples
583 ///
584 /// ```
585 /// use tokio_util::task::TaskTracker;
586 ///
587 /// #[tokio::main]
588 /// # async fn _hidden() {}
589 /// # #[tokio::main(flavor = "current_thread")]
590 /// async fn main() {
591 /// let tracker = TaskTracker::new();
592 /// let cloned = tracker.clone();
593 ///
594 /// // Spawns on `tracker` are visible in `cloned`.
595 /// tracker.spawn(std::future::pending::<()>());
596 /// assert_eq!(cloned.len(), 1);
597 ///
598 /// // Spawns on `cloned` are visible in `tracker`.
599 /// cloned.spawn(std::future::pending::<()>());
600 /// assert_eq!(tracker.len(), 2);
601 ///
602 /// // Calling `close` is visible to `cloned`.
603 /// tracker.close();
604 /// assert!(cloned.is_closed());
605 ///
606 /// // Calling `reopen` is visible to `tracker`.
607 /// cloned.reopen();
608 /// assert!(!tracker.is_closed());
609 /// }
610 /// ```
611 #[inline]
clone(&self) -> TaskTracker612 fn clone(&self) -> TaskTracker {
613 Self {
614 inner: self.inner.clone(),
615 }
616 }
617 }
618
debug_inner(inner: &TaskTrackerInner, f: &mut fmt::Formatter<'_>) -> fmt::Result619 fn debug_inner(inner: &TaskTrackerInner, f: &mut fmt::Formatter<'_>) -> fmt::Result {
620 let state = inner.state.load(Ordering::Acquire);
621 let is_closed = (state & 1) != 0;
622 let len = state >> 1;
623
624 f.debug_struct("TaskTracker")
625 .field("len", &len)
626 .field("is_closed", &is_closed)
627 .field("inner", &(inner as *const TaskTrackerInner))
628 .finish()
629 }
630
631 impl fmt::Debug for TaskTracker {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result632 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
633 debug_inner(&self.inner, f)
634 }
635 }
636
637 impl TaskTrackerToken {
638 /// Returns the [`TaskTracker`] that this token is associated with.
639 #[inline]
640 #[must_use]
task_tracker(&self) -> &TaskTracker641 pub fn task_tracker(&self) -> &TaskTracker {
642 &self.task_tracker
643 }
644 }
645
646 impl Clone for TaskTrackerToken {
647 /// Returns a new `TaskTrackerToken` associated with the same [`TaskTracker`].
648 ///
649 /// This is equivalent to `token.task_tracker().token()`.
650 #[inline]
clone(&self) -> TaskTrackerToken651 fn clone(&self) -> TaskTrackerToken {
652 self.task_tracker.token()
653 }
654 }
655
656 impl Drop for TaskTrackerToken {
657 /// Dropping the token indicates to the [`TaskTracker`] that the task has exited.
658 #[inline]
drop(&mut self)659 fn drop(&mut self) {
660 self.task_tracker.inner.drop_task();
661 }
662 }
663
664 impl<F: Future> Future for TrackedFuture<F> {
665 type Output = F::Output;
666
667 #[inline]
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output>668 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
669 self.project().future.poll(cx)
670 }
671 }
672
673 impl<F: fmt::Debug> fmt::Debug for TrackedFuture<F> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result674 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
675 f.debug_struct("TrackedFuture")
676 .field("future", &self.future)
677 .field("task_tracker", self.token.task_tracker())
678 .finish()
679 }
680 }
681
682 impl<'a> Future for TaskTrackerWaitFuture<'a> {
683 type Output = ();
684
685 #[inline]
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>686 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
687 let me = self.project();
688
689 let inner = match me.inner.as_ref() {
690 None => return Poll::Ready(()),
691 Some(inner) => inner,
692 };
693
694 let ready = inner.is_closed_and_empty() || me.future.poll(cx).is_ready();
695 if ready {
696 *me.inner = None;
697 Poll::Ready(())
698 } else {
699 Poll::Pending
700 }
701 }
702 }
703
704 impl<'a> fmt::Debug for TaskTrackerWaitFuture<'a> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result705 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
706 struct Helper<'a>(&'a TaskTrackerInner);
707
708 impl fmt::Debug for Helper<'_> {
709 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
710 debug_inner(self.0, f)
711 }
712 }
713
714 f.debug_struct("TaskTrackerWaitFuture")
715 .field("future", &self.future)
716 .field("task_tracker", &self.inner.map(Helper))
717 .finish()
718 }
719 }
720