#![warn(
    missing_debug_implementations,
    missing_docs,
    rust_2018_idioms,
    unreachable_pub
)]
#![doc(test(no_crate_inject, attr(deny(rust_2018_idioms))))]

//! Asynchronous stream of elements.
//!
//! Provides two macros, `stream!` and `try_stream!`, allowing the caller to
//! define asynchronous streams of elements. These are implemented using `async`
//! & `await` notation. This crate works without unstable features.
//!
//! The `stream!` macro returns an anonymous type implementing the [`Stream`]
//! trait. The `Item` associated type is the type of the values yielded from the
//! stream. The `try_stream!` also returns an anonymous type implementing the
//! [`Stream`] trait, but the `Item` associated type is `Result<T, Error>`. The
//! `try_stream!` macro supports using `?` notation as part of the
//! implementation.
//!
//! # Usage
//!
//! A basic stream yielding numbers. Values are yielded using the `yield`
//! keyword. The stream block must return `()`.
//!
//! ```rust
//! use async_stream::stream;
//!
//! use futures_util::pin_mut;
//! use futures_util::stream::StreamExt;
//!
//! #[tokio::main]
//! async fn main() {
//!     let s = stream! {
//!         for i in 0..3 {
//!             yield i;
//!         }
//!     };
//!
//!     pin_mut!(s); // needed for iteration
//!
//!     while let Some(value) = s.next().await {
//!         println!("got {}", value);
//!     }
//! }
//! ```
//!
//! Streams may be returned by using `impl Stream<Item = T>`:
//!
//! ```rust
//! use async_stream::stream;
//!
//! use futures_core::stream::Stream;
//! use futures_util::pin_mut;
//! use futures_util::stream::StreamExt;
//!
//! fn zero_to_three() -> impl Stream<Item = u32> {
//!     stream! {
//!         for i in 0..3 {
//!             yield i;
//!         }
//!     }
//! }
//!
//! #[tokio::main]
//! async fn main() {
//!     let s = zero_to_three();
//!     pin_mut!(s); // needed for iteration
//!
//!     while let Some(value) = s.next().await {
//!         println!("got {}", value);
//!     }
//! }
//! ```
//!
//! Streams may be implemented in terms of other streams - `async-stream` provides `for await`
//! syntax to assist with this:
//!
//! ```rust
//! use async_stream::stream;
//!
//! use futures_core::stream::Stream;
//! use futures_util::pin_mut;
//! use futures_util::stream::StreamExt;
//!
//! fn zero_to_three() -> impl Stream<Item = u32> {
//!     stream! {
//!         for i in 0..3 {
//!             yield i;
//!         }
//!     }
//! }
//!
//! fn double<S: Stream<Item = u32>>(input: S)
//!     -> impl Stream<Item = u32>
//! {
//!     stream! {
//!         for await value in input {
//!             yield value * 2;
//!         }
//!     }
//! }
//!
//! #[tokio::main]
//! async fn main() {
//!     let s = double(zero_to_three());
//!     pin_mut!(s); // needed for iteration
//!
//!     while let Some(value) = s.next().await {
//!         println!("got {}", value);
//!     }
//! }
//! ```
//!
//! Rust try notation (`?`) can be used with the `try_stream!` macro. The `Item`
//! of the returned stream is `Result` with `Ok` being the value yielded and
//! `Err` the error type returned by `?`.
//!
//! ```rust
//! use tokio::net::{TcpListener, TcpStream};
//!
//! use async_stream::try_stream;
//! use futures_core::stream::Stream;
//!
//! use std::io;
//! use std::net::SocketAddr;
//!
//! fn bind_and_accept(addr: SocketAddr)
//!     -> impl Stream<Item = io::Result<TcpStream>>
//! {
//!     try_stream! {
//!         let mut listener = TcpListener::bind(addr).await?;
//!
//!         loop {
//!             let (stream, addr) = listener.accept().await?;
//!             println!("received on {:?}", addr);
//!             yield stream;
//!         }
//!     }
//! }
//! ```
//!
//! # Implementation
//!
The `stream!` and `try_stream!` macros are implemented using proc macros. 147 //! The macro searches the syntax tree for instances of `yield $expr` and 148 //! transforms them into `sender.send($expr).await`. 149 //! 150 //! The stream uses a lightweight sender to send values from the stream 151 //! implementation to the caller. When entering the stream, an `Option<T>` is 152 //! stored on the stack. A pointer to the cell is stored in a thread local and 153 //! `poll` is called on the async block. When `poll` returns. 154 //! `sender.send(value)` stores the value that cell and yields back to the 155 //! caller. 156 //! 157 //! [`Stream`]: https://docs.rs/futures-core/*/futures_core/stream/trait.Stream.html 158 159 mod async_stream; 160 mod next; 161 mod yielder; 162 163 /// Asynchronous stream 164 /// 165 /// See [crate](index.html) documentation for more details. 166 /// 167 /// # Examples 168 /// 169 /// ``` 170 /// use async_stream::stream; 171 /// 172 /// use futures_util::pin_mut; 173 /// use futures_util::stream::StreamExt; 174 /// 175 /// #[tokio::main] 176 /// async fn main() { 177 /// let s = stream! { 178 /// for i in 0..3 { 179 /// yield i; 180 /// } 181 /// }; 182 /// 183 /// pin_mut!(s); // needed for iteration 184 /// 185 /// while let Some(value) = s.next().await { 186 /// println!("got {}", value); 187 /// } 188 /// } 189 /// ``` 190 #[macro_export] 191 macro_rules! stream { 192 ($($tt:tt)*) => { 193 $crate::__private::stream_inner!(($crate) $($tt)*) 194 } 195 } 196 197 /// Asynchronous fallible stream 198 /// 199 /// See [crate](index.html) documentation for more details. 200 /// 201 /// # Examples 202 /// 203 /// ``` 204 /// use tokio::net::{TcpListener, TcpStream}; 205 /// 206 /// use async_stream::try_stream; 207 /// use futures_core::stream::Stream; 208 /// 209 /// use std::io; 210 /// use std::net::SocketAddr; 211 /// 212 /// fn bind_and_accept(addr: SocketAddr) 213 /// -> impl Stream<Item = io::Result<TcpStream>> 214 /// { 215 /// try_stream! 
{ 216 /// let mut listener = TcpListener::bind(addr).await?; 217 /// 218 /// loop { 219 /// let (stream, addr) = listener.accept().await?; 220 /// println!("received on {:?}", addr); 221 /// yield stream; 222 /// } 223 /// } 224 /// } 225 /// ``` 226 #[macro_export] 227 macro_rules! try_stream { 228 ($($tt:tt)*) => { 229 $crate::__private::try_stream_inner!(($crate) $($tt)*) 230 } 231 } 232 233 // Not public API. 234 #[doc(hidden)] 235 pub mod __private { 236 pub use crate::async_stream::AsyncStream; 237 pub use crate::next::next; 238 pub use async_stream_impl::{stream_inner, try_stream_inner}; 239 pub mod yielder { 240 pub use crate::yielder::pair; 241 } 242 } 243