# Refactor I/O driver

Describes changes to the I/O driver for the Tokio 0.3 release.

## Goals

* Support `async fn` on I/O types with `&self`.
* Refine the `Registration` API.

### Non-goals

* Implement `AsyncRead` / `AsyncWrite` for `&TcpStream` or other reference types.

## Overview

Currently, I/O types require `&mut self` for `async` functions. The reason for
this is that the task's waker is stored in the I/O resource's internal state
(`ScheduledIo`) instead of in the future returned by the `async` function.
Because of this limitation, I/O types limit the number of wakers to one per
direction (a direction is either read-related events or write-related events).

Moving the waker from the internal I/O resource's state to the operation's
future enables multiple wakers to be registered per operation. The "intrusive
wake list" strategy used by `Notify` applies to this case, though there are
some concerns unique to the I/O driver.

## Reworking the `Registration` type

While `Registration` is made private (per #2728), it remains in Tokio as an
implementation detail backing I/O resources such as `TcpStream`. The API of
`Registration` is updated to support waiting for an arbitrary interest set with
`&self`. This supports concurrent waiters, each with a different readiness
interest.

```rust
struct Registration { ... }

// TODO: naming
struct ReadyEvent {
    tick: u32,
    ready: mio::Ready,
}

impl Registration {
    /// `interest` must be a superset of **all** interest sets specified in
    /// the other methods. This is the interest set passed to `mio`.
    pub fn new<T>(io: &T, interest: mio::Ready) -> io::Result<Registration>
    where
        T: mio::Evented;

    /// Awaits any readiness event included in `interest`. Returns a
    /// `ReadyEvent` representing the received readiness event.
    async fn readiness(&self, interest: mio::Ready) -> io::Result<ReadyEvent>;

    /// Clears resource-level readiness represented by the specified
    /// `ReadyEvent`.
    async fn clear_readiness(&self, ready_event: ReadyEvent);
}
```

A new registration is created for a `T: mio::Evented` and an `interest` set.
This creates a `ScheduledIo` entry with the I/O driver and registers the
resource with `mio`.

Because Tokio uses **edge-triggered** notifications, the I/O driver only
receives readiness from the OS once the ready state **changes**. The I/O driver
must therefore track each resource's known readiness state. This helps avoid
syscalls when the process already knows the syscall would return
`EWOULDBLOCK`.

A call to `readiness()` checks if the currently known resource readiness
overlaps with `interest`. If it does, `readiness()` returns immediately. If it
does not, the task waits until the I/O driver receives a readiness event.

The pseudocode to perform a TCP read is as follows:

```rust
async fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
    loop {
        // Await readiness
        let event = self.readiness(interest).await?;

        match self.mio_socket.read(buf) {
            Ok(v) => return Ok(v),
            Err(ref e) if e.kind() == WouldBlock => {
                self.clear_readiness(event);
            }
            Err(e) => return Err(e),
        }
    }
}
```

## Reworking the `ScheduledIo` type

The `ScheduledIo` type is switched to use an intrusive waker linked list. Each
entry in the linked list includes the `interest` set passed to `readiness()`.

```rust
#[derive(Debug)]
pub(crate) struct ScheduledIo {
    /// Resource's known state packed with other state that must be
    /// atomically updated.
    readiness: AtomicUsize,

    /// Tracks tasks waiting on the resource.
    waiters: Mutex<Waiters>,
}

#[derive(Debug)]
struct Waiters {
    /// List of intrusive waiters.
    list: LinkedList<Waiter>,

    /// Waker used by `AsyncRead` implementations.
    reader: Option<Waker>,

    /// Waker used by `AsyncWrite` implementations.
    writer: Option<Waker>,
}

// This struct is contained by the **future** returned by `readiness()`.
#[derive(Debug)]
struct Waiter {
    /// Intrusive linked-list pointers.
    pointers: linked_list::Pointers<Waiter>,

    /// Waker for the task waiting on the I/O resource.
    waiter: Option<Waker>,

    /// Readiness events being waited on. This is
    /// the value passed to `readiness()`.
    interest: mio::Ready,

    /// Should not be `Unpin`.
    _p: PhantomPinned,
}
```

When an I/O event is received from `mio`, the associated resource's readiness
is updated and the waiter list is iterated. All waiters with an `interest` that
overlaps the received readiness event are notified. Any waiter with an
`interest` that does not overlap the readiness event remains in the list.

## Cancel interest on drop

The future returned by `readiness()` uses an intrusive linked list to store the
waker with `ScheduledIo`. Because `readiness()` can be called concurrently,
many wakers may be stored simultaneously in the list. If the `readiness()`
future is dropped early, it is essential that the waker is removed from the
list. This prevents leaking memory.

## Race condition

Consider how many tasks may concurrently attempt I/O operations. This, combined
with how Tokio uses edge-triggered events, can result in a race condition.
Let's revisit the TCP read function:

```rust
async fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
    loop {
        // Await readiness
        let event = self.readiness(interest).await?;

        match self.mio_socket.read(buf) {
            Ok(v) => return Ok(v),
            Err(ref e) if e.kind() == WouldBlock => {
                self.clear_readiness(event);
            }
            Err(e) => return Err(e),
        }
    }
}
```

If care is not taken, the `read()` function can deadlock when a readiness event
arrives between `mio_socket.read(buf)` returning and `clear_readiness(event)`
being called. In that window, the new readiness event is received,
`clear_readiness()` then unsets it, and on the next iteration
`readiness().await` blocks forever because a new readiness event is never
received.

The current I/O driver handles this condition by always registering the task's
waker before performing the operation. This is not ideal, as it results in
unnecessary task notifications.

Instead, we will use a strategy that prevents clearing readiness if an "unseen"
readiness event has been received. The I/O driver maintains a "tick" value.
Every time the `mio` `poll()` function is called, the tick is incremented. Each
readiness event has an associated tick. When the I/O driver sets the resource's
readiness, the driver's tick is packed into the atomic `usize`.

The `ScheduledIo` readiness `AtomicUsize` is structured as:

```
| shutdown | generation | driver tick | readiness |
|----------|------------|-------------|-----------|
|  1 bit   |   7 bits   |   8 bits    |  16 bits  |
```

The `shutdown` and `generation` components exist today.

The `readiness()` function returns a `ReadyEvent` value. This value includes
the `tick` component read with the resource's readiness value. When
`clear_readiness()` is called, the `ReadyEvent` is provided.
Readiness is only cleared if the current `tick` matches the `tick` included in
the `ReadyEvent`. If the tick values do not match, the call to `readiness()` on
the next iteration will not block, and the new `tick` is included in the new
`ReadyEvent`.

TODO

## Implementing `AsyncRead` / `AsyncWrite`

The `AsyncRead` and `AsyncWrite` traits use a "poll"-based API. This means that
it is not possible to use an intrusive linked list to track the waker.
Additionally, there is no future associated with the operation, which means it
is not possible to cancel interest in the readiness events.

To implement `AsyncRead` and `AsyncWrite`, `ScheduledIo` includes dedicated
waker values for the read direction and the write direction. These values are
used to store the waker. A specific `interest` is not tracked for `AsyncRead`
and `AsyncWrite` implementations. It is assumed that the only events of
interest are:

* Read ready
* Read closed
* Write ready
* Write closed

Note that "read closed" and "write closed" are only available with Mio 0.7.
With Mio 0.6, things were a bit messy.

It is only possible to implement `AsyncRead` and `AsyncWrite` for resource
types themselves and not for `&Resource`. Implementing the traits for
`&Resource` would permit concurrent operations on the resource. Because only a
single waker is stored per direction, any concurrent usage would result in
deadlocks. An alternate implementation would call for a `Vec<Waker>`, but this
would result in memory leaks.

## Enabling reads and writes for `&TcpStream`

Instead of implementing `AsyncRead` and `AsyncWrite` for `&TcpStream`, a new
function is added to `TcpStream`:
```rust
impl TcpStream {
    /// Naming TBD
    fn by_ref(&self) -> TcpStreamRef<'_>;
}

struct TcpStreamRef<'a> {
    stream: &'a TcpStream,

    // `Waiter` is the node in the intrusive waiter linked list.
    read_waiter: Waiter,
    write_waiter: Waiter,
}
```

Now, `AsyncRead` and `AsyncWrite` can be implemented on `TcpStreamRef<'a>`.
When the `TcpStreamRef` is dropped, all associated waker resources are cleaned
up.

### Removing all the `split()` functions

With `TcpStream::by_ref()`, `TcpStream::split()` is no longer needed. Instead,
it is possible to do something like the following:

```rust
let rd = my_stream.by_ref();
let wr = my_stream.by_ref();

select! {
    // use `rd` and `wr` in separate branches.
}
```

It is also possible to store a `TcpStream` in an `Arc`:

```rust
let arc_stream = Arc::new(my_tcp_stream);
let n = arc_stream.by_ref().read(buf).await?;
```