1 // Copyright 2019 Intel Corporation. All Rights Reserved.
2 // Copyright 2019-2021 Alibaba Cloud Computing. All rights reserved.
3 //
4 // SPDX-License-Identifier: Apache-2.0
5 
6 //! A simple framework to run a vhost-user backend service.
7 
8 #[macro_use]
9 extern crate log;
10 
11 use std::fmt::{Display, Formatter};
12 use std::sync::{Arc, Mutex};
13 use std::thread;
14 
15 use vhost::vhost_user::{Error as VhostUserError, Listener, SlaveListener, SlaveReqHandler};
16 use vm_memory::bitmap::Bitmap;
17 use vm_memory::mmap::NewBitmap;
18 use vm_memory::{GuestMemoryAtomic, GuestMemoryMmap};
19 
20 use self::handler::VhostUserHandler;
21 
22 mod backend;
23 pub use self::backend::{VhostUserBackend, VhostUserBackendMut};
24 
25 mod event_loop;
26 pub use self::event_loop::VringEpollHandler;
27 
28 mod handler;
29 pub use self::handler::VhostUserHandlerError;
30 
31 mod vring;
32 pub use self::vring::{
33     VringMutex, VringRwLock, VringState, VringStateGuard, VringStateMutGuard, VringT,
34 };
35 
/// An alias for `GuestMemoryAtomic<GuestMemoryMmap<B>>` to simplify code.
///
/// The type parameter `B` is forwarded unchanged to `GuestMemoryMmap`.
type GM<B> = GuestMemoryAtomic<GuestMemoryMmap<B>>;
38 
39 #[derive(Debug)]
40 /// Errors related to vhost-user daemon.
41 pub enum Error {
42     /// Failed to create a new vhost-user handler.
43     NewVhostUserHandler(VhostUserHandlerError),
44     /// Failed creating vhost-user slave listener.
45     CreateSlaveListener(VhostUserError),
46     /// Failed creating vhost-user slave handler.
47     CreateSlaveReqHandler(VhostUserError),
48     /// Failed starting daemon thread.
49     StartDaemon(std::io::Error),
50     /// Failed waiting for daemon thread.
51     WaitDaemon(std::boxed::Box<dyn std::any::Any + std::marker::Send>),
52     /// Failed handling a vhost-user request.
53     HandleRequest(VhostUserError),
54 }
55 
56 impl Display for Error {
fmt(&self, f: &mut Formatter) -> std::fmt::Result57     fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
58         match self {
59             Error::NewVhostUserHandler(e) => write!(f, "cannot create vhost user handler: {}", e),
60             Error::CreateSlaveListener(e) => write!(f, "cannot create slave listener: {}", e),
61             Error::CreateSlaveReqHandler(e) => write!(f, "cannot create slave req handler: {}", e),
62             Error::StartDaemon(e) => write!(f, "failed to start daemon: {}", e),
63             Error::WaitDaemon(_e) => write!(f, "failed to wait for daemon exit"),
64             Error::HandleRequest(e) => write!(f, "failed to handle request: {}", e),
65         }
66     }
67 }
68 
/// Result of vhost-user daemon operations.
///
/// Shorthand for `std::result::Result<T, Error>`, with `Error` being the enum defined in this
/// file.
pub type Result<T> = std::result::Result<T, Error>;
71 
72 /// Implement a simple framework to run a vhost-user service daemon.
73 ///
74 /// This structure is the public API the backend is allowed to interact with in order to run
75 /// a fully functional vhost-user daemon.
76 pub struct VhostUserDaemon<S, V, B: Bitmap + 'static = ()> {
77     name: String,
78     handler: Arc<Mutex<VhostUserHandler<S, V, B>>>,
79     main_thread: Option<thread::JoinHandle<Result<()>>>,
80 }
81 
82 impl<S, V, B> VhostUserDaemon<S, V, B>
83 where
84     S: VhostUserBackend<V, B> + Clone + 'static,
85     V: VringT<GM<B>> + Clone + Send + Sync + 'static,
86     B: NewBitmap + Clone + Send + Sync,
87 {
88     /// Create the daemon instance, providing the backend implementation of `VhostUserBackend`.
89     ///
90     /// Under the hood, this will start a dedicated thread responsible for listening onto
91     /// registered event. Those events can be vring events or custom events from the backend,
92     /// but they get to be registered later during the sequence.
new( name: String, backend: S, atomic_mem: GuestMemoryAtomic<GuestMemoryMmap<B>>, ) -> Result<Self>93     pub fn new(
94         name: String,
95         backend: S,
96         atomic_mem: GuestMemoryAtomic<GuestMemoryMmap<B>>,
97     ) -> Result<Self> {
98         let handler = Arc::new(Mutex::new(
99             VhostUserHandler::new(backend, atomic_mem).map_err(Error::NewVhostUserHandler)?,
100         ));
101 
102         Ok(VhostUserDaemon {
103             name,
104             handler,
105             main_thread: None,
106         })
107     }
108 
109     /// Run a dedicated thread handling all requests coming through the socket.
110     /// This runs in an infinite loop that should be terminating once the other
111     /// end of the socket (the VMM) hangs up.
112     ///
113     /// This function is the common code for starting a new daemon, no matter if
114     /// it acts as a client or a server.
start_daemon( &mut self, mut handler: SlaveReqHandler<Mutex<VhostUserHandler<S, V, B>>>, ) -> Result<()>115     fn start_daemon(
116         &mut self,
117         mut handler: SlaveReqHandler<Mutex<VhostUserHandler<S, V, B>>>,
118     ) -> Result<()> {
119         let handle = thread::Builder::new()
120             .name(self.name.clone())
121             .spawn(move || loop {
122                 handler.handle_request().map_err(Error::HandleRequest)?;
123             })
124             .map_err(Error::StartDaemon)?;
125 
126         self.main_thread = Some(handle);
127 
128         Ok(())
129     }
130 
131     /// Connect to the vhost-user socket and run a dedicated thread handling
132     /// all requests coming through this socket. This runs in an infinite loop
133     /// that should be terminating once the other end of the socket (the VMM)
134     /// hangs up.
start_client(&mut self, socket_path: &str) -> Result<()>135     pub fn start_client(&mut self, socket_path: &str) -> Result<()> {
136         let slave_handler = SlaveReqHandler::connect(socket_path, self.handler.clone())
137             .map_err(Error::CreateSlaveReqHandler)?;
138         self.start_daemon(slave_handler)
139     }
140 
141     /// Listen to the vhost-user socket and run a dedicated thread handling all requests coming
142     /// through this socket.
143     ///
144     /// This runs in an infinite loop that should be terminating once the other end of the socket
145     /// (the VMM) disconnects.
146     // TODO: the current implementation has limitations that only one incoming connection will be
147     // handled from the listener. Should it be enhanced to support reconnection?
start(&mut self, listener: Listener) -> Result<()>148     pub fn start(&mut self, listener: Listener) -> Result<()> {
149         let mut slave_listener = SlaveListener::new(listener, self.handler.clone())
150             .map_err(Error::CreateSlaveListener)?;
151         let slave_handler = self.accept(&mut slave_listener)?;
152         self.start_daemon(slave_handler)
153     }
154 
accept( &self, slave_listener: &mut SlaveListener<Mutex<VhostUserHandler<S, V, B>>>, ) -> Result<SlaveReqHandler<Mutex<VhostUserHandler<S, V, B>>>>155     fn accept(
156         &self,
157         slave_listener: &mut SlaveListener<Mutex<VhostUserHandler<S, V, B>>>,
158     ) -> Result<SlaveReqHandler<Mutex<VhostUserHandler<S, V, B>>>> {
159         loop {
160             match slave_listener.accept() {
161                 Err(e) => return Err(Error::CreateSlaveListener(e)),
162                 Ok(Some(v)) => return Ok(v),
163                 Ok(None) => continue,
164             }
165         }
166     }
167 
168     /// Wait for the thread handling the vhost-user socket connection to terminate.
wait(&mut self) -> Result<()>169     pub fn wait(&mut self) -> Result<()> {
170         if let Some(handle) = self.main_thread.take() {
171             match handle.join().map_err(Error::WaitDaemon)? {
172                 Ok(()) => Ok(()),
173                 Err(Error::HandleRequest(VhostUserError::SocketBroken(_))) => Ok(()),
174                 Err(e) => Err(e),
175             }
176         } else {
177             Ok(())
178         }
179     }
180 
181     /// Retrieve the vring epoll handler.
182     ///
183     /// This is necessary to perform further actions like registering and unregistering some extra
184     /// event file descriptors.
get_epoll_handlers(&self) -> Vec<Arc<VringEpollHandler<S, V, B>>>185     pub fn get_epoll_handlers(&self) -> Vec<Arc<VringEpollHandler<S, V, B>>> {
186         // Do not expect poisoned lock.
187         self.handler.lock().unwrap().get_epoll_handlers()
188     }
189 }
190 
#[cfg(test)]
mod tests {
    use super::backend::tests::MockVhostBackend;
    use super::*;
    use std::os::unix::net::{UnixListener, UnixStream};
    use std::sync::Barrier;
    use vm_memory::{GuestAddress, GuestMemoryAtomic, GuestMemoryMmap};

