"""Tests for selector_events.py"""

import sys
import selectors
import socket
import unittest
from unittest import mock
try:
    import ssl
except ImportError:
    ssl = None

import asyncio
from asyncio.selector_events import BaseSelectorEventLoop
from asyncio.selector_events import _SelectorTransport
from asyncio.selector_events import _SelectorSocketTransport
from asyncio.selector_events import _SelectorDatagramTransport
from test.test_asyncio import utils as test_utils


# Shorthand for the "matches anything" sentinel used in call assertions.
MOCK_ANY = mock.ANY


def tearDownModule():
    """Call asyncio.set_event_loop_policy(None) once the module's tests ran."""
    asyncio.set_event_loop_policy(None)


class TestBaseSelectorEventLoop(BaseSelectorEventLoop):
    """BaseSelectorEventLoop whose self-pipe sockets are mock objects."""

    def _make_self_pipe(self):
        """Install mocks as both self-pipe sockets and bump `_internal_fds`."""
        self._ssock = mock.Mock()
        self._csock = mock.Mock()
        self._internal_fds += 1

    def _close_self_pipe(self):
        """Do nothing: the self-pipe sockets are mocks."""
        pass


def list_to_buffer(l=()):
    """Return a new bytearray made by joining the bytes-like items of *l*."""
    return bytearray().join(l)


def close_transport(transport):
    """Close the transport's socket directly and clear its `_sock` reference.

    Returns without doing anything when `transport._sock` is already None.
    """
    # Don't call transport.close() because the event loop and the selector
    # are mocked
    if transport._sock is None:
        return
    transport._sock.close()
    transport._sock = None


