1 //! Types for working with [`File`].
2 //!
3 //! [`File`]: File
4 
5 use crate::fs::{asyncify, OpenOptions};
6 use crate::io::blocking::{Buf, DEFAULT_MAX_BUF_SIZE};
7 use crate::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
8 use crate::sync::Mutex;
9 
10 use std::fmt;
11 use std::fs::{Metadata, Permissions};
12 use std::future::Future;
13 use std::io::{self, Seek, SeekFrom};
14 use std::path::Path;
15 use std::pin::Pin;
16 use std::sync::Arc;
17 use std::task::{ready, Context, Poll};
18 
19 #[cfg(test)]
20 use super::mocks::JoinHandle;
21 #[cfg(test)]
22 use super::mocks::MockFile as StdFile;
23 #[cfg(test)]
24 use super::mocks::{spawn_blocking, spawn_mandatory_blocking};
25 #[cfg(not(test))]
26 use crate::blocking::JoinHandle;
27 #[cfg(not(test))]
28 use crate::blocking::{spawn_blocking, spawn_mandatory_blocking};
29 #[cfg(not(test))]
30 use std::fs::File as StdFile;
31 
32 /// A reference to an open file on the filesystem.
33 ///
34 /// This is a specialized version of [`std::fs::File`] for usage from the
35 /// Tokio runtime.
36 ///
37 /// An instance of a `File` can be read and/or written depending on what options
38 /// it was opened with. Files also implement [`AsyncSeek`] to alter the logical
39 /// cursor that the file contains internally.
40 ///
41 /// A file will not be closed immediately when it goes out of scope if there
42 /// are any IO operations that have not yet completed. To ensure that a file is
43 /// closed immediately when it is dropped, you should call [`flush`] before
44 /// dropping it. Note that this does not ensure that the file has been fully
45 /// written to disk; the operating system might keep the changes around in an
46 /// in-memory buffer. See the [`sync_all`] method for telling the OS to write
47 /// the data to disk.
48 ///
49 /// Reading and writing to a `File` is usually done using the convenience
50 /// methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`] traits.
51 ///
52 /// [`AsyncSeek`]: trait@crate::io::AsyncSeek
53 /// [`flush`]: fn@crate::io::AsyncWriteExt::flush
54 /// [`sync_all`]: fn@crate::fs::File::sync_all
55 /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
56 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
57 ///
58 /// # Examples
59 ///
60 /// Create a new file and asynchronously write bytes to it:
61 ///
62 /// ```no_run
63 /// use tokio::fs::File;
64 /// use tokio::io::AsyncWriteExt; // for write_all()
65 ///
66 /// # async fn dox() -> std::io::Result<()> {
67 /// let mut file = File::create("foo.txt").await?;
68 /// file.write_all(b"hello, world!").await?;
69 /// # Ok(())
70 /// # }
71 /// ```
72 ///
73 /// Read the contents of a file into a buffer:
74 ///
75 /// ```no_run
76 /// use tokio::fs::File;
77 /// use tokio::io::AsyncReadExt; // for read_to_end()
78 ///
79 /// # async fn dox() -> std::io::Result<()> {
80 /// let mut file = File::open("foo.txt").await?;
81 ///
82 /// let mut contents = vec![];
83 /// file.read_to_end(&mut contents).await?;
84 ///
85 /// println!("len = {}", contents.len());
86 /// # Ok(())
87 /// # }
88 /// ```
pub struct File {
    /// Handle to the underlying file. Kept in an `Arc` so a clone can be moved
    /// into the closures handed to `asyncify` / `spawn_blocking`.
    std: Arc<StdFile>,
    /// Buffer/operation state plus bookkeeping (see `Inner`). `&self` methods
    /// acquire it with `lock().await`; `&mut self` paths use `get_mut()`.
    inner: Mutex<Inner>,
    /// Size limit passed to the `Buf` helpers when preparing a read or a
    /// write. Initialized to `DEFAULT_MAX_BUF_SIZE` by `from_std`.
    max_buf_size: usize,
}
94 
struct Inner {
    /// Whether the file is idle (and owns the buffer) or has a background
    /// operation in flight.
    state: State,

    /// Errors from writes/flushes are returned in write/flush calls. If a write
    /// error is observed while performing a read, it is saved until the next
    /// write / flush call.
    last_write_err: Option<io::ErrorKind>,

