//! The implementation is based on Dmitry Vyukov's bounded MPMC queue.
//!
//! Source:
//!   - <http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue>
5 
use alloc::boxed::Box;
use core::cell::UnsafeCell;
use core::fmt;
use core::mem::{self, MaybeUninit};
use core::panic::{RefUnwindSafe, UnwindSafe};
use core::sync::atomic::{self, AtomicUsize, Ordering};

use crossbeam_utils::{Backoff, CachePadded};
14 
/// A slot in a queue.
struct Slot<T> {
    /// The current stamp.
    ///
    /// If the stamp equals the tail, this node will be next written to. If it equals head + 1,
    /// this node will be next read from.
    stamp: AtomicUsize,

    /// The value in this slot.
    ///
    /// Only holds an initialized `T` while the stamp marks the slot as occupied; otherwise the
    /// contents are uninitialized and must not be read.
    value: UnsafeCell<MaybeUninit<T>>,
}
26 
/// A bounded multi-producer multi-consumer queue.
///
/// This queue allocates a fixed-capacity buffer on construction, which is used to store pushed
/// elements. The queue cannot hold more elements than the buffer allows. Attempting to push an
/// element into a full queue will fail. Alternatively, [`force_push`] makes it possible for
/// this queue to be used as a ring-buffer. Having a buffer allocated upfront makes this queue
/// a bit faster than [`SegQueue`].
///
/// [`force_push`]: ArrayQueue::force_push
/// [`SegQueue`]: super::SegQueue
///
/// # Examples
///
/// ```
/// use crossbeam_queue::ArrayQueue;
///
/// let q = ArrayQueue::new(2);
///
/// assert_eq!(q.push('a'), Ok(()));
/// assert_eq!(q.push('b'), Ok(()));
/// assert_eq!(q.push('c'), Err('c'));
/// assert_eq!(q.pop(), Some('a'));
/// ```
pub struct ArrayQueue<T> {
    /// The head of the queue.
    ///
    /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a
    /// single `usize`. The lower bits represent the index, while the upper bits represent the lap.
    ///
    /// Elements are popped from the head of the queue.
    head: CachePadded<AtomicUsize>,

    /// The tail of the queue.
    ///
    /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a
    /// single `usize`. The lower bits represent the index, while the upper bits represent the lap.
    ///
    /// Elements are pushed into the tail of the queue.
    tail: CachePadded<AtomicUsize>,

    /// The buffer holding slots.
    buffer: Box<[Slot<T>]>,

    /// The queue capacity.
    cap: usize,

    /// A stamp with the value of `{ lap: 1, index: 0 }`.
    ///
    /// Always a power of two, so `stamp & (one_lap - 1)` extracts the index and
    /// `stamp & !(one_lap - 1)` extracts the lap.
    one_lap: usize,
}
76 
// SAFETY: All accesses to the `UnsafeCell` slots are coordinated through the atomic stamps
// (a slot is only read or written by the thread that won the corresponding head/tail CAS), so
// sharing the queue across threads is sound whenever `T` itself can be sent between threads.
unsafe impl<T: Send> Sync for ArrayQueue<T> {}
unsafe impl<T: Send> Send for ArrayQueue<T> {}

// Values are moved in and out of slots in a single step relative to the stamp updates, so a
// panic cannot leave an observably half-updated queue behind.
impl<T> UnwindSafe for ArrayQueue<T> {}
impl<T> RefUnwindSafe for ArrayQueue<T> {}
82 
impl<T> ArrayQueue<T> {
    /// Creates a new bounded queue with the given capacity.
    ///
    /// # Panics
    ///
    /// Panics if the capacity is zero.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::ArrayQueue;
    ///
    /// let q = ArrayQueue::<i32>::new(100);
    /// ```
    pub fn new(cap: usize) -> ArrayQueue<T> {
        assert!(cap > 0, "capacity must be non-zero");

        // Head is initialized to `{ lap: 0, index: 0 }`.
        // Tail is initialized to `{ lap: 0, index: 0 }`.
        let head = 0;
        let tail = 0;

        // Allocate a buffer of `cap` slots initialized
        // with stamps.
        let buffer: Box<[Slot<T>]> = (0..cap)
            .map(|i| {
                // Set the stamp to `{ lap: 0, index: i }`, i.e. each slot starts out ready to be
                // written by the push whose tail has that value.
                Slot {
                    stamp: AtomicUsize::new(i),
                    value: UnsafeCell::new(MaybeUninit::uninit()),
                }
            })
            .collect();

        // One lap is the smallest power of two greater than `cap`. Being a power of two lets the
        // index and lap portions of a stamp be unpacked with simple bit masks.
        let one_lap = (cap + 1).next_power_of_two();

        ArrayQueue {
            buffer,
            cap,
            one_lap,
            head: CachePadded::new(AtomicUsize::new(head)),
            tail: CachePadded::new(AtomicUsize::new(tail)),
        }
    }

