1 /// Epoch-based garbage collector. 2 /// 3 /// # Examples 4 /// 5 /// ``` 6 /// use crossbeam_epoch::Collector; 7 /// 8 /// let collector = Collector::new(); 9 /// 10 /// let handle = collector.register(); 11 /// drop(collector); // `handle` still works after dropping `collector` 12 /// 13 /// handle.pin().flush(); 14 /// ``` 15 use core::fmt; 16 17 use crate::guard::Guard; 18 use crate::internal::{Global, Local}; 19 use crate::primitive::sync::Arc; 20 21 /// An epoch-based garbage collector. 22 pub struct Collector { 23 pub(crate) global: Arc<Global>, 24 } 25 26 unsafe impl Send for Collector {} 27 unsafe impl Sync for Collector {} 28 29 impl Default for Collector { default() -> Self30 fn default() -> Self { 31 Self { 32 global: Arc::new(Global::new()), 33 } 34 } 35 } 36 37 impl Collector { 38 /// Creates a new collector. new() -> Self39 pub fn new() -> Self { 40 Self::default() 41 } 42 43 /// Registers a new handle for the collector. register(&self) -> LocalHandle44 pub fn register(&self) -> LocalHandle { 45 Local::register(self) 46 } 47 } 48 49 impl Clone for Collector { 50 /// Creates another reference to the same garbage collector. clone(&self) -> Self51 fn clone(&self) -> Self { 52 Collector { 53 global: self.global.clone(), 54 } 55 } 56 } 57 58 impl fmt::Debug for Collector { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result59 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 60 f.pad("Collector { .. }") 61 } 62 } 63 64 impl PartialEq for Collector { 65 /// Checks if both handles point to the same collector. eq(&self, rhs: &Collector) -> bool66 fn eq(&self, rhs: &Collector) -> bool { 67 Arc::ptr_eq(&self.global, &rhs.global) 68 } 69 } 70 impl Eq for Collector {} 71 72 /// A handle to a garbage collector. 73 pub struct LocalHandle { 74 pub(crate) local: *const Local, 75 } 76 77 impl LocalHandle { 78 /// Pins the handle. 79 #[inline] pin(&self) -> Guard80 pub fn pin(&self) -> Guard { 81 unsafe { (*self.local).pin() } 82 } 83 84 /// Returns `true` if the handle is pinned. 85 #[inline] is_pinned(&self) -> bool86 pub fn is_pinned(&self) -> bool { 87 unsafe { (*self.local).is_pinned() } 88 } 89 90 /// Returns the `Collector` associated with this handle. 91 #[inline] collector(&self) -> &Collector92 pub fn collector(&self) -> &Collector { 93 unsafe { (*self.local).collector() } 94 } 95 } 96 97 impl Drop for LocalHandle { 98 #[inline] drop(&mut self)99 fn drop(&mut self) { 100 unsafe { 101 Local::release_handle(&*self.local); 102 } 103 } 104 } 105 106 impl fmt::Debug for LocalHandle { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result107 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 108 f.pad("LocalHandle { .. }") 109 } 110 } 111 112 #[cfg(all(test, not(crossbeam_loom)))] 113 mod tests { 114 use std::mem::ManuallyDrop; 115 use std::sync::atomic::{AtomicUsize, Ordering}; 116 117 use crossbeam_utils::thread; 118 119 use crate::{Collector, Owned}; 120 121 const NUM_THREADS: usize = 8; 122 123 #[test] pin_reentrant()124 fn pin_reentrant() { 125 let collector = Collector::new(); 126 let handle = collector.register(); 127 drop(collector); 128 129 assert!(!handle.is_pinned()); 130 { 131 let _guard = &handle.pin(); 132 assert!(handle.is_pinned()); 133 { 134 let _guard = &handle.pin(); 135 assert!(handle.is_pinned()); 136 } 137 assert!(handle.is_pinned()); 138 } 139 assert!(!handle.is_pinned()); 140 } 141 142 #[test] flush_local_bag()143 fn flush_local_bag() { 144 let collector = Collector::new(); 145 let handle = collector.register(); 146 drop(collector); 147 148 for _ in 0..100 { 149 let guard = &handle.pin(); 150 unsafe { 151 let a = Owned::new(7).into_shared(guard); 152 guard.defer_destroy(a); 153 154 assert!(!(*guard.local).bag.with(|b| (*b).is_empty())); 155 156 while !(*guard.local).bag.with(|b| (*b).is_empty()) { 157 guard.flush(); 158 } 159 } 160 } 161 } 162 163 #[test] garbage_buffering()164 fn garbage_buffering() { 165 let collector = Collector::new(); 166 let handle = collector.register(); 167 drop(collector); 168 169 let guard = &handle.pin(); 170 unsafe { 171 for _ in 0..10 { 172 let a = Owned::new(7).into_shared(guard); 173 guard.defer_destroy(a); 174 } 175 assert!(!(*guard.local).bag.with(|b| (*b).is_empty())); 176 } 177 } 178 179 #[test] pin_holds_advance()180 fn pin_holds_advance() { 181 #[cfg(miri)] 182 const N: usize = 500; 183 #[cfg(not(miri))] 184 const N: usize = 500_000; 185 186 let collector = Collector::new(); 187 188 thread::scope(|scope| { 189 for _ in 0..NUM_THREADS { 190 scope.spawn(|_| { 191 let handle = collector.register(); 192 for _ in 0..N { 193 let guard = &handle.pin(); 194 195 let before = collector.global.epoch.load(Ordering::Relaxed); 196 collector.global.collect(guard); 197 let after = collector.global.epoch.load(Ordering::Relaxed); 198 199 assert!(after.wrapping_sub(before) <= 2); 200 } 201 }); 202 } 203 }) 204 .unwrap(); 205 } 206 207 #[cfg(not(crossbeam_sanitize))] // TODO: assertions failed due to `cfg(crossbeam_sanitize)` reduce `internal::MAX_OBJECTS` 208 #[test] incremental()209 fn incremental() { 210 #[cfg(miri)] 211 const COUNT: usize = 500; 212 #[cfg(not(miri))] 213 const COUNT: usize = 100_000; 214 static DESTROYS: AtomicUsize = AtomicUsize::new(0); 215 216 let collector = Collector::new(); 217 let handle = collector.register(); 218 219 unsafe { 220 let guard = &handle.pin(); 221 for _ in 0..COUNT { 222 let a = Owned::new(7i32).into_shared(guard); 223 guard.defer_unchecked(move || { 224 drop(a.into_owned()); 225 DESTROYS.fetch_add(1, Ordering::Relaxed); 226 }); 227 } 228 guard.flush(); 229 } 230 231 let mut last = 0; 232 233 while last < COUNT { 234 let curr = DESTROYS.load(Ordering::Relaxed); 235 assert!(curr - last <= 1024); 236 last = curr; 237 238 let guard = &handle.pin(); 239 collector.global.collect(guard); 240 } 241 assert!(DESTROYS.load(Ordering::Relaxed) == COUNT); 242 } 243 244 #[test] buffering()245 fn buffering() { 246 const COUNT: usize = 10; 247 #[cfg(miri)] 248 const N: usize = 500; 249 #[cfg(not(miri))] 250 const N: usize = 100_000; 251 static DESTROYS: AtomicUsize = AtomicUsize::new(0); 252 253 let collector = Collector::new(); 254 let handle = collector.register(); 255 256 unsafe { 257 let guard = &handle.pin(); 258 for _ in 0..COUNT { 259 let a = Owned::new(7i32).into_shared(guard); 260 guard.defer_unchecked(move || { 261 drop(a.into_owned()); 262 DESTROYS.fetch_add(1, Ordering::Relaxed); 263 }); 264 } 265 } 266 267 for _ in 0..N { 268 collector.global.collect(&handle.pin()); 269 } 270 assert!(DESTROYS.load(Ordering::Relaxed) < COUNT); 271 272 handle.pin().flush(); 273 274 while DESTROYS.load(Ordering::Relaxed) < COUNT { 275 let guard = &handle.pin(); 276 collector.global.collect(guard); 277 } 278 assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT); 279 } 280 281 #[test] count_drops()282 fn count_drops() { 283 #[cfg(miri)] 284 const COUNT: usize = 500; 285 #[cfg(not(miri))] 286 const COUNT: usize = 100_000; 287 static DROPS: AtomicUsize = AtomicUsize::new(0); 288 289 struct Elem(#[allow(dead_code)] i32); 290 291 impl Drop for Elem { 292 fn drop(&mut self) { 293 DROPS.fetch_add(1, Ordering::Relaxed); 294 } 295 } 296 297 let collector = Collector::new(); 298 let handle = collector.register(); 299 300 unsafe { 301 let guard = &handle.pin(); 302 303 for _ in 0..COUNT { 304 let a = Owned::new(Elem(7i32)).into_shared(guard); 305 guard.defer_destroy(a); 306 } 307 guard.flush(); 308 } 309 310 while DROPS.load(Ordering::Relaxed) < COUNT { 311 let guard = &handle.pin(); 312 collector.global.collect(guard); 313 } 314 assert_eq!(DROPS.load(Ordering::Relaxed), COUNT); 315 } 316 317 #[test] count_destroy()318 fn count_destroy() { 319 #[cfg(miri)] 320 const COUNT: usize = 500; 321 #[cfg(not(miri))] 322 const COUNT: usize = 100_000; 323 static DESTROYS: AtomicUsize = AtomicUsize::new(0); 324 325 let collector = Collector::new(); 326 let handle = collector.register(); 327 328 unsafe { 329 let guard = &handle.pin(); 330 331 for _ in 0..COUNT { 332 let a = Owned::new(7i32).into_shared(guard); 333 guard.defer_unchecked(move || { 334 drop(a.into_owned()); 335 DESTROYS.fetch_add(1, Ordering::Relaxed); 336 }); 337 } 338 guard.flush(); 339 } 340 341 while DESTROYS.load(Ordering::Relaxed) < COUNT { 342 let guard = &handle.pin(); 343 collector.global.collect(guard); 344 } 345 assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT); 346 } 347 348 #[test] drop_array()349 fn drop_array() { 350 const COUNT: usize = 700; 351 static DROPS: AtomicUsize = AtomicUsize::new(0); 352 353 struct Elem(#[allow(dead_code)] i32); 354 355 impl Drop for Elem { 356 fn drop(&mut self) { 357 DROPS.fetch_add(1, Ordering::Relaxed); 358 } 359 } 360 361 let collector = Collector::new(); 362 let handle = collector.register(); 363 364 let mut guard = handle.pin(); 365 366 let mut v = Vec::with_capacity(COUNT); 367 for i in 0..COUNT { 368 v.push(Elem(i as i32)); 369 } 370 371 { 372 let a = Owned::new(v).into_shared(&guard); 373 unsafe { 374 guard.defer_destroy(a); 375 } 376 guard.flush(); 377 } 378 379 while DROPS.load(Ordering::Relaxed) < COUNT { 380 guard.repin(); 381 collector.global.collect(&guard); 382 } 383 assert_eq!(DROPS.load(Ordering::Relaxed), COUNT); 384 } 385 386 #[test] destroy_array()387 fn destroy_array() { 388 #[cfg(miri)] 389 const COUNT: usize = 500; 390 #[cfg(not(miri))] 391 const COUNT: usize = 100_000; 392 static DESTROYS: AtomicUsize = AtomicUsize::new(0); 393 394 let collector = Collector::new(); 395 let handle = collector.register(); 396 397 unsafe { 398 let guard = &handle.pin(); 399 400 let mut v = Vec::with_capacity(COUNT); 401 for i in 0..COUNT { 402 v.push(i as i32); 403 } 404 405 let len = v.len(); 406 let cap = v.capacity(); 407 let ptr = ManuallyDrop::new(v).as_mut_ptr(); 408 guard.defer_unchecked(move || { 409 drop(Vec::from_raw_parts(ptr, len, cap)); 410 DESTROYS.fetch_add(len, Ordering::Relaxed); 411 }); 412 guard.flush(); 413 } 414 415 while DESTROYS.load(Ordering::Relaxed) < COUNT { 416 let guard = &handle.pin(); 417 collector.global.collect(guard); 418 } 419 assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT); 420 } 421 422 #[test] stress()423 fn stress() { 424 const THREADS: usize = 8; 425 #[cfg(miri)] 426 const COUNT: usize = 500; 427 #[cfg(not(miri))] 428 const COUNT: usize = 100_000; 429 static DROPS: AtomicUsize = AtomicUsize::new(0); 430 431 struct Elem(#[allow(dead_code)] i32); 432 433 impl Drop for Elem { 434 fn drop(&mut self) { 435 DROPS.fetch_add(1, Ordering::Relaxed); 436 } 437 } 438 439 let collector = Collector::new(); 440 441 thread::scope(|scope| { 442 for _ in 0..THREADS { 443 scope.spawn(|_| { 444 let handle = collector.register(); 445 for _ in 0..COUNT { 446 let guard = &handle.pin(); 447 unsafe { 448 let a = Owned::new(Elem(7i32)).into_shared(guard); 449 guard.defer_destroy(a); 450 } 451 } 452 }); 453 } 454 }) 455 .unwrap(); 456 457 let handle = collector.register(); 458 while DROPS.load(Ordering::Relaxed) < COUNT * THREADS { 459 let guard = &handle.pin(); 460 collector.global.collect(guard); 461 } 462 assert_eq!(DROPS.load(Ordering::Relaxed), COUNT * THREADS); 463 } 464 } 465