class BaseSelectorEventLoopTests(test_utils.TestCase):
    """Tests driving TestBaseSelectorEventLoop with a mocked selector."""

    def setUp(self):
        super().setUp()
        # The selector is a plain Mock whose select() reports no events.
        self.selector = mock.Mock()
        self.selector.select.return_value = []
        self.loop = TestBaseSelectorEventLoop(self.selector)
        self.set_event_loop(self.loop)

    # _make_socket_transport() must return a _SelectorSocketTransport.
    def test_make_socket_transport(self):
        m = mock.Mock()
        self.loop.add_reader = mock.Mock()
        transport = self.loop._make_socket_transport(m, asyncio.Protocol())
        self.assertIsInstance(transport, _SelectorSocketTransport)

        # Calling repr() must not fail when the event loop is closed
        self.loop.close()
        repr(transport)

        close_transport(transport)

    # With `ssl` patched to None in both modules, _make_ssl_transport()
    # is expected to raise RuntimeError.
    @mock.patch('asyncio.selector_events.ssl', None)
    @mock.patch('asyncio.sslproto.ssl', None)
    def test_make_ssl_transport_without_ssl_error(self):
        m = mock.Mock()
        self.loop.add_reader = mock.Mock()
        self.loop.add_writer = mock.Mock()
        self.loop.remove_reader = mock.Mock()
        self.loop.remove_writer = mock.Mock()
        with self.assertRaises(RuntimeError):
            self.loop._make_ssl_transport(m, m, m, m)

    # close() must release the selector and both self-pipe sockets, be
    # repeatable, and leave the loop refusing further operations.
    def test_close(self):
        # Unlike TestBaseSelectorEventLoop, this subclass does not override
        # _close_self_pipe().
        class EventLoop(BaseSelectorEventLoop):
            def _make_self_pipe(self):
                self._ssock = mock.Mock()
                self._csock = mock.Mock()
                self._internal_fds += 1

        self.loop = EventLoop(self.selector)
        self.set_event_loop(self.loop)

        ssock = self.loop._ssock
        ssock.fileno.return_value = 7
        csock = self.loop._csock
        csock.fileno.return_value = 1
        remove_reader = self.loop._remove_reader = mock.Mock()

        # Swap in a fresh mock selector so its close() call can be asserted.
        self.loop._selector.close()
        self.loop._selector = selector = mock.Mock()
        self.assertFalse(self.loop.is_closed())

        self.loop.close()
        self.assertTrue(self.loop.is_closed())
        self.assertIsNone(self.loop._selector)
        self.assertIsNone(self.loop._csock)
        self.assertIsNone(self.loop._ssock)
        selector.close.assert_called_with()
        ssock.close.assert_called_with()
        csock.close.assert_called_with()
        # 7 is the fileno configured on ssock above.
        remove_reader.assert_called_with(7)

        # it should be possible to call close() more than once
        self.loop.close()
        self.loop.close()

        # operation blocked when the loop is closed
        f = self.loop.create_future()
        self.assertRaises(RuntimeError, self.loop.run_forever)
        self.assertRaises(RuntimeError, self.loop.run_until_complete, f)
        fd = 0
        def callback():
            pass
        self.assertRaises(RuntimeError, self.loop.add_reader, fd, callback)
        self.assertRaises(RuntimeError, self.loop.add_writer, fd, callback)

    # close() must not fail when `_selector` has already been set to None.
    def test_close_no_selector(self):
        self.loop.remove_reader = mock.Mock()
        self.loop._selector.close()
        self.loop._selector = None
        self.loop.close()
        self.assertIsNone(self.loop._selector)

    # BlockingIOError from the self-pipe recv() is absorbed; result is None.
    def test_read_from_self_tryagain(self):
        self.loop._ssock.recv.side_effect = BlockingIOError
        self.assertIsNone(self.loop._read_from_self())

    # A plain OSError from the self-pipe recv() propagates to the caller.
    def test_read_from_self_exception(self):
        self.loop._ssock.recv.side_effect = OSError
        self.assertRaises(OSError, self.loop._read_from_self)

    # BlockingIOError from the self-pipe send() is absorbed; result is None.
    def test_write_to_self_tryagain(self):
        self.loop._csock.send.side_effect = BlockingIOError
        with test_utils.disable_logger():
            self.assertIsNone(self.loop._write_to_self())

    # A non-OSError raised by send() must propagate out of _write_to_self().
    def test_write_to_self_exception(self):
        # _write_to_self() swallows OSError
        self.loop._csock.send.side_effect = RuntimeError()
        self.assertRaises(RuntimeError, self.loop._write_to_self)

    # sock_connect() must call getaddrinfo with the socket's own family,
    # type and proto, then connect to the address getaddrinfo returned.
    @mock.patch('socket.getaddrinfo')
    def test_sock_connect_resolve_using_socket_params(self, m_gai):
        addr = ('need-resolution.com', 8080)
        for sock_type in [socket.SOCK_STREAM, socket.SOCK_DGRAM]:
            with self.subTest(sock_type):
                sock = test_utils.mock_nonblocking_socket(type=sock_type)

                m_gai.side_effect = \
                    lambda *args: [(None, None, None, None, ('127.0.0.1', 0))]

                con = self.loop.create_task(self.loop.sock_connect(sock, addr))
                self.loop.run_until_complete(con)
                m_gai.assert_called_with(
                    addr[0], addr[1], sock.family, sock.type, sock.proto, 0)

                self.loop.run_until_complete(con)
                sock.connect.assert_called_with(('127.0.0.1', 0))

    # get_key() raising KeyError means "fd not registered": add_reader()
    # must register() it for EVENT_READ with a (reader, None) data pair.
    def test_add_reader(self):
        self.loop._selector.get_key.side_effect = KeyError
        cb = lambda: True
        self.loop.add_reader(1, cb)

        self.assertTrue(self.loop._selector.register.called)
        fd, mask, (r, w) = self.loop._selector.register.call_args[0]
        self.assertEqual(1, fd)
        self.assertEqual(selectors.EVENT_READ, mask)
        self.assertEqual(cb, r._callback)
        self.assertIsNone(w)

    # An fd that already has a reader and writer: the old reader is
    # cancelled and the key is modify()-ed, keeping the writer.
    def test_add_reader_existing(self):
        reader = mock.Mock()
        writer = mock.Mock()
        self.loop._selector.get_key.return_value = selectors.SelectorKey(
            1, 1, selectors.EVENT_WRITE, (reader, writer))
        cb = lambda: True
        self.loop.add_reader(1, cb)

        self.assertTrue(reader.cancel.called)
        self.assertFalse(self.loop._selector.register.called)
        self.assertTrue(self.loop._selector.modify.called)
        fd, mask, (r, w) = self.loop._selector.modify.call_args[0]
        self.assertEqual(1, fd)
        self.assertEqual(selectors.EVENT_WRITE | selectors.EVENT_READ, mask)
        self.assertEqual(cb, r._callback)
        self.assertEqual(writer, w)

    # An fd registered with only a writer: the key is modify()-ed to add
    # EVENT_READ and the writer is preserved.
    def test_add_reader_existing_writer(self):
        writer = mock.Mock()
        self.loop._selector.get_key.return_value = selectors.SelectorKey(
            1, 1, selectors.EVENT_WRITE, (None, writer))
        cb = lambda: True
        self.loop.add_reader(1, cb)

        self.assertFalse(self.loop._selector.register.called)
        self.assertTrue(self.loop._selector.modify.called)
        fd, mask, (r, w) = self.loop._selector.modify.call_args[0]
        self.assertEqual(1, fd)
        self.assertEqual(selectors.EVENT_WRITE | selectors.EVENT_READ, mask)
        self.assertEqual(cb, r._callback)
        self.assertEqual(writer, w)

    # Key data is (None, None): remove_reader() returns a false value and
    # the fd is unregistered.
    def test_remove_reader(self):
        self.loop._selector.get_key.return_value = selectors.SelectorKey(
            1, 1, selectors.EVENT_READ, (None, None))
        self.assertFalse(self.loop.remove_reader(1))

        self.assertTrue(self.loop._selector.unregister.called)

    # With both handles present, removing the reader keeps the fd
    # registered for EVENT_WRITE only.
    def test_remove_reader_read_write(self):
        reader = mock.Mock()
        writer = mock.Mock()
        self.loop._selector.get_key.return_value = selectors.SelectorKey(
            1, 1, selectors.EVENT_READ | selectors.EVENT_WRITE,
            (reader, writer))
        self.assertTrue(
            self.loop.remove_reader(1))

        self.assertFalse(self.loop._selector.unregister.called)
        self.assertEqual(
            (1, selectors.EVENT_WRITE, (None, writer)),
            self.loop._selector.modify.call_args[0])

    # Removing a reader for an unregistered fd returns a false value.
    def test_remove_reader_unknown(self):
        self.loop._selector.get_key.side_effect = KeyError
        self.assertFalse(
            self.loop.remove_reader(1))

    # Writer-side counterpart of test_add_reader.
    def test_add_writer(self):
        self.loop._selector.get_key.side_effect = KeyError
        cb = lambda: True
        self.loop.add_writer(1, cb)

        self.assertTrue(self.loop._selector.register.called)
        fd, mask, (r, w) = self.loop._selector.register.call_args[0]
        self.assertEqual(1, fd)
        self.assertEqual(selectors.EVENT_WRITE, mask)
        self.assertIsNone(r)
        self.assertEqual(cb, w._callback)

    # An fd that already has a reader and writer: the old writer is
    # cancelled and the key is modify()-ed, keeping the reader.
    def test_add_writer_existing(self):
        reader = mock.Mock()
        writer = mock.Mock()
        self.loop._selector.get_key.return_value = selectors.SelectorKey(
            1, 1, selectors.EVENT_READ, (reader, writer))
        cb = lambda: True
        self.loop.add_writer(1, cb)

        self.assertTrue(writer.cancel.called)
        self.assertFalse(self.loop._selector.register.called)
        self.assertTrue(self.loop._selector.modify.called)
        fd, mask, (r, w) = self.loop._selector.modify.call_args[0]
        self.assertEqual(1, fd)
        self.assertEqual(selectors.EVENT_WRITE | selectors.EVENT_READ, mask)
        self.assertEqual(reader, r)
        self.assertEqual(cb, w._callback)

    # Key data is (None, None): remove_writer() returns a false value and
    # the fd is unregistered.
    def test_remove_writer(self):
        self.loop._selector.get_key.return_value = selectors.SelectorKey(
            1, 1, selectors.EVENT_WRITE, (None, None))
        self.assertFalse(self.loop.remove_writer(1))

        self.assertTrue(self.loop._selector.unregister.called)

    # With both handles present, removing the writer keeps the fd
    # registered for EVENT_READ only.
    def test_remove_writer_read_write(self):
        reader = mock.Mock()
        writer = mock.Mock()
        self.loop._selector.get_key.return_value = selectors.SelectorKey(
            1, 1, selectors.EVENT_READ | selectors.EVENT_WRITE,
            (reader, writer))
        self.assertTrue(
            self.loop.remove_writer(1))

        self.assertFalse(self.loop._selector.unregister.called)
        self.assertEqual(
            (1, selectors.EVENT_READ, (reader, None)),
            self.loop._selector.modify.call_args[0])

    # Removing a writer for an unregistered fd returns a false value.
    def test_remove_writer_unknown(self):
        self.loop._selector.get_key.side_effect = KeyError
        self.assertFalse(
            self.loop.remove_writer(1))

    # A read event for a non-cancelled reader is passed to _add_callback().
    def test_process_events_read(self):
        reader = mock.Mock()
        reader._cancelled = False

        self.loop._add_callback = mock.Mock()
        self.loop._process_events(
            [(selectors.SelectorKey(
                1, 1, selectors.EVENT_READ, (reader, None)),
              selectors.EVENT_READ)])
        self.assertTrue(self.loop._add_callback.called)
        self.loop._add_callback.assert_called_with(reader)

    # A read event for a cancelled reader must trigger _remove_reader(fd).
    def test_process_events_read_cancelled(self):
        reader = mock.Mock()
        # NOTE(review): the non-cancelled sibling test sets `_cancelled`,
        # while this sets `cancelled`. On a Mock an unset `_cancelled` is an
        # auto-created (truthy) child mock - confirm which attribute
        # _process_events() actually reads.
        reader.cancelled = True

        self.loop._remove_reader = mock.Mock()
        self.loop._process_events(
            [(selectors.SelectorKey(
                1, 1, selectors.EVENT_READ, (reader, None)),
              selectors.EVENT_READ)])
        self.loop._remove_reader.assert_called_with(1)

    # A write event for a non-cancelled writer is passed to _add_callback().
    def test_process_events_write(self):
        writer = mock.Mock()
        writer._cancelled = False

        self.loop._add_callback = mock.Mock()
        self.loop._process_events(
            [(selectors.SelectorKey(1, 1, selectors.EVENT_WRITE,
                                    (None, writer)),
              selectors.EVENT_WRITE)])
        self.loop._add_callback.assert_called_with(writer)

    # A write event for a cancelled writer must trigger _remove_writer(fd).
    def test_process_events_write_cancelled(self):
        writer = mock.Mock()
        # NOTE(review): same `cancelled` vs `_cancelled` question as in
        # test_process_events_read_cancelled.
        writer.cancelled = True
        self.loop._remove_writer = mock.Mock()

        self.loop._process_events(
            [(selectors.SelectorKey(1, 1, selectors.EVENT_WRITE,
                                    (None, writer)),
              selectors.EVENT_WRITE)])
        self.loop._remove_writer.assert_called_with(1)

    # _accept_connection() must call sock.accept() `backlog` times when
    # every accept() succeeds.
    def test_accept_connection_multiple(self):
        sock = mock.Mock()
        sock.accept.return_value = (mock.Mock(), mock.Mock())
        backlog = 100
        # Mock the coroutine generation for a connection to prevent
        # warnings related to un-awaited coroutines. _accept_connection2
        # is an async function that is patched with AsyncMock. create_task
        # creates a task out of coroutine returned by AsyncMock, so use
        # asyncio.sleep(0) to ensure created tasks are complete to avoid
        # task pending warnings.
        mock_obj = mock.patch.object
        # `accept2_mock` is bound here but not referenced again in this test.
        with mock_obj(self.loop, '_accept_connection2') as accept2_mock:
            self.loop._accept_connection(
                mock.Mock(), sock, backlog=backlog)
        self.loop.run_until_complete(asyncio.sleep(0))
        self.assertEqual(sock.accept.call_count, backlog)


