1 use futures::channel::mpsc; 2 use futures::executor::{block_on, ThreadPool}; 3 use futures::future::{ready, FutureExt}; 4 use futures::lock::Mutex; 5 use futures::stream::StreamExt; 6 use futures::task::{Context, SpawnExt}; 7 use futures_test::future::FutureTestExt; 8 use futures_test::task::{new_count_waker, panic_context}; 9 use std::sync::Arc; 10 11 #[test] mutex_acquire_uncontested()12fn mutex_acquire_uncontested() { 13 let mutex = Mutex::new(()); 14 for _ in 0..10 { 15 assert!(mutex.lock().poll_unpin(&mut panic_context()).is_ready()); 16 } 17 } 18 19 #[test] mutex_wakes_waiters()20fn mutex_wakes_waiters() { 21 let mutex = Mutex::new(()); 22 let (waker, counter) = new_count_waker(); 23 let lock = mutex.lock().poll_unpin(&mut panic_context()); 24 assert!(lock.is_ready()); 25 26 let mut cx = Context::from_waker(&waker); 27 let mut waiter = mutex.lock(); 28 assert!(waiter.poll_unpin(&mut cx).is_pending()); 29 assert_eq!(counter, 0); 30 31 drop(lock); 32 33 assert_eq!(counter, 1); 34 assert!(waiter.poll_unpin(&mut panic_context()).is_ready()); 35 } 36 37 #[test] mutex_contested()38fn mutex_contested() { 39 { 40 let (tx, mut rx) = mpsc::unbounded(); 41 let pool = ThreadPool::builder().pool_size(16).create().unwrap(); 42 43 let tx = Arc::new(tx); 44 let mutex = Arc::new(Mutex::new(0)); 45 46 let num_tasks = 1000; 47 for _ in 0..num_tasks { 48 let tx = tx.clone(); 49 let mutex = mutex.clone(); 50 pool.spawn(async move { 51 let mut lock = mutex.lock().await; 52 ready(()).pending_once().await; 53 *lock += 1; 54 tx.unbounded_send(()).unwrap(); 55 drop(lock); 56 }) 57 .unwrap(); 58 } 59 60 block_on(async { 61 for _ in 0..num_tasks { 62 rx.next().await.unwrap(); 63 } 64 let lock = mutex.lock().await; 65 assert_eq!(num_tasks, *lock); 66 }); 67 } 68 std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 69 } 70