1  //! HTTP/1 Server Connections
2  
3  use std::error::Error as StdError;
4  use std::fmt;
5  use std::future::Future;
6  use std::marker::Unpin;
7  use std::pin::Pin;
8  use std::task::{Context, Poll};
9  use std::time::Duration;
10  
11  use bytes::Bytes;
12  use tokio::io::{AsyncRead, AsyncWrite};
13  
14  use crate::body::{Body as IncomingBody, HttpBody as Body};
15  use crate::proto;
16  use crate::service::HttpService;
17  
// The HTTP/1 dispatcher as used by the server side of this module: the
// server dispatch logic for service `S` (fed `IncomingBody` requests),
// combined with body type `B` and transport type `T`. `Connection` fills in
// `B` with the service's `ResBody`.
type Http1Dispatcher<T, B, S> = proto::h1::Dispatcher<
    proto::h1::dispatch::Server<S, IncomingBody>,
    B,
    T,
    proto::ServerTransaction,
>;
24  
pin_project_lite::pin_project! {
    /// A future binding an http1 connection with a Service.
    ///
    /// Polling this future will drive HTTP forward.
    #[must_use = "futures do nothing unless polled"]
    pub struct Connection<T, S>
    where
        S: HttpService<IncomingBody>,
    {
        // The HTTP/1 dispatcher. It holds both the IO object and the service
        // (they are handed back by `into_parts`), and every method on
        // `Connection` delegates to it.
        conn: Http1Dispatcher<T, S::ResBody, S>,
    }
}
37  
/// A configuration builder for HTTP/1 server connections.
///
/// Options are recorded on the builder and applied to each connection
/// created by [`Builder::serve_connection`].
#[derive(Clone, Debug)]
pub struct Builder {
    // Whether half-closed connections are allowed (set by `half_close`).
    h1_half_close: bool,
    // Whether keep-alive stays enabled (set by `keep_alive`).
    h1_keep_alive: bool,
    // Whether header names are written in title case (set by `title_case_headers`).
    h1_title_case_headers: bool,
    // Whether original header casing is preserved (set by `preserve_header_case`).
    h1_preserve_header_case: bool,
    // Optional limit on how long reading request headers may take
    // (set by `header_read_timeout`); `None` means no timeout is configured.
    h1_header_read_timeout: Option<Duration>,
    // Write strategy: `Some(true)` selects the queue strategy, `Some(false)`
    // the flatten strategy, and `None` leaves the connection's own choice.
    h1_writev: Option<bool>,
    // Optional maximum buffer size (set by `max_buf_size`); `None` keeps the
    // connection's default.
    max_buf_size: Option<usize>,
    // Whether flushes are aggregated for pipelined responses (set by `pipeline_flush`).
    pipeline_flush: bool,
}
50  
/// Deconstructed parts of a `Connection`.
///
/// This allows taking apart a `Connection` at a later time, in order to
/// reclaim the IO object, and additional related pieces.
///
/// Values of this type are produced by `Connection::into_parts` and
/// `Connection::without_shutdown`.
#[derive(Debug)]
pub struct Parts<T, S> {
    /// The original IO object used in the handshake.
    pub io: T,
    /// A buffer of bytes that have been read but not processed as HTTP.
    ///
    /// If the client sent additional bytes after its last request, and
    /// this connection "ended" with an upgrade, the read buffer will contain
    /// those bytes.
    ///
    /// You will want to check for any existing bytes if you plan to continue
    /// communicating on the IO object.
    pub read_buf: Bytes,
    /// The `Service` used to serve this connection.
    pub service: S,
    // Private unit field: keeps `Parts` from being built with struct-literal
    // syntax outside this module, so fields can be added later.
    _inner: (),
}
72  
73  // ===== impl Connection =====
74  
75  impl<I, S> fmt::Debug for Connection<I, S>
76  where
77      S: HttpService<IncomingBody>,
78  {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result79      fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
80          f.debug_struct("Connection").finish()
81      }
82  }
83  
84  impl<I, B, S> Connection<I, S>
85  where
86      S: HttpService<IncomingBody, ResBody = B>,
87      S::Error: Into<Box<dyn StdError + Send + Sync>>,
88      I: AsyncRead + AsyncWrite + Unpin,
89      B: Body + 'static,
90      B::Error: Into<Box<dyn StdError + Send + Sync>>,
91  {
    /// Start a graceful shutdown process for this connection.
    ///
    /// This disables keep-alive on the underlying dispatcher. The
    /// `Connection` should continue to be polled until shutdown
    /// can finish.
    ///
    /// # Note
    ///
    /// This should only be called while the `Connection` future is still
    /// pending. If called after `Connection::poll` has resolved, this does
    /// nothing.
    pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
        self.conn.disable_keep_alive();
    }
