1*cda5da8dSAndroid Build Coastguard Worker__all__ = ( 2*cda5da8dSAndroid Build Coastguard Worker 'StreamReader', 'StreamWriter', 'StreamReaderProtocol', 3*cda5da8dSAndroid Build Coastguard Worker 'open_connection', 'start_server') 4*cda5da8dSAndroid Build Coastguard Worker 5*cda5da8dSAndroid Build Coastguard Workerimport collections 6*cda5da8dSAndroid Build Coastguard Workerimport socket 7*cda5da8dSAndroid Build Coastguard Workerimport sys 8*cda5da8dSAndroid Build Coastguard Workerimport warnings 9*cda5da8dSAndroid Build Coastguard Workerimport weakref 10*cda5da8dSAndroid Build Coastguard Worker 11*cda5da8dSAndroid Build Coastguard Workerif hasattr(socket, 'AF_UNIX'): 12*cda5da8dSAndroid Build Coastguard Worker __all__ += ('open_unix_connection', 'start_unix_server') 13*cda5da8dSAndroid Build Coastguard Worker 14*cda5da8dSAndroid Build Coastguard Workerfrom . import coroutines 15*cda5da8dSAndroid Build Coastguard Workerfrom . import events 16*cda5da8dSAndroid Build Coastguard Workerfrom . import exceptions 17*cda5da8dSAndroid Build Coastguard Workerfrom . import format_helpers 18*cda5da8dSAndroid Build Coastguard Workerfrom . import protocols 19*cda5da8dSAndroid Build Coastguard Workerfrom .log import logger 20*cda5da8dSAndroid Build Coastguard Workerfrom .tasks import sleep 21*cda5da8dSAndroid Build Coastguard Worker 22*cda5da8dSAndroid Build Coastguard Worker 23*cda5da8dSAndroid Build Coastguard Worker_DEFAULT_LIMIT = 2 ** 16 # 64 KiB 24*cda5da8dSAndroid Build Coastguard Worker 25*cda5da8dSAndroid Build Coastguard Worker 26*cda5da8dSAndroid Build Coastguard Workerasync def open_connection(host=None, port=None, *, 27*cda5da8dSAndroid Build Coastguard Worker limit=_DEFAULT_LIMIT, **kwds): 28*cda5da8dSAndroid Build Coastguard Worker """A wrapper for create_connection() returning a (reader, writer) pair. 29*cda5da8dSAndroid Build Coastguard Worker 30*cda5da8dSAndroid Build Coastguard Worker The reader returned is a StreamReader instance; the writer is a 31*cda5da8dSAndroid Build Coastguard Worker StreamWriter instance. 32*cda5da8dSAndroid Build Coastguard Worker 33*cda5da8dSAndroid Build Coastguard Worker The arguments are all the usual arguments to create_connection() 34*cda5da8dSAndroid Build Coastguard Worker except protocol_factory; most common are positional host and port, 35*cda5da8dSAndroid Build Coastguard Worker with various optional keyword arguments following. 36*cda5da8dSAndroid Build Coastguard Worker 37*cda5da8dSAndroid Build Coastguard Worker Additional optional keyword arguments are loop (to set the event loop 38*cda5da8dSAndroid Build Coastguard Worker instance to use) and limit (to set the buffer limit passed to the 39*cda5da8dSAndroid Build Coastguard Worker StreamReader). 40*cda5da8dSAndroid Build Coastguard Worker 41*cda5da8dSAndroid Build Coastguard Worker (If you want to customize the StreamReader and/or 42*cda5da8dSAndroid Build Coastguard Worker StreamReaderProtocol classes, just copy the code -- there's 43*cda5da8dSAndroid Build Coastguard Worker really nothing special here except some convenience.) 44*cda5da8dSAndroid Build Coastguard Worker """ 45*cda5da8dSAndroid Build Coastguard Worker loop = events.get_running_loop() 46*cda5da8dSAndroid Build Coastguard Worker reader = StreamReader(limit=limit, loop=loop) 47*cda5da8dSAndroid Build Coastguard Worker protocol = StreamReaderProtocol(reader, loop=loop) 48*cda5da8dSAndroid Build Coastguard Worker transport, _ = await loop.create_connection( 49*cda5da8dSAndroid Build Coastguard Worker lambda: protocol, host, port, **kwds) 50*cda5da8dSAndroid Build Coastguard Worker writer = StreamWriter(transport, protocol, reader, loop) 51*cda5da8dSAndroid Build Coastguard Worker return reader, writer 52*cda5da8dSAndroid Build Coastguard Worker 53*cda5da8dSAndroid Build Coastguard Worker 54*cda5da8dSAndroid Build Coastguard Workerasync def start_server(client_connected_cb, host=None, port=None, *, 55*cda5da8dSAndroid Build Coastguard Worker limit=_DEFAULT_LIMIT, **kwds): 56*cda5da8dSAndroid Build Coastguard Worker """Start a socket server, call back for each client connected. 57*cda5da8dSAndroid Build Coastguard Worker 58*cda5da8dSAndroid Build Coastguard Worker The first parameter, `client_connected_cb`, takes two parameters: 59*cda5da8dSAndroid Build Coastguard Worker client_reader, client_writer. client_reader is a StreamReader 60*cda5da8dSAndroid Build Coastguard Worker object, while client_writer is a StreamWriter object. This 61*cda5da8dSAndroid Build Coastguard Worker parameter can either be a plain callback function or a coroutine; 62*cda5da8dSAndroid Build Coastguard Worker if it is a coroutine, it will be automatically converted into a 63*cda5da8dSAndroid Build Coastguard Worker Task. 64*cda5da8dSAndroid Build Coastguard Worker 65*cda5da8dSAndroid Build Coastguard Worker The rest of the arguments are all the usual arguments to 66*cda5da8dSAndroid Build Coastguard Worker loop.create_server() except protocol_factory; most common are 67*cda5da8dSAndroid Build Coastguard Worker positional host and port, with various optional keyword arguments 68*cda5da8dSAndroid Build Coastguard Worker following. The return value is the same as loop.create_server(). 69*cda5da8dSAndroid Build Coastguard Worker 70*cda5da8dSAndroid Build Coastguard Worker Additional optional keyword arguments are loop (to set the event loop 71*cda5da8dSAndroid Build Coastguard Worker instance to use) and limit (to set the buffer limit passed to the 72*cda5da8dSAndroid Build Coastguard Worker StreamReader). 73*cda5da8dSAndroid Build Coastguard Worker 74*cda5da8dSAndroid Build Coastguard Worker The return value is the same as loop.create_server(), i.e. a 75*cda5da8dSAndroid Build Coastguard Worker Server object which can be used to stop the service. 76*cda5da8dSAndroid Build Coastguard Worker """ 77*cda5da8dSAndroid Build Coastguard Worker loop = events.get_running_loop() 78*cda5da8dSAndroid Build Coastguard Worker 79*cda5da8dSAndroid Build Coastguard Worker def factory(): 80*cda5da8dSAndroid Build Coastguard Worker reader = StreamReader(limit=limit, loop=loop) 81*cda5da8dSAndroid Build Coastguard Worker protocol = StreamReaderProtocol(reader, client_connected_cb, 82*cda5da8dSAndroid Build Coastguard Worker loop=loop) 83*cda5da8dSAndroid Build Coastguard Worker return protocol 84*cda5da8dSAndroid Build Coastguard Worker 85*cda5da8dSAndroid Build Coastguard Worker return await loop.create_server(factory, host, port, **kwds) 86*cda5da8dSAndroid Build Coastguard Worker 87*cda5da8dSAndroid Build Coastguard Worker 88*cda5da8dSAndroid Build Coastguard Workerif hasattr(socket, 'AF_UNIX'): 89*cda5da8dSAndroid Build Coastguard Worker # UNIX Domain Sockets are supported on this platform 90*cda5da8dSAndroid Build Coastguard Worker 91*cda5da8dSAndroid Build Coastguard Worker async def open_unix_connection(path=None, *, 92*cda5da8dSAndroid Build Coastguard Worker limit=_DEFAULT_LIMIT, **kwds): 93*cda5da8dSAndroid Build Coastguard Worker """Similar to `open_connection` but works with UNIX Domain Sockets.""" 94*cda5da8dSAndroid Build Coastguard Worker loop = events.get_running_loop() 95*cda5da8dSAndroid Build Coastguard Worker 96*cda5da8dSAndroid Build Coastguard Worker reader = StreamReader(limit=limit, loop=loop) 97*cda5da8dSAndroid Build Coastguard Worker protocol = StreamReaderProtocol(reader, loop=loop) 98*cda5da8dSAndroid Build Coastguard Worker transport, _ = await loop.create_unix_connection( 99*cda5da8dSAndroid Build Coastguard Worker lambda: protocol, path, **kwds) 100*cda5da8dSAndroid Build Coastguard Worker writer = StreamWriter(transport, protocol, reader, loop) 101*cda5da8dSAndroid Build Coastguard Worker return reader, writer 102*cda5da8dSAndroid Build Coastguard Worker 103*cda5da8dSAndroid Build Coastguard Worker async def start_unix_server(client_connected_cb, path=None, *, 104*cda5da8dSAndroid Build Coastguard Worker limit=_DEFAULT_LIMIT, **kwds): 105*cda5da8dSAndroid Build Coastguard Worker """Similar to `start_server` but works with UNIX Domain Sockets.""" 106*cda5da8dSAndroid Build Coastguard Worker loop = events.get_running_loop() 107*cda5da8dSAndroid Build Coastguard Worker 108*cda5da8dSAndroid Build Coastguard Worker def factory(): 109*cda5da8dSAndroid Build Coastguard Worker reader = StreamReader(limit=limit, loop=loop) 110*cda5da8dSAndroid Build Coastguard Worker protocol = StreamReaderProtocol(reader, client_connected_cb, 111*cda5da8dSAndroid Build Coastguard Worker loop=loop) 112*cda5da8dSAndroid Build Coastguard Worker return protocol 113*cda5da8dSAndroid Build Coastguard Worker 114*cda5da8dSAndroid Build Coastguard Worker return await loop.create_unix_server(factory, path, **kwds) 115*cda5da8dSAndroid Build Coastguard Worker 116*cda5da8dSAndroid Build Coastguard Worker 117*cda5da8dSAndroid Build Coastguard Workerclass FlowControlMixin(protocols.Protocol): 118*cda5da8dSAndroid Build Coastguard Worker """Reusable flow control logic for StreamWriter.drain(). 119*cda5da8dSAndroid Build Coastguard Worker 120*cda5da8dSAndroid Build Coastguard Worker This implements the protocol methods pause_writing(), 121*cda5da8dSAndroid Build Coastguard Worker resume_writing() and connection_lost(). If the subclass overrides 122*cda5da8dSAndroid Build Coastguard Worker these it must call the super methods. 123*cda5da8dSAndroid Build Coastguard Worker 124*cda5da8dSAndroid Build Coastguard Worker StreamWriter.drain() must wait for _drain_helper() coroutine. 125*cda5da8dSAndroid Build Coastguard Worker """ 126*cda5da8dSAndroid Build Coastguard Worker 127*cda5da8dSAndroid Build Coastguard Worker def __init__(self, loop=None): 128*cda5da8dSAndroid Build Coastguard Worker if loop is None: 129*cda5da8dSAndroid Build Coastguard Worker self._loop = events._get_event_loop(stacklevel=4) 130*cda5da8dSAndroid Build Coastguard Worker else: 131*cda5da8dSAndroid Build Coastguard Worker self._loop = loop 132*cda5da8dSAndroid Build Coastguard Worker self._paused = False 133*cda5da8dSAndroid Build Coastguard Worker self._drain_waiters = collections.deque() 134*cda5da8dSAndroid Build Coastguard Worker self._connection_lost = False 135*cda5da8dSAndroid Build Coastguard Worker 136*cda5da8dSAndroid Build Coastguard Worker def pause_writing(self): 137*cda5da8dSAndroid Build Coastguard Worker assert not self._paused 138*cda5da8dSAndroid Build Coastguard Worker self._paused = True 139*cda5da8dSAndroid Build Coastguard Worker if self._loop.get_debug(): 140*cda5da8dSAndroid Build Coastguard Worker logger.debug("%r pauses writing", self) 141*cda5da8dSAndroid Build Coastguard Worker 142*cda5da8dSAndroid Build Coastguard Worker def resume_writing(self): 143*cda5da8dSAndroid Build Coastguard Worker assert self._paused 144*cda5da8dSAndroid Build Coastguard Worker self._paused = False 145*cda5da8dSAndroid Build Coastguard Worker if self._loop.get_debug(): 146*cda5da8dSAndroid Build Coastguard Worker logger.debug("%r resumes writing", self) 147*cda5da8dSAndroid Build Coastguard Worker 148*cda5da8dSAndroid Build Coastguard Worker for waiter in self._drain_waiters: 149*cda5da8dSAndroid Build Coastguard Worker if not waiter.done(): 150*cda5da8dSAndroid Build Coastguard Worker waiter.set_result(None) 151*cda5da8dSAndroid Build Coastguard Worker 152*cda5da8dSAndroid Build Coastguard Worker def connection_lost(self, exc): 153*cda5da8dSAndroid Build Coastguard Worker self._connection_lost = True 154*cda5da8dSAndroid Build Coastguard Worker # Wake up the writer(s) if currently paused. 155*cda5da8dSAndroid Build Coastguard Worker if not self._paused: 156*cda5da8dSAndroid Build Coastguard Worker return 157*cda5da8dSAndroid Build Coastguard Worker 158*cda5da8dSAndroid Build Coastguard Worker for waiter in self._drain_waiters: 159*cda5da8dSAndroid Build Coastguard Worker if not waiter.done(): 160*cda5da8dSAndroid Build Coastguard Worker if exc is None: 161*cda5da8dSAndroid Build Coastguard Worker waiter.set_result(None) 162*cda5da8dSAndroid Build Coastguard Worker else: 163*cda5da8dSAndroid Build Coastguard Worker waiter.set_exception(exc) 164*cda5da8dSAndroid Build Coastguard Worker 165*cda5da8dSAndroid Build Coastguard Worker async def _drain_helper(self): 166*cda5da8dSAndroid Build Coastguard Worker if self._connection_lost: 167*cda5da8dSAndroid Build Coastguard Worker raise ConnectionResetError('Connection lost') 168*cda5da8dSAndroid Build Coastguard Worker if not self._paused: 169*cda5da8dSAndroid Build Coastguard Worker return 170*cda5da8dSAndroid Build Coastguard Worker waiter = self._loop.create_future() 171*cda5da8dSAndroid Build Coastguard Worker self._drain_waiters.append(waiter) 172*cda5da8dSAndroid Build Coastguard Worker try: 173*cda5da8dSAndroid Build Coastguard Worker await waiter 174*cda5da8dSAndroid Build Coastguard Worker finally: 175*cda5da8dSAndroid Build Coastguard Worker self._drain_waiters.remove(waiter) 176*cda5da8dSAndroid Build Coastguard Worker 177*cda5da8dSAndroid Build Coastguard Worker def _get_close_waiter(self, stream): 178*cda5da8dSAndroid Build Coastguard Worker raise NotImplementedError 179*cda5da8dSAndroid Build Coastguard Worker 180*cda5da8dSAndroid Build Coastguard Worker 181*cda5da8dSAndroid Build Coastguard Workerclass StreamReaderProtocol(FlowControlMixin, protocols.Protocol): 182*cda5da8dSAndroid Build Coastguard Worker """Helper class to adapt between Protocol and StreamReader. 183*cda5da8dSAndroid Build Coastguard Worker 184*cda5da8dSAndroid Build Coastguard Worker (This is a helper class instead of making StreamReader itself a 185*cda5da8dSAndroid Build Coastguard Worker Protocol subclass, because the StreamReader has other potential 186*cda5da8dSAndroid Build Coastguard Worker uses, and to prevent the user of the StreamReader to accidentally 187*cda5da8dSAndroid Build Coastguard Worker call inappropriate methods of the protocol.) 188*cda5da8dSAndroid Build Coastguard Worker """ 189*cda5da8dSAndroid Build Coastguard Worker 190*cda5da8dSAndroid Build Coastguard Worker _source_traceback = None 191*cda5da8dSAndroid Build Coastguard Worker 192*cda5da8dSAndroid Build Coastguard Worker def __init__(self, stream_reader, client_connected_cb=None, loop=None): 193*cda5da8dSAndroid Build Coastguard Worker super().__init__(loop=loop) 194*cda5da8dSAndroid Build Coastguard Worker if stream_reader is not None: 195*cda5da8dSAndroid Build Coastguard Worker self._stream_reader_wr = weakref.ref(stream_reader) 196*cda5da8dSAndroid Build Coastguard Worker self._source_traceback = stream_reader._source_traceback 197*cda5da8dSAndroid Build Coastguard Worker else: 198*cda5da8dSAndroid Build Coastguard Worker self._stream_reader_wr = None 199*cda5da8dSAndroid Build Coastguard Worker if client_connected_cb is not None: 200*cda5da8dSAndroid Build Coastguard Worker # This is a stream created by the `create_server()` function. 201*cda5da8dSAndroid Build Coastguard Worker # Keep a strong reference to the reader until a connection 202*cda5da8dSAndroid Build Coastguard Worker # is established. 203*cda5da8dSAndroid Build Coastguard Worker self._strong_reader = stream_reader 204*cda5da8dSAndroid Build Coastguard Worker self._reject_connection = False 205*cda5da8dSAndroid Build Coastguard Worker self._stream_writer = None 206*cda5da8dSAndroid Build Coastguard Worker self._task = None 207*cda5da8dSAndroid Build Coastguard Worker self._transport = None 208*cda5da8dSAndroid Build Coastguard Worker self._client_connected_cb = client_connected_cb 209*cda5da8dSAndroid Build Coastguard Worker self._over_ssl = False 210*cda5da8dSAndroid Build Coastguard Worker self._closed = self._loop.create_future() 211*cda5da8dSAndroid Build Coastguard Worker 212*cda5da8dSAndroid Build Coastguard Worker @property 213*cda5da8dSAndroid Build Coastguard Worker def _stream_reader(self): 214*cda5da8dSAndroid Build Coastguard Worker if self._stream_reader_wr is None: 215*cda5da8dSAndroid Build Coastguard Worker return None 216*cda5da8dSAndroid Build Coastguard Worker return self._stream_reader_wr() 217*cda5da8dSAndroid Build Coastguard Worker 218*cda5da8dSAndroid Build Coastguard Worker def _replace_writer(self, writer): 219*cda5da8dSAndroid Build Coastguard Worker loop = self._loop 220*cda5da8dSAndroid Build Coastguard Worker transport = writer.transport 221*cda5da8dSAndroid Build Coastguard Worker self._stream_writer = writer 222*cda5da8dSAndroid Build Coastguard Worker self._transport = transport 223*cda5da8dSAndroid Build Coastguard Worker self._over_ssl = transport.get_extra_info('sslcontext') is not None 224*cda5da8dSAndroid Build Coastguard Worker 225*cda5da8dSAndroid Build Coastguard Worker def connection_made(self, transport): 226*cda5da8dSAndroid Build Coastguard Worker if self._reject_connection: 227*cda5da8dSAndroid Build Coastguard Worker context = { 228*cda5da8dSAndroid Build Coastguard Worker 'message': ('An open stream was garbage collected prior to ' 229*cda5da8dSAndroid Build Coastguard Worker 'establishing network connection; ' 230*cda5da8dSAndroid Build Coastguard Worker 'call "stream.close()" explicitly.') 231*cda5da8dSAndroid Build Coastguard Worker } 232*cda5da8dSAndroid Build Coastguard Worker if self._source_traceback: 233*cda5da8dSAndroid Build Coastguard Worker context['source_traceback'] = self._source_traceback 234*cda5da8dSAndroid Build Coastguard Worker self._loop.call_exception_handler(context) 235*cda5da8dSAndroid Build Coastguard Worker transport.abort() 236*cda5da8dSAndroid Build Coastguard Worker return 237*cda5da8dSAndroid Build Coastguard Worker self._transport = transport 238*cda5da8dSAndroid Build Coastguard Worker reader = self._stream_reader 239*cda5da8dSAndroid Build Coastguard Worker if reader is not None: 240*cda5da8dSAndroid Build Coastguard Worker reader.set_transport(transport) 241*cda5da8dSAndroid Build Coastguard Worker self._over_ssl = transport.get_extra_info('sslcontext') is not None 242*cda5da8dSAndroid Build Coastguard Worker if self._client_connected_cb is not None: 243*cda5da8dSAndroid Build Coastguard Worker self._stream_writer = StreamWriter(transport, self, 244*cda5da8dSAndroid Build Coastguard Worker reader, 245*cda5da8dSAndroid Build Coastguard Worker self._loop) 246*cda5da8dSAndroid Build Coastguard Worker res = self._client_connected_cb(reader, 247*cda5da8dSAndroid Build Coastguard Worker self._stream_writer) 248*cda5da8dSAndroid Build Coastguard Worker if coroutines.iscoroutine(res): 249*cda5da8dSAndroid Build Coastguard Worker self._task = self._loop.create_task(res) 250*cda5da8dSAndroid Build Coastguard Worker self._strong_reader = None 251*cda5da8dSAndroid Build Coastguard Worker 252*cda5da8dSAndroid Build Coastguard Worker def connection_lost(self, exc): 253*cda5da8dSAndroid Build Coastguard Worker reader = self._stream_reader 254*cda5da8dSAndroid Build Coastguard Worker if reader is not None: 255*cda5da8dSAndroid Build Coastguard Worker if exc is None: 256*cda5da8dSAndroid Build Coastguard Worker reader.feed_eof() 257*cda5da8dSAndroid Build Coastguard Worker else: 258*cda5da8dSAndroid Build Coastguard Worker reader.set_exception(exc) 259*cda5da8dSAndroid Build Coastguard Worker if not self._closed.done(): 260*cda5da8dSAndroid Build Coastguard Worker if exc is None: 261*cda5da8dSAndroid Build Coastguard Worker self._closed.set_result(None) 262*cda5da8dSAndroid Build Coastguard Worker else: 263*cda5da8dSAndroid Build Coastguard Worker self._closed.set_exception(exc) 264*cda5da8dSAndroid Build Coastguard Worker super().connection_lost(exc) 265*cda5da8dSAndroid Build Coastguard Worker self._stream_reader_wr = None 266*cda5da8dSAndroid Build Coastguard Worker self._stream_writer = None 267*cda5da8dSAndroid Build Coastguard Worker self._task = None 268*cda5da8dSAndroid Build Coastguard Worker self._transport = None 269*cda5da8dSAndroid Build Coastguard Worker 270*cda5da8dSAndroid Build Coastguard Worker def data_received(self, data): 271*cda5da8dSAndroid Build Coastguard Worker reader = self._stream_reader 272*cda5da8dSAndroid Build Coastguard Worker if reader is not None: 273*cda5da8dSAndroid Build Coastguard Worker reader.feed_data(data) 274*cda5da8dSAndroid Build Coastguard Worker 275*cda5da8dSAndroid Build Coastguard Worker def eof_received(self): 276*cda5da8dSAndroid Build Coastguard Worker reader = self._stream_reader 277*cda5da8dSAndroid Build Coastguard Worker if reader is not None: 278*cda5da8dSAndroid Build Coastguard Worker reader.feed_eof() 279*cda5da8dSAndroid Build Coastguard Worker if self._over_ssl: 280*cda5da8dSAndroid Build Coastguard Worker # Prevent a warning in SSLProtocol.eof_received: 281*cda5da8dSAndroid Build Coastguard Worker # "returning true from eof_received() 282*cda5da8dSAndroid Build Coastguard Worker # has no effect when using ssl" 283*cda5da8dSAndroid Build Coastguard Worker return False 284*cda5da8dSAndroid Build Coastguard Worker return True 285*cda5da8dSAndroid Build Coastguard Worker 286*cda5da8dSAndroid Build Coastguard Worker def _get_close_waiter(self, stream): 287*cda5da8dSAndroid Build Coastguard Worker return self._closed 288*cda5da8dSAndroid Build Coastguard Worker 289*cda5da8dSAndroid Build Coastguard Worker def __del__(self): 290*cda5da8dSAndroid Build Coastguard Worker # Prevent reports about unhandled exceptions. 291*cda5da8dSAndroid Build Coastguard Worker # Better than self._closed._log_traceback = False hack 292*cda5da8dSAndroid Build Coastguard Worker try: 293*cda5da8dSAndroid Build Coastguard Worker closed = self._closed 294*cda5da8dSAndroid Build Coastguard Worker except AttributeError: 295*cda5da8dSAndroid Build Coastguard Worker pass # failed constructor 296*cda5da8dSAndroid Build Coastguard Worker else: 297*cda5da8dSAndroid Build Coastguard Worker if closed.done() and not closed.cancelled(): 298*cda5da8dSAndroid Build Coastguard Worker closed.exception() 299*cda5da8dSAndroid Build Coastguard Worker 300*cda5da8dSAndroid Build Coastguard Worker 301*cda5da8dSAndroid Build Coastguard Workerclass StreamWriter: 302*cda5da8dSAndroid Build Coastguard Worker """Wraps a Transport. 303*cda5da8dSAndroid Build Coastguard Worker 304*cda5da8dSAndroid Build Coastguard Worker This exposes write(), writelines(), [can_]write_eof(), 305*cda5da8dSAndroid Build Coastguard Worker get_extra_info() and close(). It adds drain() which returns an 306*cda5da8dSAndroid Build Coastguard Worker optional Future on which you can wait for flow control. It also 307*cda5da8dSAndroid Build Coastguard Worker adds a transport property which references the Transport 308*cda5da8dSAndroid Build Coastguard Worker directly. 309*cda5da8dSAndroid Build Coastguard Worker """ 310*cda5da8dSAndroid Build Coastguard Worker 311*cda5da8dSAndroid Build Coastguard Worker def __init__(self, transport, protocol, reader, loop): 312*cda5da8dSAndroid Build Coastguard Worker self._transport = transport 313*cda5da8dSAndroid Build Coastguard Worker self._protocol = protocol 314*cda5da8dSAndroid Build Coastguard Worker # drain() expects that the reader has an exception() method 315*cda5da8dSAndroid Build Coastguard Worker assert reader is None or isinstance(reader, StreamReader) 316*cda5da8dSAndroid Build Coastguard Worker self._reader = reader 317*cda5da8dSAndroid Build Coastguard Worker self._loop = loop 318*cda5da8dSAndroid Build Coastguard Worker self._complete_fut = self._loop.create_future() 319*cda5da8dSAndroid Build Coastguard Worker self._complete_fut.set_result(None) 320*cda5da8dSAndroid Build Coastguard Worker 321*cda5da8dSAndroid Build Coastguard Worker def __repr__(self): 322*cda5da8dSAndroid Build Coastguard Worker info = [self.__class__.__name__, f'transport={self._transport!r}'] 323*cda5da8dSAndroid Build Coastguard Worker if self._reader is not None: 324*cda5da8dSAndroid Build Coastguard Worker info.append(f'reader={self._reader!r}') 325*cda5da8dSAndroid Build Coastguard Worker return '<{}>'.format(' '.join(info)) 326*cda5da8dSAndroid Build Coastguard Worker 327*cda5da8dSAndroid Build Coastguard Worker @property 328*cda5da8dSAndroid Build Coastguard Worker def transport(self): 329*cda5da8dSAndroid Build Coastguard Worker return self._transport 330*cda5da8dSAndroid Build Coastguard Worker 331*cda5da8dSAndroid Build Coastguard Worker def write(self, data): 332*cda5da8dSAndroid Build Coastguard Worker self._transport.write(data) 333*cda5da8dSAndroid Build Coastguard Worker 334*cda5da8dSAndroid Build Coastguard Worker def writelines(self, data): 335*cda5da8dSAndroid Build Coastguard Worker self._transport.writelines(data) 336*cda5da8dSAndroid Build Coastguard Worker 337*cda5da8dSAndroid Build Coastguard Worker def write_eof(self): 338*cda5da8dSAndroid Build Coastguard Worker return self._transport.write_eof() 339*cda5da8dSAndroid Build Coastguard Worker 340*cda5da8dSAndroid Build Coastguard Worker def can_write_eof(self): 341*cda5da8dSAndroid Build Coastguard Worker return self._transport.can_write_eof() 342*cda5da8dSAndroid Build Coastguard Worker 343*cda5da8dSAndroid Build Coastguard Worker def close(self): 344*cda5da8dSAndroid Build Coastguard Worker return self._transport.close() 345*cda5da8dSAndroid Build Coastguard Worker 346*cda5da8dSAndroid Build Coastguard Worker def is_closing(self): 347*cda5da8dSAndroid Build Coastguard Worker return self._transport.is_closing() 348*cda5da8dSAndroid Build Coastguard Worker 349*cda5da8dSAndroid Build Coastguard Worker async def wait_closed(self): 350*cda5da8dSAndroid Build Coastguard Worker await self._protocol._get_close_waiter(self) 351*cda5da8dSAndroid Build Coastguard Worker 352*cda5da8dSAndroid Build Coastguard Worker def get_extra_info(self, name, default=None): 353*cda5da8dSAndroid Build Coastguard Worker return self._transport.get_extra_info(name, default) 354*cda5da8dSAndroid Build Coastguard Worker 355*cda5da8dSAndroid Build Coastguard Worker async def drain(self): 356*cda5da8dSAndroid Build Coastguard Worker """Flush the write buffer. 357*cda5da8dSAndroid Build Coastguard Worker 358*cda5da8dSAndroid Build Coastguard Worker The intended use is to write 359*cda5da8dSAndroid Build Coastguard Worker 360*cda5da8dSAndroid Build Coastguard Worker w.write(data) 361*cda5da8dSAndroid Build Coastguard Worker await w.drain() 362*cda5da8dSAndroid Build Coastguard Worker """ 363*cda5da8dSAndroid Build Coastguard Worker if self._reader is not None: 364*cda5da8dSAndroid Build Coastguard Worker exc = self._reader.exception() 365*cda5da8dSAndroid Build Coastguard Worker if exc is not None: 366*cda5da8dSAndroid Build Coastguard Worker raise exc 367*cda5da8dSAndroid Build Coastguard Worker if self._transport.is_closing(): 368*cda5da8dSAndroid Build Coastguard Worker # Wait for protocol.connection_lost() call 369*cda5da8dSAndroid Build Coastguard Worker # Raise connection closing error if any, 370*cda5da8dSAndroid Build Coastguard Worker # ConnectionResetError otherwise 371*cda5da8dSAndroid Build Coastguard Worker # Yield to the event loop so connection_lost() may be 372*cda5da8dSAndroid Build Coastguard Worker # called. Without this, _drain_helper() would return 373*cda5da8dSAndroid Build Coastguard Worker # immediately, and code that calls 374*cda5da8dSAndroid Build Coastguard Worker # write(...); await drain() 375*cda5da8dSAndroid Build Coastguard Worker # in a loop would never call connection_lost(), so it 376*cda5da8dSAndroid Build Coastguard Worker # would not see an error when the socket is closed. 377*cda5da8dSAndroid Build Coastguard Worker await sleep(0) 378*cda5da8dSAndroid Build Coastguard Worker await self._protocol._drain_helper() 379*cda5da8dSAndroid Build Coastguard Worker 380*cda5da8dSAndroid Build Coastguard Worker async def start_tls(self, sslcontext, *, 381*cda5da8dSAndroid Build Coastguard Worker server_hostname=None, 382*cda5da8dSAndroid Build Coastguard Worker ssl_handshake_timeout=None): 383*cda5da8dSAndroid Build Coastguard Worker """Upgrade an existing stream-based connection to TLS.""" 384*cda5da8dSAndroid Build Coastguard Worker server_side = self._protocol._client_connected_cb is not None 385*cda5da8dSAndroid Build Coastguard Worker protocol = self._protocol 386*cda5da8dSAndroid Build Coastguard Worker await self.drain() 387*cda5da8dSAndroid Build Coastguard Worker new_transport = await self._loop.start_tls( # type: ignore 388*cda5da8dSAndroid Build Coastguard Worker self._transport, protocol, sslcontext, 389*cda5da8dSAndroid Build Coastguard Worker server_side=server_side, server_hostname=server_hostname, 390*cda5da8dSAndroid Build Coastguard Worker ssl_handshake_timeout=ssl_handshake_timeout) 391*cda5da8dSAndroid Build Coastguard Worker self._transport = new_transport 392*cda5da8dSAndroid Build Coastguard Worker protocol._replace_writer(self) 393*cda5da8dSAndroid Build Coastguard Worker 394*cda5da8dSAndroid Build Coastguard Worker 395*cda5da8dSAndroid Build Coastguard Workerclass StreamReader: 396*cda5da8dSAndroid Build Coastguard Worker 397*cda5da8dSAndroid Build Coastguard Worker _source_traceback = None 398*cda5da8dSAndroid Build Coastguard Worker 399*cda5da8dSAndroid Build Coastguard Worker def __init__(self, limit=_DEFAULT_LIMIT, loop=None): 400*cda5da8dSAndroid Build Coastguard Worker # The line length limit is a security feature; 401*cda5da8dSAndroid Build Coastguard Worker # it also doubles as half the buffer limit. 402*cda5da8dSAndroid Build Coastguard Worker 403*cda5da8dSAndroid Build Coastguard Worker if limit <= 0: 404*cda5da8dSAndroid Build Coastguard Worker raise ValueError('Limit cannot be <= 0') 405*cda5da8dSAndroid Build Coastguard Worker 406*cda5da8dSAndroid Build Coastguard Worker self._limit = limit 407*cda5da8dSAndroid Build Coastguard Worker if loop is None: 408*cda5da8dSAndroid Build Coastguard Worker self._loop = events._get_event_loop() 409*cda5da8dSAndroid Build Coastguard Worker else: 410*cda5da8dSAndroid Build Coastguard Worker self._loop = loop 411*cda5da8dSAndroid Build Coastguard Worker self._buffer = bytearray() 412*cda5da8dSAndroid Build Coastguard Worker self._eof = False # Whether we're done. 413*cda5da8dSAndroid Build Coastguard Worker self._waiter = None # A future used by _wait_for_data() 414*cda5da8dSAndroid Build Coastguard Worker self._exception = None 415*cda5da8dSAndroid Build Coastguard Worker self._transport = None 416*cda5da8dSAndroid Build Coastguard Worker self._paused = False 417*cda5da8dSAndroid Build Coastguard Worker if self._loop.get_debug(): 418*cda5da8dSAndroid Build Coastguard Worker self._source_traceback = format_helpers.extract_stack( 419*cda5da8dSAndroid Build Coastguard Worker sys._getframe(1)) 420*cda5da8dSAndroid Build Coastguard Worker 421*cda5da8dSAndroid Build Coastguard Worker def __repr__(self): 422*cda5da8dSAndroid Build Coastguard Worker info = ['StreamReader'] 423*cda5da8dSAndroid Build Coastguard Worker if self._buffer: 424*cda5da8dSAndroid Build Coastguard Worker info.append(f'{len(self._buffer)} bytes') 425*cda5da8dSAndroid Build Coastguard Worker if self._eof: 426*cda5da8dSAndroid Build Coastguard Worker info.append('eof') 427*cda5da8dSAndroid Build Coastguard Worker if self._limit != _DEFAULT_LIMIT: 428*cda5da8dSAndroid Build Coastguard Worker info.append(f'limit={self._limit}') 429*cda5da8dSAndroid Build Coastguard Worker if self._waiter: 430*cda5da8dSAndroid Build Coastguard Worker info.append(f'waiter={self._waiter!r}') 431*cda5da8dSAndroid Build Coastguard Worker if self._exception: 432*cda5da8dSAndroid Build Coastguard Worker info.append(f'exception={self._exception!r}') 433*cda5da8dSAndroid Build Coastguard Worker if self._transport: 434*cda5da8dSAndroid Build Coastguard Worker info.append(f'transport={self._transport!r}') 435*cda5da8dSAndroid Build Coastguard Worker if self._paused: 436*cda5da8dSAndroid Build Coastguard Worker info.append('paused') 437*cda5da8dSAndroid Build Coastguard Worker return '<{}>'.format(' '.join(info)) 438*cda5da8dSAndroid Build Coastguard Worker 439*cda5da8dSAndroid Build Coastguard Worker def exception(self): 440*cda5da8dSAndroid Build Coastguard Worker return self._exception 441*cda5da8dSAndroid Build Coastguard Worker 442*cda5da8dSAndroid Build Coastguard Worker def set_exception(self, exc): 443*cda5da8dSAndroid Build Coastguard Worker self._exception = exc 444*cda5da8dSAndroid Build Coastguard Worker 445*cda5da8dSAndroid Build Coastguard Worker waiter = self._waiter 446*cda5da8dSAndroid Build Coastguard Worker if waiter is not None: 447*cda5da8dSAndroid Build Coastguard Worker self._waiter = None 448*cda5da8dSAndroid Build Coastguard Worker if not waiter.cancelled(): 449*cda5da8dSAndroid Build Coastguard Worker waiter.set_exception(exc) 450*cda5da8dSAndroid Build Coastguard Worker 451*cda5da8dSAndroid Build Coastguard Worker def _wakeup_waiter(self): 452*cda5da8dSAndroid Build Coastguard Worker """Wakeup read*() functions waiting for data or EOF.""" 453*cda5da8dSAndroid Build Coastguard Worker waiter = self._waiter 454*cda5da8dSAndroid Build Coastguard Worker if waiter is not None: 455*cda5da8dSAndroid Build Coastguard Worker self._waiter = None 456*cda5da8dSAndroid Build Coastguard Worker if not waiter.cancelled(): 457*cda5da8dSAndroid Build Coastguard Worker waiter.set_result(None) 458*cda5da8dSAndroid Build Coastguard Worker 459*cda5da8dSAndroid Build Coastguard Worker def set_transport(self, transport): 460*cda5da8dSAndroid Build Coastguard Worker assert self._transport is None, 'Transport already set' 461*cda5da8dSAndroid Build Coastguard Worker self._transport = transport 462*cda5da8dSAndroid Build Coastguard Worker 463*cda5da8dSAndroid Build Coastguard Worker def _maybe_resume_transport(self): 464*cda5da8dSAndroid Build Coastguard Worker if self._paused and len(self._buffer) <= self._limit: 465*cda5da8dSAndroid Build Coastguard Worker self._paused = False 466*cda5da8dSAndroid Build Coastguard Worker self._transport.resume_reading() 467*cda5da8dSAndroid Build Coastguard Worker 468*cda5da8dSAndroid Build Coastguard Worker def feed_eof(self): 469*cda5da8dSAndroid Build Coastguard Worker self._eof = True 470*cda5da8dSAndroid Build Coastguard Worker self._wakeup_waiter() 471*cda5da8dSAndroid Build Coastguard Worker 472*cda5da8dSAndroid Build Coastguard Worker def at_eof(self): 473*cda5da8dSAndroid Build Coastguard Worker """Return True if the buffer is empty and 'feed_eof' was called.""" 474*cda5da8dSAndroid Build Coastguard Worker return self._eof and not self._buffer 475*cda5da8dSAndroid Build Coastguard Worker 476*cda5da8dSAndroid Build Coastguard Worker def feed_data(self, data): 477*cda5da8dSAndroid Build Coastguard Worker assert not self._eof, 'feed_data after feed_eof' 478*cda5da8dSAndroid Build Coastguard Worker 479*cda5da8dSAndroid Build Coastguard Worker if not data: 480*cda5da8dSAndroid Build Coastguard Worker return 481*cda5da8dSAndroid Build Coastguard Worker 482*cda5da8dSAndroid Build Coastguard Worker self._buffer.extend(data) 483*cda5da8dSAndroid Build Coastguard Worker self._wakeup_waiter() 484*cda5da8dSAndroid Build Coastguard Worker 485*cda5da8dSAndroid Build Coastguard Worker if (self._transport is not None and 486*cda5da8dSAndroid Build Coastguard Worker not self._paused and 487*cda5da8dSAndroid Build Coastguard Worker len(self._buffer) > 2 * self._limit): 488*cda5da8dSAndroid Build Coastguard Worker try: 489*cda5da8dSAndroid Build Coastguard Worker self._transport.pause_reading() 490*cda5da8dSAndroid Build Coastguard Worker except NotImplementedError: 491*cda5da8dSAndroid Build Coastguard Worker # The transport can't be paused. 492*cda5da8dSAndroid Build Coastguard Worker # We'll just have to buffer all data. 493*cda5da8dSAndroid Build Coastguard Worker # Forget the transport so we don't keep trying. 494*cda5da8dSAndroid Build Coastguard Worker self._transport = None 495*cda5da8dSAndroid Build Coastguard Worker else: 496*cda5da8dSAndroid Build Coastguard Worker self._paused = True 497*cda5da8dSAndroid Build Coastguard Worker 498*cda5da8dSAndroid Build Coastguard Worker async def _wait_for_data(self, func_name): 499*cda5da8dSAndroid Build Coastguard Worker """Wait until feed_data() or feed_eof() is called. 500*cda5da8dSAndroid Build Coastguard Worker 501*cda5da8dSAndroid Build Coastguard Worker If stream was paused, automatically resume it. 502*cda5da8dSAndroid Build Coastguard Worker """ 503*cda5da8dSAndroid Build Coastguard Worker # StreamReader uses a future to link the protocol feed_data() method 504*cda5da8dSAndroid Build Coastguard Worker # to a read coroutine. Running two read coroutines at the same time 505*cda5da8dSAndroid Build Coastguard Worker # would have an unexpected behaviour. It would not possible to know 506*cda5da8dSAndroid Build Coastguard Worker # which coroutine would get the next data. 507*cda5da8dSAndroid Build Coastguard Worker if self._waiter is not None: 508*cda5da8dSAndroid Build Coastguard Worker raise RuntimeError( 509*cda5da8dSAndroid Build Coastguard Worker f'{func_name}() called while another coroutine is ' 510*cda5da8dSAndroid Build Coastguard Worker f'already waiting for incoming data') 511*cda5da8dSAndroid Build Coastguard Worker 512*cda5da8dSAndroid Build Coastguard Worker assert not self._eof, '_wait_for_data after EOF' 513*cda5da8dSAndroid Build Coastguard Worker 514*cda5da8dSAndroid Build Coastguard Worker # Waiting for data while paused will make deadlock, so prevent it. 515*cda5da8dSAndroid Build Coastguard Worker # This is essential for readexactly(n) for case when n > self._limit. 516*cda5da8dSAndroid Build Coastguard Worker if self._paused: 517*cda5da8dSAndroid Build Coastguard Worker self._paused = False 518*cda5da8dSAndroid Build Coastguard Worker self._transport.resume_reading() 519*cda5da8dSAndroid Build Coastguard Worker 520*cda5da8dSAndroid Build Coastguard Worker self._waiter = self._loop.create_future() 521*cda5da8dSAndroid Build Coastguard Worker try: 522*cda5da8dSAndroid Build Coastguard Worker await self._waiter 523*cda5da8dSAndroid Build Coastguard Worker finally: 524*cda5da8dSAndroid Build Coastguard Worker self._waiter = None 525*cda5da8dSAndroid Build Coastguard Worker 526*cda5da8dSAndroid Build Coastguard Worker async def readline(self): 527*cda5da8dSAndroid Build Coastguard Worker """Read chunk of data from the stream until newline (b'\n') is found. 528*cda5da8dSAndroid Build Coastguard Worker 529*cda5da8dSAndroid Build Coastguard Worker On success, return chunk that ends with newline. If only partial 530*cda5da8dSAndroid Build Coastguard Worker line can be read due to EOF, return incomplete line without 531*cda5da8dSAndroid Build Coastguard Worker terminating newline. When EOF was reached while no bytes read, empty 532*cda5da8dSAndroid Build Coastguard Worker bytes object is returned. 533*cda5da8dSAndroid Build Coastguard Worker 534*cda5da8dSAndroid Build Coastguard Worker If limit is reached, ValueError will be raised. In that case, if 535*cda5da8dSAndroid Build Coastguard Worker newline was found, complete line including newline will be removed 536*cda5da8dSAndroid Build Coastguard Worker from internal buffer. Else, internal buffer will be cleared. Limit is 537*cda5da8dSAndroid Build Coastguard Worker compared against part of the line without newline. 538*cda5da8dSAndroid Build Coastguard Worker 539*cda5da8dSAndroid Build Coastguard Worker If stream was paused, this function will automatically resume it if 540*cda5da8dSAndroid Build Coastguard Worker needed. 541*cda5da8dSAndroid Build Coastguard Worker """ 542*cda5da8dSAndroid Build Coastguard Worker sep = b'\n' 543*cda5da8dSAndroid Build Coastguard Worker seplen = len(sep) 544*cda5da8dSAndroid Build Coastguard Worker try: 545*cda5da8dSAndroid Build Coastguard Worker line = await self.readuntil(sep) 546*cda5da8dSAndroid Build Coastguard Worker except exceptions.IncompleteReadError as e: 547*cda5da8dSAndroid Build Coastguard Worker return e.partial 548*cda5da8dSAndroid Build Coastguard Worker except exceptions.LimitOverrunError as e: 549*cda5da8dSAndroid Build Coastguard Worker if self._buffer.startswith(sep, e.consumed): 550*cda5da8dSAndroid Build Coastguard Worker del self._buffer[:e.consumed + seplen] 551*cda5da8dSAndroid Build Coastguard Worker else: 552*cda5da8dSAndroid Build Coastguard Worker self._buffer.clear() 553*cda5da8dSAndroid Build Coastguard Worker self._maybe_resume_transport() 554*cda5da8dSAndroid Build Coastguard Worker raise ValueError(e.args[0]) 555*cda5da8dSAndroid Build Coastguard Worker return line 556*cda5da8dSAndroid Build Coastguard Worker 557*cda5da8dSAndroid Build Coastguard Worker async def readuntil(self, separator=b'\n'): 558*cda5da8dSAndroid Build Coastguard Worker """Read data from the stream until ``separator`` is found. 559*cda5da8dSAndroid Build Coastguard Worker 560*cda5da8dSAndroid Build Coastguard Worker On success, the data and separator will be removed from the 561*cda5da8dSAndroid Build Coastguard Worker internal buffer (consumed). Returned data will include the 562*cda5da8dSAndroid Build Coastguard Worker separator at the end. 563*cda5da8dSAndroid Build Coastguard Worker 564*cda5da8dSAndroid Build Coastguard Worker Configured stream limit is used to check result. Limit sets the 565*cda5da8dSAndroid Build Coastguard Worker maximal length of data that can be returned, not counting the 566*cda5da8dSAndroid Build Coastguard Worker separator. 567*cda5da8dSAndroid Build Coastguard Worker 568*cda5da8dSAndroid Build Coastguard Worker If an EOF occurs and the complete separator is still not found, 569*cda5da8dSAndroid Build Coastguard Worker an IncompleteReadError exception will be raised, and the internal 570*cda5da8dSAndroid Build Coastguard Worker buffer will be reset. The IncompleteReadError.partial attribute 571*cda5da8dSAndroid Build Coastguard Worker may contain the separator partially. 572*cda5da8dSAndroid Build Coastguard Worker 573*cda5da8dSAndroid Build Coastguard Worker If the data cannot be read because of over limit, a 574*cda5da8dSAndroid Build Coastguard Worker LimitOverrunError exception will be raised, and the data 575*cda5da8dSAndroid Build Coastguard Worker will be left in the internal buffer, so it can be read again. 576*cda5da8dSAndroid Build Coastguard Worker """ 577*cda5da8dSAndroid Build Coastguard Worker seplen = len(separator) 578*cda5da8dSAndroid Build Coastguard Worker if seplen == 0: 579*cda5da8dSAndroid Build Coastguard Worker raise ValueError('Separator should be at least one-byte string') 580*cda5da8dSAndroid Build Coastguard Worker 581*cda5da8dSAndroid Build Coastguard Worker if self._exception is not None: 582*cda5da8dSAndroid Build Coastguard Worker raise self._exception 583*cda5da8dSAndroid Build Coastguard Worker 584*cda5da8dSAndroid Build Coastguard Worker # Consume whole buffer except last bytes, which length is 585*cda5da8dSAndroid Build Coastguard Worker # one less than seplen. Let's check corner cases with 586*cda5da8dSAndroid Build Coastguard Worker # separator='SEPARATOR': 587*cda5da8dSAndroid Build Coastguard Worker # * we have received almost complete separator (without last 588*cda5da8dSAndroid Build Coastguard Worker # byte). i.e buffer='some textSEPARATO'. In this case we 589*cda5da8dSAndroid Build Coastguard Worker # can safely consume len(separator) - 1 bytes. 590*cda5da8dSAndroid Build Coastguard Worker # * last byte of buffer is first byte of separator, i.e. 591*cda5da8dSAndroid Build Coastguard Worker # buffer='abcdefghijklmnopqrS'. We may safely consume 592*cda5da8dSAndroid Build Coastguard Worker # everything except that last byte, but this require to 593*cda5da8dSAndroid Build Coastguard Worker # analyze bytes of buffer that match partial separator. 594*cda5da8dSAndroid Build Coastguard Worker # This is slow and/or require FSM. For this case our 595*cda5da8dSAndroid Build Coastguard Worker # implementation is not optimal, since require rescanning 596*cda5da8dSAndroid Build Coastguard Worker # of data that is known to not belong to separator. In 597*cda5da8dSAndroid Build Coastguard Worker # real world, separator will not be so long to notice 598*cda5da8dSAndroid Build Coastguard Worker # performance problems. Even when reading MIME-encoded 599*cda5da8dSAndroid Build Coastguard Worker # messages :) 600*cda5da8dSAndroid Build Coastguard Worker 601*cda5da8dSAndroid Build Coastguard Worker # `offset` is the number of bytes from the beginning of the buffer 602*cda5da8dSAndroid Build Coastguard Worker # where there is no occurrence of `separator`. 603*cda5da8dSAndroid Build Coastguard Worker offset = 0 604*cda5da8dSAndroid Build Coastguard Worker 605*cda5da8dSAndroid Build Coastguard Worker # Loop until we find `separator` in the buffer, exceed the buffer size, 606*cda5da8dSAndroid Build Coastguard Worker # or an EOF has happened. 607*cda5da8dSAndroid Build Coastguard Worker while True: 608*cda5da8dSAndroid Build Coastguard Worker buflen = len(self._buffer) 609*cda5da8dSAndroid Build Coastguard Worker 610*cda5da8dSAndroid Build Coastguard Worker # Check if we now have enough data in the buffer for `separator` to 611*cda5da8dSAndroid Build Coastguard Worker # fit. 612*cda5da8dSAndroid Build Coastguard Worker if buflen - offset >= seplen: 613*cda5da8dSAndroid Build Coastguard Worker isep = self._buffer.find(separator, offset) 614*cda5da8dSAndroid Build Coastguard Worker 615*cda5da8dSAndroid Build Coastguard Worker if isep != -1: 616*cda5da8dSAndroid Build Coastguard Worker # `separator` is in the buffer. `isep` will be used later 617*cda5da8dSAndroid Build Coastguard Worker # to retrieve the data. 618*cda5da8dSAndroid Build Coastguard Worker break 619*cda5da8dSAndroid Build Coastguard Worker 620*cda5da8dSAndroid Build Coastguard Worker # see upper comment for explanation. 621*cda5da8dSAndroid Build Coastguard Worker offset = buflen + 1 - seplen 622*cda5da8dSAndroid Build Coastguard Worker if offset > self._limit: 623*cda5da8dSAndroid Build Coastguard Worker raise exceptions.LimitOverrunError( 624*cda5da8dSAndroid Build Coastguard Worker 'Separator is not found, and chunk exceed the limit', 625*cda5da8dSAndroid Build Coastguard Worker offset) 626*cda5da8dSAndroid Build Coastguard Worker 627*cda5da8dSAndroid Build Coastguard Worker # Complete message (with full separator) may be present in buffer 628*cda5da8dSAndroid Build Coastguard Worker # even when EOF flag is set. This may happen when the last chunk 629*cda5da8dSAndroid Build Coastguard Worker # adds data which makes separator be found. That's why we check for 630*cda5da8dSAndroid Build Coastguard Worker # EOF *ater* inspecting the buffer. 631*cda5da8dSAndroid Build Coastguard Worker if self._eof: 632*cda5da8dSAndroid Build Coastguard Worker chunk = bytes(self._buffer) 633*cda5da8dSAndroid Build Coastguard Worker self._buffer.clear() 634*cda5da8dSAndroid Build Coastguard Worker raise exceptions.IncompleteReadError(chunk, None) 635*cda5da8dSAndroid Build Coastguard Worker 636*cda5da8dSAndroid Build Coastguard Worker # _wait_for_data() will resume reading if stream was paused. 637*cda5da8dSAndroid Build Coastguard Worker await self._wait_for_data('readuntil') 638*cda5da8dSAndroid Build Coastguard Worker 639*cda5da8dSAndroid Build Coastguard Worker if isep > self._limit: 640*cda5da8dSAndroid Build Coastguard Worker raise exceptions.LimitOverrunError( 641*cda5da8dSAndroid Build Coastguard Worker 'Separator is found, but chunk is longer than limit', isep) 642*cda5da8dSAndroid Build Coastguard Worker 643*cda5da8dSAndroid Build Coastguard Worker chunk = self._buffer[:isep + seplen] 644*cda5da8dSAndroid Build Coastguard Worker del self._buffer[:isep + seplen] 645*cda5da8dSAndroid Build Coastguard Worker self._maybe_resume_transport() 646*cda5da8dSAndroid Build Coastguard Worker return bytes(chunk) 647*cda5da8dSAndroid Build Coastguard Worker 648*cda5da8dSAndroid Build Coastguard Worker async def read(self, n=-1): 649*cda5da8dSAndroid Build Coastguard Worker """Read up to `n` bytes from the stream. 650*cda5da8dSAndroid Build Coastguard Worker 651*cda5da8dSAndroid Build Coastguard Worker If `n` is not provided or set to -1, 652*cda5da8dSAndroid Build Coastguard Worker read until EOF, then return all read bytes. 653*cda5da8dSAndroid Build Coastguard Worker If EOF was received and the internal buffer is empty, 654*cda5da8dSAndroid Build Coastguard Worker return an empty bytes object. 655*cda5da8dSAndroid Build Coastguard Worker 656*cda5da8dSAndroid Build Coastguard Worker If `n` is 0, return an empty bytes object immediately. 657*cda5da8dSAndroid Build Coastguard Worker 658*cda5da8dSAndroid Build Coastguard Worker If `n` is positive, return at most `n` available bytes 659*cda5da8dSAndroid Build Coastguard Worker as soon as at least 1 byte is available in the internal buffer. 660*cda5da8dSAndroid Build Coastguard Worker If EOF is received before any byte is read, return an empty 661*cda5da8dSAndroid Build Coastguard Worker bytes object. 662*cda5da8dSAndroid Build Coastguard Worker 663*cda5da8dSAndroid Build Coastguard Worker Returned value is not limited with limit, configured at stream 664*cda5da8dSAndroid Build Coastguard Worker creation. 665*cda5da8dSAndroid Build Coastguard Worker 666*cda5da8dSAndroid Build Coastguard Worker If stream was paused, this function will automatically resume it if 667*cda5da8dSAndroid Build Coastguard Worker needed. 668*cda5da8dSAndroid Build Coastguard Worker """ 669*cda5da8dSAndroid Build Coastguard Worker 670*cda5da8dSAndroid Build Coastguard Worker if self._exception is not None: 671*cda5da8dSAndroid Build Coastguard Worker raise self._exception 672*cda5da8dSAndroid Build Coastguard Worker 673*cda5da8dSAndroid Build Coastguard Worker if n == 0: 674*cda5da8dSAndroid Build Coastguard Worker return b'' 675*cda5da8dSAndroid Build Coastguard Worker 676*cda5da8dSAndroid Build Coastguard Worker if n < 0: 677*cda5da8dSAndroid Build Coastguard Worker # This used to just loop creating a new waiter hoping to 678*cda5da8dSAndroid Build Coastguard Worker # collect everything in self._buffer, but that would 679*cda5da8dSAndroid Build Coastguard Worker # deadlock if the subprocess sends more than self.limit 680*cda5da8dSAndroid Build Coastguard Worker # bytes. So just call self.read(self._limit) until EOF. 681*cda5da8dSAndroid Build Coastguard Worker blocks = [] 682*cda5da8dSAndroid Build Coastguard Worker while True: 683*cda5da8dSAndroid Build Coastguard Worker block = await self.read(self._limit) 684*cda5da8dSAndroid Build Coastguard Worker if not block: 685*cda5da8dSAndroid Build Coastguard Worker break 686*cda5da8dSAndroid Build Coastguard Worker blocks.append(block) 687*cda5da8dSAndroid Build Coastguard Worker return b''.join(blocks) 688*cda5da8dSAndroid Build Coastguard Worker 689*cda5da8dSAndroid Build Coastguard Worker if not self._buffer and not self._eof: 690*cda5da8dSAndroid Build Coastguard Worker await self._wait_for_data('read') 691*cda5da8dSAndroid Build Coastguard Worker 692*cda5da8dSAndroid Build Coastguard Worker # This will work right even if buffer is less than n bytes 693*cda5da8dSAndroid Build Coastguard Worker data = bytes(self._buffer[:n]) 694*cda5da8dSAndroid Build Coastguard Worker del self._buffer[:n] 695*cda5da8dSAndroid Build Coastguard Worker 696*cda5da8dSAndroid Build Coastguard Worker self._maybe_resume_transport() 697*cda5da8dSAndroid Build Coastguard Worker return data 698*cda5da8dSAndroid Build Coastguard Worker 699*cda5da8dSAndroid Build Coastguard Worker async def readexactly(self, n): 700*cda5da8dSAndroid Build Coastguard Worker """Read exactly `n` bytes. 701*cda5da8dSAndroid Build Coastguard Worker 702*cda5da8dSAndroid Build Coastguard Worker Raise an IncompleteReadError if EOF is reached before `n` bytes can be 703*cda5da8dSAndroid Build Coastguard Worker read. The IncompleteReadError.partial attribute of the exception will 704*cda5da8dSAndroid Build Coastguard Worker contain the partial read bytes. 705*cda5da8dSAndroid Build Coastguard Worker 706*cda5da8dSAndroid Build Coastguard Worker if n is zero, return empty bytes object. 707*cda5da8dSAndroid Build Coastguard Worker 708*cda5da8dSAndroid Build Coastguard Worker Returned value is not limited with limit, configured at stream 709*cda5da8dSAndroid Build Coastguard Worker creation. 710*cda5da8dSAndroid Build Coastguard Worker 711*cda5da8dSAndroid Build Coastguard Worker If stream was paused, this function will automatically resume it if 712*cda5da8dSAndroid Build Coastguard Worker needed. 713*cda5da8dSAndroid Build Coastguard Worker """ 714*cda5da8dSAndroid Build Coastguard Worker if n < 0: 715*cda5da8dSAndroid Build Coastguard Worker raise ValueError('readexactly size can not be less than zero') 716*cda5da8dSAndroid Build Coastguard Worker 717*cda5da8dSAndroid Build Coastguard Worker if self._exception is not None: 718*cda5da8dSAndroid Build Coastguard Worker raise self._exception 719*cda5da8dSAndroid Build Coastguard Worker 720*cda5da8dSAndroid Build Coastguard Worker if n == 0: 721*cda5da8dSAndroid Build Coastguard Worker return b'' 722*cda5da8dSAndroid Build Coastguard Worker 723*cda5da8dSAndroid Build Coastguard Worker while len(self._buffer) < n: 724*cda5da8dSAndroid Build Coastguard Worker if self._eof: 725*cda5da8dSAndroid Build Coastguard Worker incomplete = bytes(self._buffer) 726*cda5da8dSAndroid Build Coastguard Worker self._buffer.clear() 727*cda5da8dSAndroid Build Coastguard Worker raise exceptions.IncompleteReadError(incomplete, n) 728*cda5da8dSAndroid Build Coastguard Worker 729*cda5da8dSAndroid Build Coastguard Worker await self._wait_for_data('readexactly') 730*cda5da8dSAndroid Build Coastguard Worker 731*cda5da8dSAndroid Build Coastguard Worker if len(self._buffer) == n: 732*cda5da8dSAndroid Build Coastguard Worker data = bytes(self._buffer) 733*cda5da8dSAndroid Build Coastguard Worker self._buffer.clear() 734*cda5da8dSAndroid Build Coastguard Worker else: 735*cda5da8dSAndroid Build Coastguard Worker data = bytes(self._buffer[:n]) 736*cda5da8dSAndroid Build Coastguard Worker del self._buffer[:n] 737*cda5da8dSAndroid Build Coastguard Worker self._maybe_resume_transport() 738*cda5da8dSAndroid Build Coastguard Worker return data 739*cda5da8dSAndroid Build Coastguard Worker 740*cda5da8dSAndroid Build Coastguard Worker def __aiter__(self): 741*cda5da8dSAndroid Build Coastguard Worker return self 742*cda5da8dSAndroid Build Coastguard Worker 743*cda5da8dSAndroid Build Coastguard Worker async def __anext__(self): 744*cda5da8dSAndroid Build Coastguard Worker val = await self.readline() 745*cda5da8dSAndroid Build Coastguard Worker if val == b'': 746*cda5da8dSAndroid Build Coastguard Worker raise StopAsyncIteration 747*cda5da8dSAndroid Build Coastguard Worker return val 748