1.. currentmodule:: asyncio 2 3.. _asyncio-streams: 4 5======= 6Streams 7======= 8 9**Source code:** :source:`Lib/asyncio/streams.py` 10 11------------------------------------------------- 12 13Streams are high-level async/await-ready primitives to work with 14network connections. Streams allow sending and receiving data without 15using callbacks or low-level protocols and transports. 16 17.. _asyncio_example_stream: 18 19Here is an example of a TCP echo client written using asyncio 20streams:: 21 22 import asyncio 23 24 async def tcp_echo_client(message): 25 reader, writer = await asyncio.open_connection( 26 '127.0.0.1', 8888) 27 28 print(f'Send: {message!r}') 29 writer.write(message.encode()) 30 await writer.drain() 31 32 data = await reader.read(100) 33 print(f'Received: {data.decode()!r}') 34 35 print('Close the connection') 36 writer.close() 37 await writer.wait_closed() 38 39 asyncio.run(tcp_echo_client('Hello World!')) 40 41 42See also the `Examples`_ section below. 43 44 45.. rubric:: Stream Functions 46 47The following top-level asyncio functions can be used to create 48and work with streams: 49 50 51.. coroutinefunction:: open_connection(host=None, port=None, *, \ 52 limit=None, ssl=None, family=0, proto=0, \ 53 flags=0, sock=None, local_addr=None, \ 54 server_hostname=None, ssl_handshake_timeout=None, \ 55 ssl_shutdown_timeout=None, \ 56 happy_eyeballs_delay=None, interleave=None) 57 58 Establish a network connection and return a pair of 59 ``(reader, writer)`` objects. 60 61 The returned *reader* and *writer* objects are instances of 62 :class:`StreamReader` and :class:`StreamWriter` classes. 63 64 *limit* determines the buffer size limit used by the 65 returned :class:`StreamReader` instance. By default the *limit* 66 is set to 64 KiB. 67 68 The rest of the arguments are passed directly to 69 :meth:`loop.create_connection`. 70 71 .. note:: 72 73 The *sock* argument transfers ownership of the socket to the 74 :class:`StreamWriter` created. To close the socket, call its 75 :meth:`~asyncio.StreamWriter.close` method. 76 77 .. versionchanged:: 3.7 78 Added the *ssl_handshake_timeout* parameter. 79 80 .. versionadded:: 3.8 81 Added *happy_eyeballs_delay* and *interleave* parameters. 82 83 .. versionchanged:: 3.10 84 Removed the *loop* parameter. 85 86 .. versionchanged:: 3.11 87 Added the *ssl_shutdown_timeout* parameter. 88 89 90.. coroutinefunction:: start_server(client_connected_cb, host=None, \ 91 port=None, *, limit=None, \ 92 family=socket.AF_UNSPEC, \ 93 flags=socket.AI_PASSIVE, sock=None, \ 94 backlog=100, ssl=None, reuse_address=None, \ 95 reuse_port=None, ssl_handshake_timeout=None, \ 96 ssl_shutdown_timeout=None, start_serving=True) 97 98 Start a socket server. 99 100 The *client_connected_cb* callback is called whenever a new client 101 connection is established. It receives a ``(reader, writer)`` pair 102 as two arguments, instances of the :class:`StreamReader` and 103 :class:`StreamWriter` classes. 104 105 *client_connected_cb* can be a plain callable or a 106 :ref:`coroutine function <coroutine>`; if it is a coroutine function, 107 it will be automatically scheduled as a :class:`Task`. 108 109 *limit* determines the buffer size limit used by the 110 returned :class:`StreamReader` instance. By default the *limit* 111 is set to 64 KiB. 112 113 The rest of the arguments are passed directly to 114 :meth:`loop.create_server`. 115 116 .. note:: 117 118 The *sock* argument transfers ownership of the socket to the 119 server created. To close the socket, call the server's 120 :meth:`~asyncio.Server.close` method. 121 122 .. versionchanged:: 3.7 123 Added the *ssl_handshake_timeout* and *start_serving* parameters. 124 125 .. versionchanged:: 3.10 126 Removed the *loop* parameter. 127 128 .. versionchanged:: 3.11 129 Added the *ssl_shutdown_timeout* parameter. 130 131 132.. rubric:: Unix Sockets 133 134.. coroutinefunction:: open_unix_connection(path=None, *, limit=None, \ 135 ssl=None, sock=None, server_hostname=None, \ 136 ssl_handshake_timeout=None, ssl_shutdown_timeout=None) 137 138 Establish a Unix socket connection and return a pair of 139 ``(reader, writer)``. 140 141 Similar to :func:`open_connection` but operates on Unix sockets. 142 143 See also the documentation of :meth:`loop.create_unix_connection`. 144 145 .. note:: 146 147 The *sock* argument transfers ownership of the socket to the 148 :class:`StreamWriter` created. To close the socket, call its 149 :meth:`~asyncio.StreamWriter.close` method. 150 151 .. availability:: Unix. 152 153 .. versionchanged:: 3.7 154 Added the *ssl_handshake_timeout* parameter. 155 The *path* parameter can now be a :term:`path-like object` 156 157 .. versionchanged:: 3.10 158 Removed the *loop* parameter. 159 160 .. versionchanged:: 3.11 161 Added the *ssl_shutdown_timeout* parameter. 162 163 164.. coroutinefunction:: start_unix_server(client_connected_cb, path=None, \ 165 *, limit=None, sock=None, backlog=100, ssl=None, \ 166 ssl_handshake_timeout=None, \ 167 ssl_shutdown_timeout=None, start_serving=True) 168 169 Start a Unix socket server. 170 171 Similar to :func:`start_server` but works with Unix sockets. 172 173 See also the documentation of :meth:`loop.create_unix_server`. 174 175 .. note:: 176 177 The *sock* argument transfers ownership of the socket to the 178 server created. To close the socket, call the server's 179 :meth:`~asyncio.Server.close` method. 180 181 .. availability:: Unix. 182 183 .. versionchanged:: 3.7 184 Added the *ssl_handshake_timeout* and *start_serving* parameters. 185 The *path* parameter can now be a :term:`path-like object`. 186 187 .. versionchanged:: 3.10 188 Removed the *loop* parameter. 189 190 .. versionchanged:: 3.11 191 Added the *ssl_shutdown_timeout* parameter. 192 193 194StreamReader 195============ 196 197.. class:: StreamReader 198 199 Represents a reader object that provides APIs to read data 200 from the IO stream. As an :term:`asynchronous iterable`, the 201 object supports the :keyword:`async for` statement. 202 203 It is not recommended to instantiate *StreamReader* objects 204 directly; use :func:`open_connection` and :func:`start_server` 205 instead. 206 207 .. coroutinemethod:: read(n=-1) 208 209 Read up to *n* bytes from the stream. 210 211 If *n* is not provided or set to ``-1``, 212 read until EOF, then return all read :class:`bytes`. 213 If EOF was received and the internal buffer is empty, 214 return an empty ``bytes`` object. 215 216 If *n* is ``0``, return an empty ``bytes`` object immediately. 217 218 If *n* is positive, return at most *n* available ``bytes`` 219 as soon as at least 1 byte is available in the internal buffer. 220 If EOF is received before any byte is read, return an empty 221 ``bytes`` object. 222 223 .. coroutinemethod:: readline() 224 225 Read one line, where "line" is a sequence of bytes 226 ending with ``\n``. 227 228 If EOF is received and ``\n`` was not found, the method 229 returns partially read data. 230 231 If EOF is received and the internal buffer is empty, 232 return an empty ``bytes`` object. 233 234 .. coroutinemethod:: readexactly(n) 235 236 Read exactly *n* bytes. 237 238 Raise an :exc:`IncompleteReadError` if EOF is reached before *n* 239 can be read. Use the :attr:`IncompleteReadError.partial` 240 attribute to get the partially read data. 241 242 .. coroutinemethod:: readuntil(separator=b'\n') 243 244 Read data from the stream until *separator* is found. 245 246 On success, the data and separator will be removed from the 247 internal buffer (consumed). Returned data will include the 248 separator at the end. 249 250 If the amount of data read exceeds the configured stream limit, a 251 :exc:`LimitOverrunError` exception is raised, and the data 252 is left in the internal buffer and can be read again. 253 254 If EOF is reached before the complete separator is found, 255 an :exc:`IncompleteReadError` exception is raised, and the internal 256 buffer is reset. The :attr:`IncompleteReadError.partial` attribute 257 may contain a portion of the separator. 258 259 .. versionadded:: 3.5.2 260 261 .. method:: at_eof() 262 263 Return ``True`` if the buffer is empty and :meth:`feed_eof` 264 was called. 265 266 267StreamWriter 268============ 269 270.. class:: StreamWriter 271 272 Represents a writer object that provides APIs to write data 273 to the IO stream. 274 275 It is not recommended to instantiate *StreamWriter* objects 276 directly; use :func:`open_connection` and :func:`start_server` 277 instead. 278 279 .. method:: write(data) 280 281 The method attempts to write the *data* to the underlying socket immediately. 282 If that fails, the data is queued in an internal write buffer until it can be 283 sent. 284 285 The method should be used along with the ``drain()`` method:: 286 287 stream.write(data) 288 await stream.drain() 289 290 .. method:: writelines(data) 291 292 The method writes a list (or any iterable) of bytes to the underlying socket 293 immediately. 294 If that fails, the data is queued in an internal write buffer until it can be 295 sent. 296 297 The method should be used along with the ``drain()`` method:: 298 299 stream.writelines(lines) 300 await stream.drain() 301 302 .. method:: close() 303 304 The method closes the stream and the underlying socket. 305 306 The method should be used, though not mandatory, 307 along with the ``wait_closed()`` method:: 308 309 stream.close() 310 await stream.wait_closed() 311 312 .. method:: can_write_eof() 313 314 Return ``True`` if the underlying transport supports 315 the :meth:`write_eof` method, ``False`` otherwise. 316 317 .. method:: write_eof() 318 319 Close the write end of the stream after the buffered write 320 data is flushed. 321 322 .. attribute:: transport 323 324 Return the underlying asyncio transport. 325 326 .. method:: get_extra_info(name, default=None) 327 328 Access optional transport information; see 329 :meth:`BaseTransport.get_extra_info` for details. 330 331 .. coroutinemethod:: drain() 332 333 Wait until it is appropriate to resume writing to the stream. 334 Example:: 335 336 writer.write(data) 337 await writer.drain() 338 339 This is a flow control method that interacts with the underlying 340 IO write buffer. When the size of the buffer reaches 341 the high watermark, *drain()* blocks until the size of the 342 buffer is drained down to the low watermark and writing can 343 be resumed. When there is nothing to wait for, the :meth:`drain` 344 returns immediately. 345 346 .. coroutinemethod:: start_tls(sslcontext, \*, server_hostname=None, \ 347 ssl_handshake_timeout=None) 348 349 Upgrade an existing stream-based connection to TLS. 350 351 Parameters: 352 353 * *sslcontext*: a configured instance of :class:`~ssl.SSLContext`. 354 355 * *server_hostname*: sets or overrides the host name that the target 356 server's certificate will be matched against. 357 358 * *ssl_handshake_timeout* is the time in seconds to wait for the TLS 359 handshake to complete before aborting the connection. ``60.0`` seconds 360 if ``None`` (default). 361 362 .. versionadded:: 3.11 363 364 .. method:: is_closing() 365 366 Return ``True`` if the stream is closed or in the process of 367 being closed. 368 369 .. versionadded:: 3.7 370 371 .. coroutinemethod:: wait_closed() 372 373 Wait until the stream is closed. 374 375 Should be called after :meth:`close` to wait until the underlying 376 connection is closed, ensuring that all data has been flushed 377 before e.g. exiting the program. 378 379 .. versionadded:: 3.7 380 381 382Examples 383======== 384 385.. _asyncio-tcp-echo-client-streams: 386 387TCP echo client using streams 388----------------------------- 389 390TCP echo client using the :func:`asyncio.open_connection` function:: 391 392 import asyncio 393 394 async def tcp_echo_client(message): 395 reader, writer = await asyncio.open_connection( 396 '127.0.0.1', 8888) 397 398 print(f'Send: {message!r}') 399 writer.write(message.encode()) 400 await writer.drain() 401 402 data = await reader.read(100) 403 print(f'Received: {data.decode()!r}') 404 405 print('Close the connection') 406 writer.close() 407 await writer.wait_closed() 408 409 asyncio.run(tcp_echo_client('Hello World!')) 410 411 412.. seealso:: 413 414 The :ref:`TCP echo client protocol <asyncio_example_tcp_echo_client_protocol>` 415 example uses the low-level :meth:`loop.create_connection` method. 416 417 418.. _asyncio-tcp-echo-server-streams: 419 420TCP echo server using streams 421----------------------------- 422 423TCP echo server using the :func:`asyncio.start_server` function:: 424 425 import asyncio 426 427 async def handle_echo(reader, writer): 428 data = await reader.read(100) 429 message = data.decode() 430 addr = writer.get_extra_info('peername') 431 432 print(f"Received {message!r} from {addr!r}") 433 434 print(f"Send: {message!r}") 435 writer.write(data) 436 await writer.drain() 437 438 print("Close the connection") 439 writer.close() 440 await writer.wait_closed() 441 442 async def main(): 443 server = await asyncio.start_server( 444 handle_echo, '127.0.0.1', 8888) 445 446 addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets) 447 print(f'Serving on {addrs}') 448 449 async with server: 450 await server.serve_forever() 451 452 asyncio.run(main()) 453 454 455.. seealso:: 456 457 The :ref:`TCP echo server protocol <asyncio_example_tcp_echo_server_protocol>` 458 example uses the :meth:`loop.create_server` method. 459 460 461Get HTTP headers 462---------------- 463 464Simple example querying HTTP headers of the URL passed on the command line:: 465 466 import asyncio 467 import urllib.parse 468 import sys 469 470 async def print_http_headers(url): 471 url = urllib.parse.urlsplit(url) 472 if url.scheme == 'https': 473 reader, writer = await asyncio.open_connection( 474 url.hostname, 443, ssl=True) 475 else: 476 reader, writer = await asyncio.open_connection( 477 url.hostname, 80) 478 479 query = ( 480 f"HEAD {url.path or '/'} HTTP/1.0\r\n" 481 f"Host: {url.hostname}\r\n" 482 f"\r\n" 483 ) 484 485 writer.write(query.encode('latin-1')) 486 while True: 487 line = await reader.readline() 488 if not line: 489 break 490 491 line = line.decode('latin1').rstrip() 492 if line: 493 print(f'HTTP header> {line}') 494 495 # Ignore the body, close the socket 496 writer.close() 497 await writer.wait_closed() 498 499 url = sys.argv[1] 500 asyncio.run(print_http_headers(url)) 501 502 503Usage:: 504 505 python example.py http://example.com/path/page.html 506 507or with HTTPS:: 508 509 python example.py https://example.com/path/page.html 510 511 512.. _asyncio_example_create_connection-streams: 513 514Register an open socket to wait for data using streams 515------------------------------------------------------ 516 517Coroutine waiting until a socket receives data using the 518:func:`open_connection` function:: 519 520 import asyncio 521 import socket 522 523 async def wait_for_data(): 524 # Get a reference to the current event loop because 525 # we want to access low-level APIs. 526 loop = asyncio.get_running_loop() 527 528 # Create a pair of connected sockets. 529 rsock, wsock = socket.socketpair() 530 531 # Register the open socket to wait for data. 532 reader, writer = await asyncio.open_connection(sock=rsock) 533 534 # Simulate the reception of data from the network 535 loop.call_soon(wsock.send, 'abc'.encode()) 536 537 # Wait for data 538 data = await reader.read(100) 539 540 # Got data, we are done: close the socket 541 print("Received:", data.decode()) 542 writer.close() 543 await writer.wait_closed() 544 545 # Close the second socket 546 wsock.close() 547 548 asyncio.run(wait_for_data()) 549 550.. seealso:: 551 552 The :ref:`register an open socket to wait for data using a protocol 553 <asyncio_example_create_connection>` example uses a low-level protocol and 554 the :meth:`loop.create_connection` method. 555 556 The :ref:`watch a file descriptor for read events 557 <asyncio_example_watch_fd>` example uses the low-level 558 :meth:`loop.add_reader` method to watch a file descriptor. 559