1 //! Utilities used to interact with the Tower ecosystem.
2 //!
3 //! This module provides `Connect` which hook-ins into the Tower ecosystem.
4 
5 use std::error::Error as StdError;
6 use std::future::Future;
7 use std::marker::PhantomData;
8 use std::pin::Pin;
9 use std::task::{Context, Poll};
10 
11 use tracing::debug;
12 
13 #[cfg_attr(feature = "deprecated", allow(deprecated))]
14 use super::conn::{Builder, SendRequest};
15 use crate::{
16     body::HttpBody,
17     service::{MakeConnection, Service},
18 };
19 
20 /// Creates a connection via `SendRequest`.
21 ///
22 /// This accepts a `hyper::client::conn::Builder` and provides
23 /// a `MakeService` implementation to create connections from some
24 /// target `T`.
25 #[derive(Debug)]
26 pub struct Connect<C, B, T> {
27     inner: C,
28     #[cfg_attr(feature = "deprecated", allow(deprecated))]
29     builder: Builder,
30     _pd: PhantomData<fn(T, B)>,
31 }
32 
33 impl<C, B, T> Connect<C, B, T> {
34     /// Create a new `Connect` with some inner connector `C` and a connection
35     /// builder.
36     #[cfg_attr(feature = "deprecated", allow(deprecated))]
new(inner: C, builder: Builder) -> Self37     pub fn new(inner: C, builder: Builder) -> Self {
38         Self {
39             inner,
40             builder,
41             _pd: PhantomData,
42         }
43     }
44 }
45 
46 impl<C, B, T> Service<T> for Connect<C, B, T>
47 where
48     C: MakeConnection<T>,
49     C::Connection: Unpin + Send + 'static,
50     C::Future: Send + 'static,
51     C::Error: Into<Box<dyn StdError + Send + Sync>> + Send,
52     B: HttpBody + Unpin + Send + 'static,
53     B::Data: Send + Unpin,
54     B::Error: Into<Box<dyn StdError + Send + Sync>>,
55 {
56     #[cfg_attr(feature = "deprecated", allow(deprecated))]
57     type Response = SendRequest<B>;
58     type Error = crate::Error;
59     type Future =
60         Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
61 
poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>62     fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
63         self.inner
64             .poll_ready(cx)
65             .map_err(|e| crate::Error::new(crate::error::Kind::Connect).with(e.into()))
66     }
67 
call(&mut self, req: T) -> Self::Future68     fn call(&mut self, req: T) -> Self::Future {
69         let builder = self.builder.clone();
70         let io = self.inner.make_connection(req);
71 
72         let fut = async move {
73             match io.await {
74                 Ok(io) => match builder.handshake(io).await {
75                     Ok((sr, conn)) => {
76                         #[cfg_attr(feature = "deprecated", allow(deprecated))]
77                         builder.exec.execute(async move {
78                             if let Err(e) = conn.await {
79                                 debug!("connection error: {:?}", e);
80                             }
81                         });
82                         Ok(sr)
83                     }
84                     Err(e) => Err(e),
85                 },
86                 Err(e) => {
87                     let err = crate::Error::new(crate::error::Kind::Connect).with(e.into());
88                     Err(err)
89                 }
90             }
91         };
92 
93         Box::pin(fut)
94     }
95 }
96