105  
106      /// Return the inner IO object, and additional information.
107      ///
108      /// If the IO object has been "rewound" the io will not contain those bytes rewound.
109      /// This should only be called after `poll_without_shutdown` signals
110      /// that the connection is "done". Otherwise, it may not have finished
111      /// flushing all necessary HTTP bytes.
112      ///
113      /// # Panics
114      /// This method will panic if this connection is using an h2 protocol.
into_parts(self) -> Parts<I, S>115      pub fn into_parts(self) -> Parts<I, S> {
116          let (io, read_buf, dispatch) = self.conn.into_inner();
117          Parts {
118              io,
119              read_buf,
120              service: dispatch.into_service(),
121              _inner: (),
122          }
123      }
124  
    /// Poll the connection for completion, but without calling `shutdown`
    /// on the underlying IO.
    ///
    /// This is useful to allow running a connection while doing an HTTP
    /// upgrade. Once the upgrade is completed, the connection would be "done",
    /// but it is not desired to actually shutdown the IO object. Instead you
    /// would take it back using `into_parts`.
    ///
    /// This forwards directly to the dispatcher's `poll_without_shutdown`
    /// and returns its result unchanged.
    pub fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>>
    where
        S: Unpin,
        S::Future: Unpin,
        B: Unpin,
    {
        self.conn.poll_without_shutdown(cx)
    }
140  
141      /// Prevent shutdown of the underlying IO object at the end of service the request,
142      /// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`.
143      ///
144      /// # Error
145      ///
146      /// This errors if the underlying connection protocol is not HTTP/1.
without_shutdown(self) -> impl Future<Output = crate::Result<Parts<I, S>>> where S: Unpin, S::Future: Unpin, B: Unpin,147      pub fn without_shutdown(self) -> impl Future<Output = crate::Result<Parts<I, S>>>
148      where
149          S: Unpin,
150          S::Future: Unpin,
151          B: Unpin,
152      {
153          let mut zelf = Some(self);
154          futures_util::future::poll_fn(move |cx| {
155              ready!(zelf.as_mut().unwrap().conn.poll_without_shutdown(cx))?;
156              Poll::Ready(Ok(zelf.take().unwrap().into_parts()))
157          })
158      }
159  
    /// Enable this connection to support higher-level HTTP upgrades.
    ///
    /// The connection is wrapped in an `UpgradeableConnection`. When the
    /// dispatcher reports an upgrade, that future takes the IO object and
    /// read buffer and fulfills the pending upgrade with them. This is why
    /// `I: Send` is required here.
    ///
    /// See [the `upgrade` module](crate::upgrade) for more.
    pub fn with_upgrades(self) -> upgrades::UpgradeableConnection<I, S>
    where
        I: Send,
    {
        upgrades::UpgradeableConnection { inner: Some(self) }
    }