    /// The common push loop shared by `push` and `force_push`.
    ///
    /// Tries to reserve the slot at the current tail and write `value` into it. When the slot
    /// still holds a value from the previous lap (i.e. the queue looks full), `f` is invoked with
    /// `(value, tail, new_tail, slot)` and decides how to proceed:
    ///
    /// - `Err(v)` aborts the whole operation, propagating `Err(v)` to the caller.
    /// - `Ok(v)` hands the value back and the loop retries (e.g. after `f` displaced the oldest
    ///   element, as `force_push` does).
    fn push_or_else<F>(&self, mut value: T, f: F) -> Result<(), T>
    where
        F: Fn(T, usize, usize, &Slot<T>) -> Result<T, T>,
    {
        let backoff = Backoff::new();
        let mut tail = self.tail.load(Ordering::Relaxed);

        loop {
            // Deconstruct the tail.
            let index = tail & (self.one_lap - 1);
            let lap = tail & !(self.one_lap - 1);

            let new_tail = if index + 1 < self.cap {
                // Same lap, incremented index.
                // Set to `{ lap: lap, index: index + 1 }`.
                tail + 1
            } else {
                // One lap forward, index wraps around to zero.
                // Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
                lap.wrapping_add(self.one_lap)
            };

            // Inspect the corresponding slot.
            debug_assert!(index < self.buffer.len());
            // SAFETY: `index` is masked to the low bits of the stamp and is kept below
            // `self.cap == self.buffer.len()` by the wrap-around logic above.
            let slot = unsafe { self.buffer.get_unchecked(index) };
            let stamp = slot.stamp.load(Ordering::Acquire);

            // If the tail and the stamp match, we may attempt to push.
            if tail == stamp {
                // Try moving the tail.
                match self.tail.compare_exchange_weak(
                    tail,
                    new_tail,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // Write the value into the slot and update the stamp.
                        // SAFETY: winning the CAS on the tail grants exclusive access to this
                        // slot until the stamp is published below.
                        unsafe {
                            slot.value.get().write(MaybeUninit::new(value));
                        }
                        slot.stamp.store(tail + 1, Ordering::Release);
                        return Ok(());
                    }
                    Err(t) => {
                        tail = t;
                        backoff.spin();
                    }
                }
            } else if stamp.wrapping_add(self.one_lap) == tail + 1 {
                // The slot still holds a value written one lap ago, so the queue may be full.
                // Let the caller-provided closure decide whether to fail or make room.
                atomic::fence(Ordering::SeqCst);
                value = f(value, tail, new_tail, slot)?;
                backoff.spin();
                tail = self.tail.load(Ordering::Relaxed);
            } else {
                // Snooze because we need to wait for the stamp to get updated.
                backoff.snooze();
                tail = self.tail.load(Ordering::Relaxed);
            }
        }
    }

    /// Attempts to push an element into the queue.
    ///
    /// If the queue is full, the element is returned back as an error.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::ArrayQueue;
    ///
    /// let q = ArrayQueue::new(1);
    ///
    /// assert_eq!(q.push(10), Ok(()));
    /// assert_eq!(q.push(20), Err(20));
    /// ```
    pub fn push(&self, value: T) -> Result<(), T> {
        self.push_or_else(value, |v, tail, _, _| {
            let head = self.head.load(Ordering::Relaxed);

            // If the head lags one lap behind the tail as well...
            if head.wrapping_add(self.one_lap) == tail {
                // ...then the queue is full.
                Err(v)
            } else {
                // Otherwise a pop raced with us; retry the push loop.
                Ok(v)
            }
        })
    }

    /// Pushes an element into the queue, replacing the oldest element if necessary.
    ///
    /// If the queue is full, the oldest element is replaced and returned,
    /// otherwise `None` is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::ArrayQueue;
    ///
    /// let q = ArrayQueue::new(2);
    ///
    /// assert_eq!(q.force_push(10), None);
    /// assert_eq!(q.force_push(20), None);
    /// assert_eq!(q.force_push(30), Some(10));
    /// assert_eq!(q.pop(), Some(20));
    /// ```
    pub fn force_push(&self, value: T) -> Option<T> {
        self.push_or_else(value, |v, tail, new_tail, slot| {
            // When the queue is full, the element at the tail's slot is also the oldest one,
            // i.e. the one the head currently points at (exactly one lap behind the tail).
            let head = tail.wrapping_sub(self.one_lap);
            let new_head = new_tail.wrapping_sub(self.one_lap);

            // Try moving the head.
            if self
                .head
                .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Relaxed)
                .is_ok()
            {
                // Move the tail.
                self.tail.store(new_tail, Ordering::SeqCst);

                // Swap the previous value.
                // SAFETY: winning the CAS on the head grants exclusive access to this slot,
                // and it holds an initialized value from one lap back.
                let old = unsafe { slot.value.get().replace(MaybeUninit::new(v)).assume_init() };

                // Update the stamp.
                slot.stamp.store(tail + 1, Ordering::Release);

                // `Err` here terminates `push_or_else`; `.err()` below turns it into the
                // displaced element rather than a failure.
                Err(old)
            } else {
                // A pop raced with us; hand the value back and retry.
                Ok(v)
            }
        })
        .err()
    }

    /// Attempts to pop an element from the queue.
    ///
    /// If the queue is empty, `None` is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::ArrayQueue;
    ///
    /// let q = ArrayQueue::new(1);
    /// assert_eq!(q.push(10), Ok(()));
    ///
    /// assert_eq!(q.pop(), Some(10));
    /// assert!(q.pop().is_none());
    /// ```
    pub fn pop(&self) -> Option<T> {
        let backoff = Backoff::new();
        let mut head = self.head.load(Ordering::Relaxed);

        loop {
            // Deconstruct the head.
            let index = head & (self.one_lap - 1);
            let lap = head & !(self.one_lap - 1);

            // Inspect the corresponding slot.
            debug_assert!(index < self.buffer.len());
            // SAFETY: `index` is masked to the low bits of the stamp, which never reach
            // `self.cap == self.buffer.len()` (see the wrap-around logic below).
            let slot = unsafe { self.buffer.get_unchecked(index) };
            let stamp = slot.stamp.load(Ordering::Acquire);

            // If the stamp is ahead of the head by 1, we may attempt to pop.
            if head + 1 == stamp {
                let new = if index + 1 < self.cap {
                    // Same lap, incremented index.
                    // Set to `{ lap: lap, index: index + 1 }`.
                    head + 1
                } else {
                    // One lap forward, index wraps around to zero.
                    // Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
                    lap.wrapping_add(self.one_lap)
                };

                // Try moving the head.
                match self.head.compare_exchange_weak(
                    head,
                    new,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // Read the value from the slot and update the stamp.
                        // SAFETY: winning the CAS on the head grants exclusive access to this
                        // slot, and the stamp check above proves it holds an initialized value.
                        let msg = unsafe { slot.value.get().read().assume_init() };
                        // Mark the slot as ready for a push one lap later.
                        slot.stamp
                            .store(head.wrapping_add(self.one_lap), Ordering::Release);
                        return Some(msg);
                    }
                    Err(h) => {
                        head = h;
                        backoff.spin();
                    }
                }
            } else if stamp == head {
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.load(Ordering::Relaxed);

                // If the tail equals the head, that means the channel is empty.
                if tail == head {
                    return None;
                }

                backoff.spin();
                head = self.head.load(Ordering::Relaxed);
            } else {
                // Snooze because we need to wait for the stamp to get updated.
                backoff.snooze();
                head = self.head.load(Ordering::Relaxed);
            }
        }
    }

    /// Returns the capacity of the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::ArrayQueue;
    ///
    /// let q = ArrayQueue::<i32>::new(100);
    ///
    /// assert_eq!(q.capacity(), 100);
    /// ```
    pub fn capacity(&self) -> usize {
        self.cap
    }

    /// Returns `true` if the queue is empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::ArrayQueue;
    ///
    /// let q = ArrayQueue::new(100);
    ///
    /// assert!(q.is_empty());
    /// q.push(1).unwrap();
    /// assert!(!q.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        let head = self.head.load(Ordering::SeqCst);
        let tail = self.tail.load(Ordering::SeqCst);

        // Is the tail equal to the head?
        //
        // Note: If the head changes just before we load the tail, that means there was a moment
        // when the channel was not empty, so it is safe to just return `false`.
        tail == head
    }

    /// Returns `true` if the queue is full.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::ArrayQueue;
    ///
    /// let q = ArrayQueue::new(1);
    ///
    /// assert!(!q.is_full());
    /// q.push(1).unwrap();
    /// assert!(q.is_full());
    /// ```
    pub fn is_full(&self) -> bool {
        let tail = self.tail.load(Ordering::SeqCst);
        let head = self.head.load(Ordering::SeqCst);

        // Is the head lagging one lap behind tail?
        //
        // Note: If the tail changes just before we load the head, that means there was a moment
        // when the queue was not full, so it is safe to just return `false`.
        head.wrapping_add(self.one_lap) == tail
    }

    /// Returns the number of elements in the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::ArrayQueue;
    ///
    /// let q = ArrayQueue::new(100);
    /// assert_eq!(q.len(), 0);
    ///
    /// q.push(10).unwrap();
    /// assert_eq!(q.len(), 1);
    ///
    /// q.push(20).unwrap();
    /// assert_eq!(q.len(), 2);
    /// ```
    pub fn len(&self) -> usize {
        // Keep retrying until a consistent (tail, head) snapshot is observed.
        loop {
            // Load the tail, then load the head.
            let tail = self.tail.load(Ordering::SeqCst);
            let head = self.head.load(Ordering::SeqCst);

            // If the tail didn't change, we've got consistent values to work with.
            if self.tail.load(Ordering::SeqCst) == tail {
                let hix = head & (self.one_lap - 1);
                let tix = tail & (self.one_lap - 1);

                return if hix < tix {
                    tix - hix
                } else if hix > tix {
                    // The tail has wrapped around past the end of the buffer.
                    self.cap - hix + tix
                } else if tail == head {
                    0
                } else {
                    // Equal indices but different laps: the queue is full.
                    self.cap
                };
            }
        }
    }
}
447 
impl<T> Drop for ArrayQueue<T> {
    fn drop(&mut self) {
        // `&mut self` guarantees exclusive access, so the plain reads through `get_mut` need no
        // atomic synchronization. Skip the whole walk when `T` has no destructor.
        if mem::needs_drop::<T>() {
            // Get the index of the head.
            let head = *self.head.get_mut();
            let tail = *self.tail.get_mut();

            let hix = head & (self.one_lap - 1);
            let tix = tail & (self.one_lap - 1);

            // Number of initialized slots, computed the same way as in `len`.
            let len = if hix < tix {
                tix - hix
            } else if hix > tix {
                self.cap - hix + tix
            } else if tail == head {
                0
            } else {
                self.cap
            };

            // Loop over all slots that hold a message and drop them.
            for i in 0..len {
                // Compute the index of the next slot holding a message.
                let index = if hix + i < self.cap {
                    hix + i
                } else {
                    // Wrap around the end of the buffer.
                    hix + i - self.cap
                };

                unsafe {
                    debug_assert!(index < self.buffer.len());
                    // SAFETY: `index` is in bounds, the slot holds an initialized value (it lies
                    // between head and tail), and `&mut self` rules out concurrent access.
                    let slot = self.buffer.get_unchecked_mut(index);
                    (*slot.value.get()).assume_init_drop();
                }
            }
        }
    }
}
486 
impl<T> fmt::Debug for ArrayQueue<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The elements are deliberately not printed: reading them here would race with
        // concurrent producers and consumers.
        f.pad("ArrayQueue { .. }")
    }
}
492 
impl<T> IntoIterator for ArrayQueue<T> {
    type Item = T;

