//! The task module.
//!
//! The task module contains the code that manages spawned tasks and provides a
//! safe API for the rest of the runtime to use. Each task in a runtime is
//! stored in an `OwnedTasks` or `LocalOwnedTasks` object.
//!
//! # Task reference types
//!
//! A task is usually referenced by multiple handles, and there are several
//! types of handles.
//!
//!  * `OwnedTask` - tasks stored in an `OwnedTasks` or `LocalOwnedTasks` are of this
//!    reference type.
//!
//!  * `JoinHandle` - each task has a `JoinHandle` that allows access to the output
//!    of the task.
//!
//!  * `Waker` - every waker for a task has this reference type. There can be any
//!    number of waker references.
//!
//!  * `Notified` - tracks whether the task is notified.
//!
//!  * `Unowned` - this task reference type is used for tasks not stored in any
//!    runtime. Mainly used for blocking tasks, but also in tests.
//!
//! The task uses a reference count to keep track of how many active references
//! exist. The `Unowned` reference type takes up two ref-counts. All other
//! reference types take up a single ref-count.
//!
//! Besides the waker type, each task has at most one of each reference type.
//!
//! # State
//!
//! The task stores its state in an atomic `usize` with various bitfields for the
//! necessary information. The state has the following bitfields:
//!
//!  * `RUNNING` - Tracks whether the task is currently being polled or cancelled.
//!    This bit functions as a lock around the task.
//!
//!  * `COMPLETE` - Is one once the future has fully completed and has been
//!    dropped. Never unset once set. Never set together with RUNNING.
//!
//!  * `NOTIFIED` - Tracks whether a Notified object currently exists.
//!
//!  * `CANCELLED` - Is set to one for tasks that should be cancelled as soon as
//!    possible. May take any value for completed tasks.
//!
//!  * `JOIN_INTEREST` - Is set to one if there exists a `JoinHandle`.
//!
//!  * `JOIN_WAKER` - Acts as an access control bit for the join handle waker. The
//!    protocol for its usage is described below.
//!
//! The rest of the bits are used for the ref-count.
//!
//! # Fields in the task
//!
//! The task has various fields. This section describes how and when it is safe
//! to access a field.
//!
//!  * The state field is accessed with atomic instructions.
//!
//!  * The `OwnedTask` reference has exclusive access to the `owned` field.
//!
//!  * The Notified reference has exclusive access to the `queue_next` field.
//!
//!  * The `owner_id` field can be set as part of construction of the task, but
//!    is otherwise immutable and anyone can access the field immutably without
//!    synchronization.
//!
//!  * If COMPLETE is one, then the `JoinHandle` has exclusive access to the
//!    stage field. If COMPLETE is zero, then the RUNNING bitfield functions as
//!    a lock for the stage field, and it can be accessed only by the thread
//!    that set RUNNING to one.
//!
//!  * The waker field may be concurrently accessed by different threads: in one
//!    thread the runtime may complete a task and *read* the waker field to
//!    invoke the waker, and in another thread the task's `JoinHandle` may be
//!    polled, and if the task hasn't yet completed, the `JoinHandle` may *write*
//!    a waker to the waker field. The `JOIN_WAKER` bit ensures safe access by
//!    multiple threads to the waker field using the following rules:
//!
//!    1. `JOIN_WAKER` is initialized to zero.
//!
//!    2. If `JOIN_WAKER` is zero, then the `JoinHandle` has exclusive (mutable)
//!       access to the waker field.
//!
//!    3. If `JOIN_WAKER` is one, then the `JoinHandle` has shared (read-only)
//!       access to the waker field.
//!
//!    4. If `JOIN_WAKER` is one and COMPLETE is one, then the runtime has shared
//!       (read-only) access to the waker field.
//!
//!    5. If the `JoinHandle` needs to write to the waker field, then the
//!       `JoinHandle` needs to (i) successfully set `JOIN_WAKER` to zero if it is
//!       not already zero to gain exclusive access to the waker field per rule
//!       2, (ii) write a waker, and (iii) successfully set `JOIN_WAKER` to one.
//!
//!    6. The `JoinHandle` can change `JOIN_WAKER` only if COMPLETE is zero (i.e.
//!       the task hasn't yet completed).
//!
//!    Rule 6 implies that the steps (i) or (iii) of rule 5 may fail due to a
//!    race. If step (i) fails, then the attempt to write a waker is aborted. If
//!    step (iii) fails because COMPLETE is set to one by another thread after
//!    step (i), then the waker field is cleared. Once COMPLETE is one (i.e.
//!    task has completed), the `JoinHandle` will not modify `JOIN_WAKER`. After the
//!    runtime sets COMPLETE to one, it invokes the waker if there is one.
//!
//! All other fields are immutable and can be accessed immutably without
//! synchronization by anyone.
//!
//! # Safety
//!
//! This section goes through various situations and explains why the API is
//! safe in that situation.
//!
//! ## Polling or dropping the future
//!
//! Any mutable access to the future happens after obtaining a lock by modifying
//! the RUNNING field, so exclusive access is ensured.
//!
//! When the task completes, exclusive access to the output is transferred to
//! the `JoinHandle`. If the `JoinHandle` is already dropped when the transition to
//! complete happens, the thread performing that transition retains exclusive
//! access to the output and should immediately drop it.
//!
//! ## Non-Send futures
//!
//! If a future is not Send, then it is bound to a `LocalOwnedTasks`. The future
//! will only ever be polled or dropped given a `LocalNotified` or inside a call
//! to `LocalOwnedTasks::shutdown_all`. In either case, it is guaranteed that the
//! future is on the right thread.
//!
//! If the task is never removed from the `LocalOwnedTasks`, then it is leaked, so
//! there is no risk that the task is dropped on some other thread when the last
//! ref-count drops.
//!
//! ## Non-Send output
//!
//! When a task completes, the output is placed in the stage of the task. Then,
//! a transition that sets COMPLETE to true is performed, and the value of
//! `JOIN_INTEREST` when this transition happens is read.
//!
//! If `JOIN_INTEREST` is zero when the transition to COMPLETE happens, then the
//! output is immediately dropped.
//!
//! If `JOIN_INTEREST` is one when the transition to COMPLETE happens, then the
//! `JoinHandle` is responsible for cleaning up the output. If the output is not
//! Send, then this happens:
//!
//!  1. The output is created on the thread that the future was polled on. Since
//!     only non-Send futures can have non-Send output, the future was polled on
//!     the thread that the future was spawned from.
//!  2. Since `JoinHandle<Output>` is not Send if Output is not Send, the
//!     `JoinHandle` is also on the thread that the future was spawned from.
//!  3. Thus, the `JoinHandle` will not move the output across threads when it
//!     takes or drops the output.
//!
//! ## Recursive poll/shutdown
//!
//! Calling poll from inside a shutdown call or vice-versa is not prevented by
//! the API exposed by the task module, so this has to be safe. In either case,
//! the lock in the RUNNING bitfield makes the inner call return immediately. If
//! the inner call is a `shutdown` call, then the CANCELLED bit is set, and the
//! poll call will notice it when the poll finishes, and the task is cancelled
//! at that point.

// Some task infrastructure is here to support `JoinSet`, which is currently
// unstable. This should be removed once `JoinSet` is stabilized.
#![cfg_attr(not(tokio_unstable), allow(dead_code))]

mod core;
use self::core::Cell;
use self::core::Header;

mod error;
pub use self::error::JoinError;

mod harness;
use self::harness::Harness;

mod id;
#[cfg_attr(not(tokio_unstable), allow(unreachable_pub, unused_imports))]
pub use id::{id, try_id, Id};

#[cfg(feature = "rt")]
mod abort;
mod join;

#[cfg(feature = "rt")]
pub use self::abort::AbortHandle;

pub use self::join::JoinHandle;

mod list;
pub(crate) use self::list::{LocalOwnedTasks, OwnedTasks};

mod raw;
pub(crate) use self::raw::RawTask;

mod state;
use self::state::State;

mod waker;

cfg_taskdump! {
    pub(crate) mod trace;
}

use crate::future::Future;
use crate::util::linked_list;
use crate::util::sharded_list;

use crate::runtime::TaskCallback;
use std::marker::PhantomData;
use std::ptr::NonNull;
use std::{fmt, mem};

/// An owned handle to the task, tracked by ref count.
#[repr(transparent)]
pub(crate) struct Task<S: 'static> {
    raw: RawTask,
    _p: PhantomData<S>,
}

unsafe impl<S> Send for Task<S> {}
unsafe impl<S> Sync for Task<S> {}

/// A task was notified.
#[repr(transparent)]
pub(crate) struct Notified<S: 'static>(Task<S>);

// safety: This type cannot be used to touch the task without first verifying
// that the value is on a thread where it is safe to poll the task.
unsafe impl<S: Schedule> Send for Notified<S> {}
unsafe impl<S: Schedule> Sync for Notified<S> {}

/// A non-Send variant of Notified with the invariant that it is on a thread
/// where it is safe to poll it.
#[repr(transparent)]
pub(crate) struct LocalNotified<S: 'static> {
    task: Task<S>,
    _not_send: PhantomData<*const ()>,
}

/// A task that is not owned by any `OwnedTasks`. Used for blocking tasks.
/// This type holds two ref-counts.
pub(crate) struct UnownedTask<S: 'static> {
    raw: RawTask,
    _p: PhantomData<S>,
}

// safety: This type can only be created given a Send task.
unsafe impl<S> Send for UnownedTask<S> {}
unsafe impl<S> Sync for UnownedTask<S> {}

/// Task result sent back.
pub(crate) type Result<T> = std::result::Result<T, JoinError>;

/// Hooks for scheduling tasks which are needed in the task harness.
#[derive(Clone)]
pub(crate) struct TaskHarnessScheduleHooks {
    pub(crate) task_terminate_callback: Option<TaskCallback>,
}

pub(crate) trait Schedule: Sync + Sized + 'static {
    /// The task has completed work and is ready to be released. The scheduler
    /// should release it immediately and return it. The task module will batch
    /// the ref-dec with setting other options.
    ///
    /// If the scheduler has already released the task, then None is returned.
    fn release(&self, task: &Task<Self>) -> Option<Task<Self>>;

    /// Schedule the task.
    fn schedule(&self, task: Notified<Self>);

    /// Returns the scheduler's hooks for the task harness.
    fn hooks(&self) -> TaskHarnessScheduleHooks;

    /// Schedule the task to run in the near future, yielding the thread to
    /// other tasks.
    fn yield_now(&self, task: Notified<Self>) {
        self.schedule(task);
    }

    /// Polling the task resulted in a panic. Should the runtime shut down?
    fn unhandled_panic(&self) {
        // By default, do nothing. This maintains the 1.0 behavior.
    }
}

cfg_rt! {
    /// This is the constructor for a new task. Three references to the task are
    /// created. The first task reference is usually put into an `OwnedTasks`
    /// immediately. The Notified is sent to the scheduler as an ordinary
    /// notification.
    fn new_task<T, S>(
        task: T,
        scheduler: S,
        id: Id,
    ) -> (Task<S>, Notified<S>, JoinHandle<T::Output>)
    where
        S: Schedule,
        T: Future + 'static,
        T::Output: 'static,
    {
        let raw = RawTask::new::<T, S>(task, scheduler, id);
        let task = Task {
            raw,
            _p: PhantomData,
        };
        let notified = Notified(Task {
            raw,
            _p: PhantomData,
        });
        let join = JoinHandle::new(raw);

        (task, notified, join)
    }

    /// Creates a new task with an associated join handle. This method is used
    /// only when the task is not going to be stored in an `OwnedTasks` list.
    ///
    /// Currently only blocking tasks use this method.
    pub(crate) fn unowned<T, S>(task: T, scheduler: S, id: Id) -> (UnownedTask<S>, JoinHandle<T::Output>)
    where
        S: Schedule,
        T: Send + Future + 'static,
        T::Output: Send + 'static,
    {
        let (task, notified, join) = new_task(task, scheduler, id);

        // This transfers the ref-count of task and notified into an UnownedTask.
        // This is valid because an UnownedTask holds two ref-counts.
        let unowned = UnownedTask {
            raw: task.raw,
            _p: PhantomData,
        };
        std::mem::forget(task);
        std::mem::forget(notified);

        (unowned, join)
    }
}

impl<S: 'static> Task<S> {
    unsafe fn new(raw: RawTask) -> Task<S> {
        Task {
            raw,
            _p: PhantomData,
        }
    }

    unsafe fn from_raw(ptr: NonNull<Header>) -> Task<S> {
        Task::new(RawTask::from_raw(ptr))
    }

    #[cfg(all(
        tokio_unstable,
        tokio_taskdump,
        feature = "rt",
        target_os = "linux",
        any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
    ))]
    pub(super) fn as_raw(&self) -> RawTask {
        self.raw
    }

    fn header(&self) -> &Header {
        self.raw.header()
    }

    fn header_ptr(&self) -> NonNull<Header> {
        self.raw.header_ptr()
    }

    cfg_taskdump! {
        /// Notify the task for task dumping.
        ///
        /// Returns `None` if the task has already been notified.
        pub(super) fn notify_for_tracing(&self) -> Option<Notified<S>> {
            if self.as_raw().state().transition_to_notified_for_tracing() {
                // SAFETY: `transition_to_notified_for_tracing` increments the
                // refcount.
                Some(unsafe { Notified(Task::new(self.raw)) })
            } else {
                None
            }
        }

        /// Returns a [task ID] that uniquely identifies this task relative to other
        /// currently spawned tasks.
        ///
        /// [task ID]: crate::task::Id
        #[cfg(tokio_unstable)]
        pub(crate) fn id(&self) -> crate::task::Id {
            // Safety: The header pointer is valid.
            unsafe { Header::get_id(self.raw.header_ptr()) }
        }
    }
}

impl<S: 'static> Notified<S> {
    fn header(&self) -> &Header {
        self.0.header()
    }
}

impl<S: 'static> Notified<S> {
    pub(crate) unsafe fn from_raw(ptr: RawTask) -> Notified<S> {
        Notified(Task::new(ptr))
    }
}

impl<S: 'static> Notified<S> {
    pub(crate) fn into_raw(self) -> RawTask {
        let raw = self.0.raw;
        mem::forget(self);
        raw
    }
}

impl<S: Schedule> Task<S> {
    /// Preemptively cancels the task as part of the shutdown process.
    pub(crate) fn shutdown(self) {
        let raw = self.raw;
        mem::forget(self);
        raw.shutdown();
    }
}

impl<S: Schedule> LocalNotified<S> {
    /// Runs the task.
    pub(crate) fn run(self) {
        let raw = self.task.raw;
        mem::forget(self);
        raw.poll();
    }
}

impl<S: Schedule> UnownedTask<S> {
    // Used in test of the inject queue.
    #[cfg(test)]
    #[cfg_attr(target_family = "wasm", allow(dead_code))]
    pub(super) fn into_notified(self) -> Notified<S> {
        Notified(self.into_task())
    }

    fn into_task(self) -> Task<S> {
        // Convert into a task.
        let task = Task {
            raw: self.raw,
            _p: PhantomData,
        };
        mem::forget(self);

        // Drop a ref-count since an UnownedTask holds two.
        task.header().state.ref_dec();

        task
    }

    pub(crate) fn run(self) {
        let raw = self.raw;
        mem::forget(self);

        // Transfer one ref-count to a Task object.
        let task = Task::<S> {
            raw,
            _p: PhantomData,
        };

        // Use the other ref-count to poll the task.
        raw.poll();
        // Decrement our extra ref-count
        drop(task);
    }

    pub(crate) fn shutdown(self) {
        self.into_task().shutdown();
    }
}

impl<S: 'static> Drop for Task<S> {
    fn drop(&mut self) {
        // Decrement the ref count
        if self.header().state.ref_dec() {
            // Deallocate if this is the final ref count
            self.raw.dealloc();
        }
    }
}

impl<S: 'static> Drop for UnownedTask<S> {
    fn drop(&mut self) {
        // Decrement the ref count
        if self.raw.header().state.ref_dec_twice() {
            // Deallocate if this is the final ref count
            self.raw.dealloc();
        }
    }
}

impl<S> fmt::Debug for Task<S> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(fmt, "Task({:p})", self.header())
    }
}

impl<S> fmt::Debug for Notified<S> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(fmt, "task::Notified({:p})", self.0.header())
    }
}

/// # Safety
///
/// Tasks are pinned.
unsafe impl<S> linked_list::Link for Task<S> {
    type Handle = Task<S>;
    type Target = Header;

    fn as_raw(handle: &Task<S>) -> NonNull<Header> {
        handle.raw.header_ptr()
    }

    unsafe fn from_raw(ptr: NonNull<Header>) -> Task<S> {
        Task::from_raw(ptr)
    }

    unsafe fn pointers(target: NonNull<Header>) -> NonNull<linked_list::Pointers<Header>> {
        self::core::Trailer::addr_of_owned(Header::get_trailer(target))
    }
}

/// # Safety
///
/// The id of a task is never changed after creation of the task, so the return value of
/// `get_shard_id` will not change. (The cast may throw away the upper 32 bits of the task id, but
/// the shard id still won't change from call to call.)
unsafe impl<S> sharded_list::ShardedListItem for Task<S> {
    unsafe fn get_shard_id(target: NonNull<Self::Target>) -> usize {
        // SAFETY: The caller guarantees that `target` points at a valid task.
        let task_id = unsafe { Header::get_id(target) };
        task_id.0.get() as usize
    }
}
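
/*
Illustrative note (not part of the runtime). The module docs say that the task
state is one atomic `usize` whose low bits are flags and whose remaining bits
are the ref-count, that `new_task` creates three references, and that an
`UnownedTask` holds two of them. The standalone sketch below models only that
accounting. Everything in it is an assumption made for illustration: the
`SketchState` type, the bit positions, the memory orderings, and the
`unowned_lifecycle` helper are hypothetical and are not the definitions used by
the `state` module.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical flag layout: one low bit per flag named in the module docs.
// The real bit positions are defined in the `state` module and may differ.
#[allow(dead_code)]
const RUNNING: usize = 1 << 0;
#[allow(dead_code)]
const COMPLETE: usize = 1 << 1;
const NOTIFIED: usize = 1 << 2;
const JOIN_INTEREST: usize = 1 << 3;
#[allow(dead_code)]
const JOIN_WAKER: usize = 1 << 4;
#[allow(dead_code)]
const CANCELLED: usize = 1 << 5;

// Number of bits reserved for flags in this sketch.
const FLAG_BITS: u32 = 6;
// Adding or subtracting REF_ONE changes the ref-count stored above the flags.
const REF_ONE: usize = 1 << FLAG_BITS;

// A toy stand-in for the task state word.
struct SketchState(AtomicUsize);

impl SketchState {
    // Builds a state word holding `refs` references and the given flags.
    fn new(refs: usize, flags: usize) -> Self {
        SketchState(AtomicUsize::new((refs << FLAG_BITS) | flags))
    }

    // Current number of references.
    fn ref_count(&self) -> usize {
        self.0.load(Ordering::Acquire) >> FLAG_BITS
    }

    // Current flag bits, with the ref-count masked off.
    fn flags(&self) -> usize {
        self.0.load(Ordering::Acquire) & (REF_ONE - 1)
    }

    // Releases one reference. Returns true if it was the last one, in which
    // case the caller would be responsible for deallocating the task.
    fn ref_dec(&self) -> bool {
        let prev = self.0.fetch_sub(REF_ONE, Ordering::AcqRel);
        assert!(prev >> FLAG_BITS >= 1, "ref-count underflow");
        prev >> FLAG_BITS == 1
    }

    // Releases two references in one atomic step, as a handle that owns two
    // ref-counts would. Returns true if those were the last two.
    fn ref_dec_twice(&self) -> bool {
        let prev = self.0.fetch_sub(2 * REF_ONE, Ordering::AcqRel);
        assert!(prev >> FLAG_BITS >= 2, "ref-count underflow");
        prev >> FLAG_BITS == 2
    }
}

// Walks through the accounting for an unowned task: three references exist
// after construction, the join handle releases one, and the unowned handle
// releases the remaining two. Returns whether each release was the final one.
fn unowned_lifecycle() -> (bool, bool) {
    let state = SketchState::new(3, NOTIFIED | JOIN_INTEREST);
    let freed_by_join = state.ref_dec();
    let freed_by_unowned = state.ref_dec_twice();
    (freed_by_join, freed_by_unowned)
}
```

In this model the flag bits are untouched by ref-count changes, and only the
release that observes the last outstanding count reports `true`. That is the
condition under which the `Drop` impls above call `dealloc`.
*/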