1 //! Asynchronous I/O.
2 //!
3 //! This module is the asynchronous version of `std::io`. It defines four
4 //! traits, [`AsyncRead`], [`AsyncWrite`], [`AsyncSeek`], and [`AsyncBufRead`],
5 //! which mirror the `Read`, `Write`, `Seek`, and `BufRead` traits of the
6 //! standard library. However, these traits integrate with the asynchronous
7 //! task system, so that if an I/O object isn't ready for reading (or writing),
8 //! the thread is not blocked, and instead the current task is queued to be
9 //! woken when I/O is ready.
10 //!
11 //! In addition, the [`AsyncReadExt`], [`AsyncWriteExt`], [`AsyncSeekExt`], and
12 //! [`AsyncBufReadExt`] extension traits offer a variety of useful combinators
13 //! for operating with asynchronous I/O objects, including ways to work with
14 //! them using futures, streams and sinks.
15 //!
16 //! This module is only available when the `std` feature of this
17 //! library is activated, and it is activated by default.
18 
19 #[cfg(feature = "io-compat")]
20 #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
21 use crate::compat::Compat;
22 use crate::future::assert_future;
23 use crate::stream::assert_stream;
24 use std::{pin::Pin, ptr, string::String, vec::Vec};
25 
26 // Re-export some types from `std::io` so that users don't have to deal
27 // with conflicts when `use`ing `futures::io` and `std::io`.
28 #[doc(no_inline)]
29 pub use std::io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom};
30 
31 pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
32 
33 // used by `BufReader` and `BufWriter`
34 // https://github.com/rust-lang/rust/blob/master/src/libstd/sys_common/io.rs#L1
35 const DEFAULT_BUF_SIZE: usize = 8 * 1024;
36 
37 /// Initializes a buffer if necessary.
38 ///
39 /// A buffer is currently always initialized.
40 #[inline]
initialize<R: AsyncRead>(_reader: &R, buf: &mut [u8])41 unsafe fn initialize<R: AsyncRead>(_reader: &R, buf: &mut [u8]) {
42     unsafe { ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len()) }
43 }
44 
45 mod allow_std;
46 pub use self::allow_std::AllowStdIo;
47 
48 mod buf_reader;
49 pub use self::buf_reader::{BufReader, SeeKRelative};
50 
51 mod buf_writer;
52 pub use self::buf_writer::BufWriter;
53 
54 mod line_writer;
55 pub use self::line_writer::LineWriter;
56 
57 mod chain;
58 pub use self::chain::Chain;
59 
60 mod close;
61 pub use self::close::Close;
62 
63 mod copy;
64 pub use self::copy::{copy, Copy};
65 
66 mod copy_buf;
67 pub use self::copy_buf::{copy_buf, CopyBuf};
68 
69 mod copy_buf_abortable;
70 pub use self::copy_buf_abortable::{copy_buf_abortable, CopyBufAbortable};
71 
72 mod cursor;
73 pub use self::cursor::Cursor;
74 
75 mod empty;
76 pub use self::empty::{empty, Empty};
77 
78 mod fill_buf;
79 pub use self::fill_buf::FillBuf;
80 
81 mod flush;
82 pub use self::flush::Flush;
83 
84 #[cfg(feature = "sink")]
85 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
86 mod into_sink;
87 #[cfg(feature = "sink")]
88 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
89 pub use self::into_sink::IntoSink;
90 
91 mod lines;
92 pub use self::lines::Lines;
93 
94 mod read;
95 pub use self::read::Read;
96 
97 mod read_vectored;
98 pub use self::read_vectored::ReadVectored;
99 
100 mod read_exact;
101 pub use self::read_exact::ReadExact;
102 
103 mod read_line;
104 pub use self::read_line::ReadLine;
105 
106 mod read_to_end;
107 pub use self::read_to_end::ReadToEnd;
108 
109 mod read_to_string;
110 pub use self::read_to_string::ReadToString;
111 
112 mod read_until;
113 pub use self::read_until::ReadUntil;
114 
115 mod repeat;
116 pub use self::repeat::{repeat, Repeat};
117 
118 mod seek;
119 pub use self::seek::Seek;
120 
121 mod sink;
122 pub use self::sink::{sink, Sink};
123 
124 mod split;
125 pub use self::split::{ReadHalf, ReuniteError, WriteHalf};
126 
127 mod take;
128 pub use self::take::Take;
129 
130 mod window;
131 pub use self::window::Window;
132 
133 mod write;
134 pub use self::write::Write;
135 
136 mod write_vectored;
137 pub use self::write_vectored::WriteVectored;
138 
139 mod write_all;
140 pub use self::write_all::WriteAll;
141 
142 #[cfg(feature = "write-all-vectored")]
143 mod write_all_vectored;
144 #[cfg(feature = "write-all-vectored")]
145 pub use self::write_all_vectored::WriteAllVectored;
146 
147 /// An extension trait which adds utility methods to `AsyncRead` types.
148 pub trait AsyncReadExt: AsyncRead {
149     /// Creates an adaptor which will chain this stream with another.
150     ///
151     /// The returned `AsyncRead` instance will first read all bytes from this object
152     /// until EOF is encountered. Afterwards the output is equivalent to the
153     /// output of `next`.
154     ///
155     /// # Examples
156     ///
157     /// ```
158     /// # futures::executor::block_on(async {
159     /// use futures::io::{AsyncReadExt, Cursor};
160     ///
161     /// let reader1 = Cursor::new([1, 2, 3, 4]);
162     /// let reader2 = Cursor::new([5, 6, 7, 8]);
163     ///
164     /// let mut reader = reader1.chain(reader2);
165     /// let mut buffer = Vec::new();
166     ///
167     /// // read the value into a Vec.
168     /// reader.read_to_end(&mut buffer).await?;
169     /// assert_eq!(buffer, [1, 2, 3, 4, 5, 6, 7, 8]);
170     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
171     /// ```
chain<R>(self, next: R) -> Chain<Self, R> where Self: Sized, R: AsyncRead,172     fn chain<R>(self, next: R) -> Chain<Self, R>
173     where
174         Self: Sized,
175         R: AsyncRead,
176     {
177         assert_read(Chain::new(self, next))
178     }
179 
180     /// Tries to read some bytes directly into the given `buf` in asynchronous
181     /// manner, returning a future type.
182     ///
183     /// The returned future will resolve to the number of bytes read once the read
184     /// operation is completed.
185     ///
186     /// # Examples
187     ///
188     /// ```
189     /// # futures::executor::block_on(async {
190     /// use futures::io::{AsyncReadExt, Cursor};
191     ///
192     /// let mut reader = Cursor::new([1, 2, 3, 4]);
193     /// let mut output = [0u8; 5];
194     ///
195     /// let bytes = reader.read(&mut output[..]).await?;
196     ///
197     /// // This is only guaranteed to be 4 because `&[u8]` is a synchronous
198     /// // reader. In a real system you could get anywhere from 1 to
199     /// // `output.len()` bytes in a single read.
200     /// assert_eq!(bytes, 4);
201     /// assert_eq!(output, [1, 2, 3, 4, 0]);
202     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
203     /// ```
read<'a>(&'a mut self, buf: &'a mut [u8]) -> Read<'a, Self> where Self: Unpin,204     fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Read<'a, Self>
205     where
206         Self: Unpin,
207     {
208         assert_future::<Result<usize>, _>(Read::new(self, buf))
209     }
210 
211     /// Creates a future which will read from the `AsyncRead` into `bufs` using vectored
212     /// IO operations.
213     ///
214     /// The returned future will resolve to the number of bytes read once the read
215     /// operation is completed.
read_vectored<'a>(&'a mut self, bufs: &'a mut [IoSliceMut<'a>]) -> ReadVectored<'a, Self> where Self: Unpin,216     fn read_vectored<'a>(&'a mut self, bufs: &'a mut [IoSliceMut<'a>]) -> ReadVectored<'a, Self>
217     where
218         Self: Unpin,
219     {
220         assert_future::<Result<usize>, _>(ReadVectored::new(self, bufs))
221     }
222 
223     /// Creates a future which will read exactly enough bytes to fill `buf`,
224     /// returning an error if end of file (EOF) is hit sooner.
225     ///
226     /// The returned future will resolve once the read operation is completed.
227     ///
228     /// In the case of an error the buffer and the object will be discarded, with
229     /// the error yielded.
230     ///
231     /// # Examples
232     ///
233     /// ```
234     /// # futures::executor::block_on(async {
235     /// use futures::io::{AsyncReadExt, Cursor};
236     ///
237     /// let mut reader = Cursor::new([1, 2, 3, 4]);
238     /// let mut output = [0u8; 4];
239     ///
240     /// reader.read_exact(&mut output).await?;
241     ///
242     /// assert_eq!(output, [1, 2, 3, 4]);
243     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
244     /// ```
245     ///
246     /// ## EOF is hit before `buf` is filled
247     ///
248     /// ```
249     /// # futures::executor::block_on(async {
250     /// use futures::io::{self, AsyncReadExt, Cursor};
251     ///
252     /// let mut reader = Cursor::new([1, 2, 3, 4]);
253     /// let mut output = [0u8; 5];
254     ///
255     /// let result = reader.read_exact(&mut output).await;
256     ///
257     /// assert_eq!(result.unwrap_err().kind(), io::ErrorKind::UnexpectedEof);
258     /// # });
259     /// ```
read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExact<'a, Self> where Self: Unpin,260     fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExact<'a, Self>
261     where
262         Self: Unpin,
263     {
264         assert_future::<Result<()>, _>(ReadExact::new(self, buf))
265     }
266 
267     /// Creates a future which will read all the bytes from this `AsyncRead`.
268     ///
269     /// On success the total number of bytes read is returned.
270     ///
271     /// # Examples
272     ///
273     /// ```
274     /// # futures::executor::block_on(async {
275     /// use futures::io::{AsyncReadExt, Cursor};
276     ///
277     /// let mut reader = Cursor::new([1, 2, 3, 4]);
278     /// let mut output = Vec::with_capacity(4);
279     ///
280     /// let bytes = reader.read_to_end(&mut output).await?;
281     ///
282     /// assert_eq!(bytes, 4);
283     /// assert_eq!(output, vec![1, 2, 3, 4]);
284     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
285     /// ```
read_to_end<'a>(&'a mut self, buf: &'a mut Vec<u8>) -> ReadToEnd<'a, Self> where Self: Unpin,286     fn read_to_end<'a>(&'a mut self, buf: &'a mut Vec<u8>) -> ReadToEnd<'a, Self>
287     where
288         Self: Unpin,
289     {
290         assert_future::<Result<usize>, _>(ReadToEnd::new(self, buf))
291     }
292 
293     /// Creates a future which will read all the bytes from this `AsyncRead`.
294     ///
295     /// On success the total number of bytes read is returned.
296     ///
297     /// # Examples
298     ///
299     /// ```
300     /// # futures::executor::block_on(async {
301     /// use futures::io::{AsyncReadExt, Cursor};
302     ///
303     /// let mut reader = Cursor::new(&b"1234"[..]);
304     /// let mut buffer = String::with_capacity(4);
305     ///
306     /// let bytes = reader.read_to_string(&mut buffer).await?;
307     ///
308     /// assert_eq!(bytes, 4);
309     /// assert_eq!(buffer, String::from("1234"));
310     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
311     /// ```
read_to_string<'a>(&'a mut self, buf: &'a mut String) -> ReadToString<'a, Self> where Self: Unpin,312     fn read_to_string<'a>(&'a mut self, buf: &'a mut String) -> ReadToString<'a, Self>
313     where
314         Self: Unpin,
315     {
316         assert_future::<Result<usize>, _>(ReadToString::new(self, buf))
317     }
318 
319     /// Helper method for splitting this read/write object into two halves.
320     ///
321     /// The two halves returned implement the `AsyncRead` and `AsyncWrite`
322     /// traits, respectively.
323     ///
324     /// # Examples
325     ///
326     /// ```
327     /// # futures::executor::block_on(async {
328     /// use futures::io::{self, AsyncReadExt, Cursor};
329     ///
330     /// // Note that for `Cursor` the read and write halves share a single
331     /// // seek position. This may or may not be true for other types that
332     /// // implement both `AsyncRead` and `AsyncWrite`.
333     ///
334     /// let reader = Cursor::new([1, 2, 3, 4]);
335     /// let mut buffer = Cursor::new(vec![0, 0, 0, 0, 5, 6, 7, 8]);
336     /// let mut writer = Cursor::new(vec![0u8; 5]);
337     ///
338     /// {
339     ///     let (buffer_reader, mut buffer_writer) = (&mut buffer).split();
340     ///     io::copy(reader, &mut buffer_writer).await?;
341     ///     io::copy(buffer_reader, &mut writer).await?;
342     /// }
343     ///
344     /// assert_eq!(buffer.into_inner(), [1, 2, 3, 4, 5, 6, 7, 8]);
345     /// assert_eq!(writer.into_inner(), [5, 6, 7, 8, 0]);
346     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
347     /// ```
split(self) -> (ReadHalf<Self>, WriteHalf<Self>) where Self: AsyncWrite + Sized,348     fn split(self) -> (ReadHalf<Self>, WriteHalf<Self>)
349     where
350         Self: AsyncWrite + Sized,
351     {
352         let (r, w) = split::split(self);
353         (assert_read(r), assert_write(w))
354     }
355 
356     /// Creates an AsyncRead adapter which will read at most `limit` bytes
357     /// from the underlying reader.
358     ///
359     /// # Examples
360     ///
361     /// ```
362     /// # futures::executor::block_on(async {
363     /// use futures::io::{AsyncReadExt, Cursor};
364     ///
365     /// let reader = Cursor::new(&b"12345678"[..]);
366     /// let mut buffer = [0; 5];
367     ///
368     /// let mut take = reader.take(4);
369     /// let n = take.read(&mut buffer).await?;
370     ///
371     /// assert_eq!(n, 4);
372     /// assert_eq!(&buffer, b"1234\0");
373     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
374     /// ```
take(self, limit: u64) -> Take<Self> where Self: Sized,375     fn take(self, limit: u64) -> Take<Self>
376     where
377         Self: Sized,
378     {
379         assert_read(Take::new(self, limit))
380     }
381 
382     /// Wraps an [`AsyncRead`] in a compatibility wrapper that allows it to be
383     /// used as a futures 0.1 / tokio-io 0.1 `AsyncRead`. If the wrapped type
384     /// implements [`AsyncWrite`] as well, the result will also implement the
385     /// futures 0.1 / tokio 0.1 `AsyncWrite` trait.
386     ///
387     /// Requires the `io-compat` feature to enable.
388     #[cfg(feature = "io-compat")]
389     #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
compat(self) -> Compat<Self> where Self: Sized + Unpin,390     fn compat(self) -> Compat<Self>
391     where
392         Self: Sized + Unpin,
393     {
394         Compat::new(self)
395     }
396 }
397 
398 impl<R: AsyncRead + ?Sized> AsyncReadExt for R {}
399 
400 /// An extension trait which adds utility methods to `AsyncWrite` types.
401 pub trait AsyncWriteExt: AsyncWrite {
402     /// Creates a future which will entirely flush this `AsyncWrite`.
403     ///
404     /// # Examples
405     ///
406     /// ```
407     /// # futures::executor::block_on(async {
408     /// use futures::io::{AllowStdIo, AsyncWriteExt};
409     /// use std::io::{BufWriter, Cursor};
410     ///
411     /// let mut output = vec![0u8; 5];
412     ///
413     /// {
414     ///     let writer = Cursor::new(&mut output);
415     ///     let mut buffered = AllowStdIo::new(BufWriter::new(writer));
416     ///     buffered.write_all(&[1, 2]).await?;
417     ///     buffered.write_all(&[3, 4]).await?;
418     ///     buffered.flush().await?;
419     /// }
420     ///
421     /// assert_eq!(output, [1, 2, 3, 4, 0]);
422     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
423     /// ```
flush(&mut self) -> Flush<'_, Self> where Self: Unpin,424     fn flush(&mut self) -> Flush<'_, Self>
425     where
426         Self: Unpin,
427     {
428         assert_future::<Result<()>, _>(Flush::new(self))
429     }
430 
431     /// Creates a future which will entirely close this `AsyncWrite`.
close(&mut self) -> Close<'_, Self> where Self: Unpin,432     fn close(&mut self) -> Close<'_, Self>
433     where
434         Self: Unpin,
435     {
436         assert_future::<Result<()>, _>(Close::new(self))
437     }
438 
439     /// Creates a future which will write bytes from `buf` into the object.
440     ///
441     /// The returned future will resolve to the number of bytes written once the write
442     /// operation is completed.
write<'a>(&'a mut self, buf: &'a [u8]) -> Write<'a, Self> where Self: Unpin,443     fn write<'a>(&'a mut self, buf: &'a [u8]) -> Write<'a, Self>
444     where
445         Self: Unpin,
446     {
447         assert_future::<Result<usize>, _>(Write::new(self, buf))
448     }
449 
450     /// Creates a future which will write bytes from `bufs` into the object using vectored
451     /// IO operations.
452     ///
453     /// The returned future will resolve to the number of bytes written once the write
454     /// operation is completed.
write_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> WriteVectored<'a, Self> where Self: Unpin,455     fn write_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> WriteVectored<'a, Self>
456     where
457         Self: Unpin,
458     {
459         assert_future::<Result<usize>, _>(WriteVectored::new(self, bufs))
460     }
461 
462     /// Write data into this object.
463     ///
464     /// Creates a future that will write the entire contents of the buffer `buf` into
465     /// this `AsyncWrite`.
466     ///
467     /// The returned future will not complete until all the data has been written.
468     ///
469     /// # Examples
470     ///
471     /// ```
472     /// # futures::executor::block_on(async {
473     /// use futures::io::{AsyncWriteExt, Cursor};
474     ///
475     /// let mut writer = Cursor::new(vec![0u8; 5]);
476     ///
477     /// writer.write_all(&[1, 2, 3, 4]).await?;
478     ///
479     /// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]);
480     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
481     /// ```
write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAll<'a, Self> where Self: Unpin,482     fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAll<'a, Self>
483     where
484         Self: Unpin,
485     {
486         assert_future::<Result<()>, _>(WriteAll::new(self, buf))
487     }
488 
489     /// Attempts to write multiple buffers into this writer.
490     ///
491     /// Creates a future that will write the entire contents of `bufs` into this
492     /// `AsyncWrite` using [vectored writes].
493     ///
494     /// The returned future will not complete until all the data has been
495     /// written.
496     ///
497     /// [vectored writes]: std::io::Write::write_vectored
498     ///
499     /// # Notes
500     ///
501     /// Unlike `io::Write::write_vectored`, this takes a *mutable* reference to
502     /// a slice of `IoSlice`s, not an immutable one. That's because we need to
503     /// modify the slice to keep track of the bytes already written.
504     ///
505     /// Once this futures returns, the contents of `bufs` are unspecified, as
506     /// this depends on how many calls to `write_vectored` were necessary. It is
507     /// best to understand this function as taking ownership of `bufs` and to
508     /// not use `bufs` afterwards. The underlying buffers, to which the
509     /// `IoSlice`s point (but not the `IoSlice`s themselves), are unchanged and
510     /// can be reused.
511     ///
512     /// # Examples
513     ///
514     /// ```
515     /// # futures::executor::block_on(async {
516     /// use futures::io::AsyncWriteExt;
517     /// use futures_util::io::Cursor;
518     /// use std::io::IoSlice;
519     ///
520     /// let mut writer = Cursor::new(Vec::new());
521     /// let bufs = &mut [
522     ///     IoSlice::new(&[1]),
523     ///     IoSlice::new(&[2, 3]),
524     ///     IoSlice::new(&[4, 5, 6]),
525     /// ];
526     ///
527     /// writer.write_all_vectored(bufs).await?;
528     /// // Note: the contents of `bufs` is now unspecified, see the Notes section.
529     ///
530     /// assert_eq!(writer.into_inner(), &[1, 2, 3, 4, 5, 6]);
531     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
532     /// ```
533     #[cfg(feature = "write-all-vectored")]
write_all_vectored<'a>( &'a mut self, bufs: &'a mut [IoSlice<'a>], ) -> WriteAllVectored<'a, Self> where Self: Unpin,534     fn write_all_vectored<'a>(
535         &'a mut self,
536         bufs: &'a mut [IoSlice<'a>],
537     ) -> WriteAllVectored<'a, Self>
538     where
539         Self: Unpin,
540     {
541         assert_future::<Result<()>, _>(WriteAllVectored::new(self, bufs))
542     }
543 
544     /// Wraps an [`AsyncWrite`] in a compatibility wrapper that allows it to be
545     /// used as a futures 0.1 / tokio-io 0.1 `AsyncWrite`.
546     /// Requires the `io-compat` feature to enable.
547     #[cfg(feature = "io-compat")]
548     #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
compat_write(self) -> Compat<Self> where Self: Sized + Unpin,549     fn compat_write(self) -> Compat<Self>
550     where
551         Self: Sized + Unpin,
552     {
553         Compat::new(self)
554     }
555 
556     /// Allow using an [`AsyncWrite`] as a [`Sink`](futures_sink::Sink)`<Item: AsRef<[u8]>>`.
557     ///
558     /// This adapter produces a sink that will write each value passed to it
559     /// into the underlying writer.
560     ///
561     /// Note that this function consumes the given writer, returning a wrapped
562     /// version.
563     ///
564     /// # Examples
565     ///
566     /// ```
567     /// # futures::executor::block_on(async {
568     /// use futures::io::AsyncWriteExt;
569     /// use futures::stream::{self, StreamExt};
570     ///
571     /// let stream = stream::iter(vec![Ok([1, 2, 3]), Ok([4, 5, 6])]);
572     ///
573     /// let mut writer = vec![];
574     ///
575     /// stream.forward((&mut writer).into_sink()).await?;
576     ///
577     /// assert_eq!(writer, vec![1, 2, 3, 4, 5, 6]);
578     /// # Ok::<(), Box<dyn std::error::Error>>(())
579     /// # })?;
580     /// # Ok::<(), Box<dyn std::error::Error>>(())
581     /// ```
582     #[cfg(feature = "sink")]
583     #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
into_sink<Item: AsRef<[u8]>>(self) -> IntoSink<Self, Item> where Self: Sized,584     fn into_sink<Item: AsRef<[u8]>>(self) -> IntoSink<Self, Item>
585     where
586         Self: Sized,
587     {
588         crate::sink::assert_sink::<Item, Error, _>(IntoSink::new(self))
589     }
590 }
591 
592 impl<W: AsyncWrite + ?Sized> AsyncWriteExt for W {}
593 
594 /// An extension trait which adds utility methods to `AsyncSeek` types.
595 pub trait AsyncSeekExt: AsyncSeek {
596     /// Creates a future which will seek an IO object, and then yield the
597     /// new position in the object and the object itself.
598     ///
599     /// In the case of an error the buffer and the object will be discarded, with
600     /// the error yielded.
seek(&mut self, pos: SeekFrom) -> Seek<'_, Self> where Self: Unpin,601     fn seek(&mut self, pos: SeekFrom) -> Seek<'_, Self>
602     where
603         Self: Unpin,
604     {
605         assert_future::<Result<u64>, _>(Seek::new(self, pos))
606     }
607 
608     /// Creates a future which will return the current seek position from the
609     /// start of the stream.
610     ///
611     /// This is equivalent to `self.seek(SeekFrom::Current(0))`.
stream_position(&mut self) -> Seek<'_, Self> where Self: Unpin,612     fn stream_position(&mut self) -> Seek<'_, Self>
613     where
614         Self: Unpin,
615     {
616         self.seek(SeekFrom::Current(0))
617     }
618 }
619 
620 impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {}
621 
622 /// An extension trait which adds utility methods to `AsyncBufRead` types.
623 pub trait AsyncBufReadExt: AsyncBufRead {
624     /// Creates a future which will wait for a non-empty buffer to be available from this I/O
625     /// object or EOF to be reached.
626     ///
627     /// This method is the async equivalent to [`BufRead::fill_buf`](std::io::BufRead::fill_buf).
628     ///
629     /// ```rust
630     /// # futures::executor::block_on(async {
631     /// use futures::{io::AsyncBufReadExt as _, stream::{iter, TryStreamExt as _}};
632     ///
633     /// let mut stream = iter(vec![Ok(vec![1, 2, 3]), Ok(vec![4, 5, 6])]).into_async_read();
634     ///
635     /// assert_eq!(stream.fill_buf().await?, vec![1, 2, 3]);
636     /// stream.consume_unpin(2);
637     ///
638     /// assert_eq!(stream.fill_buf().await?, vec![3]);
639     /// stream.consume_unpin(1);
640     ///
641     /// assert_eq!(stream.fill_buf().await?, vec![4, 5, 6]);
642     /// stream.consume_unpin(3);
643     ///
644     /// assert_eq!(stream.fill_buf().await?, vec![]);
645     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
646     /// ```
fill_buf(&mut self) -> FillBuf<'_, Self> where Self: Unpin,647     fn fill_buf(&mut self) -> FillBuf<'_, Self>
648     where
649         Self: Unpin,
650     {
651         assert_future::<Result<&[u8]>, _>(FillBuf::new(self))
652     }
653 
654     /// A convenience for calling [`AsyncBufRead::consume`] on [`Unpin`] IO types.
655     ///
656     /// ```rust
657     /// # futures::executor::block_on(async {
658     /// use futures::{io::AsyncBufReadExt as _, stream::{iter, TryStreamExt as _}};
659     ///
660     /// let mut stream = iter(vec![Ok(vec![1, 2, 3])]).into_async_read();
661     ///
662     /// assert_eq!(stream.fill_buf().await?, vec![1, 2, 3]);
663     /// stream.consume_unpin(2);
664     ///
665     /// assert_eq!(stream.fill_buf().await?, vec![3]);
666     /// stream.consume_unpin(1);
667     ///
668     /// assert_eq!(stream.fill_buf().await?, vec![]);
669     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
670     /// ```
consume_unpin(&mut self, amt: usize) where Self: Unpin,671     fn consume_unpin(&mut self, amt: usize)
672     where
673         Self: Unpin,
674     {
675         Pin::new(self).consume(amt)
676     }
677 
678     /// Creates a future which will read all the bytes associated with this I/O
679     /// object into `buf` until the delimiter `byte` or EOF is reached.
680     /// This method is the async equivalent to [`BufRead::read_until`](std::io::BufRead::read_until).
681     ///
682     /// This function will read bytes from the underlying stream until the
683     /// delimiter or EOF is found. Once found, all bytes up to, and including,
684     /// the delimiter (if found) will be appended to `buf`.
685     ///
686     /// The returned future will resolve to the number of bytes read once the read
687     /// operation is completed.
688     ///
689     /// In the case of an error the buffer and the object will be discarded, with
690     /// the error yielded.
691     ///
692     /// # Examples
693     ///
694     /// ```
695     /// # futures::executor::block_on(async {
696     /// use futures::io::{AsyncBufReadExt, Cursor};
697     ///
698     /// let mut cursor = Cursor::new(b"lorem-ipsum");
699     /// let mut buf = vec![];
700     ///
701     /// // cursor is at 'l'
702     /// let num_bytes = cursor.read_until(b'-', &mut buf).await?;
703     /// assert_eq!(num_bytes, 6);
704     /// assert_eq!(buf, b"lorem-");
705     /// buf.clear();
706     ///
707     /// // cursor is at 'i'
708     /// let num_bytes = cursor.read_until(b'-', &mut buf).await?;
709     /// assert_eq!(num_bytes, 5);
710     /// assert_eq!(buf, b"ipsum");
711     /// buf.clear();
712     ///
713     /// // cursor is at EOF
714     /// let num_bytes = cursor.read_until(b'-', &mut buf).await?;
715     /// assert_eq!(num_bytes, 0);
716     /// assert_eq!(buf, b"");
717     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
718     /// ```
read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> ReadUntil<'a, Self> where Self: Unpin,719     fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> ReadUntil<'a, Self>
720     where
721         Self: Unpin,
722     {
723         assert_future::<Result<usize>, _>(ReadUntil::new(self, byte, buf))
724     }
725 
726     /// Creates a future which will read all the bytes associated with this I/O
727     /// object into `buf` until a newline (the 0xA byte) or EOF is reached,
728     /// This method is the async equivalent to [`BufRead::read_line`](std::io::BufRead::read_line).
729     ///
730     /// This function will read bytes from the underlying stream until the
731     /// newline delimiter (the 0xA byte) or EOF is found. Once found, all bytes
732     /// up to, and including, the delimiter (if found) will be appended to
733     /// `buf`.
734     ///
735     /// The returned future will resolve to the number of bytes read once the read
736     /// operation is completed.
737     ///
738     /// In the case of an error the buffer and the object will be discarded, with
739     /// the error yielded.
740     ///
741     /// # Errors
742     ///
743     /// This function has the same error semantics as [`read_until`] and will
744     /// also return an error if the read bytes are not valid UTF-8. If an I/O
745     /// error is encountered then `buf` may contain some bytes already read in
746     /// the event that all data read so far was valid UTF-8.
747     ///
748     /// [`read_until`]: AsyncBufReadExt::read_until
749     ///
750     /// # Examples
751     ///
752     /// ```
753     /// # futures::executor::block_on(async {
754     /// use futures::io::{AsyncBufReadExt, Cursor};
755     ///
756     /// let mut cursor = Cursor::new(b"foo\nbar");
757     /// let mut buf = String::new();
758     ///
759     /// // cursor is at 'f'
760     /// let num_bytes = cursor.read_line(&mut buf).await?;
761     /// assert_eq!(num_bytes, 4);
762     /// assert_eq!(buf, "foo\n");
763     /// buf.clear();
764     ///
765     /// // cursor is at 'b'
766     /// let num_bytes = cursor.read_line(&mut buf).await?;
767     /// assert_eq!(num_bytes, 3);
768     /// assert_eq!(buf, "bar");
769     /// buf.clear();
770     ///
771     /// // cursor is at EOF
772     /// let num_bytes = cursor.read_line(&mut buf).await?;
773     /// assert_eq!(num_bytes, 0);
774     /// assert_eq!(buf, "");
775     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
776     /// ```
read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self> where Self: Unpin,777     fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self>
778     where
779         Self: Unpin,
780     {
781         assert_future::<Result<usize>, _>(ReadLine::new(self, buf))
782     }
783 
784     /// Returns a stream over the lines of this reader.
785     /// This method is the async equivalent to [`BufRead::lines`](std::io::BufRead::lines).
786     ///
787     /// The stream returned from this function will yield instances of
788     /// [`io::Result`]`<`[`String`]`>`. Each string returned will *not* have a newline
789     /// byte (the 0xA byte) or CRLF (0xD, 0xA bytes) at the end.
790     ///
791     /// [`io::Result`]: std::io::Result
792     /// [`String`]: String
793     ///
794     /// # Errors
795     ///
796     /// Each line of the stream has the same error semantics as [`AsyncBufReadExt::read_line`].
797     ///
798     /// [`AsyncBufReadExt::read_line`]: AsyncBufReadExt::read_line
799     ///
800     /// # Examples
801     ///
802     /// ```
803     /// # futures::executor::block_on(async {
804     /// use futures::io::{AsyncBufReadExt, Cursor};
805     /// use futures::stream::StreamExt;
806     ///
807     /// let cursor = Cursor::new(b"lorem\nipsum\xc2\r\ndolor");
808     ///
809     /// let mut lines_stream = cursor.lines().map(|l| l.unwrap_or(String::from("invalid UTF_8")));
810     /// assert_eq!(lines_stream.next().await, Some(String::from("lorem")));
811     /// assert_eq!(lines_stream.next().await, Some(String::from("invalid UTF_8")));
812     /// assert_eq!(lines_stream.next().await, Some(String::from("dolor")));
813     /// assert_eq!(lines_stream.next().await, None);
814     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
815     /// ```
lines(self) -> Lines<Self> where Self: Sized,816     fn lines(self) -> Lines<Self>
817     where
818         Self: Sized,
819     {
820         assert_stream::<Result<String>, _>(Lines::new(self))
821     }
822 }
823 
824 impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}
825 
826 // Just a helper function to ensure the reader we're returning all have the
827 // right implementations.
assert_read<R>(reader: R) -> R where R: AsyncRead,828 pub(crate) fn assert_read<R>(reader: R) -> R
829 where
830     R: AsyncRead,
831 {
832     reader
833 }
834 // Just a helper function to ensure the writer we're returning all have the
835 // right implementations.
assert_write<W>(writer: W) -> W where W: AsyncWrite,836 pub(crate) fn assert_write<W>(writer: W) -> W
837 where
838     W: AsyncWrite,
839 {
840     writer
841 }
842