1 #![warn(rust_2018_idioms)]
2 #![cfg(all(feature = "full", target_os = "linux"))]
3
4 use std::sync::atomic::{AtomicUsize, Ordering};
5 use std::sync::Arc;
6 use tokio::net::UdpSocket;
7
8 /// Ensure that UDP sockets have functional budgeting
9 ///
10 /// # Design
11 /// Two sockets communicate by spamming packets from one to the other.
12 ///
13 /// In Linux, this packet will be slammed through the entire network stack and into the receiver's buffer during the
14 /// send system call because we are using the loopback interface.
15 /// This happens because the softirq chain invoked on send when using the loopback interface covers virtually the
16 /// entirety of the lifecycle of a packet within the kernel network stack.
17 ///
18 /// As a result, neither socket will ever encounter an EWOULDBLOCK, and the only way for these to yield during the loop
19 /// is through budgeting.
20 ///
21 /// A second task runs in the background and increments a counter before yielding, allowing us to know how many times sockets yielded.
22 /// Since we are both sending and receiving, that should happen once per 64 packets, because budgets are of size 128
23 /// and there are two budget events per packet, a send and a recv.
24 #[tokio::test]
25 #[cfg_attr(miri, ignore)] // No `socket` on miri.
coop_budget_udp_send_recv()26 async fn coop_budget_udp_send_recv() {
27 const BUDGET: usize = 128;
28 const N_ITERATIONS: usize = 1024;
29
30 const PACKET: &[u8] = b"Hello, world";
31 const PACKET_LEN: usize = 12;
32
33 assert_eq!(
34 PACKET_LEN,
35 PACKET.len(),
36 "Defect in test, programmer can't do math"
37 );
38
39 // bind each socket to a dynamic port, forcing IPv4 addressing on the localhost interface
40 let tx = UdpSocket::bind("127.0.0.1:0").await.unwrap();
41 let rx = UdpSocket::bind("127.0.0.1:0").await.unwrap();
42
43 tx.connect(rx.local_addr().unwrap()).await.unwrap();
44 rx.connect(tx.local_addr().unwrap()).await.unwrap();
45
46 let tracker = Arc::new(AtomicUsize::default());
47
48 let tracker_clone = Arc::clone(&tracker);
49
50 tokio::task::yield_now().await;
51
52 tokio::spawn(async move {
53 loop {
54 tracker_clone.fetch_add(1, Ordering::SeqCst);
55
56 tokio::task::yield_now().await;
57 }
58 });
59
60 for _ in 0..N_ITERATIONS {
61 tx.send(PACKET).await.unwrap();
62
63 let mut tmp = [0; PACKET_LEN];
64
65 // ensure that we aren't somehow accumulating other
66 assert_eq!(
67 PACKET_LEN,
68 rx.recv(&mut tmp).await.unwrap(),
69 "Defect in test case, received unexpected result from socket"
70 );
71 assert_eq!(
72 PACKET, &tmp,
73 "Defect in test case, received unexpected result from socket"
74 );
75 }
76
77 assert_eq!(N_ITERATIONS / (BUDGET / 2), tracker.load(Ordering::SeqCst));
78 }
79