1 #![cfg(feature = "full")]
2 #![cfg(windows)]
3 
4 use std::io;
5 use std::time::Duration;
6 use tokio::io::{AsyncReadExt, AsyncWriteExt};
7 use tokio::net::windows::named_pipe::{ClientOptions, PipeMode, ServerOptions};
8 use tokio::time;
9 use windows_sys::Win32::Foundation::{ERROR_NO_DATA, ERROR_PIPE_BUSY};
10 
11 #[tokio::test]
test_named_pipe_client_drop() -> io::Result<()>12 async fn test_named_pipe_client_drop() -> io::Result<()> {
13     const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-client-drop";
14 
15     let mut server = ServerOptions::new().create(PIPE_NAME)?;
16 
17     let client = ClientOptions::new().open(PIPE_NAME)?;
18 
19     server.connect().await?;
20     drop(client);
21 
22     // instance will be broken because client is gone
23     match server.write_all(b"ping").await {
24         Err(e) if e.raw_os_error() == Some(ERROR_NO_DATA as i32) => (),
25         x => panic!("{:?}", x),
26     }
27 
28     Ok(())
29 }
30 
31 #[tokio::test]
test_named_pipe_single_client() -> io::Result<()>32 async fn test_named_pipe_single_client() -> io::Result<()> {
33     use tokio::io::{AsyncBufReadExt as _, BufReader};
34 
35     const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-single-client";
36 
37     let server = ServerOptions::new().create(PIPE_NAME)?;
38 
39     let server = tokio::spawn(async move {
40         // Note: we wait for a client to connect.
41         server.connect().await?;
42 
43         let mut server = BufReader::new(server);
44 
45         let mut buf = String::new();
46         server.read_line(&mut buf).await?;
47         server.write_all(b"pong\n").await?;
48         Ok::<_, io::Error>(buf)
49     });
50 
51     let client = tokio::spawn(async move {
52         let client = ClientOptions::new().open(PIPE_NAME)?;
53 
54         let mut client = BufReader::new(client);
55 
56         let mut buf = String::new();
57         client.write_all(b"ping\n").await?;
58         client.read_line(&mut buf).await?;
59         Ok::<_, io::Error>(buf)
60     });
61 
62     let (server, client) = tokio::try_join!(server, client)?;
63 
64     assert_eq!(server?, "ping\n");
65     assert_eq!(client?, "pong\n");
66 
67     Ok(())
68 }
69 
70 #[tokio::test]
test_named_pipe_multi_client() -> io::Result<()>71 async fn test_named_pipe_multi_client() -> io::Result<()> {
72     use tokio::io::{AsyncBufReadExt as _, BufReader};
73 
74     const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-multi-client";
75     const N: usize = 10;
76 
77     // The first server needs to be constructed early so that clients can
78     // be correctly connected. Otherwise calling .wait will cause the client to
79     // error.
80     let mut server = ServerOptions::new().create(PIPE_NAME)?;
81 
82     let server = tokio::spawn(async move {
83         for _ in 0..N {
84             // Wait for client to connect.
85             server.connect().await?;
86             let mut inner = BufReader::new(server);
87 
88             // Construct the next server to be connected before sending the one
89             // we already have of onto a task. This ensures that the server
90             // isn't closed (after it's done in the task) before a new one is
91             // available. Otherwise the client might error with
92             // `io::ErrorKind::NotFound`.
93             server = ServerOptions::new().create(PIPE_NAME)?;
94 
95             tokio::spawn(async move {
96                 let mut buf = String::new();
97                 inner.read_line(&mut buf).await?;
98                 inner.write_all(b"pong\n").await?;
99                 inner.flush().await?;
100                 Ok::<_, io::Error>(())
101             });
102         }
103 
104         Ok::<_, io::Error>(())
105     });
106 
107     let mut clients = Vec::new();
108 
109     for _ in 0..N {
110         clients.push(tokio::spawn(async move {
111             // This showcases a generic connect loop.
112             //
113             // We immediately try to create a client, if it's not found or the
114             // pipe is busy we use the specialized wait function on the client
115             // builder.
116             let client = loop {
117                 match ClientOptions::new().open(PIPE_NAME) {
118                     Ok(client) => break client,
119                     Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => (),
120                     Err(e) if e.kind() == io::ErrorKind::NotFound => (),
121                     Err(e) => return Err(e),
122                 }
123 
124                 // Wait for a named pipe to become available.
125                 time::sleep(Duration::from_millis(10)).await;
126             };
127 
128             let mut client = BufReader::new(client);
129 
130             let mut buf = String::new();
131             client.write_all(b"ping\n").await?;
132             client.flush().await?;
133             client.read_line(&mut buf).await?;
134             Ok::<_, io::Error>(buf)
135         }));
136     }
137 
138     for client in clients {
139         let result = client.await?;
140         assert_eq!(result?, "pong\n");
141     }
142 
143     server.await??;
144     Ok(())
145 }
146 
147 #[tokio::test]
test_named_pipe_multi_client_ready() -> io::Result<()>148 async fn test_named_pipe_multi_client_ready() -> io::Result<()> {
149     use tokio::io::Interest;
150 
151     const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-multi-client-ready";
152     const N: usize = 10;
153 
154     // The first server needs to be constructed early so that clients can
155     // be correctly connected. Otherwise calling .wait will cause the client to
156     // error.
157     let mut server = ServerOptions::new().create(PIPE_NAME)?;
158 
159     let server = tokio::spawn(async move {
160         for _ in 0..N {
161             // Wait for client to connect.
162             server.connect().await?;
163 
164             let inner_server = server;
165 
166             // Construct the next server to be connected before sending the one
167             // we already have of onto a task. This ensures that the server
168             // isn't closed (after it's done in the task) before a new one is
169             // available. Otherwise the client might error with
170             // `io::ErrorKind::NotFound`.
171             server = ServerOptions::new().create(PIPE_NAME)?;
172 
173             tokio::spawn(async move {
174                 let server = inner_server;
175 
176                 {
177                     let mut read_buf = [0u8; 5];
178                     let mut read_buf_cursor = 0;
179 
180                     loop {
181                         server.readable().await?;
182 
183                         let buf = &mut read_buf[read_buf_cursor..];
184 
185                         match server.try_read(buf) {
186                             Ok(n) => {
187                                 read_buf_cursor += n;
188 
189                                 if read_buf_cursor == read_buf.len() {
190                                     break;
191                                 }
192                             }
193                             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
194                                 continue;
195                             }
196                             Err(e) => {
197                                 return Err(e);
198                             }
199                         }
200                     }
201                 };
202 
203                 {
204                     let write_buf = b"pong\n";
205                     let mut write_buf_cursor = 0;
206 
207                     loop {
208                         server.writable().await?;
209                         let buf = &write_buf[write_buf_cursor..];
210 
211                         match server.try_write(buf) {
212                             Ok(n) => {
213                                 write_buf_cursor += n;
214 
215                                 if write_buf_cursor == write_buf.len() {
216                                     break;
217                                 }
218                             }
219                             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
220                                 continue;
221                             }
222                             Err(e) => {
223                                 return Err(e);
224                             }
225                         }
226                     }
227                 }
228 
229                 Ok::<_, io::Error>(())
230             });
231         }
232 
233         Ok::<_, io::Error>(())
234     });
235 
236     let mut clients = Vec::new();
237 
238     for _ in 0..N {
239         clients.push(tokio::spawn(async move {
240             // This showcases a generic connect loop.
241             //
242             // We immediately try to create a client, if it's not found or the
243             // pipe is busy we use the specialized wait function on the client
244             // builder.
245             let client = loop {
246                 match ClientOptions::new().open(PIPE_NAME) {
247                     Ok(client) => break client,
248                     Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => (),
249                     Err(e) if e.kind() == io::ErrorKind::NotFound => (),
250                     Err(e) => return Err(e),
251                 }
252 
253                 // Wait for a named pipe to become available.
254                 time::sleep(Duration::from_millis(10)).await;
255             };
256 
257             let mut read_buf = [0u8; 5];
258             let mut read_buf_cursor = 0;
259             let write_buf = b"ping\n";
260             let mut write_buf_cursor = 0;
261 
262             loop {
263                 let mut interest = Interest::READABLE;
264                 if write_buf_cursor < write_buf.len() {
265                     interest |= Interest::WRITABLE;
266                 }
267 
268                 let ready = client.ready(interest).await?;
269 
270                 if ready.is_readable() {
271                     let buf = &mut read_buf[read_buf_cursor..];
272 
273                     match client.try_read(buf) {
274                         Ok(n) => {
275                             read_buf_cursor += n;
276 
277                             if read_buf_cursor == read_buf.len() {
278                                 break;
279                             }
280                         }
281                         Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
282                             continue;
283                         }
284                         Err(e) => {
285                             return Err(e);
286                         }
287                     }
288                 }
289 
290                 if ready.is_writable() {
291                     let buf = &write_buf[write_buf_cursor..];
292 
293                     if buf.is_empty() {
294                         continue;
295                     }
296 
297                     match client.try_write(buf) {
298                         Ok(n) => {
299                             write_buf_cursor += n;
300                         }
301                         Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
302                             continue;
303                         }
304                         Err(e) => {
305                             return Err(e);
306                         }
307                     }
308                 }
309             }
310 
311             let buf = String::from_utf8_lossy(&read_buf).into_owned();
312 
313             Ok::<_, io::Error>(buf)
314         }));
315     }
316 
317     for client in clients {
318         let result = client.await?;
319         assert_eq!(result?, "pong\n");
320     }
321 
322     server.await??;
323     Ok(())
324 }
325 
326 // This tests that message mode works as expected.
327 #[tokio::test]
test_named_pipe_mode_message() -> io::Result<()>328 async fn test_named_pipe_mode_message() -> io::Result<()> {
329     // it's easy to accidentally get a seemingly working test here because byte pipes
330     // often return contents at write boundaries. to make sure we're doing the right thing we
331     // explicitly test that it doesn't work in byte mode.
332     _named_pipe_mode_message(PipeMode::Message).await?;
333     _named_pipe_mode_message(PipeMode::Byte).await
334 }
335 
_named_pipe_mode_message(mode: PipeMode) -> io::Result<()>336 async fn _named_pipe_mode_message(mode: PipeMode) -> io::Result<()> {
337     let pipe_name = format!(
338         r"\\.\pipe\test-named-pipe-mode-message-{}",
339         matches!(mode, PipeMode::Message)
340     );
341     let mut buf = [0u8; 32];
342 
343     let mut server = ServerOptions::new()
344         .first_pipe_instance(true)
345         .pipe_mode(mode)
346         .create(&pipe_name)?;
347 
348     let mut client = ClientOptions::new().pipe_mode(mode).open(&pipe_name)?;
349 
350     server.connect().await?;
351 
352     // this needs a few iterations, presumably Windows waits for a few calls before merging buffers
353     for _ in 0..10 {
354         client.write_all(b"hello").await?;
355         server.write_all(b"world").await?;
356     }
357     for _ in 0..10 {
358         let n = server.read(&mut buf).await?;
359         if buf[..n] != b"hello"[..] {
360             assert!(matches!(mode, PipeMode::Byte));
361             return Ok(());
362         }
363         let n = client.read(&mut buf).await?;
364         if buf[..n] != b"world"[..] {
365             assert!(matches!(mode, PipeMode::Byte));
366             return Ok(());
367         }
368     }
369     // byte mode should have errored before.
370     assert!(matches!(mode, PipeMode::Message));
371     Ok(())
372 }
373 
374 // This tests `NamedPipeServer::connect` with various access settings.
375 #[tokio::test]
test_named_pipe_access() -> io::Result<()>376 async fn test_named_pipe_access() -> io::Result<()> {
377     const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-access";
378 
379     for (inb, outb) in [(true, true), (true, false), (false, true)] {
380         let (tx, rx) = tokio::sync::oneshot::channel();
381         let server = tokio::spawn(async move {
382             let s = ServerOptions::new()
383                 .access_inbound(inb)
384                 .access_outbound(outb)
385                 .create(PIPE_NAME)?;
386             let mut connect_fut = tokio_test::task::spawn(s.connect());
387             assert!(connect_fut.poll().is_pending());
388             tx.send(()).unwrap();
389             connect_fut.await
390         });
391 
392         // Wait for the server to call connect.
393         rx.await.unwrap();
394         let _ = ClientOptions::new().read(outb).write(inb).open(PIPE_NAME)?;
395 
396         server.await??;
397     }
398     Ok(())
399 }
400