    /// Cached stream position. It starts at 0, is updated from successful
    /// `Operation::Seek` results, and is what `poll_complete` reports when no
    /// operation is in flight.
    pos: u64,
}
105 
#[derive(Debug)]
enum State {
    /// No background operation is running. The `Option` lets callers `take()`
    /// the buffer out before moving it into a blocking task.
    Idle(Option<Buf>),
    /// A blocking task is running. When it finishes it yields the operation's
    /// outcome together with the buffer it was given.
    Busy(JoinHandle<(Operation, Buf)>),
}
111 
/// Outcome of a background operation, tagged by the kind of operation that
/// produced it.
#[derive(Debug)]
enum Operation {
    /// Result of filling the buffer from the file.
    Read(io::Result<usize>),
    /// Result of writing the buffer out to the file.
    Write(io::Result<()>),
    /// Result of a seek (also used by `set_len` to report its outcome).
    Seek(io::Result<u64>),
}
118 
119 impl File {
120     /// Attempts to open a file in read-only mode.
121     ///
122     /// See [`OpenOptions`] for more details.
123     ///
124     /// # Errors
125     ///
126     /// This function will return an error if called from outside of the Tokio
127     /// runtime or if path does not already exist. Other errors may also be
128     /// returned according to `OpenOptions::open`.
129     ///
130     /// # Examples
131     ///
132     /// ```no_run
133     /// use tokio::fs::File;
134     /// use tokio::io::AsyncReadExt;
135     ///
136     /// # async fn dox() -> std::io::Result<()> {
137     /// let mut file = File::open("foo.txt").await?;
138     ///
139     /// let mut contents = vec![];
140     /// file.read_to_end(&mut contents).await?;
141     ///
142     /// println!("len = {}", contents.len());
143     /// # Ok(())
144     /// # }
145     /// ```
146     ///
147     /// The [`read_to_end`] method is defined on the [`AsyncReadExt`] trait.
148     ///
149     /// [`read_to_end`]: fn@crate::io::AsyncReadExt::read_to_end
150     /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
open(path: impl AsRef<Path>) -> io::Result<File>151     pub async fn open(path: impl AsRef<Path>) -> io::Result<File> {
152         let path = path.as_ref().to_owned();
153         let std = asyncify(|| StdFile::open(path)).await?;
154 
155         Ok(File::from_std(std))
156     }
157 
158     /// Opens a file in write-only mode.
159     ///
160     /// This function will create a file if it does not exist, and will truncate
161     /// it if it does.
162     ///
163     /// See [`OpenOptions`] for more details.
164     ///
165     /// # Errors
166     ///
167     /// Results in an error if called from outside of the Tokio runtime or if
168     /// the underlying [`create`] call results in an error.
169     ///
170     /// [`create`]: std::fs::File::create
171     ///
172     /// # Examples
173     ///
174     /// ```no_run
175     /// use tokio::fs::File;
176     /// use tokio::io::AsyncWriteExt;
177     ///
178     /// # async fn dox() -> std::io::Result<()> {
179     /// let mut file = File::create("foo.txt").await?;
180     /// file.write_all(b"hello, world!").await?;
181     /// # Ok(())
182     /// # }
183     /// ```
184     ///
185     /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
186     ///
187     /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
188     /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
create(path: impl AsRef<Path>) -> io::Result<File>189     pub async fn create(path: impl AsRef<Path>) -> io::Result<File> {
190         let path = path.as_ref().to_owned();
191         let std_file = asyncify(move || StdFile::create(path)).await?;
192         Ok(File::from_std(std_file))
193     }
194 
195     /// Opens a file in read-write mode.
196     ///
197     /// This function will create a file if it does not exist, or return an error
198     /// if it does. This way, if the call succeeds, the file returned is guaranteed
199     /// to be new.
200     ///
201     /// This option is useful because it is atomic. Otherwise between checking
202     /// whether a file exists and creating a new one, the file may have been
203     /// created by another process (a TOCTOU race condition / attack).
204     ///
205     /// This can also be written using `File::options().read(true).write(true).create_new(true).open(...)`.
206     ///
207     /// See [`OpenOptions`] for more details.
208     ///
209     /// # Examples
210     ///
211     /// ```no_run
212     /// use tokio::fs::File;
213     /// use tokio::io::AsyncWriteExt;
214     ///
215     /// # async fn dox() -> std::io::Result<()> {
216     /// let mut file = File::create_new("foo.txt").await?;
217     /// file.write_all(b"hello, world!").await?;
218     /// # Ok(())
219     /// # }
220     /// ```
221     ///
222     /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
223     ///
224     /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
225     /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
create_new<P: AsRef<Path>>(path: P) -> std::io::Result<File>226     pub async fn create_new<P: AsRef<Path>>(path: P) -> std::io::Result<File> {
227         Self::options()
228             .read(true)
229             .write(true)
230             .create_new(true)
231             .open(path)
232             .await
233     }
234 
235     /// Returns a new [`OpenOptions`] object.
236     ///
237     /// This function returns a new `OpenOptions` object that you can use to
238     /// open or create a file with specific options if `open()` or `create()`
239     /// are not appropriate.
240     ///
241     /// It is equivalent to `OpenOptions::new()`, but allows you to write more
242     /// readable code. Instead of
243     /// `OpenOptions::new().append(true).open("example.log")`,
244     /// you can write `File::options().append(true).open("example.log")`. This
245     /// also avoids the need to import `OpenOptions`.
246     ///
247     /// See the [`OpenOptions::new`] function for more details.
248     ///
249     /// # Examples
250     ///
251     /// ```no_run
252     /// use tokio::fs::File;
253     /// use tokio::io::AsyncWriteExt;
254     ///
255     /// # async fn dox() -> std::io::Result<()> {
256     /// let mut f = File::options().append(true).open("example.log").await?;
257     /// f.write_all(b"new line\n").await?;
258     /// # Ok(())
259     /// # }
260     /// ```
261     #[must_use]
    pub fn options() -> OpenOptions {
        // Pure convenience alias: lets callers build options without importing
        // `OpenOptions` themselves.
        OpenOptions::new()
    }
265 
266     /// Converts a [`std::fs::File`] to a [`tokio::fs::File`](File).
267     ///
268     /// # Examples
269     ///
270     /// ```no_run
271     /// // This line could block. It is not recommended to do this on the Tokio
272     /// // runtime.
273     /// let std_file = std::fs::File::open("foo.txt").unwrap();
274     /// let file = tokio::fs::File::from_std(std_file);
275     /// ```
from_std(std: StdFile) -> File276     pub fn from_std(std: StdFile) -> File {
277         File {
278             std: Arc::new(std),
279             inner: Mutex::new(Inner {
280                 state: State::Idle(Some(Buf::with_capacity(0))),
281                 last_write_err: None,
282                 pos: 0,
283             }),
284             max_buf_size: DEFAULT_MAX_BUF_SIZE,
285         }
286     }
287 
288     /// Attempts to sync all OS-internal metadata to disk.
289     ///
290     /// This function will attempt to ensure that all in-core data reaches the
291     /// filesystem before returning.
292     ///
293     /// # Examples
294     ///
295     /// ```no_run
296     /// use tokio::fs::File;
297     /// use tokio::io::AsyncWriteExt;
298     ///
299     /// # async fn dox() -> std::io::Result<()> {
300     /// let mut file = File::create("foo.txt").await?;
301     /// file.write_all(b"hello, world!").await?;
302     /// file.sync_all().await?;
303     /// # Ok(())
304     /// # }
305     /// ```
306     ///
307     /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
308     ///
309     /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
310     /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
sync_all(&self) -> io::Result<()>311     pub async fn sync_all(&self) -> io::Result<()> {
312         let mut inner = self.inner.lock().await;
313         inner.complete_inflight().await;
314 
315         let std = self.std.clone();
316         asyncify(move || std.sync_all()).await
317     }
318 
319     /// This function is similar to `sync_all`, except that it may not
320     /// synchronize file metadata to the filesystem.
321     ///
322     /// This is intended for use cases that must synchronize content, but don't
323     /// need the metadata on disk. The goal of this method is to reduce disk
324     /// operations.
325     ///
326     /// Note that some platforms may simply implement this in terms of `sync_all`.
327     ///
328     /// # Examples
329     ///
330     /// ```no_run
331     /// use tokio::fs::File;
332     /// use tokio::io::AsyncWriteExt;
333     ///
334     /// # async fn dox() -> std::io::Result<()> {
335     /// let mut file = File::create("foo.txt").await?;
336     /// file.write_all(b"hello, world!").await?;
337     /// file.sync_data().await?;
338     /// # Ok(())
339     /// # }
340     /// ```
341     ///
342     /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
343     ///
344     /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
345     /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
sync_data(&self) -> io::Result<()>346     pub async fn sync_data(&self) -> io::Result<()> {
347         let mut inner = self.inner.lock().await;
348         inner.complete_inflight().await;
349 
350         let std = self.std.clone();
351         asyncify(move || std.sync_data()).await
352     }
353 
    /// Truncates or extends the underlying file, updating the size of this file to become `size`.
355     ///
356     /// If the size is less than the current file's size, then the file will be
357     /// shrunk. If it is greater than the current file's size, then the file
358     /// will be extended to size and have all of the intermediate data filled in
359     /// with 0s.
360     ///
361     /// # Errors
362     ///
363     /// This function will return an error if the file is not opened for
364     /// writing.
365     ///
366     /// # Examples
367     ///
368     /// ```no_run
369     /// use tokio::fs::File;
370     /// use tokio::io::AsyncWriteExt;
371     ///
372     /// # async fn dox() -> std::io::Result<()> {
373     /// let mut file = File::create("foo.txt").await?;
374     /// file.write_all(b"hello, world!").await?;
375     /// file.set_len(10).await?;
376     /// # Ok(())
377     /// # }
378     /// ```
379     ///
380     /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
381     ///
382     /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
383     /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
set_len(&self, size: u64) -> io::Result<()>384     pub async fn set_len(&self, size: u64) -> io::Result<()> {
385         let mut inner = self.inner.lock().await;
386         inner.complete_inflight().await;
387 
388         let mut buf = match inner.state {
389             State::Idle(ref mut buf_cell) => buf_cell.take().unwrap(),
390             _ => unreachable!(),
391         };
392 
393         let seek = if !buf.is_empty() {
394             Some(SeekFrom::Current(buf.discard_read()))
395         } else {
396             None
397         };
398 
399         let std = self.std.clone();
400 
401         inner.state = State::Busy(spawn_blocking(move || {
402             let res = if let Some(seek) = seek {
403                 (&*std).seek(seek).and_then(|_| std.set_len(size))
404             } else {
405                 std.set_len(size)
406             }
407             .map(|()| 0); // the value is discarded later
408 
409             // Return the result as a seek
410             (Operation::Seek(res), buf)
411         }));
412 
413         let (op, buf) = match inner.state {
414             State::Idle(_) => unreachable!(),
415             State::Busy(ref mut rx) => rx.await?,
416         };
417 
418         inner.state = State::Idle(Some(buf));
419 
420         match op {
421             Operation::Seek(res) => res.map(|pos| {
422                 inner.pos = pos;
423             }),
424             _ => unreachable!(),
425         }
426     }
427 
428     /// Queries metadata about the underlying file.
429     ///
430     /// # Examples
431     ///
432     /// ```no_run
433     /// use tokio::fs::File;
434     ///
435     /// # async fn dox() -> std::io::Result<()> {
436     /// let file = File::open("foo.txt").await?;
437     /// let metadata = file.metadata().await?;
438     ///
439     /// println!("{:?}", metadata);
440     /// # Ok(())
441     /// # }
442     /// ```
metadata(&self) -> io::Result<Metadata>443     pub async fn metadata(&self) -> io::Result<Metadata> {
444         let std = self.std.clone();
445         asyncify(move || std.metadata()).await
446     }
447 
448     /// Creates a new `File` instance that shares the same underlying file handle
449     /// as the existing `File` instance. Reads, writes, and seeks will affect both
450     /// File instances simultaneously.
451     ///
452     /// # Examples
453     ///
454     /// ```no_run
455     /// use tokio::fs::File;
456     ///
457     /// # async fn dox() -> std::io::Result<()> {
458     /// let file = File::open("foo.txt").await?;
459     /// let file_clone = file.try_clone().await?;
460     /// # Ok(())
461     /// # }
462     /// ```
try_clone(&self) -> io::Result<File>463     pub async fn try_clone(&self) -> io::Result<File> {
464         self.inner.lock().await.complete_inflight().await;
465         let std = self.std.clone();
466         let std_file = asyncify(move || std.try_clone()).await?;
467         Ok(File::from_std(std_file))
468     }
469 
470     /// Destructures `File` into a [`std::fs::File`]. This function is
471     /// async to allow any in-flight operations to complete.
472     ///
473     /// Use `File::try_into_std` to attempt conversion immediately.
474     ///
475     /// # Examples
476     ///
477     /// ```no_run
478     /// use tokio::fs::File;
479     ///
480     /// # async fn dox() -> std::io::Result<()> {
481     /// let tokio_file = File::open("foo.txt").await?;
482     /// let std_file = tokio_file.into_std().await;
483     /// # Ok(())
484     /// # }
485     /// ```
into_std(mut self) -> StdFile486     pub async fn into_std(mut self) -> StdFile {
487         self.inner.get_mut().complete_inflight().await;
488         Arc::try_unwrap(self.std).expect("Arc::try_unwrap failed")
489     }
490 
491     /// Tries to immediately destructure `File` into a [`std::fs::File`].
492     ///
493     /// # Errors
494     ///
495     /// This function will return an error containing the file if some
496     /// operation is in-flight.
497     ///
498     /// # Examples
499     ///
500     /// ```no_run
501     /// use tokio::fs::File;
502     ///
503     /// # async fn dox() -> std::io::Result<()> {
504     /// let tokio_file = File::open("foo.txt").await?;
505     /// let std_file = tokio_file.try_into_std().unwrap();
506     /// # Ok(())
507     /// # }
508     /// ```
try_into_std(mut self) -> Result<StdFile, Self>509     pub fn try_into_std(mut self) -> Result<StdFile, Self> {
510         match Arc::try_unwrap(self.std) {
511             Ok(file) => Ok(file),
512             Err(std_file_arc) => {
513                 self.std = std_file_arc;
514                 Err(self)
515             }
516         }
517     }
518 
519     /// Changes the permissions on the underlying file.
520     ///
521     /// # Platform-specific behavior
522     ///
523     /// This function currently corresponds to the `fchmod` function on Unix and
524     /// the `SetFileInformationByHandle` function on Windows. Note that, this
525     /// [may change in the future][changes].
526     ///
527     /// [changes]: https://doc.rust-lang.org/std/io/index.html#platform-specific-behavior
528     ///
529     /// # Errors
530     ///
    /// This function will return an error if the user lacks permission to change
532     /// attributes on the underlying file. It may also return an error in other
533     /// os-specific unspecified cases.
534     ///
535     /// # Examples
536     ///
537     /// ```no_run
538     /// use tokio::fs::File;
539     ///
540     /// # async fn dox() -> std::io::Result<()> {
541     /// let file = File::open("foo.txt").await?;
542     /// let mut perms = file.metadata().await?.permissions();
543     /// perms.set_readonly(true);
544     /// file.set_permissions(perms).await?;
545     /// # Ok(())
546     /// # }
547     /// ```
set_permissions(&self, perm: Permissions) -> io::Result<()>548     pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> {
549         let std = self.std.clone();
550         asyncify(move || std.set_permissions(perm)).await
551     }
552 
    /// Set the maximum buffer size for the underlying [`AsyncRead`] / [`AsyncWrite`] operation.
    ///
    /// Although Tokio uses a sensible default value for this buffer size, this function would be
    /// useful for changing that default depending on the situation.
    ///
    /// The new limit applies to operations started after this call.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use tokio::fs::File;
    /// use tokio::io::AsyncWriteExt;
    ///
    /// # async fn dox() -> std::io::Result<()> {
    /// // `File::create` opens the file for writing (`File::open` is read-only).
    /// let mut file = File::create("foo.txt").await?;
    ///
    /// // Set maximum buffer size to 8 MiB
    /// file.set_max_buf_size(8 * 1024 * 1024);
    ///
    /// let buf = vec![1; 1024 * 1024 * 1024];
    ///
    /// // Write the 1 GiB buffer in chunks up to 8 MiB each.
    /// file.write_all(&buf).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn set_max_buf_size(&mut self, max_buf_size: usize) {
        self.max_buf_size = max_buf_size;
    }
