1 //! Exercises load balancers with mocked services.
2 
3 use futures_core::{Stream, TryStream};
4 use futures_util::{stream, stream::StreamExt, stream::TryStreamExt};
5 use hdrhistogram::Histogram;
6 use pin_project_lite::pin_project;
7 use rand::{self, Rng};
8 use std::hash::Hash;
9 use std::time::Duration;
10 use std::{
11     pin::Pin,
12     task::{Context, Poll},
13 };
14 use tokio::time::{self, Instant};
15 use tower::balance as lb;
16 use tower::discover::{Change, Discover};
17 use tower::limit::concurrency::ConcurrencyLimit;
18 use tower::load;
19 use tower::util::ServiceExt;
20 use tower_service::Service;
21 
/// Total number of requests issued per load-balancer run.
const REQUESTS: usize = 100_000;
/// How many requests may be in flight at once through the balancer.
const CONCURRENCY: usize = 500;
/// Initial RTT estimate used to seed the peak-EWMA load metric.
const DEFAULT_RTT: Duration = Duration::from_millis(30);
/// Per-endpoint concurrency cap; matches the client-side concurrency.
static ENDPOINT_CAPACITY: usize = CONCURRENCY;
/// Maximum latency of each of the 10 mock endpoints; per-request latency
/// is drawn uniformly from [0ms, max) in `gen_disco`.
static MAX_ENDPOINT_LATENCIES: [Duration; 10] = [
    Duration::from_millis(1),
    Duration::from_millis(5),
    Duration::from_millis(10),
    Duration::from_millis(10),
    Duration::from_millis(10),
    Duration::from_millis(100),
    Duration::from_millis(100),
    Duration::from_millis(100),
    Duration::from_millis(500),
    Duration::from_millis(1000),
];
38 
/// Aggregated results of one load-balancer run.
struct Summary {
    // Response latencies, recorded in whole milliseconds.
    latencies: Histogram<u64>,
    // When the run started; used to report wall-clock duration.
    start: Instant,
    // Requests served by each endpoint, indexed by instance id (one slot
    // per entry of MAX_ENDPOINT_LATENCIES).
    count_by_instance: [usize; 10],
}
44 
45 #[tokio::main]
main()46 async fn main() {
47     tracing::subscriber::set_global_default(tracing_subscriber::FmtSubscriber::default()).unwrap();
48 
49     println!("REQUESTS={}", REQUESTS);
50     println!("CONCURRENCY={}", CONCURRENCY);
51     println!("ENDPOINT_CAPACITY={}", ENDPOINT_CAPACITY);
52     print!("MAX_ENDPOINT_LATENCIES=[");
53     for max in &MAX_ENDPOINT_LATENCIES {
54         let l = max.as_secs() * 1_000 + u64::from(max.subsec_millis());
55         print!("{}ms, ", l);
56     }
57     println!("]");
58 
59     let decay = Duration::from_secs(10);
60     let d = gen_disco();
61     let pe = lb::p2c::Balance::new(load::PeakEwmaDiscover::new(
62         d,
63         DEFAULT_RTT,
64         decay,
65         load::CompleteOnResponse::default(),
66     ));
67     run("P2C+PeakEWMA...", pe).await;
68 
69     let d = gen_disco();
70     let ll = lb::p2c::Balance::new(load::PendingRequestsDiscover::new(
71         d,
72         load::CompleteOnResponse::default(),
73     ));
74     run("P2C+LeastLoaded...", ll).await;
75 }
76 
/// Boxed error type shared by the services and the discovery stream.
type Error = Box<dyn std::error::Error + Send + Sync>;

/// Endpoints are keyed by their index into MAX_ENDPOINT_LATENCIES.
type Key = usize;

