1 use super::{SendError, Sender, TrySendError, UnboundedSender}; 2 use futures_core::task::{Context, Poll}; 3 use futures_sink::Sink; 4 use std::pin::Pin; 5 6 impl<T> Sink<T> for Sender<T> { 7 type Error = SendError; 8 poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>9 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 10 (*self).poll_ready(cx) 11 } 12 start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error>13 fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { 14 (*self).start_send(msg) 15 } 16 poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>17 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 18 match (*self).poll_ready(cx) { 19 Poll::Ready(Err(ref e)) if e.is_disconnected() => { 20 // If the receiver disconnected, we consider the sink to be flushed. 21 Poll::Ready(Ok(())) 22 } 23 x => x, 24 } 25 } 26 poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>27 fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 28 self.disconnect(); 29 Poll::Ready(Ok(())) 30 } 31 } 32 33 impl<T> Sink<T> for UnboundedSender<T> { 34 type Error = SendError; 35 poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>36 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 37 Self::poll_ready(&*self, cx) 38 } 39 start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error>40 fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { 41 Self::start_send(&mut *self, msg) 42 } 43 poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>44 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 45 Poll::Ready(Ok(())) 46 } 47 poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>48 fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 49 self.disconnect(); 50 Poll::Ready(Ok(())) 51 } 52 } 53 54 impl<T> Sink<T> for &UnboundedSender<T> { 55 type Error = SendError; 56 poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>57 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 58 UnboundedSender::poll_ready(*self, cx) 59 } 60 start_send(self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error>61 fn start_send(self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { 62 self.unbounded_send(msg).map_err(TrySendError::into_send_error) 63 } 64 poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>65 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 66 Poll::Ready(Ok(())) 67 } 68 poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>69 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 70 self.close_channel(); 71 Poll::Ready(Ok(())) 72 } 73 } 74