1  use std::ops::{Deref, DerefMut};
2  #[cfg(any(unix, target_os = "wasi"))]
3  use std::os::fd::AsRawFd;
4  // TODO: once <https://github.com/rust-lang/rust/issues/126198> is fixed this
5  // can use `std::os::fd` and be merged with the above.
6  #[cfg(target_os = "hermit")]
7  use std::os::hermit::io::AsRawFd;
8  #[cfg(windows)]
9  use std::os::windows::io::AsRawSocket;
10  #[cfg(debug_assertions)]
11  use std::sync::atomic::{AtomicUsize, Ordering};
12  use std::{fmt, io};
13  
14  use crate::sys::IoSourceState;
15  use crate::{event, Interest, Registry, Token};
16  
17  /// Adapter for a [`RawFd`] or [`RawSocket`] providing an [`event::Source`]
18  /// implementation.
19  ///
20  /// `IoSource` enables registering any FD or socket wrapper with [`Poll`].
21  ///
22  /// While only implementations for TCP, UDP, and UDS (Unix only) are provided,
23  /// Mio supports registering any FD or socket that can be registered with the
24  /// underlying OS selector. `IoSource` provides the necessary bridge.
25  ///
26  /// [`RawFd`]: std::os::fd::RawFd
27  /// [`RawSocket`]: std::os::windows::io::RawSocket
28  ///
29  /// # Notes
30  ///
31  /// To handle the registrations and events properly **all** I/O operations (such
32  /// as `read`, `write`, etc.) must go through the [`do_io`] method to ensure the
33  /// internal state is updated accordingly.
34  ///
35  /// [`Poll`]: crate::Poll
36  /// [`do_io`]: IoSource::do_io
37  pub struct IoSource<T> {
38      state: IoSourceState,
39      inner: T,
40      #[cfg(debug_assertions)]
41      selector_id: SelectorId,
42  }
43  
44  impl<T> IoSource<T> {
45      /// Create a new `IoSource`.
new(io: T) -> IoSource<T>46      pub fn new(io: T) -> IoSource<T> {
47          IoSource {
48              state: IoSourceState::new(),
49              inner: io,
50              #[cfg(debug_assertions)]
51              selector_id: SelectorId::new(),
52          }
53      }
54  
55      /// Execute an I/O operations ensuring that the socket receives more events
56      /// if it hits a [`WouldBlock`] error.
57      ///
58      /// # Notes
59      ///
60      /// This method is required to be called for **all** I/O operations to
61      /// ensure the user will receive events once the socket is ready again after
62      /// returning a [`WouldBlock`] error.
63      ///
64      /// [`WouldBlock`]: io::ErrorKind::WouldBlock
do_io<F, R>(&self, f: F) -> io::Result<R> where F: FnOnce(&T) -> io::Result<R>,65      pub fn do_io<F, R>(&self, f: F) -> io::Result<R>
66      where
67          F: FnOnce(&T) -> io::Result<R>,
68      {
69          self.state.do_io(f, &self.inner)
70      }
71  
72      /// Returns the I/O source, dropping the state.
73      ///
74      /// # Notes
75      ///
76      /// To ensure no more events are to be received for this I/O source first
77      /// [`deregister`] it.
78      ///
79      /// [`deregister`]: Registry::deregister
into_inner(self) -> T80      pub fn into_inner(self) -> T {
81          self.inner
82      }
83  }
84  
85  /// Be careful when using this method. All I/O operations that may block must go
86  /// through the [`do_io`] method.
87  ///
88  /// [`do_io`]: IoSource::do_io
89  impl<T> Deref for IoSource<T> {
90      type Target = T;
91  
deref(&self) -> &Self::Target92      fn deref(&self) -> &Self::Target {
93          &self.inner
94      }
95  }
96  
97  /// Be careful when using this method. All I/O operations that may block must go
98  /// through the [`do_io`] method.
99  ///
100  /// [`do_io`]: IoSource::do_io
101  impl<T> DerefMut for IoSource<T> {
deref_mut(&mut self) -> &mut Self::Target102      fn deref_mut(&mut self) -> &mut Self::Target {
103          &mut self.inner
104      }
105  }
106  
107  #[cfg(any(unix, target_os = "hermit"))]
108  impl<T> event::Source for IoSource<T>
109  where
110      T: AsRawFd,
111  {
register( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>112      fn register(
113          &mut self,
114          registry: &Registry,
115          token: Token,
116          interests: Interest,
117      ) -> io::Result<()> {
118          #[cfg(debug_assertions)]
119          self.selector_id.associate(registry)?;
120          self.state
121              .register(registry, token, interests, self.inner.as_raw_fd())
122      }
123  
reregister( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>124      fn reregister(
125          &mut self,
126          registry: &Registry,
127          token: Token,
128          interests: Interest,
129      ) -> io::Result<()> {
130          #[cfg(debug_assertions)]
131          self.selector_id.check_association(registry)?;
132          self.state
133              .reregister(registry, token, interests, self.inner.as_raw_fd())
134      }
135  
deregister(&mut self, registry: &Registry) -> io::Result<()>136      fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
137          #[cfg(debug_assertions)]
138          self.selector_id.remove_association(registry)?;
139          self.state.deregister(registry, self.inner.as_raw_fd())
140      }
141  }
142  
143  #[cfg(windows)]
144  impl<T> event::Source for IoSource<T>
145  where
146      T: AsRawSocket,
147  {
register( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>148      fn register(
149          &mut self,
150          registry: &Registry,
151          token: Token,
152          interests: Interest,
153      ) -> io::Result<()> {
154          #[cfg(debug_assertions)]
155          self.selector_id.associate(registry)?;
156          self.state
157              .register(registry, token, interests, self.inner.as_raw_socket())
158      }
159  
reregister( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>160      fn reregister(
161          &mut self,
162          registry: &Registry,
163          token: Token,
164          interests: Interest,
165      ) -> io::Result<()> {
166          #[cfg(debug_assertions)]
167          self.selector_id.check_association(registry)?;
168          self.state.reregister(registry, token, interests)
169      }
170  
deregister(&mut self, _registry: &Registry) -> io::Result<()>171      fn deregister(&mut self, _registry: &Registry) -> io::Result<()> {
172          #[cfg(debug_assertions)]
173          self.selector_id.remove_association(_registry)?;
174          self.state.deregister()
175      }
176  }
177  
178  #[cfg(target_os = "wasi")]
179  impl<T> event::Source for IoSource<T>
180  where
181      T: AsRawFd,
182  {
register( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>183      fn register(
184          &mut self,
185          registry: &Registry,
186          token: Token,
187          interests: Interest,
188      ) -> io::Result<()> {
189          #[cfg(debug_assertions)]
190          self.selector_id.associate(registry)?;
191          registry
192              .selector()
193              .register(self.inner.as_raw_fd() as _, token, interests)
194      }
195  
reregister( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>196      fn reregister(
197          &mut self,
198          registry: &Registry,
199          token: Token,
200          interests: Interest,
201      ) -> io::Result<()> {
202          #[cfg(debug_assertions)]
203          self.selector_id.check_association(registry)?;
204          registry
205              .selector()
206              .reregister(self.inner.as_raw_fd() as _, token, interests)
207      }
208  
deregister(&mut self, registry: &Registry) -> io::Result<()>209      fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
210          #[cfg(debug_assertions)]
211          self.selector_id.remove_association(registry)?;
212          registry.selector().deregister(self.inner.as_raw_fd() as _)
213      }
214  }
215  
216  impl<T> fmt::Debug for IoSource<T>
217  where
218      T: fmt::Debug,
219  {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result220      fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
221          self.inner.fmt(f)
222      }
223  }
224  
225  /// Used to associate an `IoSource` with a `sys::Selector`.
226  #[cfg(debug_assertions)]
227  #[derive(Debug)]
228  struct SelectorId {
229      id: AtomicUsize,
230  }
231  
232  #[cfg(debug_assertions)]
233  impl SelectorId {
234      /// Value of `id` if `SelectorId` is not associated with any
235      /// `sys::Selector`. Valid selector ids start at 1.
236      const UNASSOCIATED: usize = 0;
237  
238      /// Create a new `SelectorId`.
new() -> SelectorId239      const fn new() -> SelectorId {
240          SelectorId {
241              id: AtomicUsize::new(Self::UNASSOCIATED),
242          }
243      }
244  
245      /// Associate an I/O source with `registry`, returning an error if its
246      /// already registered.
associate(&self, registry: &Registry) -> io::Result<()>247      fn associate(&self, registry: &Registry) -> io::Result<()> {
248          let registry_id = registry.selector().id();
249          let previous_id = self.id.swap(registry_id, Ordering::AcqRel);
250  
251          if previous_id == Self::UNASSOCIATED {
252              Ok(())
253          } else {
254              Err(io::Error::new(
255                  io::ErrorKind::AlreadyExists,
256                  "I/O source already registered with a `Registry`",
257              ))
258          }
259      }
260  
261      /// Check the association of an I/O source with `registry`, returning an
262      /// error if its registered with a different `Registry` or not registered at
263      /// all.
check_association(&self, registry: &Registry) -> io::Result<()>264      fn check_association(&self, registry: &Registry) -> io::Result<()> {
265          let registry_id = registry.selector().id();
266          let id = self.id.load(Ordering::Acquire);
267  
268          if id == registry_id {
269              Ok(())
270          } else if id == Self::UNASSOCIATED {
271              Err(io::Error::new(
272                  io::ErrorKind::NotFound,
273                  "I/O source not registered with `Registry`",
274              ))
275          } else {
276              Err(io::Error::new(
277                  io::ErrorKind::AlreadyExists,
278                  "I/O source already registered with a different `Registry`",
279              ))
280          }
281      }
282  
283      /// Remove a previously made association from `registry`, returns an error
284      /// if it was not previously associated with `registry`.
remove_association(&self, registry: &Registry) -> io::Result<()>285      fn remove_association(&self, registry: &Registry) -> io::Result<()> {
286          let registry_id = registry.selector().id();
287          let previous_id = self.id.swap(Self::UNASSOCIATED, Ordering::AcqRel);
288  
289          if previous_id == registry_id {
290              Ok(())
291          } else {
292              Err(io::Error::new(
293                  io::ErrorKind::NotFound,
294                  "I/O source not registered with `Registry`",
295              ))
296          }
297      }
298  }
299  
300  #[cfg(debug_assertions)]
301  impl Clone for SelectorId {
clone(&self) -> SelectorId302      fn clone(&self) -> SelectorId {
303          SelectorId {
304              id: AtomicUsize::new(self.id.load(Ordering::Acquire)),
305          }
306      }
307  }
308