1 use futures::sink::SinkExt;
2 use std::future::poll_fn;
3 use tokio::sync::mpsc::channel;
4 use tokio_test::task::spawn;
5 use tokio_test::{
6     assert_ok, assert_pending, assert_ready, assert_ready_eq, assert_ready_err, assert_ready_ok,
7 };
8 use tokio_util::sync::PollSender;
9 
10 #[tokio::test]
simple()11 async fn simple() {
12     let (send, mut recv) = channel(3);
13     let mut send = PollSender::new(send);
14 
15     for i in 1..=3i32 {
16         let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
17         assert_ready_ok!(reserve.poll());
18         send.send_item(i).unwrap();
19     }
20 
21     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
22     assert_pending!(reserve.poll());
23 
24     assert_eq!(recv.recv().await.unwrap(), 1);
25     assert!(reserve.is_woken());
26     assert_ready_ok!(reserve.poll());
27 
28     drop(recv);
29 
30     send.send_item(42).unwrap();
31 }
32 
33 #[tokio::test]
simple_ref()34 async fn simple_ref() {
35     let v = [1, 2, 3i32];
36 
37     let (send, mut recv) = channel(3);
38     let mut send = PollSender::new(send);
39 
40     for vi in v.iter() {
41         let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
42         assert_ready_ok!(reserve.poll());
43         send.send_item(vi).unwrap();
44     }
45 
46     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
47     assert_pending!(reserve.poll());
48 
49     assert_eq!(*recv.recv().await.unwrap(), 1);
50     assert!(reserve.is_woken());
51     assert_ready_ok!(reserve.poll());
52     drop(recv);
53     send.send_item(&42).unwrap();
54 }
55 
56 #[tokio::test]
repeated_poll_reserve()57 async fn repeated_poll_reserve() {
58     let (send, mut recv) = channel::<i32>(1);
59     let mut send = PollSender::new(send);
60 
61     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
62     assert_ready_ok!(reserve.poll());
63     assert_ready_ok!(reserve.poll());
64     send.send_item(1).unwrap();
65 
66     assert_eq!(recv.recv().await.unwrap(), 1);
67 }
68 
69 #[tokio::test]
abort_send()70 async fn abort_send() {
71     let (send, mut recv) = channel(3);
72     let mut send = PollSender::new(send);
73     let send2 = send.get_ref().cloned().unwrap();
74 
75     for i in 1..=3i32 {
76         let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
77         assert_ready_ok!(reserve.poll());
78         send.send_item(i).unwrap();
79     }
80 
81     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
82     assert_pending!(reserve.poll());
83     assert_eq!(recv.recv().await.unwrap(), 1);
84     assert!(reserve.is_woken());
85     assert_ready_ok!(reserve.poll());
86 
87     let mut send2_send = spawn(send2.send(5));
88     assert_pending!(send2_send.poll());
89     assert!(send.abort_send());
90     assert!(send2_send.is_woken());
91     assert_ready_ok!(send2_send.poll());
92 
93     assert_eq!(recv.recv().await.unwrap(), 2);
94     assert_eq!(recv.recv().await.unwrap(), 3);
95     assert_eq!(recv.recv().await.unwrap(), 5);
96 }
97 
98 #[tokio::test]
close_sender_last()99 async fn close_sender_last() {
100     let (send, mut recv) = channel::<i32>(3);
101     let mut send = PollSender::new(send);
102 
103     let mut recv_task = spawn(recv.recv());
104     assert_pending!(recv_task.poll());
105 
106     send.close();
107 
108     assert!(recv_task.is_woken());
109     assert!(assert_ready!(recv_task.poll()).is_none());
110 }
111 
112 #[tokio::test]
close_sender_not_last()113 async fn close_sender_not_last() {
114     let (send, mut recv) = channel::<i32>(3);
115     let mut send = PollSender::new(send);
116     let send2 = send.get_ref().cloned().unwrap();
117 
118     let mut recv_task = spawn(recv.recv());
119     assert_pending!(recv_task.poll());
120 
121     send.close();
122 
123     assert!(!recv_task.is_woken());
124     assert_pending!(recv_task.poll());
125 
126     drop(send2);
127 
128     assert!(recv_task.is_woken());
129     assert!(assert_ready!(recv_task.poll()).is_none());
130 }
131 
132 #[tokio::test]
close_sender_before_reserve()133 async fn close_sender_before_reserve() {
134     let (send, mut recv) = channel::<i32>(3);
135     let mut send = PollSender::new(send);
136 
137     let mut recv_task = spawn(recv.recv());
138     assert_pending!(recv_task.poll());
139 
140     send.close();
141 
142     assert!(recv_task.is_woken());
143     assert!(assert_ready!(recv_task.poll()).is_none());
144 
145     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
146     assert_ready_err!(reserve.poll());
147 }
148 
149 #[tokio::test]
close_sender_after_pending_reserve()150 async fn close_sender_after_pending_reserve() {
151     let (send, mut recv) = channel::<i32>(1);
152     let mut send = PollSender::new(send);
153 
154     let mut recv_task = spawn(recv.recv());
155     assert_pending!(recv_task.poll());
156 
157     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
158     assert_ready_ok!(reserve.poll());
159     send.send_item(1).unwrap();
160 
161     assert!(recv_task.is_woken());
162 
163     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
164     assert_pending!(reserve.poll());
165     drop(reserve);
166 
167     send.close();
168 
169     assert!(send.is_closed());
170     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
171     assert_ready_err!(reserve.poll());
172 }
173 
174 #[tokio::test]
close_sender_after_successful_reserve()175 async fn close_sender_after_successful_reserve() {
176     let (send, mut recv) = channel::<i32>(3);
177     let mut send = PollSender::new(send);
178 
179     let mut recv_task = spawn(recv.recv());
180     assert_pending!(recv_task.poll());
181 
182     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
183     assert_ready_ok!(reserve.poll());
184     drop(reserve);
185 
186     send.close();
187     assert!(send.is_closed());
188     assert!(!recv_task.is_woken());
189     assert_pending!(recv_task.poll());
190 
191     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
192     assert_ready_ok!(reserve.poll());
193 }
194 
195 #[tokio::test]
abort_send_after_pending_reserve()196 async fn abort_send_after_pending_reserve() {
197     let (send, mut recv) = channel::<i32>(1);
198     let mut send = PollSender::new(send);
199 
200     let mut recv_task = spawn(recv.recv());
201     assert_pending!(recv_task.poll());
202 
203     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
204     assert_ready_ok!(reserve.poll());
205     send.send_item(1).unwrap();
206 
207     assert_eq!(send.get_ref().unwrap().capacity(), 0);
208     assert!(!send.abort_send());
209 
210     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
211     assert_pending!(reserve.poll());
212 
213     assert!(send.abort_send());
214     assert_eq!(send.get_ref().unwrap().capacity(), 0);
215 }
216 
217 #[tokio::test]
abort_send_after_successful_reserve()218 async fn abort_send_after_successful_reserve() {
219     let (send, mut recv) = channel::<i32>(1);
220     let mut send = PollSender::new(send);
221 
222     let mut recv_task = spawn(recv.recv());
223     assert_pending!(recv_task.poll());
224 
225     assert_eq!(send.get_ref().unwrap().capacity(), 1);
226     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
227     assert_ready_ok!(reserve.poll());
228     assert_eq!(send.get_ref().unwrap().capacity(), 0);
229 
230     assert!(send.abort_send());
231     assert_eq!(send.get_ref().unwrap().capacity(), 1);
232 }
233 
234 #[tokio::test]
closed_when_receiver_drops()235 async fn closed_when_receiver_drops() {
236     let (send, _) = channel::<i32>(1);
237     let mut send = PollSender::new(send);
238 
239     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
240     assert_ready_err!(reserve.poll());
241 }
242 
/// Calling `send_item` without any prior `poll_reserve` is expected to panic.
#[should_panic]
#[test]
fn start_send_panics_when_idle() {
    // The `_` pattern does not bind the receiver, so it is dropped as soon as
    // this statement ends.
    let (tx, _) = channel::<i32>(3);
    let mut poll_tx = PollSender::new(tx);

    // No reservation was made; this line is where the panic is expected.
    poll_tx.send_item(1).unwrap();
}
251 
/// Calling `send_item` while a reservation is still pending is expected to
/// panic.
#[should_panic]
#[test]
fn start_send_panics_when_acquiring() {
    // The `_` pattern does not bind the receiver, so it is dropped as soon as
    // this statement ends.
    let (tx, _) = channel::<i32>(1);
    let mut poll_tx = PollSender::new(tx);

    // First round: reserve the single slot and use it.
    let mut first_slot = spawn(poll_fn(|cx| poll_tx.poll_reserve(cx)));
    assert_ready_ok!(first_slot.poll());
    poll_tx.send_item(1).unwrap();

    // Second round: the reservation does not complete ...
    let mut waiting_slot = spawn(poll_fn(|cx| poll_tx.poll_reserve(cx)));
    assert_pending!(waiting_slot.poll());
    // ... so sending now is where the panic is expected.
    poll_tx.send_item(2).unwrap();
}
266 
/// Drives the `Sink` interface of `PollSender` through ready -> send -> flush
/// on a channel of capacity one.
///
/// The test expects a successful flush to leave the sink not ready, and the
/// sink to become ready only after the receiver has taken the queued item.
#[test]
fn sink_send_then_flush() {
    let (tx, mut rx) = channel(1);
    let mut sink = PollSender::new(tx);

    let mut parked_recv = spawn(rx.recv());
    assert_pending!(parked_recv.poll());

    // Become ready, then queue a unit value.
    let mut ready_first = spawn(poll_fn(|cx| sink.poll_ready_unpin(cx)));
    assert_ready_ok!(ready_first.poll());
    assert_ok!(sink.start_send_unpin(()));

    // The only slot is occupied, so readiness must be pending.
    let mut ready_while_full = spawn(poll_fn(|cx| sink.poll_ready_unpin(cx)));
    assert_pending!(ready_while_full.poll());

    let mut flushing = spawn(poll_fn(|cx| sink.poll_flush_unpin(cx)));
    assert_ready_ok!(flushing.poll());

    // Flushing does not mean that the sender becomes ready.
    let mut ready_after_flush = spawn(poll_fn(|cx| sink.poll_ready_unpin(cx)));
    assert_pending!(ready_after_flush.poll());

    // Once the receiver takes the item, the readiness task must be woken and complete.
    assert_ready_eq!(parked_recv.poll(), Some(()));
    assert!(ready_after_flush.is_woken());
    assert_ready_ok!(ready_after_flush.poll());
}
293 
/// Drives the `Sink` interface of `PollSender` through two sends followed by
/// `poll_close`.
///
/// The test expects both items to reach the receiver in order and the
/// receive after the close to resolve immediately to `None`.
#[test]
fn sink_send_then_close() {
    let (tx, mut rx) = channel(1);
    let mut sink = PollSender::new(tx);

    let mut recv_task = spawn(rx.recv());
    assert_pending!(recv_task.poll());

    // First item: become ready and queue `1`.
    let mut ready_first = spawn(poll_fn(|cx| sink.poll_ready_unpin(cx)));
    assert_ready_ok!(ready_first.poll());
    assert_ok!(sink.start_send_unpin(1));

    // The channel is full, so readiness for the next item is pending.
    let mut ready_second = spawn(poll_fn(|cx| sink.poll_ready_unpin(cx)));
    assert_pending!(ready_second.poll());

    assert!(recv_task.is_woken());
    assert_ready_eq!(recv_task.poll(), Some(1));

    // Receiving the first item must unblock the readiness task.
    assert!(ready_second.is_woken());
    assert_ready_ok!(ready_second.poll());

    // Start a fresh receive, then queue the second item.
    drop(recv_task);
    let mut recv_task = spawn(rx.recv());
    assert_pending!(recv_task.poll());
    assert_ok!(sink.start_send_unpin(2));

    let mut closing = spawn(poll_fn(|cx| sink.poll_close_unpin(cx)));
    assert_ready_ok!(closing.poll());

    // The item queued before the close must still be delivered.
    assert!(recv_task.is_woken());
    assert_ready_eq!(recv_task.poll(), Some(2));

    // Nothing is left and the sink is closed: the next receive ends the stream.
    drop(recv_task);
    let mut recv_task = spawn(rx.recv());
    assert_ready_eq!(recv_task.poll(), None);
}
330 
/// Sends a borrowed `&str` through the `Sink` interface of `PollSender` and
/// expects the receiver to observe the same text after a flush.
#[test]
fn sink_send_ref() {
    let payload = String::from("data");
    let (tx, mut rx) = channel(1);
    let mut sink = PollSender::new(tx);

    let mut parked_recv = spawn(rx.recv());
    assert_pending!(parked_recv.poll());

    let mut readiness = spawn(poll_fn(|cx| sink.poll_ready_unpin(cx)));
    assert_ready_ok!(readiness.poll());

    // The item is a borrow of `payload`, not an owned string.
    assert_ok!(sink.start_send_unpin(payload.as_str()));

    let mut flushing = spawn(poll_fn(|cx| sink.poll_flush_unpin(cx)));
    assert_ready_ok!(flushing.poll());

    assert_ready_eq!(parked_recv.poll(), Some("data"));
}
350