1 use crate::task::{self as task03, ArcWake as ArcWake03, WakerRef};
2 use futures_01::{
3 task as task01, Async as Async01, Future as Future01, Poll as Poll01, Stream as Stream01,
4 };
5 #[cfg(feature = "sink")]
6 use futures_01::{AsyncSink as AsyncSink01, Sink as Sink01, StartSend as StartSend01};
7 use futures_core::{
8 future::TryFuture as TryFuture03,
9 stream::TryStream as TryStream03,
10 task::{RawWaker, RawWakerVTable},
11 };
12 #[cfg(feature = "sink")]
13 use futures_sink::Sink as Sink03;
14 #[cfg(feature = "sink")]
15 use std::marker::PhantomData;
16 use std::{mem, pin::Pin, sync::Arc, task::Context};
17
#[allow(clippy::too_long_first_doc_paragraph)] // clippy bug, see https://github.com/rust-lang/rust-clippy/issues/13315
/// Converts a futures 0.3 [`TryFuture`](futures_core::future::TryFuture) or
/// [`TryStream`](futures_core::stream::TryStream) into a futures 0.1
/// [`Future`](futures_01::future::Future) or
/// [`Stream`](futures_01::stream::Stream).
///
/// The futures 0.1 trait implementations in this module are only provided
/// when the wrapped type is [`Unpin`].
#[derive(Debug, Clone, Copy)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Compat<T> {
    // The wrapped futures 0.3 value that every poll is forwarded to.
    pub(crate) inner: T,
}
28
/// Converts a futures 0.3 [`Sink`](futures_sink::Sink) into a futures 0.1
/// [`Sink`](futures_01::sink::Sink).
///
/// `Item` is the type of value the wrapped sink accepts; it becomes the
/// `SinkItem` of the futures 0.1 implementation.
#[cfg(feature = "sink")]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
#[derive(Debug)]
#[must_use = "sinks do nothing unless polled"]
pub struct CompatSink<T, Item> {
    // The wrapped futures 0.3 sink.
    inner: T,
    // Marks the `Item` type parameter as used without storing an `Item`.
    _phantom: PhantomData<fn(Item)>,
}
39
40 impl<T> Compat<T> {
41 /// Creates a new [`Compat`].
42 ///
43 /// For types which implement appropriate futures `0.3`
44 /// traits, the result will be a type which implements
45 /// the corresponding futures 0.1 type.
new(inner: T) -> Self46 pub fn new(inner: T) -> Self {
47 Self { inner }
48 }
49
50 /// Get a reference to 0.3 Future, Stream, AsyncRead, or AsyncWrite object
51 /// contained within.
get_ref(&self) -> &T52 pub fn get_ref(&self) -> &T {
53 &self.inner
54 }
55
56 /// Get a mutable reference to 0.3 Future, Stream, AsyncRead, or AsyncWrite object
57 /// contained within.
get_mut(&mut self) -> &mut T58 pub fn get_mut(&mut self) -> &mut T {
59 &mut self.inner
60 }
61
62 /// Returns the inner item.
into_inner(self) -> T63 pub fn into_inner(self) -> T {
64 self.inner
65 }
66 }
67
#[cfg(feature = "sink")]
impl<T, Item> CompatSink<T, Item> {
    /// Wraps `inner` in a new [`CompatSink`].
    pub fn new(inner: T) -> Self {
        CompatSink { inner, _phantom: PhantomData }
    }

    /// Borrows the 0.3 Sink held by this wrapper.
    pub fn get_ref(&self) -> &T {
        let Self { inner, .. } = self;
        inner
    }

    /// Mutably borrows the 0.3 Sink held by this wrapper.
    pub fn get_mut(&mut self) -> &mut T {
        let Self { inner, .. } = self;
        inner
    }

