1 //! Utilities used to interact with the Tower ecosystem. 2 //! 3 //! This module provides `Connect` which hook-ins into the Tower ecosystem. 4 5 use std::error::Error as StdError; 6 use std::future::Future; 7 use std::marker::PhantomData; 8 use std::pin::Pin; 9 use std::task::{Context, Poll}; 10 11 use tracing::debug; 12 13 #[cfg_attr(feature = "deprecated", allow(deprecated))] 14 use super::conn::{Builder, SendRequest}; 15 use crate::{ 16 body::HttpBody, 17 service::{MakeConnection, Service}, 18 }; 19 20 /// Creates a connection via `SendRequest`. 21 /// 22 /// This accepts a `hyper::client::conn::Builder` and provides 23 /// a `MakeService` implementation to create connections from some 24 /// target `T`. 25 #[derive(Debug)] 26 pub struct Connect<C, B, T> { 27 inner: C, 28 #[cfg_attr(feature = "deprecated", allow(deprecated))] 29 builder: Builder, 30 _pd: PhantomData<fn(T, B)>, 31 } 32 33 impl<C, B, T> Connect<C, B, T> { 34 /// Create a new `Connect` with some inner connector `C` and a connection 35 /// builder. 36 #[cfg_attr(feature = "deprecated", allow(deprecated))] new(inner: C, builder: Builder) -> Self37 pub fn new(inner: C, builder: Builder) -> Self { 38 Self { 39 inner, 40 builder, 41 _pd: PhantomData, 42 } 43 } 44 } 45 46 impl<C, B, T> Service<T> for Connect<C, B, T> 47 where 48 C: MakeConnection<T>, 49 C::Connection: Unpin + Send + 'static, 50 C::Future: Send + 'static, 51 C::Error: Into<Box<dyn StdError + Send + Sync>> + Send, 52 B: HttpBody + Unpin + Send + 'static, 53 B::Data: Send + Unpin, 54 B::Error: Into<Box<dyn StdError + Send + Sync>>, 55 { 56 #[cfg_attr(feature = "deprecated", allow(deprecated))] 57 type Response = SendRequest<B>; 58 type Error = crate::Error; 59 type Future = 60 Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>; 61 poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>62 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 63 self.inner 64 .poll_ready(cx) 65 .map_err(|e| crate::Error::new(crate::error::Kind::Connect).with(e.into())) 66 } 67 call(&mut self, req: T) -> Self::Future68 fn call(&mut self, req: T) -> Self::Future { 69 let builder = self.builder.clone(); 70 let io = self.inner.make_connection(req); 71 72 let fut = async move { 73 match io.await { 74 Ok(io) => match builder.handshake(io).await { 75 Ok((sr, conn)) => { 76 #[cfg_attr(feature = "deprecated", allow(deprecated))] 77 builder.exec.execute(async move { 78 if let Err(e) = conn.await { 79 debug!("connection error: {:?}", e); 80 } 81 }); 82 Ok(sr) 83 } 84 Err(e) => Err(e), 85 }, 86 Err(e) => { 87 let err = crate::Error::new(crate::error::Kind::Connect).with(e.into()); 88 Err(err) 89 } 90 } 91 }; 92 93 Box::pin(fut) 94 } 95 } 96