1  #![doc(html_root_url = "https://docs.rs/want/0.3.1")]
2  #![deny(warnings)]
3  #![deny(missing_docs)]
4  #![deny(missing_debug_implementations)]
5  
6  //! A Futures channel-like utility to signal when a value is wanted.
7  //!
//! Futures are supposed to be lazy, and only start work when `Future::poll`
//! is called. The same is true of `Stream`s, but when using a channel as
//! a `Stream`, it can be hard to know if the receiver is ready for the next
//! value.
12  //!
13  //! Put another way, given a `(tx, rx)` from `futures::sync::mpsc::channel()`,
14  //! how can the sender (`tx`) know when the receiver (`rx`) actually wants more
15  //! work to be produced? Just because there is room in the channel buffer
16  //! doesn't mean the work would be used by the receiver.
17  //!
//! This is where something like `want` comes in. Added to a channel, you can
//! make sure that the `tx` only creates the message and sends it when the `rx`
//! has called `poll()` for it, and the buffer is empty.
21  //!
22  //! # Example
23  //!
24  //! ```nightly
25  //! # //#![feature(async_await)]
26  //! extern crate want;
27  //!
28  //! # fn spawn<T>(_t: T) {}
29  //! # fn we_still_want_message() -> bool { true }
30  //! # fn mpsc_channel() -> (Tx, Rx) { (Tx, Rx) }
31  //! # struct Tx;
32  //! # impl Tx { fn send<T>(&mut self, _: T) {} }
33  //! # struct Rx;
34  //! # impl Rx { async fn recv(&mut self) -> Option<Expensive> { Some(Expensive) } }
35  //!
36  //! // Some message that is expensive to produce.
37  //! struct Expensive;
38  //!
39  //! // Some futures-aware MPSC channel...
40  //! let (mut tx, mut rx) = mpsc_channel();
41  //!
42  //! // And our `want` channel!
43  //! let (mut gv, mut tk) = want::new();
44  //!
45  //!
46  //! // Our receiving task...
47  //! spawn(async move {
48  //!     // Maybe something comes up that prevents us from ever
49  //!     // using the expensive message.
50  //!     //
51  //!     // Without `want`, the "send" task may have started to
52  //!     // produce the expensive message even though we wouldn't
53  //!     // be able to use it.
54  //!     if !we_still_want_message() {
55  //!         return;
56  //!     }
57  //!
58  //!     // But we can use it! So tell the `want` channel.
59  //!     tk.want();
60  //!
61  //!     match rx.recv().await {
62  //!         Some(_msg) => println!("got a message"),
63  //!         None => println!("DONE"),
64  //!     }
65  //! });
66  //!
67  //! // Our sending task
68  //! spawn(async move {
69  //!     // It's expensive to create a new message, so we wait until the
70  //!     // receiving end truly *wants* the message.
71  //!     if let Err(_closed) = gv.want().await {
72  //!         // Looks like they will never want it...
73  //!         return;
74  //!     }
75  //!
76  //!     // They want it, let's go!
77  //!     tx.send(Expensive);
78  //! });
79  //!
80  //! # fn main() {}
81  //! ```
82  
83  use std::fmt;
84  use std::future::Future;
85  use std::mem;
86  use std::pin::Pin;
87  use std::sync::Arc;
88  use std::sync::atomic::AtomicUsize;
89  // SeqCst is the only ordering used to ensure accessing the state and
90  // TryLock are never re-ordered.
91  use std::sync::atomic::Ordering::SeqCst;
92  use std::task::{self, Poll, Waker};
93  
94  
95  use try_lock::TryLock;
96  
97  /// Create a new `want` channel.
new() -> (Giver, Taker)98  pub fn new() -> (Giver, Taker) {
99      let inner = Arc::new(Inner {
100          state: AtomicUsize::new(State::Idle.into()),
101          task: TryLock::new(None),
102      });
103      let inner2 = inner.clone();
104      (
105          Giver {
106              inner,
107          },
108          Taker {
109              inner: inner2,
110          },
111      )
112  }
113  
/// An entity that gives a value when wanted.
///
/// Created together with its paired [`Taker`] by [`new`].
pub struct Giver {
    // State shared with the paired `Taker` (and any `SharedGiver`).
    inner: Arc<Inner>,
}
118  
/// An entity that wants a value.
///
/// Created together with its paired [`Giver`] by [`new`]. Dropping the
/// `Taker` signals the `Giver` that the channel is closed.
pub struct Taker {
    // State shared with the paired `Giver`.
    inner: Arc<Inner>,
}
123  
/// A cloneable `Giver`.
///
/// It differs from `Giver` in that you cannot poll for `want`. It's only
/// usable as a cancellation watcher.
///
/// Obtained by calling [`Giver::shared`].
#[derive(Clone)]
pub struct SharedGiver {
    // Same shared state the originating `Giver` held; clones share it too.
    inner: Arc<Inner>,
}
132  
/// The `Taker` has canceled its interest in a value.
///
/// This is the error produced by [`Giver::poll_want`] and [`Giver::want`]
/// once the `Taker` has called `cancel()` or has been dropped.
pub struct Closed {
    // Private unit field: keeps the type from being constructed outside
    // this crate.
    _inner: (),
}
137  
/// The channel state, stored in `Inner::state` encoded as a `usize`.
#[derive(Clone, Copy, Debug)]
enum State {
    /// Initial state; also the state `Giver::give` resets a want to.
    Idle,
    /// The `Taker` has called `want()`.
    Want,
    /// The `Giver` has polled and is waiting for the `Taker` to signal.
    Give,
    /// The `Taker` has canceled or been dropped.
    Closed,
}
145  
146  impl From<State> for usize {
from(s: State) -> usize147      fn from(s: State) -> usize {
148          match s {
149              State::Idle => 0,
150              State::Want => 1,
151              State::Give => 2,
152              State::Closed => 3,
153          }
154      }
155  }
156  
157  impl From<usize> for State {
from(num: usize) -> State158      fn from(num: usize) -> State {
159          match num {
160              0 => State::Idle,
161              1 => State::Want,
162              2 => State::Give,
163              3 => State::Closed,
164              _ => unreachable!("unknown state: {}", num),
165          }
166      }
167  }
168  
/// State shared between the `Giver` (or `SharedGiver`s) and the `Taker`.
struct Inner {
    // Current `State`, stored as its `usize` encoding.
    state: AtomicUsize,
    // Waker of the task the `Giver` parked in `poll_want`, if any; taken
    // and woken by the `Taker` when it signals.
    task: TryLock<Option<Waker>>,
}
173  
174  // ===== impl Giver ======
175  
176  impl Giver {
    /// Returns a `Future` that fulfills when the `Taker` has done some action.
    ///
    /// Each poll of the future calls [`poll_want`](Giver::poll_want), so it
    /// resolves to `Ok(())` once the `Taker` wants a value, or to
    /// `Err(Closed)` once the `Taker` has canceled or been dropped.
    ///
    /// The future borrows this `Giver` mutably for as long as it lives.
    pub fn want(&mut self) -> impl Future<Output = Result<(), Closed>> + '_ {
        Want(self)
    }
