1 //! Tests for the array channel flavor.
2 
3 use std::any::Any;
4 use std::sync::atomic::AtomicUsize;
5 use std::sync::atomic::Ordering;
6 use std::thread;
7 use std::time::Duration;
8 
9 use crossbeam_channel::{bounded, select, Receiver};
10 use crossbeam_channel::{RecvError, RecvTimeoutError, TryRecvError};
11 use crossbeam_channel::{SendError, SendTimeoutError, TrySendError};
12 use crossbeam_utils::thread::scope;
13 use rand::{thread_rng, Rng};
14 
ms(ms: u64) -> Duration15 fn ms(ms: u64) -> Duration {
16     Duration::from_millis(ms)
17 }
18 
19 #[test]
smoke()20 fn smoke() {
21     let (s, r) = bounded(1);
22     s.send(7).unwrap();
23     assert_eq!(r.try_recv(), Ok(7));
24 
25     s.send(8).unwrap();
26     assert_eq!(r.recv(), Ok(8));
27 
28     assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
29     assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout));
30 }
31 
32 #[test]
capacity()33 fn capacity() {
34     for i in 1..10 {
35         let (s, r) = bounded::<()>(i);
36         assert_eq!(s.capacity(), Some(i));
37         assert_eq!(r.capacity(), Some(i));
38     }
39 }
40 
41 #[test]
len_empty_full()42 fn len_empty_full() {
43     let (s, r) = bounded(2);
44 
45     assert_eq!(s.len(), 0);
46     assert!(s.is_empty());
47     assert!(!s.is_full());
48     assert_eq!(r.len(), 0);
49     assert!(r.is_empty());
50     assert!(!r.is_full());
51 
52     s.send(()).unwrap();
53 
54     assert_eq!(s.len(), 1);
55     assert!(!s.is_empty());
56     assert!(!s.is_full());
57     assert_eq!(r.len(), 1);
58     assert!(!r.is_empty());
59     assert!(!r.is_full());
60 
61     s.send(()).unwrap();
62 
63     assert_eq!(s.len(), 2);
64     assert!(!s.is_empty());
65     assert!(s.is_full());
66     assert_eq!(r.len(), 2);
67     assert!(!r.is_empty());
68     assert!(r.is_full());
69 
70     r.recv().unwrap();
71 
72     assert_eq!(s.len(), 1);
73     assert!(!s.is_empty());
74     assert!(!s.is_full());
75     assert_eq!(r.len(), 1);
76     assert!(!r.is_empty());
77     assert!(!r.is_full());
78 }
79 
80 #[test]
try_recv()81 fn try_recv() {
82     let (s, r) = bounded(100);
83 
84     scope(|scope| {
85         scope.spawn(move |_| {
86             assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
87             thread::sleep(ms(1500));
88             assert_eq!(r.try_recv(), Ok(7));
89             thread::sleep(ms(500));
90             assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
91         });
92         scope.spawn(move |_| {
93             thread::sleep(ms(1000));
94             s.send(7).unwrap();
95         });
96     })
97     .unwrap();
98 }
99 
100 #[test]
recv()101 fn recv() {
102     let (s, r) = bounded(100);
103 
104     scope(|scope| {
105         scope.spawn(move |_| {
106             assert_eq!(r.recv(), Ok(7));
107             thread::sleep(ms(1000));
108             assert_eq!(r.recv(), Ok(8));
109             thread::sleep(ms(1000));
110             assert_eq!(r.recv(), Ok(9));
111             assert_eq!(r.recv(), Err(RecvError));
112         });
113         scope.spawn(move |_| {
114             thread::sleep(ms(1500));
115             s.send(7).unwrap();
116             s.send(8).unwrap();
117             s.send(9).unwrap();
118         });
119     })
120     .unwrap();
121 }
122 
123 #[test]
recv_timeout()124 fn recv_timeout() {
125     let (s, r) = bounded::<i32>(100);
126 
127     scope(|scope| {
128         scope.spawn(move |_| {
129             assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout));
130             assert_eq!(r.recv_timeout(ms(1000)), Ok(7));
131             assert_eq!(
132                 r.recv_timeout(ms(1000)),
133                 Err(RecvTimeoutError::Disconnected)
134             );
135         });
136         scope.spawn(move |_| {
137             thread::sleep(ms(1500));
138             s.send(7).unwrap();
139         });
140     })
141     .unwrap();
142 }
143 
144 #[test]
try_send()145 fn try_send() {
146     let (s, r) = bounded(1);
147 
148     scope(|scope| {
149         scope.spawn(move |_| {
150             assert_eq!(s.try_send(1), Ok(()));
151             assert_eq!(s.try_send(2), Err(TrySendError::Full(2)));
152             thread::sleep(ms(1500));
153             assert_eq!(s.try_send(3), Ok(()));
154             thread::sleep(ms(500));
155             assert_eq!(s.try_send(4), Err(TrySendError::Disconnected(4)));
156         });
157         scope.spawn(move |_| {
158             thread::sleep(ms(1000));
159             assert_eq!(r.try_recv(), Ok(1));
160             assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
161             assert_eq!(r.recv(), Ok(3));
162         });
163     })
164     .unwrap();
165 }
166 
167 #[test]
send()168 fn send() {
169     let (s, r) = bounded(1);
170 
171     scope(|scope| {
172         scope.spawn(|_| {
173             s.send(7).unwrap();
174             thread::sleep(ms(1000));
175             s.send(8).unwrap();
176             thread::sleep(ms(1000));
177             s.send(9).unwrap();
178             thread::sleep(ms(1000));
179             s.send(10).unwrap();
180         });
181         scope.spawn(|_| {
182             thread::sleep(ms(1500));
183             assert_eq!(r.recv(), Ok(7));
184             assert_eq!(r.recv(), Ok(8));
185             assert_eq!(r.recv(), Ok(9));
186         });
187     })
188     .unwrap();
189 }
190 
191 #[test]
send_timeout()192 fn send_timeout() {
193     let (s, r) = bounded(2);
194 
195     scope(|scope| {
196         scope.spawn(move |_| {
197             assert_eq!(s.send_timeout(1, ms(1000)), Ok(()));
198             assert_eq!(s.send_timeout(2, ms(1000)), Ok(()));
199             assert_eq!(
200                 s.send_timeout(3, ms(500)),
201                 Err(SendTimeoutError::Timeout(3))
202             );
203             thread::sleep(ms(1000));
204             assert_eq!(s.send_timeout(4, ms(1000)), Ok(()));
205             thread::sleep(ms(1000));
206             assert_eq!(s.send(5), Err(SendError(5)));
207         });
208         scope.spawn(move |_| {
209             thread::sleep(ms(1000));
210             assert_eq!(r.recv(), Ok(1));
211             thread::sleep(ms(1000));
212             assert_eq!(r.recv(), Ok(2));
213             assert_eq!(r.recv(), Ok(4));
214         });
215     })
216     .unwrap();
217 }
218 
219 #[test]
send_after_disconnect()220 fn send_after_disconnect() {
221     let (s, r) = bounded(100);
222 
223     s.send(1).unwrap();
224     s.send(2).unwrap();
225     s.send(3).unwrap();
226 
227     drop(r);
228 
229     assert_eq!(s.send(4), Err(SendError(4)));
230     assert_eq!(s.try_send(5), Err(TrySendError::Disconnected(5)));
231     assert_eq!(
232         s.send_timeout(6, ms(500)),
233         Err(SendTimeoutError::Disconnected(6))
234     );
235 }
236 
237 #[test]
recv_after_disconnect()238 fn recv_after_disconnect() {
239     let (s, r) = bounded(100);
240 
241     s.send(1).unwrap();
242     s.send(2).unwrap();
243     s.send(3).unwrap();
244 
245     drop(s);
246 
247     assert_eq!(r.recv(), Ok(1));
248     assert_eq!(r.recv(), Ok(2));
249     assert_eq!(r.recv(), Ok(3));
250     assert_eq!(r.recv(), Err(RecvError));
251 }
252 
253 #[test]
len()254 fn len() {
255     #[cfg(miri)]
256     const COUNT: usize = 50;
257     #[cfg(not(miri))]
258     const COUNT: usize = 25_000;
259     #[cfg(miri)]
260     const CAP: usize = 50;
261     #[cfg(not(miri))]
262     const CAP: usize = 1000;
263 
264     let (s, r) = bounded(CAP);
265 
266     assert_eq!(s.len(), 0);
267     assert_eq!(r.len(), 0);
268 
269     for _ in 0..CAP / 10 {
270         for i in 0..50 {
271             s.send(i).unwrap();
272             assert_eq!(s.len(), i + 1);
273         }
274 
275         for i in 0..50 {
276             r.recv().unwrap();
277             assert_eq!(r.len(), 50 - i - 1);
278         }
279     }
280 
281     assert_eq!(s.len(), 0);
282     assert_eq!(r.len(), 0);
283 
284     for i in 0..CAP {
285         s.send(i).unwrap();
286         assert_eq!(s.len(), i + 1);
287     }
288 
289     for _ in 0..CAP {
290         r.recv().unwrap();
291     }
292 
293     assert_eq!(s.len(), 0);
294     assert_eq!(r.len(), 0);
295 
296     scope(|scope| {
297         scope.spawn(|_| {
298             for i in 0..COUNT {
299                 assert_eq!(r.recv(), Ok(i));
300                 let len = r.len();
301                 assert!(len <= CAP);
302             }
303         });
304 
305         scope.spawn(|_| {
306             for i in 0..COUNT {
307                 s.send(i).unwrap();
308                 let len = s.len();
309                 assert!(len <= CAP);
310             }
311         });
312     })
313     .unwrap();
314 
315     assert_eq!(s.len(), 0);
316     assert_eq!(r.len(), 0);
317 }
318 
319 #[test]
disconnect_wakes_sender()320 fn disconnect_wakes_sender() {
321     let (s, r) = bounded(1);
322 
323     scope(|scope| {
324         scope.spawn(move |_| {
325             assert_eq!(s.send(()), Ok(()));
326             assert_eq!(s.send(()), Err(SendError(())));
327         });
328         scope.spawn(move |_| {
329             thread::sleep(ms(1000));
330             drop(r);
331         });
332     })
333     .unwrap();
334 }
335 
336 #[test]
disconnect_wakes_receiver()337 fn disconnect_wakes_receiver() {
338     let (s, r) = bounded::<()>(1);
339 
340     scope(|scope| {
341         scope.spawn(move |_| {
342             assert_eq!(r.recv(), Err(RecvError));
343         });
344         scope.spawn(move |_| {
345             thread::sleep(ms(1000));
346             drop(s);
347         });
348     })
349     .unwrap();
350 }
351 
352 #[test]
spsc()353 fn spsc() {
354     #[cfg(miri)]
355     const COUNT: usize = 100;
356     #[cfg(not(miri))]
357     const COUNT: usize = 100_000;
358 
359     let (s, r) = bounded(3);
360 
361     scope(|scope| {
362         scope.spawn(move |_| {
363             for i in 0..COUNT {
364                 assert_eq!(r.recv(), Ok(i));
365             }
366             assert_eq!(r.recv(), Err(RecvError));
367         });
368         scope.spawn(move |_| {
369             for i in 0..COUNT {
370                 s.send(i).unwrap();
371             }
372         });
373     })
374     .unwrap();
375 }
376 
377 #[test]
mpmc()378 fn mpmc() {
379     #[cfg(miri)]
380     const COUNT: usize = 50;
381     #[cfg(not(miri))]
382     const COUNT: usize = 25_000;
383     const THREADS: usize = 4;
384 
385     let (s, r) = bounded::<usize>(3);
386     let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
387 
388     scope(|scope| {
389         for _ in 0..THREADS {
390             scope.spawn(|_| {
391                 for _ in 0..COUNT {
392                     let n = r.recv().unwrap();
393                     v[n].fetch_add(1, Ordering::SeqCst);
394                 }
395             });
396         }
397         for _ in 0..THREADS {
398             scope.spawn(|_| {
399                 for i in 0..COUNT {
400                     s.send(i).unwrap();
401                 }
402             });
403         }
404     })
405     .unwrap();
406 
407     for c in v {
408         assert_eq!(c.load(Ordering::SeqCst), THREADS);
409     }
410 }
411 
412 #[test]
stress_oneshot()413 fn stress_oneshot() {
414     #[cfg(miri)]
415     const COUNT: usize = 100;
416     #[cfg(not(miri))]
417     const COUNT: usize = 10_000;
418 
419     for _ in 0..COUNT {
420         let (s, r) = bounded(1);
421 
422         scope(|scope| {
423             scope.spawn(|_| r.recv().unwrap());
424             scope.spawn(|_| s.send(0).unwrap());
425         })
426         .unwrap();
427     }
428 }
429 
430 #[test]
stress_iter()431 fn stress_iter() {
432     #[cfg(miri)]
433     const COUNT: usize = 100;
434     #[cfg(not(miri))]
435     const COUNT: usize = 100_000;
436 
437     let (request_s, request_r) = bounded(1);
438     let (response_s, response_r) = bounded(1);
439 
440     scope(|scope| {
441         scope.spawn(move |_| {
442             let mut count = 0;
443             loop {
444                 for x in response_r.try_iter() {
445                     count += x;
446                     if count == COUNT {
447                         return;
448                     }
449                 }
450                 request_s.send(()).unwrap();
451             }
452         });
453 
454         for _ in request_r.iter() {
455             if response_s.send(1).is_err() {
456                 break;
457             }
458         }
459     })
460     .unwrap();
461 }
462 
463 #[test]
stress_timeout_two_threads()464 fn stress_timeout_two_threads() {
465     const COUNT: usize = 100;
466 
467     let (s, r) = bounded(2);
468 
469     scope(|scope| {
470         scope.spawn(|_| {
471             for i in 0..COUNT {
472                 if i % 2 == 0 {
473                     thread::sleep(ms(50));
474                 }
475                 loop {
476                     if let Ok(()) = s.send_timeout(i, ms(10)) {
477                         break;
478                     }
479                 }
480             }
481         });
482 
483         scope.spawn(|_| {
484             for i in 0..COUNT {
485                 if i % 2 == 0 {
486                     thread::sleep(ms(50));
487                 }
488                 loop {
489                     if let Ok(x) = r.recv_timeout(ms(10)) {
490                         assert_eq!(x, i);
491                         break;
492                     }
493                 }
494             }
495         });
496     })
497     .unwrap();
498 }
499 
500 #[test]
drops()501 fn drops() {
502     #[cfg(miri)]
503     const RUNS: usize = 10;
504     #[cfg(not(miri))]
505     const RUNS: usize = 100;
506     #[cfg(miri)]
507     const STEPS: usize = 100;
508     #[cfg(not(miri))]
509     const STEPS: usize = 10_000;
510 
511     static DROPS: AtomicUsize = AtomicUsize::new(0);
512 
513     #[derive(Debug, PartialEq)]
514     struct DropCounter;
515 
516     impl Drop for DropCounter {
517         fn drop(&mut self) {
518             DROPS.fetch_add(1, Ordering::SeqCst);
519         }
520     }
521 
522     let mut rng = thread_rng();
523 
524     for _ in 0..RUNS {
525         let steps = rng.gen_range(0..STEPS);
526         let additional = rng.gen_range(0..50);
527 
528         DROPS.store(0, Ordering::SeqCst);
529         let (s, r) = bounded::<DropCounter>(50);
530 
531         scope(|scope| {
532             scope.spawn(|_| {
533                 for _ in 0..steps {
534                     r.recv().unwrap();
535                 }
536             });
537 
538             scope.spawn(|_| {
539                 for _ in 0..steps {
540                     s.send(DropCounter).unwrap();
541                 }
542             });
543         })
544         .unwrap();
545 
546         for _ in 0..additional {
547             s.send(DropCounter).unwrap();
548         }
549 
550         assert_eq!(DROPS.load(Ordering::SeqCst), steps);
551         drop(s);
552         drop(r);
553         assert_eq!(DROPS.load(Ordering::SeqCst), steps + additional);
554     }
555 }
556 
557 #[test]
linearizable()558 fn linearizable() {
559     #[cfg(miri)]
560     const COUNT: usize = 50;
561     #[cfg(not(miri))]
562     const COUNT: usize = 25_000;
563     const THREADS: usize = 4;
564 
565     let (s, r) = bounded(THREADS);
566 
567     scope(|scope| {
568         for _ in 0..THREADS {
569             scope.spawn(|_| {
570                 for _ in 0..COUNT {
571                     s.send(0).unwrap();
572                     r.try_recv().unwrap();
573                 }
574             });
575         }
576     })
577     .unwrap();
578 }
579 
580 #[test]
fairness()581 fn fairness() {
582     #[cfg(miri)]
583     const COUNT: usize = 100;
584     #[cfg(not(miri))]
585     const COUNT: usize = 10_000;
586 
587     let (s1, r1) = bounded::<()>(COUNT);
588     let (s2, r2) = bounded::<()>(COUNT);
589 
590     for _ in 0..COUNT {
591         s1.send(()).unwrap();
592         s2.send(()).unwrap();
593     }
594 
595     let mut hits = [0usize; 2];
596     for _ in 0..COUNT {
597         select! {
598             recv(r1) -> _  => hits[0] += 1,
599             recv(r2) -> _  => hits[1] += 1,
600         }
601     }
602     assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
603 }
604 
605 #[test]
fairness_duplicates()606 fn fairness_duplicates() {
607     #[cfg(miri)]
608     const COUNT: usize = 100;
609     #[cfg(not(miri))]
610     const COUNT: usize = 10_000;
611 
612     let (s, r) = bounded::<()>(COUNT);
613 
614     for _ in 0..COUNT {
615         s.send(()).unwrap();
616     }
617 
618     let mut hits = [0usize; 5];
619     for _ in 0..COUNT {
620         select! {
621             recv(r) -> _ => hits[0] += 1,
622             recv(r) -> _ => hits[1] += 1,
623             recv(r) -> _ => hits[2] += 1,
624             recv(r) -> _ => hits[3] += 1,
625             recv(r) -> _ => hits[4] += 1,
626         }
627     }
628     assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
629 }
630 
631 #[test]
recv_in_send()632 fn recv_in_send() {
633     let (s, _r) = bounded(1);
634     s.send(()).unwrap();
635 
636     #[allow(unreachable_code)]
637     {
638         select! {
639             send(s, panic!()) -> _ => panic!(),
640             default => {}
641         }
642     }
643 
644     let (s, r) = bounded(2);
645     s.send(()).unwrap();
646 
647     select! {
648         send(s, assert_eq!(r.recv(), Ok(()))) -> _ => {}
649     }
650 }
651 
652 #[test]
channel_through_channel()653 fn channel_through_channel() {
654     #[cfg(miri)]
655     const COUNT: usize = 100;
656     #[cfg(not(miri))]
657     const COUNT: usize = 1000;
658 
659     type T = Box<dyn Any + Send>;
660 
661     let (s, r) = bounded::<T>(1);
662 
663     scope(|scope| {
664         scope.spawn(move |_| {
665             let mut s = s;
666 
667             for _ in 0..COUNT {
668                 let (new_s, new_r) = bounded(1);
669                 let new_r: T = Box::new(Some(new_r));
670 
671                 s.send(new_r).unwrap();
672                 s = new_s;
673             }
674         });
675 
676         scope.spawn(move |_| {
677             let mut r = r;
678 
679             for _ in 0..COUNT {
680                 r = r
681                     .recv()
682                     .unwrap()
683                     .downcast_mut::<Option<Receiver<T>>>()
684                     .unwrap()
685                     .take()
686                     .unwrap()
687             }
688         });
689     })
690     .unwrap();
691 }
692 
693 #[test]
panic_on_drop()694 fn panic_on_drop() {
695     struct Msg1<'a>(&'a mut bool);
696     impl Drop for Msg1<'_> {
697         fn drop(&mut self) {
698             if *self.0 && !std::thread::panicking() {
699                 panic!("double drop");
700             } else {
701                 *self.0 = true;
702             }
703         }
704     }
705 
706     struct Msg2<'a>(&'a mut bool);
707     impl Drop for Msg2<'_> {
708         fn drop(&mut self) {
709             if *self.0 {
710                 panic!("double drop");
711             } else {
712                 *self.0 = true;
713                 panic!("first drop");
714             }
715         }
716     }
717 
718     // normal
719     let (s, r) = bounded(2);
720     let (mut a, mut b) = (false, false);
721     s.send(Msg1(&mut a)).unwrap();
722     s.send(Msg1(&mut b)).unwrap();
723     drop(s);
724     drop(r);
725     assert!(a);
726     assert!(b);
727 
728     // panic on drop
729     let (s, r) = bounded(2);
730     let (mut a, mut b) = (false, false);
731     s.send(Msg2(&mut a)).unwrap();
732     s.send(Msg2(&mut b)).unwrap();
733     drop(s);
734     let res = std::panic::catch_unwind(move || {
735         drop(r);
736     });
737     assert_eq!(
738         *res.unwrap_err().downcast_ref::<&str>().unwrap(),
739         "first drop"
740     );
741     assert!(a);
742     // Elements after the panicked element will leak.
743     assert!(!b);
744 }
745