1 #![cfg(feature = "full")] 2 #![cfg(unix)] 3 4 use tokio::io::{AsyncReadExt, AsyncWriteExt, Interest}; 5 use tokio::net::unix::pipe; 6 use tokio_test::task; 7 use tokio_test::{assert_err, assert_ok, assert_pending, assert_ready_ok}; 8 9 use std::fs::File; 10 use std::io; 11 use std::os::unix::fs::OpenOptionsExt; 12 use std::os::unix::io::AsRawFd; 13 use std::path::{Path, PathBuf}; 14 15 /// Helper struct which will clean up temporary files once dropped. 16 struct TempFifo { 17 path: PathBuf, 18 _dir: tempfile::TempDir, 19 } 20 21 impl TempFifo { new(name: &str) -> io::Result<TempFifo>22 fn new(name: &str) -> io::Result<TempFifo> { 23 let dir = tempfile::Builder::new() 24 .prefix("tokio-fifo-tests") 25 .tempdir()?; 26 let path = dir.path().join(name); 27 nix::unistd::mkfifo(&path, nix::sys::stat::Mode::S_IRWXU)?; 28 29 Ok(TempFifo { path, _dir: dir }) 30 } 31 } 32 33 impl AsRef<Path> for TempFifo { as_ref(&self) -> &Path34 fn as_ref(&self) -> &Path { 35 self.path.as_ref() 36 } 37 } 38 39 #[tokio::test] 40 #[cfg_attr(miri, ignore)] // No `mkfifo` in miri. fifo_simple_send() -> io::Result<()>41 async fn fifo_simple_send() -> io::Result<()> { 42 const DATA: &[u8] = b"this is some data to write to the fifo"; 43 44 let fifo = TempFifo::new("simple_send")?; 45 46 // Create a reading task which should wait for data from the pipe. 47 let mut reader = pipe::OpenOptions::new().open_receiver(&fifo)?; 48 let mut read_fut = task::spawn(async move { 49 let mut buf = vec![0; DATA.len()]; 50 reader.read_exact(&mut buf).await?; 51 Ok::<_, io::Error>(buf) 52 }); 53 assert_pending!(read_fut.poll()); 54 55 let mut writer = pipe::OpenOptions::new().open_sender(&fifo)?; 56 writer.write_all(DATA).await?; 57 58 // Let the IO driver poll events for the reader. 59 while !read_fut.is_woken() { 60 tokio::task::yield_now().await; 61 } 62 63 // Reading task should be ready now. 64 let read_data = assert_ready_ok!(read_fut.poll()); 65 assert_eq!(&read_data, DATA); 66 67 Ok(()) 68 } 69 70 #[tokio::test] 71 #[cfg(target_os = "linux")] 72 #[cfg_attr(miri, ignore)] // No `mkfifo` in miri. fifo_simple_send_sender_first() -> io::Result<()>73 async fn fifo_simple_send_sender_first() -> io::Result<()> { 74 const DATA: &[u8] = b"this is some data to write to the fifo"; 75 76 // Create a new fifo file with *no reading ends open*. 77 let fifo = TempFifo::new("simple_send_sender_first")?; 78 79 // Simple `open_sender` should fail with ENXIO (no such device or address). 80 let err = assert_err!(pipe::OpenOptions::new().open_sender(&fifo)); 81 assert_eq!(err.raw_os_error(), Some(libc::ENXIO)); 82 83 // `open_sender` in read-write mode should succeed and the pipe should be ready to write. 84 let mut writer = pipe::OpenOptions::new() 85 .read_write(true) 86 .open_sender(&fifo)?; 87 writer.write_all(DATA).await?; 88 89 // Read the written data and validate. 90 let mut reader = pipe::OpenOptions::new().open_receiver(&fifo)?; 91 let mut read_data = vec![0; DATA.len()]; 92 reader.read_exact(&mut read_data).await?; 93 assert_eq!(&read_data, DATA); 94 95 Ok(()) 96 } 97 98 // Opens a FIFO file, write and *close the writer*. write_and_close(path: impl AsRef<Path>, msg: &[u8]) -> io::Result<()>99 async fn write_and_close(path: impl AsRef<Path>, msg: &[u8]) -> io::Result<()> { 100 let mut writer = pipe::OpenOptions::new().open_sender(path)?; 101 writer.write_all(msg).await?; 102 drop(writer); // Explicit drop. 103 Ok(()) 104 } 105 106 /// Checks EOF behavior with single reader and writers sequentially opening 107 /// and closing a FIFO. 108 #[tokio::test] 109 #[cfg_attr(miri, ignore)] // No `mkfifo` in miri. fifo_multiple_writes() -> io::Result<()>110 async fn fifo_multiple_writes() -> io::Result<()> { 111 const DATA: &[u8] = b"this is some data to write to the fifo"; 112 113 let fifo = TempFifo::new("fifo_multiple_writes")?; 114 115 let mut reader = pipe::OpenOptions::new().open_receiver(&fifo)?; 116 117 write_and_close(&fifo, DATA).await?; 118 let ev = reader.ready(Interest::READABLE).await?; 119 assert!(ev.is_readable()); 120 let mut read_data = vec![0; DATA.len()]; 121 assert_ok!(reader.read_exact(&mut read_data).await); 122 123 // Check that reader hits EOF. 124 let err = assert_err!(reader.read_exact(&mut read_data).await); 125 assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof); 126 127 // Write more data and read again. 128 write_and_close(&fifo, DATA).await?; 129 assert_ok!(reader.read_exact(&mut read_data).await); 130 131 Ok(()) 132 } 133 134 /// Checks behavior of a resilient reader (Receiver in O_RDWR access mode) 135 /// with writers sequentially opening and closing a FIFO. 136 #[tokio::test] 137 #[cfg(target_os = "linux")] 138 #[cfg_attr(miri, ignore)] // No `socket` in miri. fifo_resilient_reader() -> io::Result<()>139 async fn fifo_resilient_reader() -> io::Result<()> { 140 const DATA: &[u8] = b"this is some data to write to the fifo"; 141 142 let fifo = TempFifo::new("fifo_resilient_reader")?; 143 144 // Open reader in read-write access mode. 145 let mut reader = pipe::OpenOptions::new() 146 .read_write(true) 147 .open_receiver(&fifo)?; 148 149 write_and_close(&fifo, DATA).await?; 150 let ev = reader.ready(Interest::READABLE).await?; 151 let mut read_data = vec![0; DATA.len()]; 152 reader.read_exact(&mut read_data).await?; 153 154 // Check that reader didn't hit EOF. 155 assert!(!ev.is_read_closed()); 156 157 // Resilient reader can asynchronously wait for the next writer. 158 let mut second_read_fut = task::spawn(reader.read_exact(&mut read_data)); 159 assert_pending!(second_read_fut.poll()); 160 161 // Write more data and read again. 162 write_and_close(&fifo, DATA).await?; 163 assert_ok!(second_read_fut.await); 164 165 Ok(()) 166 } 167 168 #[tokio::test] 169 #[cfg_attr(miri, ignore)] // No `O_NONBLOCK` for open64 in miri. open_detects_not_a_fifo() -> io::Result<()>170 async fn open_detects_not_a_fifo() -> io::Result<()> { 171 let dir = tempfile::Builder::new() 172 .prefix("tokio-fifo-tests") 173 .tempdir() 174 .unwrap(); 175 let path = dir.path().join("not_a_fifo"); 176 177 // Create an ordinary file. 178 File::create(&path)?; 179 180 // Check if Sender detects invalid file type. 181 let err = assert_err!(pipe::OpenOptions::new().open_sender(&path)); 182 assert_eq!(err.kind(), io::ErrorKind::InvalidInput); 183 184 // Check if Receiver detects invalid file type. 185 let err = assert_err!(pipe::OpenOptions::new().open_sender(&path)); 186 assert_eq!(err.kind(), io::ErrorKind::InvalidInput); 187 188 Ok(()) 189 } 190 191 #[tokio::test] 192 #[cfg_attr(miri, ignore)] // No `mkfifo` in miri. from_file() -> io::Result<()>193 async fn from_file() -> io::Result<()> { 194 const DATA: &[u8] = b"this is some data to write to the fifo"; 195 196 let fifo = TempFifo::new("from_file")?; 197 198 // Construct a Receiver from a File. 199 let file = std::fs::OpenOptions::new() 200 .read(true) 201 .custom_flags(libc::O_NONBLOCK) 202 .open(&fifo)?; 203 let mut reader = pipe::Receiver::from_file(file)?; 204 205 // Construct a Sender from a File. 206 let file = std::fs::OpenOptions::new() 207 .write(true) 208 .custom_flags(libc::O_NONBLOCK) 209 .open(&fifo)?; 210 let mut writer = pipe::Sender::from_file(file)?; 211 212 // Write and read some data to test async. 213 let mut read_fut = task::spawn(async move { 214 let mut buf = vec![0; DATA.len()]; 215 reader.read_exact(&mut buf).await?; 216 Ok::<_, io::Error>(buf) 217 }); 218 assert_pending!(read_fut.poll()); 219 220 writer.write_all(DATA).await?; 221 222 let read_data = assert_ok!(read_fut.await); 223 assert_eq!(&read_data, DATA); 224 225 Ok(()) 226 } 227 228 #[tokio::test] 229 #[cfg_attr(miri, ignore)] // No `fstat` in miri. from_file_detects_not_a_fifo() -> io::Result<()>230 async fn from_file_detects_not_a_fifo() -> io::Result<()> { 231 let dir = tempfile::Builder::new() 232 .prefix("tokio-fifo-tests") 233 .tempdir() 234 .unwrap(); 235 let path = dir.path().join("not_a_fifo"); 236 237 // Create an ordinary file. 238 File::create(&path)?; 239 240 // Check if Sender detects invalid file type. 241 let file = std::fs::OpenOptions::new().write(true).open(&path)?; 242 let err = assert_err!(pipe::Sender::from_file(file)); 243 assert_eq!(err.kind(), io::ErrorKind::InvalidInput); 244 245 // Check if Receiver detects invalid file type. 246 let file = std::fs::OpenOptions::new().read(true).open(&path)?; 247 let err = assert_err!(pipe::Receiver::from_file(file)); 248 assert_eq!(err.kind(), io::ErrorKind::InvalidInput); 249 250 Ok(()) 251 } 252 253 #[tokio::test] 254 #[cfg_attr(miri, ignore)] // No `mkfifo` in miri. from_file_detects_wrong_access_mode() -> io::Result<()>255 async fn from_file_detects_wrong_access_mode() -> io::Result<()> { 256 let fifo = TempFifo::new("wrong_access_mode")?; 257 258 // Open a read end to open the fifo for writing. 259 let _reader = pipe::OpenOptions::new().open_receiver(&fifo)?; 260 261 // Check if Receiver detects write-only access mode. 262 let wronly = std::fs::OpenOptions::new() 263 .write(true) 264 .custom_flags(libc::O_NONBLOCK) 265 .open(&fifo)?; 266 let err = assert_err!(pipe::Receiver::from_file(wronly)); 267 assert_eq!(err.kind(), io::ErrorKind::InvalidInput); 268 269 // Check if Sender detects read-only access mode. 270 let rdonly = std::fs::OpenOptions::new() 271 .read(true) 272 .custom_flags(libc::O_NONBLOCK) 273 .open(&fifo)?; 274 let err = assert_err!(pipe::Sender::from_file(rdonly)); 275 assert_eq!(err.kind(), io::ErrorKind::InvalidInput); 276 277 Ok(()) 278 } 279 is_nonblocking<T: AsRawFd>(fd: &T) -> io::Result<bool>280 fn is_nonblocking<T: AsRawFd>(fd: &T) -> io::Result<bool> { 281 let flags = nix::fcntl::fcntl(fd.as_raw_fd(), nix::fcntl::F_GETFL)?; 282 Ok((flags & libc::O_NONBLOCK) != 0) 283 } 284 285 #[tokio::test] 286 #[cfg_attr(miri, ignore)] // No `mkfifo` in miri. from_file_sets_nonblock() -> io::Result<()>287 async fn from_file_sets_nonblock() -> io::Result<()> { 288 let fifo = TempFifo::new("sets_nonblock")?; 289 290 // Open read and write ends to let blocking files open. 291 let _reader = pipe::OpenOptions::new().open_receiver(&fifo)?; 292 let _writer = pipe::OpenOptions::new().open_sender(&fifo)?; 293 294 // Check if Receiver sets the pipe in non-blocking mode. 295 let rdonly = std::fs::OpenOptions::new().read(true).open(&fifo)?; 296 assert!(!is_nonblocking(&rdonly)?); 297 let reader = pipe::Receiver::from_file(rdonly)?; 298 assert!(is_nonblocking(&reader)?); 299 300 // Check if Sender sets the pipe in non-blocking mode. 301 let wronly = std::fs::OpenOptions::new().write(true).open(&fifo)?; 302 assert!(!is_nonblocking(&wronly)?); 303 let writer = pipe::Sender::from_file(wronly)?; 304 assert!(is_nonblocking(&writer)?); 305 306 Ok(()) 307 } 308 writable_by_poll(writer: &pipe::Sender) -> bool309 fn writable_by_poll(writer: &pipe::Sender) -> bool { 310 task::spawn(writer.writable()).poll().is_ready() 311 } 312 313 #[tokio::test] 314 #[cfg_attr(miri, ignore)] // No `mkfifo` in miri. try_read_write() -> io::Result<()>315 async fn try_read_write() -> io::Result<()> { 316 const DATA: &[u8] = b"this is some data to write to the fifo"; 317 318 // Create a pipe pair over a fifo file. 319 let fifo = TempFifo::new("try_read_write")?; 320 let reader = pipe::OpenOptions::new().open_receiver(&fifo)?; 321 let writer = pipe::OpenOptions::new().open_sender(&fifo)?; 322 323 // Fill the pipe buffer with `try_write`. 324 let mut write_data = Vec::new(); 325 while writable_by_poll(&writer) { 326 match writer.try_write(DATA) { 327 Ok(n) => write_data.extend(&DATA[..n]), 328 Err(e) => { 329 assert_eq!(e.kind(), io::ErrorKind::WouldBlock); 330 break; 331 } 332 } 333 } 334 335 // Drain the pipe buffer with `try_read`. 336 let mut read_data = vec![0; write_data.len()]; 337 let mut i = 0; 338 while i < write_data.len() { 339 reader.readable().await?; 340 match reader.try_read(&mut read_data[i..]) { 341 Ok(n) => i += n, 342 Err(e) => { 343 assert_eq!(e.kind(), io::ErrorKind::WouldBlock); 344 continue; 345 } 346 } 347 } 348 349 assert_eq!(read_data, write_data); 350 351 Ok(()) 352 } 353 354 #[tokio::test] 355 #[cfg_attr(miri, ignore)] // No `mkfifo` in miri. try_read_write_vectored() -> io::Result<()>356 async fn try_read_write_vectored() -> io::Result<()> { 357 const DATA: &[u8] = b"this is some data to write to the fifo"; 358 359 // Create a pipe pair over a fifo file. 360 let fifo = TempFifo::new("try_read_write_vectored")?; 361 let reader = pipe::OpenOptions::new().open_receiver(&fifo)?; 362 let writer = pipe::OpenOptions::new().open_sender(&fifo)?; 363 364 let write_bufs: Vec<_> = DATA.chunks(3).map(io::IoSlice::new).collect(); 365 366 // Fill the pipe buffer with `try_write_vectored`. 367 let mut write_data = Vec::new(); 368 while writable_by_poll(&writer) { 369 match writer.try_write_vectored(&write_bufs) { 370 Ok(n) => write_data.extend(&DATA[..n]), 371 Err(e) => { 372 assert_eq!(e.kind(), io::ErrorKind::WouldBlock); 373 break; 374 } 375 } 376 } 377 378 // Drain the pipe buffer with `try_read_vectored`. 379 let mut read_data = vec![0; write_data.len()]; 380 let mut i = 0; 381 while i < write_data.len() { 382 reader.readable().await?; 383 384 let mut read_bufs: Vec<_> = read_data[i..] 385 .chunks_mut(0x10000) 386 .map(io::IoSliceMut::new) 387 .collect(); 388 match reader.try_read_vectored(&mut read_bufs) { 389 Ok(n) => i += n, 390 Err(e) => { 391 assert_eq!(e.kind(), io::ErrorKind::WouldBlock); 392 continue; 393 } 394 } 395 } 396 397 assert_eq!(read_data, write_data); 398 399 Ok(()) 400 } 401 402 #[tokio::test] 403 #[cfg_attr(miri, ignore)] // No `mkfifo` in miri. try_read_buf() -> std::io::Result<()>404 async fn try_read_buf() -> std::io::Result<()> { 405 const DATA: &[u8] = b"this is some data to write to the fifo"; 406 407 // Create a pipe pair over a fifo file. 408 let fifo = TempFifo::new("try_read_write_vectored")?; 409 let reader = pipe::OpenOptions::new().open_receiver(&fifo)?; 410 let writer = pipe::OpenOptions::new().open_sender(&fifo)?; 411 412 // Fill the pipe buffer with `try_write`. 413 let mut write_data = Vec::new(); 414 while writable_by_poll(&writer) { 415 match writer.try_write(DATA) { 416 Ok(n) => write_data.extend(&DATA[..n]), 417 Err(e) => { 418 assert_eq!(e.kind(), io::ErrorKind::WouldBlock); 419 break; 420 } 421 } 422 } 423 424 // Drain the pipe buffer with `try_read_buf`. 425 let mut read_data = vec![0; write_data.len()]; 426 let mut i = 0; 427 while i < write_data.len() { 428 reader.readable().await?; 429 match reader.try_read_buf(&mut read_data) { 430 Ok(n) => i += n, 431 Err(e) => { 432 assert_eq!(e.kind(), io::ErrorKind::WouldBlock); 433 continue; 434 } 435 } 436 } 437 438 assert_eq!(read_data, write_data); 439 440 Ok(()) 441 } 442 443 #[tokio::test] anon_pipe_simple_send() -> io::Result<()>444 async fn anon_pipe_simple_send() -> io::Result<()> { 445 const DATA: &[u8] = b"this is some data to write to the pipe"; 446 447 let (mut writer, mut reader) = pipe::pipe()?; 448 449 // Create a reading task which should wait for data from the pipe. 450 let mut read_fut = task::spawn(async move { 451 let mut buf = vec![0; DATA.len()]; 452 reader.read_exact(&mut buf).await?; 453 Ok::<_, io::Error>(buf) 454 }); 455 assert_pending!(read_fut.poll()); 456 457 writer.write_all(DATA).await?; 458 459 // Let the IO driver poll events for the reader. 460 while !read_fut.is_woken() { 461 tokio::task::yield_now().await; 462 } 463 464 // Reading task should be ready now. 465 let read_data = assert_ready_ok!(read_fut.poll()); 466 assert_eq!(&read_data, DATA); 467 468 Ok(()) 469 } 470 471 #[tokio::test] 472 #[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri. anon_pipe_spawn_echo() -> std::io::Result<()>473 async fn anon_pipe_spawn_echo() -> std::io::Result<()> { 474 use tokio::process::Command; 475 476 const DATA: &str = "this is some data to write to the pipe"; 477 478 let (tx, mut rx) = pipe::pipe()?; 479 480 let status = Command::new("echo") 481 .arg("-n") 482 .arg(DATA) 483 .stdout(tx.into_blocking_fd()?) 484 .status(); 485 486 let mut buf = vec![0; DATA.len()]; 487 rx.read_exact(&mut buf).await?; 488 assert_eq!(String::from_utf8(buf).unwrap(), DATA); 489 490 let exit_code = status.await?; 491 assert!(exit_code.success()); 492 493 // Check if the pipe is closed. 494 buf = Vec::new(); 495 let total = assert_ok!(rx.try_read(&mut buf)); 496 assert_eq!(total, 0); 497 498 Ok(()) 499 } 500 501 #[tokio::test] 502 #[cfg(target_os = "linux")] 503 #[cfg_attr(miri, ignore)] // No `fstat` in miri. anon_pipe_from_owned_fd() -> std::io::Result<()>504 async fn anon_pipe_from_owned_fd() -> std::io::Result<()> { 505 use nix::fcntl::OFlag; 506 507 const DATA: &[u8] = b"this is some data to write to the pipe"; 508 509 let (rx_fd, tx_fd) = nix::unistd::pipe2(OFlag::O_CLOEXEC | OFlag::O_NONBLOCK)?; 510 511 let mut rx = pipe::Receiver::from_owned_fd(rx_fd)?; 512 let mut tx = pipe::Sender::from_owned_fd(tx_fd)?; 513 514 let mut buf = vec![0; DATA.len()]; 515 tx.write_all(DATA).await?; 516 rx.read_exact(&mut buf).await?; 517 assert_eq!(buf, DATA); 518 519 Ok(()) 520 } 521 522 #[tokio::test] 523 #[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri. anon_pipe_into_nonblocking_fd() -> std::io::Result<()>524 async fn anon_pipe_into_nonblocking_fd() -> std::io::Result<()> { 525 let (tx, rx) = pipe::pipe()?; 526 527 let tx_fd = tx.into_nonblocking_fd()?; 528 let rx_fd = rx.into_nonblocking_fd()?; 529 530 assert!(is_nonblocking(&tx_fd)?); 531 assert!(is_nonblocking(&rx_fd)?); 532 533 Ok(()) 534 } 535 536 #[tokio::test] 537 #[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri. anon_pipe_into_blocking_fd() -> std::io::Result<()>538 async fn anon_pipe_into_blocking_fd() -> std::io::Result<()> { 539 let (tx, rx) = pipe::pipe()?; 540 541 let tx_fd = tx.into_blocking_fd()?; 542 let rx_fd = rx.into_blocking_fd()?; 543 544 assert!(!is_nonblocking(&tx_fd)?); 545 assert!(!is_nonblocking(&rx_fd)?); 546 547 Ok(()) 548 } 549