class SelectorTransportTests(test_utils.TestCase):
    """Tests for the _SelectorTransport base class using a mocked socket."""

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()
        self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
        self.sock = mock.Mock(socket.socket)
        # Every test in this class sees the socket as file descriptor 7.
        self.sock.fileno.return_value = 7

    # Helper: build a _SelectorTransport (fourth argument None) and
    # schedule close_transport() as cleanup.
    def create_transport(self):
        transport = _SelectorTransport(self.loop, self.sock, self.protocol,
                                       None)
        self.addCleanup(close_transport, transport)
        return transport

    # The constructor stores the loop, the socket and the socket's fileno.
    def test_ctor(self):
        tr = self.create_transport()
        self.assertIs(tr._loop, self.loop)
        self.assertIs(tr._sock, self.sock)
        self.assertIs(tr._sock_fd, 7)

    # abort() delegates to _force_close(None).
    def test_abort(self):
        tr = self.create_transport()
        tr._force_close = mock.Mock()

        tr.abort()
        tr._force_close.assert_called_with(None)

    # close() removes the reader once and bumps `_conn_lost` once; a second
    # close() changes neither count.
    def test_close(self):
        tr = self.create_transport()
        tr.close()

        self.assertTrue(tr.is_closing())
        self.assertEqual(1, self.loop.remove_reader_count[7])
        # NOTE(review): this calls the mocked connection_lost() rather than
        # asserting on it - confirm that is intended.
        self.protocol.connection_lost(None)
        self.assertEqual(tr._conn_lost, 1)

        tr.close()
        self.assertEqual(tr._conn_lost, 1)
        self.assertEqual(1, self.loop.remove_reader_count[7])

    # With data still in the write buffer, close() drops readers but
    # connection_lost() has not been called after one loop iteration.
    def test_close_write_buffer(self):
        tr = self.create_transport()
        tr._buffer.extend(b'data')
        tr.close()

        self.assertFalse(self.loop.readers)
        test_utils.run_briefly(self.loop)
        self.assertFalse(self.protocol.connection_lost.called)

    # _force_close() empties the buffer and removes reader and writer.
    def test_force_close(self):
        tr = self.create_transport()
        tr._buffer.extend(b'1')
        self.loop._add_reader(7, mock.sentinel)
        self.loop._add_writer(7, mock.sentinel)
        tr._force_close(None)

        self.assertTrue(tr.is_closing())
        self.assertEqual(tr._buffer, list_to_buffer())
        self.assertFalse(self.loop.readers)
        self.assertFalse(self.loop.writers)

        # second close should not remove reader
        tr._force_close(None)
        self.assertFalse(self.loop.readers)
        self.assertEqual(1, self.loop.remove_reader_count[7])

    # An OSError passed to _fatal_error() is not logged through
    # logger.error but still reaches _force_close().
    @mock.patch('asyncio.log.logger.error')
    def test_fatal_error(self, m_exc):
        exc = OSError()
        tr = self.create_transport()
        tr._force_close = mock.Mock()
        tr._fatal_error(exc)

        m_exc.assert_not_called()

        tr._force_close.assert_called_with(exc)

    # A non-OSError exception is logged with the pattern below and then
    # reaches _force_close().
    @mock.patch('asyncio.log.logger.error')
    def test_fatal_error_custom_exception(self, m_exc):
        class MyError(Exception):
            pass
        exc = MyError()
        tr = self.create_transport()
        tr._force_close = mock.Mock()
        tr._fatal_error(exc)

        m_exc.assert_called_with(
            test_utils.MockPattern(
                'Fatal error on transport\nprotocol:.*\ntransport:.*'),
            exc_info=(MyError, MOCK_ANY, MOCK_ANY))

        tr._force_close.assert_called_with(exc)

    # _call_connection_lost() notifies the protocol, closes the socket and
    # clears the transport's `_sock`, `_protocol` and `_loop` references.
    def test_connection_lost(self):
        exc = OSError()
        tr = self.create_transport()
        self.assertIsNotNone(tr._protocol)
        self.assertIsNotNone(tr._loop)
        tr._call_connection_lost(exc)

        self.protocol.connection_lost.assert_called_with(exc)
        self.sock.close.assert_called_with()
        self.assertIsNone(tr._sock)

        self.assertIsNone(tr._protocol)
        self.assertIsNone(tr._loop)

    # The transport's _add_reader() registers a reader while open and is
    # ignored once the transport is closing.
    def test__add_reader(self):
        tr = self.create_transport()
        tr._buffer.extend(b'1')
        tr._add_reader(7, mock.sentinel)
        self.assertTrue(self.loop.readers)

        tr._force_close(None)

        self.assertTrue(tr.is_closing())
        self.assertFalse(self.loop.readers)

        # can not add readers after closing
        tr._add_reader(7, mock.sentinel)
        self.assertFalse(self.loop.readers)


