1 //! High-level interface for a [V4L2 video 2 //! encoder](https://www.kernel.org/doc/html/latest/userspace-api/media/v4l/dev-encoder.html). 3 use crate::{ 4 device::{ 5 poller::{DeviceEvent, PollError, PollEvent, Poller, Waker}, 6 queue::{ 7 direction::{Capture, Output}, 8 dqbuf::DqBuffer, 9 handles_provider::HandlesProvider, 10 qbuf::{ 11 get_free::{GetFreeBufferError, GetFreeCaptureBuffer, GetFreeOutputBuffer}, 12 get_indexed::GetCaptureBufferByIndex, 13 CaptureQueueable, OutputQueueableProvider, 14 }, 15 BuffersAllocated, CanceledBuffer, CreateQueueError, FormatBuilder, Queue, QueueInit, 16 RequestBuffersError, 17 }, 18 AllocatedQueue, Device, DeviceConfig, DeviceOpenError, Stream, TryDequeue, 19 }, 20 ioctl::{ 21 self, DqBufError, DqBufIoctlError, EncoderCommand, FormatFlags, GFmtError, 22 V4l2BufferFromError, 23 }, 24 memory::{BufferHandles, PrimitiveBufferHandles}, 25 Format, 26 }; 27 28 use log::warn; 29 use std::{ 30 any::Any, 31 io, 32 path::Path, 33 sync::{atomic::AtomicUsize, Arc}, 34 task::Wake, 35 thread::JoinHandle, 36 }; 37 use thiserror::Error; 38 39 /// Trait implemented by all states of the encoder. 40 pub trait EncoderState {} 41 42 pub struct Encoder<S: EncoderState> { 43 // Make sure to keep the device alive as long as we are. 44 device: Arc<Device>, 45 state: S, 46 } 47 48 pub struct AwaitingCaptureFormat { 49 output_queue: Queue<Output, QueueInit>, 50 capture_queue: Queue<Capture, QueueInit>, 51 } 52 impl EncoderState for AwaitingCaptureFormat {} 53 54 #[derive(Debug, Error)] 55 pub enum EncoderOpenError { 56 #[error("error while opening device")] 57 DeviceOpenError(#[from] DeviceOpenError), 58 #[error("error while creating queue")] 59 CreateQueueError(#[from] CreateQueueError), 60 #[error("specified device is not an encoder")] 61 NotAnEncoder, 62 } 63 64 impl Encoder<AwaitingCaptureFormat> { open(path: &Path) -> Result<Self, EncoderOpenError>65 pub fn open(path: &Path) -> Result<Self, EncoderOpenError> { 66 let config = DeviceConfig::new().non_blocking_dqbuf(); 67 let device = Arc::new(Device::open(path, config)?); 68 69 // Check that the device is indeed an encoder. 70 let capture_queue = Queue::get_capture_mplane_queue(device.clone())?; 71 let output_queue = Queue::get_output_mplane_queue(device.clone())?; 72 73 // On an encoder, the OUTPUT formats are not compressed, but the CAPTURE ones are. 74 // Return an error if our device does not satisfy these conditions. 75 output_queue 76 .format_iter() 77 .find(|fmt| !fmt.flags.contains(FormatFlags::COMPRESSED)) 78 .and( 79 capture_queue 80 .format_iter() 81 .find(|fmt| fmt.flags.contains(FormatFlags::COMPRESSED)), 82 ) 83 .ok_or(EncoderOpenError::NotAnEncoder) 84 .map(|_| ())?; 85 86 Ok(Encoder { 87 device, 88 state: AwaitingCaptureFormat { 89 output_queue, 90 capture_queue, 91 }, 92 }) 93 } 94 set_capture_format<F>(mut self, f: F) -> anyhow::Result<Encoder<AwaitingOutputFormat>> where F: FnOnce(FormatBuilder) -> anyhow::Result<()>,95 pub fn set_capture_format<F>(mut self, f: F) -> anyhow::Result<Encoder<AwaitingOutputFormat>> 96 where 97 F: FnOnce(FormatBuilder) -> anyhow::Result<()>, 98 { 99 let builder = self.state.capture_queue.change_format()?; 100 f(builder)?; 101 102 Ok(Encoder { 103 device: self.device, 104 state: AwaitingOutputFormat { 105 output_queue: self.state.output_queue, 106 capture_queue: self.state.capture_queue, 107 }, 108 }) 109 } 110 } 111 112 pub struct AwaitingOutputFormat { 113 output_queue: Queue<Output, QueueInit>, 114 capture_queue: Queue<Capture, QueueInit>, 115 } 116 impl EncoderState for AwaitingOutputFormat {} 117 118 impl Encoder<AwaitingOutputFormat> { set_output_format<F>(mut self, f: F) -> anyhow::Result<Encoder<AwaitingOutputBuffers>> where F: FnOnce(FormatBuilder) -> anyhow::Result<()>,119 pub fn set_output_format<F>(mut self, f: F) -> anyhow::Result<Encoder<AwaitingOutputBuffers>> 120 where 121 F: FnOnce(FormatBuilder) -> anyhow::Result<()>, 122 { 123 let builder = self.state.output_queue.change_format()?; 124 f(builder)?; 125 126 Ok(Encoder { 127 device: self.device, 128 state: AwaitingOutputBuffers { 129 output_queue: self.state.output_queue, 130 capture_queue: self.state.capture_queue, 131 }, 132 }) 133 } 134 } 135 136 pub struct AwaitingOutputBuffers { 137 output_queue: Queue<Output, QueueInit>, 138 capture_queue: Queue<Capture, QueueInit>, 139 } 140 impl EncoderState for AwaitingOutputBuffers {} 141 142 impl Encoder<AwaitingOutputBuffers> { allocate_output_buffers_generic<OP: BufferHandles>( self, memory_type: OP::SupportedMemoryType, num_output: usize, ) -> Result<Encoder<AwaitingCaptureBuffers<OP>>, RequestBuffersError>143 pub fn allocate_output_buffers_generic<OP: BufferHandles>( 144 self, 145 memory_type: OP::SupportedMemoryType, 146 num_output: usize, 147 ) -> Result<Encoder<AwaitingCaptureBuffers<OP>>, RequestBuffersError> { 148 Ok(Encoder { 149 device: self.device, 150 state: AwaitingCaptureBuffers { 151 output_queue: self 152 .state 153 .output_queue 154 .request_buffers_generic::<OP>(memory_type, num_output as u32)?, 155 capture_queue: self.state.capture_queue, 156 }, 157 }) 158 } 159 allocate_output_buffers<OP: PrimitiveBufferHandles>( self, num_output: usize, ) -> Result<Encoder<AwaitingCaptureBuffers<OP>>, RequestBuffersError>160 pub fn allocate_output_buffers<OP: PrimitiveBufferHandles>( 161 self, 162 num_output: usize, 163 ) -> Result<Encoder<AwaitingCaptureBuffers<OP>>, RequestBuffersError> { 164 self.allocate_output_buffers_generic(OP::MEMORY_TYPE, num_output) 165 } 166 get_output_format(&self) -> Result<Format, GFmtError>167 pub fn get_output_format(&self) -> Result<Format, GFmtError> { 168 self.state.output_queue.get_format() 169 } 170 get_capture_format(&self) -> Result<Format, GFmtError>171 pub fn get_capture_format(&self) -> Result<Format, GFmtError> { 172 self.state.capture_queue.get_format() 173 } 174 } 175 176 pub struct AwaitingCaptureBuffers<OP: BufferHandles> { 177 output_queue: Queue<Output, BuffersAllocated<OP>>, 178 capture_queue: Queue<Capture, QueueInit>, 179 } 180 impl<OP: BufferHandles> EncoderState for AwaitingCaptureBuffers<OP> {} 181 182 impl<OP: BufferHandles> Encoder<AwaitingCaptureBuffers<OP>> { allocate_capture_buffers_generic<P: HandlesProvider>( self, memory_type: <P::HandleType as BufferHandles>::SupportedMemoryType, num_capture: usize, capture_memory_provider: P, ) -> Result<Encoder<ReadyToEncode<OP, P>>, RequestBuffersError> where for<'a> Queue<Capture, BuffersAllocated<P::HandleType>>: GetFreeCaptureBuffer<'a, P::HandleType>,183 pub fn allocate_capture_buffers_generic<P: HandlesProvider>( 184 self, 185 memory_type: <P::HandleType as BufferHandles>::SupportedMemoryType, 186 num_capture: usize, 187 capture_memory_provider: P, 188 ) -> Result<Encoder<ReadyToEncode<OP, P>>, RequestBuffersError> 189 where 190 for<'a> Queue<Capture, BuffersAllocated<P::HandleType>>: 191 GetFreeCaptureBuffer<'a, P::HandleType>, 192 { 193 Ok(Encoder { 194 device: self.device, 195 state: ReadyToEncode { 196 output_queue: self.state.output_queue, 197 capture_queue: self 198 .state 199 .capture_queue 200 .request_buffers_generic::<P::HandleType>(memory_type, num_capture as u32)?, 201 capture_memory_provider, 202 poll_wakeups_counter: None, 203 }, 204 }) 205 } 206 allocate_capture_buffers<P: HandlesProvider>( self, num_capture: usize, capture_memory_provider: P, ) -> Result<Encoder<ReadyToEncode<OP, P>>, RequestBuffersError> where P::HandleType: PrimitiveBufferHandles, for<'a> Queue<Capture, BuffersAllocated<P::HandleType>>: GetFreeCaptureBuffer<'a, P::HandleType>,207 pub fn allocate_capture_buffers<P: HandlesProvider>( 208 self, 209 num_capture: usize, 210 capture_memory_provider: P, 211 ) -> Result<Encoder<ReadyToEncode<OP, P>>, RequestBuffersError> 212 where 213 P::HandleType: PrimitiveBufferHandles, 214 for<'a> Queue<Capture, BuffersAllocated<P::HandleType>>: 215 GetFreeCaptureBuffer<'a, P::HandleType>, 216 { 217 self.allocate_capture_buffers_generic( 218 P::HandleType::MEMORY_TYPE, 219 num_capture, 220 capture_memory_provider, 221 ) 222 } 223 } 224 225 pub struct ReadyToEncode<OP: BufferHandles, P: HandlesProvider> { 226 output_queue: Queue<Output, BuffersAllocated<OP>>, 227 capture_queue: Queue<Capture, BuffersAllocated<P::HandleType>>, 228 capture_memory_provider: P, 229 poll_wakeups_counter: Option<Arc<AtomicUsize>>, 230 } 231 impl<OP: BufferHandles, P: HandlesProvider> EncoderState for ReadyToEncode<OP, P> {} 232 233 impl<OP: BufferHandles, P: HandlesProvider> Encoder<ReadyToEncode<OP, P>> 234 where 235 for<'a> Queue<Capture, BuffersAllocated<P::HandleType>>: 236 GetFreeCaptureBuffer<'a, P::HandleType> + GetCaptureBufferByIndex<'a, P::HandleType>, 237 { set_poll_counter(mut self, poll_wakeups_counter: Arc<AtomicUsize>) -> Self238 pub fn set_poll_counter(mut self, poll_wakeups_counter: Arc<AtomicUsize>) -> Self { 239 self.state.poll_wakeups_counter = Some(poll_wakeups_counter); 240 self 241 } 242 start<InputDoneCb, OutputReadyCb>( self, input_done_cb: InputDoneCb, output_ready_cb: OutputReadyCb, ) -> io::Result<Encoder<Encoding<OP, P, InputDoneCb, OutputReadyCb>>> where InputDoneCb: Fn(CompletedOutputBuffer<OP>), OutputReadyCb: FnMut(DqBuffer<Capture, P::HandleType>) + Send + 'static,243 pub fn start<InputDoneCb, OutputReadyCb>( 244 self, 245 input_done_cb: InputDoneCb, 246 output_ready_cb: OutputReadyCb, 247 ) -> io::Result<Encoder<Encoding<OP, P, InputDoneCb, OutputReadyCb>>> 248 where 249 InputDoneCb: Fn(CompletedOutputBuffer<OP>), 250 OutputReadyCb: FnMut(DqBuffer<Capture, P::HandleType>) + Send + 'static, 251 { 252 self.state.output_queue.stream_on().unwrap(); 253 self.state.capture_queue.stream_on().unwrap(); 254 255 let mut output_poller = Poller::new(Arc::clone(&self.device))?; 256 output_poller.enable_event(DeviceEvent::OutputReady)?; 257 258 let mut encoder_thread = EncoderThread::new( 259 &self.device, 260 self.state.capture_queue, 261 self.state.capture_memory_provider, 262 output_ready_cb, 263 )?; 264 265 if let Some(counter) = &self.state.poll_wakeups_counter { 266 output_poller.set_poll_counter(Arc::clone(counter)); 267 encoder_thread.set_poll_counter(Arc::clone(counter)); 268 } 269 270 let handle = std::thread::Builder::new() 271 .name("V4L2 Encoder".into()) 272 .spawn(move || encoder_thread.run())?; 273 274 Ok(Encoder { 275 device: self.device, 276 state: Encoding { 277 output_queue: self.state.output_queue, 278 input_done_cb, 279 output_poller, 280 handle, 281 }, 282 }) 283 } 284 } 285 286 pub struct Encoding<OP: BufferHandles, P, InputDoneCb, OutputReadyCb> 287 where 288 P: HandlesProvider, 289 InputDoneCb: Fn(CompletedOutputBuffer<OP>), 290 OutputReadyCb: FnMut(DqBuffer<Capture, P::HandleType>) + Send, 291 { 292 output_queue: Queue<Output, BuffersAllocated<OP>>, 293 input_done_cb: InputDoneCb, 294 output_poller: Poller, 295 296 handle: JoinHandle<EncoderThread<P, OutputReadyCb>>, 297 } 298 impl<OP, P, InputDoneCb, OutputReadyCb> EncoderState for Encoding<OP, P, InputDoneCb, OutputReadyCb> 299 where 300 OP: BufferHandles, 301 P: HandlesProvider, 302 InputDoneCb: Fn(CompletedOutputBuffer<OP>), 303 OutputReadyCb: FnMut(DqBuffer<Capture, P::HandleType>) + Send, 304 { 305 } 306 307 // Safe because all Rcs are internal and never leaked outside of the struct. 308 unsafe impl<S: EncoderState> Send for Encoder<S> {} 309 310 pub enum CompletedOutputBuffer<OP: BufferHandles> { 311 Dequeued(DqBuffer<Output, OP>), 312 Canceled(CanceledBuffer<OP>), 313 } 314 315 #[derive(Debug, Error)] 316 pub enum GetBufferError { 317 #[error("error while dequeueing buffer")] 318 DequeueError(#[from] DqBufError<V4l2BufferFromError>), 319 #[error("error during poll")] 320 PollError(#[from] PollError), 321 #[error("error while obtaining buffer")] 322 GetFreeBufferError(#[from] GetFreeBufferError), 323 } 324 325 #[derive(Debug, Error)] 326 pub enum EncoderStopError { 327 #[error("error while sending STOP command")] 328 EncoderCmdError(#[from] ioctl::EncoderCmdError), 329 #[error("thread has panicked")] 330 ThreadPanickedError(Box<dyn Any + Send + 'static>), 331 #[error("cannot streamoff capture queue")] 332 CaptureQueueStreamoffError(ioctl::StreamOffError), 333 #[error("cannot streamoff output queue")] 334 OutputQueueStreamoffError(ioctl::StreamOffError), 335 } 336 337 impl<OP, P, InputDoneCb, OutputReadyCb> Encoder<Encoding<OP, P, InputDoneCb, OutputReadyCb>> 338 where 339 OP: BufferHandles, 340 P: HandlesProvider, 341 InputDoneCb: Fn(CompletedOutputBuffer<OP>), 342 OutputReadyCb: FnMut(DqBuffer<Capture, P::HandleType>) + Send, 343 { 344 /// Stop the encoder, and returns the encoder ready to be started again. stop(self) -> Result<Encoder<ReadyToEncode<OP, P>>, EncoderStopError>345 pub fn stop(self) -> Result<Encoder<ReadyToEncode<OP, P>>, EncoderStopError> { 346 ioctl::encoder_cmd::<_, ()>(&*self.device, &EncoderCommand::Stop(false))?; 347 348 // The encoder thread should receive the LAST buffer and exit on its own. 349 let encoding_thread = self 350 .state 351 .handle 352 .join() 353 .map_err(EncoderStopError::ThreadPanickedError)?; 354 355 encoding_thread 356 .capture_queue 357 .stream_off() 358 .map_err(EncoderStopError::CaptureQueueStreamoffError)?; 359 /* Return all canceled buffers to the client */ 360 let canceled_buffers = self 361 .state 362 .output_queue 363 .stream_off() 364 .map_err(EncoderStopError::OutputQueueStreamoffError)?; 365 for buffer in canceled_buffers { 366 (self.state.input_done_cb)(CompletedOutputBuffer::Canceled(buffer)); 367 } 368 369 Ok(Encoder { 370 device: self.device, 371 state: ReadyToEncode { 372 output_queue: self.state.output_queue, 373 capture_queue: encoding_thread.capture_queue, 374 capture_memory_provider: encoding_thread.capture_memory_provider, 375 poll_wakeups_counter: None, 376 }, 377 }) 378 } 379 380 /// Attempts to dequeue and release output buffers that the driver is done with. dequeue_output_buffers(&self) -> Result<(), DqBufError<V4l2BufferFromError>>381 fn dequeue_output_buffers(&self) -> Result<(), DqBufError<V4l2BufferFromError>> { 382 let output_queue = &self.state.output_queue; 383 384 while output_queue.num_queued_buffers() > 0 { 385 match output_queue.try_dequeue() { 386 Ok(buf) => { 387 (self.state.input_done_cb)(CompletedOutputBuffer::Dequeued(buf)); 388 } 389 Err(DqBufError::IoctlError(DqBufIoctlError::NotReady)) => break, 390 // TODO buffers with the error flag set should not result in 391 // a fatal error! 392 Err(e) => return Err(e), 393 } 394 } 395 396 Ok(()) 397 } 398 399 // Make this thread sleep until at least one OUTPUT buffer is ready to be 400 // obtained through `try_get_buffer()`, dequeuing buffers if necessary. wait_for_output_buffer(&mut self) -> Result<(), GetBufferError>401 fn wait_for_output_buffer(&mut self) -> Result<(), GetBufferError> { 402 for event in self.state.output_poller.poll(None)? { 403 match event { 404 PollEvent::Device(DeviceEvent::OutputReady) => { 405 self.dequeue_output_buffers()?; 406 } 407 _ => panic!("Unexpected return from OUTPUT queue poll!"), 408 } 409 } 410 411 Ok(()) 412 } 413 } 414 415 impl<'a, OP, P, InputDoneCb, OutputReadyCb> OutputQueueableProvider<'a, OP> 416 for Encoder<Encoding<OP, P, InputDoneCb, OutputReadyCb>> 417 where 418 Queue<Output, BuffersAllocated<OP>>: OutputQueueableProvider<'a, OP>, 419 OP: BufferHandles, 420 P: HandlesProvider, 421 InputDoneCb: Fn(CompletedOutputBuffer<OP>), 422 OutputReadyCb: FnMut(DqBuffer<Capture, P::HandleType>) + Send, 423 { 424 type Queueable = 425 <Queue<Output, BuffersAllocated<OP>> as OutputQueueableProvider<'a, OP>>::Queueable; 426 } 427 428 /// Let the encoder provide the buffers from the OUTPUT queue. 429 impl<'a, OP, P, InputDoneCb, OutputReadyCb> GetFreeOutputBuffer<'a, OP, GetBufferError> 430 for Encoder<Encoding<OP, P, InputDoneCb, OutputReadyCb>> 431 where 432 Queue<Output, BuffersAllocated<OP>>: GetFreeOutputBuffer<'a, OP>, 433 OP: BufferHandles, 434 P: HandlesProvider, 435 InputDoneCb: Fn(CompletedOutputBuffer<OP>), 436 OutputReadyCb: FnMut(DqBuffer<Capture, P::HandleType>) + Send, 437 { 438 /// Returns a V4L2 buffer to be filled with a frame to encode if one 439 /// is available. 440 /// 441 /// This method will return None immediately if all the allocated buffers 442 /// are currently queued. try_get_free_buffer(&'a self) -> Result<Self::Queueable, GetBufferError>443 fn try_get_free_buffer(&'a self) -> Result<Self::Queueable, GetBufferError> { 444 self.dequeue_output_buffers()?; 445 Ok(self.state.output_queue.try_get_free_buffer()?) 446 } 447 } 448 449 // If `GetFreeBuffer` is implemented, we can also provide a blocking `get_buffer` 450 // method. 451 impl<'a, OP, P, InputDoneCb, OutputReadyCb> Encoder<Encoding<OP, P, InputDoneCb, OutputReadyCb>> 452 where 453 Self: GetFreeOutputBuffer<'a, OP, GetBufferError>, 454 OP: BufferHandles, 455 P: HandlesProvider, 456 InputDoneCb: Fn(CompletedOutputBuffer<OP>), 457 OutputReadyCb: FnMut(DqBuffer<Capture, P::HandleType>) + Send, 458 { 459 /// Returns a V4L2 buffer to be filled with a frame to encode, waiting for 460 /// one to be available if needed. 461 /// 462 /// Contrary to `try_get_free_buffer(), this method will wait for a buffer 463 /// to be available if needed. get_buffer( &'a mut self, ) -> Result<<Self as OutputQueueableProvider<'a, OP>>::Queueable, GetBufferError>464 pub fn get_buffer( 465 &'a mut self, 466 ) -> Result<<Self as OutputQueueableProvider<'a, OP>>::Queueable, GetBufferError> { 467 let output_queue = &self.state.output_queue; 468 469 // If all our buffers are queued, wait until we can dequeue some. 470 if output_queue.num_queued_buffers() == output_queue.num_buffers() { 471 self.wait_for_output_buffer()?; 472 } 473 474 self.try_get_free_buffer() 475 } 476 } 477 478 struct EncoderThread<P, OutputReadyCb> 479 where 480 P: HandlesProvider, 481 OutputReadyCb: FnMut(DqBuffer<Capture, P::HandleType>) + Send, 482 { 483 capture_queue: Queue<Capture, BuffersAllocated<P::HandleType>>, 484 capture_memory_provider: P, 485 poller: Poller, 486 waker: Arc<Waker>, 487 output_ready_cb: OutputReadyCb, 488 } 489 490 impl<P, OutputReadyCb> EncoderThread<P, OutputReadyCb> 491 where 492 P: HandlesProvider, 493 OutputReadyCb: FnMut(DqBuffer<Capture, P::HandleType>) + Send, 494 for<'a> Queue<Capture, BuffersAllocated<P::HandleType>>: 495 GetFreeCaptureBuffer<'a, P::HandleType> + GetCaptureBufferByIndex<'a, P::HandleType>, 496 { new( device: &Arc<Device>, capture_queue: Queue<Capture, BuffersAllocated<P::HandleType>>, capture_memory_provider: P, output_ready_cb: OutputReadyCb, ) -> io::Result<Self>497 fn new( 498 device: &Arc<Device>, 499 capture_queue: Queue<Capture, BuffersAllocated<P::HandleType>>, 500 capture_memory_provider: P, 501 output_ready_cb: OutputReadyCb, 502 ) -> io::Result<Self> { 503 let mut poller = Poller::new(Arc::clone(device))?; 504 505 poller.enable_event(DeviceEvent::CaptureReady)?; 506 let waker = poller.add_waker(0)?; 507 508 Ok(EncoderThread { 509 capture_queue, 510 capture_memory_provider, 511 poller, 512 waker, 513 output_ready_cb, 514 }) 515 } 516 set_poll_counter(&mut self, poll_wakeups_counter: Arc<AtomicUsize>)517 fn set_poll_counter(&mut self, poll_wakeups_counter: Arc<AtomicUsize>) { 518 self.poller.set_poll_counter(poll_wakeups_counter); 519 } 520 run(mut self) -> Self521 fn run(mut self) -> Self { 522 self.enqueue_capture_buffers(); 523 524 'polling: loop { 525 match self.capture_queue.num_queued_buffers() { 526 // If there are no buffers on the CAPTURE queue, poll() will return 527 // immediately with EPOLLERR and we would loop indefinitely. 528 // Prevent this by temporarily disabling polling the device in such 529 // cases. 530 0 => { 531 self.poller 532 .disable_event(DeviceEvent::CaptureReady) 533 .unwrap(); 534 } 535 // If device polling was disabled and we have buffers queued, we 536 // can reenable it as poll will now wait for a CAPTURE buffer to 537 // be ready for dequeue. 538 _ => { 539 self.poller.enable_event(DeviceEvent::CaptureReady).unwrap(); 540 } 541 } 542 543 // TODO handle errors - this system call can be interrupted and we 544 // should leave in this case. 545 for event in self.poller.poll(None).unwrap() { 546 match event { 547 // A CAPTURE buffer has been released by the client. 548 PollEvent::Waker(0) => { 549 // Requeue all available CAPTURE buffers. 550 self.enqueue_capture_buffers(); 551 } 552 // A CAPTURE buffer is ready to be dequeued. 553 PollEvent::Device(DeviceEvent::CaptureReady) => { 554 // Get the encoded buffer 555 // TODO Manage errors here, including corrupted buffers! 556 if let Ok(mut cap_buf) = self.capture_queue.try_dequeue() { 557 let is_last = cap_buf.data.is_last(); 558 let is_empty = *cap_buf.data.get_first_plane().bytesused == 0; 559 560 // Add a drop callback to the dequeued buffer so we 561 // re-queue it as soon as it is dropped. 562 let cap_waker = Arc::clone(&self.waker); 563 cap_buf.add_drop_callback(move |_dqbuf| { 564 cap_waker.wake(); 565 }); 566 567 // Empty buffers do not need to be passed to the client. 568 if !is_empty { 569 (self.output_ready_cb)(cap_buf); 570 } 571 572 // Last buffer of the stream? Time for us to terminate. 573 if is_last { 574 break 'polling; 575 } 576 } else { 577 // TODO we should not crash here. 578 panic!("Expected a CAPTURE buffer but none available!"); 579 } 580 } 581 _ => panic!("Unexpected return from CAPTURE queue poll!"), 582 } 583 } 584 } 585 586 self 587 } 588 enqueue_capture_buffers(&mut self)589 fn enqueue_capture_buffers(&mut self) { 590 'enqueue: while let Some(handles) = self.capture_memory_provider.get_handles(&self.waker) { 591 if let Ok(buffer) = self 592 .capture_memory_provider 593 .get_suitable_buffer_for(&handles, &self.capture_queue) 594 { 595 buffer.queue_with_handles(handles).unwrap(); 596 } else { 597 warn!("Handles potentially lost due to no V4L2 buffer being available"); 598 break 'enqueue; 599 } 600 } 601 } 602 } 603