1 #![doc(html_root_url = "https://docs.rs/want/0.3.1")] 2 #![deny(warnings)] 3 #![deny(missing_docs)] 4 #![deny(missing_debug_implementations)] 5 6 //! A Futures channel-like utility to signal when a value is wanted. 7 //! 8 //! Futures are supposed to be lazy, and only starting work if `Future::poll` 9 //! is called. The same is true of `Stream`s, but when using a channel as 10 //! a `Stream`, it can be hard to know if the receiver is ready for the next 11 //! value. 12 //! 13 //! Put another way, given a `(tx, rx)` from `futures::sync::mpsc::channel()`, 14 //! how can the sender (`tx`) know when the receiver (`rx`) actually wants more 15 //! work to be produced? Just because there is room in the channel buffer 16 //! doesn't mean the work would be used by the receiver. 17 //! 18 //! This is where something like `want` comes in. Added to a channel, you can 19 //! make sure that the `tx` only creates the message and sends it when the `rx` 20 //! has `poll()` for it, and the buffer was empty. 21 //! 22 //! # Example 23 //! 24 //! ```nightly 25 //! # //#![feature(async_await)] 26 //! extern crate want; 27 //! 28 //! # fn spawn<T>(_t: T) {} 29 //! # fn we_still_want_message() -> bool { true } 30 //! # fn mpsc_channel() -> (Tx, Rx) { (Tx, Rx) } 31 //! # struct Tx; 32 //! # impl Tx { fn send<T>(&mut self, _: T) {} } 33 //! # struct Rx; 34 //! # impl Rx { async fn recv(&mut self) -> Option<Expensive> { Some(Expensive) } } 35 //! 36 //! // Some message that is expensive to produce. 37 //! struct Expensive; 38 //! 39 //! // Some futures-aware MPSC channel... 40 //! let (mut tx, mut rx) = mpsc_channel(); 41 //! 42 //! // And our `want` channel! 43 //! let (mut gv, mut tk) = want::new(); 44 //! 45 //! 46 //! // Our receiving task... 47 //! spawn(async move { 48 //! // Maybe something comes up that prevents us from ever 49 //! // using the expensive message. 50 //! // 51 //! // Without `want`, the "send" task may have started to 52 //! // produce the expensive message even though we wouldn't 53 //! // be able to use it. 54 //! if !we_still_want_message() { 55 //! return; 56 //! } 57 //! 58 //! // But we can use it! So tell the `want` channel. 59 //! tk.want(); 60 //! 61 //! match rx.recv().await { 62 //! Some(_msg) => println!("got a message"), 63 //! None => println!("DONE"), 64 //! } 65 //! }); 66 //! 67 //! // Our sending task 68 //! spawn(async move { 69 //! // It's expensive to create a new message, so we wait until the 70 //! // receiving end truly *wants* the message. 71 //! if let Err(_closed) = gv.want().await { 72 //! // Looks like they will never want it... 73 //! return; 74 //! } 75 //! 76 //! // They want it, let's go! 77 //! tx.send(Expensive); 78 //! }); 79 //! 80 //! # fn main() {} 81 //! ``` 82 83 use std::fmt; 84 use std::future::Future; 85 use std::mem; 86 use std::pin::Pin; 87 use std::sync::Arc; 88 use std::sync::atomic::AtomicUsize; 89 // SeqCst is the only ordering used to ensure accessing the state and 90 // TryLock are never re-ordered. 91 use std::sync::atomic::Ordering::SeqCst; 92 use std::task::{self, Poll, Waker}; 93 94 95 use try_lock::TryLock; 96 97 /// Create a new `want` channel. new() -> (Giver, Taker)98 pub fn new() -> (Giver, Taker) { 99 let inner = Arc::new(Inner { 100 state: AtomicUsize::new(State::Idle.into()), 101 task: TryLock::new(None), 102 }); 103 let inner2 = inner.clone(); 104 ( 105 Giver { 106 inner, 107 }, 108 Taker { 109 inner: inner2, 110 }, 111 ) 112 } 113 114 /// An entity that gives a value when wanted. 115 pub struct Giver { 116 inner: Arc<Inner>, 117 } 118 119 /// An entity that wants a value. 120 pub struct Taker { 121 inner: Arc<Inner>, 122 } 123 124 /// A cloneable `Giver`. 125 /// 126 /// It differs from `Giver` in that you cannot poll for `want`. It's only 127 /// usable as a cancellation watcher. 128 #[derive(Clone)] 129 pub struct SharedGiver { 130 inner: Arc<Inner>, 131 } 132 133 /// The `Taker` has canceled its interest in a value. 134 pub struct Closed { 135 _inner: (), 136 } 137 138 #[derive(Clone, Copy, Debug)] 139 enum State { 140 Idle, 141 Want, 142 Give, 143 Closed, 144 } 145 146 impl From<State> for usize { from(s: State) -> usize147 fn from(s: State) -> usize { 148 match s { 149 State::Idle => 0, 150 State::Want => 1, 151 State::Give => 2, 152 State::Closed => 3, 153 } 154 } 155 } 156 157 impl From<usize> for State { from(num: usize) -> State158 fn from(num: usize) -> State { 159 match num { 160 0 => State::Idle, 161 1 => State::Want, 162 2 => State::Give, 163 3 => State::Closed, 164 _ => unreachable!("unknown state: {}", num), 165 } 166 } 167 } 168 169 struct Inner { 170 state: AtomicUsize, 171 task: TryLock<Option<Waker>>, 172 } 173 174 // ===== impl Giver ====== 175 176 impl Giver { 177 /// Returns a `Future` that fulfills when the `Taker` has done some action. want(&mut self) -> impl Future<Output = Result<(), Closed>> + '_178 pub fn want(&mut self) -> impl Future<Output = Result<(), Closed>> + '_ { 179 Want(self) 180 } 181 182 /// Poll whether the `Taker` has registered interest in another value. 183 /// 184 /// - If the `Taker` has called `want()`, this returns `Async::Ready(())`. 185 /// - If the `Taker` has not called `want()` since last poll, this 186 /// returns `Async::NotReady`, and parks the current task to be notified 187 /// when the `Taker` does call `want()`. 188 /// - If the `Taker` has canceled (or dropped), this returns `Closed`. 189 /// 190 /// After knowing that the Taker is wanting, the state can be reset by 191 /// calling [`give`](Giver::give). poll_want(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Closed>>192 pub fn poll_want(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Closed>> { 193 loop { 194 let state = self.inner.state.load(SeqCst).into(); 195 match state { 196 State::Want => { 197 return Poll::Ready(Ok(())); 198 }, 199 State::Closed => { 200 return Poll::Ready(Err(Closed { _inner: () })); 201 }, 202 State::Idle | State::Give => { 203 // Taker doesn't want anything yet, so park. 204 if let Some(mut locked) = self.inner.task.try_lock_explicit(SeqCst, SeqCst) { 205 206 // While we have the lock, try to set to GIVE. 207 let old = self.inner.state.compare_exchange( 208 state.into(), 209 State::Give.into(), 210 SeqCst, 211 SeqCst, 212 ); 213 // If it's still the first state (Idle or Give), park current task. 214 if old == Ok(state.into()) { 215 let park = locked.as_ref() 216 .map(|w| !w.will_wake(cx.waker())) 217 .unwrap_or(true); 218 if park { 219 let old = mem::replace(&mut *locked, Some(cx.waker().clone())); 220 drop(locked); 221 if let Some(prev_task) = old { 222 // there was an old task parked here. 223 // it might be waiting to be notified, 224 // so poke it before dropping. 225 prev_task.wake(); 226 }; 227 } 228 return Poll::Pending; 229 } 230 // Otherwise, something happened! Go around the loop again. 231 } else { 232 // if we couldn't take the lock, then a Taker has it. 233 // The *ONLY* reason is because it is in the process of notifying us 234 // of its want. 235 // 236 // We need to loop again to see what state it was changed to. 237 } 238 }, 239 } 240 } 241 } 242 243 /// Mark the state as idle, if the Taker currently is wanting. 244 /// 245 /// Returns true if Taker was wanting, false otherwise. 246 #[inline] give(&self) -> bool247 pub fn give(&self) -> bool { 248 // only set to IDLE if it is still Want 249 let old = self.inner.state.compare_exchange( 250 State::Want.into(), 251 State::Idle.into(), 252 SeqCst, 253 SeqCst); 254 old == Ok(State::Want.into()) 255 } 256 257 /// Check if the `Taker` has called `want()` without parking a task. 258 /// 259 /// This is safe to call outside of a futures task context, but other 260 /// means of being notified is left to the user. 261 #[inline] is_wanting(&self) -> bool262 pub fn is_wanting(&self) -> bool { 263 self.inner.state.load(SeqCst) == State::Want.into() 264 } 265 266 267 /// Check if the `Taker` has canceled interest without parking a task. 268 #[inline] is_canceled(&self) -> bool269 pub fn is_canceled(&self) -> bool { 270 self.inner.state.load(SeqCst) == State::Closed.into() 271 } 272 273 /// Converts this into a `SharedGiver`. 274 #[inline] shared(self) -> SharedGiver275 pub fn shared(self) -> SharedGiver { 276 SharedGiver { 277 inner: self.inner, 278 } 279 } 280 } 281 282 impl fmt::Debug for Giver { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result283 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 284 f.debug_struct("Giver") 285 .field("state", &self.inner.state()) 286 .finish() 287 } 288 } 289 290 // ===== impl SharedGiver ====== 291 292 impl SharedGiver { 293 /// Check if the `Taker` has called `want()` without parking a task. 294 /// 295 /// This is safe to call outside of a futures task context, but other 296 /// means of being notified is left to the user. 297 #[inline] is_wanting(&self) -> bool298 pub fn is_wanting(&self) -> bool { 299 self.inner.state.load(SeqCst) == State::Want.into() 300 } 301 302 303 /// Check if the `Taker` has canceled interest without parking a task. 304 #[inline] is_canceled(&self) -> bool305 pub fn is_canceled(&self) -> bool { 306 self.inner.state.load(SeqCst) == State::Closed.into() 307 } 308 } 309 310 impl fmt::Debug for SharedGiver { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result311 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 312 f.debug_struct("SharedGiver") 313 .field("state", &self.inner.state()) 314 .finish() 315 } 316 } 317 318 // ===== impl Taker ====== 319 320 impl Taker { 321 /// Signal to the `Giver` that the want is canceled. 322 /// 323 /// This is useful to tell that the channel is closed if you cannot 324 /// drop the value yet. 325 #[inline] cancel(&mut self)326 pub fn cancel(&mut self) { 327 self.signal(State::Closed) 328 } 329 330 /// Signal to the `Giver` that a value is wanted. 331 #[inline] want(&mut self)332 pub fn want(&mut self) { 333 debug_assert!( 334 self.inner.state.load(SeqCst) != State::Closed.into(), 335 "want called after cancel" 336 ); 337 self.signal(State::Want) 338 } 339 340 #[inline] signal(&mut self, state: State)341 fn signal(&mut self, state: State) { 342 let old_state = self.inner.state.swap(state.into(), SeqCst).into(); 343 match old_state { 344 State::Idle | State::Want | State::Closed => (), 345 State::Give => { 346 loop { 347 if let Some(mut locked) = self.inner.task.try_lock_explicit(SeqCst, SeqCst) { 348 if let Some(task) = locked.take() { 349 drop(locked); 350 task.wake(); 351 } 352 return; 353 } else { 354 // if we couldn't take the lock, then a Giver has it. 355 // The *ONLY* reason is because it is in the process of parking. 356 // 357 // We need to loop and take the lock so we can notify this task. 358 } 359 } 360 }, 361 } 362 } 363 } 364 365 impl Drop for Taker { 366 #[inline] drop(&mut self)367 fn drop(&mut self) { 368 self.signal(State::Closed); 369 } 370 } 371 372 impl fmt::Debug for Taker { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result373 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 374 f.debug_struct("Taker") 375 .field("state", &self.inner.state()) 376 .finish() 377 } 378 } 379 380 // ===== impl Closed ====== 381 382 impl fmt::Debug for Closed { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result383 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 384 f.debug_struct("Closed") 385 .finish() 386 } 387 } 388 389 // ===== impl Inner ====== 390 391 impl Inner { 392 #[inline] state(&self) -> State393 fn state(&self) -> State { 394 self.state.load(SeqCst).into() 395 } 396 } 397 398 // ===== impl PollFn ====== 399 400 struct Want<'a>(&'a mut Giver); 401 402 403 impl Future for Want<'_> { 404 type Output = Result<(), Closed>; 405 poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output>406 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { 407 self.0.poll_want(cx) 408 } 409 } 410 411 #[cfg(test)] 412 mod tests { 413 use std::thread; 414 use tokio_sync::oneshot; 415 use super::*; 416 block_on<F: Future>(f: F) -> F::Output417 fn block_on<F: Future>(f: F) -> F::Output { 418 tokio_executor::enter() 419 .expect("block_on enter") 420 .block_on(f) 421 } 422 423 #[test] want_ready()424 fn want_ready() { 425 let (mut gv, mut tk) = new(); 426 427 tk.want(); 428 429 block_on(gv.want()).unwrap(); 430 } 431 432 #[test] want_notify_0()433 fn want_notify_0() { 434 let (mut gv, mut tk) = new(); 435 let (tx, rx) = oneshot::channel(); 436 437 thread::spawn(move || { 438 tk.want(); 439 // use a oneshot to keep this thread alive 440 // until other thread was notified of want 441 block_on(rx).expect("rx"); 442 }); 443 444 block_on(gv.want()).expect("want"); 445 446 assert!(gv.is_wanting(), "still wanting after poll_want success"); 447 assert!(gv.give(), "give is true when wanting"); 448 449 assert!(!gv.is_wanting(), "no longer wanting after give"); 450 assert!(!gv.is_canceled(), "give doesn't cancel"); 451 452 assert!(!gv.give(), "give is false if not wanting"); 453 454 tx.send(()).expect("tx"); 455 } 456 457 /* 458 /// This tests that if the Giver moves tasks after parking, 459 /// it will still wake up the correct task. 460 #[test] 461 fn want_notify_moving_tasks() { 462 use std::sync::Arc; 463 use futures::executor::{spawn, Notify, NotifyHandle}; 464 465 struct WantNotify; 466 467 impl Notify for WantNotify { 468 fn notify(&self, _id: usize) { 469 } 470 } 471 472 fn n() -> NotifyHandle { 473 Arc::new(WantNotify).into() 474 } 475 476 let (mut gv, mut tk) = new(); 477 478 let mut s = spawn(poll_fn(move || { 479 gv.poll_want() 480 })); 481 482 // Register with t1 as the task::current() 483 let t1 = n(); 484 assert!(s.poll_future_notify(&t1, 1).unwrap().is_not_ready()); 485 486 thread::spawn(move || { 487 thread::sleep(::std::time::Duration::from_millis(100)); 488 tk.want(); 489 }); 490 491 // And now, move to a ThreadNotify task. 492 s.into_inner().wait().expect("poll_want"); 493 } 494 */ 495 496 #[test] cancel()497 fn cancel() { 498 // explicit 499 let (mut gv, mut tk) = new(); 500 501 assert!(!gv.is_canceled()); 502 503 tk.cancel(); 504 505 assert!(gv.is_canceled()); 506 block_on(gv.want()).unwrap_err(); 507 508 // implicit 509 let (mut gv, tk) = new(); 510 511 assert!(!gv.is_canceled()); 512 513 drop(tk); 514 515 assert!(gv.is_canceled()); 516 block_on(gv.want()).unwrap_err(); 517 518 // notifies 519 let (mut gv, tk) = new(); 520 521 thread::spawn(move || { 522 let _tk = tk; 523 // and dropped 524 }); 525 526 block_on(gv.want()).unwrap_err(); 527 } 528 529 /* 530 #[test] 531 fn stress() { 532 let nthreads = 5; 533 let nwants = 100; 534 535 for _ in 0..nthreads { 536 let (mut gv, mut tk) = new(); 537 let (mut tx, mut rx) = mpsc::channel(0); 538 539 // rx thread 540 thread::spawn(move || { 541 let mut cnt = 0; 542 poll_fn(move || { 543 while cnt < nwants { 544 let n = match rx.poll().expect("rx poll") { 545 Async::Ready(n) => n.expect("rx opt"), 546 Async::NotReady => { 547 tk.want(); 548 return Ok(Async::NotReady); 549 }, 550 }; 551 assert_eq!(cnt, n); 552 cnt += 1; 553 } 554 Ok::<_, ()>(Async::Ready(())) 555 }).wait().expect("rx wait"); 556 }); 557 558 // tx thread 559 thread::spawn(move || { 560 let mut cnt = 0; 561 let nsent = poll_fn(move || { 562 loop { 563 while let Ok(()) = tx.try_send(cnt) { 564 cnt += 1; 565 } 566 match gv.poll_want() { 567 Ok(Async::Ready(_)) => (), 568 Ok(Async::NotReady) => return Ok::<_, ()>(Async::NotReady), 569 Err(_) => return Ok(Async::Ready(cnt)), 570 } 571 } 572 }).wait().expect("tx wait"); 573 574 assert_eq!(nsent, nwants); 575 }).join().expect("thread join"); 576 } 577 } 578 */ 579 } 580