class SelectorSocketTransportTests(test_utils.TestCase):
    """Tests for _SelectorSocketTransport driven by an asyncio.Protocol mock."""

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()
        self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
        self.sock = mock.Mock(socket.socket)
        # The mocked socket reports fd 7; the same value is kept in sock_fd.
        self.sock_fd = self.sock.fileno.return_value = 7

    # Helper: build a _SelectorSocketTransport and schedule
    # close_transport() as cleanup.
    def socket_transport(self, waiter=None):
        transport = _SelectorSocketTransport(self.loop, self.sock,
                                             self.protocol, waiter=waiter)
        self.addCleanup(close_transport, transport)
        return transport

    # After construction the reader for fd 7 is `_read_ready` and the
    # protocol receives connection_made(transport).
    def test_ctor(self):
        waiter = self.loop.create_future()
        tr = self.socket_transport(waiter=waiter)
        self.loop.run_until_complete(waiter)

        self.loop.assert_reader(7, tr._read_ready)
        test_utils.run_briefly(self.loop)
        self.protocol.connection_made.assert_called_with(tr)

    # The waiter future passed to the constructor completes with None.
    def test_ctor_with_waiter(self):
        waiter = self.loop.create_future()
        self.socket_transport(waiter=waiter)
        self.loop.run_until_complete(waiter)

        self.assertIsNone(waiter.result())

    # pause_reading()/resume_reading() toggle the reader registration and
    # may each be called twice in a row.
    def test_pause_resume_reading(self):
        tr = self.socket_transport()
        test_utils.run_briefly(self.loop)
        self.assertFalse(tr._paused)
        self.assertTrue(tr.is_reading())
        self.loop.assert_reader(7, tr._read_ready)

        tr.pause_reading()
        tr.pause_reading()
        self.assertTrue(tr._paused)
        self.assertFalse(tr.is_reading())
        self.loop.assert_no_reader(7)

        tr.resume_reading()
        tr.resume_reading()
        self.assertFalse(tr._paused)
        self.assertTrue(tr.is_reading())
        self.loop.assert_reader(7, tr._read_ready)

        tr.close()
        self.assertFalse(tr.is_reading())
        self.loop.assert_no_reader(7)

    # Pausing from inside connection_made() must leave no reader installed
    # until resume_reading() is called.
    def test_pause_reading_connection_made(self):
        tr = self.socket_transport()
        self.protocol.connection_made.side_effect = lambda _: tr.pause_reading()
        test_utils.run_briefly(self.loop)
        self.assertFalse(tr.is_reading())
        self.loop.assert_no_reader(7)

        tr.resume_reading()
        self.assertTrue(tr.is_reading())
        self.loop.assert_reader(7, tr._read_ready)

        tr.close()
        self.assertFalse(tr.is_reading())
        self.loop.assert_no_reader(7)


    # recv() returning b'' leads to eof_received(); an exception raised
    # there is routed to _fatal_error().
    def test_read_eof_received_error(self):
        transport = self.socket_transport()
        transport.close = mock.Mock()
        transport._fatal_error = mock.Mock()

        self.loop.call_exception_handler = mock.Mock()

        self.protocol.eof_received.side_effect = LookupError()

        self.sock.recv.return_value = b''
        transport._read_ready()

        self.protocol.eof_received.assert_called_with()
        self.assertTrue(transport._fatal_error.called)

    # An exception raised by data_received() is routed to _fatal_error().
    def test_data_received_error(self):
        transport = self.socket_transport()
        transport._fatal_error = mock.Mock()

        self.loop.call_exception_handler = mock.Mock()
        self.protocol.data_received.side_effect = LookupError()

        self.sock.recv.return_value = b'data'
        transport._read_ready()

        self.assertTrue(transport._fatal_error.called)
        self.assertTrue(self.protocol.data_received.called)

    # Bytes returned by recv() are handed to data_received().
    def test_read_ready(self):
        transport = self.socket_transport()

        self.sock.recv.return_value = b'data'
        transport._read_ready()

        self.protocol.data_received.assert_called_with(b'data')

    # recv() returning b'' triggers eof_received() and then close().
    def test_read_ready_eof(self):
        transport = self.socket_transport()
        transport.close = mock.Mock()

        self.sock.recv.return_value = b''
        transport._read_ready()

        self.protocol.eof_received.assert_called_with()
        transport.close.assert_called_with()

    # When eof_received() returns True the transport is not closed.
    def test_read_ready_eof_keep_open(self):
        transport = self.socket_transport()
        transport.close = mock.Mock()

        self.sock.recv.return_value = b''
        self.protocol.eof_received.return_value = True
        transport._read_ready()

        self.protocol.eof_received.assert_called_with()
        self.assertFalse(transport.close.called)

    # BlockingIOError from recv() is not treated as a fatal error.
    @mock.patch('logging.exception')
    def test_read_ready_tryagain(self, m_exc):
        self.sock.recv.side_effect = BlockingIOError

        transport = self.socket_transport()
        transport._fatal_error = mock.Mock()
        transport._read_ready()

        self.assertFalse(transport._fatal_error.called)

    # InterruptedError from recv() is not treated as a fatal error.
    @mock.patch('logging.exception')
    def test_read_ready_tryagain_interrupted(self, m_exc):
        self.sock.recv.side_effect = InterruptedError

        transport = self.socket_transport()
        transport._fatal_error = mock.Mock()
        transport._read_ready()

        self.assertFalse(transport._fatal_error.called)

    # ConnectionResetError from recv() ends in _force_close(err).
    @mock.patch('logging.exception')
    def test_read_ready_conn_reset(self, m_exc):
        err = self.sock.recv.side_effect = ConnectionResetError()

        transport = self.socket_transport()
        transport._force_close = mock.Mock()
        with test_utils.disable_logger():
            transport._read_ready()
        transport._force_close.assert_called_with(err)

    # A plain OSError from recv() is reported through _fatal_error() with
    # the read-error message.
    @mock.patch('logging.exception')
    def test_read_ready_err(self, m_exc):
        err = self.sock.recv.side_effect = OSError()

        transport = self.socket_transport()
        transport._fatal_error = mock.Mock()
        transport._read_ready()

        transport._fatal_error.assert_called_with(
            err,
            'Fatal read error on socket transport')

    # write() with an empty buffer sends the data straight to sock.send().
    def test_write(self):
        data = b'data'
        self.sock.send.return_value = len(data)

        transport = self.socket_transport()
        transport.write(data)
        self.sock.send.assert_called_with(data)

    # Same as test_write for a bytearray; the caller's object must be
    # left unchanged.
    def test_write_bytearray(self):
        data = bytearray(b'data')
        self.sock.send.return_value = len(data)

        transport = self.socket_transport()
        transport.write(data)
        self.sock.send.assert_called_with(data)
        self.assertEqual(data, bytearray(b'data'))  # Hasn't been mutated.
