1 """Tests for unix_events.py.""" 2 3 import contextlib 4 import errno 5 import io 6 import os 7 import pathlib 8 import signal 9 import socket 10 import stat 11 import sys 12 import tempfile 13 import threading 14 import unittest 15 from unittest import mock 16 from test.support import os_helper 17 from test.support import socket_helper 18 19 if sys.platform == 'win32': 20 raise unittest.SkipTest('UNIX only') 21 22 23 import asyncio 24 from asyncio import log 25 from asyncio import unix_events 26 from test.test_asyncio import utils as test_utils 27 28 29 def tearDownModule(): 30 asyncio.set_event_loop_policy(None) 31 32 33 MOCK_ANY = mock.ANY 34 35 36 def EXITCODE(exitcode): 37 return 32768 + exitcode 38 39 40 def SIGNAL(signum): 41 if not 1 <= signum <= 68: 42 raise AssertionError(f'invalid signum {signum}') 43 return 32768 - signum 44 45 46 def close_pipe_transport(transport): 47 # Don't call transport.close() because the event loop and the selector 48 # are mocked 49 if transport._pipe is None: 50 return 51 transport._pipe.close() 52 transport._pipe = None 53 54 55 @unittest.skipUnless(signal, 'Signals are not supported') 56 class SelectorEventLoopSignalTests(test_utils.TestCase): 57 58 def setUp(self): 59 super().setUp() 60 self.loop = asyncio.SelectorEventLoop() 61 self.set_event_loop(self.loop) 62 63 def test_check_signal(self): 64 self.assertRaises( 65 TypeError, self.loop._check_signal, '1') 66 self.assertRaises( 67 ValueError, self.loop._check_signal, signal.NSIG + 1) 68 69 def test_handle_signal_no_handler(self): 70 self.loop._handle_signal(signal.NSIG + 1) 71 72 def test_handle_signal_cancelled_handler(self): 73 h = asyncio.Handle(mock.Mock(), (), 74 loop=mock.Mock()) 75 h.cancel() 76 self.loop._signal_handlers[signal.NSIG + 1] = h 77 self.loop.remove_signal_handler = mock.Mock() 78 self.loop._handle_signal(signal.NSIG + 1) 79 self.loop.remove_signal_handler.assert_called_with(signal.NSIG + 1) 80 81 @mock.patch('asyncio.unix_events.signal') 82 def test_add_signal_handler_setup_error(self, m_signal): 83 m_signal.NSIG = signal.NSIG 84 m_signal.valid_signals = signal.valid_signals 85 m_signal.set_wakeup_fd.side_effect = ValueError 86 87 self.assertRaises( 88 RuntimeError, 89 self.loop.add_signal_handler, 90 signal.SIGINT, lambda: True) 91 92 @mock.patch('asyncio.unix_events.signal') 93 def test_add_signal_handler_coroutine_error(self, m_signal): 94 m_signal.NSIG = signal.NSIG 95 96 async def simple_coroutine(): 97 pass 98 99 # callback must not be a coroutine function 100 coro_func = simple_coroutine 101 coro_obj = coro_func() 102 self.addCleanup(coro_obj.close) 103 for func in (coro_func, coro_obj): 104 self.assertRaisesRegex( 105 TypeError, 'coroutines cannot be used with add_signal_handler', 106 self.loop.add_signal_handler, 107 signal.SIGINT, func) 108 109 @mock.patch('asyncio.unix_events.signal') 110 def test_add_signal_handler(self, m_signal): 111 m_signal.NSIG = signal.NSIG 112 m_signal.valid_signals = signal.valid_signals 113 114 cb = lambda: True 115 self.loop.add_signal_handler(signal.SIGHUP, cb) 116 h = self.loop._signal_handlers.get(signal.SIGHUP) 117 self.assertIsInstance(h, asyncio.Handle) 118 self.assertEqual(h._callback, cb) 119 120 @mock.patch('asyncio.unix_events.signal') 121 def test_add_signal_handler_install_error(self, m_signal): 122 m_signal.NSIG = signal.NSIG 123 m_signal.valid_signals = signal.valid_signals 124 125 def set_wakeup_fd(fd): 126 if fd == -1: 127 raise ValueError() 128 m_signal.set_wakeup_fd = set_wakeup_fd 129 130 class Err(OSError): 131 errno = errno.EFAULT 132 m_signal.signal.side_effect = Err 133 134 self.assertRaises( 135 Err, 136 self.loop.add_signal_handler, 137 signal.SIGINT, lambda: True) 138 139 @mock.patch('asyncio.unix_events.signal') 140 @mock.patch('asyncio.base_events.logger') 141 def test_add_signal_handler_install_error2(self, m_logging, m_signal): 142 m_signal.NSIG = signal.NSIG 143 m_signal.valid_signals = signal.valid_signals 144 145 class Err(OSError): 146 errno = errno.EINVAL 147 m_signal.signal.side_effect = Err 148 149 self.loop._signal_handlers[signal.SIGHUP] = lambda: True 150 self.assertRaises( 151 RuntimeError, 152 self.loop.add_signal_handler, 153 signal.SIGINT, lambda: True) 154 self.assertFalse(m_logging.info.called) 155 self.assertEqual(1, m_signal.set_wakeup_fd.call_count) 156 157 @mock.patch('asyncio.unix_events.signal') 158 @mock.patch('asyncio.base_events.logger') 159 def test_add_signal_handler_install_error3(self, m_logging, m_signal): 160 class Err(OSError): 161 errno = errno.EINVAL 162 m_signal.signal.side_effect = Err 163 m_signal.NSIG = signal.NSIG 164 m_signal.valid_signals = signal.valid_signals 165 166 self.assertRaises( 167 RuntimeError, 168 self.loop.add_signal_handler, 169 signal.SIGINT, lambda: True) 170 self.assertFalse(m_logging.info.called) 171 self.assertEqual(2, m_signal.set_wakeup_fd.call_count) 172 173 @mock.patch('asyncio.unix_events.signal') 174 def test_remove_signal_handler(self, m_signal): 175 m_signal.NSIG = signal.NSIG 176 m_signal.valid_signals = signal.valid_signals 177 178 self.loop.add_signal_handler(signal.SIGHUP, lambda: True) 179 180 self.assertTrue( 181 self.loop.remove_signal_handler(signal.SIGHUP)) 182 self.assertTrue(m_signal.set_wakeup_fd.called) 183 self.assertTrue(m_signal.signal.called) 184 self.assertEqual( 185 (signal.SIGHUP, m_signal.SIG_DFL), m_signal.signal.call_args[0]) 186 187 @mock.patch('asyncio.unix_events.signal') 188 def test_remove_signal_handler_2(self, m_signal): 189 m_signal.NSIG = signal.NSIG 190 m_signal.SIGINT = signal.SIGINT 191 m_signal.valid_signals = signal.valid_signals 192 193 self.loop.add_signal_handler(signal.SIGINT, lambda: True) 194 self.loop._signal_handlers[signal.SIGHUP] = object() 195 m_signal.set_wakeup_fd.reset_mock() 196 197 self.assertTrue( 198 self.loop.remove_signal_handler(signal.SIGINT)) 199 self.assertFalse(m_signal.set_wakeup_fd.called) 200 self.assertTrue(m_signal.signal.called) 201 self.assertEqual( 202 (signal.SIGINT, m_signal.default_int_handler), 203 m_signal.signal.call_args[0]) 204 205 @mock.patch('asyncio.unix_events.signal') 206 @mock.patch('asyncio.base_events.logger') 207 def test_remove_signal_handler_cleanup_error(self, m_logging, m_signal): 208 m_signal.NSIG = signal.NSIG 209 m_signal.valid_signals = signal.valid_signals 210 self.loop.add_signal_handler(signal.SIGHUP, lambda: True) 211 212 m_signal.set_wakeup_fd.side_effect = ValueError 213 214 self.loop.remove_signal_handler(signal.SIGHUP) 215 self.assertTrue(m_logging.info) 216 217 @mock.patch('asyncio.unix_events.signal') 218 def test_remove_signal_handler_error(self, m_signal): 219 m_signal.NSIG = signal.NSIG 220 m_signal.valid_signals = signal.valid_signals 221 self.loop.add_signal_handler(signal.SIGHUP, lambda: True) 222 223 m_signal.signal.side_effect = OSError 224 225 self.assertRaises( 226 OSError, self.loop.remove_signal_handler, signal.SIGHUP) 227 228 @mock.patch('asyncio.unix_events.signal') 229 def test_remove_signal_handler_error2(self, m_signal): 230 m_signal.NSIG = signal.NSIG 231 m_signal.valid_signals = signal.valid_signals 232 self.loop.add_signal_handler(signal.SIGHUP, lambda: True) 233 234 class Err(OSError): 235 errno = errno.EINVAL 236 m_signal.signal.side_effect = Err 237 238 self.assertRaises( 239 RuntimeError, self.loop.remove_signal_handler, signal.SIGHUP) 240 241 @mock.patch('asyncio.unix_events.signal') 242 def test_close(self, m_signal): 243 m_signal.NSIG = signal.NSIG 244 m_signal.valid_signals = signal.valid_signals 245 246 self.loop.add_signal_handler(signal.SIGHUP, lambda: True) 247 self.loop.add_signal_handler(signal.SIGCHLD, lambda: True) 248 249 self.assertEqual(len(self.loop._signal_handlers), 2) 250 251 m_signal.set_wakeup_fd.reset_mock() 252 253 self.loop.close() 254 255 self.assertEqual(len(self.loop._signal_handlers), 0) 256 m_signal.set_wakeup_fd.assert_called_once_with(-1) 257 258 @mock.patch('asyncio.unix_events.sys') 259 @mock.patch('asyncio.unix_events.signal') 260 def test_close_on_finalizing(self, m_signal, m_sys): 261 m_signal.NSIG = signal.NSIG 262 m_signal.valid_signals = signal.valid_signals 263 self.loop.add_signal_handler(signal.SIGHUP, lambda: True) 264 265 self.assertEqual(len(self.loop._signal_handlers), 1) 266 m_sys.is_finalizing.return_value = True 267 m_signal.signal.reset_mock() 268 269 with self.assertWarnsRegex(ResourceWarning, 270 "skipping signal handlers removal"): 271 self.loop.close() 272 273 self.assertEqual(len(self.loop._signal_handlers), 0) 274 self.assertFalse(m_signal.signal.called) 275 276 277 @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 278 'UNIX Sockets are not supported') 279 class SelectorEventLoopUnixSocketTests(test_utils.TestCase): 280 281 def setUp(self): 282 super().setUp() 283 self.loop = asyncio.SelectorEventLoop() 284 self.set_event_loop(self.loop) 285 286 @socket_helper.skip_unless_bind_unix_socket 287 def test_create_unix_server_existing_path_sock(self): 288 with test_utils.unix_socket_path() as path: 289 sock = socket.socket(socket.AF_UNIX) 290 sock.bind(path) 291 sock.listen(1) 292 sock.close() 293 294 coro = self.loop.create_unix_server(lambda: None, path) 295 srv = self.loop.run_until_complete(coro) 296 srv.close() 297 self.loop.run_until_complete(srv.wait_closed()) 298 299 @socket_helper.skip_unless_bind_unix_socket 300 def test_create_unix_server_pathlib(self): 301 with test_utils.unix_socket_path() as path: 302 path = pathlib.Path(path) 303 srv_coro = self.loop.create_unix_server(lambda: None, path) 304 srv = self.loop.run_until_complete(srv_coro) 305 srv.close() 306 self.loop.run_until_complete(srv.wait_closed()) 307 308 def test_create_unix_connection_pathlib(self): 309 with test_utils.unix_socket_path() as path: 310 path = pathlib.Path(path) 311 coro = self.loop.create_unix_connection(lambda: None, path) 312 with self.assertRaises(FileNotFoundError): 313 # If pathlib.Path wasn't supported, the exception would be 314 # different. 315 self.loop.run_until_complete(coro) 316 317 def test_create_unix_server_existing_path_nonsock(self): 318 with tempfile.NamedTemporaryFile() as file: 319 coro = self.loop.create_unix_server(lambda: None, file.name) 320 with self.assertRaisesRegex(OSError, 321 'Address.*is already in use'): 322 self.loop.run_until_complete(coro) 323 324 def test_create_unix_server_ssl_bool(self): 325 coro = self.loop.create_unix_server(lambda: None, path='spam', 326 ssl=True) 327 with self.assertRaisesRegex(TypeError, 328 'ssl argument must be an SSLContext'): 329 self.loop.run_until_complete(coro) 330 331 def test_create_unix_server_nopath_nosock(self): 332 coro = self.loop.create_unix_server(lambda: None, path=None) 333 with self.assertRaisesRegex(ValueError, 334 'path was not specified, and no sock'): 335 self.loop.run_until_complete(coro) 336 337 def test_create_unix_server_path_inetsock(self): 338 sock = socket.socket() 339 with sock: 340 coro = self.loop.create_unix_server(lambda: None, path=None, 341 sock=sock) 342 with self.assertRaisesRegex(ValueError, 343 'A UNIX Domain Stream.*was expected'): 344 self.loop.run_until_complete(coro) 345 346 def test_create_unix_server_path_dgram(self): 347 sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) 348 with sock: 349 coro = self.loop.create_unix_server(lambda: None, path=None, 350 sock=sock) 351 with self.assertRaisesRegex(ValueError, 352 'A UNIX Domain Stream.*was expected'): 353 self.loop.run_until_complete(coro) 354 355 @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'), 356 'no socket.SOCK_NONBLOCK (linux only)') 357 @socket_helper.skip_unless_bind_unix_socket 358 def test_create_unix_server_path_stream_bittype(self): 359 sock = socket.socket( 360 socket.AF_UNIX, socket.SOCK_STREAM | socket.SOCK_NONBLOCK) 361 with tempfile.NamedTemporaryFile() as file: 362 fn = file.name 363 try: 364 with sock: 365 sock.bind(fn) 366 coro = self.loop.create_unix_server(lambda: None, path=None, 367 sock=sock) 368 srv = self.loop.run_until_complete(coro) 369 srv.close() 370 self.loop.run_until_complete(srv.wait_closed()) 371 finally: 372 os.unlink(fn) 373 374 def test_create_unix_server_ssl_timeout_with_plain_sock(self): 375 coro = self.loop.create_unix_server(lambda: None, path='spam', 376 ssl_handshake_timeout=1) 377 with self.assertRaisesRegex( 378 ValueError, 379 'ssl_handshake_timeout is only meaningful with ssl'): 380 self.loop.run_until_complete(coro) 381 382 def test_create_unix_connection_path_inetsock(self): 383 sock = socket.socket() 384 with sock: 385 coro = self.loop.create_unix_connection(lambda: None, 386 sock=sock) 387 with self.assertRaisesRegex(ValueError, 388 'A UNIX Domain Stream.*was expected'): 389 self.loop.run_until_complete(coro) 390 391 @mock.patch('asyncio.unix_events.socket') 392 def test_create_unix_server_bind_error(self, m_socket): 393 # Ensure that the socket is closed on any bind error 394 sock = mock.Mock() 395 m_socket.socket.return_value = sock 396 397 sock.bind.side_effect = OSError 398 coro = self.loop.create_unix_server(lambda: None, path="/test") 399 with self.assertRaises(OSError): 400 self.loop.run_until_complete(coro) 401 self.assertTrue(sock.close.called) 402 403 sock.bind.side_effect = MemoryError 404 coro = self.loop.create_unix_server(lambda: None, path="/test") 405 with self.assertRaises(MemoryError): 406 self.loop.run_until_complete(coro) 407 self.assertTrue(sock.close.called) 408 409 def test_create_unix_connection_path_sock(self): 410 coro = self.loop.create_unix_connection( 411 lambda: None, os.devnull, sock=object()) 412 with self.assertRaisesRegex(ValueError, 'path and sock can not be'): 413 self.loop.run_until_complete(coro) 414 415 def test_create_unix_connection_nopath_nosock(self): 416 coro = self.loop.create_unix_connection( 417 lambda: None, None) 418 with self.assertRaisesRegex(ValueError, 419 'no path and sock were specified'): 420 self.loop.run_until_complete(coro) 421 422 def test_create_unix_connection_nossl_serverhost(self): 423 coro = self.loop.create_unix_connection( 424 lambda: None, os.devnull, server_hostname='spam') 425 with self.assertRaisesRegex(ValueError, 426 'server_hostname is only meaningful'): 427 self.loop.run_until_complete(coro) 428 429 def test_create_unix_connection_ssl_noserverhost(self): 430 coro = self.loop.create_unix_connection( 431 lambda: None, os.devnull, ssl=True) 432 433 with self.assertRaisesRegex( 434 ValueError, 'you have to pass server_hostname when using ssl'): 435 436 self.loop.run_until_complete(coro) 437 438 def test_create_unix_connection_ssl_timeout_with_plain_sock(self): 439 coro = self.loop.create_unix_connection(lambda: None, path='spam', 440 ssl_handshake_timeout=1) 441 with self.assertRaisesRegex( 442 ValueError, 443 'ssl_handshake_timeout is only meaningful with ssl'): 444 self.loop.run_until_complete(coro) 445 446 447 @unittest.skipUnless(hasattr(os, 'sendfile'), 448 'sendfile is not supported') 449 class SelectorEventLoopUnixSockSendfileTests(test_utils.TestCase): 450 DATA = b"12345abcde" * 16 * 1024 # 160 KiB 451 452 class MyProto(asyncio.Protocol): 453 454 def __init__(self, loop): 455 self.started = False 456 self.closed = False 457 self.data = bytearray() 458 self.fut = loop.create_future() 459 self.transport = None 460 self._ready = loop.create_future() 461 462 def connection_made(self, transport): 463 self.started = True 464 self.transport = transport 465 self._ready.set_result(None) 466 467 def data_received(self, data): 468 self.data.extend(data) 469 470 def connection_lost(self, exc): 471 self.closed = True 472 self.fut.set_result(None) 473 474 async def wait_closed(self): 475 await self.fut 476 477 @classmethod 478 def setUpClass(cls): 479 with open(os_helper.TESTFN, 'wb') as fp: 480 fp.write(cls.DATA) 481 super().setUpClass() 482 483 @classmethod 484 def tearDownClass(cls): 485 os_helper.unlink(os_helper.TESTFN) 486 super().tearDownClass() 487 488 def setUp(self): 489 self.loop = asyncio.new_event_loop() 490 self.set_event_loop(self.loop) 491 self.file = open(os_helper.TESTFN, 'rb') 492 self.addCleanup(self.file.close) 493 super().setUp() 494 495 def make_socket(self, cleanup=True): 496 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 497 sock.setblocking(False) 498 sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024) 499 sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024) 500 if cleanup: 501 self.addCleanup(sock.close) 502 return sock 503 504 def run_loop(self, coro): 505 return self.loop.run_until_complete(coro) 506 507 def prepare(self): 508 sock = self.make_socket() 509 proto = self.MyProto(self.loop) 510 port = socket_helper.find_unused_port() 511 srv_sock = self.make_socket(cleanup=False) 512 srv_sock.bind((socket_helper.HOST, port)) 513 server = self.run_loop(self.loop.create_server( 514 lambda: proto, sock=srv_sock)) 515 self.run_loop(self.loop.sock_connect(sock, (socket_helper.HOST, port))) 516 self.run_loop(proto._ready) 517 518 def cleanup(): 519 proto.transport.close() 520 self.run_loop(proto.wait_closed()) 521 522 server.close() 523 self.run_loop(server.wait_closed()) 524 525 self.addCleanup(cleanup) 526 527 return sock, proto 528 529 def test_sock_sendfile_not_available(self): 530 sock, proto = self.prepare() 531 with mock.patch('asyncio.unix_events.os', spec=[]): 532 with self.assertRaisesRegex(asyncio.SendfileNotAvailableError, 533 "os[.]sendfile[(][)] is not available"): 534 self.run_loop(self.loop._sock_sendfile_native(sock, self.file, 535 0, None)) 536 self.assertEqual(self.file.tell(), 0) 537 538 def test_sock_sendfile_not_a_file(self): 539 sock, proto = self.prepare() 540 f = object() 541 with self.assertRaisesRegex(asyncio.SendfileNotAvailableError, 542 "not a regular file"): 543 self.run_loop(self.loop._sock_sendfile_native(sock, f, 544 0, None)) 545 self.assertEqual(self.file.tell(), 0) 546 547 def test_sock_sendfile_iobuffer(self): 548 sock, proto = self.prepare() 549 f = io.BytesIO() 550 with self.assertRaisesRegex(asyncio.SendfileNotAvailableError, 551 "not a regular file"): 552 self.run_loop(self.loop._sock_sendfile_native(sock, f, 553 0, None)) 554 self.assertEqual(self.file.tell(), 0) 555 556 def test_sock_sendfile_not_regular_file(self): 557 sock, proto = self.prepare() 558 f = mock.Mock() 559 f.fileno.return_value = -1 560 with self.assertRaisesRegex(asyncio.SendfileNotAvailableError, 561 "not a regular file"): 562 self.run_loop(self.loop._sock_sendfile_native(sock, f, 563 0, None)) 564 self.assertEqual(self.file.tell(), 0) 565 566 def test_sock_sendfile_cancel1(self): 567 sock, proto = self.prepare() 568 569 fut = self.loop.create_future() 570 fileno = self.file.fileno() 571 self.loop._sock_sendfile_native_impl(fut, None, sock, fileno, 572 0, None, len(self.DATA), 0) 573 fut.cancel() 574 with contextlib.suppress(asyncio.CancelledError): 575 self.run_loop(fut) 576 with self.assertRaises(KeyError): 577 self.loop._selector.get_key(sock) 578 579 def test_sock_sendfile_cancel2(self): 580 sock, proto = self.prepare() 581 582 fut = self.loop.create_future() 583 fileno = self.file.fileno() 584 self.loop._sock_sendfile_native_impl(fut, None, sock, fileno, 585 0, None, len(self.DATA), 0) 586 fut.cancel() 587 self.loop._sock_sendfile_native_impl(fut, sock.fileno(), sock, fileno, 588 0, None, len(self.DATA), 0) 589 with self.assertRaises(KeyError): 590 self.loop._selector.get_key(sock) 591 592 def test_sock_sendfile_blocking_error(self): 593 sock, proto = self.prepare() 594 595 fileno = self.file.fileno() 596 fut = mock.Mock() 597 fut.cancelled.return_value = False 598 with mock.patch('os.sendfile', side_effect=BlockingIOError()): 599 self.loop._sock_sendfile_native_impl(fut, None, sock, fileno, 600 0, None, len(self.DATA), 0) 601 key = self.loop._selector.get_key(sock) 602 self.assertIsNotNone(key) 603 fut.add_done_callback.assert_called_once_with(mock.ANY) 604 605 def test_sock_sendfile_os_error_first_call(self): 606 sock, proto = self.prepare() 607 608 fileno = self.file.fileno() 609 fut = self.loop.create_future() 610 with mock.patch('os.sendfile', side_effect=OSError()): 611 self.loop._sock_sendfile_native_impl(fut, None, sock, fileno, 612 0, None, len(self.DATA), 0) 613 with self.assertRaises(KeyError): 614 self.loop._selector.get_key(sock) 615 exc = fut.exception() 616 self.assertIsInstance(exc, asyncio.SendfileNotAvailableError) 617 self.assertEqual(0, self.file.tell()) 618 619 def test_sock_sendfile_os_error_next_call(self): 620 sock, proto = self.prepare() 621 622 fileno = self.file.fileno() 623 fut = self.loop.create_future() 624 err = OSError() 625 with mock.patch('os.sendfile', side_effect=err): 626 self.loop._sock_sendfile_native_impl(fut, sock.fileno(), 627 sock, fileno, 628 1000, None, len(self.DATA), 629 1000) 630 with self.assertRaises(KeyError): 631 self.loop._selector.get_key(sock) 632 exc = fut.exception() 633 self.assertIs(exc, err) 634 self.assertEqual(1000, self.file.tell()) 635 636 def test_sock_sendfile_exception(self): 637 sock, proto = self.prepare() 638 639 fileno = self.file.fileno() 640 fut = self.loop.create_future() 641 err = asyncio.SendfileNotAvailableError() 642 with mock.patch('os.sendfile', side_effect=err): 643 self.loop._sock_sendfile_native_impl(fut, sock.fileno(), 644 sock, fileno, 645 1000, None, len(self.DATA), 646 1000) 647 with self.assertRaises(KeyError): 648 self.loop._selector.get_key(sock) 649 exc = fut.exception() 650 self.assertIs(exc, err) 651 self.assertEqual(1000, self.file.tell()) 652 653 654 class UnixReadPipeTransportTests(test_utils.TestCase): 655 656 def setUp(self): 657 super().setUp() 658 self.loop = self.new_test_loop() 659 self.protocol = test_utils.make_test_protocol(asyncio.Protocol) 660 self.pipe = mock.Mock(spec_set=io.RawIOBase) 661 self.pipe.fileno.return_value = 5 662 663 blocking_patcher = mock.patch('os.set_blocking') 664 blocking_patcher.start() 665 self.addCleanup(blocking_patcher.stop) 666 667 fstat_patcher = mock.patch('os.fstat') 668 m_fstat = fstat_patcher.start() 669 st = mock.Mock() 670 st.st_mode = stat.S_IFIFO 671 m_fstat.return_value = st 672 self.addCleanup(fstat_patcher.stop) 673 674 def read_pipe_transport(self, waiter=None): 675 transport = unix_events._UnixReadPipeTransport(self.loop, self.pipe, 676 self.protocol, 677 waiter=waiter) 678 self.addCleanup(close_pipe_transport, transport) 679 return transport 680 681 def test_ctor(self): 682 waiter = self.loop.create_future() 683 tr = self.read_pipe_transport(waiter=waiter) 684 self.loop.run_until_complete(waiter) 685 686 self.protocol.connection_made.assert_called_with(tr) 687 self.loop.assert_reader(5, tr._read_ready) 688 self.assertIsNone(waiter.result()) 689 690 @mock.patch('os.read') 691 def test__read_ready(self, m_read): 692 tr = self.read_pipe_transport() 693 m_read.return_value = b'data' 694 tr._read_ready() 695 696 m_read.assert_called_with(5, tr.max_size) 697 self.protocol.data_received.assert_called_with(b'data') 698 699 @mock.patch('os.read') 700 def test__read_ready_eof(self, m_read): 701 tr = self.read_pipe_transport() 702 m_read.return_value = b'' 703 tr._read_ready() 704 705 m_read.assert_called_with(5, tr.max_size) 706 self.assertFalse(self.loop.readers) 707 test_utils.run_briefly(self.loop) 708 self.protocol.eof_received.assert_called_with() 709 self.protocol.connection_lost.assert_called_with(None) 710 711 @mock.patch('os.read') 712 def test__read_ready_blocked(self, m_read): 713 tr = self.read_pipe_transport() 714 m_read.side_effect = BlockingIOError 715 tr._read_ready() 716 717 m_read.assert_called_with(5, tr.max_size) 718 test_utils.run_briefly(self.loop) 719 self.assertFalse(self.protocol.data_received.called) 720 721 @mock.patch('asyncio.log.logger.error') 722 @mock.patch('os.read') 723 def test__read_ready_error(self, m_read, m_logexc): 724 tr = self.read_pipe_transport() 725 err = OSError() 726 m_read.side_effect = err 727 tr._close = mock.Mock() 728 tr._read_ready() 729 730 m_read.assert_called_with(5, tr.max_size) 731 tr._close.assert_called_with(err) 732 m_logexc.assert_called_with( 733 test_utils.MockPattern( 734 'Fatal read error on pipe transport' 735 '\nprotocol:.*\ntransport:.*'), 736 exc_info=(OSError, MOCK_ANY, MOCK_ANY)) 737 738 @mock.patch('os.read') 739 def test_pause_reading(self, m_read): 740 tr = self.read_pipe_transport() 741 m = mock.Mock() 742 self.loop.add_reader(5, m) 743 tr.pause_reading() 744 self.assertFalse(self.loop.readers) 745 746 @mock.patch('os.read') 747 def test_resume_reading(self, m_read): 748 tr = self.read_pipe_transport() 749 tr.pause_reading() 750 tr.resume_reading() 751 self.loop.assert_reader(5, tr._read_ready) 752 753 @mock.patch('os.read') 754 def test_close(self, m_read): 755 tr = self.read_pipe_transport() 756 tr._close = mock.Mock() 757 tr.close() 758 tr._close.assert_called_with(None) 759 760 @mock.patch('os.read') 761 def test_close_already_closing(self, m_read): 762 tr = self.read_pipe_transport() 763 tr._closing = True 764 tr._close = mock.Mock() 765 tr.close() 766 self.assertFalse(tr._close.called) 767 768 @mock.patch('os.read') 769 def test__close(self, m_read): 770 tr = self.read_pipe_transport() 771 err = object() 772 tr._close(err) 773 self.assertTrue(tr.is_closing()) 774 self.assertFalse(self.loop.readers) 775 test_utils.run_briefly(self.loop) 776 self.protocol.connection_lost.assert_called_with(err) 777 778 def test__call_connection_lost(self): 779 tr = self.read_pipe_transport() 780 self.assertIsNotNone(tr._protocol) 781 self.assertIsNotNone(tr._loop) 782 783 err = None 784 tr._call_connection_lost(err) 785 self.protocol.connection_lost.assert_called_with(err) 786 self.pipe.close.assert_called_with() 787 788 self.assertIsNone(tr._protocol) 789 self.assertIsNone(tr._loop) 790 791 def test__call_connection_lost_with_err(self): 792 tr = self.read_pipe_transport() 793 self.assertIsNotNone(tr._protocol) 794 self.assertIsNotNone(tr._loop) 795 796 err = OSError() 797 tr._call_connection_lost(err) 798 self.protocol.connection_lost.assert_called_with(err) 799 self.pipe.close.assert_called_with() 800 801 self.assertIsNone(tr._protocol) 802 self.assertIsNone(tr._loop) 803 804 def test_pause_reading_on_closed_pipe(self): 805 tr = self.read_pipe_transport() 806 tr.close() 807 test_utils.run_briefly(self.loop) 808 self.assertIsNone(tr._loop) 809 tr.pause_reading() 810 811 def test_pause_reading_on_paused_pipe(self): 812 tr = self.read_pipe_transport() 813 tr.pause_reading() 814 # the second call should do nothing 815 tr.pause_reading() 816 817 def test_resume_reading_on_closed_pipe(self): 818 tr = self.read_pipe_transport() 819 tr.close() 820 test_utils.run_briefly(self.loop) 821 self.assertIsNone(tr._loop) 822 tr.resume_reading() 823 824 def test_resume_reading_on_paused_pipe(self): 825 tr = self.read_pipe_transport() 826 # the pipe is not paused 827 # resuming should do nothing 828 tr.resume_reading() 829 830 831 class UnixWritePipeTransportTests(test_utils.TestCase): 832 833 def setUp(self): 834 super().setUp() 835 self.loop = self.new_test_loop() 836 self.protocol = test_utils.make_test_protocol(asyncio.BaseProtocol) 837 self.pipe = mock.Mock(spec_set=io.RawIOBase) 838 self.pipe.fileno.return_value = 5 839 840 blocking_patcher = mock.patch('os.set_blocking') 841 blocking_patcher.start() 842 self.addCleanup(blocking_patcher.stop) 843 844 fstat_patcher = mock.patch('os.fstat') 845 m_fstat = fstat_patcher.start() 846 st = mock.Mock() 847 st.st_mode = stat.S_IFSOCK 848 m_fstat.return_value = st 849 self.addCleanup(fstat_patcher.stop) 850 851 def write_pipe_transport(self, waiter=None): 852 transport = unix_events._UnixWritePipeTransport(self.loop, self.pipe, 853 self.protocol, 854 waiter=waiter) 855 self.addCleanup(close_pipe_transport, transport) 856 return transport 857 858 def test_ctor(self): 859 waiter = self.loop.create_future() 860 tr = self.write_pipe_transport(waiter=waiter) 861 self.loop.run_until_complete(waiter) 862 863 self.protocol.connection_made.assert_called_with(tr) 864 self.loop.assert_reader(5, tr._read_ready) 865 self.assertEqual(None, waiter.result()) 866 867 def test_can_write_eof(self): 868 tr = self.write_pipe_transport() 869 self.assertTrue(tr.can_write_eof()) 870 871 @mock.patch('os.write') 872 def test_write(self, m_write): 873 tr = self.write_pipe_transport() 874 m_write.return_value = 4 875 tr.write(b'data') 876 m_write.assert_called_with(5, b'data') 877 self.assertFalse(self.loop.writers) 878 self.assertEqual(bytearray(), tr._buffer) 879 880 @mock.patch('os.write') 881 def test_write_no_data(self, m_write): 882 tr = self.write_pipe_transport() 883 tr.write(b'') 884 self.assertFalse(m_write.called) 885 self.assertFalse(self.loop.writers) 886 self.assertEqual(bytearray(b''), tr._buffer) 887 888 @mock.patch('os.write') 889 def test_write_partial(self, m_write): 890 tr = self.write_pipe_transport() 891 m_write.return_value = 2 892 tr.write(b'data') 893 self.loop.assert_writer(5, tr._write_ready) 894 self.assertEqual(bytearray(b'ta'), tr._buffer) 895 896 @mock.patch('os.write') 897 def test_write_buffer(self, m_write): 898 tr = self.write_pipe_transport() 899 self.loop.add_writer(5, tr._write_ready) 900 tr._buffer = bytearray(b'previous') 901 tr.write(b'data') 902 self.assertFalse(m_write.called) 903 self.loop.assert_writer(5, tr._write_ready) 904 self.assertEqual(bytearray(b'previousdata'), tr._buffer) 905 906 @mock.patch('os.write') 907 def test_write_again(self, m_write): 908 tr = self.write_pipe_transport() 909 m_write.side_effect = BlockingIOError() 910 tr.write(b'data') 911 m_write.assert_called_with(5, bytearray(b'data')) 912 self.loop.assert_writer(5, tr._write_ready) 913 self.assertEqual(bytearray(b'data'), tr._buffer) 914 915 @mock.patch('asyncio.unix_events.logger') 916 @mock.patch('os.write') 917 def test_write_err(self, m_write, m_log): 918 tr = self.write_pipe_transport() 919 err = OSError() 920 m_write.side_effect = err 921 tr._fatal_error = mock.Mock() 922 tr.write(b'data') 923 m_write.assert_called_with(5, b'data') 924 self.assertFalse(self.loop.writers) 925 self.assertEqual(bytearray(), tr._buffer) 926 tr._fatal_error.assert_called_with( 927 err, 928 'Fatal write error on pipe transport') 929 self.assertEqual(1, tr._conn_lost) 930 931 tr.write(b'data') 932 self.assertEqual(2, tr._conn_lost) 933 tr.write(b'data') 934 tr.write(b'data') 935 tr.write(b'data') 936 tr.write(b'data') 937 # This is a bit overspecified. :-( 938 m_log.warning.assert_called_with( 939 'pipe closed by peer or os.write(pipe, data) raised exception.') 940 tr.close() 941 942 @mock.patch('os.write') 943 def test_write_close(self, m_write): 944 tr = self.write_pipe_transport() 945 tr._read_ready() # pipe was closed by peer 946 947 tr.write(b'data') 948 self.assertEqual(tr._conn_lost, 1) 949 tr.write(b'data') 950 self.assertEqual(tr._conn_lost, 2) 951 952 def test__read_ready(self): 953 tr = self.write_pipe_transport() 954 tr._read_ready() 955 self.assertFalse(self.loop.readers) 956 self.assertFalse(self.loop.writers) 957 self.assertTrue(tr.is_closing()) 958 test_utils.run_briefly(self.loop) 959 self.protocol.connection_lost.assert_called_with(None) 960 961 @mock.patch('os.write') 962 def test__write_ready(self, m_write): 963 tr = self.write_pipe_transport() 964 self.loop.add_writer(5, tr._write_ready) 965 tr._buffer = bytearray(b'data') 966 m_write.return_value = 4 967 tr._write_ready() 968 self.assertFalse(self.loop.writers) 969 self.assertEqual(bytearray(), tr._buffer) 970 971 @mock.patch('os.write') 972 def test__write_ready_partial(self, m_write): 973 tr = self.write_pipe_transport() 974 self.loop.add_writer(5, tr._write_ready) 975 tr._buffer = bytearray(b'data') 976 m_write.return_value = 3 977 tr._write_ready() 978 self.loop.assert_writer(5, tr._write_ready) 979 self.assertEqual(bytearray(b'a'), tr._buffer) 980 981 @mock.patch('os.write') 982 def test__write_ready_again(self, m_write): 983 tr = self.write_pipe_transport() 984 self.loop.add_writer(5, tr._write_ready) 985 tr._buffer = bytearray(b'data') 986 m_write.side_effect = BlockingIOError() 987 tr._write_ready() 988 m_write.assert_called_with(5, bytearray(b'data')) 989 self.loop.assert_writer(5, tr._write_ready) 990 self.assertEqual(bytearray(b'data'), tr._buffer) 991 992 @mock.patch('os.write') 993 def test__write_ready_empty(self, m_write): 994 tr = self.write_pipe_transport() 995 self.loop.add_writer(5, tr._write_ready) 996 tr._buffer = bytearray(b'data') 997 m_write.return_value = 0 998 tr._write_ready() 999 m_write.assert_called_with(5, bytearray(b'data')) 1000 self.loop.assert_writer(5, tr._write_ready) 1001 self.assertEqual(bytearray(b'data'), tr._buffer) 1002 1003 @mock.patch('asyncio.log.logger.error') 1004 @mock.patch('os.write') 1005 def test__write_ready_err(self, m_write, m_logexc): 1006 tr = self.write_pipe_transport() 1007 self.loop.add_writer(5, tr._write_ready) 1008 tr._buffer = bytearray(b'data') 1009 m_write.side_effect = err = OSError() 1010 tr._write_ready() 1011 self.assertFalse(self.loop.writers) 1012 self.assertFalse(self.loop.readers) 1013 self.assertEqual(bytearray(), tr._buffer) 1014 self.assertTrue(tr.is_closing()) 1015 m_logexc.assert_not_called() 1016 self.assertEqual(1, tr._conn_lost) 1017 test_utils.run_briefly(self.loop) 1018 self.protocol.connection_lost.assert_called_with(err) 1019 1020 @mock.patch('os.write') 1021 def test__write_ready_closing(self, m_write): 1022 tr = self.write_pipe_transport() 1023 self.loop.add_writer(5, tr._write_ready) 1024 tr._closing = True 1025 tr._buffer = bytearray(b'data') 1026 m_write.return_value = 4 1027 tr._write_ready() 1028 self.assertFalse(self.loop.writers) 1029 self.assertFalse(self.loop.readers) 1030 self.assertEqual(bytearray(), tr._buffer) 1031 self.protocol.connection_lost.assert_called_with(None) 1032 self.pipe.close.assert_called_with() 1033 1034 @mock.patch('os.write') 1035 def test_abort(self, m_write): 1036 tr = self.write_pipe_transport() 1037 self.loop.add_writer(5, tr._write_ready) 1038 self.loop.add_reader(5, tr._read_ready) 1039 tr._buffer = [b'da', b'ta'] 1040 tr.abort() 1041 self.assertFalse(m_write.called) 1042 self.assertFalse(self.loop.readers) 1043 self.assertFalse(self.loop.writers) 1044 self.assertEqual([], tr._buffer) 1045 self.assertTrue(tr.is_closing()) 1046 test_utils.run_briefly(self.loop) 1047 self.protocol.connection_lost.assert_called_with(None) 1048 1049 def test__call_connection_lost(self): 1050 tr = self.write_pipe_transport() 1051 self.assertIsNotNone(tr._protocol) 1052 self.assertIsNotNone(tr._loop) 1053 1054 err = None 1055 tr._call_connection_lost(err) 1056 self.protocol.connection_lost.assert_called_with(err) 1057 self.pipe.close.assert_called_with() 1058 1059 self.assertIsNone(tr._protocol) 1060 self.assertIsNone(tr._loop) 1061 1062 def test__call_connection_lost_with_err(self): 1063 tr = self.write_pipe_transport() 1064 self.assertIsNotNone(tr._protocol) 1065 self.assertIsNotNone(tr._loop) 1066 1067 err = OSError() 1068 tr._call_connection_lost(err) 1069 self.protocol.connection_lost.assert_called_with(err) 1070 self.pipe.close.assert_called_with() 1071 1072 self.assertIsNone(tr._protocol) 1073 self.assertIsNone(tr._loop) 1074 1075 def test_close(self): 1076 tr = self.write_pipe_transport() 1077 tr.write_eof = mock.Mock() 1078 tr.close() 1079 tr.write_eof.assert_called_with() 1080 1081 # closing the transport twice must not fail 1082 tr.close() 1083 1084 def test_close_closing(self): 1085 tr = self.write_pipe_transport() 1086 tr.write_eof = mock.Mock() 1087 tr._closing = True 1088 tr.close() 1089 self.assertFalse(tr.write_eof.called) 1090 1091 def test_write_eof(self): 1092 tr = self.write_pipe_transport() 1093 tr.write_eof() 1094 self.assertTrue(tr.is_closing()) 1095 self.assertFalse(self.loop.readers) 1096 test_utils.run_briefly(self.loop) 1097 self.protocol.connection_lost.assert_called_with(None) 1098 1099 def test_write_eof_pending(self): 1100 tr = self.write_pipe_transport() 1101 tr._buffer = [b'data'] 1102 tr.write_eof() 1103 self.assertTrue(tr.is_closing()) 1104 self.assertFalse(self.protocol.connection_lost.called) 1105 1106 1107 class AbstractChildWatcherTests(unittest.TestCase): 1108 1109 def test_not_implemented(self): 1110 f = mock.Mock() 1111 watcher = asyncio.AbstractChildWatcher() 1112 self.assertRaises( 1113 NotImplementedError, watcher.add_child_handler, f, f) 1114 self.assertRaises( 1115 NotImplementedError, watcher.remove_child_handler, f) 1116 self.assertRaises( 1117 NotImplementedError, watcher.attach_loop, f) 1118 self.assertRaises( 1119 NotImplementedError, watcher.close) 1120 self.assertRaises( 1121 NotImplementedError, watcher.is_active) 1122 self.assertRaises( 1123 NotImplementedError, watcher.__enter__) 1124 self.assertRaises( 1125 NotImplementedError, watcher.__exit__, f, f, f) 1126 1127 1128 class BaseChildWatcherTests(unittest.TestCase): 1129 1130 def test_not_implemented(self): 1131 f = mock.Mock() 1132 watcher = unix_events.BaseChildWatcher() 1133 self.assertRaises( 1134 NotImplementedError, watcher._do_waitpid, f) 1135 1136 1137 class ChildWatcherTestsMixin: 1138 1139 ignore_warnings = mock.patch.object(log.logger, "warning") 1140 1141 def setUp(self): 1142 super().setUp() 1143 self.loop = self.new_test_loop() 1144 self.running = False 1145 self.zombies = {} 1146 1147 with mock.patch.object( 1148 self.loop, "add_signal_handler") as self.m_add_signal_handler: 1149 self.watcher = self.create_watcher() 1150 self.watcher.attach_loop(self.loop) 1151 1152 def waitpid(self, pid, flags): 1153 if isinstance(self.watcher, asyncio.SafeChildWatcher) or pid != -1: 1154 self.assertGreater(pid, 0) 1155 try: 1156 if pid < 0: 1157 return self.zombies.popitem() 1158 else: 1159 return pid, self.zombies.pop(pid) 1160 except KeyError: 1161 pass 1162 if self.running: 1163 return 0, 0 1164 else: 1165 raise ChildProcessError() 1166 1167 def add_zombie(self, pid, status): 1168 self.zombies[pid] = status 1169 1170 def waitstatus_to_exitcode(self, status): 1171 if status > 32768: 1172 return status - 32768 1173 elif 32700 < status < 32768: 1174 return status - 32768 1175 else: 1176 return status 1177 1178 def test_create_watcher(self): 1179 self.m_add_signal_handler.assert_called_once_with( 1180 signal.SIGCHLD, self.watcher._sig_chld) 1181 1182 def waitpid_mocks(func): 1183 def wrapped_func(self): 1184 def patch(target, wrapper): 1185 return mock.patch(target, wraps=wrapper, 1186 new_callable=mock.Mock) 1187 1188 with patch('asyncio.unix_events.waitstatus_to_exitcode', self.waitstatus_to_exitcode), \ 1189 patch('os.waitpid', self.waitpid) as m_waitpid: 1190 func(self, m_waitpid) 1191 return wrapped_func 1192 1193 @waitpid_mocks 1194 def test_sigchld(self, m_waitpid): 1195 # register a child 1196 callback = mock.Mock() 1197 1198 with self.watcher: 1199 self.running = True 1200 self.watcher.add_child_handler(42, callback, 9, 10, 14) 1201 1202 self.assertFalse(callback.called) 1203 1204 # child is running 1205 self.watcher._sig_chld() 1206 1207 self.assertFalse(callback.called) 1208 1209 # child terminates (returncode 12) 1210 self.running = False 1211 self.add_zombie(42, EXITCODE(12)) 1212 self.watcher._sig_chld() 1213 1214 callback.assert_called_once_with(42, 12, 9, 10, 14) 1215 1216 callback.reset_mock() 1217 1218 # ensure that the child is effectively reaped 1219 self.add_zombie(42, EXITCODE(13)) 1220 with self.ignore_warnings: 1221 self.watcher._sig_chld() 1222 1223 self.assertFalse(callback.called) 1224 1225 # sigchld called again 1226 self.zombies.clear() 1227 self.watcher._sig_chld() 1228 1229 self.assertFalse(callback.called) 1230 1231 @waitpid_mocks 1232 def test_sigchld_two_children(self, m_waitpid): 1233 callback1 = mock.Mock() 1234 callback2 = mock.Mock() 1235 1236 # register child 1 1237 with self.watcher: 1238 self.running = True 1239 self.watcher.add_child_handler(43, callback1, 7, 8) 1240 1241 self.assertFalse(callback1.called) 1242 self.assertFalse(callback2.called) 1243 1244 # register child 2 1245 with self.watcher: 1246 self.watcher.add_child_handler(44, callback2, 147, 18) 1247 1248 self.assertFalse(callback1.called) 1249 self.assertFalse(callback2.called) 1250 1251 # children are running 1252 self.watcher._sig_chld() 1253 1254 self.assertFalse(callback1.called) 1255 self.assertFalse(callback2.called) 1256 1257 # child 1 terminates (signal 3) 1258 self.add_zombie(43, SIGNAL(3)) 1259 self.watcher._sig_chld() 1260 1261 callback1.assert_called_once_with(43, -3, 7, 8) 1262 self.assertFalse(callback2.called) 1263 1264 callback1.reset_mock() 1265 1266 # child 2 still running 1267 self.watcher._sig_chld() 1268 1269 self.assertFalse(callback1.called) 1270 self.assertFalse(callback2.called) 1271 1272 # child 2 terminates (code 108) 1273 self.add_zombie(44, EXITCODE(108)) 1274 self.running = False 1275 self.watcher._sig_chld() 1276 1277 callback2.assert_called_once_with(44, 108, 147, 18) 1278 self.assertFalse(callback1.called) 1279 1280 callback2.reset_mock() 1281 1282 # ensure that the children are effectively reaped 1283 self.add_zombie(43, EXITCODE(14)) 1284 self.add_zombie(44, EXITCODE(15)) 1285 with self.ignore_warnings: 1286 self.watcher._sig_chld() 1287 1288 self.assertFalse(callback1.called) 1289 self.assertFalse(callback2.called) 1290 1291 # sigchld called again 1292 self.zombies.clear() 1293 self.watcher._sig_chld() 1294 1295 self.assertFalse(callback1.called) 1296 self.assertFalse(callback2.called) 1297 1298 @waitpid_mocks 1299 def test_sigchld_two_children_terminating_together(self, m_waitpid): 1300 callback1 = mock.Mock() 1301 callback2 = mock.Mock() 1302 1303 # register child 1 1304 with self.watcher: 1305 self.running = True 1306 self.watcher.add_child_handler(45, callback1, 17, 8) 1307 1308 self.assertFalse(callback1.called) 1309 self.assertFalse(callback2.called) 1310 1311 # register child 2 1312 with self.watcher: 1313 self.watcher.add_child_handler(46, callback2, 1147, 18) 1314 1315 self.assertFalse(callback1.called) 1316 self.assertFalse(callback2.called) 1317 1318 # children are running 1319 self.watcher._sig_chld() 1320 1321 self.assertFalse(callback1.called) 1322 self.assertFalse(callback2.called) 1323 1324 # child 1 terminates (code 78) 1325 # child 2 terminates (signal 5) 1326 self.add_zombie(45, EXITCODE(78)) 1327 self.add_zombie(46, SIGNAL(5)) 1328 self.running = False 1329 self.watcher._sig_chld() 1330 1331 callback1.assert_called_once_with(45, 78, 17, 8) 1332 callback2.assert_called_once_with(46, -5, 1147, 18) 1333 1334 callback1.reset_mock() 1335 callback2.reset_mock() 1336 1337 # ensure that the children are effectively reaped 1338 self.add_zombie(45, EXITCODE(14)) 1339 self.add_zombie(46, EXITCODE(15)) 1340 with self.ignore_warnings: 1341 self.watcher._sig_chld() 1342 1343 self.assertFalse(callback1.called) 1344 self.assertFalse(callback2.called) 1345 1346 @waitpid_mocks 1347 def test_sigchld_race_condition(self, m_waitpid): 1348 # register a child 1349 callback = mock.Mock() 1350 1351 with self.watcher: 1352 # child terminates before being registered 1353 self.add_zombie(50, EXITCODE(4)) 1354 self.watcher._sig_chld() 1355 1356 self.watcher.add_child_handler(50, callback, 1, 12) 1357 1358 callback.assert_called_once_with(50, 4, 1, 12) 1359 callback.reset_mock() 1360 1361 # ensure that the child is effectively reaped 1362 self.add_zombie(50, SIGNAL(1)) 1363 with self.ignore_warnings: 1364 self.watcher._sig_chld() 1365 1366 self.assertFalse(callback.called) 1367 1368 @waitpid_mocks 1369 def test_sigchld_replace_handler(self, m_waitpid): 1370 callback1 = mock.Mock() 1371 callback2 = mock.Mock() 1372 1373 # register a child 1374 with self.watcher: 1375 self.running = True 1376 self.watcher.add_child_handler(51, callback1, 19) 1377 1378 self.assertFalse(callback1.called) 1379 self.assertFalse(callback2.called) 1380 1381 # register the same child again 1382 with self.watcher: 1383 self.watcher.add_child_handler(51, callback2, 21) 1384 1385 self.assertFalse(callback1.called) 1386 self.assertFalse(callback2.called) 1387 1388 # child terminates (signal 8) 1389 self.running = False 1390 self.add_zombie(51, SIGNAL(8)) 1391 self.watcher._sig_chld() 1392 1393 callback2.assert_called_once_with(51, -8, 21) 1394 self.assertFalse(callback1.called) 1395 1396 callback2.reset_mock() 1397 1398 # ensure that the child is effectively reaped 1399 self.add_zombie(51, EXITCODE(13)) 1400 with self.ignore_warnings: 1401 self.watcher._sig_chld() 1402 1403 self.assertFalse(callback1.called) 1404 self.assertFalse(callback2.called) 1405 1406 @waitpid_mocks 1407 def test_sigchld_remove_handler(self, m_waitpid): 1408 callback = mock.Mock() 1409 1410 # register a child 1411 with self.watcher: 1412 self.running = True 1413 self.watcher.add_child_handler(52, callback, 1984) 1414 1415 self.assertFalse(callback.called) 1416 1417 # unregister the child 1418 self.watcher.remove_child_handler(52) 1419 1420 self.assertFalse(callback.called) 1421 1422 # child terminates (code 99) 1423 self.running = False 1424 self.add_zombie(52, EXITCODE(99)) 1425 with self.ignore_warnings: 1426 self.watcher._sig_chld() 1427 1428 self.assertFalse(callback.called) 1429 1430 @waitpid_mocks 1431 def test_sigchld_unknown_status(self, m_waitpid): 1432 callback = mock.Mock() 1433 1434 # register a child 1435 with self.watcher: 1436 self.running = True 1437 self.watcher.add_child_handler(53, callback, -19) 1438 1439 self.assertFalse(callback.called) 1440 1441 # terminate with unknown status 1442 self.zombies[53] = 1178 1443 self.running = False 1444 self.watcher._sig_chld() 1445 1446 callback.assert_called_once_with(53, 1178, -19) 1447 1448 callback.reset_mock() 1449 1450 # ensure that the child is effectively reaped 1451 self.add_zombie(53, EXITCODE(101)) 1452 with self.ignore_warnings: 1453 self.watcher._sig_chld() 1454 1455 self.assertFalse(callback.called) 1456 1457 @waitpid_mocks 1458 def test_remove_child_handler(self, m_waitpid): 1459 callback1 = mock.Mock() 1460 callback2 = mock.Mock() 1461 callback3 = mock.Mock() 1462 1463 # register children 1464 with self.watcher: 1465 self.running = True 1466 self.watcher.add_child_handler(54, callback1, 1) 1467 self.watcher.add_child_handler(55, callback2, 2) 1468 self.watcher.add_child_handler(56, callback3, 3) 1469 1470 # remove child handler 1 1471 self.assertTrue(self.watcher.remove_child_handler(54)) 1472 1473 # remove child handler 2 multiple times 1474 self.assertTrue(self.watcher.remove_child_handler(55)) 1475 self.assertFalse(self.watcher.remove_child_handler(55)) 1476 self.assertFalse(self.watcher.remove_child_handler(55)) 1477 1478 # all children terminate 1479 self.add_zombie(54, EXITCODE(0)) 1480 self.add_zombie(55, EXITCODE(1)) 1481 self.add_zombie(56, EXITCODE(2)) 1482 self.running = False 1483 with self.ignore_warnings: 1484 self.watcher._sig_chld() 1485 1486 self.assertFalse(callback1.called) 1487 self.assertFalse(callback2.called) 1488 callback3.assert_called_once_with(56, 2, 3) 1489 1490 @waitpid_mocks 1491 def test_sigchld_unhandled_exception(self, m_waitpid): 1492 callback = mock.Mock() 1493 1494 # register a child 1495 with self.watcher: 1496 self.running = True 1497 self.watcher.add_child_handler(57, callback) 1498 1499 # raise an exception 1500 m_waitpid.side_effect = ValueError 1501 1502 with mock.patch.object(log.logger, 1503 'error') as m_error: 1504 1505 self.assertEqual(self.watcher._sig_chld(), None) 1506 self.assertTrue(m_error.called) 1507 1508 @waitpid_mocks 1509 def test_sigchld_child_reaped_elsewhere(self, m_waitpid): 1510 # register a child 1511 callback = mock.Mock() 1512 1513 with self.watcher: 1514 self.running = True 1515 self.watcher.add_child_handler(58, callback) 1516 1517 self.assertFalse(callback.called) 1518 1519 # child terminates 1520 self.running = False 1521 self.add_zombie(58, EXITCODE(4)) 1522 1523 # waitpid is called elsewhere 1524 os.waitpid(58, os.WNOHANG) 1525 1526 m_waitpid.reset_mock() 1527 1528 # sigchld 1529 with self.ignore_warnings: 1530 self.watcher._sig_chld() 1531 1532 if isinstance(self.watcher, asyncio.FastChildWatcher): 1533 # here the FastChildWatcher enters a deadlock 1534 # (there is no way to prevent it) 1535 self.assertFalse(callback.called) 1536 else: 1537 callback.assert_called_once_with(58, 255) 1538 1539 @waitpid_mocks 1540 def test_sigchld_unknown_pid_during_registration(self, m_waitpid): 1541 # register two children 1542 callback1 = mock.Mock() 1543 callback2 = mock.Mock() 1544 1545 with self.ignore_warnings, self.watcher: 1546 self.running = True 1547 # child 1 terminates 1548 self.add_zombie(591, EXITCODE(7)) 1549 # an unknown child terminates 1550 self.add_zombie(593, EXITCODE(17)) 1551 1552 self.watcher._sig_chld() 1553 1554 self.watcher.add_child_handler(591, callback1) 1555 self.watcher.add_child_handler(592, callback2) 1556 1557 callback1.assert_called_once_with(591, 7) 1558 self.assertFalse(callback2.called) 1559 1560 @waitpid_mocks 1561 def test_set_loop(self, m_waitpid): 1562 # register a child 1563 callback = mock.Mock() 1564 1565 with self.watcher: 1566 self.running = True 1567 self.watcher.add_child_handler(60, callback) 1568 1569 # attach a new loop 1570 old_loop = self.loop 1571 self.loop = self.new_test_loop() 1572 patch = mock.patch.object 1573 1574 with patch(old_loop, "remove_signal_handler") as m_old_remove, \ 1575 patch(self.loop, "add_signal_handler") as m_new_add: 1576 1577 self.watcher.attach_loop(self.loop) 1578 1579 m_old_remove.assert_called_once_with( 1580 signal.SIGCHLD) 1581 m_new_add.assert_called_once_with( 1582 signal.SIGCHLD, self.watcher._sig_chld) 1583 1584 # child terminates 1585 self.running = False 1586 self.add_zombie(60, EXITCODE(9)) 1587 self.watcher._sig_chld() 1588 1589 callback.assert_called_once_with(60, 9) 1590 1591 @waitpid_mocks 1592 def test_set_loop_race_condition(self, m_waitpid): 1593 # register 3 children 1594 callback1 = mock.Mock() 1595 callback2 = mock.Mock() 1596 callback3 = mock.Mock() 1597 1598 with self.watcher: 1599 self.running = True 1600 self.watcher.add_child_handler(61, callback1) 1601 self.watcher.add_child_handler(62, callback2) 1602 self.watcher.add_child_handler(622, callback3) 1603 1604 # detach the loop 1605 old_loop = self.loop 1606 self.loop = None 1607 1608 with mock.patch.object( 1609 old_loop, "remove_signal_handler") as m_remove_signal_handler: 1610 1611 with self.assertWarnsRegex( 1612 RuntimeWarning, 'A loop is being detached'): 1613 self.watcher.attach_loop(None) 1614 1615 m_remove_signal_handler.assert_called_once_with( 1616 signal.SIGCHLD) 1617 1618 # child 1 & 2 terminate 1619 self.add_zombie(61, EXITCODE(11)) 1620 self.add_zombie(62, SIGNAL(5)) 1621 1622 # SIGCHLD was not caught 1623 self.assertFalse(callback1.called) 1624 self.assertFalse(callback2.called) 1625 self.assertFalse(callback3.called) 1626 1627 # attach a new loop 1628 self.loop = self.new_test_loop() 1629 1630 with mock.patch.object( 1631 self.loop, "add_signal_handler") as m_add_signal_handler: 1632 1633 self.watcher.attach_loop(self.loop) 1634 1635 m_add_signal_handler.assert_called_once_with( 1636 signal.SIGCHLD, self.watcher._sig_chld) 1637 callback1.assert_called_once_with(61, 11) # race condition! 1638 callback2.assert_called_once_with(62, -5) # race condition! 1639 self.assertFalse(callback3.called) 1640 1641 callback1.reset_mock() 1642 callback2.reset_mock() 1643 1644 # child 3 terminates 1645 self.running = False 1646 self.add_zombie(622, EXITCODE(19)) 1647 self.watcher._sig_chld() 1648 1649 self.assertFalse(callback1.called) 1650 self.assertFalse(callback2.called) 1651 callback3.assert_called_once_with(622, 19) 1652 1653 @waitpid_mocks 1654 def test_close(self, m_waitpid): 1655 # register two children 1656 callback1 = mock.Mock() 1657 1658 with self.watcher: 1659 self.running = True 1660 # child 1 terminates 1661 self.add_zombie(63, EXITCODE(9)) 1662 # other child terminates 1663 self.add_zombie(65, EXITCODE(18)) 1664 self.watcher._sig_chld() 1665 1666 self.watcher.add_child_handler(63, callback1) 1667 self.watcher.add_child_handler(64, callback1) 1668 1669 self.assertEqual(len(self.watcher._callbacks), 1) 1670 if isinstance(self.watcher, asyncio.FastChildWatcher): 1671 self.assertEqual(len(self.watcher._zombies), 1) 1672 1673 with mock.patch.object( 1674 self.loop, 1675 "remove_signal_handler") as m_remove_signal_handler: 1676 1677 self.watcher.close() 1678 1679 m_remove_signal_handler.assert_called_once_with( 1680 signal.SIGCHLD) 1681 self.assertFalse(self.watcher._callbacks) 1682 if isinstance(self.watcher, asyncio.FastChildWatcher): 1683 self.assertFalse(self.watcher._zombies) 1684 1685 1686 class SafeChildWatcherTests (ChildWatcherTestsMixin, test_utils.TestCase): 1687 def create_watcher(self): 1688 return asyncio.SafeChildWatcher() 1689 1690 1691 class FastChildWatcherTests (ChildWatcherTestsMixin, test_utils.TestCase): 1692 def create_watcher(self): 1693 return asyncio.FastChildWatcher() 1694 1695 1696 class PolicyTests(unittest.TestCase): 1697 1698 def create_policy(self): 1699 return asyncio.DefaultEventLoopPolicy() 1700 1701 def test_get_default_child_watcher(self): 1702 policy = self.create_policy() 1703 self.assertIsNone(policy._watcher) 1704 1705 watcher = policy.get_child_watcher() 1706 self.assertIsInstance(watcher, asyncio.ThreadedChildWatcher) 1707 1708 self.assertIs(policy._watcher, watcher) 1709 1710 self.assertIs(watcher, policy.get_child_watcher()) 1711 1712 def test_get_child_watcher_after_set(self): 1713 policy = self.create_policy() 1714 watcher = asyncio.FastChildWatcher() 1715 1716 policy.set_child_watcher(watcher) 1717 self.assertIs(policy._watcher, watcher) 1718 self.assertIs(watcher, policy.get_child_watcher()) 1719 1720 def test_get_child_watcher_thread(self): 1721 1722 def f(): 1723 policy.set_event_loop(policy.new_event_loop()) 1724 1725 self.assertIsInstance(policy.get_event_loop(), 1726 asyncio.AbstractEventLoop) 1727 watcher = policy.get_child_watcher() 1728 1729 self.assertIsInstance(watcher, asyncio.SafeChildWatcher) 1730 self.assertIsNone(watcher._loop) 1731 1732 policy.get_event_loop().close() 1733 1734 policy = self.create_policy() 1735 policy.set_child_watcher(asyncio.SafeChildWatcher()) 1736 1737 th = threading.Thread(target=f) 1738 th.start() 1739 th.join() 1740 1741 def test_child_watcher_replace_mainloop_existing(self): 1742 policy = self.create_policy() 1743 loop = policy.new_event_loop() 1744 policy.set_event_loop(loop) 1745 1746 # Explicitly setup SafeChildWatcher, 1747 # default ThreadedChildWatcher has no _loop property 1748 watcher = asyncio.SafeChildWatcher() 1749 policy.set_child_watcher(watcher) 1750 watcher.attach_loop(loop) 1751 1752 self.assertIs(watcher._loop, loop) 1753 1754 new_loop = policy.new_event_loop() 1755 policy.set_event_loop(new_loop) 1756 1757 self.assertIs(watcher._loop, new_loop) 1758 1759 policy.set_event_loop(None) 1760 1761 self.assertIs(watcher._loop, None) 1762 1763 loop.close() 1764 new_loop.close() 1765 1766 1767 class TestFunctional(unittest.TestCase): 1768 1769 def setUp(self): 1770 self.loop = asyncio.new_event_loop() 1771 asyncio.set_event_loop(self.loop) 1772 1773 def tearDown(self): 1774 self.loop.close() 1775 asyncio.set_event_loop(None) 1776 1777 def test_add_reader_invalid_argument(self): 1778 def assert_raises(): 1779 return self.assertRaisesRegex(ValueError, r'Invalid file object') 1780 1781 cb = lambda: None 1782 1783 with assert_raises(): 1784 self.loop.add_reader(object(), cb) 1785 with assert_raises(): 1786 self.loop.add_writer(object(), cb) 1787 1788 with assert_raises(): 1789 self.loop.remove_reader(object()) 1790 with assert_raises(): 1791 self.loop.remove_writer(object()) 1792 1793 def test_add_reader_or_writer_transport_fd(self): 1794 def assert_raises(): 1795 return self.assertRaisesRegex( 1796 RuntimeError, 1797 r'File descriptor .* is used by transport') 1798 1799 async def runner(): 1800 tr, pr = await self.loop.create_connection( 1801 lambda: asyncio.Protocol(), sock=rsock) 1802 1803 try: 1804 cb = lambda: None 1805 1806 with assert_raises(): 1807 self.loop.add_reader(rsock, cb) 1808 with assert_raises(): 1809 self.loop.add_reader(rsock.fileno(), cb) 1810 1811 with assert_raises(): 1812 self.loop.remove_reader(rsock) 1813 with assert_raises(): 1814 self.loop.remove_reader(rsock.fileno()) 1815 1816 with assert_raises(): 1817 self.loop.add_writer(rsock, cb) 1818 with assert_raises(): 1819 self.loop.add_writer(rsock.fileno(), cb) 1820 1821 with assert_raises(): 1822 self.loop.remove_writer(rsock) 1823 with assert_raises(): 1824 self.loop.remove_writer(rsock.fileno()) 1825 1826 finally: 1827 tr.close() 1828 1829 rsock, wsock = socket.socketpair() 1830 try: 1831 self.loop.run_until_complete(runner()) 1832 finally: 1833 rsock.close() 1834 wsock.close() 1835 1836 1837 if __name__ == '__main__': 1838 unittest.main() 1839