1  #![cfg(feature = "full")]
2  #![cfg(unix)]
3  
4  use tokio::io::{AsyncReadExt, AsyncWriteExt, Interest};
5  use tokio::net::unix::pipe;
6  use tokio_test::task;
7  use tokio_test::{assert_err, assert_ok, assert_pending, assert_ready_ok};
8  
9  use std::fs::File;
10  use std::io;
11  use std::os::unix::fs::OpenOptionsExt;
12  use std::os::unix::io::AsRawFd;
13  use std::path::{Path, PathBuf};
14  
15  /// Helper struct which will clean up temporary files once dropped.
16  struct TempFifo {
17      path: PathBuf,
18      _dir: tempfile::TempDir,
19  }
20  
21  impl TempFifo {
new(name: &str) -> io::Result<TempFifo>22      fn new(name: &str) -> io::Result<TempFifo> {
23          let dir = tempfile::Builder::new()
24              .prefix("tokio-fifo-tests")
25              .tempdir()?;
26          let path = dir.path().join(name);
27          nix::unistd::mkfifo(&path, nix::sys::stat::Mode::S_IRWXU)?;
28  
29          Ok(TempFifo { path, _dir: dir })
30      }
31  }
32  
33  impl AsRef<Path> for TempFifo {
as_ref(&self) -> &Path34      fn as_ref(&self) -> &Path {
35          self.path.as_ref()
36      }
37  }
38  
39  #[tokio::test]
40  #[cfg_attr(miri, ignore)] // No `mkfifo` in miri.
fifo_simple_send() -> io::Result<()>41  async fn fifo_simple_send() -> io::Result<()> {
42      const DATA: &[u8] = b"this is some data to write to the fifo";
43  
44      let fifo = TempFifo::new("simple_send")?;
45  
46      // Create a reading task which should wait for data from the pipe.
47      let mut reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
48      let mut read_fut = task::spawn(async move {
49          let mut buf = vec![0; DATA.len()];
50          reader.read_exact(&mut buf).await?;
51          Ok::<_, io::Error>(buf)
52      });
53      assert_pending!(read_fut.poll());
54  
55      let mut writer = pipe::OpenOptions::new().open_sender(&fifo)?;
56      writer.write_all(DATA).await?;
57  
58      // Let the IO driver poll events for the reader.
59      while !read_fut.is_woken() {
60          tokio::task::yield_now().await;
61      }
62  
63      // Reading task should be ready now.
64      let read_data = assert_ready_ok!(read_fut.poll());
65      assert_eq!(&read_data, DATA);
66  
67      Ok(())
68  }
69  
70  #[tokio::test]
71  #[cfg(target_os = "linux")]
72  #[cfg_attr(miri, ignore)] // No `mkfifo` in miri.
fifo_simple_send_sender_first() -> io::Result<()>73  async fn fifo_simple_send_sender_first() -> io::Result<()> {
74      const DATA: &[u8] = b"this is some data to write to the fifo";
75  
76      // Create a new fifo file with *no reading ends open*.
77      let fifo = TempFifo::new("simple_send_sender_first")?;
78  
79      // Simple `open_sender` should fail with ENXIO (no such device or address).
80      let err = assert_err!(pipe::OpenOptions::new().open_sender(&fifo));
81      assert_eq!(err.raw_os_error(), Some(libc::ENXIO));
82  
83      // `open_sender` in read-write mode should succeed and the pipe should be ready to write.
84      let mut writer = pipe::OpenOptions::new()
85          .read_write(true)
86          .open_sender(&fifo)?;
87      writer.write_all(DATA).await?;
88  
89      // Read the written data and validate.
90      let mut reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
91      let mut read_data = vec![0; DATA.len()];
92      reader.read_exact(&mut read_data).await?;
93      assert_eq!(&read_data, DATA);
94  
95      Ok(())
96  }
97  
98  // Opens a FIFO file, write and *close the writer*.
write_and_close(path: impl AsRef<Path>, msg: &[u8]) -> io::Result<()>99  async fn write_and_close(path: impl AsRef<Path>, msg: &[u8]) -> io::Result<()> {
100      let mut writer = pipe::OpenOptions::new().open_sender(path)?;
101      writer.write_all(msg).await?;
102      drop(writer); // Explicit drop.
103      Ok(())
104  }
105  
106  /// Checks EOF behavior with single reader and writers sequentially opening
107  /// and closing a FIFO.
108  #[tokio::test]
109  #[cfg_attr(miri, ignore)] // No `mkfifo` in miri.
fifo_multiple_writes() -> io::Result<()>110  async fn fifo_multiple_writes() -> io::Result<()> {
111      const DATA: &[u8] = b"this is some data to write to the fifo";
112  
113      let fifo = TempFifo::new("fifo_multiple_writes")?;
114  
115      let mut reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
116  
117      write_and_close(&fifo, DATA).await?;
118      let ev = reader.ready(Interest::READABLE).await?;
119      assert!(ev.is_readable());
120      let mut read_data = vec![0; DATA.len()];
121      assert_ok!(reader.read_exact(&mut read_data).await);
122  
123      // Check that reader hits EOF.
124      let err = assert_err!(reader.read_exact(&mut read_data).await);
125      assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof);
126  
127      // Write more data and read again.
128      write_and_close(&fifo, DATA).await?;
129      assert_ok!(reader.read_exact(&mut read_data).await);
130  
131      Ok(())
132  }
133  
134  /// Checks behavior of a resilient reader (Receiver in O_RDWR access mode)
135  /// with writers sequentially opening and closing a FIFO.
136  #[tokio::test]
137  #[cfg(target_os = "linux")]
138  #[cfg_attr(miri, ignore)] // No `socket` in miri.
fifo_resilient_reader() -> io::Result<()>139  async fn fifo_resilient_reader() -> io::Result<()> {
140      const DATA: &[u8] = b"this is some data to write to the fifo";
141  
142      let fifo = TempFifo::new("fifo_resilient_reader")?;
143  
144      // Open reader in read-write access mode.
145      let mut reader = pipe::OpenOptions::new()
146          .read_write(true)
147          .open_receiver(&fifo)?;
148  
149      write_and_close(&fifo, DATA).await?;
150      let ev = reader.ready(Interest::READABLE).await?;
151      let mut read_data = vec![0; DATA.len()];
152      reader.read_exact(&mut read_data).await?;
153  
154      // Check that reader didn't hit EOF.
155      assert!(!ev.is_read_closed());
156  
157      // Resilient reader can asynchronously wait for the next writer.
158      let mut second_read_fut = task::spawn(reader.read_exact(&mut read_data));
159      assert_pending!(second_read_fut.poll());
160  
161      // Write more data and read again.
162      write_and_close(&fifo, DATA).await?;
163      assert_ok!(second_read_fut.await);
164  
165      Ok(())
166  }
167  
168  #[tokio::test]
169  #[cfg_attr(miri, ignore)] // No `O_NONBLOCK` for open64 in miri.
open_detects_not_a_fifo() -> io::Result<()>170  async fn open_detects_not_a_fifo() -> io::Result<()> {
171      let dir = tempfile::Builder::new()
172          .prefix("tokio-fifo-tests")
173          .tempdir()
174          .unwrap();
175      let path = dir.path().join("not_a_fifo");
176  
177      // Create an ordinary file.
178      File::create(&path)?;
179  
180      // Check if Sender detects invalid file type.
181      let err = assert_err!(pipe::OpenOptions::new().open_sender(&path));
182      assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
183  
184      // Check if Receiver detects invalid file type.
185      let err = assert_err!(pipe::OpenOptions::new().open_sender(&path));
186      assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
187  
188      Ok(())
189  }
190  
191  #[tokio::test]
192  #[cfg_attr(miri, ignore)] // No `mkfifo` in miri.
from_file() -> io::Result<()>193  async fn from_file() -> io::Result<()> {
194      const DATA: &[u8] = b"this is some data to write to the fifo";
195  
196      let fifo = TempFifo::new("from_file")?;
197  
198      // Construct a Receiver from a File.
199      let file = std::fs::OpenOptions::new()
200          .read(true)
201          .custom_flags(libc::O_NONBLOCK)
202          .open(&fifo)?;
203      let mut reader = pipe::Receiver::from_file(file)?;
204  
205      // Construct a Sender from a File.
206      let file = std::fs::OpenOptions::new()
207          .write(true)
208          .custom_flags(libc::O_NONBLOCK)
209          .open(&fifo)?;
210      let mut writer = pipe::Sender::from_file(file)?;
211  
212      // Write and read some data to test async.
213      let mut read_fut = task::spawn(async move {
214          let mut buf = vec![0; DATA.len()];
215          reader.read_exact(&mut buf).await?;
216          Ok::<_, io::Error>(buf)
217      });
218      assert_pending!(read_fut.poll());
219  
220      writer.write_all(DATA).await?;
221  
222      let read_data = assert_ok!(read_fut.await);
223      assert_eq!(&read_data, DATA);
224  
225      Ok(())
226  }
227  
228  #[tokio::test]
229  #[cfg_attr(miri, ignore)] // No `fstat` in miri.
from_file_detects_not_a_fifo() -> io::Result<()>230  async fn from_file_detects_not_a_fifo() -> io::Result<()> {
231      let dir = tempfile::Builder::new()
232          .prefix("tokio-fifo-tests")
233          .tempdir()
234          .unwrap();
235      let path = dir.path().join("not_a_fifo");
236  
237      // Create an ordinary file.
238      File::create(&path)?;
239  
240      // Check if Sender detects invalid file type.
241      let file = std::fs::OpenOptions::new().write(true).open(&path)?;
242      let err = assert_err!(pipe::Sender::from_file(file));
243      assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
244  
245      // Check if Receiver detects invalid file type.
246      let file = std::fs::OpenOptions::new().read(true).open(&path)?;
247      let err = assert_err!(pipe::Receiver::from_file(file));
248      assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
249  
250      Ok(())
251  }
252  
253  #[tokio::test]
254  #[cfg_attr(miri, ignore)] // No `mkfifo` in miri.
from_file_detects_wrong_access_mode() -> io::Result<()>255  async fn from_file_detects_wrong_access_mode() -> io::Result<()> {
256      let fifo = TempFifo::new("wrong_access_mode")?;
257  
258      // Open a read end to open the fifo for writing.
259      let _reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
260  
261      // Check if Receiver detects write-only access mode.
262      let wronly = std::fs::OpenOptions::new()
263          .write(true)
264          .custom_flags(libc::O_NONBLOCK)
265          .open(&fifo)?;
266      let err = assert_err!(pipe::Receiver::from_file(wronly));
267      assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
268  
269      // Check if Sender detects read-only access mode.
270      let rdonly = std::fs::OpenOptions::new()
271          .read(true)
272          .custom_flags(libc::O_NONBLOCK)
273          .open(&fifo)?;
274      let err = assert_err!(pipe::Sender::from_file(rdonly));
275      assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
276  
277      Ok(())
278  }
279  
is_nonblocking<T: AsRawFd>(fd: &T) -> io::Result<bool>280  fn is_nonblocking<T: AsRawFd>(fd: &T) -> io::Result<bool> {
281      let flags = nix::fcntl::fcntl(fd.as_raw_fd(), nix::fcntl::F_GETFL)?;
282      Ok((flags & libc::O_NONBLOCK) != 0)
283  }
284  
285  #[tokio::test]
286  #[cfg_attr(miri, ignore)] // No `mkfifo` in miri.
from_file_sets_nonblock() -> io::Result<()>287  async fn from_file_sets_nonblock() -> io::Result<()> {
288      let fifo = TempFifo::new("sets_nonblock")?;
289  
290      // Open read and write ends to let blocking files open.
291      let _reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
292      let _writer = pipe::OpenOptions::new().open_sender(&fifo)?;
293  
294      // Check if Receiver sets the pipe in non-blocking mode.
295      let rdonly = std::fs::OpenOptions::new().read(true).open(&fifo)?;
296      assert!(!is_nonblocking(&rdonly)?);
297      let reader = pipe::Receiver::from_file(rdonly)?;
298      assert!(is_nonblocking(&reader)?);
299  
300      // Check if Sender sets the pipe in non-blocking mode.
301      let wronly = std::fs::OpenOptions::new().write(true).open(&fifo)?;
302      assert!(!is_nonblocking(&wronly)?);
303      let writer = pipe::Sender::from_file(wronly)?;
304      assert!(is_nonblocking(&writer)?);
305  
306      Ok(())
307  }
308  
writable_by_poll(writer: &pipe::Sender) -> bool309  fn writable_by_poll(writer: &pipe::Sender) -> bool {
310      task::spawn(writer.writable()).poll().is_ready()
311  }
312  
313  #[tokio::test]
314  #[cfg_attr(miri, ignore)] // No `mkfifo` in miri.
try_read_write() -> io::Result<()>315  async fn try_read_write() -> io::Result<()> {
316      const DATA: &[u8] = b"this is some data to write to the fifo";
317  
318      // Create a pipe pair over a fifo file.
319      let fifo = TempFifo::new("try_read_write")?;
320      let reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
321      let writer = pipe::OpenOptions::new().open_sender(&fifo)?;
322  
323      // Fill the pipe buffer with `try_write`.
324      let mut write_data = Vec::new();
325      while writable_by_poll(&writer) {
326          match writer.try_write(DATA) {
327              Ok(n) => write_data.extend(&DATA[..n]),
328              Err(e) => {
329                  assert_eq!(e.kind(), io::ErrorKind::WouldBlock);
330                  break;
331              }
332          }
333      }
334  
335      // Drain the pipe buffer with `try_read`.
336      let mut read_data = vec![0; write_data.len()];
337      let mut i = 0;
338      while i < write_data.len() {
339          reader.readable().await?;
340          match reader.try_read(&mut read_data[i..]) {
341              Ok(n) => i += n,
342              Err(e) => {
343                  assert_eq!(e.kind(), io::ErrorKind::WouldBlock);
344                  continue;
345              }
346          }
347      }
348  
349      assert_eq!(read_data, write_data);
350  
351      Ok(())
352  }
353  
354  #[tokio::test]
355  #[cfg_attr(miri, ignore)] // No `mkfifo` in miri.
try_read_write_vectored() -> io::Result<()>356  async fn try_read_write_vectored() -> io::Result<()> {
357      const DATA: &[u8] = b"this is some data to write to the fifo";
358  
359      // Create a pipe pair over a fifo file.
360      let fifo = TempFifo::new("try_read_write_vectored")?;
361      let reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
362      let writer = pipe::OpenOptions::new().open_sender(&fifo)?;
363  
364      let write_bufs: Vec<_> = DATA.chunks(3).map(io::IoSlice::new).collect();
365  
366      // Fill the pipe buffer with `try_write_vectored`.
367      let mut write_data = Vec::new();
368      while writable_by_poll(&writer) {
369          match writer.try_write_vectored(&write_bufs) {
370              Ok(n) => write_data.extend(&DATA[..n]),
371              Err(e) => {
372                  assert_eq!(e.kind(), io::ErrorKind::WouldBlock);
373                  break;
374              }
375          }
376      }
377  
378      // Drain the pipe buffer with `try_read_vectored`.
379      let mut read_data = vec![0; write_data.len()];
380      let mut i = 0;
381      while i < write_data.len() {
382          reader.readable().await?;
383  
384          let mut read_bufs: Vec<_> = read_data[i..]
385              .chunks_mut(0x10000)
386              .map(io::IoSliceMut::new)
387              .collect();
388          match reader.try_read_vectored(&mut read_bufs) {
389              Ok(n) => i += n,
390              Err(e) => {
391                  assert_eq!(e.kind(), io::ErrorKind::WouldBlock);
392                  continue;
393              }
394          }
395      }
396  
397      assert_eq!(read_data, write_data);
398  
399      Ok(())
400  }
401  
402  #[tokio::test]
403  #[cfg_attr(miri, ignore)] // No `mkfifo` in miri.
try_read_buf() -> std::io::Result<()>404  async fn try_read_buf() -> std::io::Result<()> {
405      const DATA: &[u8] = b"this is some data to write to the fifo";
406  
407      // Create a pipe pair over a fifo file.
408      let fifo = TempFifo::new("try_read_write_vectored")?;
409      let reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
410      let writer = pipe::OpenOptions::new().open_sender(&fifo)?;
411  
412      // Fill the pipe buffer with `try_write`.
413      let mut write_data = Vec::new();
414      while writable_by_poll(&writer) {
415          match writer.try_write(DATA) {
416              Ok(n) => write_data.extend(&DATA[..n]),
417              Err(e) => {
418                  assert_eq!(e.kind(), io::ErrorKind::WouldBlock);
419                  break;
420              }
421          }
422      }
423  
424      // Drain the pipe buffer with `try_read_buf`.
425      let mut read_data = vec![0; write_data.len()];
426      let mut i = 0;
427      while i < write_data.len() {
428          reader.readable().await?;
429          match reader.try_read_buf(&mut read_data) {
430              Ok(n) => i += n,
431              Err(e) => {
432                  assert_eq!(e.kind(), io::ErrorKind::WouldBlock);
433                  continue;
434              }
435          }
436      }
437  
438      assert_eq!(read_data, write_data);
439  
440      Ok(())
441  }
442  
443  #[tokio::test]
anon_pipe_simple_send() -> io::Result<()>444  async fn anon_pipe_simple_send() -> io::Result<()> {
445      const DATA: &[u8] = b"this is some data to write to the pipe";
446  
447      let (mut writer, mut reader) = pipe::pipe()?;
448  
449      // Create a reading task which should wait for data from the pipe.
450      let mut read_fut = task::spawn(async move {
451          let mut buf = vec![0; DATA.len()];
452          reader.read_exact(&mut buf).await?;
453          Ok::<_, io::Error>(buf)
454      });
455      assert_pending!(read_fut.poll());
456  
457      writer.write_all(DATA).await?;
458  
459      // Let the IO driver poll events for the reader.
460      while !read_fut.is_woken() {
461          tokio::task::yield_now().await;
462      }
463  
464      // Reading task should be ready now.
465      let read_data = assert_ready_ok!(read_fut.poll());
466      assert_eq!(&read_data, DATA);
467  
468      Ok(())
469  }
470  
471  #[tokio::test]
472  #[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
anon_pipe_spawn_echo() -> std::io::Result<()>473  async fn anon_pipe_spawn_echo() -> std::io::Result<()> {
474      use tokio::process::Command;
475  
476      const DATA: &str = "this is some data to write to the pipe";
477  
478      let (tx, mut rx) = pipe::pipe()?;
479  
480      let status = Command::new("echo")
481          .arg("-n")
482          .arg(DATA)
483          .stdout(tx.into_blocking_fd()?)
484          .status();
485  
486      let mut buf = vec![0; DATA.len()];
487      rx.read_exact(&mut buf).await?;
488      assert_eq!(String::from_utf8(buf).unwrap(), DATA);
489  
490      let exit_code = status.await?;
491      assert!(exit_code.success());
492  
493      // Check if the pipe is closed.
494      buf = Vec::new();
495      let total = assert_ok!(rx.try_read(&mut buf));
496      assert_eq!(total, 0);
497  
498      Ok(())
499  }
500  
501  #[tokio::test]
502  #[cfg(target_os = "linux")]
503  #[cfg_attr(miri, ignore)] // No `fstat` in miri.
anon_pipe_from_owned_fd() -> std::io::Result<()>504  async fn anon_pipe_from_owned_fd() -> std::io::Result<()> {
505      use nix::fcntl::OFlag;
506  
507      const DATA: &[u8] = b"this is some data to write to the pipe";
508  
509      let (rx_fd, tx_fd) = nix::unistd::pipe2(OFlag::O_CLOEXEC | OFlag::O_NONBLOCK)?;
510  
511      let mut rx = pipe::Receiver::from_owned_fd(rx_fd)?;
512      let mut tx = pipe::Sender::from_owned_fd(tx_fd)?;
513  
514      let mut buf = vec![0; DATA.len()];
515      tx.write_all(DATA).await?;
516      rx.read_exact(&mut buf).await?;
517      assert_eq!(buf, DATA);
518  
519      Ok(())
520  }
521  
522  #[tokio::test]
523  #[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
anon_pipe_into_nonblocking_fd() -> std::io::Result<()>524  async fn anon_pipe_into_nonblocking_fd() -> std::io::Result<()> {
525      let (tx, rx) = pipe::pipe()?;
526  
527      let tx_fd = tx.into_nonblocking_fd()?;
528      let rx_fd = rx.into_nonblocking_fd()?;
529  
530      assert!(is_nonblocking(&tx_fd)?);
531      assert!(is_nonblocking(&rx_fd)?);
532  
533      Ok(())
534  }
535  
536  #[tokio::test]
537  #[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
anon_pipe_into_blocking_fd() -> std::io::Result<()>538  async fn anon_pipe_into_blocking_fd() -> std::io::Result<()> {
539      let (tx, rx) = pipe::pipe()?;
540  
541      let tx_fd = tx.into_blocking_fd()?;
542      let rx_fd = rx.into_blocking_fd()?;
543  
544      assert!(!is_nonblocking(&tx_fd)?);
545      assert!(!is_nonblocking(&rx_fd)?);
546  
547      Ok(())
548  }
549