xref: /aosp_15_r20/prebuilts/build-tools/common/py3-stdlib/asyncio/streams.py (revision cda5da8d549138a6648c5ee6d7a49cf8f4a657be)
1*cda5da8dSAndroid Build Coastguard Worker__all__ = (
2*cda5da8dSAndroid Build Coastguard Worker    'StreamReader', 'StreamWriter', 'StreamReaderProtocol',
3*cda5da8dSAndroid Build Coastguard Worker    'open_connection', 'start_server')
4*cda5da8dSAndroid Build Coastguard Worker
5*cda5da8dSAndroid Build Coastguard Workerimport collections
6*cda5da8dSAndroid Build Coastguard Workerimport socket
7*cda5da8dSAndroid Build Coastguard Workerimport sys
8*cda5da8dSAndroid Build Coastguard Workerimport warnings
9*cda5da8dSAndroid Build Coastguard Workerimport weakref
10*cda5da8dSAndroid Build Coastguard Worker
11*cda5da8dSAndroid Build Coastguard Workerif hasattr(socket, 'AF_UNIX'):
12*cda5da8dSAndroid Build Coastguard Worker    __all__ += ('open_unix_connection', 'start_unix_server')
13*cda5da8dSAndroid Build Coastguard Worker
14*cda5da8dSAndroid Build Coastguard Workerfrom . import coroutines
15*cda5da8dSAndroid Build Coastguard Workerfrom . import events
16*cda5da8dSAndroid Build Coastguard Workerfrom . import exceptions
17*cda5da8dSAndroid Build Coastguard Workerfrom . import format_helpers
18*cda5da8dSAndroid Build Coastguard Workerfrom . import protocols
19*cda5da8dSAndroid Build Coastguard Workerfrom .log import logger
20*cda5da8dSAndroid Build Coastguard Workerfrom .tasks import sleep
21*cda5da8dSAndroid Build Coastguard Worker
22*cda5da8dSAndroid Build Coastguard Worker
23*cda5da8dSAndroid Build Coastguard Worker_DEFAULT_LIMIT = 2 ** 16  # 64 KiB
24*cda5da8dSAndroid Build Coastguard Worker
25*cda5da8dSAndroid Build Coastguard Worker
26*cda5da8dSAndroid Build Coastguard Workerasync def open_connection(host=None, port=None, *,
27*cda5da8dSAndroid Build Coastguard Worker                          limit=_DEFAULT_LIMIT, **kwds):
28*cda5da8dSAndroid Build Coastguard Worker    """A wrapper for create_connection() returning a (reader, writer) pair.
29*cda5da8dSAndroid Build Coastguard Worker
30*cda5da8dSAndroid Build Coastguard Worker    The reader returned is a StreamReader instance; the writer is a
31*cda5da8dSAndroid Build Coastguard Worker    StreamWriter instance.
32*cda5da8dSAndroid Build Coastguard Worker
33*cda5da8dSAndroid Build Coastguard Worker    The arguments are all the usual arguments to create_connection()
34*cda5da8dSAndroid Build Coastguard Worker    except protocol_factory; most common are positional host and port,
35*cda5da8dSAndroid Build Coastguard Worker    with various optional keyword arguments following.
36*cda5da8dSAndroid Build Coastguard Worker
37*cda5da8dSAndroid Build Coastguard Worker    Additional optional keyword arguments are loop (to set the event loop
38*cda5da8dSAndroid Build Coastguard Worker    instance to use) and limit (to set the buffer limit passed to the
39*cda5da8dSAndroid Build Coastguard Worker    StreamReader).
40*cda5da8dSAndroid Build Coastguard Worker
41*cda5da8dSAndroid Build Coastguard Worker    (If you want to customize the StreamReader and/or
42*cda5da8dSAndroid Build Coastguard Worker    StreamReaderProtocol classes, just copy the code -- there's
43*cda5da8dSAndroid Build Coastguard Worker    really nothing special here except some convenience.)
44*cda5da8dSAndroid Build Coastguard Worker    """
45*cda5da8dSAndroid Build Coastguard Worker    loop = events.get_running_loop()
46*cda5da8dSAndroid Build Coastguard Worker    reader = StreamReader(limit=limit, loop=loop)
47*cda5da8dSAndroid Build Coastguard Worker    protocol = StreamReaderProtocol(reader, loop=loop)
48*cda5da8dSAndroid Build Coastguard Worker    transport, _ = await loop.create_connection(
49*cda5da8dSAndroid Build Coastguard Worker        lambda: protocol, host, port, **kwds)
50*cda5da8dSAndroid Build Coastguard Worker    writer = StreamWriter(transport, protocol, reader, loop)
51*cda5da8dSAndroid Build Coastguard Worker    return reader, writer
52*cda5da8dSAndroid Build Coastguard Worker
53*cda5da8dSAndroid Build Coastguard Worker
54*cda5da8dSAndroid Build Coastguard Workerasync def start_server(client_connected_cb, host=None, port=None, *,
55*cda5da8dSAndroid Build Coastguard Worker                       limit=_DEFAULT_LIMIT, **kwds):
56*cda5da8dSAndroid Build Coastguard Worker    """Start a socket server, call back for each client connected.
57*cda5da8dSAndroid Build Coastguard Worker
58*cda5da8dSAndroid Build Coastguard Worker    The first parameter, `client_connected_cb`, takes two parameters:
59*cda5da8dSAndroid Build Coastguard Worker    client_reader, client_writer.  client_reader is a StreamReader
60*cda5da8dSAndroid Build Coastguard Worker    object, while client_writer is a StreamWriter object.  This
61*cda5da8dSAndroid Build Coastguard Worker    parameter can either be a plain callback function or a coroutine;
62*cda5da8dSAndroid Build Coastguard Worker    if it is a coroutine, it will be automatically converted into a
63*cda5da8dSAndroid Build Coastguard Worker    Task.
64*cda5da8dSAndroid Build Coastguard Worker
65*cda5da8dSAndroid Build Coastguard Worker    The rest of the arguments are all the usual arguments to
66*cda5da8dSAndroid Build Coastguard Worker    loop.create_server() except protocol_factory; most common are
67*cda5da8dSAndroid Build Coastguard Worker    positional host and port, with various optional keyword arguments
68*cda5da8dSAndroid Build Coastguard Worker    following.  The return value is the same as loop.create_server().
69*cda5da8dSAndroid Build Coastguard Worker
70*cda5da8dSAndroid Build Coastguard Worker    Additional optional keyword arguments are loop (to set the event loop
71*cda5da8dSAndroid Build Coastguard Worker    instance to use) and limit (to set the buffer limit passed to the
72*cda5da8dSAndroid Build Coastguard Worker    StreamReader).
73*cda5da8dSAndroid Build Coastguard Worker
74*cda5da8dSAndroid Build Coastguard Worker    The return value is the same as loop.create_server(), i.e. a
75*cda5da8dSAndroid Build Coastguard Worker    Server object which can be used to stop the service.
76*cda5da8dSAndroid Build Coastguard Worker    """
77*cda5da8dSAndroid Build Coastguard Worker    loop = events.get_running_loop()
78*cda5da8dSAndroid Build Coastguard Worker
79*cda5da8dSAndroid Build Coastguard Worker    def factory():
80*cda5da8dSAndroid Build Coastguard Worker        reader = StreamReader(limit=limit, loop=loop)
81*cda5da8dSAndroid Build Coastguard Worker        protocol = StreamReaderProtocol(reader, client_connected_cb,
82*cda5da8dSAndroid Build Coastguard Worker                                        loop=loop)
83*cda5da8dSAndroid Build Coastguard Worker        return protocol
84*cda5da8dSAndroid Build Coastguard Worker
85*cda5da8dSAndroid Build Coastguard Worker    return await loop.create_server(factory, host, port, **kwds)
86*cda5da8dSAndroid Build Coastguard Worker
87*cda5da8dSAndroid Build Coastguard Worker
88*cda5da8dSAndroid Build Coastguard Workerif hasattr(socket, 'AF_UNIX'):
89*cda5da8dSAndroid Build Coastguard Worker    # UNIX Domain Sockets are supported on this platform
90*cda5da8dSAndroid Build Coastguard Worker
91*cda5da8dSAndroid Build Coastguard Worker    async def open_unix_connection(path=None, *,
92*cda5da8dSAndroid Build Coastguard Worker                                   limit=_DEFAULT_LIMIT, **kwds):
93*cda5da8dSAndroid Build Coastguard Worker        """Similar to `open_connection` but works with UNIX Domain Sockets."""
94*cda5da8dSAndroid Build Coastguard Worker        loop = events.get_running_loop()
95*cda5da8dSAndroid Build Coastguard Worker
96*cda5da8dSAndroid Build Coastguard Worker        reader = StreamReader(limit=limit, loop=loop)
97*cda5da8dSAndroid Build Coastguard Worker        protocol = StreamReaderProtocol(reader, loop=loop)
98*cda5da8dSAndroid Build Coastguard Worker        transport, _ = await loop.create_unix_connection(
99*cda5da8dSAndroid Build Coastguard Worker            lambda: protocol, path, **kwds)
100*cda5da8dSAndroid Build Coastguard Worker        writer = StreamWriter(transport, protocol, reader, loop)
101*cda5da8dSAndroid Build Coastguard Worker        return reader, writer
102*cda5da8dSAndroid Build Coastguard Worker
103*cda5da8dSAndroid Build Coastguard Worker    async def start_unix_server(client_connected_cb, path=None, *,
104*cda5da8dSAndroid Build Coastguard Worker                                limit=_DEFAULT_LIMIT, **kwds):
105*cda5da8dSAndroid Build Coastguard Worker        """Similar to `start_server` but works with UNIX Domain Sockets."""
106*cda5da8dSAndroid Build Coastguard Worker        loop = events.get_running_loop()
107*cda5da8dSAndroid Build Coastguard Worker
108*cda5da8dSAndroid Build Coastguard Worker        def factory():
109*cda5da8dSAndroid Build Coastguard Worker            reader = StreamReader(limit=limit, loop=loop)
110*cda5da8dSAndroid Build Coastguard Worker            protocol = StreamReaderProtocol(reader, client_connected_cb,
111*cda5da8dSAndroid Build Coastguard Worker                                            loop=loop)
112*cda5da8dSAndroid Build Coastguard Worker            return protocol
113*cda5da8dSAndroid Build Coastguard Worker
114*cda5da8dSAndroid Build Coastguard Worker        return await loop.create_unix_server(factory, path, **kwds)
115*cda5da8dSAndroid Build Coastguard Worker
116*cda5da8dSAndroid Build Coastguard Worker
117*cda5da8dSAndroid Build Coastguard Workerclass FlowControlMixin(protocols.Protocol):
118*cda5da8dSAndroid Build Coastguard Worker    """Reusable flow control logic for StreamWriter.drain().
119*cda5da8dSAndroid Build Coastguard Worker
120*cda5da8dSAndroid Build Coastguard Worker    This implements the protocol methods pause_writing(),
121*cda5da8dSAndroid Build Coastguard Worker    resume_writing() and connection_lost().  If the subclass overrides
122*cda5da8dSAndroid Build Coastguard Worker    these it must call the super methods.
123*cda5da8dSAndroid Build Coastguard Worker
124*cda5da8dSAndroid Build Coastguard Worker    StreamWriter.drain() must wait for _drain_helper() coroutine.
125*cda5da8dSAndroid Build Coastguard Worker    """
126*cda5da8dSAndroid Build Coastguard Worker
127*cda5da8dSAndroid Build Coastguard Worker    def __init__(self, loop=None):
128*cda5da8dSAndroid Build Coastguard Worker        if loop is None:
129*cda5da8dSAndroid Build Coastguard Worker            self._loop = events._get_event_loop(stacklevel=4)
130*cda5da8dSAndroid Build Coastguard Worker        else:
131*cda5da8dSAndroid Build Coastguard Worker            self._loop = loop
132*cda5da8dSAndroid Build Coastguard Worker        self._paused = False
133*cda5da8dSAndroid Build Coastguard Worker        self._drain_waiters = collections.deque()
134*cda5da8dSAndroid Build Coastguard Worker        self._connection_lost = False
135*cda5da8dSAndroid Build Coastguard Worker
136*cda5da8dSAndroid Build Coastguard Worker    def pause_writing(self):
137*cda5da8dSAndroid Build Coastguard Worker        assert not self._paused
138*cda5da8dSAndroid Build Coastguard Worker        self._paused = True
139*cda5da8dSAndroid Build Coastguard Worker        if self._loop.get_debug():
140*cda5da8dSAndroid Build Coastguard Worker            logger.debug("%r pauses writing", self)
141*cda5da8dSAndroid Build Coastguard Worker
142*cda5da8dSAndroid Build Coastguard Worker    def resume_writing(self):
143*cda5da8dSAndroid Build Coastguard Worker        assert self._paused
144*cda5da8dSAndroid Build Coastguard Worker        self._paused = False
145*cda5da8dSAndroid Build Coastguard Worker        if self._loop.get_debug():
146*cda5da8dSAndroid Build Coastguard Worker            logger.debug("%r resumes writing", self)
147*cda5da8dSAndroid Build Coastguard Worker
148*cda5da8dSAndroid Build Coastguard Worker        for waiter in self._drain_waiters:
149*cda5da8dSAndroid Build Coastguard Worker            if not waiter.done():
150*cda5da8dSAndroid Build Coastguard Worker                waiter.set_result(None)
151*cda5da8dSAndroid Build Coastguard Worker
152*cda5da8dSAndroid Build Coastguard Worker    def connection_lost(self, exc):
153*cda5da8dSAndroid Build Coastguard Worker        self._connection_lost = True
154*cda5da8dSAndroid Build Coastguard Worker        # Wake up the writer(s) if currently paused.
155*cda5da8dSAndroid Build Coastguard Worker        if not self._paused:
156*cda5da8dSAndroid Build Coastguard Worker            return
157*cda5da8dSAndroid Build Coastguard Worker
158*cda5da8dSAndroid Build Coastguard Worker        for waiter in self._drain_waiters:
159*cda5da8dSAndroid Build Coastguard Worker            if not waiter.done():
160*cda5da8dSAndroid Build Coastguard Worker                if exc is None:
161*cda5da8dSAndroid Build Coastguard Worker                    waiter.set_result(None)
162*cda5da8dSAndroid Build Coastguard Worker                else:
163*cda5da8dSAndroid Build Coastguard Worker                    waiter.set_exception(exc)
164*cda5da8dSAndroid Build Coastguard Worker
165*cda5da8dSAndroid Build Coastguard Worker    async def _drain_helper(self):
166*cda5da8dSAndroid Build Coastguard Worker        if self._connection_lost:
167*cda5da8dSAndroid Build Coastguard Worker            raise ConnectionResetError('Connection lost')
168*cda5da8dSAndroid Build Coastguard Worker        if not self._paused:
169*cda5da8dSAndroid Build Coastguard Worker            return
170*cda5da8dSAndroid Build Coastguard Worker        waiter = self._loop.create_future()
171*cda5da8dSAndroid Build Coastguard Worker        self._drain_waiters.append(waiter)
172*cda5da8dSAndroid Build Coastguard Worker        try:
173*cda5da8dSAndroid Build Coastguard Worker            await waiter
174*cda5da8dSAndroid Build Coastguard Worker        finally:
175*cda5da8dSAndroid Build Coastguard Worker            self._drain_waiters.remove(waiter)
176*cda5da8dSAndroid Build Coastguard Worker
177*cda5da8dSAndroid Build Coastguard Worker    def _get_close_waiter(self, stream):
178*cda5da8dSAndroid Build Coastguard Worker        raise NotImplementedError
179*cda5da8dSAndroid Build Coastguard Worker
180*cda5da8dSAndroid Build Coastguard Worker
181*cda5da8dSAndroid Build Coastguard Workerclass StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
182*cda5da8dSAndroid Build Coastguard Worker    """Helper class to adapt between Protocol and StreamReader.
183*cda5da8dSAndroid Build Coastguard Worker
184*cda5da8dSAndroid Build Coastguard Worker    (This is a helper class instead of making StreamReader itself a
185*cda5da8dSAndroid Build Coastguard Worker    Protocol subclass, because the StreamReader has other potential
186*cda5da8dSAndroid Build Coastguard Worker    uses, and to prevent the user of the StreamReader to accidentally
187*cda5da8dSAndroid Build Coastguard Worker    call inappropriate methods of the protocol.)
188*cda5da8dSAndroid Build Coastguard Worker    """
189*cda5da8dSAndroid Build Coastguard Worker
190*cda5da8dSAndroid Build Coastguard Worker    _source_traceback = None
191*cda5da8dSAndroid Build Coastguard Worker
192*cda5da8dSAndroid Build Coastguard Worker    def __init__(self, stream_reader, client_connected_cb=None, loop=None):
193*cda5da8dSAndroid Build Coastguard Worker        super().__init__(loop=loop)
194*cda5da8dSAndroid Build Coastguard Worker        if stream_reader is not None:
195*cda5da8dSAndroid Build Coastguard Worker            self._stream_reader_wr = weakref.ref(stream_reader)
196*cda5da8dSAndroid Build Coastguard Worker            self._source_traceback = stream_reader._source_traceback
197*cda5da8dSAndroid Build Coastguard Worker        else:
198*cda5da8dSAndroid Build Coastguard Worker            self._stream_reader_wr = None
199*cda5da8dSAndroid Build Coastguard Worker        if client_connected_cb is not None:
200*cda5da8dSAndroid Build Coastguard Worker            # This is a stream created by the `create_server()` function.
201*cda5da8dSAndroid Build Coastguard Worker            # Keep a strong reference to the reader until a connection
202*cda5da8dSAndroid Build Coastguard Worker            # is established.
203*cda5da8dSAndroid Build Coastguard Worker            self._strong_reader = stream_reader
204*cda5da8dSAndroid Build Coastguard Worker        self._reject_connection = False
205*cda5da8dSAndroid Build Coastguard Worker        self._stream_writer = None
206*cda5da8dSAndroid Build Coastguard Worker        self._task = None
207*cda5da8dSAndroid Build Coastguard Worker        self._transport = None
208*cda5da8dSAndroid Build Coastguard Worker        self._client_connected_cb = client_connected_cb
209*cda5da8dSAndroid Build Coastguard Worker        self._over_ssl = False
210*cda5da8dSAndroid Build Coastguard Worker        self._closed = self._loop.create_future()
211*cda5da8dSAndroid Build Coastguard Worker
212*cda5da8dSAndroid Build Coastguard Worker    @property
213*cda5da8dSAndroid Build Coastguard Worker    def _stream_reader(self):
214*cda5da8dSAndroid Build Coastguard Worker        if self._stream_reader_wr is None:
215*cda5da8dSAndroid Build Coastguard Worker            return None
216*cda5da8dSAndroid Build Coastguard Worker        return self._stream_reader_wr()
217*cda5da8dSAndroid Build Coastguard Worker
218*cda5da8dSAndroid Build Coastguard Worker    def _replace_writer(self, writer):
219*cda5da8dSAndroid Build Coastguard Worker        loop = self._loop
220*cda5da8dSAndroid Build Coastguard Worker        transport = writer.transport
221*cda5da8dSAndroid Build Coastguard Worker        self._stream_writer = writer
222*cda5da8dSAndroid Build Coastguard Worker        self._transport = transport
223*cda5da8dSAndroid Build Coastguard Worker        self._over_ssl = transport.get_extra_info('sslcontext') is not None
224*cda5da8dSAndroid Build Coastguard Worker
225*cda5da8dSAndroid Build Coastguard Worker    def connection_made(self, transport):
226*cda5da8dSAndroid Build Coastguard Worker        if self._reject_connection:
227*cda5da8dSAndroid Build Coastguard Worker            context = {
228*cda5da8dSAndroid Build Coastguard Worker                'message': ('An open stream was garbage collected prior to '
229*cda5da8dSAndroid Build Coastguard Worker                            'establishing network connection; '
230*cda5da8dSAndroid Build Coastguard Worker                            'call "stream.close()" explicitly.')
231*cda5da8dSAndroid Build Coastguard Worker            }
232*cda5da8dSAndroid Build Coastguard Worker            if self._source_traceback:
233*cda5da8dSAndroid Build Coastguard Worker                context['source_traceback'] = self._source_traceback
234*cda5da8dSAndroid Build Coastguard Worker            self._loop.call_exception_handler(context)
235*cda5da8dSAndroid Build Coastguard Worker            transport.abort()
236*cda5da8dSAndroid Build Coastguard Worker            return
237*cda5da8dSAndroid Build Coastguard Worker        self._transport = transport
238*cda5da8dSAndroid Build Coastguard Worker        reader = self._stream_reader
239*cda5da8dSAndroid Build Coastguard Worker        if reader is not None:
240*cda5da8dSAndroid Build Coastguard Worker            reader.set_transport(transport)
241*cda5da8dSAndroid Build Coastguard Worker        self._over_ssl = transport.get_extra_info('sslcontext') is not None
242*cda5da8dSAndroid Build Coastguard Worker        if self._client_connected_cb is not None:
243*cda5da8dSAndroid Build Coastguard Worker            self._stream_writer = StreamWriter(transport, self,
244*cda5da8dSAndroid Build Coastguard Worker                                               reader,
245*cda5da8dSAndroid Build Coastguard Worker                                               self._loop)
246*cda5da8dSAndroid Build Coastguard Worker            res = self._client_connected_cb(reader,
247*cda5da8dSAndroid Build Coastguard Worker                                            self._stream_writer)
248*cda5da8dSAndroid Build Coastguard Worker            if coroutines.iscoroutine(res):
249*cda5da8dSAndroid Build Coastguard Worker                self._task = self._loop.create_task(res)
250*cda5da8dSAndroid Build Coastguard Worker            self._strong_reader = None
251*cda5da8dSAndroid Build Coastguard Worker
252*cda5da8dSAndroid Build Coastguard Worker    def connection_lost(self, exc):
253*cda5da8dSAndroid Build Coastguard Worker        reader = self._stream_reader
254*cda5da8dSAndroid Build Coastguard Worker        if reader is not None:
255*cda5da8dSAndroid Build Coastguard Worker            if exc is None:
256*cda5da8dSAndroid Build Coastguard Worker                reader.feed_eof()
257*cda5da8dSAndroid Build Coastguard Worker            else:
258*cda5da8dSAndroid Build Coastguard Worker                reader.set_exception(exc)
259*cda5da8dSAndroid Build Coastguard Worker        if not self._closed.done():
260*cda5da8dSAndroid Build Coastguard Worker            if exc is None:
261*cda5da8dSAndroid Build Coastguard Worker                self._closed.set_result(None)
262*cda5da8dSAndroid Build Coastguard Worker            else:
263*cda5da8dSAndroid Build Coastguard Worker                self._closed.set_exception(exc)
264*cda5da8dSAndroid Build Coastguard Worker        super().connection_lost(exc)
265*cda5da8dSAndroid Build Coastguard Worker        self._stream_reader_wr = None
266*cda5da8dSAndroid Build Coastguard Worker        self._stream_writer = None
267*cda5da8dSAndroid Build Coastguard Worker        self._task = None
268*cda5da8dSAndroid Build Coastguard Worker        self._transport = None
269*cda5da8dSAndroid Build Coastguard Worker
270*cda5da8dSAndroid Build Coastguard Worker    def data_received(self, data):
271*cda5da8dSAndroid Build Coastguard Worker        reader = self._stream_reader
272*cda5da8dSAndroid Build Coastguard Worker        if reader is not None:
273*cda5da8dSAndroid Build Coastguard Worker            reader.feed_data(data)
274*cda5da8dSAndroid Build Coastguard Worker
275*cda5da8dSAndroid Build Coastguard Worker    def eof_received(self):
276*cda5da8dSAndroid Build Coastguard Worker        reader = self._stream_reader
277*cda5da8dSAndroid Build Coastguard Worker        if reader is not None:
278*cda5da8dSAndroid Build Coastguard Worker            reader.feed_eof()
279*cda5da8dSAndroid Build Coastguard Worker        if self._over_ssl:
280*cda5da8dSAndroid Build Coastguard Worker            # Prevent a warning in SSLProtocol.eof_received:
281*cda5da8dSAndroid Build Coastguard Worker            # "returning true from eof_received()
282*cda5da8dSAndroid Build Coastguard Worker            # has no effect when using ssl"
283*cda5da8dSAndroid Build Coastguard Worker            return False
284*cda5da8dSAndroid Build Coastguard Worker        return True
285*cda5da8dSAndroid Build Coastguard Worker
286*cda5da8dSAndroid Build Coastguard Worker    def _get_close_waiter(self, stream):
287*cda5da8dSAndroid Build Coastguard Worker        return self._closed
288*cda5da8dSAndroid Build Coastguard Worker
289*cda5da8dSAndroid Build Coastguard Worker    def __del__(self):
290*cda5da8dSAndroid Build Coastguard Worker        # Prevent reports about unhandled exceptions.
291*cda5da8dSAndroid Build Coastguard Worker        # Better than self._closed._log_traceback = False hack
292*cda5da8dSAndroid Build Coastguard Worker        try:
293*cda5da8dSAndroid Build Coastguard Worker            closed = self._closed
294*cda5da8dSAndroid Build Coastguard Worker        except AttributeError:
295*cda5da8dSAndroid Build Coastguard Worker            pass  # failed constructor
296*cda5da8dSAndroid Build Coastguard Worker        else:
297*cda5da8dSAndroid Build Coastguard Worker            if closed.done() and not closed.cancelled():
298*cda5da8dSAndroid Build Coastguard Worker                closed.exception()
299*cda5da8dSAndroid Build Coastguard Worker
300*cda5da8dSAndroid Build Coastguard Worker
301*cda5da8dSAndroid Build Coastguard Workerclass StreamWriter:
302*cda5da8dSAndroid Build Coastguard Worker    """Wraps a Transport.
303*cda5da8dSAndroid Build Coastguard Worker
304*cda5da8dSAndroid Build Coastguard Worker    This exposes write(), writelines(), [can_]write_eof(),
305*cda5da8dSAndroid Build Coastguard Worker    get_extra_info() and close().  It adds drain() which returns an
306*cda5da8dSAndroid Build Coastguard Worker    optional Future on which you can wait for flow control.  It also
307*cda5da8dSAndroid Build Coastguard Worker    adds a transport property which references the Transport
308*cda5da8dSAndroid Build Coastguard Worker    directly.
309*cda5da8dSAndroid Build Coastguard Worker    """
310*cda5da8dSAndroid Build Coastguard Worker
311*cda5da8dSAndroid Build Coastguard Worker    def __init__(self, transport, protocol, reader, loop):
312*cda5da8dSAndroid Build Coastguard Worker        self._transport = transport
313*cda5da8dSAndroid Build Coastguard Worker        self._protocol = protocol
314*cda5da8dSAndroid Build Coastguard Worker        # drain() expects that the reader has an exception() method
315*cda5da8dSAndroid Build Coastguard Worker        assert reader is None or isinstance(reader, StreamReader)
316*cda5da8dSAndroid Build Coastguard Worker        self._reader = reader
317*cda5da8dSAndroid Build Coastguard Worker        self._loop = loop
318*cda5da8dSAndroid Build Coastguard Worker        self._complete_fut = self._loop.create_future()
319*cda5da8dSAndroid Build Coastguard Worker        self._complete_fut.set_result(None)
320*cda5da8dSAndroid Build Coastguard Worker
321*cda5da8dSAndroid Build Coastguard Worker    def __repr__(self):
322*cda5da8dSAndroid Build Coastguard Worker        info = [self.__class__.__name__, f'transport={self._transport!r}']
323*cda5da8dSAndroid Build Coastguard Worker        if self._reader is not None:
324*cda5da8dSAndroid Build Coastguard Worker            info.append(f'reader={self._reader!r}')
325*cda5da8dSAndroid Build Coastguard Worker        return '<{}>'.format(' '.join(info))
326*cda5da8dSAndroid Build Coastguard Worker
327*cda5da8dSAndroid Build Coastguard Worker    @property
328*cda5da8dSAndroid Build Coastguard Worker    def transport(self):
329*cda5da8dSAndroid Build Coastguard Worker        return self._transport
330*cda5da8dSAndroid Build Coastguard Worker
331*cda5da8dSAndroid Build Coastguard Worker    def write(self, data):
332*cda5da8dSAndroid Build Coastguard Worker        self._transport.write(data)
333*cda5da8dSAndroid Build Coastguard Worker
334*cda5da8dSAndroid Build Coastguard Worker    def writelines(self, data):
335*cda5da8dSAndroid Build Coastguard Worker        self._transport.writelines(data)
336*cda5da8dSAndroid Build Coastguard Worker
337*cda5da8dSAndroid Build Coastguard Worker    def write_eof(self):
338*cda5da8dSAndroid Build Coastguard Worker        return self._transport.write_eof()
339*cda5da8dSAndroid Build Coastguard Worker
340*cda5da8dSAndroid Build Coastguard Worker    def can_write_eof(self):
341*cda5da8dSAndroid Build Coastguard Worker        return self._transport.can_write_eof()
342*cda5da8dSAndroid Build Coastguard Worker
343*cda5da8dSAndroid Build Coastguard Worker    def close(self):
344*cda5da8dSAndroid Build Coastguard Worker        return self._transport.close()
345*cda5da8dSAndroid Build Coastguard Worker
346*cda5da8dSAndroid Build Coastguard Worker    def is_closing(self):
347*cda5da8dSAndroid Build Coastguard Worker        return self._transport.is_closing()
348*cda5da8dSAndroid Build Coastguard Worker
349*cda5da8dSAndroid Build Coastguard Worker    async def wait_closed(self):
350*cda5da8dSAndroid Build Coastguard Worker        await self._protocol._get_close_waiter(self)
351*cda5da8dSAndroid Build Coastguard Worker
352*cda5da8dSAndroid Build Coastguard Worker    def get_extra_info(self, name, default=None):
353*cda5da8dSAndroid Build Coastguard Worker        return self._transport.get_extra_info(name, default)
354*cda5da8dSAndroid Build Coastguard Worker
355*cda5da8dSAndroid Build Coastguard Worker    async def drain(self):
356*cda5da8dSAndroid Build Coastguard Worker        """Flush the write buffer.
357*cda5da8dSAndroid Build Coastguard Worker
358*cda5da8dSAndroid Build Coastguard Worker        The intended use is to write
359*cda5da8dSAndroid Build Coastguard Worker
360*cda5da8dSAndroid Build Coastguard Worker          w.write(data)
361*cda5da8dSAndroid Build Coastguard Worker          await w.drain()
362*cda5da8dSAndroid Build Coastguard Worker        """
363*cda5da8dSAndroid Build Coastguard Worker        if self._reader is not None:
364*cda5da8dSAndroid Build Coastguard Worker            exc = self._reader.exception()
365*cda5da8dSAndroid Build Coastguard Worker            if exc is not None:
366*cda5da8dSAndroid Build Coastguard Worker                raise exc
367*cda5da8dSAndroid Build Coastguard Worker        if self._transport.is_closing():
368*cda5da8dSAndroid Build Coastguard Worker            # Wait for protocol.connection_lost() call
369*cda5da8dSAndroid Build Coastguard Worker            # Raise connection closing error if any,
370*cda5da8dSAndroid Build Coastguard Worker            # ConnectionResetError otherwise
371*cda5da8dSAndroid Build Coastguard Worker            # Yield to the event loop so connection_lost() may be
372*cda5da8dSAndroid Build Coastguard Worker            # called.  Without this, _drain_helper() would return
373*cda5da8dSAndroid Build Coastguard Worker            # immediately, and code that calls
374*cda5da8dSAndroid Build Coastguard Worker            #     write(...); await drain()
375*cda5da8dSAndroid Build Coastguard Worker            # in a loop would never call connection_lost(), so it
376*cda5da8dSAndroid Build Coastguard Worker            # would not see an error when the socket is closed.
377*cda5da8dSAndroid Build Coastguard Worker            await sleep(0)
378*cda5da8dSAndroid Build Coastguard Worker        await self._protocol._drain_helper()
379*cda5da8dSAndroid Build Coastguard Worker
380*cda5da8dSAndroid Build Coastguard Worker    async def start_tls(self, sslcontext, *,
381*cda5da8dSAndroid Build Coastguard Worker                        server_hostname=None,
382*cda5da8dSAndroid Build Coastguard Worker                        ssl_handshake_timeout=None):
383*cda5da8dSAndroid Build Coastguard Worker        """Upgrade an existing stream-based connection to TLS."""
384*cda5da8dSAndroid Build Coastguard Worker        server_side = self._protocol._client_connected_cb is not None
385*cda5da8dSAndroid Build Coastguard Worker        protocol = self._protocol
386*cda5da8dSAndroid Build Coastguard Worker        await self.drain()
387*cda5da8dSAndroid Build Coastguard Worker        new_transport = await self._loop.start_tls(  # type: ignore
388*cda5da8dSAndroid Build Coastguard Worker            self._transport, protocol, sslcontext,
389*cda5da8dSAndroid Build Coastguard Worker            server_side=server_side, server_hostname=server_hostname,
390*cda5da8dSAndroid Build Coastguard Worker            ssl_handshake_timeout=ssl_handshake_timeout)
391*cda5da8dSAndroid Build Coastguard Worker        self._transport = new_transport
392*cda5da8dSAndroid Build Coastguard Worker        protocol._replace_writer(self)
393*cda5da8dSAndroid Build Coastguard Worker
394*cda5da8dSAndroid Build Coastguard Worker
395*cda5da8dSAndroid Build Coastguard Workerclass StreamReader:
396*cda5da8dSAndroid Build Coastguard Worker
397*cda5da8dSAndroid Build Coastguard Worker    _source_traceback = None
398*cda5da8dSAndroid Build Coastguard Worker
399*cda5da8dSAndroid Build Coastguard Worker    def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
400*cda5da8dSAndroid Build Coastguard Worker        # The line length limit is  a security feature;
401*cda5da8dSAndroid Build Coastguard Worker        # it also doubles as half the buffer limit.
402*cda5da8dSAndroid Build Coastguard Worker
403*cda5da8dSAndroid Build Coastguard Worker        if limit <= 0:
404*cda5da8dSAndroid Build Coastguard Worker            raise ValueError('Limit cannot be <= 0')
405*cda5da8dSAndroid Build Coastguard Worker
406*cda5da8dSAndroid Build Coastguard Worker        self._limit = limit
407*cda5da8dSAndroid Build Coastguard Worker        if loop is None:
408*cda5da8dSAndroid Build Coastguard Worker            self._loop = events._get_event_loop()
409*cda5da8dSAndroid Build Coastguard Worker        else:
410*cda5da8dSAndroid Build Coastguard Worker            self._loop = loop
411*cda5da8dSAndroid Build Coastguard Worker        self._buffer = bytearray()
412*cda5da8dSAndroid Build Coastguard Worker        self._eof = False    # Whether we're done.
413*cda5da8dSAndroid Build Coastguard Worker        self._waiter = None  # A future used by _wait_for_data()
414*cda5da8dSAndroid Build Coastguard Worker        self._exception = None
415*cda5da8dSAndroid Build Coastguard Worker        self._transport = None
416*cda5da8dSAndroid Build Coastguard Worker        self._paused = False
417*cda5da8dSAndroid Build Coastguard Worker        if self._loop.get_debug():
418*cda5da8dSAndroid Build Coastguard Worker            self._source_traceback = format_helpers.extract_stack(
419*cda5da8dSAndroid Build Coastguard Worker                sys._getframe(1))
420*cda5da8dSAndroid Build Coastguard Worker
421*cda5da8dSAndroid Build Coastguard Worker    def __repr__(self):
422*cda5da8dSAndroid Build Coastguard Worker        info = ['StreamReader']
423*cda5da8dSAndroid Build Coastguard Worker        if self._buffer:
424*cda5da8dSAndroid Build Coastguard Worker            info.append(f'{len(self._buffer)} bytes')
425*cda5da8dSAndroid Build Coastguard Worker        if self._eof:
426*cda5da8dSAndroid Build Coastguard Worker            info.append('eof')
427*cda5da8dSAndroid Build Coastguard Worker        if self._limit != _DEFAULT_LIMIT:
428*cda5da8dSAndroid Build Coastguard Worker            info.append(f'limit={self._limit}')
429*cda5da8dSAndroid Build Coastguard Worker        if self._waiter:
430*cda5da8dSAndroid Build Coastguard Worker            info.append(f'waiter={self._waiter!r}')
431*cda5da8dSAndroid Build Coastguard Worker        if self._exception:
432*cda5da8dSAndroid Build Coastguard Worker            info.append(f'exception={self._exception!r}')
433*cda5da8dSAndroid Build Coastguard Worker        if self._transport:
434*cda5da8dSAndroid Build Coastguard Worker            info.append(f'transport={self._transport!r}')
435*cda5da8dSAndroid Build Coastguard Worker        if self._paused:
436*cda5da8dSAndroid Build Coastguard Worker            info.append('paused')
437*cda5da8dSAndroid Build Coastguard Worker        return '<{}>'.format(' '.join(info))
438*cda5da8dSAndroid Build Coastguard Worker
439*cda5da8dSAndroid Build Coastguard Worker    def exception(self):
440*cda5da8dSAndroid Build Coastguard Worker        return self._exception
441*cda5da8dSAndroid Build Coastguard Worker
442*cda5da8dSAndroid Build Coastguard Worker    def set_exception(self, exc):
443*cda5da8dSAndroid Build Coastguard Worker        self._exception = exc
444*cda5da8dSAndroid Build Coastguard Worker
445*cda5da8dSAndroid Build Coastguard Worker        waiter = self._waiter
446*cda5da8dSAndroid Build Coastguard Worker        if waiter is not None:
447*cda5da8dSAndroid Build Coastguard Worker            self._waiter = None
448*cda5da8dSAndroid Build Coastguard Worker            if not waiter.cancelled():
449*cda5da8dSAndroid Build Coastguard Worker                waiter.set_exception(exc)
450*cda5da8dSAndroid Build Coastguard Worker
451*cda5da8dSAndroid Build Coastguard Worker    def _wakeup_waiter(self):
452*cda5da8dSAndroid Build Coastguard Worker        """Wakeup read*() functions waiting for data or EOF."""
453*cda5da8dSAndroid Build Coastguard Worker        waiter = self._waiter
454*cda5da8dSAndroid Build Coastguard Worker        if waiter is not None:
455*cda5da8dSAndroid Build Coastguard Worker            self._waiter = None
456*cda5da8dSAndroid Build Coastguard Worker            if not waiter.cancelled():
457*cda5da8dSAndroid Build Coastguard Worker                waiter.set_result(None)
458*cda5da8dSAndroid Build Coastguard Worker
459*cda5da8dSAndroid Build Coastguard Worker    def set_transport(self, transport):
460*cda5da8dSAndroid Build Coastguard Worker        assert self._transport is None, 'Transport already set'
461*cda5da8dSAndroid Build Coastguard Worker        self._transport = transport
462*cda5da8dSAndroid Build Coastguard Worker
463*cda5da8dSAndroid Build Coastguard Worker    def _maybe_resume_transport(self):
464*cda5da8dSAndroid Build Coastguard Worker        if self._paused and len(self._buffer) <= self._limit:
465*cda5da8dSAndroid Build Coastguard Worker            self._paused = False
466*cda5da8dSAndroid Build Coastguard Worker            self._transport.resume_reading()
467*cda5da8dSAndroid Build Coastguard Worker
468*cda5da8dSAndroid Build Coastguard Worker    def feed_eof(self):
469*cda5da8dSAndroid Build Coastguard Worker        self._eof = True
470*cda5da8dSAndroid Build Coastguard Worker        self._wakeup_waiter()
471*cda5da8dSAndroid Build Coastguard Worker
472*cda5da8dSAndroid Build Coastguard Worker    def at_eof(self):
473*cda5da8dSAndroid Build Coastguard Worker        """Return True if the buffer is empty and 'feed_eof' was called."""
474*cda5da8dSAndroid Build Coastguard Worker        return self._eof and not self._buffer
475*cda5da8dSAndroid Build Coastguard Worker
476*cda5da8dSAndroid Build Coastguard Worker    def feed_data(self, data):
477*cda5da8dSAndroid Build Coastguard Worker        assert not self._eof, 'feed_data after feed_eof'
478*cda5da8dSAndroid Build Coastguard Worker
479*cda5da8dSAndroid Build Coastguard Worker        if not data:
480*cda5da8dSAndroid Build Coastguard Worker            return
481*cda5da8dSAndroid Build Coastguard Worker
482*cda5da8dSAndroid Build Coastguard Worker        self._buffer.extend(data)
483*cda5da8dSAndroid Build Coastguard Worker        self._wakeup_waiter()
484*cda5da8dSAndroid Build Coastguard Worker
485*cda5da8dSAndroid Build Coastguard Worker        if (self._transport is not None and
486*cda5da8dSAndroid Build Coastguard Worker                not self._paused and
487*cda5da8dSAndroid Build Coastguard Worker                len(self._buffer) > 2 * self._limit):
488*cda5da8dSAndroid Build Coastguard Worker            try:
489*cda5da8dSAndroid Build Coastguard Worker                self._transport.pause_reading()
490*cda5da8dSAndroid Build Coastguard Worker            except NotImplementedError:
491*cda5da8dSAndroid Build Coastguard Worker                # The transport can't be paused.
492*cda5da8dSAndroid Build Coastguard Worker                # We'll just have to buffer all data.
493*cda5da8dSAndroid Build Coastguard Worker                # Forget the transport so we don't keep trying.
494*cda5da8dSAndroid Build Coastguard Worker                self._transport = None
495*cda5da8dSAndroid Build Coastguard Worker            else:
496*cda5da8dSAndroid Build Coastguard Worker                self._paused = True
497*cda5da8dSAndroid Build Coastguard Worker
498*cda5da8dSAndroid Build Coastguard Worker    async def _wait_for_data(self, func_name):
499*cda5da8dSAndroid Build Coastguard Worker        """Wait until feed_data() or feed_eof() is called.
500*cda5da8dSAndroid Build Coastguard Worker
501*cda5da8dSAndroid Build Coastguard Worker        If stream was paused, automatically resume it.
502*cda5da8dSAndroid Build Coastguard Worker        """
503*cda5da8dSAndroid Build Coastguard Worker        # StreamReader uses a future to link the protocol feed_data() method
504*cda5da8dSAndroid Build Coastguard Worker        # to a read coroutine. Running two read coroutines at the same time
505*cda5da8dSAndroid Build Coastguard Worker        # would have an unexpected behaviour. It would not possible to know
506*cda5da8dSAndroid Build Coastguard Worker        # which coroutine would get the next data.
507*cda5da8dSAndroid Build Coastguard Worker        if self._waiter is not None:
508*cda5da8dSAndroid Build Coastguard Worker            raise RuntimeError(
509*cda5da8dSAndroid Build Coastguard Worker                f'{func_name}() called while another coroutine is '
510*cda5da8dSAndroid Build Coastguard Worker                f'already waiting for incoming data')
511*cda5da8dSAndroid Build Coastguard Worker
512*cda5da8dSAndroid Build Coastguard Worker        assert not self._eof, '_wait_for_data after EOF'
513*cda5da8dSAndroid Build Coastguard Worker
514*cda5da8dSAndroid Build Coastguard Worker        # Waiting for data while paused will make deadlock, so prevent it.
515*cda5da8dSAndroid Build Coastguard Worker        # This is essential for readexactly(n) for case when n > self._limit.
516*cda5da8dSAndroid Build Coastguard Worker        if self._paused:
517*cda5da8dSAndroid Build Coastguard Worker            self._paused = False
518*cda5da8dSAndroid Build Coastguard Worker            self._transport.resume_reading()
519*cda5da8dSAndroid Build Coastguard Worker
520*cda5da8dSAndroid Build Coastguard Worker        self._waiter = self._loop.create_future()
521*cda5da8dSAndroid Build Coastguard Worker        try:
522*cda5da8dSAndroid Build Coastguard Worker            await self._waiter
523*cda5da8dSAndroid Build Coastguard Worker        finally:
524*cda5da8dSAndroid Build Coastguard Worker            self._waiter = None
525*cda5da8dSAndroid Build Coastguard Worker
526*cda5da8dSAndroid Build Coastguard Worker    async def readline(self):
527*cda5da8dSAndroid Build Coastguard Worker        """Read chunk of data from the stream until newline (b'\n') is found.
528*cda5da8dSAndroid Build Coastguard Worker
529*cda5da8dSAndroid Build Coastguard Worker        On success, return chunk that ends with newline. If only partial
530*cda5da8dSAndroid Build Coastguard Worker        line can be read due to EOF, return incomplete line without
531*cda5da8dSAndroid Build Coastguard Worker        terminating newline. When EOF was reached while no bytes read, empty
532*cda5da8dSAndroid Build Coastguard Worker        bytes object is returned.
533*cda5da8dSAndroid Build Coastguard Worker
534*cda5da8dSAndroid Build Coastguard Worker        If limit is reached, ValueError will be raised. In that case, if
535*cda5da8dSAndroid Build Coastguard Worker        newline was found, complete line including newline will be removed
536*cda5da8dSAndroid Build Coastguard Worker        from internal buffer. Else, internal buffer will be cleared. Limit is
537*cda5da8dSAndroid Build Coastguard Worker        compared against part of the line without newline.
538*cda5da8dSAndroid Build Coastguard Worker
539*cda5da8dSAndroid Build Coastguard Worker        If stream was paused, this function will automatically resume it if
540*cda5da8dSAndroid Build Coastguard Worker        needed.
541*cda5da8dSAndroid Build Coastguard Worker        """
542*cda5da8dSAndroid Build Coastguard Worker        sep = b'\n'
543*cda5da8dSAndroid Build Coastguard Worker        seplen = len(sep)
544*cda5da8dSAndroid Build Coastguard Worker        try:
545*cda5da8dSAndroid Build Coastguard Worker            line = await self.readuntil(sep)
546*cda5da8dSAndroid Build Coastguard Worker        except exceptions.IncompleteReadError as e:
547*cda5da8dSAndroid Build Coastguard Worker            return e.partial
548*cda5da8dSAndroid Build Coastguard Worker        except exceptions.LimitOverrunError as e:
549*cda5da8dSAndroid Build Coastguard Worker            if self._buffer.startswith(sep, e.consumed):
550*cda5da8dSAndroid Build Coastguard Worker                del self._buffer[:e.consumed + seplen]
551*cda5da8dSAndroid Build Coastguard Worker            else:
552*cda5da8dSAndroid Build Coastguard Worker                self._buffer.clear()
553*cda5da8dSAndroid Build Coastguard Worker            self._maybe_resume_transport()
554*cda5da8dSAndroid Build Coastguard Worker            raise ValueError(e.args[0])
555*cda5da8dSAndroid Build Coastguard Worker        return line
556*cda5da8dSAndroid Build Coastguard Worker
557*cda5da8dSAndroid Build Coastguard Worker    async def readuntil(self, separator=b'\n'):
558*cda5da8dSAndroid Build Coastguard Worker        """Read data from the stream until ``separator`` is found.
559*cda5da8dSAndroid Build Coastguard Worker
560*cda5da8dSAndroid Build Coastguard Worker        On success, the data and separator will be removed from the
561*cda5da8dSAndroid Build Coastguard Worker        internal buffer (consumed). Returned data will include the
562*cda5da8dSAndroid Build Coastguard Worker        separator at the end.
563*cda5da8dSAndroid Build Coastguard Worker
564*cda5da8dSAndroid Build Coastguard Worker        Configured stream limit is used to check result. Limit sets the
565*cda5da8dSAndroid Build Coastguard Worker        maximal length of data that can be returned, not counting the
566*cda5da8dSAndroid Build Coastguard Worker        separator.
567*cda5da8dSAndroid Build Coastguard Worker
568*cda5da8dSAndroid Build Coastguard Worker        If an EOF occurs and the complete separator is still not found,
569*cda5da8dSAndroid Build Coastguard Worker        an IncompleteReadError exception will be raised, and the internal
570*cda5da8dSAndroid Build Coastguard Worker        buffer will be reset.  The IncompleteReadError.partial attribute
571*cda5da8dSAndroid Build Coastguard Worker        may contain the separator partially.
572*cda5da8dSAndroid Build Coastguard Worker
573*cda5da8dSAndroid Build Coastguard Worker        If the data cannot be read because of over limit, a
574*cda5da8dSAndroid Build Coastguard Worker        LimitOverrunError exception  will be raised, and the data
575*cda5da8dSAndroid Build Coastguard Worker        will be left in the internal buffer, so it can be read again.
576*cda5da8dSAndroid Build Coastguard Worker        """
577*cda5da8dSAndroid Build Coastguard Worker        seplen = len(separator)
578*cda5da8dSAndroid Build Coastguard Worker        if seplen == 0:
579*cda5da8dSAndroid Build Coastguard Worker            raise ValueError('Separator should be at least one-byte string')
580*cda5da8dSAndroid Build Coastguard Worker
581*cda5da8dSAndroid Build Coastguard Worker        if self._exception is not None:
582*cda5da8dSAndroid Build Coastguard Worker            raise self._exception
583*cda5da8dSAndroid Build Coastguard Worker
584*cda5da8dSAndroid Build Coastguard Worker        # Consume whole buffer except last bytes, which length is
585*cda5da8dSAndroid Build Coastguard Worker        # one less than seplen. Let's check corner cases with
586*cda5da8dSAndroid Build Coastguard Worker        # separator='SEPARATOR':
587*cda5da8dSAndroid Build Coastguard Worker        # * we have received almost complete separator (without last
588*cda5da8dSAndroid Build Coastguard Worker        #   byte). i.e buffer='some textSEPARATO'. In this case we
589*cda5da8dSAndroid Build Coastguard Worker        #   can safely consume len(separator) - 1 bytes.
590*cda5da8dSAndroid Build Coastguard Worker        # * last byte of buffer is first byte of separator, i.e.
591*cda5da8dSAndroid Build Coastguard Worker        #   buffer='abcdefghijklmnopqrS'. We may safely consume
592*cda5da8dSAndroid Build Coastguard Worker        #   everything except that last byte, but this require to
593*cda5da8dSAndroid Build Coastguard Worker        #   analyze bytes of buffer that match partial separator.
594*cda5da8dSAndroid Build Coastguard Worker        #   This is slow and/or require FSM. For this case our
595*cda5da8dSAndroid Build Coastguard Worker        #   implementation is not optimal, since require rescanning
596*cda5da8dSAndroid Build Coastguard Worker        #   of data that is known to not belong to separator. In
597*cda5da8dSAndroid Build Coastguard Worker        #   real world, separator will not be so long to notice
598*cda5da8dSAndroid Build Coastguard Worker        #   performance problems. Even when reading MIME-encoded
599*cda5da8dSAndroid Build Coastguard Worker        #   messages :)
600*cda5da8dSAndroid Build Coastguard Worker
601*cda5da8dSAndroid Build Coastguard Worker        # `offset` is the number of bytes from the beginning of the buffer
602*cda5da8dSAndroid Build Coastguard Worker        # where there is no occurrence of `separator`.
603*cda5da8dSAndroid Build Coastguard Worker        offset = 0
604*cda5da8dSAndroid Build Coastguard Worker
605*cda5da8dSAndroid Build Coastguard Worker        # Loop until we find `separator` in the buffer, exceed the buffer size,
606*cda5da8dSAndroid Build Coastguard Worker        # or an EOF has happened.
607*cda5da8dSAndroid Build Coastguard Worker        while True:
608*cda5da8dSAndroid Build Coastguard Worker            buflen = len(self._buffer)
609*cda5da8dSAndroid Build Coastguard Worker
610*cda5da8dSAndroid Build Coastguard Worker            # Check if we now have enough data in the buffer for `separator` to
611*cda5da8dSAndroid Build Coastguard Worker            # fit.
612*cda5da8dSAndroid Build Coastguard Worker            if buflen - offset >= seplen:
613*cda5da8dSAndroid Build Coastguard Worker                isep = self._buffer.find(separator, offset)
614*cda5da8dSAndroid Build Coastguard Worker
615*cda5da8dSAndroid Build Coastguard Worker                if isep != -1:
616*cda5da8dSAndroid Build Coastguard Worker                    # `separator` is in the buffer. `isep` will be used later
617*cda5da8dSAndroid Build Coastguard Worker                    # to retrieve the data.
618*cda5da8dSAndroid Build Coastguard Worker                    break
619*cda5da8dSAndroid Build Coastguard Worker
620*cda5da8dSAndroid Build Coastguard Worker                # see upper comment for explanation.
621*cda5da8dSAndroid Build Coastguard Worker                offset = buflen + 1 - seplen
622*cda5da8dSAndroid Build Coastguard Worker                if offset > self._limit:
623*cda5da8dSAndroid Build Coastguard Worker                    raise exceptions.LimitOverrunError(
624*cda5da8dSAndroid Build Coastguard Worker                        'Separator is not found, and chunk exceed the limit',
625*cda5da8dSAndroid Build Coastguard Worker                        offset)
626*cda5da8dSAndroid Build Coastguard Worker
627*cda5da8dSAndroid Build Coastguard Worker            # Complete message (with full separator) may be present in buffer
628*cda5da8dSAndroid Build Coastguard Worker            # even when EOF flag is set. This may happen when the last chunk
629*cda5da8dSAndroid Build Coastguard Worker            # adds data which makes separator be found. That's why we check for
630*cda5da8dSAndroid Build Coastguard Worker            # EOF *ater* inspecting the buffer.
631*cda5da8dSAndroid Build Coastguard Worker            if self._eof:
632*cda5da8dSAndroid Build Coastguard Worker                chunk = bytes(self._buffer)
633*cda5da8dSAndroid Build Coastguard Worker                self._buffer.clear()
634*cda5da8dSAndroid Build Coastguard Worker                raise exceptions.IncompleteReadError(chunk, None)
635*cda5da8dSAndroid Build Coastguard Worker
636*cda5da8dSAndroid Build Coastguard Worker            # _wait_for_data() will resume reading if stream was paused.
637*cda5da8dSAndroid Build Coastguard Worker            await self._wait_for_data('readuntil')
638*cda5da8dSAndroid Build Coastguard Worker
639*cda5da8dSAndroid Build Coastguard Worker        if isep > self._limit:
640*cda5da8dSAndroid Build Coastguard Worker            raise exceptions.LimitOverrunError(
641*cda5da8dSAndroid Build Coastguard Worker                'Separator is found, but chunk is longer than limit', isep)
642*cda5da8dSAndroid Build Coastguard Worker
643*cda5da8dSAndroid Build Coastguard Worker        chunk = self._buffer[:isep + seplen]
644*cda5da8dSAndroid Build Coastguard Worker        del self._buffer[:isep + seplen]
645*cda5da8dSAndroid Build Coastguard Worker        self._maybe_resume_transport()
646*cda5da8dSAndroid Build Coastguard Worker        return bytes(chunk)
647*cda5da8dSAndroid Build Coastguard Worker
648*cda5da8dSAndroid Build Coastguard Worker    async def read(self, n=-1):
649*cda5da8dSAndroid Build Coastguard Worker        """Read up to `n` bytes from the stream.
650*cda5da8dSAndroid Build Coastguard Worker
651*cda5da8dSAndroid Build Coastguard Worker        If `n` is not provided or set to -1,
652*cda5da8dSAndroid Build Coastguard Worker        read until EOF, then return all read bytes.
653*cda5da8dSAndroid Build Coastguard Worker        If EOF was received and the internal buffer is empty,
654*cda5da8dSAndroid Build Coastguard Worker        return an empty bytes object.
655*cda5da8dSAndroid Build Coastguard Worker
656*cda5da8dSAndroid Build Coastguard Worker        If `n` is 0, return an empty bytes object immediately.
657*cda5da8dSAndroid Build Coastguard Worker
658*cda5da8dSAndroid Build Coastguard Worker        If `n` is positive, return at most `n` available bytes
659*cda5da8dSAndroid Build Coastguard Worker        as soon as at least 1 byte is available in the internal buffer.
660*cda5da8dSAndroid Build Coastguard Worker        If EOF is received before any byte is read, return an empty
661*cda5da8dSAndroid Build Coastguard Worker        bytes object.
662*cda5da8dSAndroid Build Coastguard Worker
663*cda5da8dSAndroid Build Coastguard Worker        Returned value is not limited with limit, configured at stream
664*cda5da8dSAndroid Build Coastguard Worker        creation.
665*cda5da8dSAndroid Build Coastguard Worker
666*cda5da8dSAndroid Build Coastguard Worker        If stream was paused, this function will automatically resume it if
667*cda5da8dSAndroid Build Coastguard Worker        needed.
668*cda5da8dSAndroid Build Coastguard Worker        """
669*cda5da8dSAndroid Build Coastguard Worker
670*cda5da8dSAndroid Build Coastguard Worker        if self._exception is not None:
671*cda5da8dSAndroid Build Coastguard Worker            raise self._exception
672*cda5da8dSAndroid Build Coastguard Worker
673*cda5da8dSAndroid Build Coastguard Worker        if n == 0:
674*cda5da8dSAndroid Build Coastguard Worker            return b''
675*cda5da8dSAndroid Build Coastguard Worker
676*cda5da8dSAndroid Build Coastguard Worker        if n < 0:
677*cda5da8dSAndroid Build Coastguard Worker            # This used to just loop creating a new waiter hoping to
678*cda5da8dSAndroid Build Coastguard Worker            # collect everything in self._buffer, but that would
679*cda5da8dSAndroid Build Coastguard Worker            # deadlock if the subprocess sends more than self.limit
680*cda5da8dSAndroid Build Coastguard Worker            # bytes.  So just call self.read(self._limit) until EOF.
681*cda5da8dSAndroid Build Coastguard Worker            blocks = []
682*cda5da8dSAndroid Build Coastguard Worker            while True:
683*cda5da8dSAndroid Build Coastguard Worker                block = await self.read(self._limit)
684*cda5da8dSAndroid Build Coastguard Worker                if not block:
685*cda5da8dSAndroid Build Coastguard Worker                    break
686*cda5da8dSAndroid Build Coastguard Worker                blocks.append(block)
687*cda5da8dSAndroid Build Coastguard Worker            return b''.join(blocks)
688*cda5da8dSAndroid Build Coastguard Worker
689*cda5da8dSAndroid Build Coastguard Worker        if not self._buffer and not self._eof:
690*cda5da8dSAndroid Build Coastguard Worker            await self._wait_for_data('read')
691*cda5da8dSAndroid Build Coastguard Worker
692*cda5da8dSAndroid Build Coastguard Worker        # This will work right even if buffer is less than n bytes
693*cda5da8dSAndroid Build Coastguard Worker        data = bytes(self._buffer[:n])
694*cda5da8dSAndroid Build Coastguard Worker        del self._buffer[:n]
695*cda5da8dSAndroid Build Coastguard Worker
696*cda5da8dSAndroid Build Coastguard Worker        self._maybe_resume_transport()
697*cda5da8dSAndroid Build Coastguard Worker        return data
698*cda5da8dSAndroid Build Coastguard Worker
699*cda5da8dSAndroid Build Coastguard Worker    async def readexactly(self, n):
700*cda5da8dSAndroid Build Coastguard Worker        """Read exactly `n` bytes.
701*cda5da8dSAndroid Build Coastguard Worker
702*cda5da8dSAndroid Build Coastguard Worker        Raise an IncompleteReadError if EOF is reached before `n` bytes can be
703*cda5da8dSAndroid Build Coastguard Worker        read. The IncompleteReadError.partial attribute of the exception will
704*cda5da8dSAndroid Build Coastguard Worker        contain the partial read bytes.
705*cda5da8dSAndroid Build Coastguard Worker
706*cda5da8dSAndroid Build Coastguard Worker        if n is zero, return empty bytes object.
707*cda5da8dSAndroid Build Coastguard Worker
708*cda5da8dSAndroid Build Coastguard Worker        Returned value is not limited with limit, configured at stream
709*cda5da8dSAndroid Build Coastguard Worker        creation.
710*cda5da8dSAndroid Build Coastguard Worker
711*cda5da8dSAndroid Build Coastguard Worker        If stream was paused, this function will automatically resume it if
712*cda5da8dSAndroid Build Coastguard Worker        needed.
713*cda5da8dSAndroid Build Coastguard Worker        """
714*cda5da8dSAndroid Build Coastguard Worker        if n < 0:
715*cda5da8dSAndroid Build Coastguard Worker            raise ValueError('readexactly size can not be less than zero')
716*cda5da8dSAndroid Build Coastguard Worker
717*cda5da8dSAndroid Build Coastguard Worker        if self._exception is not None:
718*cda5da8dSAndroid Build Coastguard Worker            raise self._exception
719*cda5da8dSAndroid Build Coastguard Worker
720*cda5da8dSAndroid Build Coastguard Worker        if n == 0:
721*cda5da8dSAndroid Build Coastguard Worker            return b''
722*cda5da8dSAndroid Build Coastguard Worker
723*cda5da8dSAndroid Build Coastguard Worker        while len(self._buffer) < n:
724*cda5da8dSAndroid Build Coastguard Worker            if self._eof:
725*cda5da8dSAndroid Build Coastguard Worker                incomplete = bytes(self._buffer)
726*cda5da8dSAndroid Build Coastguard Worker                self._buffer.clear()
727*cda5da8dSAndroid Build Coastguard Worker                raise exceptions.IncompleteReadError(incomplete, n)
728*cda5da8dSAndroid Build Coastguard Worker
729*cda5da8dSAndroid Build Coastguard Worker            await self._wait_for_data('readexactly')
730*cda5da8dSAndroid Build Coastguard Worker
731*cda5da8dSAndroid Build Coastguard Worker        if len(self._buffer) == n:
732*cda5da8dSAndroid Build Coastguard Worker            data = bytes(self._buffer)
733*cda5da8dSAndroid Build Coastguard Worker            self._buffer.clear()
734*cda5da8dSAndroid Build Coastguard Worker        else:
735*cda5da8dSAndroid Build Coastguard Worker            data = bytes(self._buffer[:n])
736*cda5da8dSAndroid Build Coastguard Worker            del self._buffer[:n]
737*cda5da8dSAndroid Build Coastguard Worker        self._maybe_resume_transport()
738*cda5da8dSAndroid Build Coastguard Worker        return data
739*cda5da8dSAndroid Build Coastguard Worker
740*cda5da8dSAndroid Build Coastguard Worker    def __aiter__(self):
741*cda5da8dSAndroid Build Coastguard Worker        return self
742*cda5da8dSAndroid Build Coastguard Worker
743*cda5da8dSAndroid Build Coastguard Worker    async def __anext__(self):
744*cda5da8dSAndroid Build Coastguard Worker        val = await self.readline()
745*cda5da8dSAndroid Build Coastguard Worker        if val == b'':
746*cda5da8dSAndroid Build Coastguard Worker            raise StopAsyncIteration
747*cda5da8dSAndroid Build Coastguard Worker        return val
748