.. currentmodule:: asyncio

.. _asyncio-streams:

=======
Streams
=======

**Source code:** :source:`Lib/asyncio/streams.py`

-------------------------------------------------

Streams are high-level async/await-ready primitives to work with
network connections.  Streams allow sending and receiving data without
using callbacks or low-level protocols and transports.

.. _asyncio_example_stream:

Here is an example of a TCP echo client written using asyncio
streams::

    import asyncio

    async def tcp_echo_client(message):
        reader, writer = await asyncio.open_connection(
            '127.0.0.1', 8888)

        print(f'Send: {message!r}')
        writer.write(message.encode())
        await writer.drain()

        data = await reader.read(100)
        print(f'Received: {data.decode()!r}')

        print('Close the connection')
        writer.close()
        await writer.wait_closed()

    asyncio.run(tcp_echo_client('Hello World!'))


See also the `Examples`_ section below.


.. rubric:: Stream Functions

The following top-level asyncio functions can be used to create
and work with streams:


.. coroutinefunction:: open_connection(host=None, port=None, *, \
                          limit=None, ssl=None, family=0, proto=0, \
                          flags=0, sock=None, local_addr=None, \
                          server_hostname=None, ssl_handshake_timeout=None, \
                          ssl_shutdown_timeout=None, \
                          happy_eyeballs_delay=None, interleave=None)

   Establish a network connection and return a pair of
   ``(reader, writer)`` objects.

   The returned *reader* and *writer* objects are instances of the
   :class:`StreamReader` and :class:`StreamWriter` classes.

   *limit* determines the buffer size limit used by the
   returned :class:`StreamReader` instance.  By default the *limit*
   is set to 64 KiB.

   The rest of the arguments are passed directly to
   :meth:`loop.create_connection`.

   .. note::

      The *sock* argument transfers ownership of the socket to the
      :class:`StreamWriter` created. To close the socket, call its
      :meth:`~asyncio.StreamWriter.close` method.

   .. versionchanged:: 3.7
      Added the *ssl_handshake_timeout* parameter.

   .. versionchanged:: 3.8
      Added the *happy_eyeballs_delay* and *interleave* parameters.

   .. versionchanged:: 3.10
      Removed the *loop* parameter.

   .. versionchanged:: 3.11
      Added the *ssl_shutdown_timeout* parameter.
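
The effect of *limit* can be illustrated with a minimal sketch. It is
illustrative only: it assumes a local ``socket.socketpair()`` in place of a
real network peer, picks an arbitrary 16-byte limit, and uses a hypothetical
helper name, ``demo_limit``. A line longer than the limit makes
``readuntil()`` raise :exc:`LimitOverrunError`, and the buffered bytes stay
available to other read methods.

```python
import asyncio
import socket


async def demo_limit():
    # Assumption: a connected socket pair stands in for a real network peer.
    near, far = socket.socketpair()

    # Deliberately tiny 16-byte limit so the overrun is easy to trigger.
    reader, near_writer = await asyncio.open_connection(sock=near, limit=16)
    _, far_writer = await asyncio.open_connection(sock=far)

    payload = b'x' * 64 + b'\n'
    far_writer.write(payload)
    await far_writer.drain()

    try:
        await reader.readuntil(b'\n')
        overrun = False
    except asyncio.LimitOverrunError:
        # The line is longer than the limit; the data stays buffered.
        overrun = True

    # The buffered bytes can still be consumed by another read method.
    leftover = await reader.readexactly(len(payload))

    # Each writer owns its socket, so closing the writers closes the sockets.
    for writer in (near_writer, far_writer):
        writer.close()
        await writer.wait_closed()
    return overrun, leftover


if __name__ == '__main__':
    print(asyncio.run(demo_limit()))
```

A larger *limit* may be worth considering for protocols with very long lines
or separators; the 16-byte value here exists only to force the error.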


.. coroutinefunction:: start_server(client_connected_cb, host=None, \
                          port=None, *, limit=None, \
                          family=socket.AF_UNSPEC, \
                          flags=socket.AI_PASSIVE, sock=None, \
                          backlog=100, ssl=None, reuse_address=None, \
                          reuse_port=None, ssl_handshake_timeout=None, \
                          ssl_shutdown_timeout=None, start_serving=True)

   Start a socket server.

   The *client_connected_cb* callback is called whenever a new client
   connection is established.  It receives a ``(reader, writer)`` pair
   as two arguments, instances of the :class:`StreamReader` and
   :class:`StreamWriter` classes.

   *client_connected_cb* can be a plain callable or a
   :ref:`coroutine function <coroutine>`; if it is a coroutine function,
   it will be automatically scheduled as a :class:`Task`.

   *limit* determines the buffer size limit used by the
   :class:`StreamReader` instances passed to *client_connected_cb*.
   By default the *limit* is set to 64 KiB.

   The rest of the arguments are passed directly to
   :meth:`loop.create_server`.

   .. note::

      The *sock* argument transfers ownership of the socket to the
      server created. To close the socket, call the server's
      :meth:`~asyncio.Server.close` method.

   .. versionchanged:: 3.7
      Added the *ssl_handshake_timeout* and *start_serving* parameters.

   .. versionchanged:: 3.10
      Removed the *loop* parameter.

   .. versionchanged:: 3.11
      Added the *ssl_shutdown_timeout* parameter.
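
As a rough illustration of how *client_connected_cb* is invoked, the sketch
below starts a server and connects one client to it within the same program.
The names ``handle_upper`` and ``round_trip`` are hypothetical, and binding to
port ``0`` (so that the operating system picks a free port) is an assumption
made only to keep the sketch self-contained.

```python
import asyncio


async def handle_upper(reader, writer):
    # Runs once per client connection, scheduled as a Task by the server.
    line = await reader.readline()
    writer.write(line.upper())
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def round_trip(message):
    # Assumption: port 0 lets the OS choose any free port on localhost.
    server = await asyncio.start_server(handle_upper, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]

    async with server:
        reader, writer = await asyncio.open_connection('127.0.0.1', port)
        writer.write(message + b'\n')
        await writer.drain()

        reply = await reader.readline()

        writer.close()
        await writer.wait_closed()
    return reply


if __name__ == '__main__':
    print(asyncio.run(round_trip(b'hello')))
```

Because the handler is a coroutine function, each connection gets its own
task, so a slow client does not prevent other clients from being served.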


.. rubric:: Unix Sockets

.. coroutinefunction:: open_unix_connection(path=None, *, limit=None, \
                        ssl=None, sock=None, server_hostname=None, \
                        ssl_handshake_timeout=None, ssl_shutdown_timeout=None)

   Establish a Unix socket connection and return a pair of
   ``(reader, writer)``.

   Similar to :func:`open_connection` but operates on Unix sockets.

   See also the documentation of :meth:`loop.create_unix_connection`.

   .. note::

      The *sock* argument transfers ownership of the socket to the
      :class:`StreamWriter` created. To close the socket, call its
      :meth:`~asyncio.StreamWriter.close` method.

   .. availability:: Unix.

   .. versionchanged:: 3.7
      Added the *ssl_handshake_timeout* parameter.
      The *path* parameter can now be a :term:`path-like object`.

   .. versionchanged:: 3.10
      Removed the *loop* parameter.

   .. versionchanged:: 3.11
      Added the *ssl_shutdown_timeout* parameter.


.. coroutinefunction:: start_unix_server(client_connected_cb, path=None, \
                          *, limit=None, sock=None, backlog=100, ssl=None, \
                          ssl_handshake_timeout=None, \
                          ssl_shutdown_timeout=None, start_serving=True)

   Start a Unix socket server.

   Similar to :func:`start_server` but works with Unix sockets.

   See also the documentation of :meth:`loop.create_unix_server`.

   .. note::

      The *sock* argument transfers ownership of the socket to the
      server created. To close the socket, call the server's
      :meth:`~asyncio.Server.close` method.

   .. availability:: Unix.

   .. versionchanged:: 3.7
      Added the *ssl_handshake_timeout* and *start_serving* parameters.
      The *path* parameter can now be a :term:`path-like object`.

   .. versionchanged:: 3.10
      Removed the *loop* parameter.

   .. versionchanged:: 3.11
      Added the *ssl_shutdown_timeout* parameter.
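
The Unix socket variants can be combined in the same way as their TCP
counterparts. The following is a sketch for Unix platforms only; the helper
names ``handle_reverse`` and ``unix_round_trip`` and the socket file name
``demo.sock`` are hypothetical, and a temporary directory is assumed so that
the socket path does not clash with anything else.

```python
import asyncio
import os
import tempfile


async def handle_reverse(reader, writer):
    # Reply with the received line reversed (newline kept at the end).
    line = await reader.readline()
    writer.write(line.rstrip(b'\n')[::-1] + b'\n')
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def unix_round_trip(message):
    # Assumption: a throwaway directory holds the socket file.
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, 'demo.sock')
        server = await asyncio.start_unix_server(handle_reverse, path)

        async with server:
            reader, writer = await asyncio.open_unix_connection(path)
            writer.write(message + b'\n')
            await writer.drain()

            reply = await reader.readline()

            writer.close()
            await writer.wait_closed()
    return reply


if __name__ == '__main__':
    print(asyncio.run(unix_round_trip(b'abc')))
```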


StreamReader
============

.. class:: StreamReader

   Represents a reader object that provides APIs to read data
   from the IO stream. As an :term:`asynchronous iterable`, the
   object supports the :keyword:`async for` statement.

   It is not recommended to instantiate *StreamReader* objects
   directly; use :func:`open_connection` and :func:`start_server`
   instead.

   .. coroutinemethod:: read(n=-1)

      Read up to *n* bytes from the stream.

      If *n* is not provided or set to ``-1``,
      read until EOF, then return all read :class:`bytes`.
      If EOF was received and the internal buffer is empty,
      return an empty ``bytes`` object.

      If *n* is ``0``, return an empty ``bytes`` object immediately.

      If *n* is positive, return at most *n* available ``bytes``
      as soon as at least 1 byte is available in the internal buffer.
      If EOF is received before any byte is read, return an empty
      ``bytes`` object.

   .. coroutinemethod:: readline()

      Read one line, where "line" is a sequence of bytes
      ending with ``\n``.

      If EOF is received and ``\n`` was not found, the method
      returns partially read data.

      If EOF is received and the internal buffer is empty,
      return an empty ``bytes`` object.

   .. coroutinemethod:: readexactly(n)

      Read exactly *n* bytes.

      Raise an :exc:`IncompleteReadError` if EOF is reached before *n*
      can be read.  Use the :attr:`IncompleteReadError.partial`
      attribute to get the partially read data.

   .. coroutinemethod:: readuntil(separator=b'\n')

      Read data from the stream until *separator* is found.

      On success, the data and separator will be removed from the
      internal buffer (consumed). Returned data will include the
      separator at the end.

      If the amount of data read exceeds the configured stream limit, a
      :exc:`LimitOverrunError` exception is raised, and the data
      is left in the internal buffer and can be read again.

      If EOF is reached before the complete separator is found,
      an :exc:`IncompleteReadError` exception is raised, and the internal
      buffer is reset.  The :attr:`IncompleteReadError.partial` attribute
      may contain a portion of the separator.

      .. versionadded:: 3.5.2

   .. method:: at_eof()

      Return ``True`` if the buffer is empty and :meth:`feed_eof`
      was called.
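
The differences between the read methods are easiest to see side by side.
The sketch below is one possible illustration, not a canonical usage pattern:
it assumes a local ``socket.socketpair()`` as the peer, and ``demo_reader``
and the payload contents are made up for the example.

```python
import asyncio
import socket


async def demo_reader():
    # Assumption: a connected socket pair stands in for a real peer.
    near, far = socket.socketpair()
    reader, near_writer = await asyncio.open_connection(sock=near)
    _, far_writer = await asyncio.open_connection(sock=far)

    # Send everything, then close so that the reader eventually sees EOF.
    far_writer.write(b'first line\nABCDkey=value;tail')
    far_writer.close()
    await far_writer.wait_closed()

    results = {}
    # Up to and including the newline.
    results['readline'] = await reader.readline()
    # Exactly four bytes, no more and no less.
    results['readexactly'] = await reader.readexactly(4)
    # Up to and including the separator.
    results['readuntil'] = await reader.readuntil(b';')

    try:
        # Only b'tail' remains before EOF, so ten bytes cannot be read.
        await reader.readexactly(10)
    except asyncio.IncompleteReadError as exc:
        results['partial'] = exc.partial
        results['expected'] = exc.expected

    results['at_eof'] = reader.at_eof()
    # Reading after EOF with an empty buffer yields empty bytes.
    results['after_eof'] = await reader.read(100)

    near_writer.close()
    await near_writer.wait_closed()
    return results


if __name__ == '__main__':
    for name, value in asyncio.run(demo_reader()).items():
        print(f'{name}: {value!r}')
```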


StreamWriter
============

.. class:: StreamWriter

   Represents a writer object that provides APIs to write data
   to the IO stream.

   It is not recommended to instantiate *StreamWriter* objects
   directly; use :func:`open_connection` and :func:`start_server`
   instead.

   .. method:: write(data)

      The method attempts to write the *data* to the underlying socket immediately.
      If that fails, the data is queued in an internal write buffer until it can be
      sent.

      The method should be used along with the ``drain()`` method::

         stream.write(data)
         await stream.drain()

   .. method:: writelines(data)

      The method writes a list (or any iterable) of bytes to the underlying socket
      immediately.
      If that fails, the data is queued in an internal write buffer until it can be
      sent.

      The method should be used along with the ``drain()`` method::

         stream.writelines(lines)
         await stream.drain()

   .. method:: close()

      The method closes the stream and the underlying socket.

      Although not mandatory, the method should be used
      along with the ``wait_closed()`` method::

         stream.close()
         await stream.wait_closed()

   .. method:: can_write_eof()

      Return ``True`` if the underlying transport supports
      the :meth:`write_eof` method, ``False`` otherwise.

   .. method:: write_eof()

      Close the write end of the stream after the buffered write
      data is flushed.

   .. attribute:: transport

      Return the underlying asyncio transport.

   .. method:: get_extra_info(name, default=None)

      Access optional transport information; see
      :meth:`BaseTransport.get_extra_info` for details.

   .. coroutinemethod:: drain()

      Wait until it is appropriate to resume writing to the stream.
      Example::

          writer.write(data)
          await writer.drain()

      This is a flow control method that interacts with the underlying
      IO write buffer.  When the size of the buffer reaches
      the high watermark, :meth:`drain` blocks until the size of the
      buffer is drained down to the low watermark and writing can
      be resumed.  When there is nothing to wait for, :meth:`drain`
      returns immediately.

   .. coroutinemethod:: start_tls(sslcontext, *, server_hostname=None, \
                          ssl_handshake_timeout=None)

      Upgrade an existing stream-based connection to TLS.

      Parameters:

      * *sslcontext*: a configured instance of :class:`~ssl.SSLContext`.

      * *server_hostname*: sets or overrides the host name that the target
        server's certificate will be matched against.

      * *ssl_handshake_timeout*: the time in seconds to wait for the TLS
        handshake to complete before aborting the connection.  ``60.0`` seconds
        if ``None`` (default).

      .. versionadded:: 3.11

   .. method:: is_closing()

      Return ``True`` if the stream is closed or in the process of
      being closed.

      .. versionadded:: 3.7

   .. coroutinemethod:: wait_closed()

      Wait until the stream is closed.

      Should be called after :meth:`close` to wait until the underlying
      connection is closed, ensuring that all data has been flushed
      before e.g. exiting the program.

      .. versionadded:: 3.7
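
Several of the writer methods are typically used together. The sketch below
shows one way they might be combined: ``writelines()`` followed by
``drain()``, a half-close with ``write_eof()``, and an orderly shutdown with
``close()`` and ``wait_closed()``. It assumes a local ``socket.socketpair()``
as the peer; ``demo_writer`` is a hypothetical name.

```python
import asyncio
import socket


async def demo_writer():
    # Assumption: a connected socket pair stands in for a real peer.
    near, far = socket.socketpair()
    near_reader, near_writer = await asyncio.open_connection(sock=near)
    far_reader, far_writer = await asyncio.open_connection(sock=far)

    # Queue several chunks, then let flow control catch up.
    near_writer.writelines([b'one\n', b'two\n', b'three\n'])
    await near_writer.drain()

    # Half-close: nothing more will be written, but reading still works.
    # Socket transports are expected to support this.
    if near_writer.can_write_eof():
        near_writer.write_eof()

    # The peer iterates over the incoming lines until it sees EOF.
    lines = [line async for line in far_reader]

    # The peer answers with the number of lines and closes its side.
    far_writer.write(str(len(lines)).encode())
    await far_writer.drain()
    far_writer.close()
    await far_writer.wait_closed()

    # Read the answer until EOF, then finish closing the near side.
    reply = await near_reader.read()
    near_writer.close()
    closing = near_writer.is_closing()
    await near_writer.wait_closed()
    return lines, reply, closing


if __name__ == '__main__':
    print(asyncio.run(demo_writer()))
```

Awaiting ``drain()`` after each batch of writes keeps the write buffer from
growing without bound when the peer reads more slowly than data is produced.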


Examples
========

.. _asyncio-tcp-echo-client-streams:

TCP echo client using streams
-----------------------------

TCP echo client using the :func:`asyncio.open_connection` function::

    import asyncio

    async def tcp_echo_client(message):
        reader, writer = await asyncio.open_connection(
            '127.0.0.1', 8888)

        print(f'Send: {message!r}')
        writer.write(message.encode())
        await writer.drain()

        data = await reader.read(100)
        print(f'Received: {data.decode()!r}')

        print('Close the connection')
        writer.close()
        await writer.wait_closed()

    asyncio.run(tcp_echo_client('Hello World!'))


.. seealso::

   The :ref:`TCP echo client protocol <asyncio_example_tcp_echo_client_protocol>`
   example uses the low-level :meth:`loop.create_connection` method.


.. _asyncio-tcp-echo-server-streams:

TCP echo server using streams
-----------------------------

TCP echo server using the :func:`asyncio.start_server` function::

    import asyncio

    async def handle_echo(reader, writer):
        data = await reader.read(100)
        message = data.decode()
        addr = writer.get_extra_info('peername')

        print(f"Received {message!r} from {addr!r}")

        print(f"Send: {message!r}")
        writer.write(data)
        await writer.drain()

        print("Close the connection")
        writer.close()
        await writer.wait_closed()

    async def main():
        server = await asyncio.start_server(
            handle_echo, '127.0.0.1', 8888)

        addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
        print(f'Serving on {addrs}')

        async with server:
            await server.serve_forever()

    asyncio.run(main())


.. seealso::

   The :ref:`TCP echo server protocol <asyncio_example_tcp_echo_server_protocol>`
   example uses the :meth:`loop.create_server` method.


Get HTTP headers
----------------

A simple example that queries the HTTP headers of the URL passed on the
command line::

    import asyncio
    import urllib.parse
    import sys

    async def print_http_headers(url):
        url = urllib.parse.urlsplit(url)
        if url.scheme == 'https':
            reader, writer = await asyncio.open_connection(
                url.hostname, 443, ssl=True)
        else:
            reader, writer = await asyncio.open_connection(
                url.hostname, 80)

        query = (
            f"HEAD {url.path or '/'} HTTP/1.0\r\n"
            f"Host: {url.hostname}\r\n"
            f"\r\n"
        )

        writer.write(query.encode('latin-1'))

        # Read the response line by line until EOF
        while True:
            line = await reader.readline()
            if not line:
                break

            line = line.decode('latin-1').rstrip()
            if line:
                print(f'HTTP header> {line}')

        # Ignore the body, close the socket
        writer.close()
        await writer.wait_closed()

    url = sys.argv[1]
    asyncio.run(print_http_headers(url))


Usage::

    python example.py http://example.com/path/page.html

or with HTTPS::

    python example.py https://example.com/path/page.html


.. _asyncio_example_create_connection-streams:

Register an open socket to wait for data using streams
------------------------------------------------------

Coroutine waiting until a socket receives data using the
:func:`open_connection` function::

    import asyncio
    import socket

    async def wait_for_data():
        # Get a reference to the current event loop because
        # we want to access low-level APIs.
        loop = asyncio.get_running_loop()

        # Create a pair of connected sockets.
        rsock, wsock = socket.socketpair()

        # Register the open socket to wait for data.
        reader, writer = await asyncio.open_connection(sock=rsock)

        # Simulate the reception of data from the network
        loop.call_soon(wsock.send, 'abc'.encode())

        # Wait for data
        data = await reader.read(100)

        # Got data, we are done: close the socket
        print("Received:", data.decode())
        writer.close()
        await writer.wait_closed()

        # Close the second socket
        wsock.close()

    asyncio.run(wait_for_data())

.. seealso::

   The :ref:`register an open socket to wait for data using a protocol
   <asyncio_example_create_connection>` example uses a low-level protocol and
   the :meth:`loop.create_connection` method.

   The :ref:`watch a file descriptor for read events
   <asyncio_example_watch_fd>` example uses the low-level
   :meth:`loop.add_reader` method to watch a file descriptor.