580 }
581 
impl AsyncRead for File {
    /// Polls a read into `dst`.
    ///
    /// Buffered data is served first. Otherwise a blocking read that fills the
    /// internal buffer is spawned, and its result is copied into `dst` once the
    /// task completes. Results of other operations found in flight are
    /// absorbed here: a failed write is stashed in `last_write_err`, and a
    /// successful seek updates the cached position.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        dst: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        ready!(crate::trace::trace_leaf(cx));
        let me = self.get_mut();
        let inner = me.inner.get_mut();

        loop {
            match inner.state {
                State::Idle(ref mut buf_cell) => {
                    let mut buf = buf_cell.take().unwrap();

                    // Serve leftover buffered data without touching the file.
                    if !buf.is_empty() {
                        buf.copy_to(dst);
                        *buf_cell = Some(buf);
                        return Poll::Ready(Ok(()));
                    }

                    // Size the buffer for this request, subject to
                    // `max_buf_size`.
                    buf.ensure_capacity_for(dst, me.max_buf_size);
                    let std = me.std.clone();

                    // Start the blocking read; the next loop iteration takes
                    // the `Busy` arm and polls it.
                    inner.state = State::Busy(spawn_blocking(move || {
                        let res = buf.read_from(&mut &*std);
                        (Operation::Read(res), buf)
                    }));
                }
                State::Busy(ref mut rx) => {
                    // Returns `Pending` while the task is running; a join
                    // error is propagated by `?`.
                    let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?;

                    match op {
                        Operation::Read(Ok(_)) => {
                            buf.copy_to(dst);
                            inner.state = State::Idle(Some(buf));
                            return Poll::Ready(Ok(()));
                        }
                        Operation::Read(Err(e)) => {
                            assert!(buf.is_empty());

                            inner.state = State::Idle(Some(buf));
                            return Poll::Ready(Err(e));
                        }
                        Operation::Write(Ok(())) => {
                            // A previous write finished; loop to start the read.
                            assert!(buf.is_empty());
                            inner.state = State::Idle(Some(buf));
                            continue;
                        }
                        Operation::Write(Err(e)) => {
                            // Keep only the error kind for the next
                            // write/flush call, then fall through to the next
                            // loop iteration and start the read.
                            assert!(inner.last_write_err.is_none());
                            inner.last_write_err = Some(e.kind());
                            inner.state = State::Idle(Some(buf));
                        }
                        Operation::Seek(result) => {
                            // A failed seek is not reported here; only a
                            // successful one updates the cached position.
                            assert!(buf.is_empty());
                            inner.state = State::Idle(Some(buf));
                            if let Ok(pos) = result {
                                inner.pos = pos;
                            }
                            continue;
                        }
                    }
                }
            }
        }
    }
}
650 
651 impl AsyncSeek for File {
start_seek(self: Pin<&mut Self>, mut pos: SeekFrom) -> io::Result<()>652     fn start_seek(self: Pin<&mut Self>, mut pos: SeekFrom) -> io::Result<()> {
653         let me = self.get_mut();
654         let inner = me.inner.get_mut();
655 
656         match inner.state {
657             State::Busy(_) => Err(io::Error::new(
658                 io::ErrorKind::Other,
659                 "other file operation is pending, call poll_complete before start_seek",
660             )),
661             State::Idle(ref mut buf_cell) => {
662                 let mut buf = buf_cell.take().unwrap();
663 
664                 // Factor in any unread data from the buf
665                 if !buf.is_empty() {
666                     let n = buf.discard_read();
667 
668                     if let SeekFrom::Current(ref mut offset) = pos {
669                         *offset += n;
670                     }
671                 }
672 
673                 let std = me.std.clone();
674 
675                 inner.state = State::Busy(spawn_blocking(move || {
676                     let res = (&*std).seek(pos);
677                     (Operation::Seek(res), buf)
678                 }));
679                 Ok(())
680             }
681         }
682     }
683 
    /// Drives any in-flight operation to completion and reports a position.
    ///
    /// When idle, returns the cached `inner.pos`. When the in-flight operation
    /// is a seek, returns its result and caches the position on success. For a
    /// finished read or write it loops and reports the cached position.
    fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
        ready!(crate::trace::trace_leaf(cx));
        let inner = self.inner.get_mut();

        loop {
            match inner.state {
                State::Idle(_) => return Poll::Ready(Ok(inner.pos)),
                State::Busy(ref mut rx) => {
                    // Returns `Pending` while the task is running; a join
                    // error is propagated by `?`.
                    let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
                    inner.state = State::Idle(Some(buf));

                    match op {
                        // The read result is dropped here.
                        Operation::Read(_) => {}
                        Operation::Write(Err(e)) => {
                            // Stash the error kind for the next write/flush
                            // call.
                            assert!(inner.last_write_err.is_none());
                            inner.last_write_err = Some(e.kind());
                        }
                        Operation::Write(_) => {}
                        Operation::Seek(res) => {
                            if let Ok(pos) = res {
                                inner.pos = pos;
                            }
                            return Poll::Ready(res);
                        }
                    }
                }
            }
        }
    }
