1 #![warn(rust_2018_idioms)]
2
3 use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
4 use tokio_test::task;
5 use tokio_test::{
6 assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
7 };
8 use tokio_util::codec::*;
9
10 use bytes::{BufMut, Bytes, BytesMut};
11 use futures::{pin_mut, Sink, Stream};
12 use std::collections::VecDeque;
13 use std::io;
14 use std::pin::Pin;
15 use std::task::{Context, Poll};
16
17 macro_rules! mock {
18 ($($x:expr,)*) => {{
19 let mut v = VecDeque::new();
20 v.extend(vec![$($x),*]);
21 Mock { calls: v }
22 }};
23 }
24
25 macro_rules! assert_next_eq {
26 ($io:ident, $expect:expr) => {{
27 task::spawn(()).enter(|cx, _| {
28 let res = assert_ready!($io.as_mut().poll_next(cx));
29 match res {
30 Some(Ok(v)) => assert_eq!(v, $expect.as_ref()),
31 Some(Err(e)) => panic!("error = {:?}", e),
32 None => panic!("none"),
33 }
34 });
35 }};
36 }
37
38 macro_rules! assert_next_pending {
39 ($io:ident) => {{
40 task::spawn(()).enter(|cx, _| match $io.as_mut().poll_next(cx) {
41 Poll::Ready(Some(Ok(v))) => panic!("value = {:?}", v),
42 Poll::Ready(Some(Err(e))) => panic!("error = {:?}", e),
43 Poll::Ready(None) => panic!("done"),
44 Poll::Pending => {}
45 });
46 }};
47 }
48
49 macro_rules! assert_next_err {
50 ($io:ident) => {{
51 task::spawn(()).enter(|cx, _| match $io.as_mut().poll_next(cx) {
52 Poll::Ready(Some(Ok(v))) => panic!("value = {:?}", v),
53 Poll::Ready(Some(Err(_))) => {}
54 Poll::Ready(None) => panic!("done"),
55 Poll::Pending => panic!("pending"),
56 });
57 }};
58 }
59
60 macro_rules! assert_done {
61 ($io:ident) => {{
62 task::spawn(()).enter(|cx, _| {
63 let res = assert_ready!($io.as_mut().poll_next(cx));
64 match res {
65 Some(Ok(v)) => panic!("value = {:?}", v),
66 Some(Err(e)) => panic!("error = {:?}", e),
67 None => {}
68 }
69 });
70 }};
71 }
72
73 #[test]
read_empty_io_yields_nothing()74 fn read_empty_io_yields_nothing() {
75 let io = Box::pin(FramedRead::new(mock!(), LengthDelimitedCodec::new()));
76 pin_mut!(io);
77
78 assert_done!(io);
79 }
80
81 #[test]
read_single_frame_one_packet()82 fn read_single_frame_one_packet() {
83 let io = FramedRead::new(
84 mock! {
85 data(b"\x00\x00\x00\x09abcdefghi"),
86 },
87 LengthDelimitedCodec::new(),
88 );
89 pin_mut!(io);
90
91 assert_next_eq!(io, b"abcdefghi");
92 assert_done!(io);
93 }
94
95 #[test]
read_single_frame_one_packet_little_endian()96 fn read_single_frame_one_packet_little_endian() {
97 let io = length_delimited::Builder::new()
98 .little_endian()
99 .new_read(mock! {
100 data(b"\x09\x00\x00\x00abcdefghi"),
101 });
102 pin_mut!(io);
103
104 assert_next_eq!(io, b"abcdefghi");
105 assert_done!(io);
106 }
107
108 #[test]
read_single_frame_one_packet_native_endian()109 fn read_single_frame_one_packet_native_endian() {
110 let d = if cfg!(target_endian = "big") {
111 b"\x00\x00\x00\x09abcdefghi"
112 } else {
113 b"\x09\x00\x00\x00abcdefghi"
114 };
115 let io = length_delimited::Builder::new()
116 .native_endian()
117 .new_read(mock! {
118 data(d),
119 });
120 pin_mut!(io);
121
122 assert_next_eq!(io, b"abcdefghi");
123 assert_done!(io);
124 }
125
126 #[test]
read_single_multi_frame_one_packet()127 fn read_single_multi_frame_one_packet() {
128 let mut d: Vec<u8> = vec![];
129 d.extend_from_slice(b"\x00\x00\x00\x09abcdefghi");
130 d.extend_from_slice(b"\x00\x00\x00\x03123");
131 d.extend_from_slice(b"\x00\x00\x00\x0bhello world");
132
133 let io = FramedRead::new(
134 mock! {
135 data(&d),
136 },
137 LengthDelimitedCodec::new(),
138 );
139 pin_mut!(io);
140
141 assert_next_eq!(io, b"abcdefghi");
142 assert_next_eq!(io, b"123");
143 assert_next_eq!(io, b"hello world");
144 assert_done!(io);
145 }
146
147 #[test]
read_single_frame_multi_packet()148 fn read_single_frame_multi_packet() {
149 let io = FramedRead::new(
150 mock! {
151 data(b"\x00\x00"),
152 data(b"\x00\x09abc"),
153 data(b"defghi"),
154 },
155 LengthDelimitedCodec::new(),
156 );
157 pin_mut!(io);
158
159 assert_next_eq!(io, b"abcdefghi");
160 assert_done!(io);
161 }
162
163 #[test]
read_multi_frame_multi_packet()164 fn read_multi_frame_multi_packet() {
165 let io = FramedRead::new(
166 mock! {
167 data(b"\x00\x00"),
168 data(b"\x00\x09abc"),
169 data(b"defghi"),
170 data(b"\x00\x00\x00\x0312"),
171 data(b"3\x00\x00\x00\x0bhello world"),
172 },
173 LengthDelimitedCodec::new(),
174 );
175 pin_mut!(io);
176
177 assert_next_eq!(io, b"abcdefghi");
178 assert_next_eq!(io, b"123");
179 assert_next_eq!(io, b"hello world");
180 assert_done!(io);
181 }
182
183 #[test]
read_single_frame_multi_packet_wait()184 fn read_single_frame_multi_packet_wait() {
185 let io = FramedRead::new(
186 mock! {
187 data(b"\x00\x00"),
188 Poll::Pending,
189 data(b"\x00\x09abc"),
190 Poll::Pending,
191 data(b"defghi"),
192 Poll::Pending,
193 },
194 LengthDelimitedCodec::new(),
195 );
196 pin_mut!(io);
197
198 assert_next_pending!(io);
199 assert_next_pending!(io);
200 assert_next_eq!(io, b"abcdefghi");
201 assert_next_pending!(io);
202 assert_done!(io);
203 }
204
205 #[test]
read_multi_frame_multi_packet_wait()206 fn read_multi_frame_multi_packet_wait() {
207 let io = FramedRead::new(
208 mock! {
209 data(b"\x00\x00"),
210 Poll::Pending,
211 data(b"\x00\x09abc"),
212 Poll::Pending,
213 data(b"defghi"),
214 Poll::Pending,
215 data(b"\x00\x00\x00\x0312"),
216 Poll::Pending,
217 data(b"3\x00\x00\x00\x0bhello world"),
218 Poll::Pending,
219 },
220 LengthDelimitedCodec::new(),
221 );
222 pin_mut!(io);
223
224 assert_next_pending!(io);
225 assert_next_pending!(io);
226 assert_next_eq!(io, b"abcdefghi");
227 assert_next_pending!(io);
228 assert_next_pending!(io);
229 assert_next_eq!(io, b"123");
230 assert_next_eq!(io, b"hello world");
231 assert_next_pending!(io);
232 assert_done!(io);
233 }
234
235 #[test]
read_incomplete_head()236 fn read_incomplete_head() {
237 let io = FramedRead::new(
238 mock! {
239 data(b"\x00\x00"),
240 },
241 LengthDelimitedCodec::new(),
242 );
243 pin_mut!(io);
244
245 assert_next_err!(io);
246 }
247
248 #[test]
read_incomplete_head_multi()249 fn read_incomplete_head_multi() {
250 let io = FramedRead::new(
251 mock! {
252 Poll::Pending,
253 data(b"\x00"),
254 Poll::Pending,
255 },
256 LengthDelimitedCodec::new(),
257 );
258 pin_mut!(io);
259
260 assert_next_pending!(io);
261 assert_next_pending!(io);
262 assert_next_err!(io);
263 }
264
265 #[test]
read_incomplete_payload()266 fn read_incomplete_payload() {
267 let io = FramedRead::new(
268 mock! {
269 data(b"\x00\x00\x00\x09ab"),
270 Poll::Pending,
271 data(b"cd"),
272 Poll::Pending,
273 },
274 LengthDelimitedCodec::new(),
275 );
276 pin_mut!(io);
277
278 assert_next_pending!(io);
279 assert_next_pending!(io);
280 assert_next_err!(io);
281 }
282
283 #[test]
read_max_frame_len()284 fn read_max_frame_len() {
285 let io = length_delimited::Builder::new()
286 .max_frame_length(5)
287 .new_read(mock! {
288 data(b"\x00\x00\x00\x09abcdefghi"),
289 });
290 pin_mut!(io);
291
292 assert_next_err!(io);
293 }
294
295 #[test]
read_update_max_frame_len_at_rest()296 fn read_update_max_frame_len_at_rest() {
297 let io = length_delimited::Builder::new().new_read(mock! {
298 data(b"\x00\x00\x00\x09abcdefghi"),
299 data(b"\x00\x00\x00\x09abcdefghi"),
300 });
301 pin_mut!(io);
302
303 assert_next_eq!(io, b"abcdefghi");
304 io.decoder_mut().set_max_frame_length(5);
305 assert_next_err!(io);
306 }
307
308 #[test]
read_update_max_frame_len_in_flight()309 fn read_update_max_frame_len_in_flight() {
310 let io = length_delimited::Builder::new().new_read(mock! {
311 data(b"\x00\x00\x00\x09abcd"),
312 Poll::Pending,
313 data(b"efghi"),
314 data(b"\x00\x00\x00\x09abcdefghi"),
315 });
316 pin_mut!(io);
317
318 assert_next_pending!(io);
319 io.decoder_mut().set_max_frame_length(5);
320 assert_next_eq!(io, b"abcdefghi");
321 assert_next_err!(io);
322 }
323
324 #[test]
read_one_byte_length_field()325 fn read_one_byte_length_field() {
326 let io = length_delimited::Builder::new()
327 .length_field_length(1)
328 .new_read(mock! {
329 data(b"\x09abcdefghi"),
330 });
331 pin_mut!(io);
332
333 assert_next_eq!(io, b"abcdefghi");
334 assert_done!(io);
335 }
336
337 #[test]
read_header_offset()338 fn read_header_offset() {
339 let io = length_delimited::Builder::new()
340 .length_field_length(2)
341 .length_field_offset(4)
342 .new_read(mock! {
343 data(b"zzzz\x00\x09abcdefghi"),
344 });
345 pin_mut!(io);
346
347 assert_next_eq!(io, b"abcdefghi");
348 assert_done!(io);
349 }
350
351 #[test]
read_single_multi_frame_one_packet_skip_none_adjusted()352 fn read_single_multi_frame_one_packet_skip_none_adjusted() {
353 let mut d: Vec<u8> = vec![];
354 d.extend_from_slice(b"xx\x00\x09abcdefghi");
355 d.extend_from_slice(b"yy\x00\x03123");
356 d.extend_from_slice(b"zz\x00\x0bhello world");
357
358 let io = length_delimited::Builder::new()
359 .length_field_length(2)
360 .length_field_offset(2)
361 .num_skip(0)
362 .length_adjustment(4)
363 .new_read(mock! {
364 data(&d),
365 });
366 pin_mut!(io);
367
368 assert_next_eq!(io, b"xx\x00\x09abcdefghi");
369 assert_next_eq!(io, b"yy\x00\x03123");
370 assert_next_eq!(io, b"zz\x00\x0bhello world");
371 assert_done!(io);
372 }
373
374 #[test]
read_single_frame_length_adjusted()375 fn read_single_frame_length_adjusted() {
376 let mut d: Vec<u8> = vec![];
377 d.extend_from_slice(b"\x00\x00\x0b\x0cHello world");
378
379 let io = length_delimited::Builder::new()
380 .length_field_offset(0)
381 .length_field_length(3)
382 .length_adjustment(0)
383 .num_skip(4)
384 .new_read(mock! {
385 data(&d),
386 });
387 pin_mut!(io);
388
389 assert_next_eq!(io, b"Hello world");
390 assert_done!(io);
391 }
392
393 #[test]
read_single_multi_frame_one_packet_length_includes_head()394 fn read_single_multi_frame_one_packet_length_includes_head() {
395 let mut d: Vec<u8> = vec![];
396 d.extend_from_slice(b"\x00\x0babcdefghi");
397 d.extend_from_slice(b"\x00\x05123");
398 d.extend_from_slice(b"\x00\x0dhello world");
399
400 let io = length_delimited::Builder::new()
401 .length_field_length(2)
402 .length_adjustment(-2)
403 .new_read(mock! {
404 data(&d),
405 });
406 pin_mut!(io);
407
408 assert_next_eq!(io, b"abcdefghi");
409 assert_next_eq!(io, b"123");
410 assert_next_eq!(io, b"hello world");
411 assert_done!(io);
412 }
413
414 #[test]
write_single_frame_length_adjusted()415 fn write_single_frame_length_adjusted() {
416 let io = length_delimited::Builder::new()
417 .length_adjustment(-2)
418 .new_write(mock! {
419 data(b"\x00\x00\x00\x0b"),
420 data(b"abcdefghi"),
421 flush(),
422 });
423 pin_mut!(io);
424
425 task::spawn(()).enter(|cx, _| {
426 assert_ready_ok!(io.as_mut().poll_ready(cx));
427 assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
428 assert_ready_ok!(io.as_mut().poll_flush(cx));
429 assert!(io.get_ref().calls.is_empty());
430 });
431 }
432
433 #[test]
write_nothing_yields_nothing()434 fn write_nothing_yields_nothing() {
435 let io = FramedWrite::new(mock!(), LengthDelimitedCodec::new());
436 pin_mut!(io);
437
438 task::spawn(()).enter(|cx, _| {
439 assert_ready_ok!(io.poll_flush(cx));
440 });
441 }
442
443 #[test]
write_single_frame_one_packet()444 fn write_single_frame_one_packet() {
445 let io = FramedWrite::new(
446 mock! {
447 data(b"\x00\x00\x00\x09"),
448 data(b"abcdefghi"),
449 flush(),
450 },
451 LengthDelimitedCodec::new(),
452 );
453 pin_mut!(io);
454
455 task::spawn(()).enter(|cx, _| {
456 assert_ready_ok!(io.as_mut().poll_ready(cx));
457 assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
458 assert_ready_ok!(io.as_mut().poll_flush(cx));
459 assert!(io.get_ref().calls.is_empty());
460 });
461 }
462
463 #[test]
write_single_multi_frame_one_packet()464 fn write_single_multi_frame_one_packet() {
465 let io = FramedWrite::new(
466 mock! {
467 data(b"\x00\x00\x00\x09"),
468 data(b"abcdefghi"),
469 data(b"\x00\x00\x00\x03"),
470 data(b"123"),
471 data(b"\x00\x00\x00\x0b"),
472 data(b"hello world"),
473 flush(),
474 },
475 LengthDelimitedCodec::new(),
476 );
477 pin_mut!(io);
478
479 task::spawn(()).enter(|cx, _| {
480 assert_ready_ok!(io.as_mut().poll_ready(cx));
481 assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
482
483 assert_ready_ok!(io.as_mut().poll_ready(cx));
484 assert_ok!(io.as_mut().start_send(Bytes::from("123")));
485
486 assert_ready_ok!(io.as_mut().poll_ready(cx));
487 assert_ok!(io.as_mut().start_send(Bytes::from("hello world")));
488
489 assert_ready_ok!(io.as_mut().poll_flush(cx));
490 assert!(io.get_ref().calls.is_empty());
491 });
492 }
493
494 #[test]
write_single_multi_frame_multi_packet()495 fn write_single_multi_frame_multi_packet() {
496 let io = FramedWrite::new(
497 mock! {
498 data(b"\x00\x00\x00\x09"),
499 data(b"abcdefghi"),
500 flush(),
501 data(b"\x00\x00\x00\x03"),
502 data(b"123"),
503 flush(),
504 data(b"\x00\x00\x00\x0b"),
505 data(b"hello world"),
506 flush(),
507 },
508 LengthDelimitedCodec::new(),
509 );
510 pin_mut!(io);
511
512 task::spawn(()).enter(|cx, _| {
513 assert_ready_ok!(io.as_mut().poll_ready(cx));
514 assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
515
516 assert_ready_ok!(io.as_mut().poll_flush(cx));
517
518 assert_ready_ok!(io.as_mut().poll_ready(cx));
519 assert_ok!(io.as_mut().start_send(Bytes::from("123")));
520
521 assert_ready_ok!(io.as_mut().poll_flush(cx));
522
523 assert_ready_ok!(io.as_mut().poll_ready(cx));
524 assert_ok!(io.as_mut().start_send(Bytes::from("hello world")));
525
526 assert_ready_ok!(io.as_mut().poll_flush(cx));
527 assert!(io.get_ref().calls.is_empty());
528 });
529 }
530
531 #[test]
write_single_frame_would_block()532 fn write_single_frame_would_block() {
533 let io = FramedWrite::new(
534 mock! {
535 Poll::Pending,
536 data(b"\x00\x00"),
537 Poll::Pending,
538 data(b"\x00\x09"),
539 data(b"abcdefghi"),
540 flush(),
541 },
542 LengthDelimitedCodec::new(),
543 );
544 pin_mut!(io);
545
546 task::spawn(()).enter(|cx, _| {
547 assert_ready_ok!(io.as_mut().poll_ready(cx));
548 assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
549
550 assert_pending!(io.as_mut().poll_flush(cx));
551 assert_pending!(io.as_mut().poll_flush(cx));
552 assert_ready_ok!(io.as_mut().poll_flush(cx));
553
554 assert!(io.get_ref().calls.is_empty());
555 });
556 }
557
558 #[test]
write_single_frame_little_endian()559 fn write_single_frame_little_endian() {
560 let io = length_delimited::Builder::new()
561 .little_endian()
562 .new_write(mock! {
563 data(b"\x09\x00\x00\x00"),
564 data(b"abcdefghi"),
565 flush(),
566 });
567 pin_mut!(io);
568
569 task::spawn(()).enter(|cx, _| {
570 assert_ready_ok!(io.as_mut().poll_ready(cx));
571 assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
572
573 assert_ready_ok!(io.as_mut().poll_flush(cx));
574 assert!(io.get_ref().calls.is_empty());
575 });
576 }
577
578 #[test]
write_single_frame_with_short_length_field()579 fn write_single_frame_with_short_length_field() {
580 let io = length_delimited::Builder::new()
581 .length_field_length(1)
582 .new_write(mock! {
583 data(b"\x09"),
584 data(b"abcdefghi"),
585 flush(),
586 });
587 pin_mut!(io);
588
589 task::spawn(()).enter(|cx, _| {
590 assert_ready_ok!(io.as_mut().poll_ready(cx));
591 assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
592
593 assert_ready_ok!(io.as_mut().poll_flush(cx));
594
595 assert!(io.get_ref().calls.is_empty());
596 });
597 }
598
599 #[test]
write_max_frame_len()600 fn write_max_frame_len() {
601 let io = length_delimited::Builder::new()
602 .max_frame_length(5)
603 .new_write(mock! {});
604 pin_mut!(io);
605
606 task::spawn(()).enter(|cx, _| {
607 assert_ready_ok!(io.as_mut().poll_ready(cx));
608 assert_err!(io.as_mut().start_send(Bytes::from("abcdef")));
609
610 assert!(io.get_ref().calls.is_empty());
611 });
612 }
613
614 #[test]
write_update_max_frame_len_at_rest()615 fn write_update_max_frame_len_at_rest() {
616 let io = length_delimited::Builder::new().new_write(mock! {
617 data(b"\x00\x00\x00\x06"),
618 data(b"abcdef"),
619 flush(),
620 });
621 pin_mut!(io);
622
623 task::spawn(()).enter(|cx, _| {
624 assert_ready_ok!(io.as_mut().poll_ready(cx));
625 assert_ok!(io.as_mut().start_send(Bytes::from("abcdef")));
626
627 assert_ready_ok!(io.as_mut().poll_flush(cx));
628
629 io.encoder_mut().set_max_frame_length(5);
630
631 assert_err!(io.as_mut().start_send(Bytes::from("abcdef")));
632
633 assert!(io.get_ref().calls.is_empty());
634 });
635 }
636
637 #[test]
write_update_max_frame_len_in_flight()638 fn write_update_max_frame_len_in_flight() {
639 let io = length_delimited::Builder::new().new_write(mock! {
640 data(b"\x00\x00\x00\x06"),
641 data(b"ab"),
642 Poll::Pending,
643 data(b"cdef"),
644 flush(),
645 });
646 pin_mut!(io);
647
648 task::spawn(()).enter(|cx, _| {
649 assert_ready_ok!(io.as_mut().poll_ready(cx));
650 assert_ok!(io.as_mut().start_send(Bytes::from("abcdef")));
651
652 assert_pending!(io.as_mut().poll_flush(cx));
653
654 io.encoder_mut().set_max_frame_length(5);
655
656 assert_ready_ok!(io.as_mut().poll_flush(cx));
657
658 assert_err!(io.as_mut().start_send(Bytes::from("abcdef")));
659 assert!(io.get_ref().calls.is_empty());
660 });
661 }
662
663 #[test]
write_zero()664 fn write_zero() {
665 let io = length_delimited::Builder::new().new_write(mock! {});
666 pin_mut!(io);
667
668 task::spawn(()).enter(|cx, _| {
669 assert_ready_ok!(io.as_mut().poll_ready(cx));
670 assert_ok!(io.as_mut().start_send(Bytes::from("abcdef")));
671
672 assert_ready_err!(io.as_mut().poll_flush(cx));
673
674 assert!(io.get_ref().calls.is_empty());
675 });
676 }
677
678 #[test]
encode_overflow()679 fn encode_overflow() {
680 // Test reproducing tokio-rs/tokio#681.
681 let mut codec = length_delimited::Builder::new().new_codec();
682 let mut buf = BytesMut::with_capacity(1024);
683
684 // Put some data into the buffer without resizing it to hold more.
685 let some_as = std::iter::repeat(b'a').take(1024).collect::<Vec<_>>();
686 buf.put_slice(&some_as[..]);
687
688 // Trying to encode the length header should resize the buffer if it won't fit.
689 codec.encode(Bytes::from("hello"), &mut buf).unwrap();
690 }
691
692 #[test]
frame_does_not_fit()693 fn frame_does_not_fit() {
694 let codec = LengthDelimitedCodec::builder()
695 .length_field_length(1)
696 .max_frame_length(256)
697 .new_codec();
698
699 assert_eq!(codec.max_frame_length(), 255);
700 }
701
702 #[test]
neg_adjusted_frame_does_not_fit()703 fn neg_adjusted_frame_does_not_fit() {
704 let codec = LengthDelimitedCodec::builder()
705 .length_field_length(1)
706 .length_adjustment(-1)
707 .new_codec();
708
709 assert_eq!(codec.max_frame_length(), 254);
710 }
711
712 #[test]
pos_adjusted_frame_does_not_fit()713 fn pos_adjusted_frame_does_not_fit() {
714 let codec = LengthDelimitedCodec::builder()
715 .length_field_length(1)
716 .length_adjustment(1)
717 .new_codec();
718
719 assert_eq!(codec.max_frame_length(), 256);
720 }
721
722 #[test]
max_allowed_frame_fits()723 fn max_allowed_frame_fits() {
724 let codec = LengthDelimitedCodec::builder()
725 .length_field_length(std::mem::size_of::<usize>())
726 .max_frame_length(usize::MAX)
727 .new_codec();
728
729 assert_eq!(codec.max_frame_length(), usize::MAX);
730 }
731
732 #[test]
smaller_frame_len_not_adjusted()733 fn smaller_frame_len_not_adjusted() {
734 let codec = LengthDelimitedCodec::builder()
735 .max_frame_length(10)
736 .length_field_length(std::mem::size_of::<usize>())
737 .new_codec();
738
739 assert_eq!(codec.max_frame_length(), 10);
740 }
741
742 #[test]
max_allowed_length_field()743 fn max_allowed_length_field() {
744 let codec = LengthDelimitedCodec::builder()
745 .length_field_length(8)
746 .max_frame_length(usize::MAX)
747 .new_codec();
748
749 assert_eq!(codec.max_frame_length(), usize::MAX);
750 }
751
752 // ===== Test utils =====
753
754 struct Mock {
755 calls: VecDeque<Poll<io::Result<Op>>>,
756 }
757
758 enum Op {
759 Data(Vec<u8>),
760 Flush,
761 }
762
763 impl AsyncRead for Mock {
poll_read( mut self: Pin<&mut Self>, _cx: &mut Context<'_>, dst: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>764 fn poll_read(
765 mut self: Pin<&mut Self>,
766 _cx: &mut Context<'_>,
767 dst: &mut ReadBuf<'_>,
768 ) -> Poll<io::Result<()>> {
769 match self.calls.pop_front() {
770 Some(Poll::Ready(Ok(Op::Data(data)))) => {
771 debug_assert!(dst.remaining() >= data.len());
772 dst.put_slice(&data);
773 Poll::Ready(Ok(()))
774 }
775 Some(Poll::Ready(Ok(_))) => panic!(),
776 Some(Poll::Ready(Err(e))) => Poll::Ready(Err(e)),
777 Some(Poll::Pending) => Poll::Pending,
778 None => Poll::Ready(Ok(())),
779 }
780 }
781 }
782
783 impl AsyncWrite for Mock {
poll_write( mut self: Pin<&mut Self>, _cx: &mut Context<'_>, src: &[u8], ) -> Poll<Result<usize, io::Error>>784 fn poll_write(
785 mut self: Pin<&mut Self>,
786 _cx: &mut Context<'_>,
787 src: &[u8],
788 ) -> Poll<Result<usize, io::Error>> {
789 match self.calls.pop_front() {
790 Some(Poll::Ready(Ok(Op::Data(data)))) => {
791 let len = data.len();
792 assert!(src.len() >= len, "expect={data:?}; actual={src:?}");
793 assert_eq!(&data[..], &src[..len]);
794 Poll::Ready(Ok(len))
795 }
796 Some(Poll::Ready(Ok(_))) => panic!(),
797 Some(Poll::Ready(Err(e))) => Poll::Ready(Err(e)),
798 Some(Poll::Pending) => Poll::Pending,
799 None => Poll::Ready(Ok(0)),
800 }
801 }
802
poll_flush(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>803 fn poll_flush(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
804 match self.calls.pop_front() {
805 Some(Poll::Ready(Ok(Op::Flush))) => Poll::Ready(Ok(())),
806 Some(Poll::Ready(Ok(_))) => panic!(),
807 Some(Poll::Ready(Err(e))) => Poll::Ready(Err(e)),
808 Some(Poll::Pending) => Poll::Pending,
809 None => Poll::Ready(Ok(())),
810 }
811 }
812
poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>813 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
814 Poll::Ready(Ok(()))
815 }
816 }
817
818 impl<'a> From<&'a [u8]> for Op {
from(src: &'a [u8]) -> Op819 fn from(src: &'a [u8]) -> Op {
820 Op::Data(src.into())
821 }
822 }
823
824 impl From<Vec<u8>> for Op {
from(src: Vec<u8>) -> Op825 fn from(src: Vec<u8>) -> Op {
826 Op::Data(src)
827 }
828 }
829
data(bytes: &[u8]) -> Poll<io::Result<Op>>830 fn data(bytes: &[u8]) -> Poll<io::Result<Op>> {
831 Poll::Ready(Ok(bytes.into()))
832 }
833
flush() -> Poll<io::Result<Op>>834 fn flush() -> Poll<io::Result<Op>> {
835 Poll::Ready(Ok(Op::Flush))
836 }
837