1 use std::io::{BufRead, Read, Seek, Write}; 2 use tokio::io::{ 3 AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, 4 AsyncWriteExt, 5 }; 6 7 /// Use a [`tokio::io::AsyncRead`] synchronously as a [`std::io::Read`] or 8 /// a [`tokio::io::AsyncWrite`] as a [`std::io::Write`]. 9 #[derive(Debug)] 10 pub struct SyncIoBridge<T> { 11 src: T, 12 rt: tokio::runtime::Handle, 13 } 14 15 impl<T: AsyncBufRead + Unpin> BufRead for SyncIoBridge<T> { fill_buf(&mut self) -> std::io::Result<&[u8]>16 fn fill_buf(&mut self) -> std::io::Result<&[u8]> { 17 let src = &mut self.src; 18 self.rt.block_on(AsyncBufReadExt::fill_buf(src)) 19 } 20 consume(&mut self, amt: usize)21 fn consume(&mut self, amt: usize) { 22 let src = &mut self.src; 23 AsyncBufReadExt::consume(src, amt) 24 } 25 read_until(&mut self, byte: u8, buf: &mut Vec<u8>) -> std::io::Result<usize>26 fn read_until(&mut self, byte: u8, buf: &mut Vec<u8>) -> std::io::Result<usize> { 27 let src = &mut self.src; 28 self.rt 29 .block_on(AsyncBufReadExt::read_until(src, byte, buf)) 30 } read_line(&mut self, buf: &mut String) -> std::io::Result<usize>31 fn read_line(&mut self, buf: &mut String) -> std::io::Result<usize> { 32 let src = &mut self.src; 33 self.rt.block_on(AsyncBufReadExt::read_line(src, buf)) 34 } 35 } 36 37 impl<T: AsyncRead + Unpin> Read for SyncIoBridge<T> { read(&mut self, buf: &mut [u8]) -> std::io::Result<usize>38 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> { 39 let src = &mut self.src; 40 self.rt.block_on(AsyncReadExt::read(src, buf)) 41 } 42 read_to_end(&mut self, buf: &mut Vec<u8>) -> std::io::Result<usize>43 fn read_to_end(&mut self, buf: &mut Vec<u8>) -> std::io::Result<usize> { 44 let src = &mut self.src; 45 self.rt.block_on(src.read_to_end(buf)) 46 } 47 read_to_string(&mut self, buf: &mut String) -> std::io::Result<usize>48 fn read_to_string(&mut self, buf: &mut String) -> std::io::Result<usize> { 49 let src = &mut self.src; 50 self.rt.block_on(src.read_to_string(buf)) 51 } 52 read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()>53 fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> { 54 let src = &mut self.src; 55 // The AsyncRead trait returns the count, synchronous doesn't. 56 let _n = self.rt.block_on(src.read_exact(buf))?; 57 Ok(()) 58 } 59 } 60 61 impl<T: AsyncWrite + Unpin> Write for SyncIoBridge<T> { write(&mut self, buf: &[u8]) -> std::io::Result<usize>62 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { 63 let src = &mut self.src; 64 self.rt.block_on(src.write(buf)) 65 } 66 flush(&mut self) -> std::io::Result<()>67 fn flush(&mut self) -> std::io::Result<()> { 68 let src = &mut self.src; 69 self.rt.block_on(src.flush()) 70 } 71 write_all(&mut self, buf: &[u8]) -> std::io::Result<()>72 fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> { 73 let src = &mut self.src; 74 self.rt.block_on(src.write_all(buf)) 75 } 76 write_vectored(&mut self, bufs: &[std::io::IoSlice<'_>]) -> std::io::Result<usize>77 fn write_vectored(&mut self, bufs: &[std::io::IoSlice<'_>]) -> std::io::Result<usize> { 78 let src = &mut self.src; 79 self.rt.block_on(src.write_vectored(bufs)) 80 } 81 } 82 83 impl<T: AsyncSeek + Unpin> Seek for SyncIoBridge<T> { seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64>84 fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> { 85 let src = &mut self.src; 86 self.rt.block_on(AsyncSeekExt::seek(src, pos)) 87 } 88 } 89 90 // Because https://doc.rust-lang.org/std/io/trait.Write.html#method.is_write_vectored is at the time 91 // of this writing still unstable, we expose this as part of a standalone method. 92 impl<T: AsyncWrite> SyncIoBridge<T> { 93 /// Determines if the underlying [`tokio::io::AsyncWrite`] target supports efficient vectored writes. 94 /// 95 /// See [`tokio::io::AsyncWrite::is_write_vectored`]. is_write_vectored(&self) -> bool96 pub fn is_write_vectored(&self) -> bool { 97 self.src.is_write_vectored() 98 } 99 } 100 101 impl<T: AsyncWrite + Unpin> SyncIoBridge<T> { 102 /// Shutdown this writer. This method provides a way to call the [`AsyncWriteExt::shutdown`] 103 /// function of the inner [`tokio::io::AsyncWrite`] instance. 104 /// 105 /// # Errors 106 /// 107 /// This method returns the same errors as [`AsyncWriteExt::shutdown`]. 108 /// 109 /// [`AsyncWriteExt::shutdown`]: tokio::io::AsyncWriteExt::shutdown shutdown(&mut self) -> std::io::Result<()>110 pub fn shutdown(&mut self) -> std::io::Result<()> { 111 let src = &mut self.src; 112 self.rt.block_on(src.shutdown()) 113 } 114 } 115 116 impl<T: Unpin> SyncIoBridge<T> { 117 /// Use a [`tokio::io::AsyncRead`] synchronously as a [`std::io::Read`] or 118 /// a [`tokio::io::AsyncWrite`] as a [`std::io::Write`]. 119 /// 120 /// When this struct is created, it captures a handle to the current thread's runtime with [`tokio::runtime::Handle::current`]. 121 /// It is hence OK to move this struct into a separate thread outside the runtime, as created 122 /// by e.g. [`tokio::task::spawn_blocking`]. 123 /// 124 /// Stated even more strongly: to make use of this bridge, you *must* move 125 /// it into a separate thread outside the runtime. The synchronous I/O will use the 126 /// underlying handle to block on the backing asynchronous source, via 127 /// [`tokio::runtime::Handle::block_on`]. As noted in the documentation for that 128 /// function, an attempt to `block_on` from an asynchronous execution context 129 /// will panic. 130 /// 131 /// # Wrapping `!Unpin` types 132 /// 133 /// Use e.g. `SyncIoBridge::new(Box::pin(src))`. 134 /// 135 /// # Panics 136 /// 137 /// This will panic if called outside the context of a Tokio runtime. 138 #[track_caller] new(src: T) -> Self139 pub fn new(src: T) -> Self { 140 Self::new_with_handle(src, tokio::runtime::Handle::current()) 141 } 142 143 /// Use a [`tokio::io::AsyncRead`] synchronously as a [`std::io::Read`] or 144 /// a [`tokio::io::AsyncWrite`] as a [`std::io::Write`]. 145 /// 146 /// This is the same as [`SyncIoBridge::new`], but allows passing an arbitrary handle and hence may 147 /// be initially invoked outside of an asynchronous context. new_with_handle(src: T, rt: tokio::runtime::Handle) -> Self148 pub fn new_with_handle(src: T, rt: tokio::runtime::Handle) -> Self { 149 Self { src, rt } 150 } 151 152 /// Consume this bridge, returning the underlying stream. into_inner(self) -> T153 pub fn into_inner(self) -> T { 154 self.src 155 } 156 } 157