    /// Server mode: the daemon listens, a peer thread connects and then drops the socket.
    #[test]
    fn test_new_daemon() {
        let guest_mem = GuestMemoryAtomic::new(
            GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0x100000), 0x10000)]).unwrap(),
        );
        let mock_backend = Arc::new(Mutex::new(MockVhostBackend::new()));
        let mut daemon =
            VhostUserDaemon::new("test".to_owned(), mock_backend, guest_mem).unwrap();

        let epoll_handlers = daemon.get_epoll_handlers();
        assert_eq!(epoll_handlers.len(), 2);

        let sync = Arc::new(Barrier::new(2));
        let tmpdir = tempfile::tempdir().unwrap();
        let sock_path = tmpdir.path().join("socket");

        let peer_sync = Arc::clone(&sync);
        let peer_path = sock_path.clone();
        let peer = thread::spawn(move || {
            // First rendezvous: the listener exists, so connecting is safe.
            peer_sync.wait();
            let stream = UnixStream::connect(&peer_path).unwrap();
            // Second rendezvous: the daemon has accepted and is running.
            peer_sync.wait();
            drop(stream);
        });

        let listener = Listener::new(&sock_path, false).unwrap();
        sync.wait();
        daemon.start(listener).unwrap();
        sync.wait();
        // Above process generates a `HandleRequest(PartialMessage)` error.
        daemon.wait().unwrap_err();
        // The join handle was consumed by the first call, so this one is a no-op.
        daemon.wait().unwrap();
        peer.join().unwrap();
    }

    /// Client mode: a peer thread listens, the daemon connects and the peer drops the stream.
    #[test]
    fn test_new_daemon_client() {
        let guest_mem = GuestMemoryAtomic::new(
            GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0x100000), 0x10000)]).unwrap(),
        );
        let mock_backend = Arc::new(Mutex::new(MockVhostBackend::new()));
        let mut daemon =
            VhostUserDaemon::new("test".to_owned(), mock_backend, guest_mem).unwrap();

        let epoll_handlers = daemon.get_epoll_handlers();
        assert_eq!(epoll_handlers.len(), 2);

        let sync = Arc::new(Barrier::new(2));
        let tmpdir = tempfile::tempdir().unwrap();
        let sock_path = tmpdir.path().join("socket");

        let peer_sync = Arc::clone(&sync);
        let peer_path = sock_path.clone();
        let peer = thread::spawn(move || {
            let server = UnixListener::bind(&peer_path).unwrap();
            // First rendezvous: the socket is bound, the daemon may connect.
            peer_sync.wait();
            let (stream, _) = server.accept().unwrap();
            // Second rendezvous: the daemon is running.
            peer_sync.wait();
            drop(stream);
        });

        sync.wait();
        daemon.start_client(sock_path.to_str().unwrap()).unwrap();
        sync.wait();
        // Above process generates a `HandleRequest(PartialMessage)` error.
        daemon.wait().unwrap_err();
        // The join handle was consumed by the first call, so this one is a no-op.
        daemon.wait().unwrap();
        peer.join().unwrap();
    }
}
271