1 use futures::channel::oneshot::{self, Canceled};
2 use futures::executor::block_on;
3 use futures::future;
4 use std::sync::mpsc::{channel, TryRecvError};
5 
6 // mod support;
7 // use support::*;
8 
unselect<T, E, A, B>(r: Result<Either<(T, B), (T, A)>, Either<(E, B), (E, A)>>) -> Result<T, E>9 fn unselect<T, E, A, B>(r: Result<Either<(T, B), (T, A)>, Either<(E, B), (E, A)>>) -> Result<T, E> {
10     match r {
11         Ok(Either::Left((t, _))) | Ok(Either::Right((t, _))) => Ok(t),
12         Err(Either::Left((e, _))) | Err(Either::Right((e, _))) => Err(e),
13     }
14 }
15 
16 #[test]
result_smoke()17 fn result_smoke() {
18     fn is_future_v<A, B, C>(_: C)
19     where
20         A: Send + 'static,
21         B: Send + 'static,
22         C: Future<Item = A, Error = B>,
23     {
24     }
25 
26     is_future_v::<i32, u32, _>(f_ok(1).map(|a| a + 1));
27     is_future_v::<i32, u32, _>(f_ok(1).map_err(|a| a + 1));
28     is_future_v::<i32, u32, _>(f_ok(1).and_then(Ok));
29     is_future_v::<i32, u32, _>(f_ok(1).or_else(Err));
30     is_future_v::<(i32, i32), u32, _>(f_ok(1).join(Err(3)));
31     is_future_v::<i32, u32, _>(f_ok(1).map(f_ok).flatten());
32 
33     assert_done(|| f_ok(1), r_ok(1));
34     assert_done(|| f_err(1), r_err(1));
35     assert_done(|| result(Ok(1)), r_ok(1));
36     assert_done(|| result(Err(1)), r_err(1));
37     assert_done(|| ok(1), r_ok(1));
38     assert_done(|| err(1), r_err(1));
39     assert_done(|| f_ok(1).map(|a| a + 2), r_ok(3));
40     assert_done(|| f_err(1).map(|a| a + 2), r_err(1));
41     assert_done(|| f_ok(1).map_err(|a| a + 2), r_ok(1));
42     assert_done(|| f_err(1).map_err(|a| a + 2), r_err(3));
43     assert_done(|| f_ok(1).and_then(|a| Ok(a + 2)), r_ok(3));
44     assert_done(|| f_err(1).and_then(|a| Ok(a + 2)), r_err(1));
45     assert_done(|| f_ok(1).and_then(|a| Err(a as u32 + 3)), r_err(4));
46     assert_done(|| f_err(1).and_then(|a| Err(a as u32 + 4)), r_err(1));
47     assert_done(|| f_ok(1).or_else(|a| Ok(a as i32 + 2)), r_ok(1));
48     assert_done(|| f_err(1).or_else(|a| Ok(a as i32 + 2)), r_ok(3));
49     assert_done(|| f_ok(1).or_else(|a| Err(a + 3)), r_ok(1));
50     assert_done(|| f_err(1).or_else(|a| Err(a + 4)), r_err(5));
51     assert_done(|| f_ok(1).select(f_err(2)).then(unselect), r_ok(1));
52     assert_done(|| f_ok(1).select(Ok(2)).then(unselect), r_ok(1));
53     assert_done(|| f_err(1).select(f_ok(1)).then(unselect), r_err(1));
54     assert_done(|| f_ok(1).select(empty()).then(unselect), Ok(1));
55     assert_done(|| empty().select(f_ok(1)).then(unselect), Ok(1));
56     assert_done(|| f_ok(1).join(f_err(1)), Err(1));
57     assert_done(|| f_ok(1).join(Ok(2)), Ok((1, 2)));
58     assert_done(|| f_err(1).join(f_ok(1)), Err(1));
59     assert_done(|| f_ok(1).then(|_| Ok(2)), r_ok(2));
60     assert_done(|| f_ok(1).then(|_| Err(2)), r_err(2));
61     assert_done(|| f_err(1).then(|_| Ok(2)), r_ok(2));
62     assert_done(|| f_err(1).then(|_| Err(2)), r_err(2));
63 }
64 
65 #[test]
test_empty()66 fn test_empty() {
67     fn empty() -> Empty<i32, u32> {
68         future::empty()
69     }
70 
71     assert_empty(|| empty());
72     assert_empty(|| empty().select(empty()));
73     assert_empty(|| empty().join(empty()));
74     assert_empty(|| empty().join(f_ok(1)));
75     assert_empty(|| f_ok(1).join(empty()));
76     assert_empty(|| empty().or_else(move |_| empty()));
77     assert_empty(|| empty().and_then(move |_| empty()));
78     assert_empty(|| f_err(1).or_else(move |_| empty()));
79     assert_empty(|| f_ok(1).and_then(move |_| empty()));
80     assert_empty(|| empty().map(|a| a + 1));
81     assert_empty(|| empty().map_err(|a| a + 1));
82     assert_empty(|| empty().then(|a| a));
83 }
84 
85 #[test]
test_ok()86 fn test_ok() {
87     assert_done(|| ok(1), r_ok(1));
88     assert_done(|| err(1), r_err(1));
89 }
90 
91 #[test]
flatten()92 fn flatten() {
93     fn ok<T: Send + 'static>(a: T) -> FutureResult<T, u32> {
94         future::ok(a)
95     }
96     fn err<E: Send + 'static>(b: E) -> FutureResult<i32, E> {
97         future::err(b)
98     }
99 
100     assert_done(|| ok(ok(1)).flatten(), r_ok(1));
101     assert_done(|| ok(err(1)).flatten(), r_err(1));
102     assert_done(|| err(1u32).map(ok).flatten(), r_err(1));
103     assert_done(|| future::ok(future::ok(1)).flatten(), r_ok(1));
104     assert_empty(|| ok(empty::<i32, u32>()).flatten());
105     assert_empty(|| empty::<i32, u32>().map(ok).flatten());
106 }
107 
108 #[test]
smoke_oneshot()109 fn smoke_oneshot() {
110     assert_done(
111         || {
112             let (c, p) = oneshot::channel();
113             c.send(1).unwrap();
114             p
115         },
116         Ok(1),
117     );
118     assert_done(
119         || {
120             let (c, p) = oneshot::channel::<i32>();
121             drop(c);
122             p
123         },
124         Err(Canceled),
125     );
126     let mut completes = Vec::new();
127     assert_empty(|| {
128         let (a, b) = oneshot::channel::<i32>();
129         completes.push(a);
130         b
131     });
132 
133     let (c, mut p) = oneshot::channel::<i32>();
134     drop(c);
135     let res = panic_waker_lw(|lw| p.poll(lw));
136     assert!(res.is_err());
137     let (c, p) = oneshot::channel::<i32>();
138     drop(c);
139     let (tx, rx) = channel();
140     p.then(move |_| tx.send(())).forget();
141     rx.recv().unwrap();
142 }
143 
144 #[test]
select_cancels()145 fn select_cancels() {
146     let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
147     let ((btx, brx), (dtx, drx)) = (channel(), channel());
148     let b = b.map(move |b| {
149         btx.send(b).unwrap();
150         b
151     });
152     let d = d.map(move |d| {
153         dtx.send(d).unwrap();
154         d
155     });
156 
157     let mut f = b.select(d).then(unselect);
158     // assert!(f.poll(&mut Task::new()).is_pending());
159     assert!(brx.try_recv().is_err());
160     assert!(drx.try_recv().is_err());
161     a.send(1).unwrap();
162     noop_waker_lw(|lw| {
163         let res = f.poll(lw);
164         assert!(res.ok().unwrap().is_ready());
165         assert_eq!(brx.recv().unwrap(), 1);
166         drop(c);
167         assert!(drx.recv().is_err());
168 
169         let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
170         let ((btx, _brx), (dtx, drx)) = (channel(), channel());
171         let b = b.map(move |b| {
172             btx.send(b).unwrap();
173             b
174         });
175         let d = d.map(move |d| {
176             dtx.send(d).unwrap();
177             d
178         });
179 
180         let mut f = b.select(d).then(unselect);
181         assert!(f.poll(lw).ok().unwrap().is_pending());
182         assert!(f.poll(lw).ok().unwrap().is_pending());
183         a.send(1).unwrap();
184         assert!(f.poll(lw).ok().unwrap().is_ready());
185         drop((c, f));
186         assert!(drx.recv().is_err());
187     })
188 }
189 
190 #[test]
join_cancels()191 fn join_cancels() {
192     let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
193     let ((btx, _brx), (dtx, drx)) = (channel(), channel());
194     let b = b.map(move |b| {
195         btx.send(b).unwrap();
196         b
197     });
198     let d = d.map(move |d| {
199         dtx.send(d).unwrap();
200         d
201     });
202 
203     let mut f = b.join(d);
204     drop(a);
205     let res = panic_waker_lw(|lw| f.poll(lw));
206     assert!(res.is_err());
207     drop(c);
208     assert!(drx.recv().is_err());
209 
210     let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
211     let ((btx, _brx), (dtx, drx)) = (channel(), channel());
212     let b = b.map(move |b| {
213         btx.send(b).unwrap();
214         b
215     });
216     let d = d.map(move |d| {
217         dtx.send(d).unwrap();
218         d
219     });
220 
221     let (tx, rx) = channel();
222     let f = b.join(d);
223     f.then(move |_| {
224         tx.send(()).unwrap();
225         let res: Result<(), ()> = Ok(());
226         res
227     })
228     .forget();
229     assert!(rx.try_recv().is_err());
230     drop(a);
231     rx.recv().unwrap();
232     drop(c);
233     assert!(drx.recv().is_err());
234 }
235 
236 #[test]
join_incomplete()237 fn join_incomplete() {
238     let (a, b) = oneshot::channel::<i32>();
239     let (tx, rx) = channel();
240     noop_waker_lw(|lw| {
241         let mut f = ok(1).join(b).map(move |r| tx.send(r).unwrap());
242         assert!(f.poll(lw).ok().unwrap().is_pending());
243         assert!(rx.try_recv().is_err());
244         a.send(2).unwrap();
245         assert!(f.poll(lw).ok().unwrap().is_ready());
246         assert_eq!(rx.recv().unwrap(), (1, 2));
247 
248         let (a, b) = oneshot::channel::<i32>();
249         let (tx, rx) = channel();
250         let mut f = b.join(Ok(2)).map(move |r| tx.send(r).unwrap());
251         assert!(f.poll(lw).ok().unwrap().is_pending());
252         assert!(rx.try_recv().is_err());
253         a.send(1).unwrap();
254         assert!(f.poll(lw).ok().unwrap().is_ready());
255         assert_eq!(rx.recv().unwrap(), (1, 2));
256 
257         let (a, b) = oneshot::channel::<i32>();
258         let (tx, rx) = channel();
259         let mut f = ok(1).join(b).map_err(move |_r| tx.send(2).unwrap());
260         assert!(f.poll(lw).ok().unwrap().is_pending());
261         assert!(rx.try_recv().is_err());
262         drop(a);
263         assert!(f.poll(lw).is_err());
264         assert_eq!(rx.recv().unwrap(), 2);
265 
266         let (a, b) = oneshot::channel::<i32>();
267         let (tx, rx) = channel();
268         let mut f = b.join(Ok(2)).map_err(move |_r| tx.send(1).unwrap());
269         assert!(f.poll(lw).ok().unwrap().is_pending());
270         assert!(rx.try_recv().is_err());
271         drop(a);
272         assert!(f.poll(lw).is_err());
273         assert_eq!(rx.recv().unwrap(), 1);
274     })
275 }
276 
277 #[test]
select2()278 fn select2() {
279     assert_done(|| f_ok(2).select(empty()).then(unselect), Ok(2));
280     assert_done(|| empty().select(f_ok(2)).then(unselect), Ok(2));
281     assert_done(|| f_err(2).select(empty()).then(unselect), Err(2));
282     assert_done(|| empty().select(f_err(2)).then(unselect), Err(2));
283 
284     assert_done(
285         || {
286             f_ok(1).select(f_ok(2)).map_err(|_| 0).and_then(|either_tup| {
287                 let (a, b) = either_tup.into_inner();
288                 b.map(move |b| a + b)
289             })
290         },
291         Ok(3),
292     );
293 
294     // Finish one half of a select and then fail the second, ensuring that we
295     // get the notification of the second one.
296     {
297         let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
298         let f = b.select(d);
299         let (tx, rx) = channel();
300         f.map(move |r| tx.send(r).unwrap()).forget();
301         a.send(1).unwrap();
302         let (val, next) = rx.recv().unwrap().into_inner();
303         assert_eq!(val, 1);
304         let (tx, rx) = channel();
305         next.map_err(move |_r| tx.send(2).unwrap()).forget();
306         assert_eq!(rx.try_recv().err().unwrap(), TryRecvError::Empty);
307         drop(c);
308         assert_eq!(rx.recv().unwrap(), 2);
309     }
310 
311     // Fail the second half and ensure that we see the first one finish
312     {
313         let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
314         let f = b.select(d);
315         let (tx, rx) = channel();
316         f.map_err(move |r| tx.send((1, r.into_inner().1)).unwrap()).forget();
317         drop(c);
318         let (val, next) = rx.recv().unwrap();
319         assert_eq!(val, 1);
320         let (tx, rx) = channel();
321         next.map(move |r| tx.send(r).unwrap()).forget();
322         assert_eq!(rx.try_recv().err().unwrap(), TryRecvError::Empty);
323         a.send(2).unwrap();
324         assert_eq!(rx.recv().unwrap(), 2);
325     }
326 
327     // Cancelling the first half should cancel the second
328     {
329         let ((_a, b), (_c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
330         let ((btx, brx), (dtx, drx)) = (channel(), channel());
331         let b = b.map(move |v| {
332             btx.send(v).unwrap();
333             v
334         });
335         let d = d.map(move |v| {
336             dtx.send(v).unwrap();
337             v
338         });
339         let f = b.select(d);
340         drop(f);
341         assert!(drx.recv().is_err());
342         assert!(brx.recv().is_err());
343     }
344 
345     // Cancel after a schedule
346     {
347         let ((_a, b), (_c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
348         let ((btx, brx), (dtx, drx)) = (channel(), channel());
349         let b = b.map(move |v| {
350             btx.send(v).unwrap();
351             v
352         });
353         let d = d.map(move |v| {
354             dtx.send(v).unwrap();
355             v
356         });
357         let mut f = b.select(d);
358         let _res = noop_waker_lw(|lw| f.poll(lw));
359         drop(f);
360         assert!(drx.recv().is_err());
361         assert!(brx.recv().is_err());
362     }
363 
364     // Cancel propagates
365     {
366         let ((a, b), (_c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
367         let ((btx, brx), (dtx, drx)) = (channel(), channel());
368         let b = b.map(move |v| {
369             btx.send(v).unwrap();
370             v
371         });
372         let d = d.map(move |v| {
373             dtx.send(v).unwrap();
374             v
375         });
376         let (tx, rx) = channel();
377         b.select(d).map(move |_| tx.send(()).unwrap()).forget();
378         drop(a);
379         assert!(drx.recv().is_err());
380         assert!(brx.recv().is_err());
381         assert!(rx.recv().is_err());
382     }
383 
384     // Cancel on early drop
385     {
386         let (tx, rx) = channel();
387         let f = f_ok(1).select(empty::<_, ()>().map(move |()| {
388             tx.send(()).unwrap();
389             1
390         }));
391         drop(f);
392         assert!(rx.recv().is_err());
393     }
394 }
395 
396 #[test]
option()397 fn option() {
398     assert_eq!(Ok(Some(())), block_on(Some(ok::<(), ()>(())).into_future()));
399     assert_eq!(Ok::<_, ()>(None::<()>), block_on(None::<FutureResult<(), ()>>.into_future()));
400 }
401