1 use alloc::collections::VecDeque;
2 
3 use crate::{
4     error::StreamError,
5     stream::{ParseError, Positioned, ResetStream, StreamErrorFor, StreamOnce},
6 };
7 
8 /// `Stream` which buffers items from an instance of `StreamOnce` into a ring buffer.
9 /// Instances of `StreamOnce` which is not able to implement `ResetStream` (such as `ReadStream`) may
10 /// use this as a way to implement `ResetStream` and become a full `Stream` instance.
11 ///
12 /// The drawback is that the buffer only stores a limited number of items which limits how many
13 /// tokens that can be reset and replayed. If a `buffered::Stream` is reset past this limit an error
14 /// will be returned when `uncons` is next called.
15 ///
16 /// NOTE: If this stream is used in conjunction with an error enhancing stream such as
17 /// `easy::Stream` (also via the `easy_parser` method) it is recommended that the `buffered::Stream`
18 /// instance wraps the `easy::Stream` instance instead of the other way around.
19 ///
20 /// ```ignore
21 /// // DO
22 /// buffered::Stream::new(easy::Stream(..), ..)
23 /// // DON'T
24 /// easy::Stream(buffered::Stream::new(.., ..))
25 /// parser.easy_parse(buffered::Stream::new(..));
26 /// ```
27 #[derive(Debug, PartialEq)]
28 pub struct Stream<Input>
29 where
30     Input: StreamOnce + Positioned,
31 {
32     offset: usize,
33     iter: Input,
34     buffer_offset: usize,
35     buffer: VecDeque<(Input::Token, Input::Position)>,
36 }
37 
38 impl<Input> ResetStream for Stream<Input>
39 where
40     Input: Positioned,
41 {
42     type Checkpoint = usize;
43 
checkpoint(&self) -> Self::Checkpoint44     fn checkpoint(&self) -> Self::Checkpoint {
45         self.offset
46     }
47 
reset(&mut self, checkpoint: Self::Checkpoint) -> Result<(), Self::Error>48     fn reset(&mut self, checkpoint: Self::Checkpoint) -> Result<(), Self::Error> {
49         if checkpoint < self.buffer_offset - self.buffer.len() {
50             // We have backtracked to far
51             Err(Self::Error::from_error(
52                 self.position(),
53                 StreamErrorFor::<Self>::message_static_message("Backtracked to far"),
54             ))
55         } else {
56             self.offset = checkpoint;
57             Ok(())
58         }
59     }
60 }
61 
62 impl<Input> Stream<Input>
63 where
64     Input: StreamOnce + Positioned,
65     Input::Position: Clone,
66     Input::Token: Clone,
67 {
68     /// Constructs a new `BufferedStream` from a `StreamOnce` instance with a `lookahead`
69     /// number of elements that can be stored in the buffer.
new(iter: Input, lookahead: usize) -> Stream<Input>70     pub fn new(iter: Input, lookahead: usize) -> Stream<Input> {
71         Stream {
72             offset: 0,
73             iter,
74             buffer_offset: 0,
75             buffer: VecDeque::with_capacity(lookahead),
76         }
77     }
78 }
79 
80 impl<Input> Positioned for Stream<Input>
81 where
82     Input: StreamOnce + Positioned,
83 {
84     #[inline]
position(&self) -> Self::Position85     fn position(&self) -> Self::Position {
86         if self.offset >= self.buffer_offset {
87             self.iter.position()
88         } else if self.offset < self.buffer_offset - self.buffer.len() {
89             self.buffer
90                 .front()
91                 .expect("At least 1 element in the buffer")
92                 .1
93                 .clone()
94         } else {
95             self.buffer[self.buffer.len() - (self.buffer_offset - self.offset)]
96                 .1
97                 .clone()
98         }
99     }
100 }
101 
102 impl<Input> StreamOnce for Stream<Input>
103 where
104     Input: StreamOnce + Positioned,
105     Input::Token: Clone,
106 {
107     type Token = Input::Token;
108     type Range = Input::Range;
109     type Position = Input::Position;
110     type Error = Input::Error;
111 
112     #[inline]
uncons(&mut self) -> Result<Input::Token, StreamErrorFor<Self>>113     fn uncons(&mut self) -> Result<Input::Token, StreamErrorFor<Self>> {
114         if self.offset >= self.buffer_offset {
115             let position = self.iter.position();
116             let token = self.iter.uncons()?;
117             self.buffer_offset += 1;
118             // We want the VecDeque to only keep the last .capacity() elements so we need to remove
119             // an element if it gets to large
120             if self.buffer.len() == self.buffer.capacity() {
121                 self.buffer.pop_front();
122             }
123             self.buffer.push_back((token.clone(), position));
124             self.offset += 1;
125             Ok(token)
126         } else if self.offset < self.buffer_offset - self.buffer.len() {
127             // We have backtracked to far
128             Err(StreamError::message_static_message("Backtracked to far"))
129         } else {
130             let value = self.buffer[self.buffer.len() - (self.buffer_offset - self.offset)]
131                 .0
132                 .clone();
133             self.offset += 1;
134             Ok(value)
135         }
136     }
137 
is_partial(&self) -> bool138     fn is_partial(&self) -> bool {
139         self.iter.is_partial()
140     }
141 }
142