1 use crate::task::{self as task03, ArcWake as ArcWake03, WakerRef};
2 use futures_01::{
3     task as task01, Async as Async01, Future as Future01, Poll as Poll01, Stream as Stream01,
4 };
5 #[cfg(feature = "sink")]
6 use futures_01::{AsyncSink as AsyncSink01, Sink as Sink01, StartSend as StartSend01};
7 use futures_core::{
8     future::TryFuture as TryFuture03,
9     stream::TryStream as TryStream03,
10     task::{RawWaker, RawWakerVTable},
11 };
12 #[cfg(feature = "sink")]
13 use futures_sink::Sink as Sink03;
14 #[cfg(feature = "sink")]
15 use std::marker::PhantomData;
16 use std::{mem, pin::Pin, sync::Arc, task::Context};
17 
18 #[allow(clippy::too_long_first_doc_paragraph)] // clippy bug, see https://github.com/rust-lang/rust-clippy/issues/13315
19 /// Converts a futures 0.3 [`TryFuture`](futures_core::future::TryFuture) or
20 /// [`TryStream`](futures_core::stream::TryStream) into a futures 0.1
21 /// [`Future`](futures_01::future::Future) or
22 /// [`Stream`](futures_01::stream::Stream).
23 #[derive(Debug, Clone, Copy)]
24 #[must_use = "futures do nothing unless you `.await` or poll them"]
25 pub struct Compat<T> {
26     pub(crate) inner: T,
27 }
28 
29 /// Converts a futures 0.3 [`Sink`](futures_sink::Sink) into a futures 0.1
30 /// [`Sink`](futures_01::sink::Sink).
31 #[cfg(feature = "sink")]
32 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
33 #[derive(Debug)]
34 #[must_use = "sinks do nothing unless polled"]
35 pub struct CompatSink<T, Item> {
36     inner: T,
37     _phantom: PhantomData<fn(Item)>,
38 }
39 
40 impl<T> Compat<T> {
41     /// Creates a new [`Compat`].
42     ///
43     /// For types which implement appropriate futures `0.3`
44     /// traits, the result will be a type which implements
45     /// the corresponding futures 0.1 type.
new(inner: T) -> Self46     pub fn new(inner: T) -> Self {
47         Self { inner }
48     }
49 
50     /// Get a reference to 0.3 Future, Stream, AsyncRead, or AsyncWrite object
51     /// contained within.
get_ref(&self) -> &T52     pub fn get_ref(&self) -> &T {
53         &self.inner
54     }
55 
56     /// Get a mutable reference to 0.3 Future, Stream, AsyncRead, or AsyncWrite object
57     /// contained within.
get_mut(&mut self) -> &mut T58     pub fn get_mut(&mut self) -> &mut T {
59         &mut self.inner
60     }
61 
62     /// Returns the inner item.
into_inner(self) -> T63     pub fn into_inner(self) -> T {
64         self.inner
65     }
66 }
67 
68 #[cfg(feature = "sink")]
69 impl<T, Item> CompatSink<T, Item> {
70     /// Creates a new [`CompatSink`].
new(inner: T) -> Self71     pub fn new(inner: T) -> Self {
72         Self { inner, _phantom: PhantomData }
73     }
74 
75     /// Get a reference to 0.3 Sink contained within.
get_ref(&self) -> &T76     pub fn get_ref(&self) -> &T {
77         &self.inner
78     }
79 
80     /// Get a mutable reference to 0.3 Sink contained within.
get_mut(&mut self) -> &mut T81     pub fn get_mut(&mut self) -> &mut T {
82         &mut self.inner
83     }
84 
85     /// Returns the inner item.
into_inner(self) -> T86     pub fn into_inner(self) -> T {
87         self.inner
88     }
89 }
90 
poll_03_to_01<T, E>(x: task03::Poll<Result<T, E>>) -> Result<Async01<T>, E>91 fn poll_03_to_01<T, E>(x: task03::Poll<Result<T, E>>) -> Result<Async01<T>, E> {
92     match x? {
93         task03::Poll::Ready(t) => Ok(Async01::Ready(t)),
94         task03::Poll::Pending => Ok(Async01::NotReady),
95     }
96 }
97 
98 impl<Fut> Future01 for Compat<Fut>
99 where
100     Fut: TryFuture03 + Unpin,
101 {
102     type Item = Fut::Ok;
103     type Error = Fut::Error;
104 
poll(&mut self) -> Poll01<Self::Item, Self::Error>105     fn poll(&mut self) -> Poll01<Self::Item, Self::Error> {
106         with_context(self, |inner, cx| poll_03_to_01(inner.try_poll(cx)))
107     }
108 }
109 
110 impl<St> Stream01 for Compat<St>
111 where
112     St: TryStream03 + Unpin,
113 {
114     type Item = St::Ok;
115     type Error = St::Error;
116 
poll(&mut self) -> Poll01<Option<Self::Item>, Self::Error>117     fn poll(&mut self) -> Poll01<Option<Self::Item>, Self::Error> {
118         with_context(self, |inner, cx| match inner.try_poll_next(cx)? {
119             task03::Poll::Ready(None) => Ok(Async01::Ready(None)),
120             task03::Poll::Ready(Some(t)) => Ok(Async01::Ready(Some(t))),
121             task03::Poll::Pending => Ok(Async01::NotReady),
122         })
123     }
124 }
125 
126 #[cfg(feature = "sink")]
127 impl<T, Item> Sink01 for CompatSink<T, Item>
128 where
129     T: Sink03<Item> + Unpin,
130 {
131     type SinkItem = Item;
132     type SinkError = T::Error;
133 
start_send(&mut self, item: Self::SinkItem) -> StartSend01<Self::SinkItem, Self::SinkError>134     fn start_send(&mut self, item: Self::SinkItem) -> StartSend01<Self::SinkItem, Self::SinkError> {
135         with_sink_context(self, |mut inner, cx| match inner.as_mut().poll_ready(cx)? {
136             task03::Poll::Ready(()) => inner.start_send(item).map(|()| AsyncSink01::Ready),
137             task03::Poll::Pending => Ok(AsyncSink01::NotReady(item)),
138         })
139     }
140 
poll_complete(&mut self) -> Poll01<(), Self::SinkError>141     fn poll_complete(&mut self) -> Poll01<(), Self::SinkError> {
142         with_sink_context(self, |inner, cx| poll_03_to_01(inner.poll_flush(cx)))
143     }
144 
close(&mut self) -> Poll01<(), Self::SinkError>145     fn close(&mut self) -> Poll01<(), Self::SinkError> {
146         with_sink_context(self, |inner, cx| poll_03_to_01(inner.poll_close(cx)))
147     }
148 }
149 
150 #[derive(Clone)]
151 struct Current(task01::Task);
152 
153 impl Current {
new() -> Self154     fn new() -> Self {
155         Self(task01::current())
156     }
157 
as_waker(&self) -> WakerRef<'_>158     fn as_waker(&self) -> WakerRef<'_> {
159         unsafe fn ptr_to_current<'a>(ptr: *const ()) -> &'a Current {
160             unsafe { &*(ptr as *const Current) }
161         }
162         fn current_to_ptr(current: &Current) -> *const () {
163             current as *const Current as *const ()
164         }
165 
166         unsafe fn clone(ptr: *const ()) -> RawWaker {
167             // Lazily create the `Arc` only when the waker is actually cloned.
168             // FIXME: remove `transmute` when a `Waker` -> `RawWaker` conversion
169             // function is landed in `core`.
170             unsafe {
171                 mem::transmute::<task03::Waker, RawWaker>(task03::waker(Arc::new(
172                     ptr_to_current(ptr).clone(),
173                 )))
174             }
175         }
176         unsafe fn drop(_: *const ()) {}
177         unsafe fn wake(ptr: *const ()) {
178             unsafe { ptr_to_current(ptr).0.notify() }
179         }
180 
181         let ptr = current_to_ptr(self);
182         let vtable = &RawWakerVTable::new(clone, wake, wake, drop);
183         WakerRef::new_unowned(std::mem::ManuallyDrop::new(unsafe {
184             task03::Waker::from_raw(RawWaker::new(ptr, vtable))
185         }))
186     }
187 }
188 
189 impl ArcWake03 for Current {
wake_by_ref(arc_self: &Arc<Self>)190     fn wake_by_ref(arc_self: &Arc<Self>) {
191         arc_self.0.notify();
192     }
193 }
194 
with_context<T, R, F>(compat: &mut Compat<T>, f: F) -> R where T: Unpin, F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> R,195 fn with_context<T, R, F>(compat: &mut Compat<T>, f: F) -> R
196 where
197     T: Unpin,
198     F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> R,
199 {
200     let current = Current::new();
201     let waker = current.as_waker();
202     let mut cx = Context::from_waker(&waker);
203     f(Pin::new(&mut compat.inner), &mut cx)
204 }
205 
206 #[cfg(feature = "sink")]
with_sink_context<T, Item, R, F>(compat: &mut CompatSink<T, Item>, f: F) -> R where T: Unpin, F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> R,207 fn with_sink_context<T, Item, R, F>(compat: &mut CompatSink<T, Item>, f: F) -> R
208 where
209     T: Unpin,
210     F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> R,
211 {
212     let current = Current::new();
213     let waker = current.as_waker();
214     let mut cx = Context::from_waker(&waker);
215     f(Pin::new(&mut compat.inner), &mut cx)
216 }
217 
218 #[cfg(feature = "io-compat")]
219 #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
220 mod io {
221     use super::*;
222     use futures_io::{AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03};
223     use tokio_io::{AsyncRead as AsyncRead01, AsyncWrite as AsyncWrite01};
224 
poll_03_to_io<T>(x: task03::Poll<Result<T, std::io::Error>>) -> Result<T, std::io::Error>225     fn poll_03_to_io<T>(x: task03::Poll<Result<T, std::io::Error>>) -> Result<T, std::io::Error> {
226         match x {
227             task03::Poll::Ready(Ok(t)) => Ok(t),
228             task03::Poll::Pending => Err(std::io::ErrorKind::WouldBlock.into()),
229             task03::Poll::Ready(Err(e)) => Err(e),
230         }
231     }
232 
233     impl<R: AsyncRead03 + Unpin> std::io::Read for Compat<R> {
read(&mut self, buf: &mut [u8]) -> std::io::Result<usize>234         fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
235             let current = Current::new();
236             let waker = current.as_waker();
237             let mut cx = Context::from_waker(&waker);
238             poll_03_to_io(Pin::new(&mut self.inner).poll_read(&mut cx, buf))
239         }
240     }
241 
242     impl<R: AsyncRead03 + Unpin> AsyncRead01 for Compat<R> {}
243 
244     impl<W: AsyncWrite03 + Unpin> std::io::Write for Compat<W> {
write(&mut self, buf: &[u8]) -> std::io::Result<usize>245         fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
246             let current = Current::new();
247             let waker = current.as_waker();
248             let mut cx = Context::from_waker(&waker);
249             poll_03_to_io(Pin::new(&mut self.inner).poll_write(&mut cx, buf))
250         }
251 
flush(&mut self) -> std::io::Result<()>252         fn flush(&mut self) -> std::io::Result<()> {
253             let current = Current::new();
254             let waker = current.as_waker();
255             let mut cx = Context::from_waker(&waker);
256             poll_03_to_io(Pin::new(&mut self.inner).poll_flush(&mut cx))
257         }
258     }
259 
260     impl<W: AsyncWrite03 + Unpin> AsyncWrite01 for Compat<W> {
shutdown(&mut self) -> std::io::Result<Async01<()>>261         fn shutdown(&mut self) -> std::io::Result<Async01<()>> {
262             let current = Current::new();
263             let waker = current.as_waker();
264             let mut cx = Context::from_waker(&waker);
265             poll_03_to_01(Pin::new(&mut self.inner).poll_close(&mut cx))
266         }
267     }
268 }
269