181  
    /// Poll whether the `Taker` has registered interest in another value.
    ///
    /// - If the `Taker` has called `want()`, this returns
    ///   `Poll::Ready(Ok(()))`.
    /// - If the `Taker` is not currently wanting, this returns
    ///   `Poll::Pending`, and parks the current task to be notified
    ///   when the `Taker` does call `want()`.
    /// - If the `Taker` has canceled (or dropped), this returns
    ///   `Poll::Ready(Err(Closed))`.
    ///
    /// Observing a want does not consume it: further polls keep returning
    /// ready until the state is reset.
    ///
    /// After knowing that the Taker is wanting, the state can be reset by
    /// calling [`give`](Giver::give).
    pub fn poll_want(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Closed>> {
        // Retry until a terminal state is observed or the task is parked.
        loop {
            let state = self.inner.state.load(SeqCst).into();
            match state {
                State::Want => {
                    return Poll::Ready(Ok(()));
                },
                State::Closed => {
                    return Poll::Ready(Err(Closed { _inner: () }));
                },
                State::Idle | State::Give => {
                    // Taker doesn't want anything yet, so park.
                    if let Some(mut locked) = self.inner.task.try_lock_explicit(SeqCst, SeqCst) {

                        // While we have the lock, try to set to GIVE.
                        let old = self.inner.state.compare_exchange(
                            state.into(),
                            State::Give.into(),
                            SeqCst,
                            SeqCst,
                        );
                        // If it's still the first state (Idle or Give), park current task.
                        if old == Ok(state.into()) {
                            // Only store a new waker when none is parked or
                            // the parked one would not wake this task.
                            let park = locked.as_ref()
                                .map(|w| !w.will_wake(cx.waker()))
                                .unwrap_or(true);
                            if park {
                                let old = mem::replace(&mut *locked, Some(cx.waker().clone()));
                                // Release the lock before waking the previous task.
                                drop(locked);
                                if let Some(prev_task) = old {
                                    // there was an old task parked here.
                                    // it might be waiting to be notified,
                                    // so poke it before dropping.
                                    prev_task.wake();
                                };
                            }
                            return Poll::Pending;
                        }
                        // Otherwise, something happened! Go around the loop again.
                    } else {
                        // if we couldn't take the lock, then a Taker has it.
                        // The *ONLY* reason is because it is in the process of notifying us
                        // of its want.
                        //
                        // We need to loop again to see what state it was changed to.
                    }
                },
            }
        }
    }
242  
243      /// Mark the state as idle, if the Taker currently is wanting.
244      ///
245      /// Returns true if Taker was wanting, false otherwise.
246      #[inline]
give(&self) -> bool247      pub fn give(&self) -> bool {
248          // only set to IDLE if it is still Want
249          let old = self.inner.state.compare_exchange(
250              State::Want.into(),
251              State::Idle.into(),
252              SeqCst,
253              SeqCst);
254          old == Ok(State::Want.into())
255      }
256  
257      /// Check if the `Taker` has called `want()` without parking a task.
258      ///
259      /// This is safe to call outside of a futures task context, but other
260      /// means of being notified is left to the user.
261      #[inline]
is_wanting(&self) -> bool262      pub fn is_wanting(&self) -> bool {
263          self.inner.state.load(SeqCst) == State::Want.into()
264      }
265  
266  
267      /// Check if the `Taker` has canceled interest without parking a task.
268      #[inline]
is_canceled(&self) -> bool269      pub fn is_canceled(&self) -> bool {
270          self.inner.state.load(SeqCst) == State::Closed.into()
271      }
272  
273      /// Converts this into a `SharedGiver`.
274      #[inline]
shared(self) -> SharedGiver275      pub fn shared(self) -> SharedGiver {
276          SharedGiver {
277              inner: self.inner,
278          }
279      }
280  }
281  
282  impl fmt::Debug for Giver {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result283      fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
284          f.debug_struct("Giver")
285              .field("state", &self.inner.state())
286              .finish()
287      }
288  }
289  
290  // ===== impl SharedGiver ======
291  
292  impl SharedGiver {
293      /// Check if the `Taker` has called `want()` without parking a task.
294      ///
295      /// This is safe to call outside of a futures task context, but other
296      /// means of being notified is left to the user.
297      #[inline]
is_wanting(&self) -> bool298      pub fn is_wanting(&self) -> bool {
299          self.inner.state.load(SeqCst) == State::Want.into()
300      }
301  
302  
303      /// Check if the `Taker` has canceled interest without parking a task.
304      #[inline]
is_canceled(&self) -> bool305      pub fn is_canceled(&self) -> bool {
306          self.inner.state.load(SeqCst) == State::Closed.into()
307      }
308  }
309  
310  impl fmt::Debug for SharedGiver {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result311      fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
312          f.debug_struct("SharedGiver")
313              .field("state", &self.inner.state())
314              .finish()
315      }
316  }
317  
318  // ===== impl Taker ======
319  
320  impl Taker {
    /// Signal to the `Giver` that the want is canceled.
    ///
    /// This is useful to tell that the channel is closed if you cannot
    /// drop the value yet.
    ///
    /// Sets the shared state to closed and, if the `Giver` had parked a
    /// task while waiting, wakes it. Dropping the `Taker` sends the same
    /// signal.
    #[inline]
    pub fn cancel(&mut self) {
        self.signal(State::Closed)
    }
329  
    /// Signal to the `Giver` that a value is wanted.
    ///
    /// Sets the shared state to wanting and, if the `Giver` had parked a
    /// task while waiting, wakes it.
    ///
    /// # Panics
    ///
    /// In builds with debug assertions enabled, panics if the channel is
    /// already closed, e.g. after [`cancel`](Taker::cancel).
    #[inline]
    pub fn want(&mut self) {
        // Calling `want` on a closed channel is a caller bug; this is only
        // checked in debug builds.
        debug_assert!(
            self.inner.state.load(SeqCst) != State::Closed.into(),
            "want called after cancel"
        );
        self.signal(State::Want)
    }
339  
    // Publishes `state` as the new channel state and wakes the `Giver`'s
    // parked task if the previous state shows it was waiting (`Give`).
    #[inline]
    fn signal(&mut self, state: State) {
        // Unconditionally install the new state; the previous one tells us
        // whether anybody needs waking.
        let old_state = self.inner.state.swap(state.into(), SeqCst).into();
        match old_state {
            // The Giver was not waiting, so there is nothing to wake.
            State::Idle | State::Want | State::Closed => (),
            State::Give => {
                // Spin until the waker slot can be locked.
                loop {
                    if let Some(mut locked) = self.inner.task.try_lock_explicit(SeqCst, SeqCst) {
                        if let Some(task) = locked.take() {
                            // Release the lock before waking the task.
                            drop(locked);
                            task.wake();
                        }
                        return;
                    } else {
                        // if we couldn't take the lock, then a Giver has it.
                        // The *ONLY* reason is because it is in the process of parking.
                        //
                        // We need to loop and take the lock so we can notify this task.
                    }
                }
            },
        }
    }
363  }
364  
365  impl Drop for Taker {
366      #[inline]
drop(&mut self)367      fn drop(&mut self) {
368          self.signal(State::Closed);
369      }
370  }
371  
372  impl fmt::Debug for Taker {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result373      fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
374          f.debug_struct("Taker")
375              .field("state", &self.inner.state())
376              .finish()
377      }
378  }
379  
380  // ===== impl Closed ======
381  
382  impl fmt::Debug for Closed {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result383      fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
384          f.debug_struct("Closed")
385              .finish()
386      }
387  }
388  
389  // ===== impl Inner ======
390  
391  impl Inner {
392      #[inline]
state(&self) -> State393      fn state(&self) -> State {
394          self.state.load(SeqCst).into()
395      }
396  }
397  
// ===== impl Want ======
399  
400  struct Want<'a>(&'a mut Giver);
401  
402  
403  impl Future for Want<'_> {
404      type Output = Result<(), Closed>;
405  
poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output>406      fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
407          self.0.poll_want(cx)
408      }
409  }
410  
411  #[cfg(test)]
412  mod tests {
413      use std::thread;
414      use tokio_sync::oneshot;
415      use super::*;
416  
    // Test helper: runs the future `f` to completion via the
    // `tokio_executor` enter guard and returns its output.
    //
    // Panics with "block_on enter" if `tokio_executor::enter()` fails
    // (presumably when an executor is already entered on this thread;
    // confirm against the tokio_executor docs).
    fn block_on<F: Future>(f: F) -> F::Output {
        tokio_executor::enter()
            .expect("block_on enter")
            .block_on(f)
    }
422  
    /// A want registered before the `Giver` polls resolves successfully
    /// without needing a wake-up.
    #[test]
    fn want_ready() {
        let (mut gv, mut tk) = new();

        tk.want();

        block_on(gv.want()).unwrap();
    }
431  
    /// A want signalled from another thread resolves the `Giver`'s future,
    /// and `give()` then consumes that want exactly once.
    #[test]
    fn want_notify_0() {
        let (mut gv, mut tk) = new();
        let (tx, rx) = oneshot::channel();

        thread::spawn(move || {
            tk.want();
            // use a oneshot to keep this thread alive
            // until other thread was notified of want
            // (otherwise `tk` would be dropped here, closing the channel)
            block_on(rx).expect("rx");
        });

        block_on(gv.want()).expect("want");

        // The want persists until it is explicitly given.
        assert!(gv.is_wanting(), "still wanting after poll_want success");
        assert!(gv.give(), "give is true when wanting");

        assert!(!gv.is_wanting(), "no longer wanting after give");
        assert!(!gv.is_canceled(), "give doesn't cancel");

        assert!(!gv.give(), "give is false if not wanting");

        // Let the taker thread finish.
        tx.send(()).expect("tx");
    }
456  
457      /*
458      /// This tests that if the Giver moves tasks after parking,
459      /// it will still wake up the correct task.
460      #[test]
461      fn want_notify_moving_tasks() {
462          use std::sync::Arc;
463          use futures::executor::{spawn, Notify, NotifyHandle};
464  
465          struct WantNotify;
466  
467          impl Notify for WantNotify {
468              fn notify(&self, _id: usize) {
469              }
470          }
471  
472          fn n() -> NotifyHandle {
473              Arc::new(WantNotify).into()
474          }
475  
476          let (mut gv, mut tk) = new();
477  
478          let mut s = spawn(poll_fn(move || {
479              gv.poll_want()
480          }));
481  
482          // Register with t1 as the task::current()
483          let t1 = n();
484          assert!(s.poll_future_notify(&t1, 1).unwrap().is_not_ready());
485  
486          thread::spawn(move || {
487              thread::sleep(::std::time::Duration::from_millis(100));
488              tk.want();
489          });
490  
491          // And now, move to a ThreadNotify task.
492          s.into_inner().wait().expect("poll_want");
493      }
494      */
495  
    /// The `Giver` observes closure whether the `Taker` cancels explicitly,
    /// is dropped, or is dropped on another thread.
    #[test]
    fn cancel() {
        // explicit
        let (mut gv, mut tk) = new();

        assert!(!gv.is_canceled());

        tk.cancel();

        assert!(gv.is_canceled());
        block_on(gv.want()).unwrap_err();

        // implicit
        let (mut gv, tk) = new();

        assert!(!gv.is_canceled());

        drop(tk);

        assert!(gv.is_canceled());
        block_on(gv.want()).unwrap_err();

        // notifies
        let (mut gv, tk) = new();

        thread::spawn(move || {
            let _tk = tk;
            // and dropped
        });

        // The drop on the other thread must resolve (or wake) the future
        // with an error.
        block_on(gv.want()).unwrap_err();
    }
528  
529      /*
530      #[test]
531      fn stress() {
532          let nthreads = 5;
533          let nwants = 100;
534  
535          for _ in 0..nthreads {
536              let (mut gv, mut tk) = new();
537              let (mut tx, mut rx) = mpsc::channel(0);
538  
539              // rx thread
540              thread::spawn(move || {
541                  let mut cnt = 0;
542                  poll_fn(move || {
543                      while cnt < nwants {
544                          let n = match rx.poll().expect("rx poll") {
545                              Async::Ready(n) => n.expect("rx opt"),
546                              Async::NotReady => {
547                                  tk.want();
548                                  return Ok(Async::NotReady);
549                              },
550                          };
551                          assert_eq!(cnt, n);
552                          cnt += 1;
553                      }
554                      Ok::<_, ()>(Async::Ready(()))
555                  }).wait().expect("rx wait");
556              });
557  
558              // tx thread
559              thread::spawn(move || {
560                  let mut cnt = 0;
561                  let nsent = poll_fn(move || {
562                      loop {
563                          while let Ok(()) = tx.try_send(cnt) {
564                              cnt += 1;
565                          }
566                          match gv.poll_want() {
567                              Ok(Async::Ready(_)) => (),
568                              Ok(Async::NotReady) => return Ok::<_, ()>(Async::NotReady),
569                              Err(_) => return Ok(Async::Ready(cnt)),
570                          }
571                      }
572                  }).wait().expect("tx wait");
573  
574                  assert_eq!(nsent, nwants);
575              }).join().expect("thread join");
576          }
577      }
578      */
579  }
580