1 #![warn(rust_2018_idioms)]
2 #![cfg(all(feature = "full", target_os = "linux"))]
3 
4 use std::sync::atomic::{AtomicUsize, Ordering};
5 use std::sync::Arc;
6 use tokio::net::UdpSocket;
7 
8 /// Ensure that UDP sockets have functional budgeting
9 ///
10 /// # Design
11 /// Two sockets communicate by spamming packets from one to the other.
12 ///
13 /// In Linux, this packet will be slammed through the entire network stack and into the receiver's buffer during the
14 /// send system call because we are using the loopback interface.
15 /// This happens because the softirq chain invoked on send when using the loopback interface covers virtually the
16 /// entirety of the lifecycle of a packet within the kernel network stack.
17 ///
18 /// As a result, neither socket will ever encounter an EWOULDBLOCK, and the only way for these to yield during the loop
19 /// is through budgeting.
20 ///
21 /// A second task runs in the background and increments a counter before yielding, allowing us to know how many times sockets yielded.
22 /// Since we are both sending and receiving, that should happen once per 64 packets, because budgets are of size 128
23 /// and there are two budget events per packet, a send and a recv.
24 #[tokio::test]
25 #[cfg_attr(miri, ignore)] // No `socket` on miri.
coop_budget_udp_send_recv()26 async fn coop_budget_udp_send_recv() {
27     const BUDGET: usize = 128;
28     const N_ITERATIONS: usize = 1024;
29 
30     const PACKET: &[u8] = b"Hello, world";
31     const PACKET_LEN: usize = 12;
32 
33     assert_eq!(
34         PACKET_LEN,
35         PACKET.len(),
36         "Defect in test, programmer can't do math"
37     );
38 
39     // bind each socket to a dynamic port, forcing IPv4 addressing on the localhost interface
40     let tx = UdpSocket::bind("127.0.0.1:0").await.unwrap();
41     let rx = UdpSocket::bind("127.0.0.1:0").await.unwrap();
42 
43     tx.connect(rx.local_addr().unwrap()).await.unwrap();
44     rx.connect(tx.local_addr().unwrap()).await.unwrap();
45 
46     let tracker = Arc::new(AtomicUsize::default());
47 
48     let tracker_clone = Arc::clone(&tracker);
49 
50     tokio::task::yield_now().await;
51 
52     tokio::spawn(async move {
53         loop {
54             tracker_clone.fetch_add(1, Ordering::SeqCst);
55 
56             tokio::task::yield_now().await;
57         }
58     });
59 
60     for _ in 0..N_ITERATIONS {
61         tx.send(PACKET).await.unwrap();
62 
63         let mut tmp = [0; PACKET_LEN];
64 
65         // ensure that we aren't somehow accumulating other
66         assert_eq!(
67             PACKET_LEN,
68             rx.recv(&mut tmp).await.unwrap(),
69             "Defect in test case, received unexpected result from socket"
70         );
71         assert_eq!(
72             PACKET, &tmp,
73             "Defect in test case, received unexpected result from socket"
74         );
75     }
76 
77     assert_eq!(N_ITERATIONS / (BUDGET / 2), tracker.load(Ordering::SeqCst));
78 }
79