1 //! Reconnect services when they fail.
2 //!
3 //! Reconnect takes some [`MakeService`] and transforms it into a
4 //! [`Service`]. It then attempts to lazily connect and
5 //! reconnect on failure. The `Reconnect` service becomes unavailable
//! when the inner `MakeService::poll_ready` returns an error. When the
//! connection future returned from `MakeService::call` fails, the error is
//! returned by the next call to `Reconnect::call`. This allows the user to
9 //! call the service again even if the inner `MakeService` was unable to
10 //! connect on the last call.
11 //!
12 //! [`MakeService`]: crate::make::MakeService
13 //! [`Service`]: crate::Service
14 
15 mod future;
16 
17 pub use future::ResponseFuture;
18 
19 use crate::make::MakeService;
20 use std::fmt;
21 use std::{
22     future::Future,
23     pin::Pin,
24     task::{Context, Poll},
25 };
26 use tower_service::Service;
27 use tracing::trace;
28 
/// Reconnect to failed services.
///
/// Wraps a service-producing `M` together with the `Target` it is asked to
/// connect to, and tracks the current connection in `state`.
pub struct Reconnect<M, Target>
where
    M: Service<Target>,
{
    /// The service that produces connections; `poll_ready` calls
    /// `make_service` on it whenever a new connection is needed.
    mk_service: M,
    /// Where the connection currently stands: idle, connecting, or connected.
    state: State<M::Future, M::Response>,
    /// The target handed to `mk_service`; it is cloned for every connection
    /// attempt.
    target: Target,
    /// Error from the most recent failed connection attempt. It is stored by
    /// `poll_ready` and taken (cleared) by the next `call`.
    error: Option<M::Error>,
}
39 
/// Connection state tracked by `Reconnect`.
///
/// `F` is the connection future type and `S` is the connected service type.
#[derive(Debug)]
enum State<F, S> {
    /// No connection and no attempt in flight; `poll_ready` starts a new
    /// attempt from this state.
    Idle,
    /// A connection attempt is in flight; holds the future being polled.
    Connecting(F),
    /// A connection is established; holds the service requests are sent to.
    Connected(S),
}
46 
47 impl<M, Target> Reconnect<M, Target>
48 where
49     M: Service<Target>,
50 {
51     /// Lazily connect and reconnect to a [`Service`].
new<S, Request>(mk_service: M, target: Target) -> Self52     pub fn new<S, Request>(mk_service: M, target: Target) -> Self {
53         Reconnect {
54             mk_service,
55             state: State::Idle,
56             target,
57             error: None,
58         }
59     }
60 
61     /// Reconnect to a already connected [`Service`].
with_connection(init_conn: M::Response, mk_service: M, target: Target) -> Self62     pub fn with_connection(init_conn: M::Response, mk_service: M, target: Target) -> Self {
63         Reconnect {
64             mk_service,
65             state: State::Connected(init_conn),
66             target,
67             error: None,
68         }
69     }
70 }
71 
72 impl<M, Target, S, Request> Service<Request> for Reconnect<M, Target>
73 where
74     M: Service<Target, Response = S>,
75     S: Service<Request>,
76     M::Future: Unpin,
77     crate::BoxError: From<M::Error> + From<S::Error>,
78     Target: Clone,
79 {
80     type Response = S::Response;
81     type Error = crate::BoxError;
82     type Future = ResponseFuture<S::Future, M::Error>;
83 
poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>84     fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
85         loop {
86             match &mut self.state {
87                 State::Idle => {
88                     trace!("poll_ready; idle");
89                     match self.mk_service.poll_ready(cx) {
90                         Poll::Ready(r) => r?,
91                         Poll::Pending => {
92                             trace!("poll_ready; MakeService not ready");
93                             return Poll::Pending;
94                         }
95                     }
96 
97                     let fut = self.mk_service.make_service(self.target.clone());
98                     self.state = State::Connecting(fut);
99                     continue;
100                 }
101                 State::Connecting(ref mut f) => {
102                     trace!("poll_ready; connecting");
103                     match Pin::new(f).poll(cx) {
104                         Poll::Ready(Ok(service)) => {
105                             self.state = State::Connected(service);
106                         }
107                         Poll::Pending => {
108                             trace!("poll_ready; not ready");
109                             return Poll::Pending;
110                         }
111                         Poll::Ready(Err(e)) => {
112                             trace!("poll_ready; error");
113                             self.state = State::Idle;
114                             self.error = Some(e);
115                             break;
116                         }
117                     }
118                 }
119                 State::Connected(ref mut inner) => {
120                     trace!("poll_ready; connected");
121                     match inner.poll_ready(cx) {
122                         Poll::Ready(Ok(())) => {
123                             trace!("poll_ready; ready");
124                             return Poll::Ready(Ok(()));
125                         }
126                         Poll::Pending => {
127                             trace!("poll_ready; not ready");
128                             return Poll::Pending;
129                         }
130                         Poll::Ready(Err(_)) => {
131                             trace!("poll_ready; error");
132                             self.state = State::Idle;
133                         }
134                     }
135                 }
136             }
137         }
138 
139         Poll::Ready(Ok(()))
140     }
141 
call(&mut self, request: Request) -> Self::Future142     fn call(&mut self, request: Request) -> Self::Future {
143         if let Some(error) = self.error.take() {
144             return ResponseFuture::error(error);
145         }
146 
147         let service = match self.state {
148             State::Connected(ref mut service) => service,
149             _ => panic!("service not ready; poll_ready must be called first"),
150         };
151 
152         let fut = service.call(request);
153         ResponseFuture::new(fut)
154     }
155 }
156 
157 impl<M, Target> fmt::Debug for Reconnect<M, Target>
158 where
159     M: Service<Target> + fmt::Debug,
160     M::Future: fmt::Debug,
161     M::Response: fmt::Debug,
162     Target: fmt::Debug,
163 {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result164     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
165         fmt.debug_struct("Reconnect")
166             .field("mk_service", &self.mk_service)
167             .field("state", &self.state)
168             .field("target", &self.target)
169             .finish()
170     }
171 }
172