1"""Tests for selector_events.py"""
2
3import sys
4import selectors
5import socket
6import unittest
7from unittest import mock
8try:
9    import ssl
10except ImportError:
11    ssl = None
12
13import asyncio
14from asyncio.selector_events import BaseSelectorEventLoop
15from asyncio.selector_events import _SelectorTransport
16from asyncio.selector_events import _SelectorSocketTransport
17from asyncio.selector_events import _SelectorDatagramTransport
18from test.test_asyncio import utils as test_utils
19
20
21MOCK_ANY = mock.ANY
22
23
24def tearDownModule():
25    asyncio.set_event_loop_policy(None)
26
27
28class TestBaseSelectorEventLoop(BaseSelectorEventLoop):
29
30    def _make_self_pipe(self):
31        self._ssock = mock.Mock()
32        self._csock = mock.Mock()
33        self._internal_fds += 1
34
35    def _close_self_pipe(self):
36        pass
37
38
39def list_to_buffer(l=()):
40    return bytearray().join(l)
41
42
43def close_transport(transport):
44    # Don't call transport.close() because the event loop and the selector
45    # are mocked
46    if transport._sock is None:
47        return
48    transport._sock.close()
49    transport._sock = None
50
51
52class BaseSelectorEventLoopTests(test_utils.TestCase):
53
54    def setUp(self):
55        super().setUp()
56        self.selector = mock.Mock()
57        self.selector.select.return_value = []
58        self.loop = TestBaseSelectorEventLoop(self.selector)
59        self.set_event_loop(self.loop)
60
61    def test_make_socket_transport(self):
62        m = mock.Mock()
63        self.loop.add_reader = mock.Mock()
64        transport = self.loop._make_socket_transport(m, asyncio.Protocol())
65        self.assertIsInstance(transport, _SelectorSocketTransport)
66
67        # Calling repr() must not fail when the event loop is closed
68        self.loop.close()
69        repr(transport)
70
71        close_transport(transport)
72
73    @mock.patch('asyncio.selector_events.ssl', None)
74    @mock.patch('asyncio.sslproto.ssl', None)
75    def test_make_ssl_transport_without_ssl_error(self):
76        m = mock.Mock()
77        self.loop.add_reader = mock.Mock()
78        self.loop.add_writer = mock.Mock()
79        self.loop.remove_reader = mock.Mock()
80        self.loop.remove_writer = mock.Mock()
81        with self.assertRaises(RuntimeError):
82            self.loop._make_ssl_transport(m, m, m, m)
83
84    def test_close(self):
85        class EventLoop(BaseSelectorEventLoop):
86            def _make_self_pipe(self):
87                self._ssock = mock.Mock()
88                self._csock = mock.Mock()
89                self._internal_fds += 1
90
91        self.loop = EventLoop(self.selector)
92        self.set_event_loop(self.loop)
93
94        ssock = self.loop._ssock
95        ssock.fileno.return_value = 7
96        csock = self.loop._csock
97        csock.fileno.return_value = 1
98        remove_reader = self.loop._remove_reader = mock.Mock()
99
100        self.loop._selector.close()
101        self.loop._selector = selector = mock.Mock()
102        self.assertFalse(self.loop.is_closed())
103
104        self.loop.close()
105        self.assertTrue(self.loop.is_closed())
106        self.assertIsNone(self.loop._selector)
107        self.assertIsNone(self.loop._csock)
108        self.assertIsNone(self.loop._ssock)
109        selector.close.assert_called_with()
110        ssock.close.assert_called_with()
111        csock.close.assert_called_with()
112        remove_reader.assert_called_with(7)
113
114        # it should be possible to call close() more than once
115        self.loop.close()
116        self.loop.close()
117
118        # operation blocked when the loop is closed
119        f = self.loop.create_future()
120        self.assertRaises(RuntimeError, self.loop.run_forever)
121        self.assertRaises(RuntimeError, self.loop.run_until_complete, f)
122        fd = 0
123        def callback():
124            pass
125        self.assertRaises(RuntimeError, self.loop.add_reader, fd, callback)
126        self.assertRaises(RuntimeError, self.loop.add_writer, fd, callback)
127
128    def test_close_no_selector(self):
129        self.loop.remove_reader = mock.Mock()
130        self.loop._selector.close()
131        self.loop._selector = None
132        self.loop.close()
133        self.assertIsNone(self.loop._selector)
134
135    def test_read_from_self_tryagain(self):
136        self.loop._ssock.recv.side_effect = BlockingIOError
137        self.assertIsNone(self.loop._read_from_self())
138
139    def test_read_from_self_exception(self):
140        self.loop._ssock.recv.side_effect = OSError
141        self.assertRaises(OSError, self.loop._read_from_self)
142
143    def test_write_to_self_tryagain(self):
144        self.loop._csock.send.side_effect = BlockingIOError
145        with test_utils.disable_logger():
146            self.assertIsNone(self.loop._write_to_self())
147
148    def test_write_to_self_exception(self):
149        # _write_to_self() swallows OSError
150        self.loop._csock.send.side_effect = RuntimeError()
151        self.assertRaises(RuntimeError, self.loop._write_to_self)
152
153    @mock.patch('socket.getaddrinfo')
154    def test_sock_connect_resolve_using_socket_params(self, m_gai):
155        addr = ('need-resolution.com', 8080)
156        for sock_type in [socket.SOCK_STREAM, socket.SOCK_DGRAM]:
157            with self.subTest(sock_type):
158                sock = test_utils.mock_nonblocking_socket(type=sock_type)
159
160                m_gai.side_effect = \
161                    lambda *args: [(None, None, None, None, ('127.0.0.1', 0))]
162
163                con = self.loop.create_task(self.loop.sock_connect(sock, addr))
164                self.loop.run_until_complete(con)
165                m_gai.assert_called_with(
166                    addr[0], addr[1], sock.family, sock.type, sock.proto, 0)
167
168                self.loop.run_until_complete(con)
169                sock.connect.assert_called_with(('127.0.0.1', 0))
170
171    def test_add_reader(self):
172        self.loop._selector.get_key.side_effect = KeyError
173        cb = lambda: True
174        self.loop.add_reader(1, cb)
175
176        self.assertTrue(self.loop._selector.register.called)
177        fd, mask, (r, w) = self.loop._selector.register.call_args[0]
178        self.assertEqual(1, fd)
179        self.assertEqual(selectors.EVENT_READ, mask)
180        self.assertEqual(cb, r._callback)
181        self.assertIsNone(w)
182
183    def test_add_reader_existing(self):
184        reader = mock.Mock()
185        writer = mock.Mock()
186        self.loop._selector.get_key.return_value = selectors.SelectorKey(
187            1, 1, selectors.EVENT_WRITE, (reader, writer))
188        cb = lambda: True
189        self.loop.add_reader(1, cb)
190
191        self.assertTrue(reader.cancel.called)
192        self.assertFalse(self.loop._selector.register.called)
193        self.assertTrue(self.loop._selector.modify.called)
194        fd, mask, (r, w) = self.loop._selector.modify.call_args[0]
195        self.assertEqual(1, fd)
196        self.assertEqual(selectors.EVENT_WRITE | selectors.EVENT_READ, mask)
197        self.assertEqual(cb, r._callback)
198        self.assertEqual(writer, w)
199
200    def test_add_reader_existing_writer(self):
201        writer = mock.Mock()
202        self.loop._selector.get_key.return_value = selectors.SelectorKey(
203            1, 1, selectors.EVENT_WRITE, (None, writer))
204        cb = lambda: True
205        self.loop.add_reader(1, cb)
206
207        self.assertFalse(self.loop._selector.register.called)
208        self.assertTrue(self.loop._selector.modify.called)
209        fd, mask, (r, w) = self.loop._selector.modify.call_args[0]
210        self.assertEqual(1, fd)
211        self.assertEqual(selectors.EVENT_WRITE | selectors.EVENT_READ, mask)
212        self.assertEqual(cb, r._callback)
213        self.assertEqual(writer, w)
214
215    def test_remove_reader(self):
216        self.loop._selector.get_key.return_value = selectors.SelectorKey(
217            1, 1, selectors.EVENT_READ, (None, None))
218        self.assertFalse(self.loop.remove_reader(1))
219
220        self.assertTrue(self.loop._selector.unregister.called)
221
222    def test_remove_reader_read_write(self):
223        reader = mock.Mock()
224        writer = mock.Mock()
225        self.loop._selector.get_key.return_value = selectors.SelectorKey(
226            1, 1, selectors.EVENT_READ | selectors.EVENT_WRITE,
227            (reader, writer))
228        self.assertTrue(
229            self.loop.remove_reader(1))
230
231        self.assertFalse(self.loop._selector.unregister.called)
232        self.assertEqual(
233            (1, selectors.EVENT_WRITE, (None, writer)),
234            self.loop._selector.modify.call_args[0])
235
236    def test_remove_reader_unknown(self):
237        self.loop._selector.get_key.side_effect = KeyError
238        self.assertFalse(
239            self.loop.remove_reader(1))
240
241    def test_add_writer(self):
242        self.loop._selector.get_key.side_effect = KeyError
243        cb = lambda: True
244        self.loop.add_writer(1, cb)
245
246        self.assertTrue(self.loop._selector.register.called)
247        fd, mask, (r, w) = self.loop._selector.register.call_args[0]
248        self.assertEqual(1, fd)
249        self.assertEqual(selectors.EVENT_WRITE, mask)
250        self.assertIsNone(r)
251        self.assertEqual(cb, w._callback)
252
253    def test_add_writer_existing(self):
254        reader = mock.Mock()
255        writer = mock.Mock()
256        self.loop._selector.get_key.return_value = selectors.SelectorKey(
257            1, 1, selectors.EVENT_READ, (reader, writer))
258        cb = lambda: True
259        self.loop.add_writer(1, cb)
260
261        self.assertTrue(writer.cancel.called)
262        self.assertFalse(self.loop._selector.register.called)
263        self.assertTrue(self.loop._selector.modify.called)
264        fd, mask, (r, w) = self.loop._selector.modify.call_args[0]
265        self.assertEqual(1, fd)
266        self.assertEqual(selectors.EVENT_WRITE | selectors.EVENT_READ, mask)
267        self.assertEqual(reader, r)
268        self.assertEqual(cb, w._callback)
269
270    def test_remove_writer(self):
271        self.loop._selector.get_key.return_value = selectors.SelectorKey(
272            1, 1, selectors.EVENT_WRITE, (None, None))
273        self.assertFalse(self.loop.remove_writer(1))
274
275        self.assertTrue(self.loop._selector.unregister.called)
276
277    def test_remove_writer_read_write(self):
278        reader = mock.Mock()
279        writer = mock.Mock()
280        self.loop._selector.get_key.return_value = selectors.SelectorKey(
281            1, 1, selectors.EVENT_READ | selectors.EVENT_WRITE,
282            (reader, writer))
283        self.assertTrue(
284            self.loop.remove_writer(1))
285
286        self.assertFalse(self.loop._selector.unregister.called)
287        self.assertEqual(
288            (1, selectors.EVENT_READ, (reader, None)),
289            self.loop._selector.modify.call_args[0])
290
291    def test_remove_writer_unknown(self):
292        self.loop._selector.get_key.side_effect = KeyError
293        self.assertFalse(
294            self.loop.remove_writer(1))
295
296    def test_process_events_read(self):
297        reader = mock.Mock()
298        reader._cancelled = False
299
300        self.loop._add_callback = mock.Mock()
301        self.loop._process_events(
302            [(selectors.SelectorKey(
303                1, 1, selectors.EVENT_READ, (reader, None)),
304              selectors.EVENT_READ)])
305        self.assertTrue(self.loop._add_callback.called)
306        self.loop._add_callback.assert_called_with(reader)
307
308    def test_process_events_read_cancelled(self):
309        reader = mock.Mock()
310        reader.cancelled = True
311
312        self.loop._remove_reader = mock.Mock()
313        self.loop._process_events(
314            [(selectors.SelectorKey(
315                1, 1, selectors.EVENT_READ, (reader, None)),
316             selectors.EVENT_READ)])
317        self.loop._remove_reader.assert_called_with(1)
318
319    def test_process_events_write(self):
320        writer = mock.Mock()
321        writer._cancelled = False
322
323        self.loop._add_callback = mock.Mock()
324        self.loop._process_events(
325            [(selectors.SelectorKey(1, 1, selectors.EVENT_WRITE,
326                                    (None, writer)),
327              selectors.EVENT_WRITE)])
328        self.loop._add_callback.assert_called_with(writer)
329
330    def test_process_events_write_cancelled(self):
331        writer = mock.Mock()
332        writer.cancelled = True
333        self.loop._remove_writer = mock.Mock()
334
335        self.loop._process_events(
336            [(selectors.SelectorKey(1, 1, selectors.EVENT_WRITE,
337                                    (None, writer)),
338              selectors.EVENT_WRITE)])
339        self.loop._remove_writer.assert_called_with(1)
340
341    def test_accept_connection_multiple(self):
342        sock = mock.Mock()
343        sock.accept.return_value = (mock.Mock(), mock.Mock())
344        backlog = 100
345        # Mock the coroutine generation for a connection to prevent
346        # warnings related to un-awaited coroutines. _accept_connection2
347        # is an async function that is patched with AsyncMock. create_task
348        # creates a task out of coroutine returned by AsyncMock, so use
349        # asyncio.sleep(0) to ensure created tasks are complete to avoid
350        # task pending warnings.
351        mock_obj = mock.patch.object
352        with mock_obj(self.loop, '_accept_connection2') as accept2_mock:
353            self.loop._accept_connection(
354                mock.Mock(), sock, backlog=backlog)
355        self.loop.run_until_complete(asyncio.sleep(0))
356        self.assertEqual(sock.accept.call_count, backlog)
357
358
359class SelectorTransportTests(test_utils.TestCase):
360
361    def setUp(self):
362        super().setUp()
363        self.loop = self.new_test_loop()
364        self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
365        self.sock = mock.Mock(socket.socket)
366        self.sock.fileno.return_value = 7
367
368    def create_transport(self):
369        transport = _SelectorTransport(self.loop, self.sock, self.protocol,
370                                       None)
371        self.addCleanup(close_transport, transport)
372        return transport
373
374    def test_ctor(self):
375        tr = self.create_transport()
376        self.assertIs(tr._loop, self.loop)
377        self.assertIs(tr._sock, self.sock)
378        self.assertIs(tr._sock_fd, 7)
379
380    def test_abort(self):
381        tr = self.create_transport()
382        tr._force_close = mock.Mock()
383
384        tr.abort()
385        tr._force_close.assert_called_with(None)
386
387    def test_close(self):
388        tr = self.create_transport()
389        tr.close()
390
391        self.assertTrue(tr.is_closing())
392        self.assertEqual(1, self.loop.remove_reader_count[7])
393        self.protocol.connection_lost(None)
394        self.assertEqual(tr._conn_lost, 1)
395
396        tr.close()
397        self.assertEqual(tr._conn_lost, 1)
398        self.assertEqual(1, self.loop.remove_reader_count[7])
399
400    def test_close_write_buffer(self):
401        tr = self.create_transport()
402        tr._buffer.extend(b'data')
403        tr.close()
404
405        self.assertFalse(self.loop.readers)
406        test_utils.run_briefly(self.loop)
407        self.assertFalse(self.protocol.connection_lost.called)
408
409    def test_force_close(self):
410        tr = self.create_transport()
411        tr._buffer.extend(b'1')
412        self.loop._add_reader(7, mock.sentinel)
413        self.loop._add_writer(7, mock.sentinel)
414        tr._force_close(None)
415
416        self.assertTrue(tr.is_closing())
417        self.assertEqual(tr._buffer, list_to_buffer())
418        self.assertFalse(self.loop.readers)
419        self.assertFalse(self.loop.writers)
420
421        # second close should not remove reader
422        tr._force_close(None)
423        self.assertFalse(self.loop.readers)
424        self.assertEqual(1, self.loop.remove_reader_count[7])
425
426    @mock.patch('asyncio.log.logger.error')
427    def test_fatal_error(self, m_exc):
428        exc = OSError()
429        tr = self.create_transport()
430        tr._force_close = mock.Mock()
431        tr._fatal_error(exc)
432
433        m_exc.assert_not_called()
434
435        tr._force_close.assert_called_with(exc)
436
437    @mock.patch('asyncio.log.logger.error')
438    def test_fatal_error_custom_exception(self, m_exc):
439        class MyError(Exception):
440            pass
441        exc = MyError()
442        tr = self.create_transport()
443        tr._force_close = mock.Mock()
444        tr._fatal_error(exc)
445
446        m_exc.assert_called_with(
447            test_utils.MockPattern(
448                'Fatal error on transport\nprotocol:.*\ntransport:.*'),
449            exc_info=(MyError, MOCK_ANY, MOCK_ANY))
450
451        tr._force_close.assert_called_with(exc)
452
453    def test_connection_lost(self):
454        exc = OSError()
455        tr = self.create_transport()
456        self.assertIsNotNone(tr._protocol)
457        self.assertIsNotNone(tr._loop)
458        tr._call_connection_lost(exc)
459
460        self.protocol.connection_lost.assert_called_with(exc)
461        self.sock.close.assert_called_with()
462        self.assertIsNone(tr._sock)
463
464        self.assertIsNone(tr._protocol)
465        self.assertIsNone(tr._loop)
466
467    def test__add_reader(self):
468        tr = self.create_transport()
469        tr._buffer.extend(b'1')
470        tr._add_reader(7, mock.sentinel)
471        self.assertTrue(self.loop.readers)
472
473        tr._force_close(None)
474
475        self.assertTrue(tr.is_closing())
476        self.assertFalse(self.loop.readers)
477
478        # can not add readers after closing
479        tr._add_reader(7, mock.sentinel)
480        self.assertFalse(self.loop.readers)
481
482
483class SelectorSocketTransportTests(test_utils.TestCase):
484
485    def setUp(self):
486        super().setUp()
487        self.loop = self.new_test_loop()
488        self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
489        self.sock = mock.Mock(socket.socket)
490        self.sock_fd = self.sock.fileno.return_value = 7
491
492    def socket_transport(self, waiter=None):
493        transport = _SelectorSocketTransport(self.loop, self.sock,
494                                             self.protocol, waiter=waiter)
495        self.addCleanup(close_transport, transport)
496        return transport
497
498    def test_ctor(self):
499        waiter = self.loop.create_future()
500        tr = self.socket_transport(waiter=waiter)
501        self.loop.run_until_complete(waiter)
502
503        self.loop.assert_reader(7, tr._read_ready)
504        test_utils.run_briefly(self.loop)
505        self.protocol.connection_made.assert_called_with(tr)
506
507    def test_ctor_with_waiter(self):
508        waiter = self.loop.create_future()
509        self.socket_transport(waiter=waiter)
510        self.loop.run_until_complete(waiter)
511
512        self.assertIsNone(waiter.result())
513
514    def test_pause_resume_reading(self):
515        tr = self.socket_transport()
516        test_utils.run_briefly(self.loop)
517        self.assertFalse(tr._paused)
518        self.assertTrue(tr.is_reading())
519        self.loop.assert_reader(7, tr._read_ready)
520
521        tr.pause_reading()
522        tr.pause_reading()
523        self.assertTrue(tr._paused)
524        self.assertFalse(tr.is_reading())
525        self.loop.assert_no_reader(7)
526
527        tr.resume_reading()
528        tr.resume_reading()
529        self.assertFalse(tr._paused)
530        self.assertTrue(tr.is_reading())
531        self.loop.assert_reader(7, tr._read_ready)
532
533        tr.close()
534        self.assertFalse(tr.is_reading())
535        self.loop.assert_no_reader(7)
536
537    def test_pause_reading_connection_made(self):
538        tr = self.socket_transport()
539        self.protocol.connection_made.side_effect = lambda _: tr.pause_reading()
540        test_utils.run_briefly(self.loop)
541        self.assertFalse(tr.is_reading())
542        self.loop.assert_no_reader(7)
543
544        tr.resume_reading()
545        self.assertTrue(tr.is_reading())
546        self.loop.assert_reader(7, tr._read_ready)
547
548        tr.close()
549        self.assertFalse(tr.is_reading())
550        self.loop.assert_no_reader(7)
551
552
553    def test_read_eof_received_error(self):
554        transport = self.socket_transport()
555        transport.close = mock.Mock()
556        transport._fatal_error = mock.Mock()
557
558        self.loop.call_exception_handler = mock.Mock()
559
560        self.protocol.eof_received.side_effect = LookupError()
561
562        self.sock.recv.return_value = b''
563        transport._read_ready()
564
565        self.protocol.eof_received.assert_called_with()
566        self.assertTrue(transport._fatal_error.called)
567
568    def test_data_received_error(self):
569        transport = self.socket_transport()
570        transport._fatal_error = mock.Mock()
571
572        self.loop.call_exception_handler = mock.Mock()
573        self.protocol.data_received.side_effect = LookupError()
574
575        self.sock.recv.return_value = b'data'
576        transport._read_ready()
577
578        self.assertTrue(transport._fatal_error.called)
579        self.assertTrue(self.protocol.data_received.called)
580
581    def test_read_ready(self):
582        transport = self.socket_transport()
583
584        self.sock.recv.return_value = b'data'
585        transport._read_ready()
586
587        self.protocol.data_received.assert_called_with(b'data')
588
589    def test_read_ready_eof(self):
590        transport = self.socket_transport()
591        transport.close = mock.Mock()
592
593        self.sock.recv.return_value = b''
594        transport._read_ready()
595
596        self.protocol.eof_received.assert_called_with()
597        transport.close.assert_called_with()
598
599    def test_read_ready_eof_keep_open(self):
600        transport = self.socket_transport()
601        transport.close = mock.Mock()
602
603        self.sock.recv.return_value = b''
604        self.protocol.eof_received.return_value = True
605        transport._read_ready()
606
607        self.protocol.eof_received.assert_called_with()
608        self.assertFalse(transport.close.called)
609
610    @mock.patch('logging.exception')
611    def test_read_ready_tryagain(self, m_exc):
612        self.sock.recv.side_effect = BlockingIOError
613
614        transport = self.socket_transport()
615        transport._fatal_error = mock.Mock()
616        transport._read_ready()
617
618        self.assertFalse(transport._fatal_error.called)
619
620    @mock.patch('logging.exception')
621    def test_read_ready_tryagain_interrupted(self, m_exc):
622        self.sock.recv.side_effect = InterruptedError
623
624        transport = self.socket_transport()
625        transport._fatal_error = mock.Mock()
626        transport._read_ready()
627
628        self.assertFalse(transport._fatal_error.called)
629
630    @mock.patch('logging.exception')
631    def test_read_ready_conn_reset(self, m_exc):
632        err = self.sock.recv.side_effect = ConnectionResetError()
633
634        transport = self.socket_transport()
635        transport._force_close = mock.Mock()
636        with test_utils.disable_logger():
637            transport._read_ready()
638        transport._force_close.assert_called_with(err)
639
640    @mock.patch('logging.exception')
641    def test_read_ready_err(self, m_exc):
642        err = self.sock.recv.side_effect = OSError()
643
644        transport = self.socket_transport()
645        transport._fatal_error = mock.Mock()
646        transport._read_ready()
647
648        transport._fatal_error.assert_called_with(
649                                   err,
650                                   'Fatal read error on socket transport')
651
652    def test_write(self):
653        data = b'data'
654        self.sock.send.return_value = len(data)
655
656        transport = self.socket_transport()
657        transport.write(data)
658        self.sock.send.assert_called_with(data)
659
660    def test_write_bytearray(self):
661        data = bytearray(b'data')
662        self.sock.send.return_value = len(data)
663
664        transport = self.socket_transport()
665        transport.write(data)
666        self.sock.send.assert_called_with(data)
667        self.assertEqual(data, bytearray(b'data'))  # Hasn't been mutated.
668
669    def test_write_memoryview(self):
670        data = memoryview(b'data')
671        self.sock.send.return_value = len(data)
672
673        transport = self.socket_transport()
674        transport.write(data)
675        self.sock.send.assert_called_with(data)
676
677    def test_write_no_data(self):
678        transport = self.socket_transport()
679        transport._buffer.extend(b'data')
680        transport.write(b'')
681        self.assertFalse(self.sock.send.called)
682        self.assertEqual(list_to_buffer([b'data']), transport._buffer)
683
684    def test_write_buffer(self):
685        transport = self.socket_transport()
686        transport._buffer.extend(b'data1')
687        transport.write(b'data2')
688        self.assertFalse(self.sock.send.called)
689        self.assertEqual(list_to_buffer([b'data1', b'data2']),
690                         transport._buffer)
691
692    def test_write_partial(self):
693        data = b'data'
694        self.sock.send.return_value = 2
695
696        transport = self.socket_transport()
697        transport.write(data)
698
699        self.loop.assert_writer(7, transport._write_ready)
700        self.assertEqual(list_to_buffer([b'ta']), transport._buffer)
701
702    def test_write_partial_bytearray(self):
703        data = bytearray(b'data')
704        self.sock.send.return_value = 2
705
706        transport = self.socket_transport()
707        transport.write(data)
708
709        self.loop.assert_writer(7, transport._write_ready)
710        self.assertEqual(list_to_buffer([b'ta']), transport._buffer)
711        self.assertEqual(data, bytearray(b'data'))  # Hasn't been mutated.
712
713    def test_write_partial_memoryview(self):
714        data = memoryview(b'data')
715        self.sock.send.return_value = 2
716
717        transport = self.socket_transport()
718        transport.write(data)
719
720        self.loop.assert_writer(7, transport._write_ready)
721        self.assertEqual(list_to_buffer([b'ta']), transport._buffer)
722
723    def test_write_partial_none(self):
724        data = b'data'
725        self.sock.send.return_value = 0
726        self.sock.fileno.return_value = 7
727
728        transport = self.socket_transport()
729        transport.write(data)
730
731        self.loop.assert_writer(7, transport._write_ready)
732        self.assertEqual(list_to_buffer([b'data']), transport._buffer)
733
734    def test_write_tryagain(self):
735        self.sock.send.side_effect = BlockingIOError
736
737        data = b'data'
738        transport = self.socket_transport()
739        transport.write(data)
740
741        self.loop.assert_writer(7, transport._write_ready)
742        self.assertEqual(list_to_buffer([b'data']), transport._buffer)
743
744    @mock.patch('asyncio.selector_events.logger')
745    def test_write_exception(self, m_log):
746        err = self.sock.send.side_effect = OSError()
747
748        data = b'data'
749        transport = self.socket_transport()
750        transport._fatal_error = mock.Mock()
751        transport.write(data)
752        transport._fatal_error.assert_called_with(
753                                   err,
754                                   'Fatal write error on socket transport')
755        transport._conn_lost = 1
756
757        self.sock.reset_mock()
758        transport.write(data)
759        self.assertFalse(self.sock.send.called)
760        self.assertEqual(transport._conn_lost, 2)
761        transport.write(data)
762        transport.write(data)
763        transport.write(data)
764        transport.write(data)
765        m_log.warning.assert_called_with('socket.send() raised exception.')
766
767    def test_write_str(self):
768        transport = self.socket_transport()
769        self.assertRaises(TypeError, transport.write, 'str')
770
771    def test_write_closing(self):
772        transport = self.socket_transport()
773        transport.close()
774        self.assertEqual(transport._conn_lost, 1)
775        transport.write(b'data')
776        self.assertEqual(transport._conn_lost, 2)
777
778    def test_write_ready(self):
779        data = b'data'
780        self.sock.send.return_value = len(data)
781
782        transport = self.socket_transport()
783        transport._buffer.extend(data)
784        self.loop._add_writer(7, transport._write_ready)
785        transport._write_ready()
786        self.assertTrue(self.sock.send.called)
787        self.assertFalse(self.loop.writers)
788
789    def test_write_ready_closing(self):
790        data = b'data'
791        self.sock.send.return_value = len(data)
792
793        transport = self.socket_transport()
794        transport._closing = True
795        transport._buffer.extend(data)
796        self.loop._add_writer(7, transport._write_ready)
797        transport._write_ready()
798        self.assertTrue(self.sock.send.called)
799        self.assertFalse(self.loop.writers)
800        self.sock.close.assert_called_with()
801        self.protocol.connection_lost.assert_called_with(None)
802
803    @unittest.skipIf(sys.flags.optimize, "Assertions are disabled in optimized mode")
804    def test_write_ready_no_data(self):
805        transport = self.socket_transport()
806        # This is an internal error.
807        self.assertRaises(AssertionError, transport._write_ready)
808
809    def test_write_ready_partial(self):
810        data = b'data'
811        self.sock.send.return_value = 2
812
813        transport = self.socket_transport()
814        transport._buffer.extend(data)
815        self.loop._add_writer(7, transport._write_ready)
816        transport._write_ready()
817        self.loop.assert_writer(7, transport._write_ready)
818        self.assertEqual(list_to_buffer([b'ta']), transport._buffer)
819
820    def test_write_ready_partial_none(self):
821        data = b'data'
822        self.sock.send.return_value = 0
823
824        transport = self.socket_transport()
825        transport._buffer.extend(data)
826        self.loop._add_writer(7, transport._write_ready)
827        transport._write_ready()
828        self.loop.assert_writer(7, transport._write_ready)
829        self.assertEqual(list_to_buffer([b'data']), transport._buffer)
830
831    def test_write_ready_tryagain(self):
832        self.sock.send.side_effect = BlockingIOError
833
834        transport = self.socket_transport()
835        transport._buffer = list_to_buffer([b'data1', b'data2'])
836        self.loop._add_writer(7, transport._write_ready)
837        transport._write_ready()
838
839        self.loop.assert_writer(7, transport._write_ready)
840        self.assertEqual(list_to_buffer([b'data1data2']), transport._buffer)
841
842    def test_write_ready_exception(self):
843        err = self.sock.send.side_effect = OSError()
844
845        transport = self.socket_transport()
846        transport._fatal_error = mock.Mock()
847        transport._buffer.extend(b'data')
848        transport._write_ready()
849        transport._fatal_error.assert_called_with(
850                                   err,
851                                   'Fatal write error on socket transport')
852
853    def test_write_eof(self):
854        tr = self.socket_transport()
855        self.assertTrue(tr.can_write_eof())
856        tr.write_eof()
857        self.sock.shutdown.assert_called_with(socket.SHUT_WR)
858        tr.write_eof()
859        self.assertEqual(self.sock.shutdown.call_count, 1)
860        tr.close()
861
862    def test_write_eof_buffer(self):
863        tr = self.socket_transport()
864        self.sock.send.side_effect = BlockingIOError
865        tr.write(b'data')
866        tr.write_eof()
867        self.assertEqual(tr._buffer, list_to_buffer([b'data']))
868        self.assertTrue(tr._eof)
869        self.assertFalse(self.sock.shutdown.called)
870        self.sock.send.side_effect = lambda _: 4
871        tr._write_ready()
872        self.assertTrue(self.sock.send.called)
873        self.sock.shutdown.assert_called_with(socket.SHUT_WR)
874        tr.close()
875
876    def test_write_eof_after_close(self):
877        tr = self.socket_transport()
878        tr.close()
879        self.loop.run_until_complete(asyncio.sleep(0))
880        tr.write_eof()
881
882    @mock.patch('asyncio.base_events.logger')
883    def test_transport_close_remove_writer(self, m_log):
884        remove_writer = self.loop._remove_writer = mock.Mock()
885
886        transport = self.socket_transport()
887        transport.close()
888        remove_writer.assert_called_with(self.sock_fd)
889
890
891class SelectorSocketTransportBufferedProtocolTests(test_utils.TestCase):
892
893    def setUp(self):
894        super().setUp()
895        self.loop = self.new_test_loop()
896
897        self.protocol = test_utils.make_test_protocol(asyncio.BufferedProtocol)
898        self.buf = bytearray(1)
899        self.protocol.get_buffer.side_effect = lambda hint: self.buf
900
901        self.sock = mock.Mock(socket.socket)
902        self.sock_fd = self.sock.fileno.return_value = 7
903
904    def socket_transport(self, waiter=None):
905        transport = _SelectorSocketTransport(self.loop, self.sock,
906                                             self.protocol, waiter=waiter)
907        self.addCleanup(close_transport, transport)
908        return transport
909
910    def test_ctor(self):
911        waiter = self.loop.create_future()
912        tr = self.socket_transport(waiter=waiter)
913        self.loop.run_until_complete(waiter)
914
915        self.loop.assert_reader(7, tr._read_ready)
916        test_utils.run_briefly(self.loop)
917        self.protocol.connection_made.assert_called_with(tr)
918
919    def test_get_buffer_error(self):
920        transport = self.socket_transport()
921        transport._fatal_error = mock.Mock()
922
923        self.loop.call_exception_handler = mock.Mock()
924        self.protocol.get_buffer.side_effect = LookupError()
925
926        transport._read_ready()
927
928        self.assertTrue(transport._fatal_error.called)
929        self.assertTrue(self.protocol.get_buffer.called)
930        self.assertFalse(self.protocol.buffer_updated.called)
931
932    def test_get_buffer_zerosized(self):
933        transport = self.socket_transport()
934        transport._fatal_error = mock.Mock()
935
936        self.loop.call_exception_handler = mock.Mock()
937        self.protocol.get_buffer.side_effect = lambda hint: bytearray(0)
938
939        transport._read_ready()
940
941        self.assertTrue(transport._fatal_error.called)
942        self.assertTrue(self.protocol.get_buffer.called)
943        self.assertFalse(self.protocol.buffer_updated.called)
944
945    def test_proto_type_switch(self):
946        self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
947        transport = self.socket_transport()
948
949        self.sock.recv.return_value = b'data'
950        transport._read_ready()
951
952        self.protocol.data_received.assert_called_with(b'data')
953
954        # switch protocol to a BufferedProtocol
955
956        buf_proto = test_utils.make_test_protocol(asyncio.BufferedProtocol)
957        buf = bytearray(4)
958        buf_proto.get_buffer.side_effect = lambda hint: buf
959
960        transport.set_protocol(buf_proto)
961
962        self.sock.recv_into.return_value = 10
963        transport._read_ready()
964
965        buf_proto.get_buffer.assert_called_with(-1)
966        buf_proto.buffer_updated.assert_called_with(10)
967
968    def test_buffer_updated_error(self):
969        transport = self.socket_transport()
970        transport._fatal_error = mock.Mock()
971
972        self.loop.call_exception_handler = mock.Mock()
973        self.protocol.buffer_updated.side_effect = LookupError()
974
975        self.sock.recv_into.return_value = 10
976        transport._read_ready()
977
978        self.assertTrue(transport._fatal_error.called)
979        self.assertTrue(self.protocol.get_buffer.called)
980        self.assertTrue(self.protocol.buffer_updated.called)
981
982    def test_read_eof_received_error(self):
983        transport = self.socket_transport()
984        transport.close = mock.Mock()
985        transport._fatal_error = mock.Mock()
986
987        self.loop.call_exception_handler = mock.Mock()
988
989        self.protocol.eof_received.side_effect = LookupError()
990
991        self.sock.recv_into.return_value = 0
992        transport._read_ready()
993
994        self.protocol.eof_received.assert_called_with()
995        self.assertTrue(transport._fatal_error.called)
996
997    def test_read_ready(self):
998        transport = self.socket_transport()
999
1000        self.sock.recv_into.return_value = 10
1001        transport._read_ready()
1002
1003        self.protocol.get_buffer.assert_called_with(-1)
1004        self.protocol.buffer_updated.assert_called_with(10)
1005
1006    def test_read_ready_eof(self):
1007        transport = self.socket_transport()
1008        transport.close = mock.Mock()
1009
1010        self.sock.recv_into.return_value = 0
1011        transport._read_ready()
1012
1013        self.protocol.eof_received.assert_called_with()
1014        transport.close.assert_called_with()
1015
1016    def test_read_ready_eof_keep_open(self):
1017        transport = self.socket_transport()
1018        transport.close = mock.Mock()
1019
1020        self.sock.recv_into.return_value = 0
1021        self.protocol.eof_received.return_value = True
1022        transport._read_ready()
1023
1024        self.protocol.eof_received.assert_called_with()
1025        self.assertFalse(transport.close.called)
1026
1027    @mock.patch('logging.exception')
1028    def test_read_ready_tryagain(self, m_exc):
1029        self.sock.recv_into.side_effect = BlockingIOError
1030
1031        transport = self.socket_transport()
1032        transport._fatal_error = mock.Mock()
1033        transport._read_ready()
1034
1035        self.assertFalse(transport._fatal_error.called)
1036
1037    @mock.patch('logging.exception')
1038    def test_read_ready_tryagain_interrupted(self, m_exc):
1039        self.sock.recv_into.side_effect = InterruptedError
1040
1041        transport = self.socket_transport()
1042        transport._fatal_error = mock.Mock()
1043        transport._read_ready()
1044
1045        self.assertFalse(transport._fatal_error.called)
1046
1047    @mock.patch('logging.exception')
1048    def test_read_ready_conn_reset(self, m_exc):
1049        err = self.sock.recv_into.side_effect = ConnectionResetError()
1050
1051        transport = self.socket_transport()
1052        transport._force_close = mock.Mock()
1053        with test_utils.disable_logger():
1054            transport._read_ready()
1055        transport._force_close.assert_called_with(err)
1056
1057    @mock.patch('logging.exception')
1058    def test_read_ready_err(self, m_exc):
1059        err = self.sock.recv_into.side_effect = OSError()
1060
1061        transport = self.socket_transport()
1062        transport._fatal_error = mock.Mock()
1063        transport._read_ready()
1064
1065        transport._fatal_error.assert_called_with(
1066                                   err,
1067                                   'Fatal read error on socket transport')
1068
1069
1070class SelectorDatagramTransportTests(test_utils.TestCase):
1071
1072    def setUp(self):
1073        super().setUp()
1074        self.loop = self.new_test_loop()
1075        self.protocol = test_utils.make_test_protocol(asyncio.DatagramProtocol)
1076        self.sock = mock.Mock(spec_set=socket.socket)
1077        self.sock.fileno.return_value = 7
1078
1079    def datagram_transport(self, address=None):
1080        self.sock.getpeername.side_effect = None if address else OSError
1081        transport = _SelectorDatagramTransport(self.loop, self.sock,
1082                                               self.protocol,
1083                                               address=address)
1084        self.addCleanup(close_transport, transport)
1085        return transport
1086
1087    def test_read_ready(self):
1088        transport = self.datagram_transport()
1089
1090        self.sock.recvfrom.return_value = (b'data', ('0.0.0.0', 1234))
1091        transport._read_ready()
1092
1093        self.protocol.datagram_received.assert_called_with(
1094            b'data', ('0.0.0.0', 1234))
1095
1096    def test_read_ready_tryagain(self):
1097        transport = self.datagram_transport()
1098
1099        self.sock.recvfrom.side_effect = BlockingIOError
1100        transport._fatal_error = mock.Mock()
1101        transport._read_ready()
1102
1103        self.assertFalse(transport._fatal_error.called)
1104
1105    def test_read_ready_err(self):
1106        transport = self.datagram_transport()
1107
1108        err = self.sock.recvfrom.side_effect = RuntimeError()
1109        transport._fatal_error = mock.Mock()
1110        transport._read_ready()
1111
1112        transport._fatal_error.assert_called_with(
1113                                   err,
1114                                   'Fatal read error on datagram transport')
1115
1116    def test_read_ready_oserr(self):
1117        transport = self.datagram_transport()
1118
1119        err = self.sock.recvfrom.side_effect = OSError()
1120        transport._fatal_error = mock.Mock()
1121        transport._read_ready()
1122
1123        self.assertFalse(transport._fatal_error.called)
1124        self.protocol.error_received.assert_called_with(err)
1125
1126    def test_sendto(self):
1127        data = b'data'
1128        transport = self.datagram_transport()
1129        transport.sendto(data, ('0.0.0.0', 1234))
1130        self.assertTrue(self.sock.sendto.called)
1131        self.assertEqual(
1132            self.sock.sendto.call_args[0], (data, ('0.0.0.0', 1234)))
1133
1134    def test_sendto_bytearray(self):
1135        data = bytearray(b'data')
1136        transport = self.datagram_transport()
1137        transport.sendto(data, ('0.0.0.0', 1234))
1138        self.assertTrue(self.sock.sendto.called)
1139        self.assertEqual(
1140            self.sock.sendto.call_args[0], (data, ('0.0.0.0', 1234)))
1141
1142    def test_sendto_memoryview(self):
1143        data = memoryview(b'data')
1144        transport = self.datagram_transport()
1145        transport.sendto(data, ('0.0.0.0', 1234))
1146        self.assertTrue(self.sock.sendto.called)
1147        self.assertEqual(
1148            self.sock.sendto.call_args[0], (data, ('0.0.0.0', 1234)))
1149
1150    def test_sendto_no_data(self):
1151        transport = self.datagram_transport()
1152        transport._buffer.append((b'data', ('0.0.0.0', 12345)))
1153        transport.sendto(b'', ())
1154        self.assertFalse(self.sock.sendto.called)
1155        self.assertEqual(
1156            [(b'data', ('0.0.0.0', 12345))], list(transport._buffer))
1157
1158    def test_sendto_buffer(self):
1159        transport = self.datagram_transport()
1160        transport._buffer.append((b'data1', ('0.0.0.0', 12345)))
1161        transport.sendto(b'data2', ('0.0.0.0', 12345))
1162        self.assertFalse(self.sock.sendto.called)
1163        self.assertEqual(
1164            [(b'data1', ('0.0.0.0', 12345)),
1165             (b'data2', ('0.0.0.0', 12345))],
1166            list(transport._buffer))
1167
1168    def test_sendto_buffer_bytearray(self):
1169        data2 = bytearray(b'data2')
1170        transport = self.datagram_transport()
1171        transport._buffer.append((b'data1', ('0.0.0.0', 12345)))
1172        transport.sendto(data2, ('0.0.0.0', 12345))
1173        self.assertFalse(self.sock.sendto.called)
1174        self.assertEqual(
1175            [(b'data1', ('0.0.0.0', 12345)),
1176             (b'data2', ('0.0.0.0', 12345))],
1177            list(transport._buffer))
1178        self.assertIsInstance(transport._buffer[1][0], bytes)
1179
1180    def test_sendto_buffer_memoryview(self):
1181        data2 = memoryview(b'data2')
1182        transport = self.datagram_transport()
1183        transport._buffer.append((b'data1', ('0.0.0.0', 12345)))
1184        transport.sendto(data2, ('0.0.0.0', 12345))
1185        self.assertFalse(self.sock.sendto.called)
1186        self.assertEqual(
1187            [(b'data1', ('0.0.0.0', 12345)),
1188             (b'data2', ('0.0.0.0', 12345))],
1189            list(transport._buffer))
1190        self.assertIsInstance(transport._buffer[1][0], bytes)
1191
1192    def test_sendto_tryagain(self):
1193        data = b'data'
1194
1195        self.sock.sendto.side_effect = BlockingIOError
1196
1197        transport = self.datagram_transport()
1198        transport.sendto(data, ('0.0.0.0', 12345))
1199
1200        self.loop.assert_writer(7, transport._sendto_ready)
1201        self.assertEqual(
1202            [(b'data', ('0.0.0.0', 12345))], list(transport._buffer))
1203
1204    @mock.patch('asyncio.selector_events.logger')
1205    def test_sendto_exception(self, m_log):
1206        data = b'data'
1207        err = self.sock.sendto.side_effect = RuntimeError()
1208
1209        transport = self.datagram_transport()
1210        transport._fatal_error = mock.Mock()
1211        transport.sendto(data, ())
1212
1213        self.assertTrue(transport._fatal_error.called)
1214        transport._fatal_error.assert_called_with(
1215                                   err,
1216                                   'Fatal write error on datagram transport')
1217        transport._conn_lost = 1
1218
1219        transport._address = ('123',)
1220        transport.sendto(data)
1221        transport.sendto(data)
1222        transport.sendto(data)
1223        transport.sendto(data)
1224        transport.sendto(data)
1225        m_log.warning.assert_called_with('socket.send() raised exception.')
1226
1227    def test_sendto_error_received(self):
1228        data = b'data'
1229
1230        self.sock.sendto.side_effect = ConnectionRefusedError
1231
1232        transport = self.datagram_transport()
1233        transport._fatal_error = mock.Mock()
1234        transport.sendto(data, ())
1235
1236        self.assertEqual(transport._conn_lost, 0)
1237        self.assertFalse(transport._fatal_error.called)
1238
1239    def test_sendto_error_received_connected(self):
1240        data = b'data'
1241
1242        self.sock.send.side_effect = ConnectionRefusedError
1243
1244        transport = self.datagram_transport(address=('0.0.0.0', 1))
1245        transport._fatal_error = mock.Mock()
1246        transport.sendto(data)
1247
1248        self.assertFalse(transport._fatal_error.called)
1249        self.assertTrue(self.protocol.error_received.called)
1250
1251    def test_sendto_str(self):
1252        transport = self.datagram_transport()
1253        self.assertRaises(TypeError, transport.sendto, 'str', ())
1254
1255    def test_sendto_connected_addr(self):
1256        transport = self.datagram_transport(address=('0.0.0.0', 1))
1257        self.assertRaises(
1258            ValueError, transport.sendto, b'str', ('0.0.0.0', 2))
1259
1260    def test_sendto_closing(self):
1261        transport = self.datagram_transport(address=(1,))
1262        transport.close()
1263        self.assertEqual(transport._conn_lost, 1)
1264        transport.sendto(b'data', (1,))
1265        self.assertEqual(transport._conn_lost, 2)
1266
1267    def test_sendto_ready(self):
1268        data = b'data'
1269        self.sock.sendto.return_value = len(data)
1270
1271        transport = self.datagram_transport()
1272        transport._buffer.append((data, ('0.0.0.0', 12345)))
1273        self.loop._add_writer(7, transport._sendto_ready)
1274        transport._sendto_ready()
1275        self.assertTrue(self.sock.sendto.called)
1276        self.assertEqual(
1277            self.sock.sendto.call_args[0], (data, ('0.0.0.0', 12345)))
1278        self.assertFalse(self.loop.writers)
1279
1280    def test_sendto_ready_closing(self):
1281        data = b'data'
1282        self.sock.send.return_value = len(data)
1283
1284        transport = self.datagram_transport()
1285        transport._closing = True
1286        transport._buffer.append((data, ()))
1287        self.loop._add_writer(7, transport._sendto_ready)
1288        transport._sendto_ready()
1289        self.sock.sendto.assert_called_with(data, ())
1290        self.assertFalse(self.loop.writers)
1291        self.sock.close.assert_called_with()
1292        self.protocol.connection_lost.assert_called_with(None)
1293
1294    def test_sendto_ready_no_data(self):
1295        transport = self.datagram_transport()
1296        self.loop._add_writer(7, transport._sendto_ready)
1297        transport._sendto_ready()
1298        self.assertFalse(self.sock.sendto.called)
1299        self.assertFalse(self.loop.writers)
1300
1301    def test_sendto_ready_tryagain(self):
1302        self.sock.sendto.side_effect = BlockingIOError
1303
1304        transport = self.datagram_transport()
1305        transport._buffer.extend([(b'data1', ()), (b'data2', ())])
1306        self.loop._add_writer(7, transport._sendto_ready)
1307        transport._sendto_ready()
1308
1309        self.loop.assert_writer(7, transport._sendto_ready)
1310        self.assertEqual(
1311            [(b'data1', ()), (b'data2', ())],
1312            list(transport._buffer))
1313
1314    def test_sendto_ready_exception(self):
1315        err = self.sock.sendto.side_effect = RuntimeError()
1316
1317        transport = self.datagram_transport()
1318        transport._fatal_error = mock.Mock()
1319        transport._buffer.append((b'data', ()))
1320        transport._sendto_ready()
1321
1322        transport._fatal_error.assert_called_with(
1323                                   err,
1324                                   'Fatal write error on datagram transport')
1325
1326    def test_sendto_ready_error_received(self):
1327        self.sock.sendto.side_effect = ConnectionRefusedError
1328
1329        transport = self.datagram_transport()
1330        transport._fatal_error = mock.Mock()
1331        transport._buffer.append((b'data', ()))
1332        transport._sendto_ready()
1333
1334        self.assertFalse(transport._fatal_error.called)
1335
1336    def test_sendto_ready_error_received_connection(self):
1337        self.sock.send.side_effect = ConnectionRefusedError
1338
1339        transport = self.datagram_transport(address=('0.0.0.0', 1))
1340        transport._fatal_error = mock.Mock()
1341        transport._buffer.append((b'data', ()))
1342        transport._sendto_ready()
1343
1344        self.assertFalse(transport._fatal_error.called)
1345        self.assertTrue(self.protocol.error_received.called)
1346
1347    @mock.patch('asyncio.base_events.logger.error')
1348    def test_fatal_error_connected(self, m_exc):
1349        transport = self.datagram_transport(address=('0.0.0.0', 1))
1350        err = ConnectionRefusedError()
1351        transport._fatal_error(err)
1352        self.assertFalse(self.protocol.error_received.called)
1353        m_exc.assert_not_called()
1354
1355    @mock.patch('asyncio.base_events.logger.error')
1356    def test_fatal_error_connected_custom_error(self, m_exc):
1357        class MyException(Exception):
1358            pass
1359        transport = self.datagram_transport(address=('0.0.0.0', 1))
1360        err = MyException()
1361        transport._fatal_error(err)
1362        self.assertFalse(self.protocol.error_received.called)
1363        m_exc.assert_called_with(
1364            test_utils.MockPattern(
1365                'Fatal error on transport\nprotocol:.*\ntransport:.*'),
1366            exc_info=(MyException, MOCK_ANY, MOCK_ANY))
1367
1368
1369if __name__ == '__main__':
1370    unittest.main()
1371