//! Exercises load balancers with mocked services.
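//!
//! Sends `REQUESTS` requests at a concurrency of `CONCURRENCY` through two
//! "power of two choices" (P2C) balancers: one that estimates load with a
//! peak-EWMA of observed latencies and one that counts pending requests.
//! Each run prints how requests were distributed across the mocked endpoints
//! along with end-to-end latency percentiles.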

use futures_core::{Stream, TryStream};
use futures_util::{stream, stream::StreamExt, stream::TryStreamExt};
use hdrhistogram::Histogram;
use pin_project_lite::pin_project;
use rand::{self, Rng};
use std::hash::Hash;
use std::time::Duration;
use std::{
    pin::Pin,
    task::{Context, Poll},
};
use tokio::time::{self, Instant};
use tower::balance as lb;
use tower::discover::{Change, Discover};
use tower::limit::concurrency::ConcurrencyLimit;
use tower::load;
use tower::util::ServiceExt;
use tower_service::Service;
/// Total number of requests sent through each balancer.
const REQUESTS: usize = 100_000;
/// Maximum number of requests allowed in flight at once.
const CONCURRENCY: usize = 500;
/// Initial RTT estimate used by the peak-EWMA load measure.
const DEFAULT_RTT: Duration = Duration::from_millis(30);
/// Maximum number of concurrent requests a single endpoint will accept.
static ENDPOINT_CAPACITY: usize = CONCURRENCY;
/// Each mocked endpoint responds after a random delay drawn uniformly from
/// zero up to its entry here.
static MAX_ENDPOINT_LATENCIES: [Duration; 10] = [
    Duration::from_millis(1),
    Duration::from_millis(5),
    Duration::from_millis(10),
    Duration::from_millis(10),
    Duration::from_millis(10),
    Duration::from_millis(100),
    Duration::from_millis(100),
    Duration::from_millis(100),
    Duration::from_millis(500),
    Duration::from_millis(1000),
];

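/// Accumulates request latencies and per-endpoint request counts so that a
/// balancer's traffic distribution and latency profile can be reported.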
struct Summary {
    latencies: Histogram<u64>,
    start: Instant,
    count_by_instance: [usize; 10],
}

#[tokio::main]
async fn main() {
    tracing::subscriber::set_global_default(tracing_subscriber::FmtSubscriber::default()).unwrap();

    println!("REQUESTS={}", REQUESTS);
    println!("CONCURRENCY={}", CONCURRENCY);
    println!("ENDPOINT_CAPACITY={}", ENDPOINT_CAPACITY);
    print!("MAX_ENDPOINT_LATENCIES=[");
    for max in &MAX_ENDPOINT_LATENCIES {
        let l = max.as_secs() * 1_000 + u64::from(max.subsec_millis());
        print!("{}ms, ", l);
    }
    println!("]");

    let decay = Duration::from_secs(10);
    let d = gen_disco();
    let pe = lb::p2c::Balance::new(load::PeakEwmaDiscover::new(
        d,
        DEFAULT_RTT,
        decay,
        load::CompleteOnResponse::default(),
    ));
    run("P2C+PeakEWMA...", pe).await;

    let d = gen_disco();
    let ll = lb::p2c::Balance::new(load::PendingRequestsDiscover::new(
        d,
        load::CompleteOnResponse::default(),
    ));
    run("P2C+LeastLoaded...", ll).await;
}

type Error = Box<dyn std::error::Error + Send + Sync>;

type Key = usize;

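// A fixed service-discovery source: the `Stream` impl below yields each
// pre-built endpoint exactly once and then stays pending, so the endpoint
// set never changes during a run.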
pin_project! {
    struct Disco<S> {
        services: Vec<(Key, S)>,
    }
}

impl<S> Disco<S> {
    fn new(services: Vec<(Key, S)>) -> Self {
        Self { services }
    }
}

impl<S> Stream for Disco<S>
where
    S: Service<Req, Response = Rsp, Error = Error>,
{
    type Item = Result<Change<Key, S>, Error>;

    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match self.project().services.pop() {
            Some((k, service)) => Poll::Ready(Some(Ok(Change::Insert(k, service)))),
            None => {
                // There may be more endpoints later, so never end the stream.
                Poll::Pending
            }
        }
    }
}

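/// Builds the set of mocked endpoints: one service per entry in
/// `MAX_ENDPOINT_LATENCIES`, each responding after a random delay of up to
/// its configured maximum, wrapped in a `ConcurrencyLimit` so that no
/// endpoint serves more than `ENDPOINT_CAPACITY` requests at once.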
fn gen_disco() -> impl Discover<
    Key = Key,
    Error = Error,
    Service = ConcurrencyLimit<
        impl Service<Req, Response = Rsp, Error = Error, Future = impl Send> + Send,
    >,
> + Send {
    Disco::new(
        MAX_ENDPOINT_LATENCIES
            .iter()
            .enumerate()
            .map(|(instance, latency)| {
                let svc = tower::service_fn(move |_| {
                    let start = Instant::now();

                    let maxms = u64::from(latency.subsec_millis())
                        .saturating_add(latency.as_secs().saturating_mul(1_000));
                    let latency = Duration::from_millis(rand::thread_rng().gen_range(0..maxms));

                    async move {
                        time::sleep_until(start + latency).await;
                        let latency = start.elapsed();
                        Ok(Rsp { latency, instance })
                    }
                });

                (instance, ConcurrencyLimit::new(svc, ENDPOINT_CAPACITY))
            })
            .collect(),
    )
}

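/// Drives `REQUESTS` requests through the given balancer at a fixed
/// concurrency and prints the resulting distribution and latency summary.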
async fn run<D>(name: &'static str, lb: lb::p2c::Balance<D, Req>)
where
    D: Discover + Unpin + Send + 'static,
    D::Error: Into<Error>,
    D::Key: Clone + Send + Hash,
    D::Service: Service<Req, Response = Rsp> + load::Load + Send,
    <D::Service as Service<Req>>::Error: Into<Error>,
    <D::Service as Service<Req>>::Future: Send,
    <D::Service as load::Load>::Metric: std::fmt::Debug,
{
    println!("{}", name);

    let requests = stream::repeat(Req).take(REQUESTS);
    let service = ConcurrencyLimit::new(lb, CONCURRENCY);
    let responses = service.call_all(requests).unordered();

    compute_histo(responses).await.unwrap().report();
}

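/// Drains the response stream, recording every response in a `Summary`.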
async fn compute_histo<S>(mut times: S) -> Result<Summary, Error>
where
    S: TryStream<Ok = Rsp, Error = Error> + 'static + Unpin,
{
    let mut summary = Summary::new();
    while let Some(rsp) = times.try_next().await? {
        summary.count(rsp);
    }
    Ok(summary)
}

impl Summary {
    fn new() -> Self {
        Self {
            // Record latencies of up to 3000ms at 3 significant figures.
            latencies: Histogram::<u64>::new_with_max(3_000, 3).unwrap(),
            start: Instant::now(),
            count_by_instance: [0; 10],
        }
    }

    fn count(&mut self, rsp: Rsp) {
        let ms = rsp.latency.as_secs() * 1_000;
        let ms = ms + u64::from(rsp.latency.subsec_nanos()) / 1_000 / 1_000;
        self.latencies += ms;
        self.count_by_instance[rsp.instance] += 1;
    }

    fn report(&self) {
        let mut total = 0;
        for c in &self.count_by_instance {
            total += c;
        }
        for (i, c) in self.count_by_instance.iter().enumerate() {
            let p = *c as f64 / total as f64 * 100.0;
            println!("  [{:02}] {:>5.01}%", i, p);
        }

        println!("  wall {:4}s", self.start.elapsed().as_secs());

        // Only print the quantiles that the sample size can support.
        if self.latencies.len() < 2 {
            return;
        }
        println!("  p50  {:4}ms", self.latencies.value_at_quantile(0.5));

        if self.latencies.len() < 10 {
            return;
        }
        println!("  p90  {:4}ms", self.latencies.value_at_quantile(0.9));

        if self.latencies.len() < 50 {
            return;
        }
        println!("  p95  {:4}ms", self.latencies.value_at_quantile(0.95));

        if self.latencies.len() < 100 {
            return;
        }
        println!("  p99  {:4}ms", self.latencies.value_at_quantile(0.99));

        if self.latencies.len() < 1000 {
            return;
        }
        println!("  p999 {:4}ms", self.latencies.value_at_quantile(0.999));
    }
}

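/// The request type sent to the mocked endpoints; it carries no payload.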
#[derive(Debug, Clone)]
struct Req;

/// The mocked endpoints' response type: reports the simulated latency and
/// which endpoint instance served the request.
#[derive(Debug)]
struct Rsp {
    latency: Duration,
    instance: usize,
}