1 //! Reconnect services when they fail. 2 //! 3 //! Reconnect takes some [`MakeService`] and transforms it into a 4 //! [`Service`]. It then attempts to lazily connect and 5 //! reconnect on failure. The `Reconnect` service becomes unavailable 6 //! when the inner `MakeService::poll_ready` returns an error. When the 7 //! connection future returned from `MakeService::call` fails this will be 8 //! returned in the next call to `Reconnect::call`. This allows the user to 9 //! call the service again even if the inner `MakeService` was unable to 10 //! connect on the last call. 11 //! 12 //! [`MakeService`]: crate::make::MakeService 13 //! [`Service`]: crate::Service 14 15 mod future; 16 17 pub use future::ResponseFuture; 18 19 use crate::make::MakeService; 20 use std::fmt; 21 use std::{ 22 future::Future, 23 pin::Pin, 24 task::{Context, Poll}, 25 }; 26 use tower_service::Service; 27 use tracing::trace; 28 29 /// Reconnect to failed services. 30 pub struct Reconnect<M, Target> 31 where 32 M: Service<Target>, 33 { 34 mk_service: M, 35 state: State<M::Future, M::Response>, 36 target: Target, 37 error: Option<M::Error>, 38 } 39 40 #[derive(Debug)] 41 enum State<F, S> { 42 Idle, 43 Connecting(F), 44 Connected(S), 45 } 46 47 impl<M, Target> Reconnect<M, Target> 48 where 49 M: Service<Target>, 50 { 51 /// Lazily connect and reconnect to a [`Service`]. new<S, Request>(mk_service: M, target: Target) -> Self52 pub fn new<S, Request>(mk_service: M, target: Target) -> Self { 53 Reconnect { 54 mk_service, 55 state: State::Idle, 56 target, 57 error: None, 58 } 59 } 60 61 /// Reconnect to a already connected [`Service`]. with_connection(init_conn: M::Response, mk_service: M, target: Target) -> Self62 pub fn with_connection(init_conn: M::Response, mk_service: M, target: Target) -> Self { 63 Reconnect { 64 mk_service, 65 state: State::Connected(init_conn), 66 target, 67 error: None, 68 } 69 } 70 } 71 72 impl<M, Target, S, Request> Service<Request> for Reconnect<M, Target> 73 where 74 M: Service<Target, Response = S>, 75 S: Service<Request>, 76 M::Future: Unpin, 77 crate::BoxError: From<M::Error> + From<S::Error>, 78 Target: Clone, 79 { 80 type Response = S::Response; 81 type Error = crate::BoxError; 82 type Future = ResponseFuture<S::Future, M::Error>; 83 poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>84 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 85 loop { 86 match &mut self.state { 87 State::Idle => { 88 trace!("poll_ready; idle"); 89 match self.mk_service.poll_ready(cx) { 90 Poll::Ready(r) => r?, 91 Poll::Pending => { 92 trace!("poll_ready; MakeService not ready"); 93 return Poll::Pending; 94 } 95 } 96 97 let fut = self.mk_service.make_service(self.target.clone()); 98 self.state = State::Connecting(fut); 99 continue; 100 } 101 State::Connecting(ref mut f) => { 102 trace!("poll_ready; connecting"); 103 match Pin::new(f).poll(cx) { 104 Poll::Ready(Ok(service)) => { 105 self.state = State::Connected(service); 106 } 107 Poll::Pending => { 108 trace!("poll_ready; not ready"); 109 return Poll::Pending; 110 } 111 Poll::Ready(Err(e)) => { 112 trace!("poll_ready; error"); 113 self.state = State::Idle; 114 self.error = Some(e); 115 break; 116 } 117 } 118 } 119 State::Connected(ref mut inner) => { 120 trace!("poll_ready; connected"); 121 match inner.poll_ready(cx) { 122 Poll::Ready(Ok(())) => { 123 trace!("poll_ready; ready"); 124 return Poll::Ready(Ok(())); 125 } 126 Poll::Pending => { 127 trace!("poll_ready; not ready"); 128 return Poll::Pending; 129 } 130 Poll::Ready(Err(_)) => { 131 trace!("poll_ready; error"); 132 self.state = State::Idle; 133 } 134 } 135 } 136 } 137 } 138 139 Poll::Ready(Ok(())) 140 } 141 call(&mut self, request: Request) -> Self::Future142 fn call(&mut self, request: Request) -> Self::Future { 143 if let Some(error) = self.error.take() { 144 return ResponseFuture::error(error); 145 } 146 147 let service = match self.state { 148 State::Connected(ref mut service) => service, 149 _ => panic!("service not ready; poll_ready must be called first"), 150 }; 151 152 let fut = service.call(request); 153 ResponseFuture::new(fut) 154 } 155 } 156 157 impl<M, Target> fmt::Debug for Reconnect<M, Target> 158 where 159 M: Service<Target> + fmt::Debug, 160 M::Future: fmt::Debug, 161 M::Response: fmt::Debug, 162 Target: fmt::Debug, 163 { fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result164 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { 165 fmt.debug_struct("Reconnect") 166 .field("mk_service", &self.mk_service) 167 .field("state", &self.state) 168 .field("target", &self.target) 169 .finish() 170 } 171 } 172