169  }
170  
171  impl<I, B, S> Future for Connection<I, S>
172  where
173      S: HttpService<IncomingBody, ResBody = B>,
174      S::Error: Into<Box<dyn StdError + Send + Sync>>,
175      I: AsyncRead + AsyncWrite + Unpin + 'static,
176      B: Body + 'static,
177      B::Error: Into<Box<dyn StdError + Send + Sync>>,
178  {
179      type Output = crate::Result<()>;
180  
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>181      fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
182          match ready!(Pin::new(&mut self.conn).poll(cx)) {
183              Ok(done) => {
184                  match done {
185                      proto::Dispatched::Shutdown => {}
186                      proto::Dispatched::Upgrade(pending) => {
187                          // With no `Send` bound on `I`, we can't try to do
188                          // upgrades here. In case a user was trying to use
189                          // `Body::on_upgrade` with this API, send a special
190                          // error letting them know about that.
191                          pending.manual();
192                      }
193                  };
194                  return Poll::Ready(Ok(()));
195              }
196              Err(e) => Poll::Ready(Err(e)),
197          }
198      }
199  }
200  
201  // ===== impl Builder =====
202  
203  impl Builder {
204      /// Create a new connection builder.
new() -> Self205      pub fn new() -> Self {
206          Self {
207              h1_half_close: false,
208              h1_keep_alive: true,
209              h1_title_case_headers: false,
210              h1_preserve_header_case: false,
211              h1_header_read_timeout: None,
212              h1_writev: None,
213              max_buf_size: None,
214              pipeline_flush: false,
215          }
216      }
    /// Set whether HTTP/1 connections should support half-closures.
    ///
    /// Clients can choose to shut down their write-side while waiting
    /// for the server to respond. Setting this to `true` will
    /// prevent closing the connection immediately if `read`
    /// detects an EOF in the middle of a request.
    ///
    /// Default is `false`.
    pub fn half_close(&mut self, val: bool) -> &mut Self {
        self.h1_half_close = val;
        self
    }
229  
    /// Enables or disables HTTP/1 keep-alive.
    ///
    /// When set to `false`, keep-alive is disabled on each connection created
    /// by `serve_connection`.
    ///
    /// Default is `true`.
    pub fn keep_alive(&mut self, val: bool) -> &mut Self {
        self.h1_keep_alive = val;
        self
    }
237  
    /// Set whether HTTP/1 connections will write header names as title case at
    /// the socket level.
    ///
    /// Default is `false`.
    pub fn title_case_headers(&mut self, enabled: bool) -> &mut Self {
        self.h1_title_case_headers = enabled;
        self
    }
246  
    /// Set whether to support preserving original header cases.
    ///
    /// Currently, this will record the original cases received, and store them
    /// in a private extension on the `Request`. It will also look for and use
    /// such an extension in any provided `Response`.
    ///
    /// Since the relevant extension is still private, there is no way to
    /// interact with the original cases. The only effect this can have now is
    /// to forward the cases in a proxy-like fashion.
    ///
    /// Default is `false`.
    pub fn preserve_header_case(&mut self, enabled: bool) -> &mut Self {
        self.h1_preserve_header_case = enabled;
        self
    }
262  
    /// Set a timeout for reading client request headers. If a client does not
    /// transmit the entire header within this time, the connection is closed.
    ///
    /// Default is `None`, meaning no timeout is configured. This setter can
    /// only set a timeout; it offers no way to clear one again.
    pub fn header_read_timeout(&mut self, read_timeout: Duration) -> &mut Self {
        self.h1_header_read_timeout = Some(read_timeout);
        self
    }
271  
    /// Set whether HTTP/1 connections should try to use vectored writes,
    /// or always flatten into a single buffer.
    ///
    /// Note that setting this to `false` may mean more copies of body data,
    /// but may also improve performance when an IO transport doesn't
    /// support vectored writes well, such as most TLS implementations.
    ///
    /// Setting this to `true` will force hyper to use the queued strategy,
    /// which may eliminate unnecessary cloning on some TLS backends.
    ///
    /// Default is `auto`. In this mode hyper will try to guess which
    /// mode to use.
    pub fn writev(&mut self, val: bool) -> &mut Self {
        self.h1_writev = Some(val);
        self
    }
288  
    /// Set the maximum buffer size for the connection.
    ///
    /// Default is ~400kb.
    ///
    /// # Panics
    ///
    /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
    pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
        // Reject values below the HTTP/1 minimum before storing the setting.
        assert!(
            max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE,
            "the max_buf_size cannot be smaller than the minimum that h1 specifies."
        );
        self.max_buf_size = Some(max);
        self
    }
304  
    /// Aggregates flushes to better support pipelined responses.
    ///
    /// Experimental, may have bugs.
    ///
    /// Default is `false`.
    pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self {
        self.pipeline_flush = enabled;
        self
    }