668 669 def test_write_memoryview(self): 670 data = memoryview(b'data') 671 self.sock.send.return_value = len(data) 672 673 transport = self.socket_transport() 674 transport.write(data) 675 self.sock.send.assert_called_with(data) 676 677 def test_write_no_data(self): 678 transport = self.socket_transport() 679 transport._buffer.extend(b'data') 680 transport.write(b'') 681 self.assertFalse(self.sock.send.called) 682 self.assertEqual(list_to_buffer([b'data']), transport._buffer) 683 684 def test_write_buffer(self): 685 transport = self.socket_transport() 686 transport._buffer.extend(b'data1') 687 transport.write(b'data2') 688 self.assertFalse(self.sock.send.called) 689 self.assertEqual(list_to_buffer([b'data1', b'data2']), 690 transport._buffer) 691 692 def test_write_partial(self): 693 data = b'data' 694 self.sock.send.return_value = 2 695 696 transport = self.socket_transport() 697 transport.write(data) 698 699 self.loop.assert_writer(7, transport._write_ready) 700 self.assertEqual(list_to_buffer([b'ta']), transport._buffer) 701 702 def test_write_partial_bytearray(self): 703 data = bytearray(b'data') 704 self.sock.send.return_value = 2 705 706 transport = self.socket_transport() 707 transport.write(data) 708 709 self.loop.assert_writer(7, transport._write_ready) 710 self.assertEqual(list_to_buffer([b'ta']), transport._buffer) 711 self.assertEqual(data, bytearray(b'data')) # Hasn't been mutated. 

    # Partial write of a memoryview: the unsent remainder is buffered and
    # `_write_ready` becomes the writer for fd 7.
    def test_write_partial_memoryview(self):
        data = memoryview(b'data')
        self.sock.send.return_value = 2

        transport = self.socket_transport()
        transport.write(data)

        self.loop.assert_writer(7, transport._write_ready)
        self.assertEqual(list_to_buffer([b'ta']), transport._buffer)

    # send() reporting 0 bytes written: everything is buffered.
    def test_write_partial_none(self):
        data = b'data'
        self.sock.send.return_value = 0
        self.sock.fileno.return_value = 7

        transport = self.socket_transport()
        transport.write(data)

        self.loop.assert_writer(7, transport._write_ready)
        self.assertEqual(list_to_buffer([b'data']), transport._buffer)

    # BlockingIOError from send(): the data is buffered and a writer added.
    def test_write_tryagain(self):
        self.sock.send.side_effect = BlockingIOError

        data = b'data'
        transport = self.socket_transport()
        transport.write(data)

        self.loop.assert_writer(7, transport._write_ready)
        self.assertEqual(list_to_buffer([b'data']), transport._buffer)

    # OSError from send() goes to _fatal_error(); once `_conn_lost` is set,
    # later writes skip send(), bump `_conn_lost`, and a warning is logged.
    @mock.patch('asyncio.selector_events.logger')
    def test_write_exception(self, m_log):
        err = self.sock.send.side_effect = OSError()

        data = b'data'
        transport = self.socket_transport()
        transport._fatal_error = mock.Mock()
        transport.write(data)
        transport._fatal_error.assert_called_with(
            err,
            'Fatal write error on socket transport')
        # _fatal_error is mocked above, so the lost-connection state is set
        # by hand here.
        transport._conn_lost = 1

        self.sock.reset_mock()
        transport.write(data)
        self.assertFalse(self.sock.send.called)
        self.assertEqual(transport._conn_lost, 2)
        transport.write(data)
        transport.write(data)
        transport.write(data)
        transport.write(data)
        m_log.warning.assert_called_with('socket.send() raised exception.')

    # write() rejects str with TypeError.
    def test_write_str(self):
        transport = self.socket_transport()
        self.assertRaises(TypeError, transport.write, 'str')

    # Writing after close() increments `_conn_lost` from 1 to 2.
    def test_write_closing(self):
        transport = self.socket_transport()
        transport.close()
        self.assertEqual(transport._conn_lost, 1)
        transport.write(b'data')
        self.assertEqual(transport._conn_lost, 2)

    # _write_ready() flushing the whole buffer removes the writer.
    def test_write_ready(self):
        data = b'data'
        self.sock.send.return_value = len(data)

        transport = self.socket_transport()
        transport._buffer.extend(data)
        self.loop._add_writer(7, transport._write_ready)
        transport._write_ready()
        self.assertTrue(self.sock.send.called)
        self.assertFalse(self.loop.writers)

    # With `_closing` set, a full flush also closes the socket and calls
    # connection_lost(None).
    def test_write_ready_closing(self):
        data = b'data'
        self.sock.send.return_value = len(data)

        transport = self.socket_transport()
        transport._closing = True
        transport._buffer.extend(data)
        self.loop._add_writer(7, transport._write_ready)
        transport._write_ready()
        self.assertTrue(self.sock.send.called)
        self.assertFalse(self.loop.writers)
        self.sock.close.assert_called_with()
        self.protocol.connection_lost.assert_called_with(None)

    # _write_ready() with an empty buffer is expected to raise
    # AssertionError; skipped when Python runs with optimization on.
    @unittest.skipIf(sys.flags.optimize, "Assertions are disabled in optimized mode")
    def test_write_ready_no_data(self):
        transport = self.socket_transport()
        # This is an internal error.
        self.assertRaises(AssertionError, transport._write_ready)

    # Partial flush: the writer stays and the unsent tail stays buffered.
    def test_write_ready_partial(self):
        data = b'data'
        self.sock.send.return_value = 2

        transport = self.socket_transport()
        transport._buffer.extend(data)
        self.loop._add_writer(7, transport._write_ready)
        transport._write_ready()
        self.loop.assert_writer(7, transport._write_ready)
        self.assertEqual(list_to_buffer([b'ta']), transport._buffer)

    # send() reporting 0 bytes: writer and buffer are both unchanged.
    def test_write_ready_partial_none(self):
        data = b'data'
        self.sock.send.return_value = 0

        transport = self.socket_transport()
        transport._buffer.extend(data)
        self.loop._add_writer(7, transport._write_ready)
        transport._write_ready()
        self.loop.assert_writer(7, transport._write_ready)
        self.assertEqual(list_to_buffer([b'data']), transport._buffer)

    # BlockingIOError in _write_ready(): writer and buffer are kept.
    def test_write_ready_tryagain(self):
        self.sock.send.side_effect = BlockingIOError

        transport = self.socket_transport()
        transport._buffer = list_to_buffer([b'data1', b'data2'])
        self.loop._add_writer(7, transport._write_ready)
        transport._write_ready()

        self.loop.assert_writer(7, transport._write_ready)
        self.assertEqual(list_to_buffer([b'data1data2']), transport._buffer)

    # OSError in _write_ready() is reported through _fatal_error() with
    # the write-error message.
    def test_write_ready_exception(self):
        err = self.sock.send.side_effect = OSError()

        transport = self.socket_transport()
        transport._fatal_error = mock.Mock()
        transport._buffer.extend(b'data')
        transport._write_ready()
        transport._fatal_error.assert_called_with(
            err,
            'Fatal write error on socket transport')

    # write_eof() shuts down the write side exactly once, even when it is
    # called twice.
    def test_write_eof(self):
        tr = self.socket_transport()
        self.assertTrue(tr.can_write_eof())
        tr.write_eof()
        self.sock.shutdown.assert_called_with(socket.SHUT_WR)
        tr.write_eof()
        self.assertEqual(self.sock.shutdown.call_count, 1)
        tr.close()

    # write_eof() with pending data defers shutdown(SHUT_WR) until
    # _write_ready() has flushed the buffer.
    def test_write_eof_buffer(self):
        tr = self.socket_transport()
        self.sock.send.side_effect = BlockingIOError
        tr.write(b'data')
        tr.write_eof()
        self.assertEqual(tr._buffer, list_to_buffer([b'data']))
        self.assertTrue(tr._eof)
        self.assertFalse(self.sock.shutdown.called)
        # From here on send() reports all 4 bytes as written.
        self.sock.send.side_effect = lambda _: 4
        tr._write_ready()
        self.assertTrue(self.sock.send.called)
        self.sock.shutdown.assert_called_with(socket.SHUT_WR)
        tr.close()

    # write_eof() after close() must not raise.
    def test_write_eof_after_close(self):
        tr = self.socket_transport()
        tr.close()
        self.loop.run_until_complete(asyncio.sleep(0))
        tr.write_eof()

    # close() must call the loop's _remove_writer() with the socket's fd.
    @mock.patch('asyncio.base_events.logger')
    def test_transport_close_remove_writer(self, m_log):
        remove_writer = self.loop._remove_writer = mock.Mock()

        transport = self.socket_transport()
        transport.close()
        remove_writer.assert_called_with(self.sock_fd)


