1 //! High-level interface for a [V4L2 video
2 //! encoder](https://www.kernel.org/doc/html/latest/userspace-api/media/v4l/dev-encoder.html).
3 use crate::{
4     device::{
5         poller::{DeviceEvent, PollError, PollEvent, Poller, Waker},
6         queue::{
7             direction::{Capture, Output},
8             dqbuf::DqBuffer,
9             handles_provider::HandlesProvider,
10             qbuf::{
11                 get_free::{GetFreeBufferError, GetFreeCaptureBuffer, GetFreeOutputBuffer},
12                 get_indexed::GetCaptureBufferByIndex,
13                 CaptureQueueable, OutputQueueableProvider,
14             },
15             BuffersAllocated, CanceledBuffer, CreateQueueError, FormatBuilder, Queue, QueueInit,
16             RequestBuffersError,
17         },
18         AllocatedQueue, Device, DeviceConfig, DeviceOpenError, Stream, TryDequeue,
19     },
20     ioctl::{
21         self, DqBufError, DqBufIoctlError, EncoderCommand, FormatFlags, GFmtError,
22         V4l2BufferFromError,
23     },
24     memory::{BufferHandles, PrimitiveBufferHandles},
25     Format,
26 };
27 
28 use log::warn;
29 use std::{
30     any::Any,
31     io,
32     path::Path,
33     sync::{atomic::AtomicUsize, Arc},
34     task::Wake,
35     thread::JoinHandle,
36 };
37 use thiserror::Error;
38 
/// Trait implemented by all states of the encoder.
///
/// This is a marker trait without any method: it only serves as a bound on the
/// state parameter of [`Encoder`].
pub trait EncoderState {}
41 
/// V4L2 encoder handle, parameterized by its current state `S`.
///
/// The methods available on an encoder depend on `S`. Methods that move the
/// encoder to another state consume it and return it with its new state type.
pub struct Encoder<S: EncoderState> {
    // Make sure to keep the device alive as long as we are.
    device: Arc<Device>,
    // Data specific to the current state (queues, callbacks, ...).
    state: S,
}
47 
/// Initial state of the encoder, as returned by `Encoder::open`. The next step
/// is to set the format of the CAPTURE queue.
pub struct AwaitingCaptureFormat {
    // Both queues are created but have no buffers allocated yet.
    output_queue: Queue<Output, QueueInit>,
    capture_queue: Queue<Capture, QueueInit>,
}
impl EncoderState for AwaitingCaptureFormat {}
53 
/// Errors that can be returned by `Encoder::open`.
#[derive(Debug, Error)]
pub enum EncoderOpenError {
    /// The device node could not be opened.
    #[error("error while opening device")]
    DeviceOpenError(#[from] DeviceOpenError),
    /// The OUTPUT or CAPTURE queue could not be created.
    #[error("error while creating queue")]
    CreateQueueError(#[from] CreateQueueError),
    /// The device lacks a non-compressed OUTPUT format or a compressed CAPTURE
    /// format.
    #[error("specified device is not an encoder")]
    NotAnEncoder,
}
63 
64 impl Encoder<AwaitingCaptureFormat> {
open(path: &Path) -> Result<Self, EncoderOpenError>65     pub fn open(path: &Path) -> Result<Self, EncoderOpenError> {
66         let config = DeviceConfig::new().non_blocking_dqbuf();
67         let device = Arc::new(Device::open(path, config)?);
68 
69         // Check that the device is indeed an encoder.
70         let capture_queue = Queue::get_capture_mplane_queue(device.clone())?;
71         let output_queue = Queue::get_output_mplane_queue(device.clone())?;
72 
73         // On an encoder, the OUTPUT formats are not compressed, but the CAPTURE ones are.
74         // Return an error if our device does not satisfy these conditions.
75         output_queue
76             .format_iter()
77             .find(|fmt| !fmt.flags.contains(FormatFlags::COMPRESSED))
78             .and(
79                 capture_queue
80                     .format_iter()
81                     .find(|fmt| fmt.flags.contains(FormatFlags::COMPRESSED)),
82             )
83             .ok_or(EncoderOpenError::NotAnEncoder)
84             .map(|_| ())?;
85 
86         Ok(Encoder {
87             device,
88             state: AwaitingCaptureFormat {
89                 output_queue,
90                 capture_queue,
91             },
92         })
93     }
94 
set_capture_format<F>(mut self, f: F) -> anyhow::Result<Encoder<AwaitingOutputFormat>> where F: FnOnce(FormatBuilder) -> anyhow::Result<()>,95     pub fn set_capture_format<F>(mut self, f: F) -> anyhow::Result<Encoder<AwaitingOutputFormat>>
96     where
97         F: FnOnce(FormatBuilder) -> anyhow::Result<()>,
98     {
99         let builder = self.state.capture_queue.change_format()?;
100         f(builder)?;
101 
102         Ok(Encoder {
103             device: self.device,
104             state: AwaitingOutputFormat {
105                 output_queue: self.state.output_queue,
106                 capture_queue: self.state.capture_queue,
107             },
108         })
109     }
110 }
111 
/// State reached after `set_capture_format`: the next step is to set the
/// format of the OUTPUT queue.
pub struct AwaitingOutputFormat {
    output_queue: Queue<Output, QueueInit>,
    capture_queue: Queue<Capture, QueueInit>,
}
impl EncoderState for AwaitingOutputFormat {}
117 
118 impl Encoder<AwaitingOutputFormat> {
set_output_format<F>(mut self, f: F) -> anyhow::Result<Encoder<AwaitingOutputBuffers>> where F: FnOnce(FormatBuilder) -> anyhow::Result<()>,119     pub fn set_output_format<F>(mut self, f: F) -> anyhow::Result<Encoder<AwaitingOutputBuffers>>
120     where
121         F: FnOnce(FormatBuilder) -> anyhow::Result<()>,
122     {
123         let builder = self.state.output_queue.change_format()?;
124         f(builder)?;
125 
126         Ok(Encoder {
127             device: self.device,
128             state: AwaitingOutputBuffers {
129                 output_queue: self.state.output_queue,
130                 capture_queue: self.state.capture_queue,
131             },
132         })
133     }
134 }
135 
/// State reached after `set_output_format`: the next step is to allocate the
/// buffers of the OUTPUT queue.
pub struct AwaitingOutputBuffers {
    output_queue: Queue<Output, QueueInit>,
    capture_queue: Queue<Capture, QueueInit>,
}
impl EncoderState for AwaitingOutputBuffers {}
141 
142 impl Encoder<AwaitingOutputBuffers> {
allocate_output_buffers_generic<OP: BufferHandles>( self, memory_type: OP::SupportedMemoryType, num_output: usize, ) -> Result<Encoder<AwaitingCaptureBuffers<OP>>, RequestBuffersError>143     pub fn allocate_output_buffers_generic<OP: BufferHandles>(
144         self,
145         memory_type: OP::SupportedMemoryType,
146         num_output: usize,
147     ) -> Result<Encoder<AwaitingCaptureBuffers<OP>>, RequestBuffersError> {
148         Ok(Encoder {
149             device: self.device,
150             state: AwaitingCaptureBuffers {
151                 output_queue: self
152                     .state
153                     .output_queue
154                     .request_buffers_generic::<OP>(memory_type, num_output as u32)?,
155                 capture_queue: self.state.capture_queue,
156             },
157         })
158     }
159 
allocate_output_buffers<OP: PrimitiveBufferHandles>( self, num_output: usize, ) -> Result<Encoder<AwaitingCaptureBuffers<OP>>, RequestBuffersError>160     pub fn allocate_output_buffers<OP: PrimitiveBufferHandles>(
161         self,
162         num_output: usize,
163     ) -> Result<Encoder<AwaitingCaptureBuffers<OP>>, RequestBuffersError> {
164         self.allocate_output_buffers_generic(OP::MEMORY_TYPE, num_output)
165     }
166 
get_output_format(&self) -> Result<Format, GFmtError>167     pub fn get_output_format(&self) -> Result<Format, GFmtError> {
168         self.state.output_queue.get_format()
169     }
170 
get_capture_format(&self) -> Result<Format, GFmtError>171     pub fn get_capture_format(&self) -> Result<Format, GFmtError> {
172         self.state.capture_queue.get_format()
173     }
174 }
175 
/// State reached once the OUTPUT buffers are allocated: the next step is to
/// allocate the buffers of the CAPTURE queue.
pub struct AwaitingCaptureBuffers<OP: BufferHandles> {
    // The OUTPUT queue now has its buffers allocated, backed by `OP` handles.
    output_queue: Queue<Output, BuffersAllocated<OP>>,
    capture_queue: Queue<Capture, QueueInit>,
}
impl<OP: BufferHandles> EncoderState for AwaitingCaptureBuffers<OP> {}
181 
182 impl<OP: BufferHandles> Encoder<AwaitingCaptureBuffers<OP>> {
allocate_capture_buffers_generic<P: HandlesProvider>( self, memory_type: <P::HandleType as BufferHandles>::SupportedMemoryType, num_capture: usize, capture_memory_provider: P, ) -> Result<Encoder<ReadyToEncode<OP, P>>, RequestBuffersError> where for<'a> Queue<Capture, BuffersAllocated<P::HandleType>>: GetFreeCaptureBuffer<'a, P::HandleType>,183     pub fn allocate_capture_buffers_generic<P: HandlesProvider>(
184         self,
185         memory_type: <P::HandleType as BufferHandles>::SupportedMemoryType,
186         num_capture: usize,
187         capture_memory_provider: P,
188     ) -> Result<Encoder<ReadyToEncode<OP, P>>, RequestBuffersError>
189     where
190         for<'a> Queue<Capture, BuffersAllocated<P::HandleType>>:
191             GetFreeCaptureBuffer<'a, P::HandleType>,
192     {
193         Ok(Encoder {
194             device: self.device,
195             state: ReadyToEncode {
196                 output_queue: self.state.output_queue,
197                 capture_queue: self
198                     .state
199                     .capture_queue
200                     .request_buffers_generic::<P::HandleType>(memory_type, num_capture as u32)?,
201                 capture_memory_provider,
202                 poll_wakeups_counter: None,
203             },
204         })
205     }
206 
allocate_capture_buffers<P: HandlesProvider>( self, num_capture: usize, capture_memory_provider: P, ) -> Result<Encoder<ReadyToEncode<OP, P>>, RequestBuffersError> where P::HandleType: PrimitiveBufferHandles, for<'a> Queue<Capture, BuffersAllocated<P::HandleType>>: GetFreeCaptureBuffer<'a, P::HandleType>,207     pub fn allocate_capture_buffers<P: HandlesProvider>(
208         self,
209         num_capture: usize,
210         capture_memory_provider: P,
211     ) -> Result<Encoder<ReadyToEncode<OP, P>>, RequestBuffersError>
212     where
213         P::HandleType: PrimitiveBufferHandles,
214         for<'a> Queue<Capture, BuffersAllocated<P::HandleType>>:
215             GetFreeCaptureBuffer<'a, P::HandleType>,
216     {
217         self.allocate_capture_buffers_generic(
218             P::HandleType::MEMORY_TYPE,
219             num_capture,
220             capture_memory_provider,
221         )
222     }
223 }
224 
/// State in which both queues have their buffers allocated: the encoder can be
/// started with `start`.
pub struct ReadyToEncode<OP: BufferHandles, P: HandlesProvider> {
    output_queue: Queue<Output, BuffersAllocated<OP>>,
    capture_queue: Queue<Capture, BuffersAllocated<P::HandleType>>,
    // Provider of the handles used to back the CAPTURE buffers.
    capture_memory_provider: P,
    // Optional counter handed to the pollers created by `start`.
    poll_wakeups_counter: Option<Arc<AtomicUsize>>,
}
impl<OP: BufferHandles, P: HandlesProvider> EncoderState for ReadyToEncode<OP, P> {}
232 
233 impl<OP: BufferHandles, P: HandlesProvider> Encoder<ReadyToEncode<OP, P>>
234 where
235     for<'a> Queue<Capture, BuffersAllocated<P::HandleType>>:
236         GetFreeCaptureBuffer<'a, P::HandleType> + GetCaptureBufferByIndex<'a, P::HandleType>,
237 {
set_poll_counter(mut self, poll_wakeups_counter: Arc<AtomicUsize>) -> Self238     pub fn set_poll_counter(mut self, poll_wakeups_counter: Arc<AtomicUsize>) -> Self {
239         self.state.poll_wakeups_counter = Some(poll_wakeups_counter);
240         self
241     }
242 
start<InputDoneCb, OutputReadyCb>( self, input_done_cb: InputDoneCb, output_ready_cb: OutputReadyCb, ) -> io::Result<Encoder<Encoding<OP, P, InputDoneCb, OutputReadyCb>>> where InputDoneCb: Fn(CompletedOutputBuffer<OP>), OutputReadyCb: FnMut(DqBuffer<Capture, P::HandleType>) + Send + 'static,243     pub fn start<InputDoneCb, OutputReadyCb>(
244         self,
245         input_done_cb: InputDoneCb,
246         output_ready_cb: OutputReadyCb,
247     ) -> io::Result<Encoder<Encoding<OP, P, InputDoneCb, OutputReadyCb>>>
248     where
249         InputDoneCb: Fn(CompletedOutputBuffer<OP>),
250         OutputReadyCb: FnMut(DqBuffer<Capture, P::HandleType>) + Send + 'static,
251     {
252         self.state.output_queue.stream_on().unwrap();
253         self.state.capture_queue.stream_on().unwrap();
254 
255         let mut output_poller = Poller::new(Arc::clone(&self.device))?;
256         output_poller.enable_event(DeviceEvent::OutputReady)?;
257 
258         let mut encoder_thread = EncoderThread::new(
259             &self.device,
260             self.state.capture_queue,
261             self.state.capture_memory_provider,
262             output_ready_cb,
263         )?;
264 
265         if let Some(counter) = &self.state.poll_wakeups_counter {
266             output_poller.set_poll_counter(Arc::clone(counter));
267             encoder_thread.set_poll_counter(Arc::clone(counter));
268         }
269 
270         let handle = std::thread::Builder::new()
271             .name("V4L2 Encoder".into())
272             .spawn(move || encoder_thread.run())?;
273 
274         Ok(Encoder {
275             device: self.device,
276             state: Encoding {
277                 output_queue: self.state.output_queue,
278                 input_done_cb,
279                 output_poller,
280                 handle,
281             },
282         })
283     }
284 }
285 
/// State of a started encoder: frames can be submitted through the OUTPUT
/// queue while a dedicated thread services the CAPTURE queue.
pub struct Encoding<OP: BufferHandles, P, InputDoneCb, OutputReadyCb>
where
    P: HandlesProvider,
    InputDoneCb: Fn(CompletedOutputBuffer<OP>),
    OutputReadyCb: FnMut(DqBuffer<Capture, P::HandleType>) + Send,
{
    output_queue: Queue<Output, BuffersAllocated<OP>>,
    // Called with every OUTPUT buffer that has been dequeued or canceled.
    input_done_cb: InputDoneCb,
    // Poller used to wait for OUTPUT buffers to be dequeueable.
    output_poller: Poller,

    // Handle of the thread servicing the CAPTURE queue. Joining it gives back
    // the thread's state, including the CAPTURE queue.
    handle: JoinHandle<EncoderThread<P, OutputReadyCb>>,
}
impl<OP, P, InputDoneCb, OutputReadyCb> EncoderState for Encoding<OP, P, InputDoneCb, OutputReadyCb>
where
    OP: BufferHandles,
    P: HandlesProvider,
    InputDoneCb: Fn(CompletedOutputBuffer<OP>),
    OutputReadyCb: FnMut(DqBuffer<Capture, P::HandleType>) + Send,
{
}
306 
// SAFETY: (rationale given by the original author) all Rcs are internal and
// never leaked outside of the struct.
// NOTE(review): this impl is unconditional over `S`, so it also covers states
// holding user-provided values that carry no `Send` bound in this file (e.g.
// the `InputDoneCb` callback of `Encoding`) - confirm this is intended.
unsafe impl<S: EncoderState> Send for Encoder<S> {}
309 
/// OUTPUT buffer handed back to the client through the `InputDoneCb` callback.
pub enum CompletedOutputBuffer<OP: BufferHandles> {
    /// The buffer has been dequeued from the OUTPUT queue.
    Dequeued(DqBuffer<Output, OP>),
    /// The buffer was still queued when the OUTPUT queue was streamed off.
    Canceled(CanceledBuffer<OP>),
}
314 
/// Errors that can occur while obtaining an OUTPUT buffer from the encoder.
#[derive(Debug, Error)]
pub enum GetBufferError {
    /// Dequeueing a processed OUTPUT buffer failed.
    #[error("error while dequeueing buffer")]
    DequeueError(#[from] DqBufError<V4l2BufferFromError>),
    /// Waiting for an OUTPUT buffer failed.
    #[error("error during poll")]
    PollError(#[from] PollError),
    /// The OUTPUT queue could not provide a free buffer.
    #[error("error while obtaining buffer")]
    GetFreeBufferError(#[from] GetFreeBufferError),
}
324 
/// Errors that can be returned when stopping the encoder.
#[derive(Debug, Error)]
pub enum EncoderStopError {
    /// The STOP encoder command was rejected.
    #[error("error while sending STOP command")]
    EncoderCmdError(#[from] ioctl::EncoderCmdError),
    /// The encoder thread panicked; the payload is the one returned by `join`.
    #[error("thread has panicked")]
    ThreadPanickedError(Box<dyn Any + Send + 'static>),
    /// Streaming could not be stopped on the CAPTURE queue.
    #[error("cannot streamoff capture queue")]
    CaptureQueueStreamoffError(ioctl::StreamOffError),
    /// Streaming could not be stopped on the OUTPUT queue.
    #[error("cannot streamoff output queue")]
    OutputQueueStreamoffError(ioctl::StreamOffError),
}
336 
337 impl<OP, P, InputDoneCb, OutputReadyCb> Encoder<Encoding<OP, P, InputDoneCb, OutputReadyCb>>
338 where
339     OP: BufferHandles,
340     P: HandlesProvider,
341     InputDoneCb: Fn(CompletedOutputBuffer<OP>),
342     OutputReadyCb: FnMut(DqBuffer<Capture, P::HandleType>) + Send,
343 {
344     /// Stop the encoder, and returns the encoder ready to be started again.
stop(self) -> Result<Encoder<ReadyToEncode<OP, P>>, EncoderStopError>345     pub fn stop(self) -> Result<Encoder<ReadyToEncode<OP, P>>, EncoderStopError> {
346         ioctl::encoder_cmd::<_, ()>(&*self.device, &EncoderCommand::Stop(false))?;
347 
348         // The encoder thread should receive the LAST buffer and exit on its own.
349         let encoding_thread = self
350             .state
351             .handle
352             .join()
353             .map_err(EncoderStopError::ThreadPanickedError)?;
354 
355         encoding_thread
356             .capture_queue
357             .stream_off()
358             .map_err(EncoderStopError::CaptureQueueStreamoffError)?;
359         /* Return all canceled buffers to the client */
360         let canceled_buffers = self
361             .state
362             .output_queue
363             .stream_off()
364             .map_err(EncoderStopError::OutputQueueStreamoffError)?;
365         for buffer in canceled_buffers {
366             (self.state.input_done_cb)(CompletedOutputBuffer::Canceled(buffer));
367         }
368 
369         Ok(Encoder {
370             device: self.device,
371             state: ReadyToEncode {
372                 output_queue: self.state.output_queue,
373                 capture_queue: encoding_thread.capture_queue,
374                 capture_memory_provider: encoding_thread.capture_memory_provider,
375                 poll_wakeups_counter: None,
376             },
377         })
378     }
379 
380     /// Attempts to dequeue and release output buffers that the driver is done with.
dequeue_output_buffers(&self) -> Result<(), DqBufError<V4l2BufferFromError>>381     fn dequeue_output_buffers(&self) -> Result<(), DqBufError<V4l2BufferFromError>> {
382         let output_queue = &self.state.output_queue;
383 
384         while output_queue.num_queued_buffers() > 0 {
385             match output_queue.try_dequeue() {
386                 Ok(buf) => {
387                     (self.state.input_done_cb)(CompletedOutputBuffer::Dequeued(buf));
388                 }
389                 Err(DqBufError::IoctlError(DqBufIoctlError::NotReady)) => break,
390                 // TODO buffers with the error flag set should not result in
391                 // a fatal error!
392                 Err(e) => return Err(e),
393             }
394         }
395 
396         Ok(())
397     }
398 
399     // Make this thread sleep until at least one OUTPUT buffer is ready to be
400     // obtained through `try_get_buffer()`, dequeuing buffers if necessary.
wait_for_output_buffer(&mut self) -> Result<(), GetBufferError>401     fn wait_for_output_buffer(&mut self) -> Result<(), GetBufferError> {
402         for event in self.state.output_poller.poll(None)? {
403             match event {
404                 PollEvent::Device(DeviceEvent::OutputReady) => {
405                     self.dequeue_output_buffers()?;
406                 }
407                 _ => panic!("Unexpected return from OUTPUT queue poll!"),
408             }
409         }
410 
411         Ok(())
412     }
413 }
414 
/// The encoder hands out the same queueable buffer type as its OUTPUT queue.
impl<'a, OP, P, InputDoneCb, OutputReadyCb> OutputQueueableProvider<'a, OP>
    for Encoder<Encoding<OP, P, InputDoneCb, OutputReadyCb>>
where
    Queue<Output, BuffersAllocated<OP>>: OutputQueueableProvider<'a, OP>,
    OP: BufferHandles,
    P: HandlesProvider,
    InputDoneCb: Fn(CompletedOutputBuffer<OP>),
    OutputReadyCb: FnMut(DqBuffer<Capture, P::HandleType>) + Send,
{
    // Simply forward the type defined by the OUTPUT queue.
    type Queueable =
        <Queue<Output, BuffersAllocated<OP>> as OutputQueueableProvider<'a, OP>>::Queueable;
}
427 
428 /// Let the encoder provide the buffers from the OUTPUT queue.
429 impl<'a, OP, P, InputDoneCb, OutputReadyCb> GetFreeOutputBuffer<'a, OP, GetBufferError>
430     for Encoder<Encoding<OP, P, InputDoneCb, OutputReadyCb>>
431 where
432     Queue<Output, BuffersAllocated<OP>>: GetFreeOutputBuffer<'a, OP>,
433     OP: BufferHandles,
434     P: HandlesProvider,
435     InputDoneCb: Fn(CompletedOutputBuffer<OP>),
436     OutputReadyCb: FnMut(DqBuffer<Capture, P::HandleType>) + Send,
437 {
438     /// Returns a V4L2 buffer to be filled with a frame to encode if one
439     /// is available.
440     ///
441     /// This method will return None immediately if all the allocated buffers
442     /// are currently queued.
try_get_free_buffer(&'a self) -> Result<Self::Queueable, GetBufferError>443     fn try_get_free_buffer(&'a self) -> Result<Self::Queueable, GetBufferError> {
444         self.dequeue_output_buffers()?;
445         Ok(self.state.output_queue.try_get_free_buffer()?)
446     }
447 }
448 
449 // If `GetFreeBuffer` is implemented, we can also provide a blocking `get_buffer`
450 // method.
451 impl<'a, OP, P, InputDoneCb, OutputReadyCb> Encoder<Encoding<OP, P, InputDoneCb, OutputReadyCb>>
452 where
453     Self: GetFreeOutputBuffer<'a, OP, GetBufferError>,
454     OP: BufferHandles,
455     P: HandlesProvider,
456     InputDoneCb: Fn(CompletedOutputBuffer<OP>),
457     OutputReadyCb: FnMut(DqBuffer<Capture, P::HandleType>) + Send,
458 {
459     /// Returns a V4L2 buffer to be filled with a frame to encode, waiting for
460     /// one to be available if needed.
461     ///
462     /// Contrary to `try_get_free_buffer(), this method will wait for a buffer
463     /// to be available if needed.
get_buffer( &'a mut self, ) -> Result<<Self as OutputQueueableProvider<'a, OP>>::Queueable, GetBufferError>464     pub fn get_buffer(
465         &'a mut self,
466     ) -> Result<<Self as OutputQueueableProvider<'a, OP>>::Queueable, GetBufferError> {
467         let output_queue = &self.state.output_queue;
468 
469         // If all our buffers are queued, wait until we can dequeue some.
470         if output_queue.num_queued_buffers() == output_queue.num_buffers() {
471             self.wait_for_output_buffer()?;
472         }
473 
474         self.try_get_free_buffer()
475     }
476 }
477 
/// State owned by the thread servicing the CAPTURE queue while encoding.
struct EncoderThread<P, OutputReadyCb>
where
    P: HandlesProvider,
    OutputReadyCb: FnMut(DqBuffer<Capture, P::HandleType>) + Send,
{
    capture_queue: Queue<Capture, BuffersAllocated<P::HandleType>>,
    // Provider of the handles backing the CAPTURE buffers we queue.
    capture_memory_provider: P,
    // Poller watching the CAPTURE queue and `waker`.
    poller: Poller,
    // Registered on `poller` with id 0; signaled when a CAPTURE buffer handed
    // to the client is dropped.
    waker: Arc<Waker>,
    // Client callback invoked with each non-empty encoded buffer.
    output_ready_cb: OutputReadyCb,
}
489 
490 impl<P, OutputReadyCb> EncoderThread<P, OutputReadyCb>
491 where
492     P: HandlesProvider,
493     OutputReadyCb: FnMut(DqBuffer<Capture, P::HandleType>) + Send,
494     for<'a> Queue<Capture, BuffersAllocated<P::HandleType>>:
495         GetFreeCaptureBuffer<'a, P::HandleType> + GetCaptureBufferByIndex<'a, P::HandleType>,
496 {
new( device: &Arc<Device>, capture_queue: Queue<Capture, BuffersAllocated<P::HandleType>>, capture_memory_provider: P, output_ready_cb: OutputReadyCb, ) -> io::Result<Self>497     fn new(
498         device: &Arc<Device>,
499         capture_queue: Queue<Capture, BuffersAllocated<P::HandleType>>,
500         capture_memory_provider: P,
501         output_ready_cb: OutputReadyCb,
502     ) -> io::Result<Self> {
503         let mut poller = Poller::new(Arc::clone(device))?;
504 
505         poller.enable_event(DeviceEvent::CaptureReady)?;
506         let waker = poller.add_waker(0)?;
507 
508         Ok(EncoderThread {
509             capture_queue,
510             capture_memory_provider,
511             poller,
512             waker,
513             output_ready_cb,
514         })
515     }
516 
set_poll_counter(&mut self, poll_wakeups_counter: Arc<AtomicUsize>)517     fn set_poll_counter(&mut self, poll_wakeups_counter: Arc<AtomicUsize>) {
518         self.poller.set_poll_counter(poll_wakeups_counter);
519     }
520 
run(mut self) -> Self521     fn run(mut self) -> Self {
522         self.enqueue_capture_buffers();
523 
524         'polling: loop {
525             match self.capture_queue.num_queued_buffers() {
526                 // If there are no buffers on the CAPTURE queue, poll() will return
527                 // immediately with EPOLLERR and we would loop indefinitely.
528                 // Prevent this by temporarily disabling polling the device in such
529                 // cases.
530                 0 => {
531                     self.poller
532                         .disable_event(DeviceEvent::CaptureReady)
533                         .unwrap();
534                 }
535                 // If device polling was disabled and we have buffers queued, we
536                 // can reenable it as poll will now wait for a CAPTURE buffer to
537                 // be ready for dequeue.
538                 _ => {
539                     self.poller.enable_event(DeviceEvent::CaptureReady).unwrap();
540                 }
541             }
542 
543             // TODO handle errors - this system call can be interrupted and we
544             // should leave in this case.
545             for event in self.poller.poll(None).unwrap() {
546                 match event {
547                     // A CAPTURE buffer has been released by the client.
548                     PollEvent::Waker(0) => {
549                         // Requeue all available CAPTURE buffers.
550                         self.enqueue_capture_buffers();
551                     }
552                     // A CAPTURE buffer is ready to be dequeued.
553                     PollEvent::Device(DeviceEvent::CaptureReady) => {
554                         // Get the encoded buffer
555                         // TODO Manage errors here, including corrupted buffers!
556                         if let Ok(mut cap_buf) = self.capture_queue.try_dequeue() {
557                             let is_last = cap_buf.data.is_last();
558                             let is_empty = *cap_buf.data.get_first_plane().bytesused == 0;
559 
560                             // Add a drop callback to the dequeued buffer so we
561                             // re-queue it as soon as it is dropped.
562                             let cap_waker = Arc::clone(&self.waker);
563                             cap_buf.add_drop_callback(move |_dqbuf| {
564                                 cap_waker.wake();
565                             });
566 
567                             // Empty buffers do not need to be passed to the client.
568                             if !is_empty {
569                                 (self.output_ready_cb)(cap_buf);
570                             }
571 
572                             // Last buffer of the stream? Time for us to terminate.
573                             if is_last {
574                                 break 'polling;
575                             }
576                         } else {
577                             // TODO we should not crash here.
578                             panic!("Expected a CAPTURE buffer but none available!");
579                         }
580                     }
581                     _ => panic!("Unexpected return from CAPTURE queue poll!"),
582                 }
583             }
584         }
585 
586         self
587     }
588 
enqueue_capture_buffers(&mut self)589     fn enqueue_capture_buffers(&mut self) {
590         'enqueue: while let Some(handles) = self.capture_memory_provider.get_handles(&self.waker) {
591             if let Ok(buffer) = self
592                 .capture_memory_provider
593                 .get_suitable_buffer_for(&handles, &self.capture_queue)
594             {
595                 buffer.queue_with_handles(handles).unwrap();
596             } else {
597                 warn!("Handles potentially lost due to no V4L2 buffer being available");
598                 break 'enqueue;
599             }
600         }
601     }
602 }
603