//! Epoch-based garbage collector.
//!
//! # Examples
//!
//! ```
//! use crossbeam_epoch::Collector;
//!
//! let collector = Collector::new();
//!
//! let handle = collector.register();
//! drop(collector); // `handle` still works after dropping `collector`
//!
//! handle.pin().flush();
//! ```
15 use core::fmt;
16 
17 use crate::guard::Guard;
18 use crate::internal::{Global, Local};
19 use crate::primitive::sync::Arc;
20 
21 /// An epoch-based garbage collector.
22 pub struct Collector {
23     pub(crate) global: Arc<Global>,
24 }
25 
26 unsafe impl Send for Collector {}
27 unsafe impl Sync for Collector {}
28 
29 impl Default for Collector {
default() -> Self30     fn default() -> Self {
31         Self {
32             global: Arc::new(Global::new()),
33         }
34     }
35 }
36 
37 impl Collector {
38     /// Creates a new collector.
new() -> Self39     pub fn new() -> Self {
40         Self::default()
41     }
42 
43     /// Registers a new handle for the collector.
register(&self) -> LocalHandle44     pub fn register(&self) -> LocalHandle {
45         Local::register(self)
46     }
47 }
48 
49 impl Clone for Collector {
50     /// Creates another reference to the same garbage collector.
clone(&self) -> Self51     fn clone(&self) -> Self {
52         Collector {
53             global: self.global.clone(),
54         }
55     }
56 }
57 
58 impl fmt::Debug for Collector {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result59     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60         f.pad("Collector { .. }")
61     }
62 }
63 
64 impl PartialEq for Collector {
65     /// Checks if both handles point to the same collector.
eq(&self, rhs: &Collector) -> bool66     fn eq(&self, rhs: &Collector) -> bool {
67         Arc::ptr_eq(&self.global, &rhs.global)
68     }
69 }
70 impl Eq for Collector {}
71 
72 /// A handle to a garbage collector.
73 pub struct LocalHandle {
74     pub(crate) local: *const Local,
75 }
76 
77 impl LocalHandle {
78     /// Pins the handle.
79     #[inline]
pin(&self) -> Guard80     pub fn pin(&self) -> Guard {
81         unsafe { (*self.local).pin() }
82     }
83 
84     /// Returns `true` if the handle is pinned.
85     #[inline]
is_pinned(&self) -> bool86     pub fn is_pinned(&self) -> bool {
87         unsafe { (*self.local).is_pinned() }
88     }
89 
90     /// Returns the `Collector` associated with this handle.
91     #[inline]
collector(&self) -> &Collector92     pub fn collector(&self) -> &Collector {
93         unsafe { (*self.local).collector() }
94     }
95 }
96 
97 impl Drop for LocalHandle {
98     #[inline]
drop(&mut self)99     fn drop(&mut self) {
100         unsafe {
101             Local::release_handle(&*self.local);
102         }
103     }
104 }
105 
106 impl fmt::Debug for LocalHandle {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result107     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
108         f.pad("LocalHandle { .. }")
109     }
110 }
111 
112 #[cfg(all(test, not(crossbeam_loom)))]
113 mod tests {
114     use std::mem::ManuallyDrop;
115     use std::sync::atomic::{AtomicUsize, Ordering};
116 
117     use crossbeam_utils::thread;
118 
119     use crate::{Collector, Owned};
120 
121     const NUM_THREADS: usize = 8;
122 
123     #[test]
pin_reentrant()124     fn pin_reentrant() {
125         let collector = Collector::new();
126         let handle = collector.register();
127         drop(collector);
128 
129         assert!(!handle.is_pinned());
130         {
131             let _guard = &handle.pin();
132             assert!(handle.is_pinned());
133             {
134                 let _guard = &handle.pin();
135                 assert!(handle.is_pinned());
136             }
137             assert!(handle.is_pinned());
138         }
139         assert!(!handle.is_pinned());
140     }
141 
142     #[test]
flush_local_bag()143     fn flush_local_bag() {
144         let collector = Collector::new();
145         let handle = collector.register();
146         drop(collector);
147 
148         for _ in 0..100 {
149             let guard = &handle.pin();
150             unsafe {
151                 let a = Owned::new(7).into_shared(guard);
152                 guard.defer_destroy(a);
153 
154                 assert!(!(*guard.local).bag.with(|b| (*b).is_empty()));
155 
156                 while !(*guard.local).bag.with(|b| (*b).is_empty()) {
157                     guard.flush();
158                 }
159             }
160         }
161     }
162 
163     #[test]
garbage_buffering()164     fn garbage_buffering() {
165         let collector = Collector::new();
166         let handle = collector.register();
167         drop(collector);
168 
169         let guard = &handle.pin();
170         unsafe {
171             for _ in 0..10 {
172                 let a = Owned::new(7).into_shared(guard);
173                 guard.defer_destroy(a);
174             }
175             assert!(!(*guard.local).bag.with(|b| (*b).is_empty()));
176         }
177     }
178 
179     #[test]
pin_holds_advance()180     fn pin_holds_advance() {
181         #[cfg(miri)]
182         const N: usize = 500;
183         #[cfg(not(miri))]
184         const N: usize = 500_000;
185 
186         let collector = Collector::new();
187 
188         thread::scope(|scope| {
189             for _ in 0..NUM_THREADS {
190                 scope.spawn(|_| {
191                     let handle = collector.register();
192                     for _ in 0..N {
193                         let guard = &handle.pin();
194 
195                         let before = collector.global.epoch.load(Ordering::Relaxed);
196                         collector.global.collect(guard);
197                         let after = collector.global.epoch.load(Ordering::Relaxed);
198 
199                         assert!(after.wrapping_sub(before) <= 2);
200                     }
201                 });
202             }
203         })
204         .unwrap();
205     }
206 
207     #[cfg(not(crossbeam_sanitize))] // TODO: assertions failed due to `cfg(crossbeam_sanitize)` reduce `internal::MAX_OBJECTS`
208     #[test]
incremental()209     fn incremental() {
210         #[cfg(miri)]
211         const COUNT: usize = 500;
212         #[cfg(not(miri))]
213         const COUNT: usize = 100_000;
214         static DESTROYS: AtomicUsize = AtomicUsize::new(0);
215 
216         let collector = Collector::new();
217         let handle = collector.register();
218 
219         unsafe {
220             let guard = &handle.pin();
221             for _ in 0..COUNT {
222                 let a = Owned::new(7i32).into_shared(guard);
223                 guard.defer_unchecked(move || {
224                     drop(a.into_owned());
225                     DESTROYS.fetch_add(1, Ordering::Relaxed);
226                 });
227             }
228             guard.flush();
229         }
230 
231         let mut last = 0;
232 
233         while last < COUNT {
234             let curr = DESTROYS.load(Ordering::Relaxed);
235             assert!(curr - last <= 1024);
236             last = curr;
237 
238             let guard = &handle.pin();
239             collector.global.collect(guard);
240         }
241         assert!(DESTROYS.load(Ordering::Relaxed) == COUNT);
242     }
243 
244     #[test]
buffering()245     fn buffering() {
246         const COUNT: usize = 10;
247         #[cfg(miri)]
248         const N: usize = 500;
249         #[cfg(not(miri))]
250         const N: usize = 100_000;
251         static DESTROYS: AtomicUsize = AtomicUsize::new(0);
252 
253         let collector = Collector::new();
254         let handle = collector.register();
255 
256         unsafe {
257             let guard = &handle.pin();
258             for _ in 0..COUNT {
259                 let a = Owned::new(7i32).into_shared(guard);
260                 guard.defer_unchecked(move || {
261                     drop(a.into_owned());
262                     DESTROYS.fetch_add(1, Ordering::Relaxed);
263                 });
264             }
265         }
266 
267         for _ in 0..N {
268             collector.global.collect(&handle.pin());
269         }
270         assert!(DESTROYS.load(Ordering::Relaxed) < COUNT);
271 
272         handle.pin().flush();
273 
274         while DESTROYS.load(Ordering::Relaxed) < COUNT {
275             let guard = &handle.pin();
276             collector.global.collect(guard);
277         }
278         assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT);
279     }
280 
281     #[test]
count_drops()282     fn count_drops() {
283         #[cfg(miri)]
284         const COUNT: usize = 500;
285         #[cfg(not(miri))]
286         const COUNT: usize = 100_000;
287         static DROPS: AtomicUsize = AtomicUsize::new(0);
288 
289         struct Elem(#[allow(dead_code)] i32);
290 
291         impl Drop for Elem {
292             fn drop(&mut self) {
293                 DROPS.fetch_add(1, Ordering::Relaxed);
294             }
295         }
296 
297         let collector = Collector::new();
298         let handle = collector.register();
299 
300         unsafe {
301             let guard = &handle.pin();
302 
303             for _ in 0..COUNT {
304                 let a = Owned::new(Elem(7i32)).into_shared(guard);
305                 guard.defer_destroy(a);
306             }
307             guard.flush();
308         }
309 
310         while DROPS.load(Ordering::Relaxed) < COUNT {
311             let guard = &handle.pin();
312             collector.global.collect(guard);
313         }
314         assert_eq!(DROPS.load(Ordering::Relaxed), COUNT);
315     }
316 
317     #[test]
count_destroy()318     fn count_destroy() {
319         #[cfg(miri)]
320         const COUNT: usize = 500;
321         #[cfg(not(miri))]
322         const COUNT: usize = 100_000;
323         static DESTROYS: AtomicUsize = AtomicUsize::new(0);
324 
325         let collector = Collector::new();
326         let handle = collector.register();
327 
328         unsafe {
329             let guard = &handle.pin();
330 
331             for _ in 0..COUNT {
332                 let a = Owned::new(7i32).into_shared(guard);
333                 guard.defer_unchecked(move || {
334                     drop(a.into_owned());
335                     DESTROYS.fetch_add(1, Ordering::Relaxed);
336                 });
337             }
338             guard.flush();
339         }
340 
341         while DESTROYS.load(Ordering::Relaxed) < COUNT {
342             let guard = &handle.pin();
343             collector.global.collect(guard);
344         }
345         assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT);
346     }
347 
348     #[test]
drop_array()349     fn drop_array() {
350         const COUNT: usize = 700;
351         static DROPS: AtomicUsize = AtomicUsize::new(0);
352 
353         struct Elem(#[allow(dead_code)] i32);
354 
355         impl Drop for Elem {
356             fn drop(&mut self) {
357                 DROPS.fetch_add(1, Ordering::Relaxed);
358             }
359         }
360 
361         let collector = Collector::new();
362         let handle = collector.register();
363 
364         let mut guard = handle.pin();
365 
366         let mut v = Vec::with_capacity(COUNT);
367         for i in 0..COUNT {
368             v.push(Elem(i as i32));
369         }
370 
371         {
372             let a = Owned::new(v).into_shared(&guard);
373             unsafe {
374                 guard.defer_destroy(a);
375             }
376             guard.flush();
377         }
378 
379         while DROPS.load(Ordering::Relaxed) < COUNT {
380             guard.repin();
381             collector.global.collect(&guard);
382         }
383         assert_eq!(DROPS.load(Ordering::Relaxed), COUNT);
384     }
385 
386     #[test]
destroy_array()387     fn destroy_array() {
388         #[cfg(miri)]
389         const COUNT: usize = 500;
390         #[cfg(not(miri))]
391         const COUNT: usize = 100_000;
392         static DESTROYS: AtomicUsize = AtomicUsize::new(0);
393 
394         let collector = Collector::new();
395         let handle = collector.register();
396 
397         unsafe {
398             let guard = &handle.pin();
399 
400             let mut v = Vec::with_capacity(COUNT);
401             for i in 0..COUNT {
402                 v.push(i as i32);
403             }
404 
405             let len = v.len();
406             let cap = v.capacity();
407             let ptr = ManuallyDrop::new(v).as_mut_ptr();
408             guard.defer_unchecked(move || {
409                 drop(Vec::from_raw_parts(ptr, len, cap));
410                 DESTROYS.fetch_add(len, Ordering::Relaxed);
411             });
412             guard.flush();
413         }
414 
415         while DESTROYS.load(Ordering::Relaxed) < COUNT {
416             let guard = &handle.pin();
417             collector.global.collect(guard);
418         }
419         assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT);
420     }
421 
422     #[test]
stress()423     fn stress() {
424         const THREADS: usize = 8;
425         #[cfg(miri)]
426         const COUNT: usize = 500;
427         #[cfg(not(miri))]
428         const COUNT: usize = 100_000;
429         static DROPS: AtomicUsize = AtomicUsize::new(0);
430 
431         struct Elem(#[allow(dead_code)] i32);
432 
433         impl Drop for Elem {
434             fn drop(&mut self) {
435                 DROPS.fetch_add(1, Ordering::Relaxed);
436             }
437         }
438 
439         let collector = Collector::new();
440 
441         thread::scope(|scope| {
442             for _ in 0..THREADS {
443                 scope.spawn(|_| {
444                     let handle = collector.register();
445                     for _ in 0..COUNT {
446                         let guard = &handle.pin();
447                         unsafe {
448                             let a = Owned::new(Elem(7i32)).into_shared(guard);
449                             guard.defer_destroy(a);
450                         }
451                     }
452                 });
453             }
454         })
455         .unwrap();
456 
457         let handle = collector.register();
458         while DROPS.load(Ordering::Relaxed) < COUNT * THREADS {
459             let guard = &handle.pin();
460             collector.global.collect(guard);
461         }
462         assert_eq!(DROPS.load(Ordering::Relaxed), COUNT * THREADS);
463     }
464 }
465