class SelectorSocketTransportBufferedProtocolTests(test_utils.TestCase):
    """Tests for _SelectorSocketTransport with an asyncio.BufferedProtocol mock."""

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()

        self.protocol = test_utils.make_test_protocol(asyncio.BufferedProtocol)
        # get_buffer() always hands back this one-byte bytearray.
        self.buf = bytearray(1)
        self.protocol.get_buffer.side_effect = lambda hint: self.buf

        self.sock = mock.Mock(socket.socket)
        self.sock_fd = self.sock.fileno.return_value = 7

    # Helper: build a _SelectorSocketTransport and schedule
    # close_transport() as cleanup.
    def socket_transport(self, waiter=None):
        transport = _SelectorSocketTransport(self.loop, self.sock,
                                             self.protocol, waiter=waiter)
        self.addCleanup(close_transport, transport)
        return transport

    # After construction the reader for fd 7 is `_read_ready` and the
    # protocol receives connection_made(transport).
    def test_ctor(self):
        waiter = self.loop.create_future()
        tr = self.socket_transport(waiter=waiter)
        self.loop.run_until_complete(waiter)

        self.loop.assert_reader(7, tr._read_ready)
        test_utils.run_briefly(self.loop)
        self.protocol.connection_made.assert_called_with(tr)

    # An exception raised by get_buffer() is fatal and buffer_updated()
    # is never reached.
    def test_get_buffer_error(self):
        transport = self.socket_transport()
        transport._fatal_error = mock.Mock()

        self.loop.call_exception_handler = mock.Mock()
        self.protocol.get_buffer.side_effect = LookupError()

        transport._read_ready()

        self.assertTrue(transport._fatal_error.called)
        self.assertTrue(self.protocol.get_buffer.called)
        self.assertFalse(self.protocol.buffer_updated.called)

    # A zero-length buffer from get_buffer() is treated as a fatal error.
    def test_get_buffer_zerosized(self):
        transport = self.socket_transport()
        transport._fatal_error = mock.Mock()

        self.loop.call_exception_handler = mock.Mock()
        self.protocol.get_buffer.side_effect = lambda hint: bytearray(0)

        transport._read_ready()

        self.assertTrue(transport._fatal_error.called)
        self.assertTrue(self.protocol.get_buffer.called)
        self.assertFalse(self.protocol.buffer_updated.called)

    # After set_protocol() swaps a Protocol for a BufferedProtocol, reads
    # go through get_buffer()/buffer_updated() instead of data_received().
    def test_proto_type_switch(self):
        self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
        transport = self.socket_transport()

        self.sock.recv.return_value = b'data'
        transport._read_ready()

        self.protocol.data_received.assert_called_with(b'data')

        # switch protocol to a BufferedProtocol

        buf_proto = test_utils.make_test_protocol(asyncio.BufferedProtocol)
        buf = bytearray(4)
        buf_proto.get_buffer.side_effect = lambda hint: buf

        transport.set_protocol(buf_proto)

        self.sock.recv_into.return_value = 10
        transport._read_ready()

        buf_proto.get_buffer.assert_called_with(-1)
        buf_proto.buffer_updated.assert_called_with(10)

    # An exception raised by buffer_updated() is routed to _fatal_error().
    def test_buffer_updated_error(self):
        transport = self.socket_transport()
        transport._fatal_error = mock.Mock()

        self.loop.call_exception_handler = mock.Mock()
        self.protocol.buffer_updated.side_effect = LookupError()

        self.sock.recv_into.return_value = 10
        transport._read_ready()

        self.assertTrue(transport._fatal_error.called)
        self.assertTrue(self.protocol.get_buffer.called)
        self.assertTrue(self.protocol.buffer_updated.called)

    # recv_into() returning 0 leads to eof_received(); an exception raised
    # there is routed to _fatal_error().
    def test_read_eof_received_error(self):
        transport = self.socket_transport()
        transport.close = mock.Mock()
        transport._fatal_error = mock.Mock()

        self.loop.call_exception_handler = mock.Mock()

        self.protocol.eof_received.side_effect = LookupError()

        self.sock.recv_into.return_value = 0
        transport._read_ready()

        self.protocol.eof_received.assert_called_with()
        self.assertTrue(transport._fatal_error.called)

    # The byte count from recv_into() is passed to buffer_updated();
    # get_buffer() is called with a size hint of -1.
    def test_read_ready(self):
        transport = self.socket_transport()

        self.sock.recv_into.return_value = 10
        transport._read_ready()

        self.protocol.get_buffer.assert_called_with(-1)
        self.protocol.buffer_updated.assert_called_with(10)

    # recv_into() returning 0 triggers eof_received() and then close().
    def test_read_ready_eof(self):
        transport = self.socket_transport()
        transport.close = mock.Mock()

        self.sock.recv_into.return_value = 0
        transport._read_ready()

        self.protocol.eof_received.assert_called_with()
        transport.close.assert_called_with()

    # When eof_received() returns True the transport is not closed.
    def test_read_ready_eof_keep_open(self):
        transport = self.socket_transport()
        transport.close = mock.Mock()

        self.sock.recv_into.return_value = 0
        self.protocol.eof_received.return_value = True
        transport._read_ready()

        self.protocol.eof_received.assert_called_with()
        self.assertFalse(transport.close.called)

    # BlockingIOError from recv_into() is not treated as a fatal error.
    @mock.patch('logging.exception')
    def test_read_ready_tryagain(self, m_exc):
        self.sock.recv_into.side_effect = BlockingIOError

        transport = self.socket_transport()
        transport._fatal_error = mock.Mock()
        transport._read_ready()

        self.assertFalse(transport._fatal_error.called)

    # InterruptedError from recv_into() is not treated as a fatal error.
    @mock.patch('logging.exception')
    def test_read_ready_tryagain_interrupted(self, m_exc):
        self.sock.recv_into.side_effect = InterruptedError

        transport = self.socket_transport()
        transport._fatal_error = mock.Mock()
        transport._read_ready()

        self.assertFalse(transport._fatal_error.called)

    # ConnectionResetError from recv_into() ends in _force_close(err).
    @mock.patch('logging.exception')
    def test_read_ready_conn_reset(self, m_exc):
        err = self.sock.recv_into.side_effect = ConnectionResetError()

        transport = self.socket_transport()
        transport._force_close = mock.Mock()
        with test_utils.disable_logger():
            transport._read_ready()
        transport._force_close.assert_called_with(err)

    # A plain OSError from recv_into() is reported through _fatal_error()
    # with the read-error message.
    @mock.patch('logging.exception')
    def test_read_ready_err(self, m_exc):
        err = self.sock.recv_into.side_effect = OSError()

        transport = self.socket_transport()
        transport._fatal_error = mock.Mock()
        transport._read_ready()

        transport._fatal_error.assert_called_with(
            err,
            'Fatal read error on socket transport')