314  
315      // /// Set the timer used in background tasks.
316      // pub fn timer<M>(&mut self, timer: M) -> &mut Self
317      // where
318      //     M: Timer + Send + Sync + 'static,
319      // {
320      //     self.timer = Time::Timer(Arc::new(timer));
321      //     self
322      // }
323  
324      /// Bind a connection together with a [`Service`](crate::service::Service).
325      ///
326      /// This returns a Future that must be polled in order for HTTP to be
327      /// driven on the connection.
328      ///
329      /// # Example
330      ///
331      /// ```
332      /// # use hyper::{Body as Incoming, Request, Response};
333      /// # use hyper::service::Service;
334      /// # use hyper::server::conn::http1::Builder;
335      /// # use tokio::io::{AsyncRead, AsyncWrite};
336      /// # async fn run<I, S>(some_io: I, some_service: S)
337      /// # where
338      /// #     I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
339      /// #     S: Service<hyper::Request<Incoming>, Response=hyper::Response<Incoming>> + Send + 'static,
340      /// #     S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
341      /// #     S::Future: Send,
342      /// # {
343      /// let http = Builder::new();
344      /// let conn = http.serve_connection(some_io, some_service);
345      ///
346      /// if let Err(e) = conn.await {
347      ///     eprintln!("server connection error: {}", e);
348      /// }
349      /// # }
350      /// # fn main() {}
351      /// ```
serve_connection<I, S>(&self, io: I, service: S) -> Connection<I, S> where S: HttpService<IncomingBody>, S::Error: Into<Box<dyn StdError + Send + Sync>>, S::ResBody: 'static, <S::ResBody as Body>::Error: Into<Box<dyn StdError + Send + Sync>>, I: AsyncRead + AsyncWrite + Unpin,352      pub fn serve_connection<I, S>(&self, io: I, service: S) -> Connection<I, S>
353      where
354          S: HttpService<IncomingBody>,
355          S::Error: Into<Box<dyn StdError + Send + Sync>>,
356          S::ResBody: 'static,
357          <S::ResBody as Body>::Error: Into<Box<dyn StdError + Send + Sync>>,
358          I: AsyncRead + AsyncWrite + Unpin,
359      {
360          let mut conn = proto::Conn::new(io);
361          if !self.h1_keep_alive {
362              conn.disable_keep_alive();
363          }
364          if self.h1_half_close {
365              conn.set_allow_half_close();
366          }
367          if self.h1_title_case_headers {
368              conn.set_title_case_headers();
369          }
370          if self.h1_preserve_header_case {
371              conn.set_preserve_header_case();
372          }
373          if let Some(header_read_timeout) = self.h1_header_read_timeout {
374              conn.set_http1_header_read_timeout(header_read_timeout);
375          }
376          if let Some(writev) = self.h1_writev {
377              if writev {
378                  conn.set_write_strategy_queue();
379              } else {
380                  conn.set_write_strategy_flatten();
381              }
382          }
383          conn.set_flush_pipeline(self.pipeline_flush);
384          if let Some(max) = self.max_buf_size {
385              conn.set_max_buf_size(max);
386          }
387          let sd = proto::h1::dispatch::Server::new(service);
388          let proto = proto::h1::Dispatcher::new(sd, conn);
389          Connection { conn: proto }
390      }
391  }
392  
393  mod upgrades {
394      use crate::upgrade::Upgraded;
395  
396      use super::*;
397  
    // A future binding a connection with a Service with Upgrade support.
    //
    // This type is unnameable outside the crate.
    #[must_use = "futures do nothing unless polled"]
    #[allow(missing_debug_implementations)]
    pub struct UpgradeableConnection<T, S>
    where
        S: HttpService<IncomingBody>,
    {
        // The wrapped connection. It starts as `Some` (see `with_upgrades`)
        // and is taken, leaving `None`, when `poll` completes an upgrade and
        // moves the IO object out.
        pub(super) inner: Option<Connection<T, S>>,
    }
409  
410      impl<I, B, S> UpgradeableConnection<I, S>
411      where
412          S: HttpService<IncomingBody, ResBody = B>,
413          S::Error: Into<Box<dyn StdError + Send + Sync>>,
414          I: AsyncRead + AsyncWrite + Unpin,
415          B: Body + 'static,
416          B::Error: Into<Box<dyn StdError + Send + Sync>>,
417      {
418          /// Start a graceful shutdown process for this connection.
419          ///
420          /// This `Connection` should continue to be polled until shutdown
421          /// can finish.
graceful_shutdown(mut self: Pin<&mut Self>)422          pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
423              Pin::new(self.inner.as_mut().unwrap()).graceful_shutdown()
424          }
425      }
426  
427      impl<I, B, S> Future for UpgradeableConnection<I, S>
428      where
429          S: HttpService<IncomingBody, ResBody = B>,
430          S::Error: Into<Box<dyn StdError + Send + Sync>>,
431          I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
432          B: Body + 'static,
433          B::Error: Into<Box<dyn StdError + Send + Sync>>,
434      {
435          type Output = crate::Result<()>;
436  
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>437          fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
438              match ready!(Pin::new(&mut self.inner.as_mut().unwrap().conn).poll(cx)) {
439                  Ok(proto::Dispatched::Shutdown) => Poll::Ready(Ok(())),
440                  Ok(proto::Dispatched::Upgrade(pending)) => {
441                      let (io, buf, _) = self.inner.take().unwrap().conn.into_inner();
442                      pending.fulfill(Upgraded::new(io, buf));
443                      Poll::Ready(Ok(()))
444                  }
445                  Err(e) => Poll::Ready(Err(e)),
446              }
447          }
448      }
449  }
450