1  use crate::stream_ext::Fuse;
2  use crate::Stream;
3  
4  use core::pin::Pin;
5  use core::task::{Context, Poll};
6  use pin_project_lite::pin_project;
7  
8  pin_project! {
9      /// Stream returned by the [`merge`](super::StreamExt::merge) method.
10      pub struct Merge<T, U> {
11          #[pin]
12          a: Fuse<T>,
13          #[pin]
14          b: Fuse<U>,
15          // When `true`, poll `a` first, otherwise, `poll` b`.
16          a_first: bool,
17      }
18  }
19  
20  impl<T, U> Merge<T, U> {
new(a: T, b: U) -> Merge<T, U> where T: Stream, U: Stream,21      pub(super) fn new(a: T, b: U) -> Merge<T, U>
22      where
23          T: Stream,
24          U: Stream,
25      {
26          Merge {
27              a: Fuse::new(a),
28              b: Fuse::new(b),
29              a_first: true,
30          }
31      }
32  }
33  
34  impl<T, U> Stream for Merge<T, U>
35  where
36      T: Stream,
37      U: Stream<Item = T::Item>,
38  {
39      type Item = T::Item;
40  
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>>41      fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> {
42          let me = self.project();
43          let a_first = *me.a_first;
44  
45          // Toggle the flag
46          *me.a_first = !a_first;
47  
48          if a_first {
49              poll_next(me.a, me.b, cx)
50          } else {
51              poll_next(me.b, me.a, cx)
52          }
53      }
54  
size_hint(&self) -> (usize, Option<usize>)55      fn size_hint(&self) -> (usize, Option<usize>) {
56          super::merge_size_hints(self.a.size_hint(), self.b.size_hint())
57      }
58  }
59  
poll_next<T, U>( first: Pin<&mut T>, second: Pin<&mut U>, cx: &mut Context<'_>, ) -> Poll<Option<T::Item>> where T: Stream, U: Stream<Item = T::Item>,60  fn poll_next<T, U>(
61      first: Pin<&mut T>,
62      second: Pin<&mut U>,
63      cx: &mut Context<'_>,
64  ) -> Poll<Option<T::Item>>
65  where
66      T: Stream,
67      U: Stream<Item = T::Item>,
68  {
69      let mut done = true;
70  
71      match first.poll_next(cx) {
72          Poll::Ready(Some(val)) => return Poll::Ready(Some(val)),
73          Poll::Ready(None) => {}
74          Poll::Pending => done = false,
75      }
76  
77      match second.poll_next(cx) {
78          Poll::Ready(Some(val)) => return Poll::Ready(Some(val)),
79          Poll::Ready(None) => {}
80          Poll::Pending => done = false,
81      }
82  
83      if done {
84          Poll::Ready(None)
85      } else {
86          Poll::Pending
87      }
88  }
89