class SelectorDatagramTransportTests(test_utils.TestCase):
    """Tests for _SelectorDatagramTransport using a spec_set socket mock."""

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()
        self.protocol = test_utils.make_test_protocol(asyncio.DatagramProtocol)
        self.sock = mock.Mock(spec_set=socket.socket)
        self.sock.fileno.return_value = 7

    # Helper: build a _SelectorDatagramTransport and schedule
    # close_transport() as cleanup. The mocked getpeername() raises
    # OSError unless a truthy `address` was supplied.
    def datagram_transport(self, address=None):
        self.sock.getpeername.side_effect = None if address else OSError
        transport = _SelectorDatagramTransport(self.loop, self.sock,
                                               self.protocol,
                                               address=address)
        self.addCleanup(close_transport, transport)
        return transport

    # A (data, addr) pair from recvfrom() is passed to datagram_received().
    def test_read_ready(self):
        transport = self.datagram_transport()

        self.sock.recvfrom.return_value = (b'data', ('0.0.0.0', 1234))
        transport._read_ready()

        self.protocol.datagram_received.assert_called_with(
            b'data', ('0.0.0.0', 1234))

    # BlockingIOError from recvfrom() is not treated as a fatal error.
    def test_read_ready_tryagain(self):
        transport = self.datagram_transport()

        self.sock.recvfrom.side_effect = BlockingIOError
        transport._fatal_error = mock.Mock()
        transport._read_ready()

        self.assertFalse(transport._fatal_error.called)

    # A non-OSError exception from recvfrom() is reported through
    # _fatal_error() with the datagram read-error message.
    def test_read_ready_err(self):
        transport = self.datagram_transport()

        err = self.sock.recvfrom.side_effect = RuntimeError()
        transport._fatal_error = mock.Mock()
        transport._read_ready()

        transport._fatal_error.assert_called_with(
            err,
            'Fatal read error on datagram transport')

    # An OSError from recvfrom() goes to protocol.error_received() and is
    # not fatal.
    def test_read_ready_oserr(self):
        transport = self.datagram_transport()

        err = self.sock.recvfrom.side_effect = OSError()
        transport._fatal_error = mock.Mock()
        transport._read_ready()

        self.assertFalse(transport._fatal_error.called)
        self.protocol.error_received.assert_called_with(err)

    # sendto() with an empty buffer calls sock.sendto(data, addr) directly.
    def test_sendto(self):
        data = b'data'
        transport = self.datagram_transport()
        transport.sendto(data, ('0.0.0.0', 1234))
        self.assertTrue(self.sock.sendto.called)
        self.assertEqual(
            self.sock.sendto.call_args[0], (data, ('0.0.0.0', 1234)))

    # Same as test_sendto with a bytearray payload.
    def test_sendto_bytearray(self):
        data = bytearray(b'data')
        transport = self.datagram_transport()
        transport.sendto(data, ('0.0.0.0', 1234))
        self.assertTrue(self.sock.sendto.called)
        self.assertEqual(
            self.sock.sendto.call_args[0], (data, ('0.0.0.0', 1234)))

    # Same as test_sendto with a memoryview payload.
    def test_sendto_memoryview(self):
        data = memoryview(b'data')
        transport = self.datagram_transport()
        transport.sendto(data, ('0.0.0.0', 1234))
        self.assertTrue(self.sock.sendto.called)
        self.assertEqual(
            self.sock.sendto.call_args[0], (data, ('0.0.0.0', 1234)))

    # sendto(b'') must not call sock.sendto() and must leave the queued
    # datagrams untouched.
    def test_sendto_no_data(self):
        transport = self.datagram_transport()
        transport._buffer.append((b'data', ('0.0.0.0', 12345)))
        transport.sendto(b'', ())
        self.assertFalse(self.sock.sendto.called)
        self.assertEqual(
            [(b'data', ('0.0.0.0', 12345))], list(transport._buffer))

    def test_sendto_buffer(self):
        transport = self.datagram_transport()
        transport._buffer.append((b'data1', ('0.0.0.0', 12345)))
        transport.sendto(b'data2', ('0.0.0.0', 12345))
        self.assertFalse(self.sock.sendto.called)
        self.assertEqual(
            [(b'data1', ('0.0.0.0', 12345)),
             (b'data2', ('0.0.0.0',
12345))], 1166 list(transport._buffer)) 1167 1168 def test_sendto_buffer_bytearray(self): 1169 data2 = bytearray(b'data2') 1170 transport = self.datagram_transport() 1171 transport._buffer.append((b'data1', ('0.0.0.0', 12345))) 1172 transport.sendto(data2, ('0.0.0.0', 12345)) 1173 self.assertFalse(self.sock.sendto.called) 1174 self.assertEqual( 1175 [(b'data1', ('0.0.0.0', 12345)), 1176 (b'data2', ('0.0.0.0', 12345))], 1177 list(transport._buffer)) 1178 self.assertIsInstance(transport._buffer[1][0], bytes) 1179 1180 def test_sendto_buffer_memoryview(self): 1181 data2 = memoryview(b'data2') 1182 transport = self.datagram_transport() 1183 transport._buffer.append((b'data1', ('0.0.0.0', 12345))) 1184 transport.sendto(data2, ('0.0.0.0', 12345)) 1185 self.assertFalse(self.sock.sendto.called) 1186 self.assertEqual( 1187 [(b'data1', ('0.0.0.0', 12345)), 1188 (b'data2', ('0.0.0.0', 12345))], 1189 list(transport._buffer)) 1190 self.assertIsInstance(transport._buffer[1][0], bytes) 1191 1192 def test_sendto_tryagain(self): 1193 data = b'data' 1194 1195 self.sock.sendto.side_effect = BlockingIOError 1196 1197 transport = self.datagram_transport() 1198 transport.sendto(data, ('0.0.0.0', 12345)) 1199 1200 self.loop.assert_writer(7, transport._sendto_ready) 1201 self.assertEqual( 1202 [(b'data', ('0.0.0.0', 12345))], list(transport._buffer)) 1203 1204 @mock.patch('asyncio.selector_events.logger') 1205 def test_sendto_exception(self, m_log): 1206 data = b'data' 1207 err = self.sock.sendto.side_effect = RuntimeError() 1208 1209 transport = self.datagram_transport() 1210 transport._fatal_error = mock.Mock() 1211 transport.sendto(data, ()) 1212 1213 self.assertTrue(transport._fatal_error.called) 1214 transport._fatal_error.assert_called_with( 1215 err, 1216 'Fatal write error on datagram transport') 1217 transport._conn_lost = 1 1218 1219 transport._address = ('123',) 1220 transport.sendto(data) 1221 transport.sendto(data) 1222 transport.sendto(data) 1223 transport.sendto(data) 1224 
transport.sendto(data) 1225 m_log.warning.assert_called_with('socket.send() raised exception.') 1226 1227 def test_sendto_error_received(self): 1228 data = b'data' 1229 1230 self.sock.sendto.side_effect = ConnectionRefusedError 1231 1232 transport = self.datagram_transport() 1233 transport._fatal_error = mock.Mock() 1234 transport.sendto(data, ()) 1235 1236 self.assertEqual(transport._conn_lost, 0) 1237 self.assertFalse(transport._fatal_error.called) 1238 1239 def test_sendto_error_received_connected(self): 1240 data = b'data' 1241 1242 self.sock.send.side_effect = ConnectionRefusedError 1243 1244 transport = self.datagram_transport(address=('0.0.0.0', 1)) 1245 transport._fatal_error = mock.Mock() 1246 transport.sendto(data) 1247 1248 self.assertFalse(transport._fatal_error.called) 1249 self.assertTrue(self.protocol.error_received.called) 1250 1251 def test_sendto_str(self): 1252 transport = self.datagram_transport() 1253 self.assertRaises(TypeError, transport.sendto, 'str', ()) 1254 1255 def test_sendto_connected_addr(self): 1256 transport = self.datagram_transport(address=('0.0.0.0', 1)) 1257 self.assertRaises( 1258 ValueError, transport.sendto, b'str', ('0.0.0.0', 2)) 1259 1260 def test_sendto_closing(self): 1261 transport = self.datagram_transport(address=(1,)) 1262 transport.close() 1263 self.assertEqual(transport._conn_lost, 1) 1264 transport.sendto(b'data', (1,)) 1265 self.assertEqual(transport._conn_lost, 2) 1266 1267 def test_sendto_ready(self): 1268 data = b'data' 1269 self.sock.sendto.return_value = len(data) 1270 1271 transport = self.datagram_transport() 1272 transport._buffer.append((data, ('0.0.0.0', 12345))) 1273 self.loop._add_writer(7, transport._sendto_ready) 1274 transport._sendto_ready() 1275 self.assertTrue(self.sock.sendto.called) 1276 self.assertEqual( 1277 self.sock.sendto.call_args[0], (data, ('0.0.0.0', 12345))) 1278 self.assertFalse(self.loop.writers) 1279 1280 def test_sendto_ready_closing(self): 1281 data = b'data' 1282 
self.sock.send.return_value = len(data) 1283 1284 transport = self.datagram_transport() 1285 transport._closing = True 1286 transport._buffer.append((data, ())) 1287 self.loop._add_writer(7, transport._sendto_ready) 1288 transport._sendto_ready() 1289 self.sock.sendto.assert_called_with(data, ()) 1290 self.assertFalse(self.loop.writers) 1291 self.sock.close.assert_called_with() 1292 self.protocol.connection_lost.assert_called_with(None) 1293 1294 def test_sendto_ready_no_data(self): 1295 transport = self.datagram_transport() 1296 self.loop._add_writer(7, transport._sendto_ready) 1297 transport._sendto_ready() 1298 self.assertFalse(self.sock.sendto.called) 1299 self.assertFalse(self.loop.writers) 1300 1301 def test_sendto_ready_tryagain(self): 1302 self.sock.sendto.side_effect = BlockingIOError 1303 1304 transport = self.datagram_transport() 1305 transport._buffer.extend([(b'data1', ()), (b'data2', ())]) 1306 self.loop._add_writer(7, transport._sendto_ready) 1307 transport._sendto_ready() 1308 1309 self.loop.assert_writer(7, transport._sendto_ready) 1310 self.assertEqual( 1311 [(b'data1', ()), (b'data2', ())], 1312 list(transport._buffer)) 1313 1314 def test_sendto_ready_exception(self): 1315 err = self.sock.sendto.side_effect = RuntimeError() 1316 1317 transport = self.datagram_transport() 1318 transport._fatal_error = mock.Mock() 1319 transport._buffer.append((b'data', ())) 1320 transport._sendto_ready() 1321 1322 transport._fatal_error.assert_called_with( 1323 err, 1324 'Fatal write error on datagram transport') 1325 1326 def test_sendto_ready_error_received(self): 1327 self.sock.sendto.side_effect = ConnectionRefusedError 1328 1329 transport = self.datagram_transport() 1330 transport._fatal_error = mock.Mock() 1331 transport._buffer.append((b'data', ())) 1332 transport._sendto_ready() 1333 1334 self.assertFalse(transport._fatal_error.called) 1335 1336 def test_sendto_ready_error_received_connection(self): 1337 self.sock.send.side_effect = ConnectionRefusedError 
1338 1339 transport = self.datagram_transport(address=('0.0.0.0', 1)) 1340 transport._fatal_error = mock.Mock() 1341 transport._buffer.append((b'data', ())) 1342 transport._sendto_ready() 1343 1344 self.assertFalse(transport._fatal_error.called) 1345 self.assertTrue(self.protocol.error_received.called) 1346 1347 @mock.patch('asyncio.base_events.logger.error') 1348 def test_fatal_error_connected(self, m_exc): 1349 transport = self.datagram_transport(address=('0.0.0.0', 1)) 1350 err = ConnectionRefusedError() 1351 transport._fatal_error(err) 1352 self.assertFalse(self.protocol.error_received.called) 1353 m_exc.assert_not_called() 1354 1355 @mock.patch('asyncio.base_events.logger.error') 1356 def test_fatal_error_connected_custom_error(self, m_exc): 1357 class MyException(Exception): 1358 pass 1359 transport = self.datagram_transport(address=('0.0.0.0', 1)) 1360 err = MyException() 1361 transport._fatal_error(err) 1362 self.assertFalse(self.protocol.error_received.called) 1363 m_exc.assert_called_with( 1364 test_utils.MockPattern( 1365 'Fatal error on transport\nprotocol:.*\ntransport:.*'), 1366 exc_info=(MyException, MOCK_ANY, MOCK_ANY)) 1367 1368 1369if __name__ == '__main__': 1370 unittest.main() 1371