1 // Copyright 2019 Intel Corporation. All Rights Reserved. 2 // Copyright 2019-2021 Alibaba Cloud Computing. All rights reserved. 3 // 4 // SPDX-License-Identifier: Apache-2.0 5 6 //! A simple framework to run a vhost-user backend service. 7 8 #[macro_use] 9 extern crate log; 10 11 use std::fmt::{Display, Formatter}; 12 use std::sync::{Arc, Mutex}; 13 use std::thread; 14 15 use vhost::vhost_user::{Error as VhostUserError, Listener, SlaveListener, SlaveReqHandler}; 16 use vm_memory::bitmap::Bitmap; 17 use vm_memory::mmap::NewBitmap; 18 use vm_memory::{GuestMemoryAtomic, GuestMemoryMmap}; 19 20 use self::handler::VhostUserHandler; 21 22 mod backend; 23 pub use self::backend::{VhostUserBackend, VhostUserBackendMut}; 24 25 mod event_loop; 26 pub use self::event_loop::VringEpollHandler; 27 28 mod handler; 29 pub use self::handler::VhostUserHandlerError; 30 31 mod vring; 32 pub use self::vring::{ 33 VringMutex, VringRwLock, VringState, VringStateGuard, VringStateMutGuard, VringT, 34 }; 35 36 /// An alias for `GuestMemoryAtomic<GuestMemoryMmap<B>>` to simplify code. 37 type GM<B> = GuestMemoryAtomic<GuestMemoryMmap<B>>; 38 39 #[derive(Debug)] 40 /// Errors related to vhost-user daemon. 41 pub enum Error { 42 /// Failed to create a new vhost-user handler. 43 NewVhostUserHandler(VhostUserHandlerError), 44 /// Failed creating vhost-user slave listener. 45 CreateSlaveListener(VhostUserError), 46 /// Failed creating vhost-user slave handler. 47 CreateSlaveReqHandler(VhostUserError), 48 /// Failed starting daemon thread. 49 StartDaemon(std::io::Error), 50 /// Failed waiting for daemon thread. 51 WaitDaemon(std::boxed::Box<dyn std::any::Any + std::marker::Send>), 52 /// Failed handling a vhost-user request. 53 HandleRequest(VhostUserError), 54 } 55 56 impl Display for Error { fmt(&self, f: &mut Formatter) -> std::fmt::Result57 fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { 58 match self { 59 Error::NewVhostUserHandler(e) => write!(f, "cannot create vhost user handler: {}", e), 60 Error::CreateSlaveListener(e) => write!(f, "cannot create slave listener: {}", e), 61 Error::CreateSlaveReqHandler(e) => write!(f, "cannot create slave req handler: {}", e), 62 Error::StartDaemon(e) => write!(f, "failed to start daemon: {}", e), 63 Error::WaitDaemon(_e) => write!(f, "failed to wait for daemon exit"), 64 Error::HandleRequest(e) => write!(f, "failed to handle request: {}", e), 65 } 66 } 67 } 68 69 /// Result of vhost-user daemon operations. 70 pub type Result<T> = std::result::Result<T, Error>; 71 72 /// Implement a simple framework to run a vhost-user service daemon. 73 /// 74 /// This structure is the public API the backend is allowed to interact with in order to run 75 /// a fully functional vhost-user daemon. 76 pub struct VhostUserDaemon<S, V, B: Bitmap + 'static = ()> { 77 name: String, 78 handler: Arc<Mutex<VhostUserHandler<S, V, B>>>, 79 main_thread: Option<thread::JoinHandle<Result<()>>>, 80 } 81 82 impl<S, V, B> VhostUserDaemon<S, V, B> 83 where 84 S: VhostUserBackend<V, B> + Clone + 'static, 85 V: VringT<GM<B>> + Clone + Send + Sync + 'static, 86 B: NewBitmap + Clone + Send + Sync, 87 { 88 /// Create the daemon instance, providing the backend implementation of `VhostUserBackend`. 89 /// 90 /// Under the hood, this will start a dedicated thread responsible for listening onto 91 /// registered event. Those events can be vring events or custom events from the backend, 92 /// but they get to be registered later during the sequence. new( name: String, backend: S, atomic_mem: GuestMemoryAtomic<GuestMemoryMmap<B>>, ) -> Result<Self>93 pub fn new( 94 name: String, 95 backend: S, 96 atomic_mem: GuestMemoryAtomic<GuestMemoryMmap<B>>, 97 ) -> Result<Self> { 98 let handler = Arc::new(Mutex::new( 99 VhostUserHandler::new(backend, atomic_mem).map_err(Error::NewVhostUserHandler)?, 100 )); 101 102 Ok(VhostUserDaemon { 103 name, 104 handler, 105 main_thread: None, 106 }) 107 } 108 109 /// Run a dedicated thread handling all requests coming through the socket. 110 /// This runs in an infinite loop that should be terminating once the other 111 /// end of the socket (the VMM) hangs up. 112 /// 113 /// This function is the common code for starting a new daemon, no matter if 114 /// it acts as a client or a server. start_daemon( &mut self, mut handler: SlaveReqHandler<Mutex<VhostUserHandler<S, V, B>>>, ) -> Result<()>115 fn start_daemon( 116 &mut self, 117 mut handler: SlaveReqHandler<Mutex<VhostUserHandler<S, V, B>>>, 118 ) -> Result<()> { 119 let handle = thread::Builder::new() 120 .name(self.name.clone()) 121 .spawn(move || loop { 122 handler.handle_request().map_err(Error::HandleRequest)?; 123 }) 124 .map_err(Error::StartDaemon)?; 125 126 self.main_thread = Some(handle); 127 128 Ok(()) 129 } 130 131 /// Connect to the vhost-user socket and run a dedicated thread handling 132 /// all requests coming through this socket. This runs in an infinite loop 133 /// that should be terminating once the other end of the socket (the VMM) 134 /// hangs up. start_client(&mut self, socket_path: &str) -> Result<()>135 pub fn start_client(&mut self, socket_path: &str) -> Result<()> { 136 let slave_handler = SlaveReqHandler::connect(socket_path, self.handler.clone()) 137 .map_err(Error::CreateSlaveReqHandler)?; 138 self.start_daemon(slave_handler) 139 } 140 141 /// Listen to the vhost-user socket and run a dedicated thread handling all requests coming 142 /// through this socket. 143 /// 144 /// This runs in an infinite loop that should be terminating once the other end of the socket 145 /// (the VMM) disconnects. 146 // TODO: the current implementation has limitations that only one incoming connection will be 147 // handled from the listener. Should it be enhanced to support reconnection? start(&mut self, listener: Listener) -> Result<()>148 pub fn start(&mut self, listener: Listener) -> Result<()> { 149 let mut slave_listener = SlaveListener::new(listener, self.handler.clone()) 150 .map_err(Error::CreateSlaveListener)?; 151 let slave_handler = self.accept(&mut slave_listener)?; 152 self.start_daemon(slave_handler) 153 } 154 accept( &self, slave_listener: &mut SlaveListener<Mutex<VhostUserHandler<S, V, B>>>, ) -> Result<SlaveReqHandler<Mutex<VhostUserHandler<S, V, B>>>>155 fn accept( 156 &self, 157 slave_listener: &mut SlaveListener<Mutex<VhostUserHandler<S, V, B>>>, 158 ) -> Result<SlaveReqHandler<Mutex<VhostUserHandler<S, V, B>>>> { 159 loop { 160 match slave_listener.accept() { 161 Err(e) => return Err(Error::CreateSlaveListener(e)), 162 Ok(Some(v)) => return Ok(v), 163 Ok(None) => continue, 164 } 165 } 166 } 167 168 /// Wait for the thread handling the vhost-user socket connection to terminate. wait(&mut self) -> Result<()>169 pub fn wait(&mut self) -> Result<()> { 170 if let Some(handle) = self.main_thread.take() { 171 match handle.join().map_err(Error::WaitDaemon)? { 172 Ok(()) => Ok(()), 173 Err(Error::HandleRequest(VhostUserError::SocketBroken(_))) => Ok(()), 174 Err(e) => Err(e), 175 } 176 } else { 177 Ok(()) 178 } 179 } 180 181 /// Retrieve the vring epoll handler. 182 /// 183 /// This is necessary to perform further actions like registering and unregistering some extra 184 /// event file descriptors. get_epoll_handlers(&self) -> Vec<Arc<VringEpollHandler<S, V, B>>>185 pub fn get_epoll_handlers(&self) -> Vec<Arc<VringEpollHandler<S, V, B>>> { 186 // Do not expect poisoned lock. 187 self.handler.lock().unwrap().get_epoll_handlers() 188 } 189 } 190 191 #[cfg(test)] 192 mod tests { 193 use super::backend::tests::MockVhostBackend; 194 use super::*; 195 use std::os::unix::net::{UnixListener, UnixStream}; 196 use std::sync::Barrier; 197 use vm_memory::{GuestAddress, GuestMemoryAtomic, GuestMemoryMmap}; 198 199 #[test] test_new_daemon()200 fn test_new_daemon() { 201 let mem = GuestMemoryAtomic::new( 202 GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0x100000), 0x10000)]).unwrap(), 203 ); 204 let backend = Arc::new(Mutex::new(MockVhostBackend::new())); 205 let mut daemon = VhostUserDaemon::new("test".to_owned(), backend, mem).unwrap(); 206 207 let handlers = daemon.get_epoll_handlers(); 208 assert_eq!(handlers.len(), 2); 209 210 let barrier = Arc::new(Barrier::new(2)); 211 let tmpdir = tempfile::tempdir().unwrap(); 212 let mut path = tmpdir.path().to_path_buf(); 213 path.push("socket"); 214 215 let barrier2 = barrier.clone(); 216 let path1 = path.clone(); 217 let thread = thread::spawn(move || { 218 barrier2.wait(); 219 let socket = UnixStream::connect(&path1).unwrap(); 220 barrier2.wait(); 221 drop(socket) 222 }); 223 224 let listener = Listener::new(&path, false).unwrap(); 225 barrier.wait(); 226 daemon.start(listener).unwrap(); 227 barrier.wait(); 228 // Above process generates a `HandleRequest(PartialMessage)` error. 229 daemon.wait().unwrap_err(); 230 daemon.wait().unwrap(); 231 thread.join().unwrap(); 232 } 233 234 #[test] test_new_daemon_client()235 fn test_new_daemon_client() { 236 let mem = GuestMemoryAtomic::new( 237 GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0x100000), 0x10000)]).unwrap(), 238 ); 239 let backend = Arc::new(Mutex::new(MockVhostBackend::new())); 240 let mut daemon = VhostUserDaemon::new("test".to_owned(), backend, mem).unwrap(); 241 242 let handlers = daemon.get_epoll_handlers(); 243 assert_eq!(handlers.len(), 2); 244 245 let barrier = Arc::new(Barrier::new(2)); 246 let tmpdir = tempfile::tempdir().unwrap(); 247 let mut path = tmpdir.path().to_path_buf(); 248 path.push("socket"); 249 250 let barrier2 = barrier.clone(); 251 let path1 = path.clone(); 252 let thread = thread::spawn(move || { 253 let listener = UnixListener::bind(&path1).unwrap(); 254 barrier2.wait(); 255 let (stream, _) = listener.accept().unwrap(); 256 barrier2.wait(); 257 drop(stream) 258 }); 259 260 barrier.wait(); 261 daemon 262 .start_client(path.as_path().to_str().unwrap()) 263 .unwrap(); 264 barrier.wait(); 265 // Above process generates a `HandleRequest(PartialMessage)` error. 266 daemon.wait().unwrap_err(); 267 daemon.wait().unwrap(); 268 thread.join().unwrap(); 269 } 270 } 271