1 use std::ops::{Deref, DerefMut}; 2 #[cfg(any(unix, target_os = "wasi"))] 3 use std::os::fd::AsRawFd; 4 // TODO: once <https://github.com/rust-lang/rust/issues/126198> is fixed this 5 // can use `std::os::fd` and be merged with the above. 6 #[cfg(target_os = "hermit")] 7 use std::os::hermit::io::AsRawFd; 8 #[cfg(windows)] 9 use std::os::windows::io::AsRawSocket; 10 #[cfg(debug_assertions)] 11 use std::sync::atomic::{AtomicUsize, Ordering}; 12 use std::{fmt, io}; 13 14 use crate::sys::IoSourceState; 15 use crate::{event, Interest, Registry, Token}; 16 17 /// Adapter for a [`RawFd`] or [`RawSocket`] providing an [`event::Source`] 18 /// implementation. 19 /// 20 /// `IoSource` enables registering any FD or socket wrapper with [`Poll`]. 21 /// 22 /// While only implementations for TCP, UDP, and UDS (Unix only) are provided, 23 /// Mio supports registering any FD or socket that can be registered with the 24 /// underlying OS selector. `IoSource` provides the necessary bridge. 25 /// 26 /// [`RawFd`]: std::os::fd::RawFd 27 /// [`RawSocket`]: std::os::windows::io::RawSocket 28 /// 29 /// # Notes 30 /// 31 /// To handle the registrations and events properly **all** I/O operations (such 32 /// as `read`, `write`, etc.) must go through the [`do_io`] method to ensure the 33 /// internal state is updated accordingly. 34 /// 35 /// [`Poll`]: crate::Poll 36 /// [`do_io`]: IoSource::do_io 37 pub struct IoSource<T> { 38 state: IoSourceState, 39 inner: T, 40 #[cfg(debug_assertions)] 41 selector_id: SelectorId, 42 } 43 44 impl<T> IoSource<T> { 45 /// Create a new `IoSource`. new(io: T) -> IoSource<T>46 pub fn new(io: T) -> IoSource<T> { 47 IoSource { 48 state: IoSourceState::new(), 49 inner: io, 50 #[cfg(debug_assertions)] 51 selector_id: SelectorId::new(), 52 } 53 } 54 55 /// Execute an I/O operations ensuring that the socket receives more events 56 /// if it hits a [`WouldBlock`] error. 57 /// 58 /// # Notes 59 /// 60 /// This method is required to be called for **all** I/O operations to 61 /// ensure the user will receive events once the socket is ready again after 62 /// returning a [`WouldBlock`] error. 63 /// 64 /// [`WouldBlock`]: io::ErrorKind::WouldBlock do_io<F, R>(&self, f: F) -> io::Result<R> where F: FnOnce(&T) -> io::Result<R>,65 pub fn do_io<F, R>(&self, f: F) -> io::Result<R> 66 where 67 F: FnOnce(&T) -> io::Result<R>, 68 { 69 self.state.do_io(f, &self.inner) 70 } 71 72 /// Returns the I/O source, dropping the state. 73 /// 74 /// # Notes 75 /// 76 /// To ensure no more events are to be received for this I/O source first 77 /// [`deregister`] it. 78 /// 79 /// [`deregister`]: Registry::deregister into_inner(self) -> T80 pub fn into_inner(self) -> T { 81 self.inner 82 } 83 } 84 85 /// Be careful when using this method. All I/O operations that may block must go 86 /// through the [`do_io`] method. 87 /// 88 /// [`do_io`]: IoSource::do_io 89 impl<T> Deref for IoSource<T> { 90 type Target = T; 91 deref(&self) -> &Self::Target92 fn deref(&self) -> &Self::Target { 93 &self.inner 94 } 95 } 96 97 /// Be careful when using this method. All I/O operations that may block must go 98 /// through the [`do_io`] method. 99 /// 100 /// [`do_io`]: IoSource::do_io 101 impl<T> DerefMut for IoSource<T> { deref_mut(&mut self) -> &mut Self::Target102 fn deref_mut(&mut self) -> &mut Self::Target { 103 &mut self.inner 104 } 105 } 106 107 #[cfg(any(unix, target_os = "hermit"))] 108 impl<T> event::Source for IoSource<T> 109 where 110 T: AsRawFd, 111 { register( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>112 fn register( 113 &mut self, 114 registry: &Registry, 115 token: Token, 116 interests: Interest, 117 ) -> io::Result<()> { 118 #[cfg(debug_assertions)] 119 self.selector_id.associate(registry)?; 120 self.state 121 .register(registry, token, interests, self.inner.as_raw_fd()) 122 } 123 reregister( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>124 fn reregister( 125 &mut self, 126 registry: &Registry, 127 token: Token, 128 interests: Interest, 129 ) -> io::Result<()> { 130 #[cfg(debug_assertions)] 131 self.selector_id.check_association(registry)?; 132 self.state 133 .reregister(registry, token, interests, self.inner.as_raw_fd()) 134 } 135 deregister(&mut self, registry: &Registry) -> io::Result<()>136 fn deregister(&mut self, registry: &Registry) -> io::Result<()> { 137 #[cfg(debug_assertions)] 138 self.selector_id.remove_association(registry)?; 139 self.state.deregister(registry, self.inner.as_raw_fd()) 140 } 141 } 142 143 #[cfg(windows)] 144 impl<T> event::Source for IoSource<T> 145 where 146 T: AsRawSocket, 147 { register( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>148 fn register( 149 &mut self, 150 registry: &Registry, 151 token: Token, 152 interests: Interest, 153 ) -> io::Result<()> { 154 #[cfg(debug_assertions)] 155 self.selector_id.associate(registry)?; 156 self.state 157 .register(registry, token, interests, self.inner.as_raw_socket()) 158 } 159 reregister( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>160 fn reregister( 161 &mut self, 162 registry: &Registry, 163 token: Token, 164 interests: Interest, 165 ) -> io::Result<()> { 166 #[cfg(debug_assertions)] 167 self.selector_id.check_association(registry)?; 168 self.state.reregister(registry, token, interests) 169 } 170 deregister(&mut self, _registry: &Registry) -> io::Result<()>171 fn deregister(&mut self, _registry: &Registry) -> io::Result<()> { 172 #[cfg(debug_assertions)] 173 self.selector_id.remove_association(_registry)?; 174 self.state.deregister() 175 } 176 } 177 178 #[cfg(target_os = "wasi")] 179 impl<T> event::Source for IoSource<T> 180 where 181 T: AsRawFd, 182 { register( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>183 fn register( 184 &mut self, 185 registry: &Registry, 186 token: Token, 187 interests: Interest, 188 ) -> io::Result<()> { 189 #[cfg(debug_assertions)] 190 self.selector_id.associate(registry)?; 191 registry 192 .selector() 193 .register(self.inner.as_raw_fd() as _, token, interests) 194 } 195 reregister( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>196 fn reregister( 197 &mut self, 198 registry: &Registry, 199 token: Token, 200 interests: Interest, 201 ) -> io::Result<()> { 202 #[cfg(debug_assertions)] 203 self.selector_id.check_association(registry)?; 204 registry 205 .selector() 206 .reregister(self.inner.as_raw_fd() as _, token, interests) 207 } 208 deregister(&mut self, registry: &Registry) -> io::Result<()>209 fn deregister(&mut self, registry: &Registry) -> io::Result<()> { 210 #[cfg(debug_assertions)] 211 self.selector_id.remove_association(registry)?; 212 registry.selector().deregister(self.inner.as_raw_fd() as _) 213 } 214 } 215 216 impl<T> fmt::Debug for IoSource<T> 217 where 218 T: fmt::Debug, 219 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result220 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 221 self.inner.fmt(f) 222 } 223 } 224 225 /// Used to associate an `IoSource` with a `sys::Selector`. 226 #[cfg(debug_assertions)] 227 #[derive(Debug)] 228 struct SelectorId { 229 id: AtomicUsize, 230 } 231 232 #[cfg(debug_assertions)] 233 impl SelectorId { 234 /// Value of `id` if `SelectorId` is not associated with any 235 /// `sys::Selector`. Valid selector ids start at 1. 236 const UNASSOCIATED: usize = 0; 237 238 /// Create a new `SelectorId`. new() -> SelectorId239 const fn new() -> SelectorId { 240 SelectorId { 241 id: AtomicUsize::new(Self::UNASSOCIATED), 242 } 243 } 244 245 /// Associate an I/O source with `registry`, returning an error if its 246 /// already registered. associate(&self, registry: &Registry) -> io::Result<()>247 fn associate(&self, registry: &Registry) -> io::Result<()> { 248 let registry_id = registry.selector().id(); 249 let previous_id = self.id.swap(registry_id, Ordering::AcqRel); 250 251 if previous_id == Self::UNASSOCIATED { 252 Ok(()) 253 } else { 254 Err(io::Error::new( 255 io::ErrorKind::AlreadyExists, 256 "I/O source already registered with a `Registry`", 257 )) 258 } 259 } 260 261 /// Check the association of an I/O source with `registry`, returning an 262 /// error if its registered with a different `Registry` or not registered at 263 /// all. check_association(&self, registry: &Registry) -> io::Result<()>264 fn check_association(&self, registry: &Registry) -> io::Result<()> { 265 let registry_id = registry.selector().id(); 266 let id = self.id.load(Ordering::Acquire); 267 268 if id == registry_id { 269 Ok(()) 270 } else if id == Self::UNASSOCIATED { 271 Err(io::Error::new( 272 io::ErrorKind::NotFound, 273 "I/O source not registered with `Registry`", 274 )) 275 } else { 276 Err(io::Error::new( 277 io::ErrorKind::AlreadyExists, 278 "I/O source already registered with a different `Registry`", 279 )) 280 } 281 } 282 283 /// Remove a previously made association from `registry`, returns an error 284 /// if it was not previously associated with `registry`. remove_association(&self, registry: &Registry) -> io::Result<()>285 fn remove_association(&self, registry: &Registry) -> io::Result<()> { 286 let registry_id = registry.selector().id(); 287 let previous_id = self.id.swap(Self::UNASSOCIATED, Ordering::AcqRel); 288 289 if previous_id == registry_id { 290 Ok(()) 291 } else { 292 Err(io::Error::new( 293 io::ErrorKind::NotFound, 294 "I/O source not registered with `Registry`", 295 )) 296 } 297 } 298 } 299 300 #[cfg(debug_assertions)] 301 impl Clone for SelectorId { clone(&self) -> SelectorId302 fn clone(&self) -> SelectorId { 303 SelectorId { 304 id: AtomicUsize::new(self.id.load(Ordering::Acquire)), 305 } 306 } 307 } 308