pin_project! {
    /// A static service-discovery stream: yields each pre-built service
    /// once, then reports pending (see its `Stream` impl below).
    struct Disco<S> {
        services: Vec<(Key, S)>
    }
}
86 
87 impl<S> Disco<S> {
new(services: Vec<(Key, S)>) -> Self88     fn new(services: Vec<(Key, S)>) -> Self {
89         Self { services }
90     }
91 }
92 
impl<S> Stream for Disco<S>
where
    S: Service<Req, Response = Rsp, Error = Error>,
{
    type Item = Result<Change<Key, S>, Error>;

    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Hand out one `Insert` change per poll until the backlog is drained.
        // Services are popped from the back of the Vec, so they are announced
        // in reverse order of construction.
        match self.project().services.pop() {
            Some((k, service)) => Poll::Ready(Some(Ok(Change::Insert(k, service)))),
            None => {
                // there may be more later
                // NOTE(review): the waker from `Context` is never registered,
                // so this stream is only re-polled when its owner polls it of
                // its own accord. That appears sufficient for this benchmark
                // (the balancer polls discovery as part of poll_ready), but it
                // would not be correct for a general-purpose Discover impl.
                Poll::Pending
            }
        }
    }
}
109 
gen_disco() -> impl Discover< Key = Key, Error = Error, Service = ConcurrencyLimit< impl Service<Req, Response = Rsp, Error = Error, Future = impl Send> + Send, >, > + Send110 fn gen_disco() -> impl Discover<
111     Key = Key,
112     Error = Error,
113     Service = ConcurrencyLimit<
114         impl Service<Req, Response = Rsp, Error = Error, Future = impl Send> + Send,
115     >,
116 > + Send {
117     Disco::new(
118         MAX_ENDPOINT_LATENCIES
119             .iter()
120             .enumerate()
121             .map(|(instance, latency)| {
122                 let svc = tower::service_fn(move |_| {
123                     let start = Instant::now();
124 
125                     let maxms = u64::from(latency.subsec_millis())
126                         .saturating_add(latency.as_secs().saturating_mul(1_000));
127                     let latency = Duration::from_millis(rand::thread_rng().gen_range(0..maxms));
128 
129                     async move {
130                         time::sleep_until(start + latency).await;
131                         let latency = start.elapsed();
132                         Ok(Rsp { latency, instance })
133                     }
134                 });
135 
136                 (instance, ConcurrencyLimit::new(svc, ENDPOINT_CAPACITY))
137             })
138             .collect(),
139     )
140 }
141 
run<D>(name: &'static str, lb: lb::p2c::Balance<D, Req>) where D: Discover + Unpin + Send + 'static, D::Error: Into<Error>, D::Key: Clone + Send + Hash, D::Service: Service<Req, Response = Rsp> + load::Load + Send, <D::Service as Service<Req>>::Error: Into<Error>, <D::Service as Service<Req>>::Future: Send, <D::Service as load::Load>::Metric: std::fmt::Debug,142 async fn run<D>(name: &'static str, lb: lb::p2c::Balance<D, Req>)
143 where
144     D: Discover + Unpin + Send + 'static,
145     D::Error: Into<Error>,
146     D::Key: Clone + Send + Hash,
147     D::Service: Service<Req, Response = Rsp> + load::Load + Send,
148     <D::Service as Service<Req>>::Error: Into<Error>,
149     <D::Service as Service<Req>>::Future: Send,
150     <D::Service as load::Load>::Metric: std::fmt::Debug,
151 {
152     println!("{}", name);
153 
154     let requests = stream::repeat(Req).take(REQUESTS);
155     let service = ConcurrencyLimit::new(lb, CONCURRENCY);
156     let responses = service.call_all(requests).unordered();
157 
158     compute_histo(responses).await.unwrap().report();
159 }
160 
compute_histo<S>(mut times: S) -> Result<Summary, Error> where S: TryStream<Ok = Rsp, Error = Error> + 'static + Unpin,161 async fn compute_histo<S>(mut times: S) -> Result<Summary, Error>
162 where
163     S: TryStream<Ok = Rsp, Error = Error> + 'static + Unpin,
164 {
165     let mut summary = Summary::new();
166     while let Some(rsp) = times.try_next().await? {
167         summary.count(rsp);
168     }
169     Ok(summary)
170 }
171 
172 impl Summary {
new() -> Self173     fn new() -> Self {
174         Self {
175             // The max delay is 2000ms. At 3 significant figures.
176             latencies: Histogram::<u64>::new_with_max(3_000, 3).unwrap(),
177             start: Instant::now(),
178             count_by_instance: [0; 10],
179         }
180     }
181 
count(&mut self, rsp: Rsp)182     fn count(&mut self, rsp: Rsp) {
183         let ms = rsp.latency.as_secs() * 1_000;
184         let ms = ms + u64::from(rsp.latency.subsec_nanos()) / 1_000 / 1_000;
185         self.latencies += ms;
186         self.count_by_instance[rsp.instance] += 1;
187     }
188 
report(&self)189     fn report(&self) {
190         let mut total = 0;
191         for c in &self.count_by_instance {
192             total += c;
193         }
194         for (i, c) in self.count_by_instance.iter().enumerate() {
195             let p = *c as f64 / total as f64 * 100.0;
196             println!("  [{:02}] {:>5.01}%", i, p);
197         }
198 
199         println!("  wall {:4}s", self.start.elapsed().as_secs());
200 
201         if self.latencies.len() < 2 {
202             return;
203         }
204         println!("  p50  {:4}ms", self.latencies.value_at_quantile(0.5));
205 
206         if self.latencies.len() < 10 {
207             return;
208         }
209         println!("  p90  {:4}ms", self.latencies.value_at_quantile(0.9));
210 
211         if self.latencies.len() < 50 {
212             return;
213         }
214         println!("  p95  {:4}ms", self.latencies.value_at_quantile(0.95));
215 
216         if self.latencies.len() < 100 {
217             return;
218         }
219         println!("  p99  {:4}ms", self.latencies.value_at_quantile(0.99));
220 
221         if self.latencies.len() < 1000 {
222             return;
223         }
224         println!("  p999 {:4}ms", self.latencies.value_at_quantile(0.999));
225     }
226 }
227 
/// The unit request sent through the balanced service stack.
#[derive(Debug, Clone)]
struct Req;
230 
/// Response carrying the observed latency and which endpoint served it.
#[derive(Debug)]
struct Rsp {
    // Time from request dispatch to response, measured by the mock service.
    latency: Duration,
    // Index of the serving endpoint in MAX_ENDPOINT_LATENCIES.
    instance: usize,
}
236