1 use futures::sink::SinkExt;
2 use std::future::poll_fn;
3 use tokio::sync::mpsc::channel;
4 use tokio_test::task::spawn;
5 use tokio_test::{
6 assert_ok, assert_pending, assert_ready, assert_ready_eq, assert_ready_err, assert_ready_ok,
7 };
8 use tokio_util::sync::PollSender;
9
10 #[tokio::test]
simple()11 async fn simple() {
12 let (send, mut recv) = channel(3);
13 let mut send = PollSender::new(send);
14
15 for i in 1..=3i32 {
16 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
17 assert_ready_ok!(reserve.poll());
18 send.send_item(i).unwrap();
19 }
20
21 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
22 assert_pending!(reserve.poll());
23
24 assert_eq!(recv.recv().await.unwrap(), 1);
25 assert!(reserve.is_woken());
26 assert_ready_ok!(reserve.poll());
27
28 drop(recv);
29
30 send.send_item(42).unwrap();
31 }
32
33 #[tokio::test]
simple_ref()34 async fn simple_ref() {
35 let v = [1, 2, 3i32];
36
37 let (send, mut recv) = channel(3);
38 let mut send = PollSender::new(send);
39
40 for vi in v.iter() {
41 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
42 assert_ready_ok!(reserve.poll());
43 send.send_item(vi).unwrap();
44 }
45
46 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
47 assert_pending!(reserve.poll());
48
49 assert_eq!(*recv.recv().await.unwrap(), 1);
50 assert!(reserve.is_woken());
51 assert_ready_ok!(reserve.poll());
52 drop(recv);
53 send.send_item(&42).unwrap();
54 }
55
56 #[tokio::test]
repeated_poll_reserve()57 async fn repeated_poll_reserve() {
58 let (send, mut recv) = channel::<i32>(1);
59 let mut send = PollSender::new(send);
60
61 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
62 assert_ready_ok!(reserve.poll());
63 assert_ready_ok!(reserve.poll());
64 send.send_item(1).unwrap();
65
66 assert_eq!(recv.recv().await.unwrap(), 1);
67 }
68
69 #[tokio::test]
abort_send()70 async fn abort_send() {
71 let (send, mut recv) = channel(3);
72 let mut send = PollSender::new(send);
73 let send2 = send.get_ref().cloned().unwrap();
74
75 for i in 1..=3i32 {
76 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
77 assert_ready_ok!(reserve.poll());
78 send.send_item(i).unwrap();
79 }
80
81 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
82 assert_pending!(reserve.poll());
83 assert_eq!(recv.recv().await.unwrap(), 1);
84 assert!(reserve.is_woken());
85 assert_ready_ok!(reserve.poll());
86
87 let mut send2_send = spawn(send2.send(5));
88 assert_pending!(send2_send.poll());
89 assert!(send.abort_send());
90 assert!(send2_send.is_woken());
91 assert_ready_ok!(send2_send.poll());
92
93 assert_eq!(recv.recv().await.unwrap(), 2);
94 assert_eq!(recv.recv().await.unwrap(), 3);
95 assert_eq!(recv.recv().await.unwrap(), 5);
96 }
97
98 #[tokio::test]
close_sender_last()99 async fn close_sender_last() {
100 let (send, mut recv) = channel::<i32>(3);
101 let mut send = PollSender::new(send);
102
103 let mut recv_task = spawn(recv.recv());
104 assert_pending!(recv_task.poll());
105
106 send.close();
107
108 assert!(recv_task.is_woken());
109 assert!(assert_ready!(recv_task.poll()).is_none());
110 }
111
112 #[tokio::test]
close_sender_not_last()113 async fn close_sender_not_last() {
114 let (send, mut recv) = channel::<i32>(3);
115 let mut send = PollSender::new(send);
116 let send2 = send.get_ref().cloned().unwrap();
117
118 let mut recv_task = spawn(recv.recv());
119 assert_pending!(recv_task.poll());
120
121 send.close();
122
123 assert!(!recv_task.is_woken());
124 assert_pending!(recv_task.poll());
125
126 drop(send2);
127
128 assert!(recv_task.is_woken());
129 assert!(assert_ready!(recv_task.poll()).is_none());
130 }
131
132 #[tokio::test]
close_sender_before_reserve()133 async fn close_sender_before_reserve() {
134 let (send, mut recv) = channel::<i32>(3);
135 let mut send = PollSender::new(send);
136
137 let mut recv_task = spawn(recv.recv());
138 assert_pending!(recv_task.poll());
139
140 send.close();
141
142 assert!(recv_task.is_woken());
143 assert!(assert_ready!(recv_task.poll()).is_none());
144
145 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
146 assert_ready_err!(reserve.poll());
147 }
148
149 #[tokio::test]
close_sender_after_pending_reserve()150 async fn close_sender_after_pending_reserve() {
151 let (send, mut recv) = channel::<i32>(1);
152 let mut send = PollSender::new(send);
153
154 let mut recv_task = spawn(recv.recv());
155 assert_pending!(recv_task.poll());
156
157 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
158 assert_ready_ok!(reserve.poll());
159 send.send_item(1).unwrap();
160
161 assert!(recv_task.is_woken());
162
163 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
164 assert_pending!(reserve.poll());
165 drop(reserve);
166
167 send.close();
168
169 assert!(send.is_closed());
170 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
171 assert_ready_err!(reserve.poll());
172 }
173
174 #[tokio::test]
close_sender_after_successful_reserve()175 async fn close_sender_after_successful_reserve() {
176 let (send, mut recv) = channel::<i32>(3);
177 let mut send = PollSender::new(send);
178
179 let mut recv_task = spawn(recv.recv());
180 assert_pending!(recv_task.poll());
181
182 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
183 assert_ready_ok!(reserve.poll());
184 drop(reserve);
185
186 send.close();
187 assert!(send.is_closed());
188 assert!(!recv_task.is_woken());
189 assert_pending!(recv_task.poll());
190
191 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
192 assert_ready_ok!(reserve.poll());
193 }
194
195 #[tokio::test]
abort_send_after_pending_reserve()196 async fn abort_send_after_pending_reserve() {
197 let (send, mut recv) = channel::<i32>(1);
198 let mut send = PollSender::new(send);
199
200 let mut recv_task = spawn(recv.recv());
201 assert_pending!(recv_task.poll());
202
203 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
204 assert_ready_ok!(reserve.poll());
205 send.send_item(1).unwrap();
206
207 assert_eq!(send.get_ref().unwrap().capacity(), 0);
208 assert!(!send.abort_send());
209
210 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
211 assert_pending!(reserve.poll());
212
213 assert!(send.abort_send());
214 assert_eq!(send.get_ref().unwrap().capacity(), 0);
215 }
216
217 #[tokio::test]
abort_send_after_successful_reserve()218 async fn abort_send_after_successful_reserve() {
219 let (send, mut recv) = channel::<i32>(1);
220 let mut send = PollSender::new(send);
221
222 let mut recv_task = spawn(recv.recv());
223 assert_pending!(recv_task.poll());
224
225 assert_eq!(send.get_ref().unwrap().capacity(), 1);
226 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
227 assert_ready_ok!(reserve.poll());
228 assert_eq!(send.get_ref().unwrap().capacity(), 0);
229
230 assert!(send.abort_send());
231 assert_eq!(send.get_ref().unwrap().capacity(), 1);
232 }
233
234 #[tokio::test]
closed_when_receiver_drops()235 async fn closed_when_receiver_drops() {
236 let (send, _) = channel::<i32>(1);
237 let mut send = PollSender::new(send);
238
239 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
240 assert_ready_err!(reserve.poll());
241 }
242
243 #[should_panic]
244 #[test]
start_send_panics_when_idle()245 fn start_send_panics_when_idle() {
246 let (send, _) = channel::<i32>(3);
247 let mut send = PollSender::new(send);
248
249 send.send_item(1).unwrap();
250 }
251
252 #[should_panic]
253 #[test]
start_send_panics_when_acquiring()254 fn start_send_panics_when_acquiring() {
255 let (send, _) = channel::<i32>(1);
256 let mut send = PollSender::new(send);
257
258 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
259 assert_ready_ok!(reserve.poll());
260 send.send_item(1).unwrap();
261
262 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
263 assert_pending!(reserve.poll());
264 send.send_item(2).unwrap();
265 }
266
267 #[test]
sink_send_then_flush()268 fn sink_send_then_flush() {
269 let (send, mut recv) = channel(1);
270 let mut send = PollSender::new(send);
271
272 let mut recv_task = spawn(recv.recv());
273 assert_pending!(recv_task.poll());
274
275 let mut ready = spawn(poll_fn(|cx| send.poll_ready_unpin(cx)));
276 assert_ready_ok!(ready.poll());
277 assert_ok!(send.start_send_unpin(()));
278
279 let mut ready = spawn(poll_fn(|cx| send.poll_ready_unpin(cx)));
280 assert_pending!(ready.poll());
281
282 let mut flush = spawn(poll_fn(|cx| send.poll_flush_unpin(cx)));
283 assert_ready_ok!(flush.poll());
284
285 // Flushing does not mean that the sender becomes ready.
286 let mut ready = spawn(poll_fn(|cx| send.poll_ready_unpin(cx)));
287 assert_pending!(ready.poll());
288
289 assert_ready_eq!(recv_task.poll(), Some(()));
290 assert!(ready.is_woken());
291 assert_ready_ok!(ready.poll());
292 }
293
294 #[test]
sink_send_then_close()295 fn sink_send_then_close() {
296 let (send, mut recv) = channel(1);
297 let mut send = PollSender::new(send);
298
299 let mut recv_task = spawn(recv.recv());
300 assert_pending!(recv_task.poll());
301
302 let mut ready = spawn(poll_fn(|cx| send.poll_ready_unpin(cx)));
303 assert_ready_ok!(ready.poll());
304 assert_ok!(send.start_send_unpin(1));
305
306 let mut ready = spawn(poll_fn(|cx| send.poll_ready_unpin(cx)));
307 assert_pending!(ready.poll());
308
309 assert!(recv_task.is_woken());
310 assert_ready_eq!(recv_task.poll(), Some(1));
311
312 assert!(ready.is_woken());
313 assert_ready_ok!(ready.poll());
314
315 drop(recv_task);
316 let mut recv_task = spawn(recv.recv());
317 assert_pending!(recv_task.poll());
318 assert_ok!(send.start_send_unpin(2));
319
320 let mut close = spawn(poll_fn(|cx| send.poll_close_unpin(cx)));
321 assert_ready_ok!(close.poll());
322
323 assert!(recv_task.is_woken());
324 assert_ready_eq!(recv_task.poll(), Some(2));
325
326 drop(recv_task);
327 let mut recv_task = spawn(recv.recv());
328 assert_ready_eq!(recv_task.poll(), None);
329 }
330
331 #[test]
sink_send_ref()332 fn sink_send_ref() {
333 let data = "data".to_owned();
334 let (send, mut recv) = channel(1);
335 let mut send = PollSender::new(send);
336
337 let mut recv_task = spawn(recv.recv());
338 assert_pending!(recv_task.poll());
339
340 let mut ready = spawn(poll_fn(|cx| send.poll_ready_unpin(cx)));
341 assert_ready_ok!(ready.poll());
342
343 assert_ok!(send.start_send_unpin(data.as_str()));
344
345 let mut flush = spawn(poll_fn(|cx| send.poll_flush_unpin(cx)));
346 assert_ready_ok!(flush.poll());
347
348 assert_ready_eq!(recv_task.poll(), Some("data"));
349 }
350