1 #![warn(
2     missing_debug_implementations,
3     missing_docs,
4     rust_2018_idioms,
5     unreachable_pub
6 )]
7 #![doc(test(no_crate_inject, attr(deny(rust_2018_idioms))))]
8 
9 //! Asynchronous stream of elements.
10 //!
11 //! Provides two macros, `stream!` and `try_stream!`, allowing the caller to
12 //! define asynchronous streams of elements. These are implemented using `async`
13 //! & `await` notation. This crate works without unstable features.
14 //!
15 //! The `stream!` macro returns an anonymous type implementing the [`Stream`]
16 //! trait. The `Item` associated type is the type of the values yielded from the
17 //! stream. The `try_stream!` also returns an anonymous type implementing the
18 //! [`Stream`] trait, but the `Item` associated type is `Result<T, Error>`. The
19 //! `try_stream!` macro supports using `?` notation as part of the
20 //! implementation.
21 //!
22 //! # Usage
23 //!
24 //! A basic stream yielding numbers. Values are yielded using the `yield`
25 //! keyword. The stream block must return `()`.
26 //!
27 //! ```rust
28 //! use async_stream::stream;
29 //!
30 //! use futures_util::pin_mut;
31 //! use futures_util::stream::StreamExt;
32 //!
33 //! #[tokio::main]
34 //! async fn main() {
35 //!     let s = stream! {
36 //!         for i in 0..3 {
37 //!             yield i;
38 //!         }
39 //!     };
40 //!
41 //!     pin_mut!(s); // needed for iteration
42 //!
43 //!     while let Some(value) = s.next().await {
44 //!         println!("got {}", value);
45 //!     }
46 //! }
47 //! ```
48 //!
49 //! Streams may be returned by using `impl Stream<Item = T>`:
50 //!
51 //! ```rust
52 //! use async_stream::stream;
53 //!
54 //! use futures_core::stream::Stream;
55 //! use futures_util::pin_mut;
56 //! use futures_util::stream::StreamExt;
57 //!
58 //! fn zero_to_three() -> impl Stream<Item = u32> {
59 //!     stream! {
60 //!         for i in 0..3 {
61 //!             yield i;
62 //!         }
63 //!     }
64 //! }
65 //!
66 //! #[tokio::main]
67 //! async fn main() {
68 //!     let s = zero_to_three();
69 //!     pin_mut!(s); // needed for iteration
70 //!
71 //!     while let Some(value) = s.next().await {
72 //!         println!("got {}", value);
73 //!     }
74 //! }
75 //! ```
76 //!
77 //! Streams may be implemented in terms of other streams - `async-stream` provides `for await`
78 //! syntax to assist with this:
79 //!
80 //! ```rust
81 //! use async_stream::stream;
82 //!
83 //! use futures_core::stream::Stream;
84 //! use futures_util::pin_mut;
85 //! use futures_util::stream::StreamExt;
86 //!
87 //! fn zero_to_three() -> impl Stream<Item = u32> {
88 //!     stream! {
89 //!         for i in 0..3 {
90 //!             yield i;
91 //!         }
92 //!     }
93 //! }
94 //!
95 //! fn double<S: Stream<Item = u32>>(input: S)
96 //!     -> impl Stream<Item = u32>
97 //! {
98 //!     stream! {
99 //!         for await value in input {
100 //!             yield value * 2;
101 //!         }
102 //!     }
103 //! }
104 //!
105 //! #[tokio::main]
106 //! async fn main() {
107 //!     let s = double(zero_to_three());
108 //!     pin_mut!(s); // needed for iteration
109 //!
110 //!     while let Some(value) = s.next().await {
111 //!         println!("got {}", value);
112 //!     }
113 //! }
114 //! ```
115 //!
116 //! Rust try notation (`?`) can be used with the `try_stream!` macro. The `Item`
117 //! of the returned stream is `Result` with `Ok` being the value yielded and
118 //! `Err` the error type returned by `?`.
119 //!
120 //! ```rust
121 //! use tokio::net::{TcpListener, TcpStream};
122 //!
123 //! use async_stream::try_stream;
124 //! use futures_core::stream::Stream;
125 //!
126 //! use std::io;
127 //! use std::net::SocketAddr;
128 //!
129 //! fn bind_and_accept(addr: SocketAddr)
130 //!     -> impl Stream<Item = io::Result<TcpStream>>
131 //! {
132 //!     try_stream! {
133 //!         let mut listener = TcpListener::bind(addr).await?;
134 //!
135 //!         loop {
136 //!             let (stream, addr) = listener.accept().await?;
137 //!             println!("received on {:?}", addr);
138 //!             yield stream;
139 //!         }
140 //!     }
141 //! }
142 //! ```
143 //!
144 //! # Implementation
145 //!
146 //! The `stream!` and `try_stream!` macros are implemented using proc macros.
147 //! The macro searches the syntax tree for instances of `yield $expr` and
148 //! transforms them into `sender.send($expr).await`.
149 //!
150 //! The stream uses a lightweight sender to send values from the stream
151 //! implementation to the caller. When entering the stream, an `Option<T>` is
152 //! stored on the stack. A pointer to the cell is stored in a thread local and
153 //! `poll` is called on the async block. When `poll` returns.
154 //! `sender.send(value)` stores the value that cell and yields back to the
155 //! caller.
156 //!
157 //! [`Stream`]: https://docs.rs/futures-core/*/futures_core/stream/trait.Stream.html
158 
159 mod async_stream;
160 mod next;
161 mod yielder;
162 
163 /// Asynchronous stream
164 ///
165 /// See [crate](index.html) documentation for more details.
166 ///
167 /// # Examples
168 ///
169 /// ```
170 /// use async_stream::stream;
171 ///
172 /// use futures_util::pin_mut;
173 /// use futures_util::stream::StreamExt;
174 ///
175 /// #[tokio::main]
176 /// async fn main() {
177 ///     let s = stream! {
178 ///         for i in 0..3 {
179 ///             yield i;
180 ///         }
181 ///     };
182 ///
183 ///     pin_mut!(s); // needed for iteration
184 ///
185 ///     while let Some(value) = s.next().await {
186 ///         println!("got {}", value);
187 ///     }
188 /// }
189 /// ```
190 #[macro_export]
191 macro_rules! stream {
192     ($($tt:tt)*) => {
193         $crate::__private::stream_inner!(($crate) $($tt)*)
194     }
195 }
196 
197 /// Asynchronous fallible stream
198 ///
199 /// See [crate](index.html) documentation for more details.
200 ///
201 /// # Examples
202 ///
203 /// ```
204 /// use tokio::net::{TcpListener, TcpStream};
205 ///
206 /// use async_stream::try_stream;
207 /// use futures_core::stream::Stream;
208 ///
209 /// use std::io;
210 /// use std::net::SocketAddr;
211 ///
212 /// fn bind_and_accept(addr: SocketAddr)
213 ///     -> impl Stream<Item = io::Result<TcpStream>>
214 /// {
215 ///     try_stream! {
216 ///         let mut listener = TcpListener::bind(addr).await?;
217 ///
218 ///         loop {
219 ///             let (stream, addr) = listener.accept().await?;
220 ///             println!("received on {:?}", addr);
221 ///             yield stream;
222 ///         }
223 ///     }
224 /// }
225 /// ```
226 #[macro_export]
227 macro_rules! try_stream {
228     ($($tt:tt)*) => {
229         $crate::__private::try_stream_inner!(($crate) $($tt)*)
230     }
231 }
232 
233 // Not public API.
234 #[doc(hidden)]
235 pub mod __private {
236     pub use crate::async_stream::AsyncStream;
237     pub use crate::next::next;
238     pub use async_stream_impl::{stream_inner, try_stream_inner};
239     pub mod yielder {
240         pub use crate::yielder::pair;
241     }
242 }
243