# Refactor I/O driver

Describes changes to the I/O driver for the Tokio 0.3 release.

## Goals

* Support `async fn` on I/O types with `&self`.
* Refine the `Registration` API.

### Non-goals

* Implement `AsyncRead` / `AsyncWrite` for `&TcpStream` or other reference
  types.

## Overview

Currently, I/O types require `&mut self` for `async` functions. The reason for
this is that the task's waker is stored in the I/O resource's internal state
(`ScheduledIo`) instead of in the future returned by the `async` function.
Because of this limitation, I/O types limit the number of wakers to one per
direction (a direction is either read-related events or write-related events).

Moving the waker from the internal I/O resource's state to the operation's
future enables multiple wakers to be registered per operation. The "intrusive
wake list" strategy used by `Notify` applies to this case, though there are
some concerns unique to the I/O driver.
## Reworking the `Registration` type

While `Registration` is made private (per #2728), it remains in Tokio as an
implementation detail backing I/O resources such as `TcpStream`. The API of
`Registration` is updated to support waiting for an arbitrary interest set
with `&self`. This supports concurrent waiters, each with a different
readiness interest.

```rust
struct Registration { ... }

// TODO: naming
struct ReadyEvent {
    tick: u32,
    ready: mio::Ready,
}

impl Registration {
    /// `interest` must be a superset of **all** interest sets specified in
    /// the other methods. This is the interest set passed to `mio`.
    pub fn new<T>(io: &T, interest: mio::Ready) -> io::Result<Registration>
        where T: mio::Evented;

    /// Waits for any readiness event included in `interest`. Returns a
    /// `ReadyEvent` representing the received readiness event.
    async fn readiness(&self, interest: mio::Ready) -> io::Result<ReadyEvent>;

    /// Clears resource-level readiness represented by the specified
    /// `ReadyEvent`.
    fn clear_readiness(&self, ready_event: ReadyEvent);
}
```

A new registration is created for a `T: mio::Evented` and an `interest` set.
This creates a `ScheduledIo` entry with the I/O driver and registers the
resource with `mio`.

Because Tokio uses **edge-triggered** notifications, the I/O driver only
receives readiness from the OS once the ready state **changes**. The I/O
driver must track each resource's known readiness state. This helps avoid
syscalls when the process knows the syscall would fail with `EWOULDBLOCK`.

A call to `readiness()` checks if the currently known resource readiness
overlaps with `interest`. If it does, then `readiness()` returns immediately.
If it does not, then the task waits until the I/O driver receives a readiness
event.

The pseudocode to perform a TCP read is as follows.

```rust
async fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
    loop {
        // Await readiness
        let event = self.readiness(interest).await?;

        match self.mio_socket.read(buf) {
            Ok(v) => return Ok(v),
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                // Readiness was stale; clear it and wait again.
                self.clear_readiness(event);
            }
            Err(e) => return Err(e),
        }
    }
}
```

## Reworking the `ScheduledIo` type

The `ScheduledIo` type is switched to use an intrusive waker linked list. Each
entry in the linked list includes the `interest` set passed to `readiness()`.

```rust
#[derive(Debug)]
pub(crate) struct ScheduledIo {
    /// Resource's known state packed with other state that must be
    /// atomically updated.
    readiness: AtomicUsize,

    /// Tracks tasks waiting on the resource
    waiters: Mutex<Waiters>,
}

#[derive(Debug)]
struct Waiters {
    /// List of intrusive waiters.
    list: LinkedList<Waiter>,

    /// Waiter used by `AsyncRead` implementations.
    reader: Option<Waker>,

    /// Waiter used by `AsyncWrite` implementations.
    writer: Option<Waker>,
}

// This struct is contained by the **future** returned by `readiness()`.
#[derive(Debug)]
struct Waiter {
    /// Intrusive linked-list pointers
    pointers: linked_list::Pointers<Waiter>,

    /// Waker for the task waiting on the I/O resource
    waiter: Option<Waker>,

    /// Readiness events being waited on. This is
    /// the value passed to `readiness()`
    interest: mio::Ready,

    /// Should not be `Unpin`.
    _p: PhantomPinned,
}
```
135 
136 When an I/O event is received from `mio`, the associated resources' readiness is
137 updated and the waiter list is iterated. All waiters with `interest` that
138 overlap the received readiness event are notified. Any waiter with an `interest`
139 that does not overlap the readiness event remains in the list.
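
The dispatch step described above can be sketched as follows. This is a
simplified, runnable model: interest and readiness are plain bitmasks, and a
`Vec` stands in for the intrusive linked list; the real implementation wakes a
stored `std::task::Waker` and unlinks the intrusive node. All names here are
illustrative, not Tokio's.

```rust
// Illustrative readiness bits (the real code uses `mio::Ready`).
const READABLE: u16 = 0b01;
const WRITABLE: u16 = 0b10;

struct Waiter {
    interest: u16, // the value passed to `readiness()`
}

/// Wake and remove every waiter whose `interest` overlaps the received
/// readiness; non-overlapping waiters remain queued.
fn dispatch(waiters: &mut Vec<Waiter>, ready: u16) -> usize {
    let before = waiters.len();
    waiters.retain(|w| w.interest & ready == 0); // keep non-overlapping
    before - waiters.len() // number of waiters woken
}

fn main() {
    let mut waiters = vec![
        Waiter { interest: READABLE },
        Waiter { interest: WRITABLE },
        Waiter { interest: READABLE | WRITABLE },
    ];
    // A read-readiness event wakes the two waiters interested in reads;
    // the write-only waiter stays in the list.
    let woken = dispatch(&mut waiters, READABLE);
    println!("woken: {}, still waiting: {}", woken, waiters.len());
}
```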

## Cancel interest on drop

The future returned by `readiness()` uses an intrusive linked list to store
the waker with `ScheduledIo`. Because `readiness()` can be called
concurrently, many wakers may be stored simultaneously in the list. If the
`readiness()` future is dropped early, it is essential that the waker is
removed from the list. This prevents leaking memory.
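
The unlink-on-drop behavior can be sketched like this. It is a hedged model:
a `Mutex<Vec<u64>>` stands in for the locked intrusive list, and the types and
names (`SharedList`, `Readiness`) are illustrative, not Tokio's.

```rust
use std::sync::{Arc, Mutex};

// Stand-in for the waiter list guarded by the `ScheduledIo` lock.
struct SharedList {
    wakers: Mutex<Vec<u64>>,
}

// Stand-in for the future returned by `readiness()`.
struct Readiness {
    id: u64,
    list: Arc<SharedList>,
}

impl Drop for Readiness {
    fn drop(&mut self) {
        // Essential: remove our entry so a future dropped before
        // completion does not leave a dangling waker in the list.
        self.list.wakers.lock().unwrap().retain(|&w| w != self.id);
    }
}

fn main() {
    let list = Arc::new(SharedList { wakers: Mutex::new(vec![1]) });
    let fut = Readiness { id: 1, list: Arc::clone(&list) };
    drop(fut); // future dropped early
    assert!(list.wakers.lock().unwrap().is_empty());
    println!("waiter unlinked on drop");
}
```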

## Race condition

Many tasks may concurrently attempt I/O operations. Combined with Tokio's use
of edge-triggered events, this can result in a race condition. Let's revisit
the TCP read function:

```rust
async fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
    loop {
        // Await readiness
        let event = self.readiness(interest).await?;

        match self.mio_socket.read(buf) {
            Ok(v) => return Ok(v),
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                self.clear_readiness(event);
            }
            Err(e) => return Err(e),
        }
    }
}
```

If care is not taken, the `read()` function could deadlock: a readiness event
may arrive between `mio_socket.read(buf)` returning and
`clear_readiness(event)` being called. In that case, the new readiness event
is received, `clear_readiness()` unsets it, and on the next iteration
`readiness().await` blocks forever because no further readiness event arrives.

The current I/O driver handles this condition by always registering the
task's waker before performing the operation. This is not ideal, as it
results in unnecessary task notifications.

Instead, we will use a strategy that prevents clearing readiness if an
"unseen" readiness event has been received. The I/O driver maintains a "tick"
value. Every time the `mio` `poll()` function is called, the tick is
incremented. Each readiness event has an associated tick. When the I/O driver
sets the resource's readiness, the driver's tick is packed into the atomic
`usize`.

The `ScheduledIo` readiness `AtomicUsize` is structured as:

```
| shutdown | generation | driver tick | readiness |
|----------|------------|-------------|-----------|
|  1 bit   |   7 bits   |   8 bits    |  16 bits  |
```

The `shutdown` and `generation` components exist today.
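
As a hedged sketch, helpers for this packing might look like the following.
The field widths follow the table above (1-bit shutdown, 7-bit generation,
8-bit tick, 16-bit readiness); the constant and function names are
illustrative, not Tokio's.

```rust
const READINESS_BITS: usize = 16;
const TICK_BITS: usize = 8;

/// Pack the four components into a single `usize`, shutdown in the
/// highest of the 32 used bits.
fn pack(shutdown: bool, generation: usize, tick: usize, readiness: usize) -> usize {
    debug_assert!(generation < 1 << 7);
    debug_assert!(tick < 1 << TICK_BITS);
    debug_assert!(readiness < 1 << READINESS_BITS);
    (shutdown as usize) << 31
        | generation << (TICK_BITS + READINESS_BITS)
        | tick << READINESS_BITS
        | readiness
}

/// Extract the driver tick from a packed value.
fn tick(packed: usize) -> usize {
    (packed >> READINESS_BITS) & ((1 << TICK_BITS) - 1)
}

/// Extract the readiness bits from a packed value.
fn readiness(packed: usize) -> usize {
    packed & ((1 << READINESS_BITS) - 1)
}

fn main() {
    let v = pack(false, 3, 42, 0b101);
    println!("tick = {}, readiness = {:#b}", tick(v), readiness(v));
}
```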

The `readiness()` function returns a `ReadyEvent` value. This value includes
the `tick` component read with the resource's readiness value. When
`clear_readiness()` is called, the `ReadyEvent` is provided. Readiness is
only cleared if the current `tick` matches the `tick` included in the
`ReadyEvent`. If the tick values do not match, the call to `readiness()` on
the next iteration will not block and the new `tick` is included in the new
`ReadyEvent`.

TODO
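
The tick-guarded clear can be sketched with a compare-and-swap loop. This is
a simplified model assuming the bit layout from the table above (readiness in
the low 16 bits, tick in the next 8 bits); the body is illustrative, not
Tokio's implementation.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Clear the readiness bits only if `event_tick` matches the tick
/// currently packed in `state`. Returns `true` if readiness was cleared.
fn clear_readiness(state: &AtomicUsize, event_tick: usize) -> bool {
    let mut current = state.load(Ordering::Acquire);
    loop {
        let current_tick = (current >> 16) & 0xff;
        if current_tick != event_tick {
            // A newer readiness event arrived after the `ReadyEvent` was
            // produced; leave readiness set so the next `readiness()`
            // call returns immediately instead of blocking.
            return false;
        }
        let next = current & !0xffff; // clear only the readiness bits
        match state.compare_exchange(current, next, Ordering::AcqRel, Ordering::Acquire) {
            Ok(_) => return true,
            Err(actual) => current = actual, // raced with the driver; retry
        }
    }
}

fn main() {
    // readiness = 0b1 (readable), tick = 7
    let state = AtomicUsize::new(7 << 16 | 0b1);
    assert!(clear_readiness(&state, 7)); // ticks match: cleared

    let stale = AtomicUsize::new(8 << 16 | 0b1);
    assert!(!clear_readiness(&stale, 7)); // stale event: left set
    println!("tick guard ok");
}
```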

## Implementing `AsyncRead` / `AsyncWrite`

The `AsyncRead` and `AsyncWrite` traits use a "poll"-based API. This means
that it is not possible to use an intrusive linked list to track the waker.
Additionally, there is no future associated with the operation, which means
it is not possible to cancel interest in the readiness events.

To implement `AsyncRead` and `AsyncWrite`, `ScheduledIo` includes dedicated
waker values for the read direction and the write direction. These values are
used to store the waker. Specific `interest` is not tracked for `AsyncRead`
and `AsyncWrite` implementations. It is assumed that the only events of
interest are:

* Read ready
* Read closed
* Write ready
* Write closed

Note that "read closed" and "write closed" are only available with Mio 0.7.
With Mio 0.6, things were a bit messy.

It is only possible to implement `AsyncRead` and `AsyncWrite` for resource
types themselves and not for `&Resource`. Implementing the traits for
`&Resource` would permit concurrent operations on the resource. Because only
a single waker is stored per direction, any concurrent usage would result in
deadlocks. An alternate implementation would call for a `Vec<Waker>`, but
this would result in memory leaks.

## Enabling reads and writes for `&TcpStream`

Instead of implementing `AsyncRead` and `AsyncWrite` for `&TcpStream`, a new
function is added to `TcpStream`.

```rust
impl TcpStream {
    /// Naming TBD
    fn by_ref(&self) -> TcpStreamRef<'_>;
}

struct TcpStreamRef<'a> {
    stream: &'a TcpStream,

    // `Waiter` is the node in the intrusive waiter linked list
    read_waiter: Waiter,
    write_waiter: Waiter,
}
```

Now, `AsyncRead` and `AsyncWrite` can be implemented on `TcpStreamRef<'a>`.
When the `TcpStreamRef` is dropped, all associated waker resources are
cleaned up.

### Removing all the `split()` functions

With `TcpStream::by_ref()`, `TcpStream::split()` is no longer needed.
Instead, it is possible to do something like the following.

```rust
let rd = my_stream.by_ref();
let wr = my_stream.by_ref();

select! {
    // use `rd` and `wr` in separate branches.
}
```

It is also possible to store a `TcpStream` in an `Arc`.

```rust
let arc_stream = Arc::new(my_tcp_stream);
let n = arc_stream.by_ref().read(buf).await?;
```