    /// Consumes the wrapper and hands back the wrapped sink.
    pub fn into_inner(self) -> T {
        let Self { inner, .. } = self;
        inner
    }
}
90
poll_03_to_01<T, E>(x: task03::Poll<Result<T, E>>) -> Result<Async01<T>, E>91 fn poll_03_to_01<T, E>(x: task03::Poll<Result<T, E>>) -> Result<Async01<T>, E> {
92 match x? {
93 task03::Poll::Ready(t) => Ok(Async01::Ready(t)),
94 task03::Poll::Pending => Ok(Async01::NotReady),
95 }
96 }
97
98 impl<Fut> Future01 for Compat<Fut>
99 where
100 Fut: TryFuture03 + Unpin,
101 {
102 type Item = Fut::Ok;
103 type Error = Fut::Error;
104
poll(&mut self) -> Poll01<Self::Item, Self::Error>105 fn poll(&mut self) -> Poll01<Self::Item, Self::Error> {
106 with_context(self, |inner, cx| poll_03_to_01(inner.try_poll(cx)))
107 }
108 }
109
110 impl<St> Stream01 for Compat<St>
111 where
112 St: TryStream03 + Unpin,
113 {
114 type Item = St::Ok;
115 type Error = St::Error;
116
poll(&mut self) -> Poll01<Option<Self::Item>, Self::Error>117 fn poll(&mut self) -> Poll01<Option<Self::Item>, Self::Error> {
118 with_context(self, |inner, cx| match inner.try_poll_next(cx)? {
119 task03::Poll::Ready(None) => Ok(Async01::Ready(None)),
120 task03::Poll::Ready(Some(t)) => Ok(Async01::Ready(Some(t))),
121 task03::Poll::Pending => Ok(Async01::NotReady),
122 })
123 }
124 }
125
#[cfg(feature = "sink")]
impl<T, Item> Sink01 for CompatSink<T, Item>
where
    T: Sink03<Item> + Unpin,
{
    type SinkItem = Item;
    type SinkError = T::Error;

    /// Tries to hand `item` to the wrapped 0.3 sink.
    ///
    /// Readiness is polled first. If the sink is not ready the item is given
    /// back as `AsyncSink01::NotReady(item)`; otherwise it is passed to the
    /// sink's `start_send`.
    fn start_send(&mut self, item: Self::SinkItem) -> StartSend01<Self::SinkItem, Self::SinkError> {
        with_sink_context(self, |mut sink, cx| {
            if sink.as_mut().poll_ready(cx)?.is_pending() {
                return Ok(AsyncSink01::NotReady(item));
            }
            sink.start_send(item)?;
            Ok(AsyncSink01::Ready)
        })
    }

    /// Drives the wrapped sink's `poll_flush`.
    fn poll_complete(&mut self) -> Poll01<(), Self::SinkError> {
        with_sink_context(self, |sink, cx| {
            let flushed = sink.poll_flush(cx);
            poll_03_to_01(flushed)
        })
    }

    /// Drives the wrapped sink's `poll_close`.
    fn close(&mut self) -> Poll01<(), Self::SinkError> {
        with_sink_context(self, |sink, cx| {
            let closed = sink.poll_close(cx);
            poll_03_to_01(closed)
        })
    }
}
149
// Newtype around a futures 0.1 task handle; it is the value the futures 0.3
// wakers built in this module notify.
#[derive(Clone)]
struct Current(task01::Task);
152
impl Current {
    /// Captures the futures 0.1 task handle returned by `task01::current()`.
    fn new() -> Self {
        Self(task01::current())
    }

    /// Builds a borrowed futures 0.3 waker whose wake-ups call `notify()` on
    /// the wrapped 0.1 task.
    ///
    /// The waker's data pointer is `self` itself; an `Arc` is only created
    /// if the waker is cloned.
    fn as_waker(&self) -> WakerRef<'_> {
        // Recovers the `Current` behind a waker data pointer.
        //
        // Callers must pass a pointer produced by `current_to_ptr` whose
        // referent is still alive for `'a`.
        unsafe fn ptr_to_current<'a>(ptr: *const ()) -> &'a Current {
            unsafe { &*(ptr as *const Current) }
        }
        // Type-erases a `&Current` into the data pointer stored in the raw waker.
        fn current_to_ptr(current: &Current) -> *const () {
            current as *const Current as *const ()
        }

        // Vtable `clone` entry: produces an owned waker backed by a fresh
        // `Arc<Current>` holding a clone of the task handle.
        unsafe fn clone(ptr: *const ()) -> RawWaker {
            // Lazily create the `Arc` only when the waker is actually cloned.
            // FIXME: remove `transmute` when a `Waker` -> `RawWaker` conversion
            // function is landed in `core`.
            unsafe {
                mem::transmute::<task03::Waker, RawWaker>(task03::waker(Arc::new(
                    ptr_to_current(ptr).clone(),
                )))
            }
        }
        // Vtable `drop` entry: a no-op, because the data pointer is a plain
        // borrow of `self` and owns nothing that needs releasing.
        unsafe fn drop(_: *const ()) {}
        // Vtable wake entry: notifies the 0.1 task through the borrowed pointer.
        unsafe fn wake(ptr: *const ()) {
            unsafe { ptr_to_current(ptr).0.notify() }
        }