713 }
714 
715 impl AsyncWrite for File {
    /// Polls a write of `src`.
    ///
    /// Up to `max_buf_size` bytes are copied into the internal buffer and a
    /// blocking task is spawned to write them out. `Ready(Ok(n))` is returned
    /// as soon as the task has been spawned, so an error from the background
    /// write surfaces on a later call. A write error stashed by an earlier
    /// read/seek is returned first.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        src: &[u8],
    ) -> Poll<io::Result<usize>> {
        ready!(crate::trace::trace_leaf(cx));
        let me = self.get_mut();
        let inner = me.inner.get_mut();

        // Report a previously stashed write error before doing anything else.
        if let Some(e) = inner.last_write_err.take() {
            return Poll::Ready(Err(e.into()));
        }

        loop {
            match inner.state {
                State::Idle(ref mut buf_cell) => {
                    let mut buf = buf_cell.take().unwrap();

                    // Unread buffered data is dropped; the background task
                    // seeks by the offset `discard_read` reports before
                    // writing.
                    let seek = if !buf.is_empty() {
                        Some(SeekFrom::Current(buf.discard_read()))
                    } else {
                        None
                    };

                    let n = buf.copy_from(src, me.max_buf_size);
                    let std = me.std.clone();

                    // `spawn_mandatory_blocking` yields `None` when the task
                    // could not be spawned; that is mapped to an I/O error.
                    let blocking_task_join_handle = spawn_mandatory_blocking(move || {
                        let res = if let Some(seek) = seek {
                            (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
                        } else {
                            buf.write_to(&mut &*std)
                        };

                        (Operation::Write(res), buf)
                    })
                    .ok_or_else(|| {
                        io::Error::new(io::ErrorKind::Other, "background task failed")
                    })?;

                    inner.state = State::Busy(blocking_task_join_handle);

                    return Poll::Ready(Ok(n));
                }
                State::Busy(ref mut rx) => {
                    // Wait for the previous operation before starting this
                    // write.
                    let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
                    inner.state = State::Idle(Some(buf));

                    match op {
                        Operation::Read(_) => {
                            // We don't care about the result here. The fact
                            // that the cursor has advanced will be reflected in
                            // the next iteration of the loop
                            continue;
                        }
                        Operation::Write(res) => {
                            // If the previous write was successful, continue.
                            // Otherwise, error.
                            res?;
                            continue;
                        }
                        Operation::Seek(_) => {
                            // Ignore the seek
                            continue;
                        }
                    }
                }
            }
        }
    }
786 
    /// Vectored variant of `poll_write`.
    ///
    /// Follows the same steps as `poll_write`, except that the internal buffer
    /// is filled from `bufs` via `copy_from_bufs` (bounded by `max_buf_size`).
    /// `Ready(Ok(n))` is returned once the background write has been spawned.
    fn poll_write_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &[io::IoSlice<'_>],
    ) -> Poll<Result<usize, io::Error>> {
        ready!(crate::trace::trace_leaf(cx));
        let me = self.get_mut();
        let inner = me.inner.get_mut();

        // Report a previously stashed write error before doing anything else.
        if let Some(e) = inner.last_write_err.take() {
            return Poll::Ready(Err(e.into()));
        }

        loop {
            match inner.state {
                State::Idle(ref mut buf_cell) => {
                    let mut buf = buf_cell.take().unwrap();

                    // Unread buffered data is dropped; the background task
                    // seeks by the offset `discard_read` reports before
                    // writing.
                    let seek = if !buf.is_empty() {
                        Some(SeekFrom::Current(buf.discard_read()))
                    } else {
                        None
                    };

                    let n = buf.copy_from_bufs(bufs, me.max_buf_size);
                    let std = me.std.clone();

                    // `spawn_mandatory_blocking` yields `None` when the task
                    // could not be spawned; that is mapped to an I/O error.
                    let blocking_task_join_handle = spawn_mandatory_blocking(move || {
                        let res = if let Some(seek) = seek {
                            (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
                        } else {
                            buf.write_to(&mut &*std)
                        };

                        (Operation::Write(res), buf)
                    })
                    .ok_or_else(|| {
                        io::Error::new(io::ErrorKind::Other, "background task failed")
                    })?;

                    inner.state = State::Busy(blocking_task_join_handle);

                    return Poll::Ready(Ok(n));
                }
                State::Busy(ref mut rx) => {
                    // Wait for the previous operation before starting this
                    // write.
                    let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
                    inner.state = State::Idle(Some(buf));

                    match op {
                        Operation::Read(_) => {
                            // We don't care about the result here. The fact
                            // that the cursor has advanced will be reflected in
                            // the next iteration of the loop
                            continue;
                        }
                        Operation::Write(res) => {
                            // If the previous write was successful, continue.
                            // Otherwise, error.
                            res?;
                            continue;
                        }
                        Operation::Seek(_) => {
                            // Ignore the seek
                            continue;
                        }
                    }
                }
            }
        }
    }
857 
    // Always answers `true`.
    // NOTE(review): presumably this advertises the vectored write path
    // implemented just above in this impl (the one that consumes `bufs` via
    // `copy_from_bufs`) — confirm against the `AsyncWrite` trait docs.
    fn is_write_vectored(&self) -> bool {
        true
    }
861 
poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>862     fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
863         ready!(crate::trace::trace_leaf(cx));
864         let inner = self.inner.get_mut();
865         inner.poll_flush(cx)
866     }
867 
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>868     fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
869         ready!(crate::trace::trace_leaf(cx));
870         self.poll_flush(cx)
871     }
872 }
873 
874 impl From<StdFile> for File {
from(std: StdFile) -> Self875     fn from(std: StdFile) -> Self {
876         Self::from_std(std)
877     }
878 }
879 
880 impl fmt::Debug for File {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result881     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
882         fmt.debug_struct("tokio::fs::File")
883             .field("std", &self.std)
884             .finish()
885     }
886 }
887 
888 #[cfg(unix)]
889 impl std::os::unix::io::AsRawFd for File {
as_raw_fd(&self) -> std::os::unix::io::RawFd890     fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
891         self.std.as_raw_fd()
892     }
893 }
894 
895 #[cfg(unix)]
896 impl std::os::unix::io::AsFd for File {
as_fd(&self) -> std::os::unix::io::BorrowedFd<'_>897     fn as_fd(&self) -> std::os::unix::io::BorrowedFd<'_> {
898         unsafe {
899             std::os::unix::io::BorrowedFd::borrow_raw(std::os::unix::io::AsRawFd::as_raw_fd(self))
900         }
901     }
902 }
903 
904 #[cfg(unix)]
905 impl std::os::unix::io::FromRawFd for File {
from_raw_fd(fd: std::os::unix::io::RawFd) -> Self906     unsafe fn from_raw_fd(fd: std::os::unix::io::RawFd) -> Self {
907         StdFile::from_raw_fd(fd).into()
908     }
909 }
910 
911 cfg_windows! {
912     use crate::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle, AsHandle, BorrowedHandle};
913 
914     impl AsRawHandle for File {
915         fn as_raw_handle(&self) -> RawHandle {
916             self.std.as_raw_handle()
917         }
918     }
919 
920     impl AsHandle for File {
921         fn as_handle(&self) -> BorrowedHandle<'_> {
922             unsafe {
923                 BorrowedHandle::borrow_raw(
924                     AsRawHandle::as_raw_handle(self),
925                 )
926             }
927         }
928     }
929 
930     impl FromRawHandle for File {
931         unsafe fn from_raw_handle(handle: RawHandle) -> Self {
932             StdFile::from_raw_handle(handle).into()
933         }
934     }
935 }
936 
937 impl Inner {
complete_inflight(&mut self)938     async fn complete_inflight(&mut self) {
939         use std::future::poll_fn;
940 
941         poll_fn(|cx| self.poll_complete_inflight(cx)).await;
942     }
943 
poll_complete_inflight(&mut self, cx: &mut Context<'_>) -> Poll<()>944     fn poll_complete_inflight(&mut self, cx: &mut Context<'_>) -> Poll<()> {
945         ready!(crate::trace::trace_leaf(cx));
946         match self.poll_flush(cx) {
947             Poll::Ready(Err(e)) => {
948                 self.last_write_err = Some(e.kind());
949                 Poll::Ready(())
950             }
951             Poll::Ready(Ok(())) => Poll::Ready(()),
952             Poll::Pending => Poll::Pending,
953         }
954     }
955 
poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>956     fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
957         if let Some(e) = self.last_write_err.take() {
958             return Poll::Ready(Err(e.into()));
959         }
960 
961         let (op, buf) = match self.state {
962             State::Idle(_) => return Poll::Ready(Ok(())),
963             State::Busy(ref mut rx) => ready!(Pin::new(rx).poll(cx))?,
964         };
965 
966         // The buffer is not used here
967         self.state = State::Idle(Some(buf));
968 
969         match op {
970             Operation::Read(_) => Poll::Ready(Ok(())),
971             Operation::Write(res) => Poll::Ready(res),
972             Operation::Seek(_) => Poll::Ready(Ok(())),
973         }
974     }
975 }
976 
977 #[cfg(test)]
978 mod tests;
979