//! The global data and participant for garbage collection.
//!
//! # Registration
//!
//! In order to track all participants in one place, we need some form of participant
//! registration. When a participant is created, it is registered in a global lock-free
//! singly-linked list of participants; when a participant exits, it is unregistered from the
//! list.
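//!
//! As a rough illustration, registration happens through the crate's public `Collector` API
//! (a minimal sketch; the wrappers below forward to this module):
//!
//! ```
//! use crossbeam_epoch::Collector;
//!
//! let collector = Collector::new();  // owns the global data
//! let handle = collector.register(); // registers a new participant
//! drop(handle);                      // unregisters the participant
//! ```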
//!
//! # Pinning
//!
//! Every participant contains an integer that tells whether the participant is pinned and, if so,
//! what the global epoch was at the time it was pinned. Participants also hold a pin counter that
//! aids in periodic global epoch advancement.
//!
//! When a participant is pinned, a `Guard` is returned as a witness that the participant is pinned.
//! Guards are necessary for performing atomic operations, and for freeing/dropping locations.
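//!
//! For illustration, this is roughly what pinning looks like through the crate's public wrappers
//! (a minimal sketch, not part of this module):
//!
//! ```
//! use crossbeam_epoch::Collector;
//!
//! let collector = Collector::new();
//! let handle = collector.register();
//!
//! let guard = handle.pin();    // the participant is pinned in the current global epoch
//! assert!(handle.is_pinned());
//! drop(guard);                 // dropping the last guard unpins the participant
//! ```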
//!
//! # Thread-local bag
//!
//! Objects that get unlinked from concurrent data structures must be stashed away until the global
//! epoch has sufficiently advanced and they become safe to destroy. Pointers to such objects are
//! pushed into a thread-local bag; when the bag becomes full, it is sealed with the current global
//! epoch and pushed into the global queue of bags. Objects are kept in thread-local storage to
//! amortize the synchronization cost of pushing garbage to the global queue.
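//!
//! For illustration, `Guard::defer()` is the public entry point that feeds the thread-local bag
//! (a minimal sketch; the deferred closure is arbitrary):
//!
//! ```
//! let guard = crossbeam_epoch::pin();
//! let boxed = Box::new(42);
//! guard.defer(move || drop(boxed)); // stashed in the thread-local bag, not run immediately
//! ```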
//!
//! # Global queue
//!
//! Whenever a bag is pushed into the queue, some bags in the queue are collected and their objects
//! destroyed along the way. This design reduces contention on data structures. The global queue
//! cannot be accessed directly: the only way to interact with it is by calling `defer()`, which
//! adds an object to the thread-local bag, or `collect()`, which manually triggers garbage
//! collection.
//!
//! Ideally, each instance of a concurrent data structure would have its own queue that gets fully
//! destroyed as soon as the data structure is dropped.
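//!
//! As a sketch of how this surfaces in the public API, `Guard::flush()` pushes the thread-local
//! bag into the global queue and triggers a collection:
//!
//! ```
//! let guard = crossbeam_epoch::pin();
//! guard.defer(|| ());
//! guard.flush(); // seal the local bag, push it into the global queue, and collect expired bags
//! ```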

use crate::primitive::cell::UnsafeCell;
use crate::primitive::sync::atomic::{self, Ordering};
use core::cell::Cell;
use core::mem::{self, ManuallyDrop};
use core::num::Wrapping;
use core::{fmt, ptr};

use crossbeam_utils::CachePadded;

use crate::atomic::{Owned, Shared};
use crate::collector::{Collector, LocalHandle};
use crate::deferred::Deferred;
use crate::epoch::{AtomicEpoch, Epoch};
use crate::guard::{unprotected, Guard};
use crate::sync::list::{Entry, IsElement, IterError, List};
use crate::sync::queue::Queue;

/// Maximum number of objects a bag can contain.
#[cfg(not(any(crossbeam_sanitize, miri)))]
const MAX_OBJECTS: usize = 64;
// Makes it more likely to trigger any potential data races.
#[cfg(any(crossbeam_sanitize, miri))]
const MAX_OBJECTS: usize = 4;

/// A bag of deferred functions.
pub(crate) struct Bag {
    /// Stashed objects.
    deferreds: [Deferred; MAX_OBJECTS],
    len: usize,
}

/// `Bag::try_push()` requires that it is safe for another thread to execute the given functions.
unsafe impl Send for Bag {}

impl Bag {
    /// Returns a new, empty bag.
    pub(crate) fn new() -> Self {
        Self::default()
    }

    /// Returns `true` if the bag is empty.
    pub(crate) fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Attempts to insert a deferred function into the bag.
    ///
    /// Returns `Ok(())` if successful, and `Err(deferred)` for the given `deferred` if the bag is
    /// full.
    ///
    /// # Safety
    ///
    /// It should be safe for another thread to execute the given function.
    pub(crate) unsafe fn try_push(&mut self, deferred: Deferred) -> Result<(), Deferred> {
        if self.len < MAX_OBJECTS {
            self.deferreds[self.len] = deferred;
            self.len += 1;
            Ok(())
        } else {
            Err(deferred)
        }
    }

    /// Seals the bag with the given epoch.
    fn seal(self, epoch: Epoch) -> SealedBag {
        SealedBag { epoch, _bag: self }
    }
}

impl Default for Bag {
    fn default() -> Self {
        Bag {
            len: 0,
            deferreds: [Deferred::NO_OP; MAX_OBJECTS],
        }
    }
}

impl Drop for Bag {
    fn drop(&mut self) {
        // Call all deferred functions.
        for deferred in &mut self.deferreds[..self.len] {
            let no_op = Deferred::NO_OP;
            let owned_deferred = mem::replace(deferred, no_op);
            owned_deferred.call();
        }
    }
}

// can't #[derive(Debug)] because Debug is not implemented for arrays 64 items long
impl fmt::Debug for Bag {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Bag")
            .field("deferreds", &&self.deferreds[..self.len])
            .finish()
    }
}

/// A pair of an epoch and a bag.
#[derive(Default, Debug)]
struct SealedBag {
    epoch: Epoch,
    _bag: Bag,
}

/// It is safe to share `SealedBag` because `is_expired` only inspects the epoch.
unsafe impl Sync for SealedBag {}

impl SealedBag {
    /// Checks if it is safe to drop the bag w.r.t. the given global epoch.
    fn is_expired(&self, global_epoch: Epoch) -> bool {
        // A pinned participant can witness at most one epoch advancement. Therefore, any bag that
        // is within one epoch of the current one cannot be destroyed yet.
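        // For example (illustrative numbers): a bag sealed at epoch 5 only becomes expired once
        // the global epoch has advanced to 7 or beyond.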
        global_epoch.wrapping_sub(self.epoch) >= 2
    }
}

/// The global data for a garbage collector.
pub(crate) struct Global {
    /// The intrusive linked list of `Local`s.
    locals: List<Local>,

    /// The global queue of bags of deferred functions.
    queue: Queue<SealedBag>,

    /// The global epoch.
    pub(crate) epoch: CachePadded<AtomicEpoch>,
}

impl Global {
    /// Maximum number of bags to collect in one call to `collect()`.
    const COLLECT_STEPS: usize = 8;

    /// Creates a new global data for garbage collection.
    #[inline]
    pub(crate) fn new() -> Self {
        Self {
            locals: List::new(),
            queue: Queue::new(),
            epoch: CachePadded::new(AtomicEpoch::new(Epoch::starting())),
        }
    }

    /// Pushes the bag into the global queue and replaces the bag with a new empty bag.
    pub(crate) fn push_bag(&self, bag: &mut Bag, guard: &Guard) {
        let bag = mem::replace(bag, Bag::new());

        atomic::fence(Ordering::SeqCst);

        let epoch = self.epoch.load(Ordering::Relaxed);
        self.queue.push(bag.seal(epoch), guard);
    }

    /// Collects several bags from the global queue and executes deferred functions in them.
    ///
    /// Note: This may itself produce garbage and in turn allocate new bags.
    ///
    /// `pin()` rarely calls `collect()`, so we want the compiler to place that call on a cold
    /// path. In other words, we want the compiler to optimize branching for the case when
    /// `collect()` is not called.
    #[cold]
    pub(crate) fn collect(&self, guard: &Guard) {
        let global_epoch = self.try_advance(guard);

        let steps = if cfg!(crossbeam_sanitize) {
            usize::max_value()
        } else {
            Self::COLLECT_STEPS
        };

        for _ in 0..steps {
            match self.queue.try_pop_if(
                &|sealed_bag: &SealedBag| sealed_bag.is_expired(global_epoch),
                guard,
            ) {
                None => break,
                Some(sealed_bag) => drop(sealed_bag),
            }
        }
    }

    /// Attempts to advance the global epoch.
    ///
    /// The global epoch can advance only if all currently pinned participants have been pinned in
    /// the current epoch.
    ///
    /// Returns the current global epoch.
    ///
    /// `try_advance()` is annotated `#[cold]` because it is rarely called.
    #[cold]
    pub(crate) fn try_advance(&self, guard: &Guard) -> Epoch {
        let global_epoch = self.epoch.load(Ordering::Relaxed);
        atomic::fence(Ordering::SeqCst);

        // TODO(stjepang): `Local`s are stored in a linked list because linked lists are fairly
        // easy to implement in a lock-free manner. However, traversal can be slow due to cache
        // misses and data dependencies. We should experiment with other data structures as well.
        for local in self.locals.iter(guard) {
            match local {
                Err(IterError::Stalled) => {
                    // A concurrent thread stalled this iteration. That thread might also try to
                    // advance the epoch, in which case we leave the job to it. Otherwise, the
                    // epoch will not be advanced.
                    return global_epoch;
                }
                Ok(local) => {
                    let local_epoch = local.epoch.load(Ordering::Relaxed);

                    // If the participant was pinned in a different epoch, we cannot advance the
                    // global epoch just yet.
                    if local_epoch.is_pinned() && local_epoch.unpinned() != global_epoch {
                        return global_epoch;
                    }
                }
            }
        }
        atomic::fence(Ordering::Acquire);

        // All pinned participants were pinned in the current global epoch.
        // Now let's advance the global epoch...
        //
        // Note that if another thread already advanced it before us, this store will simply
        // overwrite the global epoch with the same value. This is true because `try_advance` was
        // called from a thread that was pinned in `global_epoch`, and the global epoch cannot be
        // advanced two steps ahead of it.
        let new_epoch = global_epoch.successor();
        self.epoch.store(new_epoch, Ordering::Release);
        new_epoch
    }
}

/// Participant for garbage collection.
#[repr(C)] // Note: `entry` must be the first field
pub(crate) struct Local {
    /// A node in the intrusive linked list of `Local`s.
    entry: Entry,

    /// A reference to the global data.
    ///
    /// When all guards and handles get dropped, this reference is destroyed.
    collector: UnsafeCell<ManuallyDrop<Collector>>,

    /// The local bag of deferred functions.
    pub(crate) bag: UnsafeCell<Bag>,

    /// The number of guards keeping this participant pinned.
    guard_count: Cell<usize>,

    /// The number of active handles.
    handle_count: Cell<usize>,

    /// Total number of pinnings performed.
    ///
    /// This is just an auxiliary counter that sometimes kicks off collection.
    pin_count: Cell<Wrapping<usize>>,

    /// The local epoch.
    epoch: CachePadded<AtomicEpoch>,
}

// Make sure `Local` is less than or equal to 2048 bytes.
// https://github.com/crossbeam-rs/crossbeam/issues/551
#[cfg(not(any(crossbeam_sanitize, miri)))] // `crossbeam_sanitize` and `miri` reduce the size of `Local`
#[test]
fn local_size() {
    // TODO: https://github.com/crossbeam-rs/crossbeam/issues/869
    // assert!(
    //     core::mem::size_of::<Local>() <= 2048,
    //     "An allocation of `Local` should be <= 2048 bytes."
    // );
}

impl Local {
    /// Number of pinnings after which a participant will execute some deferred functions from the
    /// global queue.
    const PINNINGS_BETWEEN_COLLECT: usize = 128;

    /// Registers a new `Local` in the provided `Global`.
    pub(crate) fn register(collector: &Collector) -> LocalHandle {
        unsafe {
            // Since we dereference no pointers in this block, it is safe to use `unprotected`.

            let local = Owned::new(Local {
                entry: Entry::default(),
                collector: UnsafeCell::new(ManuallyDrop::new(collector.clone())),
                bag: UnsafeCell::new(Bag::new()),
                guard_count: Cell::new(0),
                handle_count: Cell::new(1),
                pin_count: Cell::new(Wrapping(0)),
                epoch: CachePadded::new(AtomicEpoch::new(Epoch::starting())),
            })
            .into_shared(unprotected());
            collector.global.locals.insert(local, unprotected());
            LocalHandle {
                local: local.as_raw(),
            }
        }
    }

    /// Returns a reference to the `Global` in which this `Local` resides.
    #[inline]
    pub(crate) fn global(&self) -> &Global {
        &self.collector().global
    }

    /// Returns a reference to the `Collector` in which this `Local` resides.
    #[inline]
    pub(crate) fn collector(&self) -> &Collector {
        self.collector.with(|c| unsafe { &**c })
    }

    /// Returns `true` if the current participant is pinned.
    #[inline]
    pub(crate) fn is_pinned(&self) -> bool {
        self.guard_count.get() > 0
    }

    /// Adds `deferred` to the thread-local bag.
    ///
    /// # Safety
    ///
    /// It should be safe for another thread to execute the given function.
    pub(crate) unsafe fn defer(&self, mut deferred: Deferred, guard: &Guard) {
        let bag = self.bag.with_mut(|b| &mut *b);

        while let Err(d) = bag.try_push(deferred) {
            self.global().push_bag(bag, guard);
            deferred = d;
        }
    }

    pub(crate) fn flush(&self, guard: &Guard) {
        let bag = self.bag.with_mut(|b| unsafe { &mut *b });

        if !bag.is_empty() {
            self.global().push_bag(bag, guard);
        }

        self.global().collect(guard);
    }

    /// Pins the `Local`.
    #[inline]
    pub(crate) fn pin(&self) -> Guard {
        let guard = Guard { local: self };

        let guard_count = self.guard_count.get();
        self.guard_count.set(guard_count.checked_add(1).unwrap());

        if guard_count == 0 {
            let global_epoch = self.global().epoch.load(Ordering::Relaxed);
            let new_epoch = global_epoch.pinned();

            // Now we must store `new_epoch` into `self.epoch` and execute a `SeqCst` fence.
            // The fence makes sure that any future loads from `Atomic`s will not happen before
            // this store.
            if cfg!(all(
                any(target_arch = "x86", target_arch = "x86_64"),
                not(miri)
            )) {
                // HACK(stjepang): On x86 architectures there are two different ways of executing
                // a `SeqCst` fence.
                //
                // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
                // 2. `_.compare_exchange(_, _, SeqCst, SeqCst)`, which compiles into a `lock cmpxchg`
                //    instruction.
                //
                // Both instructions have the effect of a full barrier, but benchmarks have shown
                // that the second one makes pinning faster in this particular case.  It is not
                // clear that this is permitted by the C++ memory model (SC fences work very
                // differently from SC accesses), but experimental evidence suggests that this
                // works fine.  Using inline assembly would be a viable (and correct) alternative,
                // but alas, that is not possible on stable Rust.
                let current = Epoch::starting();
                let res = self.epoch.compare_exchange(
                    current,
                    new_epoch,
                    Ordering::SeqCst,
                    Ordering::SeqCst,
                );
                debug_assert!(res.is_ok(), "participant was expected to be unpinned");
                // We add a compiler fence to make it less likely for LLVM to do something wrong
                // here.  Formally, this is not enough to get rid of data races; practically,
                // it should go a long way.
                atomic::compiler_fence(Ordering::SeqCst);
            } else {
                self.epoch.store(new_epoch, Ordering::Relaxed);
                atomic::fence(Ordering::SeqCst);
            }

            // Increment the pin counter.
            let count = self.pin_count.get();
            self.pin_count.set(count + Wrapping(1));

            // After every `PINNINGS_BETWEEN_COLLECT` pinnings, try advancing the epoch and
            // collecting some garbage.
            if count.0 % Self::PINNINGS_BETWEEN_COLLECT == 0 {
                self.global().collect(&guard);
            }
        }

        guard
    }

    /// Unpins the `Local`.
    #[inline]
    pub(crate) fn unpin(&self) {
        let guard_count = self.guard_count.get();
        self.guard_count.set(guard_count - 1);

        if guard_count == 1 {
            self.epoch.store(Epoch::starting(), Ordering::Release);

            if self.handle_count.get() == 0 {
                self.finalize();
            }
        }
    }

    /// Unpins and then pins the `Local`.
    #[inline]
    pub(crate) fn repin(&self) {
        let guard_count = self.guard_count.get();

        // Update the local epoch only if there's only one guard.
        if guard_count == 1 {
            let epoch = self.epoch.load(Ordering::Relaxed);
            let global_epoch = self.global().epoch.load(Ordering::Relaxed).pinned();

            // Update the local epoch only if the global epoch is greater than the local epoch.
            if epoch != global_epoch {
                // We store the new epoch with `Release` because we need to ensure any memory
                // accesses from the previous epoch do not leak into the new one.
                self.epoch.store(global_epoch, Ordering::Release);

                // However, we don't need a following `SeqCst` fence, because it is safe for memory
                // accesses from the new epoch to be executed before updating the local epoch. At
                // worst, other threads will see the new epoch late and delay GC slightly.
            }
        }
    }

    /// Increments the handle count.
    #[inline]
    pub(crate) fn acquire_handle(&self) {
        let handle_count = self.handle_count.get();
        debug_assert!(handle_count >= 1);
        self.handle_count.set(handle_count + 1);
    }

    /// Decrements the handle count.
    #[inline]
    pub(crate) fn release_handle(&self) {
        let guard_count = self.guard_count.get();
        let handle_count = self.handle_count.get();
        debug_assert!(handle_count >= 1);
        self.handle_count.set(handle_count - 1);

        if guard_count == 0 && handle_count == 1 {
            self.finalize();
        }
    }

    /// Removes the `Local` from the global linked list.
    #[cold]
    fn finalize(&self) {
        debug_assert_eq!(self.guard_count.get(), 0);
        debug_assert_eq!(self.handle_count.get(), 0);

        // Temporarily increment handle count. This is required so that the following call to `pin`
        // doesn't call `finalize` again.
        self.handle_count.set(1);
        unsafe {
            // Pin and move the local bag into the global queue. It's important that `push_bag`
            // doesn't defer destruction on any new garbage.
            let guard = &self.pin();
            self.global()
                .push_bag(self.bag.with_mut(|b| &mut *b), guard);
        }
        // Revert the handle count back to zero.
        self.handle_count.set(0);

        unsafe {
            // Take the reference to the `Global` out of this `Local`. Since we're not protected
            // by a guard at this time, it's crucial that the reference is read before marking the
            // `Local` as deleted.
            let collector: Collector = ptr::read(self.collector.with(|c| &*(*c)));

            // Mark this node in the linked list as deleted.
            self.entry.delete(unprotected());

            // Finally, drop the reference to the global. Note that this might be the last reference
            // to the `Global`. If so, the global data will be destroyed and all deferred functions
            // in its queue will be executed.
            drop(collector);
        }
    }
}

impl IsElement<Self> for Local {
    fn entry_of(local: &Self) -> &Entry {
        // SAFETY: `Local` is `repr(C)` and `entry` is the first field of it.
        unsafe {
            let entry_ptr = (local as *const Self).cast::<Entry>();
            &*entry_ptr
        }
    }

    unsafe fn element_of(entry: &Entry) -> &Self {
        // SAFETY: `Local` is `repr(C)` and `entry` is the first field of it.
        let local_ptr = (entry as *const Entry).cast::<Self>();
        &*local_ptr
    }

    unsafe fn finalize(entry: &Entry, guard: &Guard) {
        guard.defer_destroy(Shared::from(Self::element_of(entry) as *const _));
    }
}

#[cfg(all(test, not(crossbeam_loom)))]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use super::*;

    #[test]
    fn check_defer() {
        static FLAG: AtomicUsize = AtomicUsize::new(0);
        fn set() {
            FLAG.store(42, Ordering::Relaxed);
        }

        let d = Deferred::new(set);
        assert_eq!(FLAG.load(Ordering::Relaxed), 0);
        d.call();
        assert_eq!(FLAG.load(Ordering::Relaxed), 42);
    }

    #[test]
    fn check_bag() {
        static FLAG: AtomicUsize = AtomicUsize::new(0);
        fn incr() {
            FLAG.fetch_add(1, Ordering::Relaxed);
        }

        let mut bag = Bag::new();
        assert!(bag.is_empty());

        for _ in 0..MAX_OBJECTS {
            assert!(unsafe { bag.try_push(Deferred::new(incr)).is_ok() });
            assert!(!bag.is_empty());
            assert_eq!(FLAG.load(Ordering::Relaxed), 0);
        }

        let result = unsafe { bag.try_push(Deferred::new(incr)) };
        assert!(result.is_err());
        assert!(!bag.is_empty());
        assert_eq!(FLAG.load(Ordering::Relaxed), 0);

        drop(bag);
        assert_eq!(FLAG.load(Ordering::Relaxed), MAX_OBJECTS);
    }
}