1 use super::read_line::read_line_internal; 2 use futures_core::ready; 3 use futures_core::stream::Stream; 4 use futures_core::task::{Context, Poll}; 5 use futures_io::AsyncBufRead; 6 use pin_project_lite::pin_project; 7 use std::io; 8 use std::mem; 9 use std::pin::Pin; 10 use std::string::String; 11 use std::vec::Vec; 12 13 pin_project! { 14 /// Stream for the [`lines`](super::AsyncBufReadExt::lines) method. 15 #[derive(Debug)] 16 #[must_use = "streams do nothing unless polled"] 17 pub struct Lines<R> { 18 #[pin] 19 reader: R, 20 buf: String, 21 bytes: Vec<u8>, 22 read: usize, 23 } 24 } 25 26 impl<R: AsyncBufRead> Lines<R> { new(reader: R) -> Self27 pub(super) fn new(reader: R) -> Self { 28 Self { reader, buf: String::new(), bytes: Vec::new(), read: 0 } 29 } 30 } 31 32 impl<R: AsyncBufRead> Stream for Lines<R> { 33 type Item = io::Result<String>; 34 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>35 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 36 let this = self.project(); 37 let n = ready!(read_line_internal(this.reader, cx, this.buf, this.bytes, this.read))?; 38 *this.read = 0; 39 if n == 0 && this.buf.is_empty() { 40 return Poll::Ready(None); 41 } 42 if this.buf.ends_with('\n') { 43 this.buf.pop(); 44 if this.buf.ends_with('\r') { 45 this.buf.pop(); 46 } 47 } 48 Poll::Ready(Some(Ok(mem::take(this.buf)))) 49 } 50 } 51