        let ptr = current_to_ptr(self);
        // `wake` fills both the by-value and the by-reference wake slots.
        let vtable = &RawWakerVTable::new(clone, wake, wake, drop);
        // The `Waker` is wrapped in `ManuallyDrop` before being handed to
        // `WakerRef::new_unowned`; the returned `WakerRef<'_>` carries the
        // lifetime of the `&self` borrow the data pointer was derived from.
        WakerRef::new_unowned(std::mem::ManuallyDrop::new(unsafe {
            task03::Waker::from_raw(RawWaker::new(ptr, vtable))
        }))
    }
}
188
189 impl ArcWake03 for Current {
wake_by_ref(arc_self: &Arc<Self>)190 fn wake_by_ref(arc_self: &Arc<Self>) {
191 arc_self.0.notify();
192 }
193 }
194
with_context<T, R, F>(compat: &mut Compat<T>, f: F) -> R where T: Unpin, F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> R,195 fn with_context<T, R, F>(compat: &mut Compat<T>, f: F) -> R
196 where
197 T: Unpin,
198 F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> R,
199 {
200 let current = Current::new();
201 let waker = current.as_waker();
202 let mut cx = Context::from_waker(&waker);
203 f(Pin::new(&mut compat.inner), &mut cx)
204 }
205
/// Runs `f` with the pinned inner sink of `compat` and a futures 0.3
/// `Context` whose waker notifies the current futures 0.1 task.
#[cfg(feature = "sink")]
fn with_sink_context<T, Item, R, F>(compat: &mut CompatSink<T, Item>, f: F) -> R
where
    T: Unpin,
    F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> R,
{
    // `task` must outlive `waker_ref`, which borrows it.
    let task = Current::new();
    let waker_ref = task.as_waker();
    let mut context = Context::from_waker(&waker_ref);
    let pinned = Pin::new(&mut compat.inner);
    f(pinned, &mut context)
}
217
#[cfg(feature = "io-compat")]
#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
mod io {
    use super::*;
    use futures_io::{AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03};
    use tokio_io::{AsyncRead as AsyncRead01, AsyncWrite as AsyncWrite01};

    /// Translates a futures 0.3 I/O poll into a plain `std::io::Result`.
    ///
    /// `Ready` results are passed through unchanged; `Pending` is reported
    /// as an error of kind `std::io::ErrorKind::WouldBlock`.
    fn poll_03_to_io<T>(x: task03::Poll<Result<T, std::io::Error>>) -> Result<T, std::io::Error> {
        match x {
            task03::Poll::Ready(Ok(t)) => Ok(t),
            task03::Poll::Pending => Err(std::io::ErrorKind::WouldBlock.into()),
            task03::Poll::Ready(Err(e)) => Err(e),
        }
    }

    // Every method below goes through `with_context` (the same helper the
    // `Future`/`Stream` impls use) so the waker/context setup lives in one
    // place instead of being repeated per method.

    impl<R: AsyncRead03 + Unpin> std::io::Read for Compat<R> {
        /// Polls the wrapped reader's `poll_read` once; a `Pending` poll
        /// surfaces as a `WouldBlock` error.
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            with_context(self, |inner, cx| poll_03_to_io(inner.poll_read(cx, buf)))
        }
    }

    impl<R: AsyncRead03 + Unpin> AsyncRead01 for Compat<R> {}

    impl<W: AsyncWrite03 + Unpin> std::io::Write for Compat<W> {
        /// Polls the wrapped writer's `poll_write` once; a `Pending` poll
        /// surfaces as a `WouldBlock` error.
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            with_context(self, |inner, cx| poll_03_to_io(inner.poll_write(cx, buf)))
        }

        /// Polls the wrapped writer's `poll_flush` once; a `Pending` poll
        /// surfaces as a `WouldBlock` error.
        fn flush(&mut self) -> std::io::Result<()> {
            with_context(self, |inner, cx| poll_03_to_io(inner.poll_flush(cx)))
        }
    }

    impl<W: AsyncWrite03 + Unpin> AsyncWrite01 for Compat<W> {
        /// Polls the wrapped writer's `poll_close` once and reports the
        /// outcome as a futures 0.1 `Async` value.
        fn shutdown(&mut self) -> std::io::Result<Async01<()>> {
            with_context(self, |inner, cx| poll_03_to_01(inner.poll_close(cx)))
        }
    }
}
269