1 .. currentmodule:: asyncio
2 
3 
4 .. _asyncio-transports-protocols:
5 
6 
7 ========================
8 Transports and Protocols
9 ========================
10 
11 .. rubric:: Preface
12 
13 Transports and Protocols are used by the **low-level** event loop
APIs such as :meth:`loop.create_connection`.  They use a
callback-based programming style and enable high-performance
16 implementations of network or IPC protocols (e.g. HTTP).
17 
18 Essentially, transports and protocols should only be used in
19 libraries and frameworks and never in high-level asyncio
20 applications.
21 
22 This documentation page covers both `Transports`_ and `Protocols`_.
23 
24 .. rubric:: Introduction
25 
26 At the highest level, the transport is concerned with *how* bytes
27 are transmitted, while the protocol determines *which* bytes to
28 transmit (and to some extent when).
29 
30 A different way of saying the same thing: a transport is an
31 abstraction for a socket (or similar I/O endpoint) while a protocol
32 is an abstraction for an application, from the transport's point
33 of view.
34 
Yet another view is that the transport and protocol interfaces
together define an abstract interface for using network I/O and
interprocess I/O.
38 
39 There is always a 1:1 relationship between transport and protocol
40 objects: the protocol calls transport methods to send data,
41 while the transport calls protocol methods to pass it data that
42 has been received.
43 
Most connection-oriented event loop methods
(such as :meth:`loop.create_connection`) accept a
*protocol_factory* argument used to create a *Protocol* object
for an accepted connection, represented by a *Transport* object.
Such methods usually return a tuple of ``(transport, protocol)``.
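
A minimal sketch of this pattern follows.  The ``Collector`` class is
hypothetical, and :func:`socket.socketpair` is assumed here as a stand-in for
a real network peer.  The sketch passes a protocol factory to
:meth:`loop.create_connection` and checks that the returned pair is linked
one-to-one:

```python
import asyncio
import socket


class Collector(asyncio.Protocol):
    """Hypothetical protocol that only remembers its transport."""

    def connection_made(self, transport):
        self.transport = transport


async def main():
    loop = asyncio.get_running_loop()

    # Assumption: a connected socket pair stands in for a network peer.
    sock, peer = socket.socketpair()

    # The factory is called with no arguments to build the protocol.
    transport, protocol = await loop.create_connection(Collector, sock=sock)
    try:
        # The protocol saw this transport in connection_made(), and the
        # transport reports the same protocol object back.
        linked = (protocol.transport is transport
                  and transport.get_protocol() is protocol)
    finally:
        transport.close()
        peer.close()
    return linked


linked = asyncio.run(main())
```

Any callable that returns a protocol instance can serve as the factory, so a
class, a ``lambda``, or a :func:`functools.partial` object all work.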
49 
50 .. rubric:: Contents
51 
52 This documentation page contains the following sections:
53 
54 * The `Transports`_ section documents asyncio :class:`BaseTransport`,
55   :class:`ReadTransport`, :class:`WriteTransport`, :class:`Transport`,
56   :class:`DatagramTransport`, and :class:`SubprocessTransport`
57   classes.
58 
59 * The `Protocols`_ section documents asyncio :class:`BaseProtocol`,
60   :class:`Protocol`, :class:`BufferedProtocol`,
61   :class:`DatagramProtocol`, and :class:`SubprocessProtocol` classes.
62 
63 * The `Examples`_ section showcases how to work with transports,
64   protocols, and low-level event loop APIs.
65 
66 
67 .. _asyncio-transport:
68 
69 Transports
70 ==========
71 
72 **Source code:** :source:`Lib/asyncio/transports.py`
73 
74 ----------------------------------------------------
75 
76 Transports are classes provided by :mod:`asyncio` in order to abstract
77 various kinds of communication channels.
78 
79 Transport objects are always instantiated by an
80 :ref:`asyncio event loop <asyncio-event-loop>`.
81 
82 asyncio implements transports for TCP, UDP, SSL, and subprocess pipes.
83 The methods available on a transport depend on the transport's kind.
84 
85 The transport classes are :ref:`not thread safe <asyncio-multithreading>`.
86 
87 
88 Transports Hierarchy
89 --------------------
90 
91 .. class:: BaseTransport
92 
93    Base class for all transports.  Contains methods that all
94    asyncio transports share.
95 
96 .. class:: WriteTransport(BaseTransport)
97 
98    A base transport for write-only connections.
99 
100    Instances of the *WriteTransport* class are returned from
101    the :meth:`loop.connect_write_pipe` event loop method and
102    are also used by subprocess-related methods like
103    :meth:`loop.subprocess_exec`.
104 
105 .. class:: ReadTransport(BaseTransport)
106 
107    A base transport for read-only connections.
108 
109    Instances of the *ReadTransport* class are returned from
110    the :meth:`loop.connect_read_pipe` event loop method and
111    are also used by subprocess-related methods like
112    :meth:`loop.subprocess_exec`.
113 
114 .. class:: Transport(WriteTransport, ReadTransport)
115 
116    Interface representing a bidirectional transport, such as a
117    TCP connection.
118 
119    The user does not instantiate a transport directly; they call a
120    utility function, passing it a protocol factory and other
121    information necessary to create the transport and protocol.
122 
123    Instances of the *Transport* class are returned from or used by
124    event loop methods like :meth:`loop.create_connection`,
125    :meth:`loop.create_unix_connection`,
126    :meth:`loop.create_server`, :meth:`loop.sendfile`, etc.
127 
128 
129 .. class:: DatagramTransport(BaseTransport)
130 
131    A transport for datagram (UDP) connections.
132 
133    Instances of the *DatagramTransport* class are returned from
134    the :meth:`loop.create_datagram_endpoint` event loop method.
135 
136 
137 .. class:: SubprocessTransport(BaseTransport)
138 
139    An abstraction to represent a connection between a parent and its
140    child OS process.
141 
142    Instances of the *SubprocessTransport* class are returned from
143    event loop methods :meth:`loop.subprocess_shell` and
144    :meth:`loop.subprocess_exec`.
145 
146 
147 Base Transport
148 --------------
149 
150 .. method:: BaseTransport.close()
151 
152    Close the transport.
153 
154    If the transport has a buffer for outgoing
155    data, buffered data will be flushed asynchronously.  No more data
156    will be received.  After all buffered data is flushed, the
157    protocol's :meth:`protocol.connection_lost()
158    <BaseProtocol.connection_lost>` method will be called with
159    :const:`None` as its argument. The transport should not be
160    used once it is closed.
161 
162 .. method:: BaseTransport.is_closing()
163 
164    Return ``True`` if the transport is closing or is closed.
165 
166 .. method:: BaseTransport.get_extra_info(name, default=None)
167 
168    Return information about the transport or underlying resources
169    it uses.
170 
171    *name* is a string representing the piece of transport-specific
172    information to get.
173 
174    *default* is the value to return if the information is not
175    available, or if the transport does not support querying it
176    with the given third-party event loop implementation or on the
177    current platform.
178 
179    For example, the following code attempts to get the underlying
180    socket object of the transport::
181 
182       sock = transport.get_extra_info('socket')
183       if sock is not None:
184           print(sock.getsockopt(...))
185 
186    Categories of information that can be queried on some transports:
187 
188    * socket:
189 
190      - ``'peername'``: the remote address to which the socket is
191        connected, result of :meth:`socket.socket.getpeername`
192        (``None`` on error)
193 
194      - ``'socket'``: :class:`socket.socket` instance
195 
196      - ``'sockname'``: the socket's own address,
197        result of :meth:`socket.socket.getsockname`
198 
199    * SSL socket:
200 
201      - ``'compression'``: the compression algorithm being used as a
202        string, or ``None`` if the connection isn't compressed; result
203        of :meth:`ssl.SSLSocket.compression`
204 
205      - ``'cipher'``: a three-value tuple containing the name of the
206        cipher being used, the version of the SSL protocol that defines
207        its use, and the number of secret bits being used; result of
208        :meth:`ssl.SSLSocket.cipher`
209 
210      - ``'peercert'``: peer certificate; result of
211        :meth:`ssl.SSLSocket.getpeercert`
212 
213      - ``'sslcontext'``: :class:`ssl.SSLContext` instance
214 
215      - ``'ssl_object'``: :class:`ssl.SSLObject` or
216        :class:`ssl.SSLSocket` instance
217 
218    * pipe:
219 
220      - ``'pipe'``: pipe object
221 
222    * subprocess:
223 
224      - ``'subprocess'``: :class:`subprocess.Popen` instance
225 
226 .. method:: BaseTransport.set_protocol(protocol)
227 
228    Set a new protocol.
229 
230    Switching protocol should only be done when both
231    protocols are documented to support the switch.
232 
233 .. method:: BaseTransport.get_protocol()
234 
235    Return the current protocol.
236 
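The following sketch illustrates how :meth:`BaseTransport.get_extra_info`
behaves for available and unavailable keys.  It assumes a plain (non-SSL)
socket transport created from :func:`socket.socketpair`; the key
``'no-such-key'`` and the fallback string ``'no TLS'`` are made up for the
example:

```python
import asyncio
import socket


async def main():
    loop = asyncio.get_running_loop()
    sock, peer = socket.socketpair()

    # asyncio.Protocol itself is used as a do-nothing protocol factory.
    transport, _ = await loop.create_connection(asyncio.Protocol, sock=sock)
    try:
        # 'socket' is available on socket-based transports.
        raw = transport.get_extra_info('socket')
        # This transport is not SSL, so 'cipher' falls back to the default.
        cipher = transport.get_extra_info('cipher', default='no TLS')
        # Without an explicit default, unavailable keys give None.
        missing = transport.get_extra_info('no-such-key')
    finally:
        transport.close()
        peer.close()
    return raw is not None, cipher, missing


has_socket, cipher, missing = asyncio.run(main())
```

Because an unknown *name* does not raise, passing a distinctive *default*
is one way to tell "not available" apart from a genuine ``None`` value.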
237 
238 Read-only Transports
239 --------------------
240 
241 .. method:: ReadTransport.is_reading()
242 
243    Return ``True`` if the transport is receiving new data.
244 
245    .. versionadded:: 3.7
246 
247 .. method:: ReadTransport.pause_reading()
248 
249    Pause the receiving end of the transport.  No data will be passed to
250    the protocol's :meth:`protocol.data_received() <Protocol.data_received>`
251    method until :meth:`resume_reading` is called.
252 
253    .. versionchanged:: 3.7
254       The method is idempotent, i.e. it can be called when the
255       transport is already paused or closed.
256 
257 .. method:: ReadTransport.resume_reading()
258 
259    Resume the receiving end.  The protocol's
260    :meth:`protocol.data_received() <Protocol.data_received>` method
261    will be called once again if some data is available for reading.
262 
263    .. versionchanged:: 3.7
264       The method is idempotent, i.e. it can be called when the
265       transport is already reading.
266 
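As an illustration of pausing and resuming, the sketch below uses a
hypothetical ``Recorder`` protocol and a :func:`socket.socketpair` peer (both
assumptions of the example).  Data sent while the transport is paused is
expected to reach the protocol only after :meth:`ReadTransport.resume_reading`
is called:

```python
import asyncio
import socket


class Recorder(asyncio.Protocol):
    """Hypothetical protocol that stores every chunk it receives."""

    def __init__(self, got_data):
        self.chunks = []
        self.got_data = got_data

    def data_received(self, data):
        self.chunks.append(data)
        if not self.got_data.done():
            self.got_data.set_result(None)


async def main():
    loop = asyncio.get_running_loop()
    got_data = loop.create_future()
    sock, peer = socket.socketpair()

    transport, protocol = await loop.create_connection(
        lambda: Recorder(got_data), sock=sock)
    try:
        transport.pause_reading()
        transport.pause_reading()   # Idempotent: a second call is harmless.
        reading_while_paused = transport.is_reading()

        peer.send(b'ping')
        await asyncio.sleep(0.05)   # Let the loop run; nothing is delivered.
        chunks_while_paused = list(protocol.chunks)

        transport.resume_reading()
        await got_data              # data_received() fires after resuming.
        reading_after_resume = transport.is_reading()
    finally:
        transport.close()
        peer.close()
    return (reading_while_paused, chunks_while_paused,
            reading_after_resume, b''.join(protocol.chunks))


result = asyncio.run(main())
```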
267 
268 Write-only Transports
269 ---------------------
270 
271 .. method:: WriteTransport.abort()
272 
273    Close the transport immediately, without waiting for pending operations
274    to complete.  Buffered data will be lost.  No more data will be received.
275    The protocol's :meth:`protocol.connection_lost()
276    <BaseProtocol.connection_lost>` method will eventually be
277    called with :const:`None` as its argument.
278 
279 .. method:: WriteTransport.can_write_eof()
280 
281    Return :const:`True` if the transport supports
282    :meth:`~WriteTransport.write_eof`, :const:`False` if not.
283 
284 .. method:: WriteTransport.get_write_buffer_size()
285 
286    Return the current size of the output buffer used by the transport.
287 
288 .. method:: WriteTransport.get_write_buffer_limits()
289 
   Get the *high* and *low* watermarks for write flow control. Return a
   tuple ``(low, high)`` where *low* and *high* are non-negative numbers
   of bytes.
293 
294    Use :meth:`set_write_buffer_limits` to set the limits.
295 
296    .. versionadded:: 3.4.2
297 
298 .. method:: WriteTransport.set_write_buffer_limits(high=None, low=None)
299 
300    Set the *high* and *low* watermarks for write flow control.
301 
302    These two values (measured in number of
303    bytes) control when the protocol's
304    :meth:`protocol.pause_writing() <BaseProtocol.pause_writing>`
305    and :meth:`protocol.resume_writing() <BaseProtocol.resume_writing>`
306    methods are called. If specified, the low watermark must be less
307    than or equal to the high watermark.  Neither *high* nor *low*
308    can be negative.
309 
   :meth:`~BaseProtocol.pause_writing` is called when the buffer size
   becomes strictly greater than the *high* value. If writing has
   been paused, :meth:`~BaseProtocol.resume_writing` is called when
   the buffer size becomes less than or equal to the *low* value.
314 
315    The defaults are implementation-specific.  If only the
316    high watermark is given, the low watermark defaults to an
317    implementation-specific value less than or equal to the
318    high watermark.  Setting *high* to zero forces *low* to zero as
319    well, and causes :meth:`~BaseProtocol.pause_writing` to be called
320    whenever the buffer becomes non-empty.  Setting *low* to zero causes
321    :meth:`~BaseProtocol.resume_writing` to be called only once the
322    buffer is empty. Use of zero for either limit is generally
323    sub-optimal as it reduces opportunities for doing I/O and
324    computation concurrently.
325 
326    Use :meth:`~WriteTransport.get_write_buffer_limits`
327    to get the limits.
328 
329 .. method:: WriteTransport.write(data)
330 
331    Write some *data* bytes to the transport.
332 
333    This method does not block; it buffers the data and arranges for it
334    to be sent out asynchronously.
335 
336 .. method:: WriteTransport.writelines(list_of_data)
337 
338    Write a list (or any iterable) of data bytes to the transport.
339    This is functionally equivalent to calling :meth:`write` on each
340    element yielded by the iterable, but may be implemented more
341    efficiently.
342 
343 .. method:: WriteTransport.write_eof()
344 
345    Close the write end of the transport after flushing all buffered data.
346    Data may still be received.
347 
348    This method can raise :exc:`NotImplementedError` if the transport
349    (e.g. SSL) doesn't support half-closed connections.
350 
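The watermark rules described for
:meth:`WriteTransport.set_write_buffer_limits` can be checked with a short
sketch.  The byte counts are arbitrary values chosen for the example, and the
transport is assumed to be a socket transport built on
:func:`socket.socketpair`:

```python
import asyncio
import socket


async def main():
    loop = asyncio.get_running_loop()
    sock, peer = socket.socketpair()
    transport, _ = await loop.create_connection(asyncio.Protocol, sock=sock)
    try:
        # Both limits given: they are reported back as (low, high).
        transport.set_write_buffer_limits(high=4096, low=1024)
        explicit = transport.get_write_buffer_limits()

        # Only high given: low is implementation-specific, but <= high.
        transport.set_write_buffer_limits(high=8192)
        derived = transport.get_write_buffer_limits()

        # high=0 forces low to zero as well.
        transport.set_write_buffer_limits(high=0)
        zero = transport.get_write_buffer_limits()

        # A low watermark above the high watermark is rejected.
        try:
            transport.set_write_buffer_limits(high=10, low=20)
            rejected = False
        except ValueError:
            rejected = True
    finally:
        transport.close()
        peer.close()
    return explicit, derived, zero, rejected


explicit, derived, zero, rejected = asyncio.run(main())
```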
351 
352 Datagram Transports
353 -------------------
354 
355 .. method:: DatagramTransport.sendto(data, addr=None)
356 
357    Send the *data* bytes to the remote peer given by *addr* (a
358    transport-dependent target address).  If *addr* is :const:`None`,
359    the data is sent to the target address given on transport
360    creation.
361 
362    This method does not block; it buffers the data and arranges
363    for it to be sent out asynchronously.
364 
365 .. method:: DatagramTransport.abort()
366 
367    Close the transport immediately, without waiting for pending
368    operations to complete.  Buffered data will be lost.
369    No more data will be received.  The protocol's
370    :meth:`protocol.connection_lost() <BaseProtocol.connection_lost>`
371    method will eventually be called with :const:`None` as its argument.
372 
373 
374 .. _asyncio-subprocess-transports:
375 
376 Subprocess Transports
377 ---------------------
378 
379 .. method:: SubprocessTransport.get_pid()
380 
381    Return the subprocess process id as an integer.
382 
383 .. method:: SubprocessTransport.get_pipe_transport(fd)
384 
385    Return the transport for the communication pipe corresponding to the
386    integer file descriptor *fd*:
387 
   * ``0``: writable streaming transport of the standard input (*stdin*),
     or :const:`None` if the subprocess was not created with ``stdin=PIPE``
   * ``1``: readable streaming transport of the standard output (*stdout*),
     or :const:`None` if the subprocess was not created with ``stdout=PIPE``
   * ``2``: readable streaming transport of the standard error (*stderr*),
     or :const:`None` if the subprocess was not created with ``stderr=PIPE``
394    * other *fd*: :const:`None`
395 
396 .. method:: SubprocessTransport.get_returncode()
397 
398    Return the subprocess return code as an integer or :const:`None`
399    if it hasn't returned, which is similar to the
400    :attr:`subprocess.Popen.returncode` attribute.
401 
402 .. method:: SubprocessTransport.kill()
403 
404    Kill the subprocess.
405 
   On POSIX systems, this method sends SIGKILL to the subprocess.
407    On Windows, this method is an alias for :meth:`terminate`.
408 
409    See also :meth:`subprocess.Popen.kill`.
410 
411 .. method:: SubprocessTransport.send_signal(signal)
412 
413    Send the *signal* number to the subprocess, as in
414    :meth:`subprocess.Popen.send_signal`.
415 
416 .. method:: SubprocessTransport.terminate()
417 
418    Stop the subprocess.
419 
420    On POSIX systems, this method sends SIGTERM to the subprocess.
421    On Windows, the Windows API function TerminateProcess() is called to
422    stop the subprocess.
423 
424    See also :meth:`subprocess.Popen.terminate`.
425 
426 .. method:: SubprocessTransport.close()
427 
   Kill the subprocess by calling the :meth:`kill` method
   if the subprocess hasn't returned yet, and close the transports of
   the *stdin*, *stdout*, and *stderr* pipes.
432 
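A hedged sketch of driving a subprocess transport follows.  The
``ExitWatcher`` protocol and the sleeping child command are assumptions made
for illustration.  The sketch starts a child with
:meth:`loop.subprocess_exec`, stops it with
:meth:`SubprocessTransport.terminate`, and reads the return code once the
protocol has been told that the process exited:

```python
import asyncio
import sys


class ExitWatcher(asyncio.SubprocessProtocol):
    """Hypothetical protocol that resolves a future on process exit."""

    def __init__(self, exited):
        self.exited = exited

    def process_exited(self):
        self.exited.set_result(None)


async def main():
    loop = asyncio.get_running_loop()
    exited = loop.create_future()

    # Assumption: a child that would otherwise sleep for a minute.
    transport, _ = await loop.subprocess_exec(
        lambda: ExitWatcher(exited),
        sys.executable, '-c', 'import time; time.sleep(60)',
        stdin=None, stdout=None, stderr=None)

    pid = transport.get_pid()
    before = transport.get_returncode()   # None while the child is running.

    transport.terminate()
    await exited
    after = transport.get_returncode()    # Set once the child has exited.

    transport.close()
    return pid, before, after


pid, before, after = asyncio.run(main())
```

The exact return code after termination is platform-dependent, so the sketch
only relies on it being set and non-zero.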
433 
434 .. _asyncio-protocol:
435 
436 Protocols
437 =========
438 
439 **Source code:** :source:`Lib/asyncio/protocols.py`
440 
441 ---------------------------------------------------
442 
443 asyncio provides a set of abstract base classes that should be used
444 to implement network protocols.  Those classes are meant to be used
445 together with :ref:`transports <asyncio-transport>`.
446 
447 Subclasses of abstract base protocol classes may implement some or
448 all methods.  All these methods are callbacks: they are called by
449 transports on certain events, for example when some data is received.
450 A base protocol method should be called by the corresponding transport.
451 
452 
453 Base Protocols
454 --------------
455 
456 .. class:: BaseProtocol
457 
458    Base protocol with methods that all protocols share.
459 
460 .. class:: Protocol(BaseProtocol)
461 
462    The base class for implementing streaming protocols
463    (TCP, Unix sockets, etc).
464 
465 .. class:: BufferedProtocol(BaseProtocol)
466 
467    A base class for implementing streaming protocols with manual
468    control of the receive buffer.
469 
470 .. class:: DatagramProtocol(BaseProtocol)
471 
472    The base class for implementing datagram (UDP) protocols.
473 
474 .. class:: SubprocessProtocol(BaseProtocol)
475 
476    The base class for implementing protocols communicating with child
477    processes (unidirectional pipes).
478 
479 
480 Base Protocol
481 -------------
482 
483 All asyncio protocols can implement Base Protocol callbacks.
484 
485 .. rubric:: Connection Callbacks
486 
Connection callbacks are called on all protocols, exactly once per
successful connection.  All other protocol callbacks can only be
called between those two methods.
490 
491 .. method:: BaseProtocol.connection_made(transport)
492 
493    Called when a connection is made.
494 
495    The *transport* argument is the transport representing the
496    connection.  The protocol is responsible for storing the reference
497    to its transport.
498 
499 .. method:: BaseProtocol.connection_lost(exc)
500 
501    Called when the connection is lost or closed.
502 
503    The argument is either an exception object or :const:`None`.
504    The latter means a regular EOF is received, or the connection was
505    aborted or closed by this side of the connection.
506 
507 
508 .. rubric:: Flow Control Callbacks
509 
510 Flow control callbacks can be called by transports to pause or
511 resume writing performed by the protocol.
512 
513 See the documentation of the :meth:`~WriteTransport.set_write_buffer_limits`
514 method for more details.
515 
516 .. method:: BaseProtocol.pause_writing()
517 
518    Called when the transport's buffer goes over the high watermark.
519 
520 .. method:: BaseProtocol.resume_writing()
521 
522    Called when the transport's buffer drains below the low watermark.
523 
524 If the buffer size equals the high watermark,
525 :meth:`~BaseProtocol.pause_writing` is not called: the buffer size must
526 go strictly over.
527 
Conversely, :meth:`~BaseProtocol.resume_writing` is called when the
buffer size is equal to or lower than the low watermark.  These end
conditions are important to ensure that things go as expected when
either mark is zero.
532 
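The flow control callbacks can be observed with the sketch below.  It rests
on several assumptions: the ``ThrottledWriter`` protocol is hypothetical, the
watermark values are arbitrary, and the 8 MiB payload is assumed to be larger
than the kernel buffer of a :func:`socket.socketpair` connection, so that part
of it stays in the transport's buffer:

```python
import asyncio
import socket


class ThrottledWriter(asyncio.Protocol):
    """Hypothetical protocol that records flow control callbacks."""

    def __init__(self):
        self.events = []

    def pause_writing(self):
        self.events.append('pause')

    def resume_writing(self):
        self.events.append('resume')


async def main():
    loop = asyncio.get_running_loop()
    wsock, rsock = socket.socketpair()
    rsock.setblocking(False)

    transport, protocol = await loop.create_connection(
        ThrottledWriter, sock=wsock)
    transport.set_write_buffer_limits(high=64 * 1024, low=16 * 1024)

    # The peer is not reading yet, so most of the payload is buffered
    # and the buffer size goes over the high watermark.
    payload = b'x' * (8 * 1024 * 1024)
    transport.write(payload)
    events_after_write = list(protocol.events)

    # Drain the peer side; the buffer eventually drops to the low
    # watermark and resume_writing() is called.
    received = 0
    while received < len(payload):
        chunk = await loop.sock_recv(rsock, 256 * 1024)
        received += len(chunk)

    transport.close()
    await asyncio.sleep(0)   # Let the transport finish closing.
    rsock.close()
    return events_after_write, protocol.events


events_after_write, events = asyncio.run(main())
```

A real protocol would typically stop producing data in ``pause_writing()``
and continue in ``resume_writing()`` rather than just recording the calls.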
533 
534 Streaming Protocols
535 -------------------
536 
Event loop methods, such as :meth:`loop.create_server`,
538 :meth:`loop.create_unix_server`, :meth:`loop.create_connection`,
539 :meth:`loop.create_unix_connection`, :meth:`loop.connect_accepted_socket`,
540 :meth:`loop.connect_read_pipe`, and :meth:`loop.connect_write_pipe`
541 accept factories that return streaming protocols.
542 
543 .. method:: Protocol.data_received(data)
544 
545    Called when some data is received.  *data* is a non-empty bytes
546    object containing the incoming data.
547 
548    Whether the data is buffered, chunked or reassembled depends on
549    the transport.  In general, you shouldn't rely on specific semantics
550    and instead make your parsing generic and flexible. However,
551    data is always received in the correct order.
552 
553    The method can be called an arbitrary number of times while
554    a connection is open.
555 
556    However, :meth:`protocol.eof_received() <Protocol.eof_received>`
557    is called at most once.  Once ``eof_received()`` is called,
558    ``data_received()`` is not called anymore.
559 
560 .. method:: Protocol.eof_received()
561 
562    Called when the other end signals it won't send any more data
563    (for example by calling :meth:`transport.write_eof()
564    <WriteTransport.write_eof>`, if the other end also uses
565    asyncio).
566 
567    This method may return a false value (including ``None``), in which case
568    the transport will close itself.  Conversely, if this method returns a
569    true value, the protocol used determines whether to close the transport.
570    Since the default implementation returns ``None``, it implicitly closes the
571    connection.
572 
573    Some transports, including SSL, don't support half-closed connections,
574    in which case returning true from this method will result in the connection
575    being closed.
576 
577 
578 State machine:
579 
580 .. code-block:: none
581 
582     start -> connection_made
583         [-> data_received]*
584         [-> eof_received]?
585     -> connection_lost -> end
586 
587 
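The callback order of the state machine above can be traced with a small
sketch.  The ``Tracer`` protocol is hypothetical, and the peer is assumed to
be one end of a :func:`socket.socketpair` that sends a few bytes and then
shuts down its write side:

```python
import asyncio
import socket


class Tracer(asyncio.Protocol):
    """Hypothetical protocol that records the name of each callback."""

    def __init__(self, on_con_lost):
        self.calls = []
        self.on_con_lost = on_con_lost

    def connection_made(self, transport):
        self.calls.append('connection_made')

    def data_received(self, data):
        self.calls.append('data_received')

    def eof_received(self):
        self.calls.append('eof_received')
        # Returning None (a false value) lets the transport close itself.

    def connection_lost(self, exc):
        self.calls.append('connection_lost')
        self.on_con_lost.set_result(exc)


async def main():
    loop = asyncio.get_running_loop()
    on_con_lost = loop.create_future()
    sock, peer = socket.socketpair()

    transport, protocol = await loop.create_connection(
        lambda: Tracer(on_con_lost), sock=sock)

    peer.send(b'abc')
    peer.shutdown(socket.SHUT_WR)   # Signal EOF to the transport.
    try:
        exc = await on_con_lost
    finally:
        transport.close()
        peer.close()
    return protocol.calls, exc


calls, exc = asyncio.run(main())
```

With a payload this small a single ``data_received()`` call is expected, but
in general the method may be called any number of times before EOF.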
588 Buffered Streaming Protocols
589 ----------------------------
590 
591 .. versionadded:: 3.7
592 
593 Buffered Protocols can be used with any event loop method
594 that supports `Streaming Protocols`_.
595 
596 ``BufferedProtocol`` implementations allow explicit manual allocation
597 and control of the receive buffer.  Event loops can then use the buffer
598 provided by the protocol to avoid unnecessary data copies.  This
599 can result in noticeable performance improvement for protocols that
receive large amounts of data.  Sophisticated protocol implementations
601 can significantly reduce the number of buffer allocations.
602 
603 The following callbacks are called on :class:`BufferedProtocol`
604 instances:
605 
606 .. method:: BufferedProtocol.get_buffer(sizehint)
607 
608    Called to allocate a new receive buffer.
609 
610    *sizehint* is the recommended minimum size for the returned
611    buffer.  It is acceptable to return smaller or larger buffers
612    than what *sizehint* suggests.  When set to -1, the buffer size
613    can be arbitrary. It is an error to return a buffer with a zero size.
614 
615    ``get_buffer()`` must return an object implementing the
616    :ref:`buffer protocol <bufferobjects>`.
617 
618 .. method:: BufferedProtocol.buffer_updated(nbytes)
619 
620    Called when the buffer was updated with the received data.
621 
622    *nbytes* is the total number of bytes that were written to the buffer.
623 
624 .. method:: BufferedProtocol.eof_received()
625 
626    See the documentation of the :meth:`protocol.eof_received()
627    <Protocol.eof_received>` method.
628 
629 
630 :meth:`~BufferedProtocol.get_buffer` can be called an arbitrary number
631 of times during a connection.  However, :meth:`protocol.eof_received()
632 <Protocol.eof_received>` is called at most once
633 and, if called, :meth:`~BufferedProtocol.get_buffer` and
634 :meth:`~BufferedProtocol.buffer_updated` won't be called after it.
635 
636 State machine:
637 
638 .. code-block:: none
639 
640     start -> connection_made
641         [-> get_buffer
642             [-> buffer_updated]?
643         ]*
644         [-> eof_received]?
645     -> connection_lost -> end
646 
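A minimal sketch of a buffered protocol is shown below.  The
``ReusedBufferProtocol`` class, its 1024-byte buffer, and the
:func:`socket.socketpair` peer are assumptions of the example.  The protocol
allocates one buffer up front and hands the same buffer to the event loop on
every ``get_buffer()`` call:

```python
import asyncio
import socket


class ReusedBufferProtocol(asyncio.BufferedProtocol):
    """Hypothetical protocol that reuses a single receive buffer."""

    def __init__(self, on_con_lost):
        self.buffer = bytearray(1024)   # Allocated once, reused per read.
        self.received = bytearray()
        self.on_con_lost = on_con_lost

    def get_buffer(self, sizehint):
        # sizehint is only a hint; any non-empty buffer is acceptable.
        return self.buffer

    def buffer_updated(self, nbytes):
        # Only the first nbytes of the buffer hold new data.
        self.received += self.buffer[:nbytes]

    def connection_lost(self, exc):
        self.on_con_lost.set_result(None)


async def main():
    loop = asyncio.get_running_loop()
    on_con_lost = loop.create_future()
    sock, peer = socket.socketpair()

    transport, protocol = await loop.create_connection(
        lambda: ReusedBufferProtocol(on_con_lost), sock=sock)

    peer.sendall(b'hello world')
    peer.shutdown(socket.SHUT_WR)   # EOF closes the transport by default.
    try:
        await on_con_lost
    finally:
        transport.close()
        peer.close()
    return bytes(protocol.received)


received = asyncio.run(main())
```

Copying the slice into ``received`` keeps the sketch short; a protocol that
wants to avoid copies would parse the data in place inside
``buffer_updated()``.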
647 
648 Datagram Protocols
649 ------------------
650 
651 Datagram Protocol instances should be constructed by protocol
652 factories passed to the :meth:`loop.create_datagram_endpoint` method.
653 
654 .. method:: DatagramProtocol.datagram_received(data, addr)
655 
656    Called when a datagram is received.  *data* is a bytes object containing
657    the incoming data.  *addr* is the address of the peer sending the data;
658    the exact format depends on the transport.
659 
660 .. method:: DatagramProtocol.error_received(exc)
661 
662    Called when a previous send or receive operation raises an
663    :class:`OSError`.  *exc* is the :class:`OSError` instance.
664 
665    This method is called in rare conditions, when the transport (e.g. UDP)
666    detects that a datagram could not be delivered to its recipient.
667    In many conditions though, undeliverable datagrams will be silently
668    dropped.
669 
670 .. note::
671 
672    On BSD systems (macOS, FreeBSD, etc.) flow control is not supported
673    for datagram protocols, because there is no reliable way to detect send
674    failures caused by writing too many packets.
675 
676    The socket always appears 'ready' and excess packets are dropped. An
677    :class:`OSError` with ``errno`` set to :const:`errno.ENOBUFS` may
678    or may not be raised; if it is raised, it will be reported to
679    :meth:`DatagramProtocol.error_received` but otherwise ignored.
680 
681 
682 .. _asyncio-subprocess-protocols:
683 
684 Subprocess Protocols
685 --------------------
686 
687 Subprocess Protocol instances should be constructed by protocol
688 factories passed to the :meth:`loop.subprocess_exec` and
689 :meth:`loop.subprocess_shell` methods.
690 
691 .. method:: SubprocessProtocol.pipe_data_received(fd, data)
692 
693    Called when the child process writes data into its stdout or stderr
694    pipe.
695 
696    *fd* is the integer file descriptor of the pipe.
697 
698    *data* is a non-empty bytes object containing the received data.
699 
700 .. method:: SubprocessProtocol.pipe_connection_lost(fd, exc)
701 
702    Called when one of the pipes communicating with the child process
703    is closed.
704 
705    *fd* is the integer file descriptor that was closed.
706 
707 .. method:: SubprocessProtocol.process_exited()
708 
709    Called when the child process has exited.
710 
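The subprocess callbacks can be combined as in the following sketch.  The
``OutputCollector`` class and the child command are assumptions made for
illustration.  The protocol collects the child's standard output and signals
completion only after both the pipe has been closed and the process has
exited:

```python
import asyncio
import sys


class OutputCollector(asyncio.SubprocessProtocol):
    """Hypothetical protocol that gathers the child's stdout."""

    def __init__(self, done):
        self.done = done
        self.output = bytearray()
        self.pipe_closed = False
        self.exited = False

    def pipe_data_received(self, fd, data):
        self.output.extend(data)

    def pipe_connection_lost(self, fd, exc):
        self.pipe_closed = True
        self.check_for_exit()

    def process_exited(self):
        self.exited = True
        self.check_for_exit()

    def check_for_exit(self):
        # The two callbacks may arrive in either order.
        if self.pipe_closed and self.exited:
            self.done.set_result(True)


async def main():
    loop = asyncio.get_running_loop()
    done = loop.create_future()

    # Only stdout is piped; stdin and stderr are left alone.
    transport, protocol = await loop.subprocess_exec(
        lambda: OutputCollector(done),
        sys.executable, '-c', 'print("hello from child")',
        stdin=None, stderr=None)

    await done
    returncode = transport.get_returncode()
    transport.close()
    return bytes(protocol.output), returncode


output, returncode = asyncio.run(main())
```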
711 
712 Examples
713 ========
714 
715 .. _asyncio_example_tcp_echo_server_protocol:
716 
717 TCP Echo Server
718 ---------------
719 
720 Create a TCP echo server using the :meth:`loop.create_server` method, send back
721 received data, and close the connection::
722 
723     import asyncio
724 
725 
726     class EchoServerProtocol(asyncio.Protocol):
727         def connection_made(self, transport):
728             peername = transport.get_extra_info('peername')
729             print('Connection from {}'.format(peername))
730             self.transport = transport
731 
732         def data_received(self, data):
733             message = data.decode()
734             print('Data received: {!r}'.format(message))
735 
736             print('Send: {!r}'.format(message))
737             self.transport.write(data)
738 
739             print('Close the client socket')
740             self.transport.close()
741 
742 
743     async def main():
744         # Get a reference to the event loop as we plan to use
745         # low-level APIs.
746         loop = asyncio.get_running_loop()
747 
748         server = await loop.create_server(
749             lambda: EchoServerProtocol(),
750             '127.0.0.1', 8888)
751 
752         async with server:
753             await server.serve_forever()
754 
755 
756     asyncio.run(main())
757 
758 
759 .. seealso::
760 
761    The :ref:`TCP echo server using streams <asyncio-tcp-echo-server-streams>`
762    example uses the high-level :func:`asyncio.start_server` function.
763 
764 .. _asyncio_example_tcp_echo_client_protocol:
765 
766 TCP Echo Client
767 ---------------
768 
A TCP echo client that uses the :meth:`loop.create_connection` method to
send data and then waits until the connection is closed::
771 
772     import asyncio
773 
774 
775     class EchoClientProtocol(asyncio.Protocol):
776         def __init__(self, message, on_con_lost):
777             self.message = message
778             self.on_con_lost = on_con_lost
779 
780         def connection_made(self, transport):
781             transport.write(self.message.encode())
782             print('Data sent: {!r}'.format(self.message))
783 
784         def data_received(self, data):
785             print('Data received: {!r}'.format(data.decode()))
786 
787         def connection_lost(self, exc):
788             print('The server closed the connection')
789             self.on_con_lost.set_result(True)
790 
791 
792     async def main():
793         # Get a reference to the event loop as we plan to use
794         # low-level APIs.
795         loop = asyncio.get_running_loop()
796 
797         on_con_lost = loop.create_future()
798         message = 'Hello World!'
799 
800         transport, protocol = await loop.create_connection(
801             lambda: EchoClientProtocol(message, on_con_lost),
802             '127.0.0.1', 8888)
803 
804         # Wait until the protocol signals that the connection
805         # is lost and close the transport.
806         try:
807             await on_con_lost
808         finally:
809             transport.close()
810 
811 
812     asyncio.run(main())
813 
814 
815 .. seealso::
816 
817    The :ref:`TCP echo client using streams <asyncio-tcp-echo-client-streams>`
818    example uses the high-level :func:`asyncio.open_connection` function.
819 
820 
821 .. _asyncio-udp-echo-server-protocol:
822 
823 UDP Echo Server
824 ---------------
825 
826 A UDP echo server, using the :meth:`loop.create_datagram_endpoint`
827 method, sends back received data::
828 
829     import asyncio
830 
831 
    class EchoServerProtocol(asyncio.DatagramProtocol):
833         def connection_made(self, transport):
834             self.transport = transport
835 
836         def datagram_received(self, data, addr):
837             message = data.decode()
838             print('Received %r from %s' % (message, addr))
839             print('Send %r to %s' % (message, addr))
840             self.transport.sendto(data, addr)
841 
842 
843     async def main():
844         print("Starting UDP server")
845 
846         # Get a reference to the event loop as we plan to use
847         # low-level APIs.
848         loop = asyncio.get_running_loop()
849 
850         # One protocol instance will be created to serve all
851         # client requests.
852         transport, protocol = await loop.create_datagram_endpoint(
853             lambda: EchoServerProtocol(),
854             local_addr=('127.0.0.1', 9999))
855 
856         try:
857             await asyncio.sleep(3600)  # Serve for 1 hour.
858         finally:
859             transport.close()
860 
861 
862     asyncio.run(main())
863 
864 
.. _asyncio-udp-echo-client-protocol:

UDP Echo Client
---------------

A UDP echo client, using the :meth:`loop.create_datagram_endpoint`
method, sends data and closes the transport when it receives the answer::

    import asyncio


    class EchoClientProtocol:
        def __init__(self, message, on_con_lost):
            self.message = message
            self.on_con_lost = on_con_lost
            self.transport = None

        def connection_made(self, transport):
            self.transport = transport
            print('Send:', self.message)
            self.transport.sendto(self.message.encode())

        def datagram_received(self, data, addr):
            print("Received:", data.decode())

            print("Close the socket")
            self.transport.close()

        def error_received(self, exc):
            print('Error received:', exc)

        def connection_lost(self, exc):
            print("Connection closed")
            self.on_con_lost.set_result(True)


    async def main():
        # Get a reference to the event loop as we plan to use
        # low-level APIs.
        loop = asyncio.get_running_loop()

        on_con_lost = loop.create_future()
        message = "Hello World!"

        transport, protocol = await loop.create_datagram_endpoint(
            lambda: EchoClientProtocol(message, on_con_lost),
            remote_addr=('127.0.0.1', 9999))

        try:
            await on_con_lost
        finally:
            transport.close()


    asyncio.run(main())

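As a rough illustration of how the UDP server and client examples fit
together, the sketch below runs both endpoints on the same event loop.
It assumes the OS picks a free port (port ``0``) and bounds the wait with
a one-second timeout, since UDP does not guarantee delivery.  The names
``echo_once``, ``on_reply`` and ``timeout`` are hypothetical and used only
for this illustration::

    import asyncio


    class EchoServerProtocol(asyncio.DatagramProtocol):
        def connection_made(self, transport):
            self.transport = transport

        def datagram_received(self, data, addr):
            # Echo the datagram back to its sender.
            self.transport.sendto(data, addr)


    class EchoClientProtocol(asyncio.DatagramProtocol):
        def __init__(self, message, on_reply):
            self.message = message
            self.on_reply = on_reply  # hypothetical future holding the reply

        def connection_made(self, transport):
            transport.sendto(self.message.encode())

        def datagram_received(self, data, addr):
            if not self.on_reply.done():
                self.on_reply.set_result(data.decode())

        def error_received(self, exc):
            if not self.on_reply.done():
                self.on_reply.set_exception(exc)


    async def echo_once(message, timeout=1.0):
        loop = asyncio.get_running_loop()

        # Assumption: binding to port 0 lets the OS choose a free port.
        server_transport, _ = await loop.create_datagram_endpoint(
            EchoServerProtocol, local_addr=('127.0.0.1', 0))
        server_addr = server_transport.get_extra_info('sockname')

        on_reply = loop.create_future()
        client_transport, _ = await loop.create_datagram_endpoint(
            lambda: EchoClientProtocol(message, on_reply),
            remote_addr=server_addr)

        try:
            # A lost datagram would otherwise leave the future pending.
            return await asyncio.wait_for(on_reply, timeout)
        finally:
            client_transport.close()
            server_transport.close()


    if __name__ == '__main__':
        print(asyncio.run(echo_once('Hello World!')))

Resolving the future with the reply, rather than with ``True``, lets the
caller use the echoed text directly.
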

.. _asyncio_example_create_connection:

Connecting Existing Sockets
---------------------------

Wait until a socket receives data using the
:meth:`loop.create_connection` method with a protocol::

    import asyncio
    import socket


    class MyProtocol(asyncio.Protocol):

        def __init__(self, on_con_lost):
            self.transport = None
            self.on_con_lost = on_con_lost

        def connection_made(self, transport):
            self.transport = transport

        def data_received(self, data):
            print("Received:", data.decode())

            # We are done: close the transport;
            # connection_lost() will be called automatically.
            self.transport.close()

        def connection_lost(self, exc):
            # The socket has been closed
            self.on_con_lost.set_result(True)


    async def main():
        # Get a reference to the event loop as we plan to use
        # low-level APIs.
        loop = asyncio.get_running_loop()
        on_con_lost = loop.create_future()

        # Create a pair of connected sockets
        rsock, wsock = socket.socketpair()

        # Register the socket to wait for data.
        transport, protocol = await loop.create_connection(
            lambda: MyProtocol(on_con_lost), sock=rsock)

        # Simulate the reception of data from the network.
        loop.call_soon(wsock.send, 'abc'.encode())

        try:
            await protocol.on_con_lost
        finally:
            transport.close()
            wsock.close()

    asyncio.run(main())

.. seealso::

   The :ref:`watch a file descriptor for read events
   <asyncio_example_watch_fd>` example uses the low-level
   :meth:`loop.add_reader` method to register an FD.

   The :ref:`register an open socket to wait for data using streams
   <asyncio_example_create_connection-streams>` example uses high-level streams
   created by the :func:`open_connection` function in a coroutine.

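A stream socket has no message boundaries, so ``data_received()`` may be
called several times with chunks of arbitrary size.  The following is a
minimal sketch of one way to handle that, assuming the peer marks the end
of the message by closing its side of the connection.  ``CollectProtocol``
and ``read_all`` are hypothetical names used only for this illustration::

    import asyncio
    import socket


    class CollectProtocol(asyncio.Protocol):
        def __init__(self, on_done):
            self.buffer = bytearray()
            self.on_done = on_done  # hypothetical future holding all bytes

        def data_received(self, data):
            # Accumulate: one message may arrive as several chunks.
            self.buffer.extend(data)

        def connection_lost(self, exc):
            # Called after the peer closed its side (exc is None)
            # or after an error (exc is the exception).
            if exc is None:
                self.on_done.set_result(bytes(self.buffer))
            else:
                self.on_done.set_exception(exc)


    async def read_all(chunks):
        loop = asyncio.get_running_loop()
        on_done = loop.create_future()

        # Create a pair of connected sockets.
        rsock, wsock = socket.socketpair()
        transport, _ = await loop.create_connection(
            lambda: CollectProtocol(on_done), sock=rsock)

        try:
            for chunk in chunks:
                wsock.send(chunk)
            # Closing the writer signals end-of-file to the reader.
            wsock.close()
            return await on_done
        finally:
            transport.close()
            wsock.close()


    if __name__ == '__main__':
        print(asyncio.run(read_all([b'ab', b'c'])))
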
.. _asyncio_example_subprocess_proto:

loop.subprocess_exec() and SubprocessProtocol
---------------------------------------------

An example of a subprocess protocol used to get the output of a
subprocess and to wait for the subprocess to exit.

The subprocess is created by the :meth:`loop.subprocess_exec` method::

    import asyncio
    import sys

    class DateProtocol(asyncio.SubprocessProtocol):
        def __init__(self, exit_future):
            self.exit_future = exit_future
            self.output = bytearray()
            self.pipe_closed = False
            self.exited = False

        def pipe_connection_lost(self, fd, exc):
            self.pipe_closed = True
            self.check_for_exit()

        def pipe_data_received(self, fd, data):
            self.output.extend(data)

        def process_exited(self):
            self.exited = True
            # process_exited() can be called before
            # pipe_connection_lost(): wait until both have been called.
            self.check_for_exit()

        def check_for_exit(self):
            if self.pipe_closed and self.exited:
                self.exit_future.set_result(True)

    async def get_date():
        # Get a reference to the event loop as we plan to use
        # low-level APIs.
        loop = asyncio.get_running_loop()

        code = 'import datetime; print(datetime.datetime.now())'
        exit_future = loop.create_future()

        # Create the subprocess controlled by DateProtocol;
        # redirect the standard output into a pipe.
        transport, protocol = await loop.subprocess_exec(
            lambda: DateProtocol(exit_future),
            sys.executable, '-c', code,
            stdin=None, stderr=None)

        # Wait until the subprocess has exited and its stdout
        # pipe has been closed.
        await exit_future

        # Close the subprocess transport.
        transport.close()

        # Read the output which was collected by the
        # pipe_data_received() method of the protocol.
        data = bytes(protocol.output)
        return data.decode('ascii').rstrip()

    date = asyncio.run(get_date())
    print(f"Current date: {date}")

See also the :ref:`same example <asyncio_example_create_subprocess_exec>`
written using high-level APIs.

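When both output streams are redirected into pipes, the *fd* argument of
``pipe_data_received()`` can be used to tell them apart.  The sketch below
assumes the default pipe redirection of :meth:`loop.subprocess_exec`, with
file descriptor ``1`` for standard output and ``2`` for standard error.
``CaptureProtocol`` and ``run_and_capture`` are hypothetical names used
only for this illustration::

    import asyncio
    import sys


    class CaptureProtocol(asyncio.SubprocessProtocol):
        def __init__(self, done):
            self.done = done
            # One buffer per captured pipe: 1 is stdout, 2 is stderr.
            self.chunks = {1: bytearray(), 2: bytearray()}
            self.open_pipes = {1, 2}
            self.exited = False

        def pipe_data_received(self, fd, data):
            self.chunks[fd].extend(data)

        def pipe_connection_lost(self, fd, exc):
            self.open_pipes.discard(fd)
            self._maybe_finish()

        def process_exited(self):
            self.exited = True
            self._maybe_finish()

        def _maybe_finish(self):
            # Finish only once the process has exited and both
            # pipes are closed, so that no output is missed.
            if self.exited and not self.open_pipes and not self.done.done():
                self.done.set_result(True)


    async def run_and_capture(code):
        loop = asyncio.get_running_loop()
        done = loop.create_future()

        # stdout and stderr are left at their default, a pipe.
        transport, protocol = await loop.subprocess_exec(
            lambda: CaptureProtocol(done),
            sys.executable, '-c', code,
            stdin=None)

        try:
            await done
            return (transport.get_returncode(),
                    bytes(protocol.chunks[1]),
                    bytes(protocol.chunks[2]))
        finally:
            transport.close()


    if __name__ == '__main__':
        print(asyncio.run(run_and_capture('print("hello")')))
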