use core::iter::FromIterator;
use core::mem::{self, ManuallyDrop};
use core::ops::{Deref, RangeBounds};
use core::ptr::NonNull;
use core::{cmp, fmt, hash, ptr, slice, usize};

use alloc::{
    alloc::{dealloc, Layout},
    borrow::Borrow,
    boxed::Box,
    string::String,
    vec::Vec,
};

use crate::buf::IntoIter;
#[allow(unused)]
use crate::loom::sync::atomic::AtomicMut;
use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use crate::{offset_from, Buf, BytesMut};

/// A cheaply cloneable and sliceable chunk of contiguous memory.
///
/// `Bytes` is an efficient container for storing and operating on contiguous
/// slices of memory. It is intended for use primarily in networking code, but
/// could have applications elsewhere as well.
///
/// `Bytes` values facilitate zero-copy network programming by allowing multiple
/// `Bytes` objects to point to the same underlying memory.
///
/// `Bytes` does not have a single implementation. It is an interface, whose
/// exact behavior is implemented through dynamic dispatch in several underlying
/// implementations of `Bytes`.
///
/// All `Bytes` implementations must fulfill the following requirements:
/// - They are cheaply cloneable and thereby shareable between an unlimited number
///   of components, for example by modifying a reference count.
/// - Instances can be sliced to refer to a subset of the original buffer.
///
/// ```
/// use bytes::Bytes;
///
/// let mut mem = Bytes::from("Hello world");
/// let a = mem.slice(0..5);
///
/// assert_eq!(a, "Hello");
///
/// let b = mem.split_to(6);
///
/// assert_eq!(mem, "world");
/// assert_eq!(b, "Hello ");
/// ```
///
/// # Memory layout
///
/// The `Bytes` struct itself is fairly small, limited to 4 `usize` fields used
/// to track information about which segment of the underlying memory the
/// `Bytes` handle has access to.
///
/// `Bytes` keeps both a pointer to the shared state containing the full memory
/// slice and a pointer to the start of the region visible by the handle.
/// `Bytes` also tracks the length of its view into the memory.
///
/// # Sharing
///
/// `Bytes` contains a vtable, which allows implementations of `Bytes` to define
/// how sharing/cloning is implemented in detail.
/// When `Bytes::clone()` is called, `Bytes` will call the vtable function for
/// cloning the backing storage in order to share it behind multiple `Bytes`
/// instances.
///
/// For `Bytes` implementations which refer to constant memory (e.g. created
/// via `Bytes::from_static()`) the cloning implementation will be a no-op.
///
/// For `Bytes` implementations which point to a reference counted shared storage
/// (e.g. an `Arc<[u8]>`), sharing will be implemented by increasing the
/// reference count.
///
/// Due to this mechanism, multiple `Bytes` instances may point to the same
/// shared memory region.
/// Each `Bytes` instance can point to different sections within that
/// memory region, and `Bytes` instances may or may not have overlapping views
/// into the memory.
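///
/// For example, two handles can be sliced from the same buffer into
/// overlapping views without copying (an illustrative sketch using only the
/// public API defined in this file):
///
/// ```
/// use bytes::Bytes;
///
/// let full = Bytes::from(&b"hello world"[..]);
/// let a = full.slice(0..5);  // "hello"
/// let b = full.slice(3..11); // "lo world", overlaps with `a`
///
/// assert_eq!(a, &b"hello"[..]);
/// assert_eq!(b, &b"lo world"[..]);
/// ```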
///
/// The following diagram visualizes a scenario where 2 `Bytes` instances make
/// use of an `Arc`-based backing storage, and provide access to different views:
///
/// ```text
///
///    Arc ptrs                   ┌─────────┐
///    ________________________ / │ Bytes 2 │
///   /                           └─────────┘
///  /          ┌───────────┐     |         |
/// |_________/ │  Bytes 1  │     |         |
/// |           └───────────┘     |         |
/// |           |           | ___/ data     | tail
/// |      data |      tail |/              |
/// v           v           v               v
/// ┌─────┬─────┬───────────┬───────────────┬─────┐
/// │ Arc │     │           │               │     │
/// └─────┴─────┴───────────┴───────────────┴─────┘
/// ```
pub struct Bytes {
    ptr: *const u8,
    len: usize,
    // inlined "trait object"
    data: AtomicPtr<()>,
    vtable: &'static Vtable,
}

pub(crate) struct Vtable {
    /// fn(data, ptr, len)
    pub clone: unsafe fn(&AtomicPtr<()>, *const u8, usize) -> Bytes,
    /// fn(data, ptr, len)
    ///
    /// takes `Bytes` to value
    pub to_vec: unsafe fn(&AtomicPtr<()>, *const u8, usize) -> Vec<u8>,
    pub to_mut: unsafe fn(&AtomicPtr<()>, *const u8, usize) -> BytesMut,
    /// fn(data)
    pub is_unique: unsafe fn(&AtomicPtr<()>) -> bool,
    /// fn(data, ptr, len)
    pub drop: unsafe fn(&mut AtomicPtr<()>, *const u8, usize),
}

impl Bytes {
    /// Creates a new empty `Bytes`.
    ///
    /// This will not allocate and the returned `Bytes` handle will be empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use bytes::Bytes;
    ///
    /// let b = Bytes::new();
    /// assert_eq!(&b[..], b"");
    /// ```
    #[inline]
    #[cfg(not(all(loom, test)))]
    pub const fn new() -> Self {
        // Make it a named const to work around
        // "unsizing casts are not allowed in const fn"
        const EMPTY: &[u8] = &[];
        Bytes::from_static(EMPTY)
    }

    /// Creates a new empty `Bytes`.
    #[cfg(all(loom, test))]
    pub fn new() -> Self {
        const EMPTY: &[u8] = &[];
        Bytes::from_static(EMPTY)
    }

    /// Creates a new `Bytes` from a static slice.
    ///
    /// The returned `Bytes` will point directly to the static slice. There is
    /// no allocating or copying.
    ///
    /// # Examples
    ///
    /// ```
    /// use bytes::Bytes;
    ///
    /// let b = Bytes::from_static(b"hello");
    /// assert_eq!(&b[..], b"hello");
    /// ```
    #[inline]
    #[cfg(not(all(loom, test)))]
    pub const fn from_static(bytes: &'static [u8]) -> Self {
        Bytes {
            ptr: bytes.as_ptr(),
            len: bytes.len(),
            data: AtomicPtr::new(ptr::null_mut()),
            vtable: &STATIC_VTABLE,
        }
    }

    /// Creates a new `Bytes` from a static slice.
    #[cfg(all(loom, test))]
    pub fn from_static(bytes: &'static [u8]) -> Self {
        Bytes {
            ptr: bytes.as_ptr(),
            len: bytes.len(),
            data: AtomicPtr::new(ptr::null_mut()),
            vtable: &STATIC_VTABLE,
        }
    }

    /// Creates a new `Bytes` with length zero and the given pointer as the address.
    fn new_empty_with_ptr(ptr: *const u8) -> Self {
        debug_assert!(!ptr.is_null());

        // Detach this pointer's provenance from whichever allocation it came from, and reattach it
        // to the provenance of the fake ZST [u8;0] at the same address.
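        // The resulting pointer is only ever used to form zero-length slices, so it
        // never needs to read through to the original allocation (see
        // `without_provenance` near the bottom of this file).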
        let ptr = without_provenance(ptr as usize);

        Bytes {
            ptr,
            len: 0,
            data: AtomicPtr::new(ptr::null_mut()),
            vtable: &STATIC_VTABLE,
        }
    }

    /// Create [Bytes] with a buffer whose lifetime is controlled
    /// via an explicit owner.
    ///
    /// A common use case is to zero-copy construct from mapped memory.
    ///
    /// ```
    /// # struct File;
    /// #
    /// # impl File {
    /// #     pub fn open(_: &str) -> Result<Self, ()> {
    /// #         Ok(Self)
    /// #     }
    /// # }
    /// #
    /// # mod memmap2 {
    /// #     pub struct Mmap;
    /// #
    /// #     impl Mmap {
    /// #         pub unsafe fn map(_file: &super::File) -> Result<Self, ()> {
    /// #             Ok(Self)
    /// #         }
    /// #     }
    /// #
    /// #     impl AsRef<[u8]> for Mmap {
    /// #         fn as_ref(&self) -> &[u8] {
    /// #             b"buf"
    /// #         }
    /// #     }
    /// # }
    /// use bytes::Bytes;
    /// use memmap2::Mmap;
    ///
    /// # fn main() -> Result<(), ()> {
    /// let file = File::open("upload_bundle.tar.gz")?;
    /// let mmap = unsafe { Mmap::map(&file) }?;
    /// let b = Bytes::from_owner(mmap);
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// The `owner` will be transferred to the constructed [Bytes] object, which
    /// will ensure it is dropped once all remaining clones of the constructed
    /// object are dropped. The owner will then be responsible for dropping the
    /// specified region of memory as part of its [Drop] implementation.
    ///
    /// Note that converting [Bytes] constructed from an owner into a [BytesMut]
    /// will always create a deep copy of the buffer into newly allocated memory.
    pub fn from_owner<T>(owner: T) -> Self
    where
        T: AsRef<[u8]> + Send + 'static,
    {
        // Safety & Miri:
        // The ownership of `owner` is first transferred to the `Owned` wrapper and `Bytes` object.
        // This ensures that the owner is pinned in memory, allowing us to call `.as_ref()` safely
        // since the lifetime of the owner is controlled by the lifetime of the new `Bytes` object,
        // and the lifetime of the resulting borrowed `&[u8]` matches that of the owner.
        // Note that this remains safe so long as we only call `.as_ref()` once.
        //
        // There are some additional special considerations here:
        //   * We rely on Bytes's Drop impl to clean up memory should `.as_ref()` panic.
        //   * Setting the `ptr` and `len` on the bytes object last (after moving the owner to
        //     Bytes) allows Miri checks to pass since it avoids obtaining the `&[u8]` slice
        //     from a stack-owned Box.
        // More details on this: https://github.com/tokio-rs/bytes/pull/742/#discussion_r1813375863
        // and: https://github.com/tokio-rs/bytes/pull/742/#discussion_r1813316032

        let owned = Box::into_raw(Box::new(Owned {
            lifetime: OwnedLifetime {
                ref_cnt: AtomicUsize::new(1),
                drop: owned_box_and_drop::<T>,
            },
            owner,
        }));

        let mut ret = Bytes {
            ptr: NonNull::dangling().as_ptr(),
            len: 0,
            data: AtomicPtr::new(owned.cast()),
            vtable: &OWNED_VTABLE,
        };

        let buf = unsafe { &*owned }.owner.as_ref();
        ret.ptr = buf.as_ptr();
        ret.len = buf.len();

        ret
    }

    /// Returns the number of bytes contained in this `Bytes`.
    ///
    /// # Examples
    ///
    /// ```
    /// use bytes::Bytes;
    ///
    /// let b = Bytes::from(&b"hello"[..]);
    /// assert_eq!(b.len(), 5);
    /// ```
    #[inline]
    pub const fn len(&self) -> usize {
        self.len
    }

    /// Returns true if the `Bytes` has a length of 0.
    ///
    /// # Examples
    ///
    /// ```
    /// use bytes::Bytes;
    ///
    /// let b = Bytes::new();
    /// assert!(b.is_empty());
    /// ```
    #[inline]
    pub const fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Returns true if this is the only reference to the data and
    /// `Into<BytesMut>` would avoid cloning the underlying buffer.
    ///
    /// Always returns false if the data is backed by a [static slice](Bytes::from_static),
    /// or an [owner](Bytes::from_owner).
    ///
    /// The result of this method may be invalidated immediately if another
    /// thread clones this value while this is being called. Ensure you have
    /// unique access to this value (`&mut Bytes`) first if you need to be
    /// certain the result is valid (i.e. for safety reasons).
    ///
    /// # Examples
    ///
    /// ```
    /// use bytes::Bytes;
    ///
    /// let a = Bytes::from(vec![1, 2, 3]);
    /// assert!(a.is_unique());
    /// let b = a.clone();
    /// assert!(!a.is_unique());
    /// ```
    pub fn is_unique(&self) -> bool {
        unsafe { (self.vtable.is_unique)(&self.data) }
    }

    /// Creates a `Bytes` instance from a slice by copying it.
    pub fn copy_from_slice(data: &[u8]) -> Self {
        data.to_vec().into()
    }

    /// Returns a slice of self for the provided range.
    ///
    /// This will increment the reference count for the underlying memory and
    /// return a new `Bytes` handle set to the slice.
    ///
    /// This operation is `O(1)`.
    ///
    /// # Examples
    ///
    /// ```
    /// use bytes::Bytes;
    ///
    /// let a = Bytes::from(&b"hello world"[..]);
    /// let b = a.slice(2..5);
    ///
    /// assert_eq!(&b[..], b"llo");
    /// ```
    ///
    /// # Panics
    ///
    /// Requires that `begin <= end` and `end <= self.len()`, otherwise slicing
    /// will panic.
    pub fn slice(&self, range: impl RangeBounds<usize>) -> Self {
        use core::ops::Bound;

        let len = self.len();

        let begin = match range.start_bound() {
            Bound::Included(&n) => n,
            Bound::Excluded(&n) => n.checked_add(1).expect("out of range"),
            Bound::Unbounded => 0,
        };

        let end = match range.end_bound() {
            Bound::Included(&n) => n.checked_add(1).expect("out of range"),
            Bound::Excluded(&n) => n,
            Bound::Unbounded => len,
        };

        assert!(
            begin <= end,
            "range start must not be greater than end: {:?} <= {:?}",
            begin,
            end,
        );
        assert!(
            end <= len,
            "range end out of bounds: {:?} <= {:?}",
            end,
            len,
        );

        if end == begin {
            return Bytes::new();
        }

        let mut ret = self.clone();

        ret.len = end - begin;
        ret.ptr = unsafe { ret.ptr.add(begin) };

        ret
    }

    /// Returns a slice of self that is equivalent to the given `subset`.
    ///
    /// When processing a `Bytes` buffer with other tools, one often gets a
    /// `&[u8]` which is in fact a slice of the `Bytes`, i.e. a subset of it.
    /// This function turns that `&[u8]` into another `Bytes`, as if one had
    /// called `self.slice()` with the offsets that correspond to `subset`.
    ///
    /// This operation is `O(1)`.
    ///
    /// # Examples
    ///
    /// ```
    /// use bytes::Bytes;
    ///
    /// let bytes = Bytes::from(&b"012345678"[..]);
    /// let as_slice = bytes.as_ref();
    /// let subset = &as_slice[2..6];
    /// let subslice = bytes.slice_ref(&subset);
    /// assert_eq!(&subslice[..], b"2345");
    /// ```
    ///
    /// # Panics
    ///
    /// Requires that the given `subset` slice is in fact contained within the
    /// `Bytes` buffer; otherwise this function will panic.
    pub fn slice_ref(&self, subset: &[u8]) -> Self {
        // Empty slice and empty Bytes may have their pointers reset
        // so explicitly allow empty slice to be a subslice of any slice.
        if subset.is_empty() {
            return Bytes::new();
        }

        let bytes_p = self.as_ptr() as usize;
        let bytes_len = self.len();

        let sub_p = subset.as_ptr() as usize;
        let sub_len = subset.len();

        assert!(
            sub_p >= bytes_p,
            "subset pointer ({:p}) is smaller than self pointer ({:p})",
            subset.as_ptr(),
            self.as_ptr(),
        );
        assert!(
            sub_p + sub_len <= bytes_p + bytes_len,
            "subset is out of bounds: self = ({:p}, {}), subset = ({:p}, {})",
            self.as_ptr(),
            bytes_len,
            subset.as_ptr(),
            sub_len,
        );

        let sub_offset = sub_p - bytes_p;

        self.slice(sub_offset..(sub_offset + sub_len))
    }

    /// Splits the bytes into two at the given index.
    ///
    /// Afterwards `self` contains elements `[0, at)`, and the returned `Bytes`
    /// contains elements `[at, len)`. It's guaranteed that the memory does not
    /// move, that is, the address of `self` does not change, and the address of
    /// the returned slice is `at` bytes after that.
    ///
    /// This is an `O(1)` operation that just increases the reference count and
    /// sets a few indices.
    ///
    /// # Examples
    ///
    /// ```
    /// use bytes::Bytes;
    ///
    /// let mut a = Bytes::from(&b"hello world"[..]);
    /// let b = a.split_off(5);
    ///
    /// assert_eq!(&a[..], b"hello");
    /// assert_eq!(&b[..], b" world");
    /// ```
    ///
    /// # Panics
    ///
    /// Panics if `at > len`.
    #[must_use = "consider Bytes::truncate if you don't need the other half"]
    pub fn split_off(&mut self, at: usize) -> Self {
        if at == self.len() {
            return Bytes::new_empty_with_ptr(self.ptr.wrapping_add(at));
        }

        if at == 0 {
            return mem::replace(self, Bytes::new_empty_with_ptr(self.ptr));
        }

        assert!(
            at <= self.len(),
            "split_off out of bounds: {:?} <= {:?}",
            at,
            self.len(),
        );

        let mut ret = self.clone();

        self.len = at;

        unsafe { ret.inc_start(at) };

        ret
    }

    /// Splits the bytes into two at the given index.
    ///
    /// Afterwards `self` contains elements `[at, len)`, and the returned
    /// `Bytes` contains elements `[0, at)`.
    ///
    /// This is an `O(1)` operation that just increases the reference count and
    /// sets a few indices.
    ///
    /// # Examples
    ///
    /// ```
    /// use bytes::Bytes;
    ///
    /// let mut a = Bytes::from(&b"hello world"[..]);
    /// let b = a.split_to(5);
    ///
    /// assert_eq!(&a[..], b" world");
    /// assert_eq!(&b[..], b"hello");
    /// ```
    ///
    /// # Panics
    ///
    /// Panics if `at > len`.
    #[must_use = "consider Bytes::advance if you don't need the other half"]
    pub fn split_to(&mut self, at: usize) -> Self {
        if at == self.len() {
            let end_ptr = self.ptr.wrapping_add(at);
            return mem::replace(self, Bytes::new_empty_with_ptr(end_ptr));
        }

        if at == 0 {
            return Bytes::new_empty_with_ptr(self.ptr);
        }

        assert!(
            at <= self.len(),
            "split_to out of bounds: {:?} <= {:?}",
            at,
            self.len(),
        );

        let mut ret = self.clone();

        unsafe { self.inc_start(at) };

        ret.len = at;
        ret
    }

    /// Shortens the buffer, keeping the first `len` bytes and dropping the
    /// rest.
    ///
    /// If `len` is greater than the buffer's current length, this has no
    /// effect.
    ///
    /// The [split_off](`Self::split_off()`) method can emulate `truncate`, but this causes the
    /// excess bytes to be returned instead of dropped.
    ///
    /// # Examples
    ///
    /// ```
    /// use bytes::Bytes;
    ///
    /// let mut buf = Bytes::from(&b"hello world"[..]);
    /// buf.truncate(5);
    /// assert_eq!(buf, b"hello"[..]);
    /// ```
    #[inline]
    pub fn truncate(&mut self, len: usize) {
        if len < self.len {
            // The Vec "promotable" vtables do not store the capacity,
            // so we cannot truncate while using this repr. We *have* to
            // promote using `split_off` so the capacity can be stored.
            if self.vtable as *const Vtable == &PROMOTABLE_EVEN_VTABLE
                || self.vtable as *const Vtable == &PROMOTABLE_ODD_VTABLE
            {
                drop(self.split_off(len));
            } else {
                self.len = len;
            }
        }
    }

    /// Clears the buffer, removing all data.
    ///
    /// # Examples
    ///
    /// ```
    /// use bytes::Bytes;
    ///
    /// let mut buf = Bytes::from(&b"hello world"[..]);
    /// buf.clear();
    /// assert!(buf.is_empty());
    /// ```
    #[inline]
    pub fn clear(&mut self) {
        self.truncate(0);
    }

    /// Try to convert self into `BytesMut`.
    ///
    /// If `self` is unique for the entire original buffer, this will succeed
    /// and return a `BytesMut` with the contents of `self` without copying.
    /// If `self` is not unique for the entire original buffer, this will fail
    /// and return self.
    ///
    /// This will also always fail if the buffer was constructed via either
    /// [from_owner](Bytes::from_owner) or [from_static](Bytes::from_static).
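    ///
    /// For instance, a second handle to the same buffer forces the conversion
    /// to fail and hand `self` back (an illustrative sketch using only the
    /// public API in this file):
    ///
    /// ```
    /// use bytes::Bytes;
    ///
    /// let a = Bytes::from(b"hello".to_vec());
    /// let b = a.clone();
    /// assert!(b.try_into_mut().is_err());
    /// ```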
    ///
    /// # Examples
    ///
    /// ```
    /// use bytes::{Bytes, BytesMut};
    ///
    /// let bytes = Bytes::from(b"hello".to_vec());
    /// assert_eq!(bytes.try_into_mut(), Ok(BytesMut::from(&b"hello"[..])));
    /// ```
    pub fn try_into_mut(self) -> Result<BytesMut, Bytes> {
        if self.is_unique() {
            Ok(self.into())
        } else {
            Err(self)
        }
    }

    #[inline]
    pub(crate) unsafe fn with_vtable(
        ptr: *const u8,
        len: usize,
        data: AtomicPtr<()>,
        vtable: &'static Vtable,
    ) -> Bytes {
        Bytes {
            ptr,
            len,
            data,
            vtable,
        }
    }

    // private

    #[inline]
    fn as_slice(&self) -> &[u8] {
        unsafe { slice::from_raw_parts(self.ptr, self.len) }
    }

    #[inline]
    unsafe fn inc_start(&mut self, by: usize) {
        // should already be asserted, but debug assert for tests
        debug_assert!(self.len >= by, "internal: inc_start out of bounds");
        self.len -= by;
        self.ptr = self.ptr.add(by);
    }
}

// Vtable must enforce this behavior
unsafe impl Send for Bytes {}
unsafe impl Sync for Bytes {}

impl Drop for Bytes {
    #[inline]
    fn drop(&mut self) {
        unsafe { (self.vtable.drop)(&mut self.data, self.ptr, self.len) }
    }
}

impl Clone for Bytes {
    #[inline]
    fn clone(&self) -> Bytes {
        unsafe { (self.vtable.clone)(&self.data, self.ptr, self.len) }
    }
}

impl Buf for Bytes {
    #[inline]
    fn remaining(&self) -> usize {
        self.len()
    }

    #[inline]
    fn chunk(&self) -> &[u8] {
        self.as_slice()
    }

    #[inline]
    fn advance(&mut self, cnt: usize) {
        assert!(
            cnt <= self.len(),
            "cannot advance past `remaining`: {:?} <= {:?}",
            cnt,
            self.len(),
        );

        unsafe {
            self.inc_start(cnt);
        }
    }

    fn copy_to_bytes(&mut self, len: usize) -> Self {
        self.split_to(len)
    }
}

impl Deref for Bytes {
    type Target = [u8];

    #[inline]
    fn deref(&self) -> &[u8] {
        self.as_slice()
    }
}

impl AsRef<[u8]> for Bytes {
    #[inline]
    fn as_ref(&self) -> &[u8] {
        self.as_slice()
    }
}

impl hash::Hash for Bytes {
    fn hash<H>(&self, state: &mut H)
    where
        H: hash::Hasher,
    {
        self.as_slice().hash(state);
    }
}

impl Borrow<[u8]> for Bytes {
    fn borrow(&self) -> &[u8] {
        self.as_slice()
    }
}

impl IntoIterator for Bytes {
    type Item = u8;
    type IntoIter = IntoIter<Bytes>;

    fn into_iter(self) -> Self::IntoIter {
        IntoIter::new(self)
    }
}

impl<'a> IntoIterator for &'a Bytes {
    type Item = &'a u8;
    type IntoIter = core::slice::Iter<'a, u8>;

    fn into_iter(self) -> Self::IntoIter {
        self.as_slice().iter()
    }
}

impl FromIterator<u8> for Bytes {
    fn from_iter<T: IntoIterator<Item = u8>>(into_iter: T) -> Self {
        Vec::from_iter(into_iter).into()
    }
}

// impl Eq

impl PartialEq for Bytes {
    fn eq(&self, other: &Bytes) -> bool {
        self.as_slice() == other.as_slice()
    }
}

impl PartialOrd for Bytes {
    fn partial_cmp(&self, other: &Bytes) -> Option<cmp::Ordering> {
        self.as_slice().partial_cmp(other.as_slice())
    }
}

impl Ord for Bytes {
    fn cmp(&self, other: &Bytes) -> cmp::Ordering {
        self.as_slice().cmp(other.as_slice())
    }
}

impl Eq for Bytes {}

impl PartialEq<[u8]> for Bytes {
    fn eq(&self, other: &[u8]) -> bool {
        self.as_slice() == other
    }
}

impl PartialOrd<[u8]> for Bytes {
    fn partial_cmp(&self, other: &[u8]) -> Option<cmp::Ordering> {
        self.as_slice().partial_cmp(other)
    }
}

impl PartialEq<Bytes> for [u8] {
    fn eq(&self, other: &Bytes) -> bool {
        *other == *self
    }
}

impl PartialOrd<Bytes> for [u8] {
    fn partial_cmp(&self, other: &Bytes) -> Option<cmp::Ordering> {
        <[u8] as PartialOrd<[u8]>>::partial_cmp(self, other)
    }
}

impl PartialEq<str> for Bytes {
    fn eq(&self, other: &str) -> bool {
        self.as_slice() == other.as_bytes()
    }
}

impl PartialOrd<str> for Bytes {
    fn partial_cmp(&self, other: &str) -> Option<cmp::Ordering> {
        self.as_slice().partial_cmp(other.as_bytes())
    }
}

impl PartialEq<Bytes> for str {
    fn eq(&self, other: &Bytes) -> bool {
        *other == *self
    }
}

impl PartialOrd<Bytes> for str {
    fn partial_cmp(&self, other: &Bytes) -> Option<cmp::Ordering> {
        <[u8] as PartialOrd<[u8]>>::partial_cmp(self.as_bytes(), other)
    }
}

impl PartialEq<Vec<u8>> for Bytes {
    fn eq(&self, other: &Vec<u8>) -> bool {
        *self == other[..]
    }
}

impl PartialOrd<Vec<u8>> for Bytes {
    fn partial_cmp(&self, other: &Vec<u8>) -> Option<cmp::Ordering> {
        self.as_slice().partial_cmp(&other[..])
    }
}

impl PartialEq<Bytes> for Vec<u8> {
    fn eq(&self, other: &Bytes) -> bool {
        *other == *self
    }
}

impl PartialOrd<Bytes> for Vec<u8> {
    fn partial_cmp(&self, other: &Bytes) -> Option<cmp::Ordering> {
        <[u8] as PartialOrd<[u8]>>::partial_cmp(self, other)
    }
}

impl PartialEq<String> for Bytes {
    fn eq(&self, other: &String) -> bool {
        *self == other[..]
    }
}

impl PartialOrd<String> for Bytes {
    fn partial_cmp(&self, other: &String) -> Option<cmp::Ordering> {
        self.as_slice().partial_cmp(other.as_bytes())
    }
}

impl PartialEq<Bytes> for String {
    fn eq(&self, other: &Bytes) -> bool {
        *other == *self
    }
}

impl PartialOrd<Bytes> for String {
    fn partial_cmp(&self, other: &Bytes) -> Option<cmp::Ordering> {
        <[u8] as PartialOrd<[u8]>>::partial_cmp(self.as_bytes(), other)
    }
}

impl PartialEq<Bytes> for &[u8] {
    fn eq(&self, other: &Bytes) -> bool {
        *other == *self
    }
}

impl PartialOrd<Bytes> for &[u8] {
    fn partial_cmp(&self, other: &Bytes) -> Option<cmp::Ordering> {
        <[u8] as PartialOrd<[u8]>>::partial_cmp(self, other)
    }
}

impl PartialEq<Bytes> for &str {
    fn eq(&self, other: &Bytes) -> bool {
        *other == *self
    }
}

impl PartialOrd<Bytes> for &str {
    fn partial_cmp(&self, other: &Bytes) -> Option<cmp::Ordering> {
        <[u8] as PartialOrd<[u8]>>::partial_cmp(self.as_bytes(), other)
    }
}

impl<'a, T: ?Sized> PartialEq<&'a T> for Bytes
where
    Bytes: PartialEq<T>,
{
    fn eq(&self, other: &&'a T) -> bool {
        *self == **other
    }
}

impl<'a, T: ?Sized> PartialOrd<&'a T> for Bytes
where
    Bytes: PartialOrd<T>,
{
    fn partial_cmp(&self, other: &&'a T) -> Option<cmp::Ordering> {
        self.partial_cmp(&**other)
    }
}

// impl From

impl Default for Bytes {
    #[inline]
    fn default() -> Bytes {
        Bytes::new()
    }
}

impl From<&'static [u8]> for Bytes {
    fn from(slice: &'static [u8]) -> Bytes {
        Bytes::from_static(slice)
    }
}

impl From<&'static str> for Bytes {
    fn from(slice: &'static str) -> Bytes {
        Bytes::from_static(slice.as_bytes())
    }
}

impl From<Vec<u8>> for Bytes {
    fn from(vec: Vec<u8>) -> Bytes {
        let mut vec = ManuallyDrop::new(vec);
        let ptr = vec.as_mut_ptr();
        let len = vec.len();
        let cap = vec.capacity();

        // Avoid an extra allocation if possible.
        if len == cap {
            let vec = ManuallyDrop::into_inner(vec);
            return Bytes::from(vec.into_boxed_slice());
        }

        let shared = Box::new(Shared {
            buf: ptr,
            cap,
            ref_cnt: AtomicUsize::new(1),
        });

        let shared = Box::into_raw(shared);
        // The pointer should be aligned, so this assert should
        // always succeed.
        debug_assert!(
            0 == (shared as usize & KIND_MASK),
            "internal: Box<Shared> should have an aligned pointer",
        );
        Bytes {
            ptr,
            len,
            data: AtomicPtr::new(shared as _),
            vtable: &SHARED_VTABLE,
        }
    }
}

impl From<Box<[u8]>> for Bytes {
    fn from(slice: Box<[u8]>) -> Bytes {
        // Box<[u8]> doesn't contain a heap allocation for empty slices,
        // so the pointer isn't aligned enough for the KIND_VEC stashing to
        // work.
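        // Fall back to the static-empty representation in that case; an empty
        // `Box<[u8]>` uses a dangling (and, for `u8`, odd-valued) pointer rather
        // than a real allocation.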
        if slice.is_empty() {
            return Bytes::new();
        }

        let len = slice.len();
        let ptr = Box::into_raw(slice) as *mut u8;

        if ptr as usize & 0x1 == 0 {
            let data = ptr_map(ptr, |addr| addr | KIND_VEC);
            Bytes {
                ptr,
                len,
                data: AtomicPtr::new(data.cast()),
                vtable: &PROMOTABLE_EVEN_VTABLE,
            }
        } else {
            Bytes {
                ptr,
                len,
                data: AtomicPtr::new(ptr.cast()),
                vtable: &PROMOTABLE_ODD_VTABLE,
            }
        }
    }
}

impl From<Bytes> for BytesMut {
    /// Convert self into `BytesMut`.
    ///
    /// If `bytes` is unique for the entire original buffer, this will return a
    /// `BytesMut` with the contents of `bytes` without copying.
    /// If `bytes` is not unique for the entire original buffer, this will make
    /// a copy of the `bytes` subset of the original buffer in a new `BytesMut`.
    ///
    /// # Examples
    ///
    /// ```
    /// use bytes::{Bytes, BytesMut};
    ///
    /// let bytes = Bytes::from(b"hello".to_vec());
    /// assert_eq!(BytesMut::from(bytes), BytesMut::from(&b"hello"[..]));
    /// ```
    fn from(bytes: Bytes) -> Self {
        let bytes = ManuallyDrop::new(bytes);
        unsafe { (bytes.vtable.to_mut)(&bytes.data, bytes.ptr, bytes.len) }
    }
}

impl From<String> for Bytes {
    fn from(s: String) -> Bytes {
        Bytes::from(s.into_bytes())
    }
}

impl From<Bytes> for Vec<u8> {
    fn from(bytes: Bytes) -> Vec<u8> {
        let bytes = ManuallyDrop::new(bytes);
        unsafe { (bytes.vtable.to_vec)(&bytes.data, bytes.ptr, bytes.len) }
    }
}

// ===== impl Vtable =====

impl fmt::Debug for Vtable {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Vtable")
            .field("clone", &(self.clone as *const ()))
            .field("drop", &(self.drop as *const ()))
            .finish()
    }
}

// ===== impl StaticVtable =====

const STATIC_VTABLE: Vtable = Vtable {
    clone: static_clone,
    to_vec: static_to_vec,
    to_mut: static_to_mut,
    is_unique: static_is_unique,
    drop: static_drop,
};

unsafe fn static_clone(_: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Bytes {
    let slice = slice::from_raw_parts(ptr, len);
    Bytes::from_static(slice)
}

unsafe fn static_to_vec(_: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec<u8> {
    let slice = slice::from_raw_parts(ptr, len);
    slice.to_vec()
}

unsafe fn static_to_mut(_: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut {
    let slice = slice::from_raw_parts(ptr, len);
    BytesMut::from(slice)
}

fn static_is_unique(_: &AtomicPtr<()>) -> bool {
    false
}

unsafe fn static_drop(_: &mut AtomicPtr<()>, _: *const u8, _: usize) {
    // nothing to drop for &'static [u8]
}

// ===== impl OwnedVtable =====

#[repr(C)]
struct OwnedLifetime {
    ref_cnt: AtomicUsize,
    drop: unsafe fn(*mut ()),
}

#[repr(C)]
struct Owned<T> {
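    // Must be the first field: the `owned_*` vtable functions cast the
    // `Owned<T>` pointer stored in `data` to `*mut OwnedLifetime`, and
    // `#[repr(C)]` with `lifetime` first keeps that cast valid.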
    lifetime: OwnedLifetime,
    owner: T,
}

unsafe fn owned_box_and_drop<T>(ptr: *mut ()) {
    let b: Box<Owned<T>> = Box::from_raw(ptr as _);
    drop(b);
}

unsafe fn owned_clone(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Bytes {
    let owned = data.load(Ordering::Relaxed);
    let ref_cnt = &(*owned.cast::<OwnedLifetime>()).ref_cnt;
    let old_cnt = ref_cnt.fetch_add(1, Ordering::Relaxed);
    if old_cnt > usize::MAX >> 1 {
        crate::abort()
    }

    Bytes {
        ptr,
        len,
        data: AtomicPtr::new(owned as _),
        vtable: &OWNED_VTABLE,
    }
}

unsafe fn owned_to_vec(_data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec<u8> {
    let slice = slice::from_raw_parts(ptr, len);
    slice.to_vec()
}

unsafe fn owned_to_mut(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut {
    let bytes_mut = BytesMut::from_vec(owned_to_vec(data, ptr, len));
    owned_drop_impl(data.load(Ordering::Relaxed));
    bytes_mut
}

unsafe fn owned_is_unique(_data: &AtomicPtr<()>) -> bool {
    false
}

unsafe fn owned_drop_impl(owned: *mut ()) {
    let lifetime = owned.cast::<OwnedLifetime>();
    let ref_cnt = &(*lifetime).ref_cnt;

    let old_cnt = ref_cnt.fetch_sub(1, Ordering::Release);
    if old_cnt != 1 {
        return;
    }
    ref_cnt.load(Ordering::Acquire);

    let drop_fn = &(*lifetime).drop;
    drop_fn(owned)
}

unsafe fn owned_drop(data: &mut AtomicPtr<()>, _ptr: *const u8, _len: usize) {
    let owned = data.load(Ordering::Relaxed);
    owned_drop_impl(owned);
}

static OWNED_VTABLE: Vtable = Vtable {
    clone: owned_clone,
    to_vec: owned_to_vec,
    to_mut: owned_to_mut,
    is_unique: owned_is_unique,
    drop: owned_drop,
};

// ===== impl PromotableVtable =====

static PROMOTABLE_EVEN_VTABLE: Vtable = Vtable {
    clone: promotable_even_clone,
    to_vec: promotable_even_to_vec,
    to_mut: promotable_even_to_mut,
    is_unique: promotable_is_unique,
    drop: promotable_even_drop,
};

static PROMOTABLE_ODD_VTABLE: Vtable = Vtable {
    clone: promotable_odd_clone,
    to_vec: promotable_odd_to_vec,
    to_mut: promotable_odd_to_mut,
    is_unique: promotable_is_unique,
    drop: promotable_odd_drop,
};

unsafe fn promotable_even_clone(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Bytes {
    let shared = data.load(Ordering::Acquire);
    let kind = shared as usize & KIND_MASK;

    if kind == KIND_ARC {
        shallow_clone_arc(shared.cast(), ptr, len)
    } else {
        debug_assert_eq!(kind, KIND_VEC);
        let buf = ptr_map(shared.cast(), |addr| addr & !KIND_MASK);
        shallow_clone_vec(data, shared, buf, ptr, len)
    }
}

unsafe fn promotable_to_vec(
    data: &AtomicPtr<()>,
    ptr: *const u8,
    len: usize,
    f: fn(*mut ()) -> *mut u8,
) -> Vec<u8> {
    let shared = data.load(Ordering::Acquire);
    let kind = shared as usize & KIND_MASK;

    if kind == KIND_ARC {
        shared_to_vec_impl(shared.cast(), ptr, len)
    } else {
        // If Bytes holds a Vec, then the offset must be 0.
        debug_assert_eq!(kind, KIND_VEC);

        let buf = f(shared);

        let cap = offset_from(ptr, buf) + len;

        // Copy back buffer
        ptr::copy(ptr, buf, len);

        Vec::from_raw_parts(buf, len, cap)
    }
}

unsafe fn promotable_to_mut(
    data: &AtomicPtr<()>,
    ptr: *const u8,
    len: usize,
    f: fn(*mut ()) -> *mut u8,
) -> BytesMut {
    let shared = data.load(Ordering::Acquire);
    let kind = shared as usize & KIND_MASK;

    if kind == KIND_ARC {
        shared_to_mut_impl(shared.cast(), ptr, len)
    } else {
        // KIND_VEC is a view of an underlying buffer at a certain offset.
        // The ptr + len always represents the end of that buffer.
        // Before truncating it, it is first promoted to KIND_ARC.
        // Thus, we can safely reconstruct a Vec from it without leaking memory.
        debug_assert_eq!(kind, KIND_VEC);

        let buf = f(shared);
        let off = offset_from(ptr, buf);
        let cap = off + len;
        let v = Vec::from_raw_parts(buf, cap, cap);

        let mut b = BytesMut::from_vec(v);
        b.advance_unchecked(off);
        b
    }
}

unsafe fn promotable_even_to_vec(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec<u8> {
    promotable_to_vec(data, ptr, len, |shared| {
        ptr_map(shared.cast(), |addr| addr & !KIND_MASK)
    })
}

unsafe fn promotable_even_to_mut(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut {
    promotable_to_mut(data, ptr, len, |shared| {
        ptr_map(shared.cast(), |addr| addr & !KIND_MASK)
    })
}

unsafe fn promotable_even_drop(data: &mut AtomicPtr<()>, ptr: *const u8, len: usize) {
    data.with_mut(|shared| {
        let shared = *shared;
        let kind = shared as usize & KIND_MASK;

        if kind == KIND_ARC {
            release_shared(shared.cast());
        } else {
            debug_assert_eq!(kind, KIND_VEC);
            let buf = ptr_map(shared.cast(), |addr| addr & !KIND_MASK);
            free_boxed_slice(buf, ptr, len);
        }
    });
}

unsafe fn promotable_odd_clone(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Bytes {
    let shared = data.load(Ordering::Acquire);
    let kind = shared as usize & KIND_MASK;

    if kind == KIND_ARC {
        shallow_clone_arc(shared as _, ptr, len)
    } else {
        debug_assert_eq!(kind, KIND_VEC);
        shallow_clone_vec(data, shared, shared.cast(), ptr, len)
    }
}

unsafe fn promotable_odd_to_vec(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec<u8> {
    promotable_to_vec(data, ptr, len, |shared| shared.cast())
}

unsafe fn promotable_odd_to_mut(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut {
    promotable_to_mut(data, ptr, len, |shared| shared.cast())
}

unsafe fn promotable_odd_drop(data: &mut AtomicPtr<()>, ptr: *const u8, len: usize) {
    data.with_mut(|shared| {
        let shared = *shared;
        let kind = shared as usize & KIND_MASK;

        if kind == KIND_ARC {
            release_shared(shared.cast());
        } else {
            debug_assert_eq!(kind, KIND_VEC);

            free_boxed_slice(shared.cast(), ptr, len);
        }
    });
}

unsafe fn promotable_is_unique(data: &AtomicPtr<()>) -> bool {
    let shared = data.load(Ordering::Acquire);
    let kind = shared as usize & KIND_MASK;

    if kind == KIND_ARC {
        let ref_cnt = (*shared.cast::<Shared>()).ref_cnt.load(Ordering::Relaxed);
        ref_cnt == 1
    } else {
        true
    }
}

unsafe fn free_boxed_slice(buf: *mut u8, offset: *const u8, len: usize) {
    let cap = offset_from(offset, buf) + len;
    dealloc(buf, Layout::from_size_align(cap, 1).unwrap())
}

// ===== impl SharedVtable =====

struct Shared {
    // Holds arguments to dealloc upon Drop, but otherwise doesn't use them
    buf: *mut u8,
    cap: usize,
    ref_cnt: AtomicUsize,
}

impl Drop for Shared {
    fn drop(&mut self) {
        unsafe { dealloc(self.buf, Layout::from_size_align(self.cap, 1).unwrap()) }
    }
}

// Assert that the alignment of `Shared` is divisible by 2.
// This is a necessary invariant since we depend on allocating a `Shared`
// object to implicitly carry the `KIND_ARC` flag in its pointer.
// This flag is set when the LSB is 0.
const _: [(); 0 - mem::align_of::<Shared>() % 2] = [];

static SHARED_VTABLE: Vtable = Vtable {
    clone: shared_clone,
    to_vec: shared_to_vec,
    to_mut: shared_to_mut,
    is_unique: shared_is_unique,
    drop: shared_drop,
};

const KIND_ARC: usize = 0b0;
const KIND_VEC: usize = 0b1;
const KIND_MASK: usize = 0b1;

unsafe fn shared_clone(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Bytes {
    let shared = data.load(Ordering::Relaxed);
    shallow_clone_arc(shared as _, ptr, len)
}

unsafe fn shared_to_vec_impl(shared: *mut Shared, ptr: *const u8, len: usize) -> Vec<u8> {
    // Check that the ref_cnt is 1 (unique).
    //
    // If it is unique, then it is set to 0 with an AcqRel fence for the same
    // reason as in release_shared.
    //
    // Otherwise, we take the other branch and call release_shared.
    if (*shared)
        .ref_cnt
        .compare_exchange(1, 0, Ordering::AcqRel, Ordering::Relaxed)
        .is_ok()
    {
        // Deallocate the `Shared` instance without running its destructor.
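        // (`ManuallyDrop` below keeps `Shared`'s `Drop` impl from freeing `buf`,
        // since that allocation is reused as the returned `Vec`'s storage.)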
        let shared = *Box::from_raw(shared);
        let shared = ManuallyDrop::new(shared);
        let buf = shared.buf;
        let cap = shared.cap;

        // Copy back buffer
        ptr::copy(ptr, buf, len);

        Vec::from_raw_parts(buf, len, cap)
    } else {
        let v = slice::from_raw_parts(ptr, len).to_vec();
        release_shared(shared);
        v
    }
}

unsafe fn shared_to_vec(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec<u8> {
    shared_to_vec_impl(data.load(Ordering::Relaxed).cast(), ptr, len)
}

unsafe fn shared_to_mut_impl(shared: *mut Shared, ptr: *const u8, len: usize) -> BytesMut {
    // The goal is to check if the current handle is the only handle
    // that currently has access to the buffer. This is done by
    // checking if the `ref_cnt` is currently 1.
    //
    // The `Acquire` ordering synchronizes with the `Release` as
    // part of the `fetch_sub` in `release_shared`. The `fetch_sub`
    // operation guarantees that any mutations done in other threads
    // are ordered before the `ref_cnt` is decremented. As such,
    // this `Acquire` will guarantee that those mutations are
    // visible to the current thread.
    //
    // Otherwise, we take the other branch, copy the data and call `release_shared`.
    if (*shared).ref_cnt.load(Ordering::Acquire) == 1 {
        // Deallocate the `Shared` instance without running its destructor.
        let shared = *Box::from_raw(shared);
        let shared = ManuallyDrop::new(shared);
        let buf = shared.buf;
        let cap = shared.cap;

        // Rebuild Vec
        let off = offset_from(ptr, buf);
        let v = Vec::from_raw_parts(buf, len + off, cap);

        let mut b = BytesMut::from_vec(v);
        b.advance_unchecked(off);
        b
    } else {
        // Copy the data from Shared in a new Vec, then release it
        let v = slice::from_raw_parts(ptr, len).to_vec();
        release_shared(shared);
        BytesMut::from_vec(v)
    }
}

unsafe fn shared_to_mut(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut {
    shared_to_mut_impl(data.load(Ordering::Relaxed).cast(), ptr, len)
}

pub(crate) unsafe fn shared_is_unique(data: &AtomicPtr<()>) -> bool {
    let shared = data.load(Ordering::Acquire);
    let ref_cnt = (*shared.cast::<Shared>()).ref_cnt.load(Ordering::Relaxed);
    ref_cnt == 1
}

unsafe fn shared_drop(data: &mut AtomicPtr<()>, _ptr: *const u8, _len: usize) {
    data.with_mut(|shared| {
        release_shared(shared.cast());
    });
}

unsafe fn shallow_clone_arc(shared: *mut Shared, ptr: *const u8, len: usize) -> Bytes {
    let old_size = (*shared).ref_cnt.fetch_add(1, Ordering::Relaxed);

    if old_size > usize::MAX >> 1 {
        crate::abort();
    }

    Bytes {
        ptr,
        len,
        data: AtomicPtr::new(shared as _),
        vtable: &SHARED_VTABLE,
    }
}

#[cold]
unsafe fn shallow_clone_vec(
    atom: &AtomicPtr<()>,
    ptr: *const (),
    buf: *mut u8,
    offset: *const u8,
    len: usize,
) -> Bytes {
    // If the buffer is still tracked in a `Vec<u8>`, it is time to
    // promote the vec to an `Arc`. This could potentially be called
    // concurrently, so some care must be taken.

    // First, allocate a new `Shared` instance containing the
    // `Vec` fields. It's important to note that `ptr`, `len`,
    // and `cap` cannot be mutated without having `&mut self`.
    // This means that these fields will not be concurrently
    // updated and since the buffer hasn't been promoted to an
    // `Arc`, those three fields still are the components of the
    // vector.
    let shared = Box::new(Shared {
        buf,
        cap: offset_from(offset, buf) + len,
        // Initialize refcount to 2. One for this reference, and one
        // for the new clone that will be returned from
        // `shallow_clone`.
        ref_cnt: AtomicUsize::new(2),
    });

    let shared = Box::into_raw(shared);

    // The pointer should be aligned, so this assert should
    // always succeed.
    debug_assert!(
        0 == (shared as usize & KIND_MASK),
        "internal: Box<Shared> should have an aligned pointer",
    );

    // Try compare & swapping the pointer into the `arc` field.
    // `Release` is used to synchronize with other threads that
    // will load the `arc` field.
    //
    // If the `compare_exchange` fails, then the thread lost the
    // race to promote the buffer to shared. The `Acquire`
    // ordering will synchronize with the `compare_exchange`
    // that happened in the other thread and the `Shared`
    // pointed to by `actual` will be visible.
    match atom.compare_exchange(ptr as _, shared as _, Ordering::AcqRel, Ordering::Acquire) {
        Ok(actual) => {
            debug_assert!(actual as usize == ptr as usize);
            // The upgrade was successful, the new handle can be
            // returned.
            Bytes {
                ptr: offset,
                len,
                data: AtomicPtr::new(shared as _),
                vtable: &SHARED_VTABLE,
            }
        }
        Err(actual) => {
            // The upgrade failed, a concurrent clone happened. Release
            // the allocation that was made in this thread, it will not
            // be needed.
            let shared = Box::from_raw(shared);
            mem::forget(*shared);

            // Buffer already promoted to shared storage, so increment ref
            // count.
            shallow_clone_arc(actual as _, offset, len)
        }
    }
}

unsafe fn release_shared(ptr: *mut Shared) {
    // `Shared` storage... follow the drop steps from Arc.
    if (*ptr).ref_cnt.fetch_sub(1, Ordering::Release) != 1 {
        return;
    }

    // This fence is needed to prevent reordering of use of the data and
    // deletion of the data. Because it is marked `Release`, the decreasing
    // of the reference count synchronizes with this `Acquire` fence. This
    // means that use of the data happens before decreasing the reference
    // count, which happens before this fence, which happens before the
    // deletion of the data.
    //
    // As explained in the [Boost documentation][1],
    //
    // > It is important to enforce any possible access to the object in one
    // > thread (through an existing reference) to *happen before* deleting
    // > the object in a different thread. This is achieved by a "release"
    // > operation after dropping a reference (any access to the object
    // > through this reference must obviously happened before), and an
    // > "acquire" operation before deleting the object.
    //
    // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html)
    //
    // Thread sanitizer does not support atomic fences. Use an atomic load
    // instead.
    (*ptr).ref_cnt.load(Ordering::Acquire);

    // Drop the data
    drop(Box::from_raw(ptr));
}

// Ideally we would always use this version of `ptr_map` since it is strict
// provenance compatible, but it results in worse codegen. We will however still
// use it on miri because it gives better diagnostics for people who test bytes
// code with miri.
//
// See https://github.com/tokio-rs/bytes/pull/545 for more info.
#[cfg(miri)]
fn ptr_map<F>(ptr: *mut u8, f: F) -> *mut u8
where
    F: FnOnce(usize) -> usize,
{
    let old_addr = ptr as usize;
    let new_addr = f(old_addr);
    let diff = new_addr.wrapping_sub(old_addr);
    ptr.wrapping_add(diff)
}

#[cfg(not(miri))]
fn ptr_map<F>(ptr: *mut u8, f: F) -> *mut u8
where
    F: FnOnce(usize) -> usize,
{
    let old_addr = ptr as usize;
    let new_addr = f(old_addr);
    new_addr as *mut u8
}

fn without_provenance(ptr: usize) -> *const u8 {
    core::ptr::null::<u8>().wrapping_add(ptr)
}

// compile-fails

/// ```compile_fail
/// use bytes::Bytes;
/// #[deny(unused_must_use)]
/// {
///     let mut b1 = Bytes::from("hello world");
///     b1.split_to(6);
/// }
/// ```
fn _split_to_must_use() {}

/// ```compile_fail
/// use bytes::Bytes;
/// #[deny(unused_must_use)]
/// {
///     let mut b1 = Bytes::from("hello world");
///     b1.split_off(6);
/// }
/// ```
fn _split_off_must_use() {}

// fuzz tests
#[cfg(all(test, loom))]
mod fuzz {
    use loom::sync::Arc;
    use loom::thread;

    use super::Bytes;

    #[test]
    fn bytes_cloning_vec() {
        loom::model(|| {
            let a = Bytes::from(b"abcdefgh".to_vec());
            let addr = a.as_ptr() as usize;

            // test the Bytes::clone is Sync by putting it in an Arc
            let a1 = Arc::new(a);
            let a2 = a1.clone();

            let t1 = thread::spawn(move || {
                let b: Bytes = (*a1).clone();
                assert_eq!(b.as_ptr() as usize, addr);
            });

            let t2 = thread::spawn(move || {
                let b: Bytes = (*a2).clone();
                assert_eq!(b.as_ptr() as usize, addr);
            });

            t1.join().unwrap();
            t2.join().unwrap();
        });
    }
}
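
// Illustrative doctest added as a sketch, following the private-fn doctest
// convention used above; it only exercises the public API: cloning a
// `Vec`-backed `Bytes` shares the same underlying storage instead of copying,
// which is the same invariant the loom fuzz test checks.
/// ```
/// use bytes::Bytes;
///
/// let a = Bytes::from(vec![1u8, 2, 3, 4]);
/// let b = a.clone();
///
/// assert_eq!(a, b);
/// // Both handles view the same backing memory.
/// assert_eq!(a.as_ptr(), b.as_ptr());
/// ```
fn _clone_shares_storage() {}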