1 use crate::stream_ext::Fuse; 2 use crate::Stream; 3 4 use core::pin::Pin; 5 use core::task::{Context, Poll}; 6 use pin_project_lite::pin_project; 7 8 pin_project! { 9 /// Stream returned by the [`merge`](super::StreamExt::merge) method. 10 pub struct Merge<T, U> { 11 #[pin] 12 a: Fuse<T>, 13 #[pin] 14 b: Fuse<U>, 15 // When `true`, poll `a` first, otherwise, `poll` b`. 16 a_first: bool, 17 } 18 } 19 20 impl<T, U> Merge<T, U> { new(a: T, b: U) -> Merge<T, U> where T: Stream, U: Stream,21 pub(super) fn new(a: T, b: U) -> Merge<T, U> 22 where 23 T: Stream, 24 U: Stream, 25 { 26 Merge { 27 a: Fuse::new(a), 28 b: Fuse::new(b), 29 a_first: true, 30 } 31 } 32 } 33 34 impl<T, U> Stream for Merge<T, U> 35 where 36 T: Stream, 37 U: Stream<Item = T::Item>, 38 { 39 type Item = T::Item; 40 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>>41 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> { 42 let me = self.project(); 43 let a_first = *me.a_first; 44 45 // Toggle the flag 46 *me.a_first = !a_first; 47 48 if a_first { 49 poll_next(me.a, me.b, cx) 50 } else { 51 poll_next(me.b, me.a, cx) 52 } 53 } 54 size_hint(&self) -> (usize, Option<usize>)55 fn size_hint(&self) -> (usize, Option<usize>) { 56 super::merge_size_hints(self.a.size_hint(), self.b.size_hint()) 57 } 58 } 59 poll_next<T, U>( first: Pin<&mut T>, second: Pin<&mut U>, cx: &mut Context<'_>, ) -> Poll<Option<T::Item>> where T: Stream, U: Stream<Item = T::Item>,60 fn poll_next<T, U>( 61 first: Pin<&mut T>, 62 second: Pin<&mut U>, 63 cx: &mut Context<'_>, 64 ) -> Poll<Option<T::Item>> 65 where 66 T: Stream, 67 U: Stream<Item = T::Item>, 68 { 69 let mut done = true; 70 71 match first.poll_next(cx) { 72 Poll::Ready(Some(val)) => return Poll::Ready(Some(val)), 73 Poll::Ready(None) => {} 74 Poll::Pending => done = false, 75 } 76 77 match second.poll_next(cx) { 78 Poll::Ready(Some(val)) => return Poll::Ready(Some(val)), 79 Poll::Ready(None) => {} 80 Poll::Pending => done = false, 81 } 82 83 if done { 84 Poll::Ready(None) 85 } else { 86 Poll::Pending 87 } 88 } 89