1 //! Types for working with [`File`]. 2 //! 3 //! [`File`]: File 4 5 use crate::fs::{asyncify, OpenOptions}; 6 use crate::io::blocking::{Buf, DEFAULT_MAX_BUF_SIZE}; 7 use crate::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf}; 8 use crate::sync::Mutex; 9 10 use std::fmt; 11 use std::fs::{Metadata, Permissions}; 12 use std::future::Future; 13 use std::io::{self, Seek, SeekFrom}; 14 use std::path::Path; 15 use std::pin::Pin; 16 use std::sync::Arc; 17 use std::task::{ready, Context, Poll}; 18 19 #[cfg(test)] 20 use super::mocks::JoinHandle; 21 #[cfg(test)] 22 use super::mocks::MockFile as StdFile; 23 #[cfg(test)] 24 use super::mocks::{spawn_blocking, spawn_mandatory_blocking}; 25 #[cfg(not(test))] 26 use crate::blocking::JoinHandle; 27 #[cfg(not(test))] 28 use crate::blocking::{spawn_blocking, spawn_mandatory_blocking}; 29 #[cfg(not(test))] 30 use std::fs::File as StdFile; 31 32 /// A reference to an open file on the filesystem. 33 /// 34 /// This is a specialized version of [`std::fs::File`] for usage from the 35 /// Tokio runtime. 36 /// 37 /// An instance of a `File` can be read and/or written depending on what options 38 /// it was opened with. Files also implement [`AsyncSeek`] to alter the logical 39 /// cursor that the file contains internally. 40 /// 41 /// A file will not be closed immediately when it goes out of scope if there 42 /// are any IO operations that have not yet completed. To ensure that a file is 43 /// closed immediately when it is dropped, you should call [`flush`] before 44 /// dropping it. Note that this does not ensure that the file has been fully 45 /// written to disk; the operating system might keep the changes around in an 46 /// in-memory buffer. See the [`sync_all`] method for telling the OS to write 47 /// the data to disk. 48 /// 49 /// Reading and writing to a `File` is usually done using the convenience 50 /// methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`] traits. 51 /// 52 /// [`AsyncSeek`]: trait@crate::io::AsyncSeek 53 /// [`flush`]: fn@crate::io::AsyncWriteExt::flush 54 /// [`sync_all`]: fn@crate::fs::File::sync_all 55 /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt 56 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt 57 /// 58 /// # Examples 59 /// 60 /// Create a new file and asynchronously write bytes to it: 61 /// 62 /// ```no_run 63 /// use tokio::fs::File; 64 /// use tokio::io::AsyncWriteExt; // for write_all() 65 /// 66 /// # async fn dox() -> std::io::Result<()> { 67 /// let mut file = File::create("foo.txt").await?; 68 /// file.write_all(b"hello, world!").await?; 69 /// # Ok(()) 70 /// # } 71 /// ``` 72 /// 73 /// Read the contents of a file into a buffer: 74 /// 75 /// ```no_run 76 /// use tokio::fs::File; 77 /// use tokio::io::AsyncReadExt; // for read_to_end() 78 /// 79 /// # async fn dox() -> std::io::Result<()> { 80 /// let mut file = File::open("foo.txt").await?; 81 /// 82 /// let mut contents = vec![]; 83 /// file.read_to_end(&mut contents).await?; 84 /// 85 /// println!("len = {}", contents.len()); 86 /// # Ok(()) 87 /// # } 88 /// ``` 89 pub struct File { 90 std: Arc<StdFile>, 91 inner: Mutex<Inner>, 92 max_buf_size: usize, 93 } 94 95 struct Inner { 96 state: State, 97 98 /// Errors from writes/flushes are returned in write/flush calls. If a write 99 /// error is observed while performing a read, it is saved until the next 100 /// write / flush call. 101 last_write_err: Option<io::ErrorKind>, 102 103 pos: u64, 104 } 105 106 #[derive(Debug)] 107 enum State { 108 Idle(Option<Buf>), 109 Busy(JoinHandle<(Operation, Buf)>), 110 } 111 112 #[derive(Debug)] 113 enum Operation { 114 Read(io::Result<usize>), 115 Write(io::Result<()>), 116 Seek(io::Result<u64>), 117 } 118 119 impl File { 120 /// Attempts to open a file in read-only mode. 121 /// 122 /// See [`OpenOptions`] for more details. 123 /// 124 /// # Errors 125 /// 126 /// This function will return an error if called from outside of the Tokio 127 /// runtime or if path does not already exist. Other errors may also be 128 /// returned according to `OpenOptions::open`. 129 /// 130 /// # Examples 131 /// 132 /// ```no_run 133 /// use tokio::fs::File; 134 /// use tokio::io::AsyncReadExt; 135 /// 136 /// # async fn dox() -> std::io::Result<()> { 137 /// let mut file = File::open("foo.txt").await?; 138 /// 139 /// let mut contents = vec![]; 140 /// file.read_to_end(&mut contents).await?; 141 /// 142 /// println!("len = {}", contents.len()); 143 /// # Ok(()) 144 /// # } 145 /// ``` 146 /// 147 /// The [`read_to_end`] method is defined on the [`AsyncReadExt`] trait. 148 /// 149 /// [`read_to_end`]: fn@crate::io::AsyncReadExt::read_to_end 150 /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt open(path: impl AsRef<Path>) -> io::Result<File>151 pub async fn open(path: impl AsRef<Path>) -> io::Result<File> { 152 let path = path.as_ref().to_owned(); 153 let std = asyncify(|| StdFile::open(path)).await?; 154 155 Ok(File::from_std(std)) 156 } 157 158 /// Opens a file in write-only mode. 159 /// 160 /// This function will create a file if it does not exist, and will truncate 161 /// it if it does. 162 /// 163 /// See [`OpenOptions`] for more details. 164 /// 165 /// # Errors 166 /// 167 /// Results in an error if called from outside of the Tokio runtime or if 168 /// the underlying [`create`] call results in an error. 169 /// 170 /// [`create`]: std::fs::File::create 171 /// 172 /// # Examples 173 /// 174 /// ```no_run 175 /// use tokio::fs::File; 176 /// use tokio::io::AsyncWriteExt; 177 /// 178 /// # async fn dox() -> std::io::Result<()> { 179 /// let mut file = File::create("foo.txt").await?; 180 /// file.write_all(b"hello, world!").await?; 181 /// # Ok(()) 182 /// # } 183 /// ``` 184 /// 185 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait. 186 /// 187 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all 188 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt create(path: impl AsRef<Path>) -> io::Result<File>189 pub async fn create(path: impl AsRef<Path>) -> io::Result<File> { 190 let path = path.as_ref().to_owned(); 191 let std_file = asyncify(move || StdFile::create(path)).await?; 192 Ok(File::from_std(std_file)) 193 } 194 195 /// Opens a file in read-write mode. 196 /// 197 /// This function will create a file if it does not exist, or return an error 198 /// if it does. This way, if the call succeeds, the file returned is guaranteed 199 /// to be new. 200 /// 201 /// This option is useful because it is atomic. Otherwise between checking 202 /// whether a file exists and creating a new one, the file may have been 203 /// created by another process (a TOCTOU race condition / attack). 204 /// 205 /// This can also be written using `File::options().read(true).write(true).create_new(true).open(...)`. 206 /// 207 /// See [`OpenOptions`] for more details. 208 /// 209 /// # Examples 210 /// 211 /// ```no_run 212 /// use tokio::fs::File; 213 /// use tokio::io::AsyncWriteExt; 214 /// 215 /// # async fn dox() -> std::io::Result<()> { 216 /// let mut file = File::create_new("foo.txt").await?; 217 /// file.write_all(b"hello, world!").await?; 218 /// # Ok(()) 219 /// # } 220 /// ``` 221 /// 222 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait. 223 /// 224 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all 225 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt create_new<P: AsRef<Path>>(path: P) -> std::io::Result<File>226 pub async fn create_new<P: AsRef<Path>>(path: P) -> std::io::Result<File> { 227 Self::options() 228 .read(true) 229 .write(true) 230 .create_new(true) 231 .open(path) 232 .await 233 } 234 235 /// Returns a new [`OpenOptions`] object. 236 /// 237 /// This function returns a new `OpenOptions` object that you can use to 238 /// open or create a file with specific options if `open()` or `create()` 239 /// are not appropriate. 240 /// 241 /// It is equivalent to `OpenOptions::new()`, but allows you to write more 242 /// readable code. Instead of 243 /// `OpenOptions::new().append(true).open("example.log")`, 244 /// you can write `File::options().append(true).open("example.log")`. This 245 /// also avoids the need to import `OpenOptions`. 246 /// 247 /// See the [`OpenOptions::new`] function for more details. 248 /// 249 /// # Examples 250 /// 251 /// ```no_run 252 /// use tokio::fs::File; 253 /// use tokio::io::AsyncWriteExt; 254 /// 255 /// # async fn dox() -> std::io::Result<()> { 256 /// let mut f = File::options().append(true).open("example.log").await?; 257 /// f.write_all(b"new line\n").await?; 258 /// # Ok(()) 259 /// # } 260 /// ``` 261 #[must_use] options() -> OpenOptions262 pub fn options() -> OpenOptions { 263 OpenOptions::new() 264 } 265 266 /// Converts a [`std::fs::File`] to a [`tokio::fs::File`](File). 267 /// 268 /// # Examples 269 /// 270 /// ```no_run 271 /// // This line could block. It is not recommended to do this on the Tokio 272 /// // runtime. 273 /// let std_file = std::fs::File::open("foo.txt").unwrap(); 274 /// let file = tokio::fs::File::from_std(std_file); 275 /// ``` from_std(std: StdFile) -> File276 pub fn from_std(std: StdFile) -> File { 277 File { 278 std: Arc::new(std), 279 inner: Mutex::new(Inner { 280 state: State::Idle(Some(Buf::with_capacity(0))), 281 last_write_err: None, 282 pos: 0, 283 }), 284 max_buf_size: DEFAULT_MAX_BUF_SIZE, 285 } 286 } 287 288 /// Attempts to sync all OS-internal metadata to disk. 289 /// 290 /// This function will attempt to ensure that all in-core data reaches the 291 /// filesystem before returning. 292 /// 293 /// # Examples 294 /// 295 /// ```no_run 296 /// use tokio::fs::File; 297 /// use tokio::io::AsyncWriteExt; 298 /// 299 /// # async fn dox() -> std::io::Result<()> { 300 /// let mut file = File::create("foo.txt").await?; 301 /// file.write_all(b"hello, world!").await?; 302 /// file.sync_all().await?; 303 /// # Ok(()) 304 /// # } 305 /// ``` 306 /// 307 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait. 308 /// 309 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all 310 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt sync_all(&self) -> io::Result<()>311 pub async fn sync_all(&self) -> io::Result<()> { 312 let mut inner = self.inner.lock().await; 313 inner.complete_inflight().await; 314 315 let std = self.std.clone(); 316 asyncify(move || std.sync_all()).await 317 } 318 319 /// This function is similar to `sync_all`, except that it may not 320 /// synchronize file metadata to the filesystem. 321 /// 322 /// This is intended for use cases that must synchronize content, but don't 323 /// need the metadata on disk. The goal of this method is to reduce disk 324 /// operations. 325 /// 326 /// Note that some platforms may simply implement this in terms of `sync_all`. 327 /// 328 /// # Examples 329 /// 330 /// ```no_run 331 /// use tokio::fs::File; 332 /// use tokio::io::AsyncWriteExt; 333 /// 334 /// # async fn dox() -> std::io::Result<()> { 335 /// let mut file = File::create("foo.txt").await?; 336 /// file.write_all(b"hello, world!").await?; 337 /// file.sync_data().await?; 338 /// # Ok(()) 339 /// # } 340 /// ``` 341 /// 342 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait. 343 /// 344 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all 345 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt sync_data(&self) -> io::Result<()>346 pub async fn sync_data(&self) -> io::Result<()> { 347 let mut inner = self.inner.lock().await; 348 inner.complete_inflight().await; 349 350 let std = self.std.clone(); 351 asyncify(move || std.sync_data()).await 352 } 353 354 /// Truncates or extends the underlying file, updating the size of this file to become size. 355 /// 356 /// If the size is less than the current file's size, then the file will be 357 /// shrunk. If it is greater than the current file's size, then the file 358 /// will be extended to size and have all of the intermediate data filled in 359 /// with 0s. 360 /// 361 /// # Errors 362 /// 363 /// This function will return an error if the file is not opened for 364 /// writing. 365 /// 366 /// # Examples 367 /// 368 /// ```no_run 369 /// use tokio::fs::File; 370 /// use tokio::io::AsyncWriteExt; 371 /// 372 /// # async fn dox() -> std::io::Result<()> { 373 /// let mut file = File::create("foo.txt").await?; 374 /// file.write_all(b"hello, world!").await?; 375 /// file.set_len(10).await?; 376 /// # Ok(()) 377 /// # } 378 /// ``` 379 /// 380 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait. 381 /// 382 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all 383 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt set_len(&self, size: u64) -> io::Result<()>384 pub async fn set_len(&self, size: u64) -> io::Result<()> { 385 let mut inner = self.inner.lock().await; 386 inner.complete_inflight().await; 387 388 let mut buf = match inner.state { 389 State::Idle(ref mut buf_cell) => buf_cell.take().unwrap(), 390 _ => unreachable!(), 391 }; 392 393 let seek = if !buf.is_empty() { 394 Some(SeekFrom::Current(buf.discard_read())) 395 } else { 396 None 397 }; 398 399 let std = self.std.clone(); 400 401 inner.state = State::Busy(spawn_blocking(move || { 402 let res = if let Some(seek) = seek { 403 (&*std).seek(seek).and_then(|_| std.set_len(size)) 404 } else { 405 std.set_len(size) 406 } 407 .map(|()| 0); // the value is discarded later 408 409 // Return the result as a seek 410 (Operation::Seek(res), buf) 411 })); 412 413 let (op, buf) = match inner.state { 414 State::Idle(_) => unreachable!(), 415 State::Busy(ref mut rx) => rx.await?, 416 }; 417 418 inner.state = State::Idle(Some(buf)); 419 420 match op { 421 Operation::Seek(res) => res.map(|pos| { 422 inner.pos = pos; 423 }), 424 _ => unreachable!(), 425 } 426 } 427 428 /// Queries metadata about the underlying file. 429 /// 430 /// # Examples 431 /// 432 /// ```no_run 433 /// use tokio::fs::File; 434 /// 435 /// # async fn dox() -> std::io::Result<()> { 436 /// let file = File::open("foo.txt").await?; 437 /// let metadata = file.metadata().await?; 438 /// 439 /// println!("{:?}", metadata); 440 /// # Ok(()) 441 /// # } 442 /// ``` metadata(&self) -> io::Result<Metadata>443 pub async fn metadata(&self) -> io::Result<Metadata> { 444 let std = self.std.clone(); 445 asyncify(move || std.metadata()).await 446 } 447 448 /// Creates a new `File` instance that shares the same underlying file handle 449 /// as the existing `File` instance. Reads, writes, and seeks will affect both 450 /// File instances simultaneously. 451 /// 452 /// # Examples 453 /// 454 /// ```no_run 455 /// use tokio::fs::File; 456 /// 457 /// # async fn dox() -> std::io::Result<()> { 458 /// let file = File::open("foo.txt").await?; 459 /// let file_clone = file.try_clone().await?; 460 /// # Ok(()) 461 /// # } 462 /// ``` try_clone(&self) -> io::Result<File>463 pub async fn try_clone(&self) -> io::Result<File> { 464 self.inner.lock().await.complete_inflight().await; 465 let std = self.std.clone(); 466 let std_file = asyncify(move || std.try_clone()).await?; 467 Ok(File::from_std(std_file)) 468 } 469 470 /// Destructures `File` into a [`std::fs::File`]. This function is 471 /// async to allow any in-flight operations to complete. 472 /// 473 /// Use `File::try_into_std` to attempt conversion immediately. 474 /// 475 /// # Examples 476 /// 477 /// ```no_run 478 /// use tokio::fs::File; 479 /// 480 /// # async fn dox() -> std::io::Result<()> { 481 /// let tokio_file = File::open("foo.txt").await?; 482 /// let std_file = tokio_file.into_std().await; 483 /// # Ok(()) 484 /// # } 485 /// ``` into_std(mut self) -> StdFile486 pub async fn into_std(mut self) -> StdFile { 487 self.inner.get_mut().complete_inflight().await; 488 Arc::try_unwrap(self.std).expect("Arc::try_unwrap failed") 489 } 490 491 /// Tries to immediately destructure `File` into a [`std::fs::File`]. 492 /// 493 /// # Errors 494 /// 495 /// This function will return an error containing the file if some 496 /// operation is in-flight. 497 /// 498 /// # Examples 499 /// 500 /// ```no_run 501 /// use tokio::fs::File; 502 /// 503 /// # async fn dox() -> std::io::Result<()> { 504 /// let tokio_file = File::open("foo.txt").await?; 505 /// let std_file = tokio_file.try_into_std().unwrap(); 506 /// # Ok(()) 507 /// # } 508 /// ``` try_into_std(mut self) -> Result<StdFile, Self>509 pub fn try_into_std(mut self) -> Result<StdFile, Self> { 510 match Arc::try_unwrap(self.std) { 511 Ok(file) => Ok(file), 512 Err(std_file_arc) => { 513 self.std = std_file_arc; 514 Err(self) 515 } 516 } 517 } 518 519 /// Changes the permissions on the underlying file. 520 /// 521 /// # Platform-specific behavior 522 /// 523 /// This function currently corresponds to the `fchmod` function on Unix and 524 /// the `SetFileInformationByHandle` function on Windows. Note that, this 525 /// [may change in the future][changes]. 526 /// 527 /// [changes]: https://doc.rust-lang.org/std/io/index.html#platform-specific-behavior 528 /// 529 /// # Errors 530 /// 531 /// This function will return an error if the user lacks permission change 532 /// attributes on the underlying file. It may also return an error in other 533 /// os-specific unspecified cases. 534 /// 535 /// # Examples 536 /// 537 /// ```no_run 538 /// use tokio::fs::File; 539 /// 540 /// # async fn dox() -> std::io::Result<()> { 541 /// let file = File::open("foo.txt").await?; 542 /// let mut perms = file.metadata().await?.permissions(); 543 /// perms.set_readonly(true); 544 /// file.set_permissions(perms).await?; 545 /// # Ok(()) 546 /// # } 547 /// ``` set_permissions(&self, perm: Permissions) -> io::Result<()>548 pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> { 549 let std = self.std.clone(); 550 asyncify(move || std.set_permissions(perm)).await 551 } 552 553 /// Set the maximum buffer size for the underlying [`AsyncRead`] / [`AsyncWrite`] operation. 554 /// 555 /// Although Tokio uses a sensible default value for this buffer size, this function would be 556 /// useful for changing that default depending on the situation. 557 /// 558 /// # Examples 559 /// 560 /// ```no_run 561 /// use tokio::fs::File; 562 /// use tokio::io::AsyncWriteExt; 563 /// 564 /// # async fn dox() -> std::io::Result<()> { 565 /// let mut file = File::open("foo.txt").await?; 566 /// 567 /// // Set maximum buffer size to 8 MiB 568 /// file.set_max_buf_size(8 * 1024 * 1024); 569 /// 570 /// let mut buf = vec![1; 1024 * 1024 * 1024]; 571 /// 572 /// // Write the 1 GiB buffer in chunks up to 8 MiB each. 573 /// file.write_all(&mut buf).await?; 574 /// # Ok(()) 575 /// # } 576 /// ``` set_max_buf_size(&mut self, max_buf_size: usize)577 pub fn set_max_buf_size(&mut self, max_buf_size: usize) { 578 self.max_buf_size = max_buf_size; 579 } 580 } 581 582 impl AsyncRead for File { poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, dst: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>583 fn poll_read( 584 self: Pin<&mut Self>, 585 cx: &mut Context<'_>, 586 dst: &mut ReadBuf<'_>, 587 ) -> Poll<io::Result<()>> { 588 ready!(crate::trace::trace_leaf(cx)); 589 let me = self.get_mut(); 590 let inner = me.inner.get_mut(); 591 592 loop { 593 match inner.state { 594 State::Idle(ref mut buf_cell) => { 595 let mut buf = buf_cell.take().unwrap(); 596 597 if !buf.is_empty() { 598 buf.copy_to(dst); 599 *buf_cell = Some(buf); 600 return Poll::Ready(Ok(())); 601 } 602 603 buf.ensure_capacity_for(dst, me.max_buf_size); 604 let std = me.std.clone(); 605 606 inner.state = State::Busy(spawn_blocking(move || { 607 let res = buf.read_from(&mut &*std); 608 (Operation::Read(res), buf) 609 })); 610 } 611 State::Busy(ref mut rx) => { 612 let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?; 613 614 match op { 615 Operation::Read(Ok(_)) => { 616 buf.copy_to(dst); 617 inner.state = State::Idle(Some(buf)); 618 return Poll::Ready(Ok(())); 619 } 620 Operation::Read(Err(e)) => { 621 assert!(buf.is_empty()); 622 623 inner.state = State::Idle(Some(buf)); 624 return Poll::Ready(Err(e)); 625 } 626 Operation::Write(Ok(())) => { 627 assert!(buf.is_empty()); 628 inner.state = State::Idle(Some(buf)); 629 continue; 630 } 631 Operation::Write(Err(e)) => { 632 assert!(inner.last_write_err.is_none()); 633 inner.last_write_err = Some(e.kind()); 634 inner.state = State::Idle(Some(buf)); 635 } 636 Operation::Seek(result) => { 637 assert!(buf.is_empty()); 638 inner.state = State::Idle(Some(buf)); 639 if let Ok(pos) = result { 640 inner.pos = pos; 641 } 642 continue; 643 } 644 } 645 } 646 } 647 } 648 } 649 } 650 651 impl AsyncSeek for File { start_seek(self: Pin<&mut Self>, mut pos: SeekFrom) -> io::Result<()>652 fn start_seek(self: Pin<&mut Self>, mut pos: SeekFrom) -> io::Result<()> { 653 let me = self.get_mut(); 654 let inner = me.inner.get_mut(); 655 656 match inner.state { 657 State::Busy(_) => Err(io::Error::new( 658 io::ErrorKind::Other, 659 "other file operation is pending, call poll_complete before start_seek", 660 )), 661 State::Idle(ref mut buf_cell) => { 662 let mut buf = buf_cell.take().unwrap(); 663 664 // Factor in any unread data from the buf 665 if !buf.is_empty() { 666 let n = buf.discard_read(); 667 668 if let SeekFrom::Current(ref mut offset) = pos { 669 *offset += n; 670 } 671 } 672 673 let std = me.std.clone(); 674 675 inner.state = State::Busy(spawn_blocking(move || { 676 let res = (&*std).seek(pos); 677 (Operation::Seek(res), buf) 678 })); 679 Ok(()) 680 } 681 } 682 } 683 poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>>684 fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> { 685 ready!(crate::trace::trace_leaf(cx)); 686 let inner = self.inner.get_mut(); 687 688 loop { 689 match inner.state { 690 State::Idle(_) => return Poll::Ready(Ok(inner.pos)), 691 State::Busy(ref mut rx) => { 692 let (op, buf) = ready!(Pin::new(rx).poll(cx))?; 693 inner.state = State::Idle(Some(buf)); 694 695 match op { 696 Operation::Read(_) => {} 697 Operation::Write(Err(e)) => { 698 assert!(inner.last_write_err.is_none()); 699 inner.last_write_err = Some(e.kind()); 700 } 701 Operation::Write(_) => {} 702 Operation::Seek(res) => { 703 if let Ok(pos) = res { 704 inner.pos = pos; 705 } 706 return Poll::Ready(res); 707 } 708 } 709 } 710 } 711 } 712 } 713 } 714 715 impl AsyncWrite for File { poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, src: &[u8], ) -> Poll<io::Result<usize>>716 fn poll_write( 717 self: Pin<&mut Self>, 718 cx: &mut Context<'_>, 719 src: &[u8], 720 ) -> Poll<io::Result<usize>> { 721 ready!(crate::trace::trace_leaf(cx)); 722 let me = self.get_mut(); 723 let inner = me.inner.get_mut(); 724 725 if let Some(e) = inner.last_write_err.take() { 726 return Poll::Ready(Err(e.into())); 727 } 728 729 loop { 730 match inner.state { 731 State::Idle(ref mut buf_cell) => { 732 let mut buf = buf_cell.take().unwrap(); 733 734 let seek = if !buf.is_empty() { 735 Some(SeekFrom::Current(buf.discard_read())) 736 } else { 737 None 738 }; 739 740 let n = buf.copy_from(src, me.max_buf_size); 741 let std = me.std.clone(); 742 743 let blocking_task_join_handle = spawn_mandatory_blocking(move || { 744 let res = if let Some(seek) = seek { 745 (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std)) 746 } else { 747 buf.write_to(&mut &*std) 748 }; 749 750 (Operation::Write(res), buf) 751 }) 752 .ok_or_else(|| { 753 io::Error::new(io::ErrorKind::Other, "background task failed") 754 })?; 755 756 inner.state = State::Busy(blocking_task_join_handle); 757 758 return Poll::Ready(Ok(n)); 759 } 760 State::Busy(ref mut rx) => { 761 let (op, buf) = ready!(Pin::new(rx).poll(cx))?; 762 inner.state = State::Idle(Some(buf)); 763 764 match op { 765 Operation::Read(_) => { 766 // We don't care about the result here. The fact 767 // that the cursor has advanced will be reflected in 768 // the next iteration of the loop 769 continue; 770 } 771 Operation::Write(res) => { 772 // If the previous write was successful, continue. 773 // Otherwise, error. 774 res?; 775 continue; 776 } 777 Operation::Seek(_) => { 778 // Ignore the seek 779 continue; 780 } 781 } 782 } 783 } 784 } 785 } 786 poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<Result<usize, io::Error>>787 fn poll_write_vectored( 788 self: Pin<&mut Self>, 789 cx: &mut Context<'_>, 790 bufs: &[io::IoSlice<'_>], 791 ) -> Poll<Result<usize, io::Error>> { 792 ready!(crate::trace::trace_leaf(cx)); 793 let me = self.get_mut(); 794 let inner = me.inner.get_mut(); 795 796 if let Some(e) = inner.last_write_err.take() { 797 return Poll::Ready(Err(e.into())); 798 } 799 800 loop { 801 match inner.state { 802 State::Idle(ref mut buf_cell) => { 803 let mut buf = buf_cell.take().unwrap(); 804 805 let seek = if !buf.is_empty() { 806 Some(SeekFrom::Current(buf.discard_read())) 807 } else { 808 None 809 }; 810 811 let n = buf.copy_from_bufs(bufs, me.max_buf_size); 812 let std = me.std.clone(); 813 814 let blocking_task_join_handle = spawn_mandatory_blocking(move || { 815 let res = if let Some(seek) = seek { 816 (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std)) 817 } else { 818 buf.write_to(&mut &*std) 819 }; 820 821 (Operation::Write(res), buf) 822 }) 823 .ok_or_else(|| { 824 io::Error::new(io::ErrorKind::Other, "background task failed") 825 })?; 826 827 inner.state = State::Busy(blocking_task_join_handle); 828 829 return Poll::Ready(Ok(n)); 830 } 831 State::Busy(ref mut rx) => { 832 let (op, buf) = ready!(Pin::new(rx).poll(cx))?; 833 inner.state = State::Idle(Some(buf)); 834 835 match op { 836 Operation::Read(_) => { 837 // We don't care about the result here. The fact 838 // that the cursor has advanced will be reflected in 839 // the next iteration of the loop 840 continue; 841 } 842 Operation::Write(res) => { 843 // If the previous write was successful, continue. 844 // Otherwise, error. 845 res?; 846 continue; 847 } 848 Operation::Seek(_) => { 849 // Ignore the seek 850 continue; 851 } 852 } 853 } 854 } 855 } 856 } 857 is_write_vectored(&self) -> bool858 fn is_write_vectored(&self) -> bool { 859 true 860 } 861 poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>862 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { 863 ready!(crate::trace::trace_leaf(cx)); 864 let inner = self.inner.get_mut(); 865 inner.poll_flush(cx) 866 } 867 poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>868 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { 869 ready!(crate::trace::trace_leaf(cx)); 870 self.poll_flush(cx) 871 } 872 } 873 874 impl From<StdFile> for File { from(std: StdFile) -> Self875 fn from(std: StdFile) -> Self { 876 Self::from_std(std) 877 } 878 } 879 880 impl fmt::Debug for File { fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result881 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { 882 fmt.debug_struct("tokio::fs::File") 883 .field("std", &self.std) 884 .finish() 885 } 886 } 887 888 #[cfg(unix)] 889 impl std::os::unix::io::AsRawFd for File { as_raw_fd(&self) -> std::os::unix::io::RawFd890 fn as_raw_fd(&self) -> std::os::unix::io::RawFd { 891 self.std.as_raw_fd() 892 } 893 } 894 895 #[cfg(unix)] 896 impl std::os::unix::io::AsFd for File { as_fd(&self) -> std::os::unix::io::BorrowedFd<'_>897 fn as_fd(&self) -> std::os::unix::io::BorrowedFd<'_> { 898 unsafe { 899 std::os::unix::io::BorrowedFd::borrow_raw(std::os::unix::io::AsRawFd::as_raw_fd(self)) 900 } 901 } 902 } 903 904 #[cfg(unix)] 905 impl std::os::unix::io::FromRawFd for File { from_raw_fd(fd: std::os::unix::io::RawFd) -> Self906 unsafe fn from_raw_fd(fd: std::os::unix::io::RawFd) -> Self { 907 StdFile::from_raw_fd(fd).into() 908 } 909 } 910 911 cfg_windows! { 912 use crate::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle, AsHandle, BorrowedHandle}; 913 914 impl AsRawHandle for File { 915 fn as_raw_handle(&self) -> RawHandle { 916 self.std.as_raw_handle() 917 } 918 } 919 920 impl AsHandle for File { 921 fn as_handle(&self) -> BorrowedHandle<'_> { 922 unsafe { 923 BorrowedHandle::borrow_raw( 924 AsRawHandle::as_raw_handle(self), 925 ) 926 } 927 } 928 } 929 930 impl FromRawHandle for File { 931 unsafe fn from_raw_handle(handle: RawHandle) -> Self { 932 StdFile::from_raw_handle(handle).into() 933 } 934 } 935 } 936 937 impl Inner { complete_inflight(&mut self)938 async fn complete_inflight(&mut self) { 939 use std::future::poll_fn; 940 941 poll_fn(|cx| self.poll_complete_inflight(cx)).await; 942 } 943 poll_complete_inflight(&mut self, cx: &mut Context<'_>) -> Poll<()>944 fn poll_complete_inflight(&mut self, cx: &mut Context<'_>) -> Poll<()> { 945 ready!(crate::trace::trace_leaf(cx)); 946 match self.poll_flush(cx) { 947 Poll::Ready(Err(e)) => { 948 self.last_write_err = Some(e.kind()); 949 Poll::Ready(()) 950 } 951 Poll::Ready(Ok(())) => Poll::Ready(()), 952 Poll::Pending => Poll::Pending, 953 } 954 } 955 poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>956 fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { 957 if let Some(e) = self.last_write_err.take() { 958 return Poll::Ready(Err(e.into())); 959 } 960 961 let (op, buf) = match self.state { 962 State::Idle(_) => return Poll::Ready(Ok(())), 963 State::Busy(ref mut rx) => ready!(Pin::new(rx).poll(cx))?, 964 }; 965 966 // The buffer is not used here 967 self.state = State::Idle(Some(buf)); 968 969 match op { 970 Operation::Read(_) => Poll::Ready(Ok(())), 971 Operation::Write(res) => Poll::Ready(res), 972 Operation::Seek(_) => Poll::Ready(Ok(())), 973 } 974 } 975 } 976 977 #[cfg(test)] 978 mod tests; 979