1 #![warn(rust_2018_idioms)]
2 
3 use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
4 use tokio_test::task;
5 use tokio_test::{
6     assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
7 };
8 use tokio_util::codec::*;
9 
10 use bytes::{BufMut, Bytes, BytesMut};
11 use futures::{pin_mut, Sink, Stream};
12 use std::collections::VecDeque;
13 use std::io;
14 use std::pin::Pin;
15 use std::task::{Context, Poll};
16 
17 macro_rules! mock {
18     ($($x:expr,)*) => {{
19         let mut v = VecDeque::new();
20         v.extend(vec![$($x),*]);
21         Mock { calls: v }
22     }};
23 }
24 
25 macro_rules! assert_next_eq {
26     ($io:ident, $expect:expr) => {{
27         task::spawn(()).enter(|cx, _| {
28             let res = assert_ready!($io.as_mut().poll_next(cx));
29             match res {
30                 Some(Ok(v)) => assert_eq!(v, $expect.as_ref()),
31                 Some(Err(e)) => panic!("error = {:?}", e),
32                 None => panic!("none"),
33             }
34         });
35     }};
36 }
37 
38 macro_rules! assert_next_pending {
39     ($io:ident) => {{
40         task::spawn(()).enter(|cx, _| match $io.as_mut().poll_next(cx) {
41             Poll::Ready(Some(Ok(v))) => panic!("value = {:?}", v),
42             Poll::Ready(Some(Err(e))) => panic!("error = {:?}", e),
43             Poll::Ready(None) => panic!("done"),
44             Poll::Pending => {}
45         });
46     }};
47 }
48 
49 macro_rules! assert_next_err {
50     ($io:ident) => {{
51         task::spawn(()).enter(|cx, _| match $io.as_mut().poll_next(cx) {
52             Poll::Ready(Some(Ok(v))) => panic!("value = {:?}", v),
53             Poll::Ready(Some(Err(_))) => {}
54             Poll::Ready(None) => panic!("done"),
55             Poll::Pending => panic!("pending"),
56         });
57     }};
58 }
59 
60 macro_rules! assert_done {
61     ($io:ident) => {{
62         task::spawn(()).enter(|cx, _| {
63             let res = assert_ready!($io.as_mut().poll_next(cx));
64             match res {
65                 Some(Ok(v)) => panic!("value = {:?}", v),
66                 Some(Err(e)) => panic!("error = {:?}", e),
67                 None => {}
68             }
69         });
70     }};
71 }
72 
73 #[test]
read_empty_io_yields_nothing()74 fn read_empty_io_yields_nothing() {
75     let io = Box::pin(FramedRead::new(mock!(), LengthDelimitedCodec::new()));
76     pin_mut!(io);
77 
78     assert_done!(io);
79 }
80 
81 #[test]
read_single_frame_one_packet()82 fn read_single_frame_one_packet() {
83     let io = FramedRead::new(
84         mock! {
85             data(b"\x00\x00\x00\x09abcdefghi"),
86         },
87         LengthDelimitedCodec::new(),
88     );
89     pin_mut!(io);
90 
91     assert_next_eq!(io, b"abcdefghi");
92     assert_done!(io);
93 }
94 
95 #[test]
read_single_frame_one_packet_little_endian()96 fn read_single_frame_one_packet_little_endian() {
97     let io = length_delimited::Builder::new()
98         .little_endian()
99         .new_read(mock! {
100             data(b"\x09\x00\x00\x00abcdefghi"),
101         });
102     pin_mut!(io);
103 
104     assert_next_eq!(io, b"abcdefghi");
105     assert_done!(io);
106 }
107 
108 #[test]
read_single_frame_one_packet_native_endian()109 fn read_single_frame_one_packet_native_endian() {
110     let d = if cfg!(target_endian = "big") {
111         b"\x00\x00\x00\x09abcdefghi"
112     } else {
113         b"\x09\x00\x00\x00abcdefghi"
114     };
115     let io = length_delimited::Builder::new()
116         .native_endian()
117         .new_read(mock! {
118             data(d),
119         });
120     pin_mut!(io);
121 
122     assert_next_eq!(io, b"abcdefghi");
123     assert_done!(io);
124 }
125 
126 #[test]
read_single_multi_frame_one_packet()127 fn read_single_multi_frame_one_packet() {
128     let mut d: Vec<u8> = vec![];
129     d.extend_from_slice(b"\x00\x00\x00\x09abcdefghi");
130     d.extend_from_slice(b"\x00\x00\x00\x03123");
131     d.extend_from_slice(b"\x00\x00\x00\x0bhello world");
132 
133     let io = FramedRead::new(
134         mock! {
135             data(&d),
136         },
137         LengthDelimitedCodec::new(),
138     );
139     pin_mut!(io);
140 
141     assert_next_eq!(io, b"abcdefghi");
142     assert_next_eq!(io, b"123");
143     assert_next_eq!(io, b"hello world");
144     assert_done!(io);
145 }
146 
147 #[test]
read_single_frame_multi_packet()148 fn read_single_frame_multi_packet() {
149     let io = FramedRead::new(
150         mock! {
151             data(b"\x00\x00"),
152             data(b"\x00\x09abc"),
153             data(b"defghi"),
154         },
155         LengthDelimitedCodec::new(),
156     );
157     pin_mut!(io);
158 
159     assert_next_eq!(io, b"abcdefghi");
160     assert_done!(io);
161 }
162 
163 #[test]
read_multi_frame_multi_packet()164 fn read_multi_frame_multi_packet() {
165     let io = FramedRead::new(
166         mock! {
167             data(b"\x00\x00"),
168             data(b"\x00\x09abc"),
169             data(b"defghi"),
170             data(b"\x00\x00\x00\x0312"),
171             data(b"3\x00\x00\x00\x0bhello world"),
172         },
173         LengthDelimitedCodec::new(),
174     );
175     pin_mut!(io);
176 
177     assert_next_eq!(io, b"abcdefghi");
178     assert_next_eq!(io, b"123");
179     assert_next_eq!(io, b"hello world");
180     assert_done!(io);
181 }
182 
183 #[test]
read_single_frame_multi_packet_wait()184 fn read_single_frame_multi_packet_wait() {
185     let io = FramedRead::new(
186         mock! {
187             data(b"\x00\x00"),
188             Poll::Pending,
189             data(b"\x00\x09abc"),
190             Poll::Pending,
191             data(b"defghi"),
192             Poll::Pending,
193         },
194         LengthDelimitedCodec::new(),
195     );
196     pin_mut!(io);
197 
198     assert_next_pending!(io);
199     assert_next_pending!(io);
200     assert_next_eq!(io, b"abcdefghi");
201     assert_next_pending!(io);
202     assert_done!(io);
203 }
204 
205 #[test]
read_multi_frame_multi_packet_wait()206 fn read_multi_frame_multi_packet_wait() {
207     let io = FramedRead::new(
208         mock! {
209             data(b"\x00\x00"),
210             Poll::Pending,
211             data(b"\x00\x09abc"),
212             Poll::Pending,
213             data(b"defghi"),
214             Poll::Pending,
215             data(b"\x00\x00\x00\x0312"),
216             Poll::Pending,
217             data(b"3\x00\x00\x00\x0bhello world"),
218             Poll::Pending,
219         },
220         LengthDelimitedCodec::new(),
221     );
222     pin_mut!(io);
223 
224     assert_next_pending!(io);
225     assert_next_pending!(io);
226     assert_next_eq!(io, b"abcdefghi");
227     assert_next_pending!(io);
228     assert_next_pending!(io);
229     assert_next_eq!(io, b"123");
230     assert_next_eq!(io, b"hello world");
231     assert_next_pending!(io);
232     assert_done!(io);
233 }
234 
235 #[test]
read_incomplete_head()236 fn read_incomplete_head() {
237     let io = FramedRead::new(
238         mock! {
239             data(b"\x00\x00"),
240         },
241         LengthDelimitedCodec::new(),
242     );
243     pin_mut!(io);
244 
245     assert_next_err!(io);
246 }
247 
248 #[test]
read_incomplete_head_multi()249 fn read_incomplete_head_multi() {
250     let io = FramedRead::new(
251         mock! {
252             Poll::Pending,
253             data(b"\x00"),
254             Poll::Pending,
255         },
256         LengthDelimitedCodec::new(),
257     );
258     pin_mut!(io);
259 
260     assert_next_pending!(io);
261     assert_next_pending!(io);
262     assert_next_err!(io);
263 }
264 
265 #[test]
read_incomplete_payload()266 fn read_incomplete_payload() {
267     let io = FramedRead::new(
268         mock! {
269             data(b"\x00\x00\x00\x09ab"),
270             Poll::Pending,
271             data(b"cd"),
272             Poll::Pending,
273         },
274         LengthDelimitedCodec::new(),
275     );
276     pin_mut!(io);
277 
278     assert_next_pending!(io);
279     assert_next_pending!(io);
280     assert_next_err!(io);
281 }
282 
283 #[test]
read_max_frame_len()284 fn read_max_frame_len() {
285     let io = length_delimited::Builder::new()
286         .max_frame_length(5)
287         .new_read(mock! {
288             data(b"\x00\x00\x00\x09abcdefghi"),
289         });
290     pin_mut!(io);
291 
292     assert_next_err!(io);
293 }
294 
295 #[test]
read_update_max_frame_len_at_rest()296 fn read_update_max_frame_len_at_rest() {
297     let io = length_delimited::Builder::new().new_read(mock! {
298         data(b"\x00\x00\x00\x09abcdefghi"),
299         data(b"\x00\x00\x00\x09abcdefghi"),
300     });
301     pin_mut!(io);
302 
303     assert_next_eq!(io, b"abcdefghi");
304     io.decoder_mut().set_max_frame_length(5);
305     assert_next_err!(io);
306 }
307 
308 #[test]
read_update_max_frame_len_in_flight()309 fn read_update_max_frame_len_in_flight() {
310     let io = length_delimited::Builder::new().new_read(mock! {
311         data(b"\x00\x00\x00\x09abcd"),
312         Poll::Pending,
313         data(b"efghi"),
314         data(b"\x00\x00\x00\x09abcdefghi"),
315     });
316     pin_mut!(io);
317 
318     assert_next_pending!(io);
319     io.decoder_mut().set_max_frame_length(5);
320     assert_next_eq!(io, b"abcdefghi");
321     assert_next_err!(io);
322 }
323 
324 #[test]
read_one_byte_length_field()325 fn read_one_byte_length_field() {
326     let io = length_delimited::Builder::new()
327         .length_field_length(1)
328         .new_read(mock! {
329             data(b"\x09abcdefghi"),
330         });
331     pin_mut!(io);
332 
333     assert_next_eq!(io, b"abcdefghi");
334     assert_done!(io);
335 }
336 
337 #[test]
read_header_offset()338 fn read_header_offset() {
339     let io = length_delimited::Builder::new()
340         .length_field_length(2)
341         .length_field_offset(4)
342         .new_read(mock! {
343             data(b"zzzz\x00\x09abcdefghi"),
344         });
345     pin_mut!(io);
346 
347     assert_next_eq!(io, b"abcdefghi");
348     assert_done!(io);
349 }
350 
351 #[test]
read_single_multi_frame_one_packet_skip_none_adjusted()352 fn read_single_multi_frame_one_packet_skip_none_adjusted() {
353     let mut d: Vec<u8> = vec![];
354     d.extend_from_slice(b"xx\x00\x09abcdefghi");
355     d.extend_from_slice(b"yy\x00\x03123");
356     d.extend_from_slice(b"zz\x00\x0bhello world");
357 
358     let io = length_delimited::Builder::new()
359         .length_field_length(2)
360         .length_field_offset(2)
361         .num_skip(0)
362         .length_adjustment(4)
363         .new_read(mock! {
364             data(&d),
365         });
366     pin_mut!(io);
367 
368     assert_next_eq!(io, b"xx\x00\x09abcdefghi");
369     assert_next_eq!(io, b"yy\x00\x03123");
370     assert_next_eq!(io, b"zz\x00\x0bhello world");
371     assert_done!(io);
372 }
373 
374 #[test]
read_single_frame_length_adjusted()375 fn read_single_frame_length_adjusted() {
376     let mut d: Vec<u8> = vec![];
377     d.extend_from_slice(b"\x00\x00\x0b\x0cHello world");
378 
379     let io = length_delimited::Builder::new()
380         .length_field_offset(0)
381         .length_field_length(3)
382         .length_adjustment(0)
383         .num_skip(4)
384         .new_read(mock! {
385             data(&d),
386         });
387     pin_mut!(io);
388 
389     assert_next_eq!(io, b"Hello world");
390     assert_done!(io);
391 }
392 
393 #[test]
read_single_multi_frame_one_packet_length_includes_head()394 fn read_single_multi_frame_one_packet_length_includes_head() {
395     let mut d: Vec<u8> = vec![];
396     d.extend_from_slice(b"\x00\x0babcdefghi");
397     d.extend_from_slice(b"\x00\x05123");
398     d.extend_from_slice(b"\x00\x0dhello world");
399 
400     let io = length_delimited::Builder::new()
401         .length_field_length(2)
402         .length_adjustment(-2)
403         .new_read(mock! {
404             data(&d),
405         });
406     pin_mut!(io);
407 
408     assert_next_eq!(io, b"abcdefghi");
409     assert_next_eq!(io, b"123");
410     assert_next_eq!(io, b"hello world");
411     assert_done!(io);
412 }
413 
414 #[test]
write_single_frame_length_adjusted()415 fn write_single_frame_length_adjusted() {
416     let io = length_delimited::Builder::new()
417         .length_adjustment(-2)
418         .new_write(mock! {
419             data(b"\x00\x00\x00\x0b"),
420             data(b"abcdefghi"),
421             flush(),
422         });
423     pin_mut!(io);
424 
425     task::spawn(()).enter(|cx, _| {
426         assert_ready_ok!(io.as_mut().poll_ready(cx));
427         assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
428         assert_ready_ok!(io.as_mut().poll_flush(cx));
429         assert!(io.get_ref().calls.is_empty());
430     });
431 }
432 
433 #[test]
write_nothing_yields_nothing()434 fn write_nothing_yields_nothing() {
435     let io = FramedWrite::new(mock!(), LengthDelimitedCodec::new());
436     pin_mut!(io);
437 
438     task::spawn(()).enter(|cx, _| {
439         assert_ready_ok!(io.poll_flush(cx));
440     });
441 }
442 
443 #[test]
write_single_frame_one_packet()444 fn write_single_frame_one_packet() {
445     let io = FramedWrite::new(
446         mock! {
447             data(b"\x00\x00\x00\x09"),
448             data(b"abcdefghi"),
449             flush(),
450         },
451         LengthDelimitedCodec::new(),
452     );
453     pin_mut!(io);
454 
455     task::spawn(()).enter(|cx, _| {
456         assert_ready_ok!(io.as_mut().poll_ready(cx));
457         assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
458         assert_ready_ok!(io.as_mut().poll_flush(cx));
459         assert!(io.get_ref().calls.is_empty());
460     });
461 }
462 
463 #[test]
write_single_multi_frame_one_packet()464 fn write_single_multi_frame_one_packet() {
465     let io = FramedWrite::new(
466         mock! {
467             data(b"\x00\x00\x00\x09"),
468             data(b"abcdefghi"),
469             data(b"\x00\x00\x00\x03"),
470             data(b"123"),
471             data(b"\x00\x00\x00\x0b"),
472             data(b"hello world"),
473             flush(),
474         },
475         LengthDelimitedCodec::new(),
476     );
477     pin_mut!(io);
478 
479     task::spawn(()).enter(|cx, _| {
480         assert_ready_ok!(io.as_mut().poll_ready(cx));
481         assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
482 
483         assert_ready_ok!(io.as_mut().poll_ready(cx));
484         assert_ok!(io.as_mut().start_send(Bytes::from("123")));
485 
486         assert_ready_ok!(io.as_mut().poll_ready(cx));
487         assert_ok!(io.as_mut().start_send(Bytes::from("hello world")));
488 
489         assert_ready_ok!(io.as_mut().poll_flush(cx));
490         assert!(io.get_ref().calls.is_empty());
491     });
492 }
493 
494 #[test]
write_single_multi_frame_multi_packet()495 fn write_single_multi_frame_multi_packet() {
496     let io = FramedWrite::new(
497         mock! {
498             data(b"\x00\x00\x00\x09"),
499             data(b"abcdefghi"),
500             flush(),
501             data(b"\x00\x00\x00\x03"),
502             data(b"123"),
503             flush(),
504             data(b"\x00\x00\x00\x0b"),
505             data(b"hello world"),
506             flush(),
507         },
508         LengthDelimitedCodec::new(),
509     );
510     pin_mut!(io);
511 
512     task::spawn(()).enter(|cx, _| {
513         assert_ready_ok!(io.as_mut().poll_ready(cx));
514         assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
515 
516         assert_ready_ok!(io.as_mut().poll_flush(cx));
517 
518         assert_ready_ok!(io.as_mut().poll_ready(cx));
519         assert_ok!(io.as_mut().start_send(Bytes::from("123")));
520 
521         assert_ready_ok!(io.as_mut().poll_flush(cx));
522 
523         assert_ready_ok!(io.as_mut().poll_ready(cx));
524         assert_ok!(io.as_mut().start_send(Bytes::from("hello world")));
525 
526         assert_ready_ok!(io.as_mut().poll_flush(cx));
527         assert!(io.get_ref().calls.is_empty());
528     });
529 }
530 
531 #[test]
write_single_frame_would_block()532 fn write_single_frame_would_block() {
533     let io = FramedWrite::new(
534         mock! {
535             Poll::Pending,
536             data(b"\x00\x00"),
537             Poll::Pending,
538             data(b"\x00\x09"),
539             data(b"abcdefghi"),
540             flush(),
541         },
542         LengthDelimitedCodec::new(),
543     );
544     pin_mut!(io);
545 
546     task::spawn(()).enter(|cx, _| {
547         assert_ready_ok!(io.as_mut().poll_ready(cx));
548         assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
549 
550         assert_pending!(io.as_mut().poll_flush(cx));
551         assert_pending!(io.as_mut().poll_flush(cx));
552         assert_ready_ok!(io.as_mut().poll_flush(cx));
553 
554         assert!(io.get_ref().calls.is_empty());
555     });
556 }
557 
558 #[test]
write_single_frame_little_endian()559 fn write_single_frame_little_endian() {
560     let io = length_delimited::Builder::new()
561         .little_endian()
562         .new_write(mock! {
563             data(b"\x09\x00\x00\x00"),
564             data(b"abcdefghi"),
565             flush(),
566         });
567     pin_mut!(io);
568 
569     task::spawn(()).enter(|cx, _| {
570         assert_ready_ok!(io.as_mut().poll_ready(cx));
571         assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
572 
573         assert_ready_ok!(io.as_mut().poll_flush(cx));
574         assert!(io.get_ref().calls.is_empty());
575     });
576 }
577 
578 #[test]
write_single_frame_with_short_length_field()579 fn write_single_frame_with_short_length_field() {
580     let io = length_delimited::Builder::new()
581         .length_field_length(1)
582         .new_write(mock! {
583             data(b"\x09"),
584             data(b"abcdefghi"),
585             flush(),
586         });
587     pin_mut!(io);
588 
589     task::spawn(()).enter(|cx, _| {
590         assert_ready_ok!(io.as_mut().poll_ready(cx));
591         assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
592 
593         assert_ready_ok!(io.as_mut().poll_flush(cx));
594 
595         assert!(io.get_ref().calls.is_empty());
596     });
597 }
598 
599 #[test]
write_max_frame_len()600 fn write_max_frame_len() {
601     let io = length_delimited::Builder::new()
602         .max_frame_length(5)
603         .new_write(mock! {});
604     pin_mut!(io);
605 
606     task::spawn(()).enter(|cx, _| {
607         assert_ready_ok!(io.as_mut().poll_ready(cx));
608         assert_err!(io.as_mut().start_send(Bytes::from("abcdef")));
609 
610         assert!(io.get_ref().calls.is_empty());
611     });
612 }
613 
614 #[test]
write_update_max_frame_len_at_rest()615 fn write_update_max_frame_len_at_rest() {
616     let io = length_delimited::Builder::new().new_write(mock! {
617         data(b"\x00\x00\x00\x06"),
618         data(b"abcdef"),
619         flush(),
620     });
621     pin_mut!(io);
622 
623     task::spawn(()).enter(|cx, _| {
624         assert_ready_ok!(io.as_mut().poll_ready(cx));
625         assert_ok!(io.as_mut().start_send(Bytes::from("abcdef")));
626 
627         assert_ready_ok!(io.as_mut().poll_flush(cx));
628 
629         io.encoder_mut().set_max_frame_length(5);
630 
631         assert_err!(io.as_mut().start_send(Bytes::from("abcdef")));
632 
633         assert!(io.get_ref().calls.is_empty());
634     });
635 }
636 
637 #[test]
write_update_max_frame_len_in_flight()638 fn write_update_max_frame_len_in_flight() {
639     let io = length_delimited::Builder::new().new_write(mock! {
640         data(b"\x00\x00\x00\x06"),
641         data(b"ab"),
642         Poll::Pending,
643         data(b"cdef"),
644         flush(),
645     });
646     pin_mut!(io);
647 
648     task::spawn(()).enter(|cx, _| {
649         assert_ready_ok!(io.as_mut().poll_ready(cx));
650         assert_ok!(io.as_mut().start_send(Bytes::from("abcdef")));
651 
652         assert_pending!(io.as_mut().poll_flush(cx));
653 
654         io.encoder_mut().set_max_frame_length(5);
655 
656         assert_ready_ok!(io.as_mut().poll_flush(cx));
657 
658         assert_err!(io.as_mut().start_send(Bytes::from("abcdef")));
659         assert!(io.get_ref().calls.is_empty());
660     });
661 }
662 
663 #[test]
write_zero()664 fn write_zero() {
665     let io = length_delimited::Builder::new().new_write(mock! {});
666     pin_mut!(io);
667 
668     task::spawn(()).enter(|cx, _| {
669         assert_ready_ok!(io.as_mut().poll_ready(cx));
670         assert_ok!(io.as_mut().start_send(Bytes::from("abcdef")));
671 
672         assert_ready_err!(io.as_mut().poll_flush(cx));
673 
674         assert!(io.get_ref().calls.is_empty());
675     });
676 }
677 
678 #[test]
encode_overflow()679 fn encode_overflow() {
680     // Test reproducing tokio-rs/tokio#681.
681     let mut codec = length_delimited::Builder::new().new_codec();
682     let mut buf = BytesMut::with_capacity(1024);
683 
684     // Put some data into the buffer without resizing it to hold more.
685     let some_as = std::iter::repeat(b'a').take(1024).collect::<Vec<_>>();
686     buf.put_slice(&some_as[..]);
687 
688     // Trying to encode the length header should resize the buffer if it won't fit.
689     codec.encode(Bytes::from("hello"), &mut buf).unwrap();
690 }
691 
692 #[test]
frame_does_not_fit()693 fn frame_does_not_fit() {
694     let codec = LengthDelimitedCodec::builder()
695         .length_field_length(1)
696         .max_frame_length(256)
697         .new_codec();
698 
699     assert_eq!(codec.max_frame_length(), 255);
700 }
701 
702 #[test]
neg_adjusted_frame_does_not_fit()703 fn neg_adjusted_frame_does_not_fit() {
704     let codec = LengthDelimitedCodec::builder()
705         .length_field_length(1)
706         .length_adjustment(-1)
707         .new_codec();
708 
709     assert_eq!(codec.max_frame_length(), 254);
710 }
711 
712 #[test]
pos_adjusted_frame_does_not_fit()713 fn pos_adjusted_frame_does_not_fit() {
714     let codec = LengthDelimitedCodec::builder()
715         .length_field_length(1)
716         .length_adjustment(1)
717         .new_codec();
718 
719     assert_eq!(codec.max_frame_length(), 256);
720 }
721 
722 #[test]
max_allowed_frame_fits()723 fn max_allowed_frame_fits() {
724     let codec = LengthDelimitedCodec::builder()
725         .length_field_length(std::mem::size_of::<usize>())
726         .max_frame_length(usize::MAX)
727         .new_codec();
728 
729     assert_eq!(codec.max_frame_length(), usize::MAX);
730 }
731 
732 #[test]
smaller_frame_len_not_adjusted()733 fn smaller_frame_len_not_adjusted() {
734     let codec = LengthDelimitedCodec::builder()
735         .max_frame_length(10)
736         .length_field_length(std::mem::size_of::<usize>())
737         .new_codec();
738 
739     assert_eq!(codec.max_frame_length(), 10);
740 }
741 
742 #[test]
max_allowed_length_field()743 fn max_allowed_length_field() {
744     let codec = LengthDelimitedCodec::builder()
745         .length_field_length(8)
746         .max_frame_length(usize::MAX)
747         .new_codec();
748 
749     assert_eq!(codec.max_frame_length(), usize::MAX);
750 }
751 
752 // ===== Test utils =====
753 
754 struct Mock {
755     calls: VecDeque<Poll<io::Result<Op>>>,
756 }
757 
758 enum Op {
759     Data(Vec<u8>),
760     Flush,
761 }
762 
763 impl AsyncRead for Mock {
poll_read( mut self: Pin<&mut Self>, _cx: &mut Context<'_>, dst: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>764     fn poll_read(
765         mut self: Pin<&mut Self>,
766         _cx: &mut Context<'_>,
767         dst: &mut ReadBuf<'_>,
768     ) -> Poll<io::Result<()>> {
769         match self.calls.pop_front() {
770             Some(Poll::Ready(Ok(Op::Data(data)))) => {
771                 debug_assert!(dst.remaining() >= data.len());
772                 dst.put_slice(&data);
773                 Poll::Ready(Ok(()))
774             }
775             Some(Poll::Ready(Ok(_))) => panic!(),
776             Some(Poll::Ready(Err(e))) => Poll::Ready(Err(e)),
777             Some(Poll::Pending) => Poll::Pending,
778             None => Poll::Ready(Ok(())),
779         }
780     }
781 }
782 
783 impl AsyncWrite for Mock {
poll_write( mut self: Pin<&mut Self>, _cx: &mut Context<'_>, src: &[u8], ) -> Poll<Result<usize, io::Error>>784     fn poll_write(
785         mut self: Pin<&mut Self>,
786         _cx: &mut Context<'_>,
787         src: &[u8],
788     ) -> Poll<Result<usize, io::Error>> {
789         match self.calls.pop_front() {
790             Some(Poll::Ready(Ok(Op::Data(data)))) => {
791                 let len = data.len();
792                 assert!(src.len() >= len, "expect={data:?}; actual={src:?}");
793                 assert_eq!(&data[..], &src[..len]);
794                 Poll::Ready(Ok(len))
795             }
796             Some(Poll::Ready(Ok(_))) => panic!(),
797             Some(Poll::Ready(Err(e))) => Poll::Ready(Err(e)),
798             Some(Poll::Pending) => Poll::Pending,
799             None => Poll::Ready(Ok(0)),
800         }
801     }
802 
poll_flush(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>803     fn poll_flush(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
804         match self.calls.pop_front() {
805             Some(Poll::Ready(Ok(Op::Flush))) => Poll::Ready(Ok(())),
806             Some(Poll::Ready(Ok(_))) => panic!(),
807             Some(Poll::Ready(Err(e))) => Poll::Ready(Err(e)),
808             Some(Poll::Pending) => Poll::Pending,
809             None => Poll::Ready(Ok(())),
810         }
811     }
812 
poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>813     fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
814         Poll::Ready(Ok(()))
815     }
816 }
817 
818 impl<'a> From<&'a [u8]> for Op {
from(src: &'a [u8]) -> Op819     fn from(src: &'a [u8]) -> Op {
820         Op::Data(src.into())
821     }
822 }
823 
824 impl From<Vec<u8>> for Op {
from(src: Vec<u8>) -> Op825     fn from(src: Vec<u8>) -> Op {
826         Op::Data(src)
827     }
828 }
829 
data(bytes: &[u8]) -> Poll<io::Result<Op>>830 fn data(bytes: &[u8]) -> Poll<io::Result<Op>> {
831     Poll::Ready(Ok(bytes.into()))
832 }
833 
flush() -> Poll<io::Result<Op>>834 fn flush() -> Poll<io::Result<Op>> {
835     Poll::Ready(Ok(Op::Flush))
836 }
837