    type IntoIter = IntoIter<T>;

    /// Consumes the queue and returns an iterator over its remaining elements.
    fn into_iter(self) -> Self::IntoIter {
        IntoIter { value: self }
    }
}
502 
/// An owning iterator over the elements of an [`ArrayQueue`], yielded in queue (FIFO) order.
#[derive(Debug)]
pub struct IntoIter<T> {
    // The queue being drained; slots are moved out as the head index advances.
    value: ArrayQueue<T>,
}
507 
impl<T> Iterator for IntoIter<T> {
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        let value = &mut self.value;
        let head = *value.head.get_mut();
        // A head equal to the tail means the queue is empty.
        if value.head.get_mut() != value.tail.get_mut() {
            let index = head & (value.one_lap - 1);
            let lap = head & !(value.one_lap - 1);
            // SAFETY: We have mutable access to this, so we can read without
            // worrying about concurrency. Furthermore, we know this is
            // initialized because it is the value pointed at by `value.head`
            // and this is a non-empty queue.
            let val = unsafe {
                debug_assert!(index < value.buffer.len());
                let slot = value.buffer.get_unchecked_mut(index);
                slot.value.get().read().assume_init()
            };
            let new = if index + 1 < value.cap {
                // Same lap, incremented index.
                // Set to `{ lap: lap, index: index + 1 }`.
                head + 1
            } else {
                // One lap forward, index wraps around to zero.
                // Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
                lap.wrapping_add(value.one_lap)
            };
            // The slot's stamp is intentionally not updated: `ArrayQueue`'s `Drop` derives the
            // range of live slots from `head`/`tail` alone, so advancing the head is enough to
            // keep the moved-out value from being dropped twice.
            *value.head.get_mut() = new;
            Option::Some(val)
        } else {
            Option::None
        }
    }
}
542