1"""Tests for base_events.py""" 2 3import concurrent.futures 4import errno 5import math 6import socket 7import sys 8import threading 9import time 10import unittest 11from unittest import mock 12 13import asyncio 14from asyncio import base_events 15from asyncio import constants 16from test.test_asyncio import utils as test_utils 17from test import support 18from test.support.script_helper import assert_python_ok 19from test.support import os_helper 20from test.support import socket_helper 21import warnings 22 23MOCK_ANY = mock.ANY 24 25 26def tearDownModule(): 27 asyncio.set_event_loop_policy(None) 28 29 30def mock_socket_module(): 31 m_socket = mock.MagicMock(spec=socket) 32 for name in ( 33 'AF_INET', 'AF_INET6', 'AF_UNSPEC', 'IPPROTO_TCP', 'IPPROTO_UDP', 34 'SOCK_STREAM', 'SOCK_DGRAM', 'SOL_SOCKET', 'SO_REUSEADDR', 'inet_pton' 35 ): 36 if hasattr(socket, name): 37 setattr(m_socket, name, getattr(socket, name)) 38 else: 39 delattr(m_socket, name) 40 41 m_socket.socket = mock.MagicMock() 42 m_socket.socket.return_value = test_utils.mock_nonblocking_socket() 43 44 return m_socket 45 46 47def patch_socket(f): 48 return mock.patch('asyncio.base_events.socket', 49 new_callable=mock_socket_module)(f) 50 51 52class BaseEventTests(test_utils.TestCase): 53 54 def test_ipaddr_info(self): 55 UNSPEC = socket.AF_UNSPEC 56 INET = socket.AF_INET 57 INET6 = socket.AF_INET6 58 STREAM = socket.SOCK_STREAM 59 DGRAM = socket.SOCK_DGRAM 60 TCP = socket.IPPROTO_TCP 61 UDP = socket.IPPROTO_UDP 62 63 self.assertEqual( 64 (INET, STREAM, TCP, '', ('1.2.3.4', 1)), 65 base_events._ipaddr_info('1.2.3.4', 1, INET, STREAM, TCP)) 66 67 self.assertEqual( 68 (INET, STREAM, TCP, '', ('1.2.3.4', 1)), 69 base_events._ipaddr_info(b'1.2.3.4', 1, INET, STREAM, TCP)) 70 71 self.assertEqual( 72 (INET, STREAM, TCP, '', ('1.2.3.4', 1)), 73 base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, STREAM, TCP)) 74 75 self.assertEqual( 76 (INET, DGRAM, UDP, '', ('1.2.3.4', 1)), 77 base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, DGRAM, UDP)) 78 79 # Socket type STREAM implies TCP protocol. 80 self.assertEqual( 81 (INET, STREAM, TCP, '', ('1.2.3.4', 1)), 82 base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, STREAM, 0)) 83 84 # Socket type DGRAM implies UDP protocol. 85 self.assertEqual( 86 (INET, DGRAM, UDP, '', ('1.2.3.4', 1)), 87 base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, DGRAM, 0)) 88 89 # No socket type. 90 self.assertIsNone( 91 base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, 0, 0)) 92 93 if socket_helper.IPV6_ENABLED: 94 # IPv4 address with family IPv6. 95 self.assertIsNone( 96 base_events._ipaddr_info('1.2.3.4', 1, INET6, STREAM, TCP)) 97 98 self.assertEqual( 99 (INET6, STREAM, TCP, '', ('::3', 1, 0, 0)), 100 base_events._ipaddr_info('::3', 1, INET6, STREAM, TCP)) 101 102 self.assertEqual( 103 (INET6, STREAM, TCP, '', ('::3', 1, 0, 0)), 104 base_events._ipaddr_info('::3', 1, UNSPEC, STREAM, TCP)) 105 106 # IPv6 address with family IPv4. 107 self.assertIsNone( 108 base_events._ipaddr_info('::3', 1, INET, STREAM, TCP)) 109 110 # IPv6 address with zone index. 111 self.assertIsNone( 112 base_events._ipaddr_info('::3%lo0', 1, INET6, STREAM, TCP)) 113 114 def test_port_parameter_types(self): 115 # Test obscure kinds of arguments for "port". 116 INET = socket.AF_INET 117 STREAM = socket.SOCK_STREAM 118 TCP = socket.IPPROTO_TCP 119 120 self.assertEqual( 121 (INET, STREAM, TCP, '', ('1.2.3.4', 0)), 122 base_events._ipaddr_info('1.2.3.4', None, INET, STREAM, TCP)) 123 124 self.assertEqual( 125 (INET, STREAM, TCP, '', ('1.2.3.4', 0)), 126 base_events._ipaddr_info('1.2.3.4', b'', INET, STREAM, TCP)) 127 128 self.assertEqual( 129 (INET, STREAM, TCP, '', ('1.2.3.4', 0)), 130 base_events._ipaddr_info('1.2.3.4', '', INET, STREAM, TCP)) 131 132 self.assertEqual( 133 (INET, STREAM, TCP, '', ('1.2.3.4', 1)), 134 base_events._ipaddr_info('1.2.3.4', '1', INET, STREAM, TCP)) 135 136 self.assertEqual( 137 (INET, STREAM, TCP, '', ('1.2.3.4', 1)), 138 base_events._ipaddr_info('1.2.3.4', b'1', INET, STREAM, TCP)) 139 140 @patch_socket 141 def test_ipaddr_info_no_inet_pton(self, m_socket): 142 del m_socket.inet_pton 143 self.assertIsNone(base_events._ipaddr_info('1.2.3.4', 1, 144 socket.AF_INET, 145 socket.SOCK_STREAM, 146 socket.IPPROTO_TCP)) 147 148 149class BaseEventLoopTests(test_utils.TestCase): 150 151 def setUp(self): 152 super().setUp() 153 self.loop = base_events.BaseEventLoop() 154 self.loop._selector = mock.Mock() 155 self.loop._selector.select.return_value = () 156 self.set_event_loop(self.loop) 157 158 def test_not_implemented(self): 159 m = mock.Mock() 160 self.assertRaises( 161 NotImplementedError, 162 self.loop._make_socket_transport, m, m) 163 self.assertRaises( 164 NotImplementedError, 165 self.loop._make_ssl_transport, m, m, m, m) 166 self.assertRaises( 167 NotImplementedError, 168 self.loop._make_datagram_transport, m, m) 169 self.assertRaises( 170 NotImplementedError, self.loop._process_events, []) 171 self.assertRaises( 172 NotImplementedError, self.loop._write_to_self) 173 self.assertRaises( 174 NotImplementedError, 175 self.loop._make_read_pipe_transport, m, m) 176 self.assertRaises( 177 NotImplementedError, 178 self.loop._make_write_pipe_transport, m, m) 179 gen = self.loop._make_subprocess_transport(m, m, m, m, m, m, m) 180 with self.assertRaises(NotImplementedError): 181 gen.send(None) 182 183 def test_close(self): 184 self.assertFalse(self.loop.is_closed()) 185 self.loop.close() 186 self.assertTrue(self.loop.is_closed()) 187 188 # it should be possible to call close() more than once 189 self.loop.close() 190 self.loop.close() 191 192 # operation blocked when the loop is closed 193 f = self.loop.create_future() 194 self.assertRaises(RuntimeError, self.loop.run_forever) 195 self.assertRaises(RuntimeError, self.loop.run_until_complete, f) 196 197 def test__add_callback_handle(self): 198 h = asyncio.Handle(lambda: False, (), self.loop, None) 199 200 self.loop._add_callback(h) 201 self.assertFalse(self.loop._scheduled) 202 self.assertIn(h, self.loop._ready) 203 204 def test__add_callback_cancelled_handle(self): 205 h = asyncio.Handle(lambda: False, (), self.loop, None) 206 h.cancel() 207 208 self.loop._add_callback(h) 209 self.assertFalse(self.loop._scheduled) 210 self.assertFalse(self.loop._ready) 211 212 def test_set_default_executor(self): 213 class DummyExecutor(concurrent.futures.ThreadPoolExecutor): 214 def submit(self, fn, *args, **kwargs): 215 raise NotImplementedError( 216 'cannot submit into a dummy executor') 217 218 self.loop._process_events = mock.Mock() 219 self.loop._write_to_self = mock.Mock() 220 221 executor = DummyExecutor() 222 self.loop.set_default_executor(executor) 223 self.assertIs(executor, self.loop._default_executor) 224 225 def test_set_default_executor_error(self): 226 executor = mock.Mock() 227 228 msg = 'executor must be ThreadPoolExecutor instance' 229 with self.assertRaisesRegex(TypeError, msg): 230 self.loop.set_default_executor(executor) 231 232 self.assertIsNone(self.loop._default_executor) 233 234 def test_call_soon(self): 235 def cb(): 236 pass 237 238 h = self.loop.call_soon(cb) 239 self.assertEqual(h._callback, cb) 240 self.assertIsInstance(h, asyncio.Handle) 241 self.assertIn(h, self.loop._ready) 242 243 def test_call_soon_non_callable(self): 244 self.loop.set_debug(True) 245 with self.assertRaisesRegex(TypeError, 'a callable object'): 246 self.loop.call_soon(1) 247 248 def test_call_later(self): 249 def cb(): 250 pass 251 252 h = self.loop.call_later(10.0, cb) 253 self.assertIsInstance(h, asyncio.TimerHandle) 254 self.assertIn(h, self.loop._scheduled) 255 self.assertNotIn(h, self.loop._ready) 256 with self.assertRaises(TypeError, msg="delay must not be None"): 257 self.loop.call_later(None, cb) 258 259 def test_call_later_negative_delays(self): 260 calls = [] 261 262 def cb(arg): 263 calls.append(arg) 264 265 self.loop._process_events = mock.Mock() 266 self.loop.call_later(-1, cb, 'a') 267 self.loop.call_later(-2, cb, 'b') 268 test_utils.run_briefly(self.loop) 269 self.assertEqual(calls, ['b', 'a']) 270 271 def test_time_and_call_at(self): 272 def cb(): 273 self.loop.stop() 274 275 self.loop._process_events = mock.Mock() 276 delay = 0.1 277 278 when = self.loop.time() + delay 279 self.loop.call_at(when, cb) 280 t0 = self.loop.time() 281 self.loop.run_forever() 282 dt = self.loop.time() - t0 283 284 # 50 ms: maximum granularity of the event loop 285 self.assertGreaterEqual(dt, delay - 0.050, dt) 286 # tolerate a difference of +800 ms because some Python buildbots 287 # are really slow 288 self.assertLessEqual(dt, 0.9, dt) 289 with self.assertRaises(TypeError, msg="when cannot be None"): 290 self.loop.call_at(None, cb) 291 292 def check_thread(self, loop, debug): 293 def cb(): 294 pass 295 296 loop.set_debug(debug) 297 if debug: 298 msg = ("Non-thread-safe operation invoked on an event loop other " 299 "than the current one") 300 with self.assertRaisesRegex(RuntimeError, msg): 301 loop.call_soon(cb) 302 with self.assertRaisesRegex(RuntimeError, msg): 303 loop.call_later(60, cb) 304 with self.assertRaisesRegex(RuntimeError, msg): 305 loop.call_at(loop.time() + 60, cb) 306 else: 307 loop.call_soon(cb) 308 loop.call_later(60, cb) 309 loop.call_at(loop.time() + 60, cb) 310 311 def test_check_thread(self): 312 def check_in_thread(loop, event, debug, create_loop, fut): 313 # wait until the event loop is running 314 event.wait() 315 316 try: 317 if create_loop: 318 loop2 = base_events.BaseEventLoop() 319 try: 320 asyncio.set_event_loop(loop2) 321 self.check_thread(loop, debug) 322 finally: 323 asyncio.set_event_loop(None) 324 loop2.close() 325 else: 326 self.check_thread(loop, debug) 327 except Exception as exc: 328 loop.call_soon_threadsafe(fut.set_exception, exc) 329 else: 330 loop.call_soon_threadsafe(fut.set_result, None) 331 332 def test_thread(loop, debug, create_loop=False): 333 event = threading.Event() 334 fut = loop.create_future() 335 loop.call_soon(event.set) 336 args = (loop, event, debug, create_loop, fut) 337 thread = threading.Thread(target=check_in_thread, args=args) 338 thread.start() 339 loop.run_until_complete(fut) 340 thread.join() 341 342 self.loop._process_events = mock.Mock() 343 self.loop._write_to_self = mock.Mock() 344 345 # raise RuntimeError if the thread has no event loop 346 test_thread(self.loop, True) 347 348 # check disabled if debug mode is disabled 349 test_thread(self.loop, False) 350 351 # raise RuntimeError if the event loop of the thread is not the called 352 # event loop 353 test_thread(self.loop, True, create_loop=True) 354 355 # check disabled if debug mode is disabled 356 test_thread(self.loop, False, create_loop=True) 357 358 def test__run_once(self): 359 h1 = asyncio.TimerHandle(time.monotonic() + 5.0, lambda: True, (), 360 self.loop, None) 361 h2 = asyncio.TimerHandle(time.monotonic() + 10.0, lambda: True, (), 362 self.loop, None) 363 364 h1.cancel() 365 366 self.loop._process_events = mock.Mock() 367 self.loop._scheduled.append(h1) 368 self.loop._scheduled.append(h2) 369 self.loop._run_once() 370 371 t = self.loop._selector.select.call_args[0][0] 372 self.assertTrue(9.5 < t < 10.5, t) 373 self.assertEqual([h2], self.loop._scheduled) 374 self.assertTrue(self.loop._process_events.called) 375 376 def test_set_debug(self): 377 self.loop.set_debug(True) 378 self.assertTrue(self.loop.get_debug()) 379 self.loop.set_debug(False) 380 self.assertFalse(self.loop.get_debug()) 381 382 def test__run_once_schedule_handle(self): 383 handle = None 384 processed = False 385 386 def cb(loop): 387 nonlocal processed, handle 388 processed = True 389 handle = loop.call_soon(lambda: True) 390 391 h = asyncio.TimerHandle(time.monotonic() - 1, cb, (self.loop,), 392 self.loop, None) 393 394 self.loop._process_events = mock.Mock() 395 self.loop._scheduled.append(h) 396 self.loop._run_once() 397 398 self.assertTrue(processed) 399 self.assertEqual([handle], list(self.loop._ready)) 400 401 def test__run_once_cancelled_event_cleanup(self): 402 self.loop._process_events = mock.Mock() 403 404 self.assertTrue( 405 0 < base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION < 1.0) 406 407 def cb(): 408 pass 409 410 # Set up one "blocking" event that will not be cancelled to 411 # ensure later cancelled events do not make it to the head 412 # of the queue and get cleaned. 413 not_cancelled_count = 1 414 self.loop.call_later(3000, cb) 415 416 # Add less than threshold (base_events._MIN_SCHEDULED_TIMER_HANDLES) 417 # cancelled handles, ensure they aren't removed 418 419 cancelled_count = 2 420 for x in range(2): 421 h = self.loop.call_later(3600, cb) 422 h.cancel() 423 424 # Add some cancelled events that will be at head and removed 425 cancelled_count += 2 426 for x in range(2): 427 h = self.loop.call_later(100, cb) 428 h.cancel() 429 430 # This test is invalid if _MIN_SCHEDULED_TIMER_HANDLES is too low 431 self.assertLessEqual(cancelled_count + not_cancelled_count, 432 base_events._MIN_SCHEDULED_TIMER_HANDLES) 433 434 self.assertEqual(self.loop._timer_cancelled_count, cancelled_count) 435 436 self.loop._run_once() 437 438 cancelled_count -= 2 439 440 self.assertEqual(self.loop._timer_cancelled_count, cancelled_count) 441 442 self.assertEqual(len(self.loop._scheduled), 443 cancelled_count + not_cancelled_count) 444 445 # Need enough events to pass _MIN_CANCELLED_TIMER_HANDLES_FRACTION 446 # so that deletion of cancelled events will occur on next _run_once 447 add_cancel_count = int(math.ceil( 448 base_events._MIN_SCHEDULED_TIMER_HANDLES * 449 base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION)) + 1 450 451 add_not_cancel_count = max(base_events._MIN_SCHEDULED_TIMER_HANDLES - 452 add_cancel_count, 0) 453 454 # Add some events that will not be cancelled 455 not_cancelled_count += add_not_cancel_count 456 for x in range(add_not_cancel_count): 457 self.loop.call_later(3600, cb) 458 459 # Add enough cancelled events 460 cancelled_count += add_cancel_count 461 for x in range(add_cancel_count): 462 h = self.loop.call_later(3600, cb) 463 h.cancel() 464 465 # Ensure all handles are still scheduled 466 self.assertEqual(len(self.loop._scheduled), 467 cancelled_count + not_cancelled_count) 468 469 self.loop._run_once() 470 471 # Ensure cancelled events were removed 472 self.assertEqual(len(self.loop._scheduled), not_cancelled_count) 473 474 # Ensure only uncancelled events remain scheduled 475 self.assertTrue(all([not x._cancelled for x in self.loop._scheduled])) 476 477 def test_run_until_complete_type_error(self): 478 self.assertRaises(TypeError, 479 self.loop.run_until_complete, 'blah') 480 481 def test_run_until_complete_loop(self): 482 task = self.loop.create_future() 483 other_loop = self.new_test_loop() 484 self.addCleanup(other_loop.close) 485 self.assertRaises(ValueError, 486 other_loop.run_until_complete, task) 487 488 def test_run_until_complete_loop_orphan_future_close_loop(self): 489 class ShowStopper(SystemExit): 490 pass 491 492 async def foo(delay): 493 await asyncio.sleep(delay) 494 495 def throw(): 496 raise ShowStopper 497 498 self.loop._process_events = mock.Mock() 499 self.loop.call_soon(throw) 500 with self.assertRaises(ShowStopper): 501 self.loop.run_until_complete(foo(0.1)) 502 503 # This call fails if run_until_complete does not clean up 504 # done-callback for the previous future. 505 self.loop.run_until_complete(foo(0.2)) 506 507 def test_subprocess_exec_invalid_args(self): 508 args = [sys.executable, '-c', 'pass'] 509 510 # missing program parameter (empty args) 511 self.assertRaises(TypeError, 512 self.loop.run_until_complete, self.loop.subprocess_exec, 513 asyncio.SubprocessProtocol) 514 515 # expected multiple arguments, not a list 516 self.assertRaises(TypeError, 517 self.loop.run_until_complete, self.loop.subprocess_exec, 518 asyncio.SubprocessProtocol, args) 519 520 # program arguments must be strings, not int 521 self.assertRaises(TypeError, 522 self.loop.run_until_complete, self.loop.subprocess_exec, 523 asyncio.SubprocessProtocol, sys.executable, 123) 524 525 # universal_newlines, shell, bufsize must not be set 526 self.assertRaises(TypeError, 527 self.loop.run_until_complete, self.loop.subprocess_exec, 528 asyncio.SubprocessProtocol, *args, universal_newlines=True) 529 self.assertRaises(TypeError, 530 self.loop.run_until_complete, self.loop.subprocess_exec, 531 asyncio.SubprocessProtocol, *args, shell=True) 532 self.assertRaises(TypeError, 533 self.loop.run_until_complete, self.loop.subprocess_exec, 534 asyncio.SubprocessProtocol, *args, bufsize=4096) 535 536 def test_subprocess_shell_invalid_args(self): 537 # expected a string, not an int or a list 538 self.assertRaises(TypeError, 539 self.loop.run_until_complete, self.loop.subprocess_shell, 540 asyncio.SubprocessProtocol, 123) 541 self.assertRaises(TypeError, 542 self.loop.run_until_complete, self.loop.subprocess_shell, 543 asyncio.SubprocessProtocol, [sys.executable, '-c', 'pass']) 544 545 # universal_newlines, shell, bufsize must not be set 546 self.assertRaises(TypeError, 547 self.loop.run_until_complete, self.loop.subprocess_shell, 548 asyncio.SubprocessProtocol, 'exit 0', universal_newlines=True) 549 self.assertRaises(TypeError, 550 self.loop.run_until_complete, self.loop.subprocess_shell, 551 asyncio.SubprocessProtocol, 'exit 0', shell=True) 552 self.assertRaises(TypeError, 553 self.loop.run_until_complete, self.loop.subprocess_shell, 554 asyncio.SubprocessProtocol, 'exit 0', bufsize=4096) 555 556 def test_default_exc_handler_callback(self): 557 self.loop._process_events = mock.Mock() 558 559 def zero_error(fut): 560 fut.set_result(True) 561 1/0 562 563 # Test call_soon (events.Handle) 564 with mock.patch('asyncio.base_events.logger') as log: 565 fut = self.loop.create_future() 566 self.loop.call_soon(zero_error, fut) 567 fut.add_done_callback(lambda fut: self.loop.stop()) 568 self.loop.run_forever() 569 log.error.assert_called_with( 570 test_utils.MockPattern('Exception in callback.*zero'), 571 exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY)) 572 573 # Test call_later (events.TimerHandle) 574 with mock.patch('asyncio.base_events.logger') as log: 575 fut = self.loop.create_future() 576 self.loop.call_later(0.01, zero_error, fut) 577 fut.add_done_callback(lambda fut: self.loop.stop()) 578 self.loop.run_forever() 579 log.error.assert_called_with( 580 test_utils.MockPattern('Exception in callback.*zero'), 581 exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY)) 582 583 def test_default_exc_handler_coro(self): 584 self.loop._process_events = mock.Mock() 585 586 async def zero_error_coro(): 587 await asyncio.sleep(0.01) 588 1/0 589 590 # Test Future.__del__ 591 with mock.patch('asyncio.base_events.logger') as log: 592 fut = asyncio.ensure_future(zero_error_coro(), loop=self.loop) 593 fut.add_done_callback(lambda *args: self.loop.stop()) 594 self.loop.run_forever() 595 fut = None # Trigger Future.__del__ or futures._TracebackLogger 596 support.gc_collect() 597 # Future.__del__ in logs error with an actual exception context 598 log.error.assert_called_with( 599 test_utils.MockPattern('.*exception was never retrieved'), 600 exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY)) 601 602 def test_set_exc_handler_invalid(self): 603 with self.assertRaisesRegex(TypeError, 'A callable object or None'): 604 self.loop.set_exception_handler('spam') 605 606 def test_set_exc_handler_custom(self): 607 def zero_error(): 608 1/0 609 610 def run_loop(): 611 handle = self.loop.call_soon(zero_error) 612 self.loop._run_once() 613 return handle 614 615 self.loop.set_debug(True) 616 self.loop._process_events = mock.Mock() 617 618 self.assertIsNone(self.loop.get_exception_handler()) 619 mock_handler = mock.Mock() 620 self.loop.set_exception_handler(mock_handler) 621 self.assertIs(self.loop.get_exception_handler(), mock_handler) 622 handle = run_loop() 623 mock_handler.assert_called_with(self.loop, { 624 'exception': MOCK_ANY, 625 'message': test_utils.MockPattern( 626 'Exception in callback.*zero_error'), 627 'handle': handle, 628 'source_traceback': handle._source_traceback, 629 }) 630 mock_handler.reset_mock() 631 632 self.loop.set_exception_handler(None) 633 with mock.patch('asyncio.base_events.logger') as log: 634 run_loop() 635 log.error.assert_called_with( 636 test_utils.MockPattern( 637 'Exception in callback.*zero'), 638 exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY)) 639 640 self.assertFalse(mock_handler.called) 641 642 def test_set_exc_handler_broken(self): 643 def run_loop(): 644 def zero_error(): 645 1/0 646 self.loop.call_soon(zero_error) 647 self.loop._run_once() 648 649 def handler(loop, context): 650 raise AttributeError('spam') 651 652 self.loop._process_events = mock.Mock() 653 654 self.loop.set_exception_handler(handler) 655 656 with mock.patch('asyncio.base_events.logger') as log: 657 run_loop() 658 log.error.assert_called_with( 659 test_utils.MockPattern( 660 'Unhandled error in exception handler'), 661 exc_info=(AttributeError, MOCK_ANY, MOCK_ANY)) 662 663 def test_default_exc_handler_broken(self): 664 _context = None 665 666 class Loop(base_events.BaseEventLoop): 667 668 _selector = mock.Mock() 669 _process_events = mock.Mock() 670 671 def default_exception_handler(self, context): 672 nonlocal _context 673 _context = context 674 # Simulates custom buggy "default_exception_handler" 675 raise ValueError('spam') 676 677 loop = Loop() 678 self.addCleanup(loop.close) 679 asyncio.set_event_loop(loop) 680 681 def run_loop(): 682 def zero_error(): 683 1/0 684 loop.call_soon(zero_error) 685 loop._run_once() 686 687 with mock.patch('asyncio.base_events.logger') as log: 688 run_loop() 689 log.error.assert_called_with( 690 'Exception in default exception handler', 691 exc_info=True) 692 693 def custom_handler(loop, context): 694 raise ValueError('ham') 695 696 _context = None 697 loop.set_exception_handler(custom_handler) 698 with mock.patch('asyncio.base_events.logger') as log: 699 run_loop() 700 log.error.assert_called_with( 701 test_utils.MockPattern('Exception in default exception.*' 702 'while handling.*in custom'), 703 exc_info=True) 704 705 # Check that original context was passed to default 706 # exception handler. 707 self.assertIn('context', _context) 708 self.assertIs(type(_context['context']['exception']), 709 ZeroDivisionError) 710 711 def test_set_task_factory_invalid(self): 712 with self.assertRaisesRegex( 713 TypeError, 'task factory must be a callable or None'): 714 715 self.loop.set_task_factory(1) 716 717 self.assertIsNone(self.loop.get_task_factory()) 718 719 def test_set_task_factory(self): 720 self.loop._process_events = mock.Mock() 721 722 class MyTask(asyncio.Task): 723 pass 724 725 async def coro(): 726 pass 727 728 factory = lambda loop, coro: MyTask(coro, loop=loop) 729 730 self.assertIsNone(self.loop.get_task_factory()) 731 self.loop.set_task_factory(factory) 732 self.assertIs(self.loop.get_task_factory(), factory) 733 734 task = self.loop.create_task(coro()) 735 self.assertTrue(isinstance(task, MyTask)) 736 self.loop.run_until_complete(task) 737 738 self.loop.set_task_factory(None) 739 self.assertIsNone(self.loop.get_task_factory()) 740 741 task = self.loop.create_task(coro()) 742 self.assertTrue(isinstance(task, asyncio.Task)) 743 self.assertFalse(isinstance(task, MyTask)) 744 self.loop.run_until_complete(task) 745 746 def test_env_var_debug(self): 747 code = '\n'.join(( 748 'import asyncio', 749 'loop = asyncio.new_event_loop()', 750 'print(loop.get_debug())')) 751 752 # Test with -E to not fail if the unit test was run with 753 # PYTHONASYNCIODEBUG set to a non-empty string 754 sts, stdout, stderr = assert_python_ok('-E', '-c', code) 755 self.assertEqual(stdout.rstrip(), b'False') 756 757 sts, stdout, stderr = assert_python_ok('-c', code, 758 PYTHONASYNCIODEBUG='', 759 PYTHONDEVMODE='') 760 self.assertEqual(stdout.rstrip(), b'False') 761 762 sts, stdout, stderr = assert_python_ok('-c', code, 763 PYTHONASYNCIODEBUG='1', 764 PYTHONDEVMODE='') 765 self.assertEqual(stdout.rstrip(), b'True') 766 767 sts, stdout, stderr = assert_python_ok('-E', '-c', code, 768 PYTHONASYNCIODEBUG='1') 769 self.assertEqual(stdout.rstrip(), b'False') 770 771 # -X dev 772 sts, stdout, stderr = assert_python_ok('-E', '-X', 'dev', 773 '-c', code) 774 self.assertEqual(stdout.rstrip(), b'True') 775 776 def test_create_task(self): 777 class MyTask(asyncio.Task): 778 pass 779 780 async def test(): 781 pass 782 783 class EventLoop(base_events.BaseEventLoop): 784 def create_task(self, coro): 785 return MyTask(coro, loop=loop) 786 787 loop = EventLoop() 788 self.set_event_loop(loop) 789 790 coro = test() 791 task = asyncio.ensure_future(coro, loop=loop) 792 self.assertIsInstance(task, MyTask) 793 794 # make warnings quiet 795 task._log_destroy_pending = False 796 coro.close() 797 798 def test_create_task_error_closes_coro(self): 799 async def test(): 800 pass 801 loop = asyncio.new_event_loop() 802 loop.close() 803 with warnings.catch_warnings(record=True) as w: 804 with self.assertRaises(RuntimeError): 805 asyncio.ensure_future(test(), loop=loop) 806 self.assertEqual(len(w), 0) 807 808 809 def test_create_named_task_with_default_factory(self): 810 async def test(): 811 pass 812 813 loop = asyncio.new_event_loop() 814 task = loop.create_task(test(), name='test_task') 815 try: 816 self.assertEqual(task.get_name(), 'test_task') 817 finally: 818 loop.run_until_complete(task) 819 loop.close() 820 821 def test_create_named_task_with_custom_factory(self): 822 def task_factory(loop, coro): 823 return asyncio.Task(coro, loop=loop) 824 825 async def test(): 826 pass 827 828 loop = asyncio.new_event_loop() 829 loop.set_task_factory(task_factory) 830 task = loop.create_task(test(), name='test_task') 831 try: 832 self.assertEqual(task.get_name(), 'test_task') 833 finally: 834 loop.run_until_complete(task) 835 loop.close() 836 837 def test_run_forever_keyboard_interrupt(self): 838 # Python issue #22601: ensure that the temporary task created by 839 # run_forever() consumes the KeyboardInterrupt and so don't log 840 # a warning 841 async def raise_keyboard_interrupt(): 842 raise KeyboardInterrupt 843 844 self.loop._process_events = mock.Mock() 845 self.loop.call_exception_handler = mock.Mock() 846 847 try: 848 self.loop.run_until_complete(raise_keyboard_interrupt()) 849 except KeyboardInterrupt: 850 pass 851 self.loop.close() 852 support.gc_collect() 853 854 self.assertFalse(self.loop.call_exception_handler.called) 855 856 def test_run_until_complete_baseexception(self): 857 # Python issue #22429: run_until_complete() must not schedule a pending 858 # call to stop() if the future raised a BaseException 859 async def raise_keyboard_interrupt(): 860 raise KeyboardInterrupt 861 862 self.loop._process_events = mock.Mock() 863 864 try: 865 self.loop.run_until_complete(raise_keyboard_interrupt()) 866 except KeyboardInterrupt: 867 pass 868 869 def func(): 870 self.loop.stop() 871 func.called = True 872 func.called = False 873 try: 874 self.loop.call_soon(func) 875 self.loop.run_forever() 876 except KeyboardInterrupt: 877 pass 878 self.assertTrue(func.called) 879 880 def test_single_selecter_event_callback_after_stopping(self): 881 # Python issue #25593: A stopped event loop may cause event callbacks 882 # to run more than once. 883 event_sentinel = object() 884 callcount = 0 885 doer = None 886 887 def proc_events(event_list): 888 nonlocal doer 889 if event_sentinel in event_list: 890 doer = self.loop.call_soon(do_event) 891 892 def do_event(): 893 nonlocal callcount 894 callcount += 1 895 self.loop.call_soon(clear_selector) 896 897 def clear_selector(): 898 doer.cancel() 899 self.loop._selector.select.return_value = () 900 901 self.loop._process_events = proc_events 902 self.loop._selector.select.return_value = (event_sentinel,) 903 904 for i in range(1, 3): 905 with self.subTest('Loop %d/2' % i): 906 self.loop.call_soon(self.loop.stop) 907 self.loop.run_forever() 908 self.assertEqual(callcount, 1) 909 910 def test_run_once(self): 911 # Simple test for test_utils.run_once(). It may seem strange 912 # to have a test for this (the function isn't even used!) but 913 # it's a de-factor standard API for library tests. This tests 914 # the idiom: loop.call_soon(loop.stop); loop.run_forever(). 915 count = 0 916 917 def callback(): 918 nonlocal count 919 count += 1 920 921 self.loop._process_events = mock.Mock() 922 self.loop.call_soon(callback) 923 test_utils.run_once(self.loop) 924 self.assertEqual(count, 1) 925 926 def test_run_forever_pre_stopped(self): 927 # Test that the old idiom for pre-stopping the loop works. 928 self.loop._process_events = mock.Mock() 929 self.loop.stop() 930 self.loop.run_forever() 931 self.loop._selector.select.assert_called_once_with(0) 932 933 async def leave_unfinalized_asyncgen(self): 934 # Create an async generator, iterate it partially, and leave it 935 # to be garbage collected. 936 # Used in async generator finalization tests. 937 # Depends on implementation details of garbage collector. Changes 938 # in gc may break this function. 939 status = {'started': False, 940 'stopped': False, 941 'finalized': False} 942 943 async def agen(): 944 status['started'] = True 945 try: 946 for item in ['ZERO', 'ONE', 'TWO', 'THREE', 'FOUR']: 947 yield item 948 finally: 949 status['finalized'] = True 950 951 ag = agen() 952 ai = ag.__aiter__() 953 954 async def iter_one(): 955 try: 956 item = await ai.__anext__() 957 except StopAsyncIteration: 958 return 959 if item == 'THREE': 960 status['stopped'] = True 961 return 962 asyncio.create_task(iter_one()) 963 964 asyncio.create_task(iter_one()) 965 return status 966 967 def test_asyncgen_finalization_by_gc(self): 968 # Async generators should be finalized when garbage collected. 969 self.loop._process_events = mock.Mock() 970 self.loop._write_to_self = mock.Mock() 971 with support.disable_gc(): 972 status = self.loop.run_until_complete(self.leave_unfinalized_asyncgen()) 973 while not status['stopped']: 974 test_utils.run_briefly(self.loop) 975 self.assertTrue(status['started']) 976 self.assertTrue(status['stopped']) 977 self.assertFalse(status['finalized']) 978 support.gc_collect() 979 test_utils.run_briefly(self.loop) 980 self.assertTrue(status['finalized']) 981 982 def test_asyncgen_finalization_by_gc_in_other_thread(self): 983 # Python issue 34769: If garbage collector runs in another 984 # thread, async generators will not finalize in debug 985 # mode. 986 self.loop._process_events = mock.Mock() 987 self.loop._write_to_self = mock.Mock() 988 self.loop.set_debug(True) 989 with support.disable_gc(): 990 status = self.loop.run_until_complete(self.leave_unfinalized_asyncgen()) 991 while not status['stopped']: 992 test_utils.run_briefly(self.loop) 993 self.assertTrue(status['started']) 994 self.assertTrue(status['stopped']) 995 self.assertFalse(status['finalized']) 996 self.loop.run_until_complete( 997 self.loop.run_in_executor(None, support.gc_collect)) 998 test_utils.run_briefly(self.loop) 999 self.assertTrue(status['finalized']) 1000 1001 1002class MyProto(asyncio.Protocol): 1003 done = None 1004 1005 def __init__(self, create_future=False): 1006 self.state = 'INITIAL' 1007 self.nbytes = 0 1008 if create_future: 1009 self.done = asyncio.get_running_loop().create_future() 1010 1011 def _assert_state(self, *expected): 1012 if self.state not in expected: 1013 raise AssertionError(f'state: {self.state!r}, expected: {expected!r}') 1014 1015 def connection_made(self, transport): 1016 self.transport = transport 1017 self._assert_state('INITIAL') 1018 self.state = 'CONNECTED' 1019 transport.write(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n') 1020 1021 def data_received(self, data): 1022 self._assert_state('CONNECTED') 1023 self.nbytes += len(data) 1024 1025 def eof_received(self): 1026 self._assert_state('CONNECTED') 1027 self.state = 'EOF' 1028 1029 def connection_lost(self, exc): 1030 self._assert_state('CONNECTED', 'EOF') 1031 self.state = 'CLOSED' 1032 if self.done: 1033 self.done.set_result(None) 1034 1035 1036class MyDatagramProto(asyncio.DatagramProtocol): 1037 done = None 1038 1039 def __init__(self, create_future=False, loop=None): 1040 self.state = 'INITIAL' 1041 self.nbytes = 0 1042 if create_future: 1043 self.done = loop.create_future() 1044 1045 def _assert_state(self, expected): 1046 if self.state != expected: 1047 raise AssertionError(f'state: {self.state!r}, expected: {expected!r}') 1048 1049 def connection_made(self, transport): 1050 self.transport = transport 1051 self._assert_state('INITIAL') 1052 self.state = 'INITIALIZED' 1053 1054 def datagram_received(self, data, addr): 1055 self._assert_state('INITIALIZED') 1056 self.nbytes += len(data) 1057 1058 def error_received(self, exc): 1059 self._assert_state('INITIALIZED') 1060 1061 def connection_lost(self, exc): 1062 self._assert_state('INITIALIZED') 1063 self.state = 'CLOSED' 1064 if self.done: 1065 self.done.set_result(None) 1066 1067 1068class BaseEventLoopWithSelectorTests(test_utils.TestCase): 1069 1070 def setUp(self): 1071 super().setUp() 1072 self.loop = asyncio.SelectorEventLoop() 1073 self.set_event_loop(self.loop) 1074 1075 @mock.patch('socket.getnameinfo') 1076 def test_getnameinfo(self, m_gai): 1077 m_gai.side_effect = lambda *args: 42 1078 r = self.loop.run_until_complete(self.loop.getnameinfo(('abc', 123))) 1079 self.assertEqual(r, 42) 1080 1081 @patch_socket 1082 def test_create_connection_multiple_errors(self, m_socket): 1083 1084 class MyProto(asyncio.Protocol): 1085 pass 1086 1087 async def getaddrinfo(*args, **kw): 1088 return [(2, 1, 6, '', ('107.6.106.82', 80)), 1089 (2, 1, 6, '', ('107.6.106.82', 80))] 1090 1091 def getaddrinfo_task(*args, **kwds): 1092 return self.loop.create_task(getaddrinfo(*args, **kwds)) 1093 1094 idx = -1 1095 errors = ['err1', 'err2'] 1096 1097 def _socket(*args, **kw): 1098 nonlocal idx, errors 1099 idx += 1 1100 raise OSError(errors[idx]) 1101 1102 m_socket.socket = _socket 1103 1104 self.loop.getaddrinfo = getaddrinfo_task 1105 1106 coro = self.loop.create_connection(MyProto, 'example.com', 80) 1107 with self.assertRaises(OSError) as cm: 1108 self.loop.run_until_complete(coro) 1109 1110 self.assertEqual(str(cm.exception), 'Multiple exceptions: err1, err2') 1111 1112 @patch_socket 1113 def test_create_connection_timeout(self, m_socket): 1114 # Ensure that the socket is closed on timeout 1115 sock = mock.Mock() 1116 m_socket.socket.return_value = sock 1117 1118 def getaddrinfo(*args, **kw): 1119 fut = self.loop.create_future() 1120 addr = (socket.AF_INET, socket.SOCK_STREAM, 0, '', 1121 ('127.0.0.1', 80)) 1122 fut.set_result([addr]) 1123 return fut 1124 self.loop.getaddrinfo = getaddrinfo 1125 1126 with mock.patch.object(self.loop, 'sock_connect', 1127 side_effect=asyncio.TimeoutError): 1128 coro = self.loop.create_connection(MyProto, '127.0.0.1', 80) 1129 with self.assertRaises(asyncio.TimeoutError): 1130 self.loop.run_until_complete(coro) 1131 self.assertTrue(sock.close.called) 1132 1133 def test_create_connection_host_port_sock(self): 1134 coro = self.loop.create_connection( 1135 MyProto, 'example.com', 80, sock=object()) 1136 self.assertRaises(ValueError, self.loop.run_until_complete, coro) 1137 1138 def test_create_connection_wrong_sock(self): 1139 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 1140 with sock: 1141 coro = self.loop.create_connection(MyProto, sock=sock) 1142 with self.assertRaisesRegex(ValueError, 1143 'A Stream Socket was expected'): 1144 self.loop.run_until_complete(coro) 1145 1146 def test_create_server_wrong_sock(self): 1147 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 1148 with sock: 1149 coro = self.loop.create_server(MyProto, sock=sock) 1150 with self.assertRaisesRegex(ValueError, 1151 'A Stream Socket was expected'): 1152 self.loop.run_until_complete(coro) 1153 1154 def test_create_server_ssl_timeout_for_plain_socket(self): 1155 coro = self.loop.create_server( 1156 MyProto, 'example.com', 80, ssl_handshake_timeout=1) 1157 with self.assertRaisesRegex( 1158 ValueError, 1159 'ssl_handshake_timeout is only meaningful with ssl'): 1160 self.loop.run_until_complete(coro) 1161 1162 @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'), 1163 'no socket.SOCK_NONBLOCK (linux only)') 1164 def test_create_server_stream_bittype(self): 1165 sock = socket.socket( 1166 socket.AF_INET, socket.SOCK_STREAM | socket.SOCK_NONBLOCK) 1167 with sock: 1168 coro = self.loop.create_server(lambda: None, sock=sock) 1169 srv = self.loop.run_until_complete(coro) 1170 srv.close() 1171 self.loop.run_until_complete(srv.wait_closed()) 1172 1173 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'no IPv6 support') 1174 def test_create_server_ipv6(self): 1175 async def main(): 1176 srv = await asyncio.start_server(lambda: None, '::1', 0) 1177 try: 1178 self.assertGreater(len(srv.sockets), 0) 1179 finally: 1180 srv.close() 1181 await srv.wait_closed() 1182 1183 try: 1184 self.loop.run_until_complete(main()) 1185 except OSError as ex: 1186 if (hasattr(errno, 'EADDRNOTAVAIL') and 1187 ex.errno == errno.EADDRNOTAVAIL): 1188 self.skipTest('failed to bind to ::1') 1189 else: 1190 raise 1191 1192 def test_create_datagram_endpoint_wrong_sock(self): 1193 sock = socket.socket(socket.AF_INET) 1194 with sock: 1195 coro = self.loop.create_datagram_endpoint(MyProto, sock=sock) 1196 with self.assertRaisesRegex(ValueError, 1197 'A UDP Socket was expected'): 1198 self.loop.run_until_complete(coro) 1199 1200 def test_create_connection_no_host_port_sock(self): 1201 coro = self.loop.create_connection(MyProto) 1202 self.assertRaises(ValueError, self.loop.run_until_complete, coro) 1203 1204 def test_create_connection_no_getaddrinfo(self): 1205 async def getaddrinfo(*args, **kw): 1206 return [] 1207 1208 def getaddrinfo_task(*args, **kwds): 1209 return self.loop.create_task(getaddrinfo(*args, **kwds)) 1210 1211 self.loop.getaddrinfo = getaddrinfo_task 1212 coro = self.loop.create_connection(MyProto, 'example.com', 80) 1213 self.assertRaises( 1214 OSError, self.loop.run_until_complete, coro) 1215 1216 def test_create_connection_connect_err(self): 1217 async def getaddrinfo(*args, **kw): 1218 return [(2, 1, 6, '', ('107.6.106.82', 80))] 1219 1220 def getaddrinfo_task(*args, **kwds): 1221 return self.loop.create_task(getaddrinfo(*args, **kwds)) 1222 1223 self.loop.getaddrinfo = getaddrinfo_task 1224 self.loop.sock_connect = mock.Mock() 1225 self.loop.sock_connect.side_effect = OSError 1226 1227 coro = self.loop.create_connection(MyProto, 'example.com', 80) 1228 self.assertRaises( 1229 OSError, self.loop.run_until_complete, coro) 1230 1231 def test_create_connection_multiple(self): 1232 async def getaddrinfo(*args, **kw): 1233 return [(2, 1, 6, '', ('0.0.0.1', 80)), 1234 (2, 1, 6, '', ('0.0.0.2', 80))] 1235 1236 def getaddrinfo_task(*args, **kwds): 1237 return self.loop.create_task(getaddrinfo(*args, **kwds)) 1238 1239 self.loop.getaddrinfo = getaddrinfo_task 1240 self.loop.sock_connect = mock.Mock() 1241 self.loop.sock_connect.side_effect = OSError 1242 1243 coro = self.loop.create_connection( 1244 MyProto, 'example.com', 80, family=socket.AF_INET) 1245 with self.assertRaises(OSError): 1246 self.loop.run_until_complete(coro) 1247 1248 @patch_socket 1249 def test_create_connection_multiple_errors_local_addr(self, m_socket): 1250 1251 def bind(addr): 1252 if addr[0] == '0.0.0.1': 1253 err = OSError('Err') 1254 err.strerror = 'Err' 1255 raise err 1256 1257 m_socket.socket.return_value.bind = bind 1258 1259 async def getaddrinfo(*args, **kw): 1260 return [(2, 1, 6, '', ('0.0.0.1', 80)), 1261 (2, 1, 6, '', ('0.0.0.2', 80))] 1262 1263 def getaddrinfo_task(*args, **kwds): 1264 return self.loop.create_task(getaddrinfo(*args, **kwds)) 1265 1266 self.loop.getaddrinfo = getaddrinfo_task 1267 self.loop.sock_connect = mock.Mock() 1268 self.loop.sock_connect.side_effect = OSError('Err2') 1269 1270 coro = self.loop.create_connection( 1271 MyProto, 'example.com', 80, family=socket.AF_INET, 1272 local_addr=(None, 8080)) 1273 with self.assertRaises(OSError) as cm: 1274 self.loop.run_until_complete(coro) 1275 1276 self.assertTrue(str(cm.exception).startswith('Multiple exceptions: ')) 1277 self.assertTrue(m_socket.socket.return_value.close.called) 1278 1279 def _test_create_connection_ip_addr(self, m_socket, allow_inet_pton): 1280 # Test the fallback code, even if this system has inet_pton. 1281 if not allow_inet_pton: 1282 del m_socket.inet_pton 1283 1284 m_socket.getaddrinfo = socket.getaddrinfo 1285 sock = m_socket.socket.return_value 1286 1287 self.loop._add_reader = mock.Mock() 1288 self.loop._add_writer = mock.Mock() 1289 1290 coro = self.loop.create_connection(asyncio.Protocol, '1.2.3.4', 80) 1291 t, p = self.loop.run_until_complete(coro) 1292 try: 1293 sock.connect.assert_called_with(('1.2.3.4', 80)) 1294 _, kwargs = m_socket.socket.call_args 1295 self.assertEqual(kwargs['family'], m_socket.AF_INET) 1296 self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM) 1297 finally: 1298 t.close() 1299 test_utils.run_briefly(self.loop) # allow transport to close 1300 1301 if socket_helper.IPV6_ENABLED: 1302 sock.family = socket.AF_INET6 1303 coro = self.loop.create_connection(asyncio.Protocol, '::1', 80) 1304 t, p = self.loop.run_until_complete(coro) 1305 try: 1306 # Without inet_pton we use getaddrinfo, which transforms 1307 # ('::1', 80) to ('::1', 80, 0, 0). The last 0s are flow info, 1308 # scope id. 1309 [address] = sock.connect.call_args[0] 1310 host, port = address[:2] 1311 self.assertRegex(host, r'::(0\.)*1') 1312 self.assertEqual(port, 80) 1313 _, kwargs = m_socket.socket.call_args 1314 self.assertEqual(kwargs['family'], m_socket.AF_INET6) 1315 self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM) 1316 finally: 1317 t.close() 1318 test_utils.run_briefly(self.loop) # allow transport to close 1319 1320 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'no IPv6 support') 1321 @unittest.skipIf(sys.platform.startswith('aix'), 1322 "bpo-25545: IPv6 scope id and getaddrinfo() behave differently on AIX") 1323 @patch_socket 1324 def test_create_connection_ipv6_scope(self, m_socket): 1325 m_socket.getaddrinfo = socket.getaddrinfo 1326 sock = m_socket.socket.return_value 1327 sock.family = socket.AF_INET6 1328 1329 self.loop._add_reader = mock.Mock() 1330 self.loop._add_writer = mock.Mock() 1331 1332 coro = self.loop.create_connection(asyncio.Protocol, 'fe80::1%1', 80) 1333 t, p = self.loop.run_until_complete(coro) 1334 try: 1335 sock.connect.assert_called_with(('fe80::1', 80, 0, 1)) 1336 _, kwargs = m_socket.socket.call_args 1337 self.assertEqual(kwargs['family'], m_socket.AF_INET6) 1338 self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM) 1339 finally: 1340 t.close() 1341 test_utils.run_briefly(self.loop) # allow transport to close 1342 1343 @patch_socket 1344 def test_create_connection_ip_addr(self, m_socket): 1345 self._test_create_connection_ip_addr(m_socket, True) 1346 1347 @patch_socket 1348 def test_create_connection_no_inet_pton(self, m_socket): 1349 self._test_create_connection_ip_addr(m_socket, False) 1350 1351 @patch_socket 1352 def test_create_connection_service_name(self, m_socket): 1353 m_socket.getaddrinfo = socket.getaddrinfo 1354 sock = m_socket.socket.return_value 1355 1356 self.loop._add_reader = mock.Mock() 1357 self.loop._add_writer = mock.Mock() 1358 1359 for service, port in ('http', 80), (b'http', 80): 1360 coro = self.loop.create_connection(asyncio.Protocol, 1361 '127.0.0.1', service) 1362 1363 t, p = self.loop.run_until_complete(coro) 1364 try: 1365 sock.connect.assert_called_with(('127.0.0.1', port)) 1366 _, kwargs = m_socket.socket.call_args 1367 self.assertEqual(kwargs['family'], m_socket.AF_INET) 1368 self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM) 1369 finally: 1370 t.close() 1371 test_utils.run_briefly(self.loop) # allow transport to close 1372 1373 for service in 'nonsense', b'nonsense': 1374 coro = self.loop.create_connection(asyncio.Protocol, 1375 '127.0.0.1', service) 1376 1377 with self.assertRaises(OSError): 1378 self.loop.run_until_complete(coro) 1379 1380 def test_create_connection_no_local_addr(self): 1381 async def getaddrinfo(host, *args, **kw): 1382 if host == 'example.com': 1383 return [(2, 1, 6, '', ('107.6.106.82', 80)), 1384 (2, 1, 6, '', ('107.6.106.82', 80))] 1385 else: 1386 return [] 1387 1388 def getaddrinfo_task(*args, **kwds): 1389 return self.loop.create_task(getaddrinfo(*args, **kwds)) 1390 self.loop.getaddrinfo = getaddrinfo_task 1391 1392 coro = self.loop.create_connection( 1393 MyProto, 'example.com', 80, family=socket.AF_INET, 1394 local_addr=(None, 8080)) 1395 self.assertRaises( 1396 OSError, self.loop.run_until_complete, coro) 1397 1398 @patch_socket 1399 def test_create_connection_bluetooth(self, m_socket): 1400 # See http://bugs.python.org/issue27136, fallback to getaddrinfo when 1401 # we can't recognize an address is resolved, e.g. a Bluetooth address. 1402 addr = ('00:01:02:03:04:05', 1) 1403 1404 def getaddrinfo(host, port, *args, **kw): 1405 self.assertEqual((host, port), addr) 1406 return [(999, 1, 999, '', (addr, 1))] 1407 1408 m_socket.getaddrinfo = getaddrinfo 1409 sock = m_socket.socket() 1410 coro = self.loop.sock_connect(sock, addr) 1411 self.loop.run_until_complete(coro) 1412 1413 def test_create_connection_ssl_server_hostname_default(self): 1414 self.loop.getaddrinfo = mock.Mock() 1415 1416 def mock_getaddrinfo(*args, **kwds): 1417 f = self.loop.create_future() 1418 f.set_result([(socket.AF_INET, socket.SOCK_STREAM, 1419 socket.SOL_TCP, '', ('1.2.3.4', 80))]) 1420 return f 1421 1422 self.loop.getaddrinfo.side_effect = mock_getaddrinfo 1423 self.loop.sock_connect = mock.Mock() 1424 self.loop.sock_connect.return_value = self.loop.create_future() 1425 self.loop.sock_connect.return_value.set_result(None) 1426 self.loop._make_ssl_transport = mock.Mock() 1427 1428 class _SelectorTransportMock: 1429 _sock = None 1430 1431 def get_extra_info(self, key): 1432 return mock.Mock() 1433 1434 def close(self): 1435 self._sock.close() 1436 1437 def mock_make_ssl_transport(sock, protocol, sslcontext, waiter, 1438 **kwds): 1439 waiter.set_result(None) 1440 transport = _SelectorTransportMock() 1441 transport._sock = sock 1442 return transport 1443 1444 self.loop._make_ssl_transport.side_effect = mock_make_ssl_transport 1445 ANY = mock.ANY 1446 handshake_timeout = object() 1447 shutdown_timeout = object() 1448 # First try the default server_hostname. 1449 self.loop._make_ssl_transport.reset_mock() 1450 coro = self.loop.create_connection( 1451 MyProto, 'python.org', 80, ssl=True, 1452 ssl_handshake_timeout=handshake_timeout, 1453 ssl_shutdown_timeout=shutdown_timeout) 1454 transport, _ = self.loop.run_until_complete(coro) 1455 transport.close() 1456 self.loop._make_ssl_transport.assert_called_with( 1457 ANY, ANY, ANY, ANY, 1458 server_side=False, 1459 server_hostname='python.org', 1460 ssl_handshake_timeout=handshake_timeout, 1461 ssl_shutdown_timeout=shutdown_timeout) 1462 # Next try an explicit server_hostname. 1463 self.loop._make_ssl_transport.reset_mock() 1464 coro = self.loop.create_connection( 1465 MyProto, 'python.org', 80, ssl=True, 1466 server_hostname='perl.com', 1467 ssl_handshake_timeout=handshake_timeout, 1468 ssl_shutdown_timeout=shutdown_timeout) 1469 transport, _ = self.loop.run_until_complete(coro) 1470 transport.close() 1471 self.loop._make_ssl_transport.assert_called_with( 1472 ANY, ANY, ANY, ANY, 1473 server_side=False, 1474 server_hostname='perl.com', 1475 ssl_handshake_timeout=handshake_timeout, 1476 ssl_shutdown_timeout=shutdown_timeout) 1477 # Finally try an explicit empty server_hostname. 1478 self.loop._make_ssl_transport.reset_mock() 1479 coro = self.loop.create_connection( 1480 MyProto, 'python.org', 80, ssl=True, 1481 server_hostname='', 1482 ssl_handshake_timeout=handshake_timeout, 1483 ssl_shutdown_timeout=shutdown_timeout) 1484 transport, _ = self.loop.run_until_complete(coro) 1485 transport.close() 1486 self.loop._make_ssl_transport.assert_called_with( 1487 ANY, ANY, ANY, ANY, 1488 server_side=False, 1489 server_hostname='', 1490 ssl_handshake_timeout=handshake_timeout, 1491 ssl_shutdown_timeout=shutdown_timeout) 1492 1493 def test_create_connection_no_ssl_server_hostname_errors(self): 1494 # When not using ssl, server_hostname must be None. 1495 coro = self.loop.create_connection(MyProto, 'python.org', 80, 1496 server_hostname='') 1497 self.assertRaises(ValueError, self.loop.run_until_complete, coro) 1498 coro = self.loop.create_connection(MyProto, 'python.org', 80, 1499 server_hostname='python.org') 1500 self.assertRaises(ValueError, self.loop.run_until_complete, coro) 1501 1502 def test_create_connection_ssl_server_hostname_errors(self): 1503 # When using ssl, server_hostname may be None if host is non-empty. 1504 coro = self.loop.create_connection(MyProto, '', 80, ssl=True) 1505 self.assertRaises(ValueError, self.loop.run_until_complete, coro) 1506 coro = self.loop.create_connection(MyProto, None, 80, ssl=True) 1507 self.assertRaises(ValueError, self.loop.run_until_complete, coro) 1508 sock = socket.socket() 1509 coro = self.loop.create_connection(MyProto, None, None, 1510 ssl=True, sock=sock) 1511 self.addCleanup(sock.close) 1512 self.assertRaises(ValueError, self.loop.run_until_complete, coro) 1513 1514 def test_create_connection_ssl_timeout_for_plain_socket(self): 1515 coro = self.loop.create_connection( 1516 MyProto, 'example.com', 80, ssl_handshake_timeout=1) 1517 with self.assertRaisesRegex( 1518 ValueError, 1519 'ssl_handshake_timeout is only meaningful with ssl'): 1520 self.loop.run_until_complete(coro) 1521 1522 def test_create_server_empty_host(self): 1523 # if host is empty string use None instead 1524 host = object() 1525 1526 async def getaddrinfo(*args, **kw): 1527 nonlocal host 1528 host = args[0] 1529 return [] 1530 1531 def getaddrinfo_task(*args, **kwds): 1532 return self.loop.create_task(getaddrinfo(*args, **kwds)) 1533 1534 self.loop.getaddrinfo = getaddrinfo_task 1535 fut = self.loop.create_server(MyProto, '', 0) 1536 self.assertRaises(OSError, self.loop.run_until_complete, fut) 1537 self.assertIsNone(host) 1538 1539 def test_create_server_host_port_sock(self): 1540 fut = self.loop.create_server( 1541 MyProto, '0.0.0.0', 0, sock=object()) 1542 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1543 1544 def test_create_server_no_host_port_sock(self): 1545 fut = self.loop.create_server(MyProto) 1546 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1547 1548 def test_create_server_no_getaddrinfo(self): 1549 getaddrinfo = self.loop.getaddrinfo = mock.Mock() 1550 getaddrinfo.return_value = self.loop.create_future() 1551 getaddrinfo.return_value.set_result(None) 1552 1553 f = self.loop.create_server(MyProto, 'python.org', 0) 1554 self.assertRaises(OSError, self.loop.run_until_complete, f) 1555 1556 @patch_socket 1557 def test_create_server_nosoreuseport(self, m_socket): 1558 m_socket.getaddrinfo = socket.getaddrinfo 1559 del m_socket.SO_REUSEPORT 1560 m_socket.socket.return_value = mock.Mock() 1561 1562 f = self.loop.create_server( 1563 MyProto, '0.0.0.0', 0, reuse_port=True) 1564 1565 self.assertRaises(ValueError, self.loop.run_until_complete, f) 1566 1567 @patch_socket 1568 def test_create_server_soreuseport_only_defined(self, m_socket): 1569 m_socket.getaddrinfo = socket.getaddrinfo 1570 m_socket.socket.return_value = mock.Mock() 1571 m_socket.SO_REUSEPORT = -1 1572 1573 f = self.loop.create_server( 1574 MyProto, '0.0.0.0', 0, reuse_port=True) 1575 1576 self.assertRaises(ValueError, self.loop.run_until_complete, f) 1577 1578 @patch_socket 1579 def test_create_server_cant_bind(self, m_socket): 1580 1581 class Err(OSError): 1582 strerror = 'error' 1583 1584 m_socket.getaddrinfo.return_value = [ 1585 (2, 1, 6, '', ('127.0.0.1', 10100))] 1586 m_sock = m_socket.socket.return_value = mock.Mock() 1587 m_sock.bind.side_effect = Err 1588 1589 fut = self.loop.create_server(MyProto, '0.0.0.0', 0) 1590 self.assertRaises(OSError, self.loop.run_until_complete, fut) 1591 self.assertTrue(m_sock.close.called) 1592 1593 @patch_socket 1594 def test_create_datagram_endpoint_no_addrinfo(self, m_socket): 1595 m_socket.getaddrinfo.return_value = [] 1596 1597 coro = self.loop.create_datagram_endpoint( 1598 MyDatagramProto, local_addr=('localhost', 0)) 1599 self.assertRaises( 1600 OSError, self.loop.run_until_complete, coro) 1601 1602 def test_create_datagram_endpoint_addr_error(self): 1603 coro = self.loop.create_datagram_endpoint( 1604 MyDatagramProto, local_addr='localhost') 1605 self.assertRaises( 1606 TypeError, self.loop.run_until_complete, coro) 1607 coro = self.loop.create_datagram_endpoint( 1608 MyDatagramProto, local_addr=('localhost', 1, 2, 3)) 1609 self.assertRaises( 1610 TypeError, self.loop.run_until_complete, coro) 1611 1612 def test_create_datagram_endpoint_connect_err(self): 1613 self.loop.sock_connect = mock.Mock() 1614 self.loop.sock_connect.side_effect = OSError 1615 1616 coro = self.loop.create_datagram_endpoint( 1617 asyncio.DatagramProtocol, remote_addr=('127.0.0.1', 0)) 1618 self.assertRaises( 1619 OSError, self.loop.run_until_complete, coro) 1620 1621 def test_create_datagram_endpoint_allow_broadcast(self): 1622 protocol = MyDatagramProto(create_future=True, loop=self.loop) 1623 self.loop.sock_connect = sock_connect = mock.Mock() 1624 sock_connect.return_value = [] 1625 1626 coro = self.loop.create_datagram_endpoint( 1627 lambda: protocol, 1628 remote_addr=('127.0.0.1', 0), 1629 allow_broadcast=True) 1630 1631 transport, _ = self.loop.run_until_complete(coro) 1632 self.assertFalse(sock_connect.called) 1633 1634 transport.close() 1635 self.loop.run_until_complete(protocol.done) 1636 self.assertEqual('CLOSED', protocol.state) 1637 1638 @patch_socket 1639 def test_create_datagram_endpoint_socket_err(self, m_socket): 1640 m_socket.getaddrinfo = socket.getaddrinfo 1641 m_socket.socket.side_effect = OSError 1642 1643 coro = self.loop.create_datagram_endpoint( 1644 asyncio.DatagramProtocol, family=socket.AF_INET) 1645 self.assertRaises( 1646 OSError, self.loop.run_until_complete, coro) 1647 1648 coro = self.loop.create_datagram_endpoint( 1649 asyncio.DatagramProtocol, local_addr=('127.0.0.1', 0)) 1650 self.assertRaises( 1651 OSError, self.loop.run_until_complete, coro) 1652 1653 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 not supported or enabled') 1654 def test_create_datagram_endpoint_no_matching_family(self): 1655 coro = self.loop.create_datagram_endpoint( 1656 asyncio.DatagramProtocol, 1657 remote_addr=('127.0.0.1', 0), local_addr=('::1', 0)) 1658 self.assertRaises( 1659 ValueError, self.loop.run_until_complete, coro) 1660 1661 @patch_socket 1662 def test_create_datagram_endpoint_setblk_err(self, m_socket): 1663 m_socket.socket.return_value.setblocking.side_effect = OSError 1664 1665 coro = self.loop.create_datagram_endpoint( 1666 asyncio.DatagramProtocol, family=socket.AF_INET) 1667 self.assertRaises( 1668 OSError, self.loop.run_until_complete, coro) 1669 self.assertTrue( 1670 m_socket.socket.return_value.close.called) 1671 1672 def test_create_datagram_endpoint_noaddr_nofamily(self): 1673 coro = self.loop.create_datagram_endpoint( 1674 asyncio.DatagramProtocol) 1675 self.assertRaises(ValueError, self.loop.run_until_complete, coro) 1676 1677 @patch_socket 1678 def test_create_datagram_endpoint_cant_bind(self, m_socket): 1679 class Err(OSError): 1680 pass 1681 1682 m_socket.getaddrinfo = socket.getaddrinfo 1683 m_sock = m_socket.socket.return_value = mock.Mock() 1684 m_sock.bind.side_effect = Err 1685 1686 fut = self.loop.create_datagram_endpoint( 1687 MyDatagramProto, 1688 local_addr=('127.0.0.1', 0), family=socket.AF_INET) 1689 self.assertRaises(Err, self.loop.run_until_complete, fut) 1690 self.assertTrue(m_sock.close.called) 1691 1692 def test_create_datagram_endpoint_sock(self): 1693 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 1694 sock.bind(('127.0.0.1', 0)) 1695 fut = self.loop.create_datagram_endpoint( 1696 lambda: MyDatagramProto(create_future=True, loop=self.loop), 1697 sock=sock) 1698 transport, protocol = self.loop.run_until_complete(fut) 1699 transport.close() 1700 self.loop.run_until_complete(protocol.done) 1701 self.assertEqual('CLOSED', protocol.state) 1702 1703 @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets') 1704 def test_create_datagram_endpoint_sock_unix(self): 1705 fut = self.loop.create_datagram_endpoint( 1706 lambda: MyDatagramProto(create_future=True, loop=self.loop), 1707 family=socket.AF_UNIX) 1708 transport, protocol = self.loop.run_until_complete(fut) 1709 self.assertEqual(transport._sock.family, socket.AF_UNIX) 1710 transport.close() 1711 self.loop.run_until_complete(protocol.done) 1712 self.assertEqual('CLOSED', protocol.state) 1713 1714 @socket_helper.skip_unless_bind_unix_socket 1715 def test_create_datagram_endpoint_existing_sock_unix(self): 1716 with test_utils.unix_socket_path() as path: 1717 sock = socket.socket(socket.AF_UNIX, type=socket.SOCK_DGRAM) 1718 sock.bind(path) 1719 sock.close() 1720 1721 coro = self.loop.create_datagram_endpoint( 1722 lambda: MyDatagramProto(create_future=True, loop=self.loop), 1723 path, family=socket.AF_UNIX) 1724 transport, protocol = self.loop.run_until_complete(coro) 1725 transport.close() 1726 self.loop.run_until_complete(protocol.done) 1727 1728 def test_create_datagram_endpoint_sock_sockopts(self): 1729 class FakeSock: 1730 type = socket.SOCK_DGRAM 1731 1732 fut = self.loop.create_datagram_endpoint( 1733 MyDatagramProto, local_addr=('127.0.0.1', 0), sock=FakeSock()) 1734 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1735 1736 fut = self.loop.create_datagram_endpoint( 1737 MyDatagramProto, remote_addr=('127.0.0.1', 0), sock=FakeSock()) 1738 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1739 1740 fut = self.loop.create_datagram_endpoint( 1741 MyDatagramProto, family=1, sock=FakeSock()) 1742 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1743 1744 fut = self.loop.create_datagram_endpoint( 1745 MyDatagramProto, proto=1, sock=FakeSock()) 1746 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1747 1748 fut = self.loop.create_datagram_endpoint( 1749 MyDatagramProto, flags=1, sock=FakeSock()) 1750 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1751 1752 fut = self.loop.create_datagram_endpoint( 1753 MyDatagramProto, reuse_port=True, sock=FakeSock()) 1754 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1755 1756 fut = self.loop.create_datagram_endpoint( 1757 MyDatagramProto, allow_broadcast=True, sock=FakeSock()) 1758 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1759 1760 @unittest.skipIf(sys.platform == 'vxworks', 1761 "SO_BROADCAST is enabled by default on VxWorks") 1762 def test_create_datagram_endpoint_sockopts(self): 1763 # Socket options should not be applied unless asked for. 1764 # SO_REUSEPORT is not available on all platforms. 1765 1766 coro = self.loop.create_datagram_endpoint( 1767 lambda: MyDatagramProto(create_future=True, loop=self.loop), 1768 local_addr=('127.0.0.1', 0)) 1769 transport, protocol = self.loop.run_until_complete(coro) 1770 sock = transport.get_extra_info('socket') 1771 1772 reuseport_supported = hasattr(socket, 'SO_REUSEPORT') 1773 1774 if reuseport_supported: 1775 self.assertFalse( 1776 sock.getsockopt( 1777 socket.SOL_SOCKET, socket.SO_REUSEPORT)) 1778 self.assertFalse( 1779 sock.getsockopt( 1780 socket.SOL_SOCKET, socket.SO_BROADCAST)) 1781 1782 transport.close() 1783 self.loop.run_until_complete(protocol.done) 1784 self.assertEqual('CLOSED', protocol.state) 1785 1786 coro = self.loop.create_datagram_endpoint( 1787 lambda: MyDatagramProto(create_future=True, loop=self.loop), 1788 local_addr=('127.0.0.1', 0), 1789 reuse_port=reuseport_supported, 1790 allow_broadcast=True) 1791 transport, protocol = self.loop.run_until_complete(coro) 1792 sock = transport.get_extra_info('socket') 1793 1794 self.assertFalse( 1795 sock.getsockopt( 1796 socket.SOL_SOCKET, socket.SO_REUSEADDR)) 1797 if reuseport_supported: 1798 self.assertTrue( 1799 sock.getsockopt( 1800 socket.SOL_SOCKET, socket.SO_REUSEPORT)) 1801 self.assertTrue( 1802 sock.getsockopt( 1803 socket.SOL_SOCKET, socket.SO_BROADCAST)) 1804 1805 transport.close() 1806 self.loop.run_until_complete(protocol.done) 1807 self.assertEqual('CLOSED', protocol.state) 1808 1809 @patch_socket 1810 def test_create_datagram_endpoint_nosoreuseport(self, m_socket): 1811 del m_socket.SO_REUSEPORT 1812 m_socket.socket.return_value = mock.Mock() 1813 1814 coro = self.loop.create_datagram_endpoint( 1815 lambda: MyDatagramProto(loop=self.loop), 1816 local_addr=('127.0.0.1', 0), 1817 reuse_port=True) 1818 1819 self.assertRaises(ValueError, self.loop.run_until_complete, coro) 1820 1821 @patch_socket 1822 def test_create_datagram_endpoint_ip_addr(self, m_socket): 1823 def getaddrinfo(*args, **kw): 1824 self.fail('should not have called getaddrinfo') 1825 1826 m_socket.getaddrinfo = getaddrinfo 1827 m_socket.socket.return_value.bind = bind = mock.Mock() 1828 self.loop._add_reader = mock.Mock() 1829 1830 reuseport_supported = hasattr(socket, 'SO_REUSEPORT') 1831 coro = self.loop.create_datagram_endpoint( 1832 lambda: MyDatagramProto(loop=self.loop), 1833 local_addr=('1.2.3.4', 0), 1834 reuse_port=reuseport_supported) 1835 1836 t, p = self.loop.run_until_complete(coro) 1837 try: 1838 bind.assert_called_with(('1.2.3.4', 0)) 1839 m_socket.socket.assert_called_with(family=m_socket.AF_INET, 1840 proto=m_socket.IPPROTO_UDP, 1841 type=m_socket.SOCK_DGRAM) 1842 finally: 1843 t.close() 1844 test_utils.run_briefly(self.loop) # allow transport to close 1845 1846 def test_accept_connection_retry(self): 1847 sock = mock.Mock() 1848 sock.accept.side_effect = BlockingIOError() 1849 1850 self.loop._accept_connection(MyProto, sock) 1851 self.assertFalse(sock.close.called) 1852 1853 @mock.patch('asyncio.base_events.logger') 1854 def test_accept_connection_exception(self, m_log): 1855 sock = mock.Mock() 1856 sock.fileno.return_value = 10 1857 sock.accept.side_effect = OSError(errno.EMFILE, 'Too many open files') 1858 self.loop._remove_reader = mock.Mock() 1859 self.loop.call_later = mock.Mock() 1860 1861 self.loop._accept_connection(MyProto, sock) 1862 self.assertTrue(m_log.error.called) 1863 self.assertFalse(sock.close.called) 1864 self.loop._remove_reader.assert_called_with(10) 1865 self.loop.call_later.assert_called_with( 1866 constants.ACCEPT_RETRY_DELAY, 1867 # self.loop._start_serving 1868 mock.ANY, 1869 MyProto, sock, None, None, mock.ANY, mock.ANY, mock.ANY) 1870 1871 def test_call_coroutine(self): 1872 async def simple_coroutine(): 1873 pass 1874 1875 self.loop.set_debug(True) 1876 coro_func = simple_coroutine 1877 coro_obj = coro_func() 1878 self.addCleanup(coro_obj.close) 1879 for func in (coro_func, coro_obj): 1880 with self.assertRaises(TypeError): 1881 self.loop.call_soon(func) 1882 with self.assertRaises(TypeError): 1883 self.loop.call_soon_threadsafe(func) 1884 with self.assertRaises(TypeError): 1885 self.loop.call_later(60, func) 1886 with self.assertRaises(TypeError): 1887 self.loop.call_at(self.loop.time() + 60, func) 1888 with self.assertRaises(TypeError): 1889 self.loop.run_until_complete( 1890 self.loop.run_in_executor(None, func)) 1891 1892 @mock.patch('asyncio.base_events.logger') 1893 def test_log_slow_callbacks(self, m_logger): 1894 def stop_loop_cb(loop): 1895 loop.stop() 1896 1897 async def stop_loop_coro(loop): 1898 loop.stop() 1899 1900 asyncio.set_event_loop(self.loop) 1901 self.loop.set_debug(True) 1902 self.loop.slow_callback_duration = 0.0 1903 1904 # slow callback 1905 self.loop.call_soon(stop_loop_cb, self.loop) 1906 self.loop.run_forever() 1907 fmt, *args = m_logger.warning.call_args[0] 1908 self.assertRegex(fmt % tuple(args), 1909 "^Executing <Handle.*stop_loop_cb.*> " 1910 "took .* seconds$") 1911 1912 # slow task 1913 asyncio.ensure_future(stop_loop_coro(self.loop), loop=self.loop) 1914 self.loop.run_forever() 1915 fmt, *args = m_logger.warning.call_args[0] 1916 self.assertRegex(fmt % tuple(args), 1917 "^Executing <Task.*stop_loop_coro.*> " 1918 "took .* seconds$") 1919 1920 1921class RunningLoopTests(unittest.TestCase): 1922 1923 def test_running_loop_within_a_loop(self): 1924 async def runner(loop): 1925 loop.run_forever() 1926 1927 loop = asyncio.new_event_loop() 1928 outer_loop = asyncio.new_event_loop() 1929 try: 1930 with self.assertRaisesRegex(RuntimeError, 1931 'while another loop is running'): 1932 outer_loop.run_until_complete(runner(loop)) 1933 finally: 1934 loop.close() 1935 outer_loop.close() 1936 1937 1938class BaseLoopSockSendfileTests(test_utils.TestCase): 1939 1940 DATA = b"12345abcde" * 16 * 1024 # 160 KiB 1941 1942 class MyProto(asyncio.Protocol): 1943 1944 def __init__(self, loop): 1945 self.started = False 1946 self.closed = False 1947 self.data = bytearray() 1948 self.fut = loop.create_future() 1949 self.transport = None 1950 1951 def connection_made(self, transport): 1952 self.started = True 1953 self.transport = transport 1954 1955 def data_received(self, data): 1956 self.data.extend(data) 1957 1958 def connection_lost(self, exc): 1959 self.closed = True 1960 self.fut.set_result(None) 1961 self.transport = None 1962 1963 async def wait_closed(self): 1964 await self.fut 1965 1966 @classmethod 1967 def setUpClass(cls): 1968 cls.__old_bufsize = constants.SENDFILE_FALLBACK_READBUFFER_SIZE 1969 constants.SENDFILE_FALLBACK_READBUFFER_SIZE = 1024 * 16 1970 with open(os_helper.TESTFN, 'wb') as fp: 1971 fp.write(cls.DATA) 1972 super().setUpClass() 1973 1974 @classmethod 1975 def tearDownClass(cls): 1976 constants.SENDFILE_FALLBACK_READBUFFER_SIZE = cls.__old_bufsize 1977 os_helper.unlink(os_helper.TESTFN) 1978 super().tearDownClass() 1979 1980 def setUp(self): 1981 from asyncio.selector_events import BaseSelectorEventLoop 1982 # BaseSelectorEventLoop() has no native implementation 1983 self.loop = BaseSelectorEventLoop() 1984 self.set_event_loop(self.loop) 1985 self.file = open(os_helper.TESTFN, 'rb') 1986 self.addCleanup(self.file.close) 1987 super().setUp() 1988 1989 def make_socket(self, blocking=False): 1990 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1991 sock.setblocking(blocking) 1992 self.addCleanup(sock.close) 1993 return sock 1994 1995 def run_loop(self, coro): 1996 return self.loop.run_until_complete(coro) 1997 1998 def prepare(self): 1999 sock = self.make_socket() 2000 proto = self.MyProto(self.loop) 2001 server = self.run_loop(self.loop.create_server( 2002 lambda: proto, socket_helper.HOST, 0, family=socket.AF_INET)) 2003 addr = server.sockets[0].getsockname() 2004 2005 for _ in range(10): 2006 try: 2007 self.run_loop(self.loop.sock_connect(sock, addr)) 2008 except OSError: 2009 self.run_loop(asyncio.sleep(0.5)) 2010 continue 2011 else: 2012 break 2013 else: 2014 # One last try, so we get the exception 2015 self.run_loop(self.loop.sock_connect(sock, addr)) 2016 2017 def cleanup(): 2018 server.close() 2019 self.run_loop(server.wait_closed()) 2020 sock.close() 2021 if proto.transport is not None: 2022 proto.transport.close() 2023 self.run_loop(proto.wait_closed()) 2024 2025 self.addCleanup(cleanup) 2026 2027 return sock, proto 2028 2029 def test__sock_sendfile_native_failure(self): 2030 sock, proto = self.prepare() 2031 2032 with self.assertRaisesRegex(asyncio.SendfileNotAvailableError, 2033 "sendfile is not available"): 2034 self.run_loop(self.loop._sock_sendfile_native(sock, self.file, 2035 0, None)) 2036 2037 self.assertEqual(proto.data, b'') 2038 self.assertEqual(self.file.tell(), 0) 2039 2040 def test_sock_sendfile_no_fallback(self): 2041 sock, proto = self.prepare() 2042 2043 with self.assertRaisesRegex(asyncio.SendfileNotAvailableError, 2044 "sendfile is not available"): 2045 self.run_loop(self.loop.sock_sendfile(sock, self.file, 2046 fallback=False)) 2047 2048 self.assertEqual(self.file.tell(), 0) 2049 self.assertEqual(proto.data, b'') 2050 2051 def test_sock_sendfile_fallback(self): 2052 sock, proto = self.prepare() 2053 2054 ret = self.run_loop(self.loop.sock_sendfile(sock, self.file)) 2055 sock.close() 2056 self.run_loop(proto.wait_closed()) 2057 2058 self.assertEqual(ret, len(self.DATA)) 2059 self.assertEqual(self.file.tell(), len(self.DATA)) 2060 self.assertEqual(proto.data, self.DATA) 2061 2062 def test_sock_sendfile_fallback_offset_and_count(self): 2063 sock, proto = self.prepare() 2064 2065 ret = self.run_loop(self.loop.sock_sendfile(sock, self.file, 2066 1000, 2000)) 2067 sock.close() 2068 self.run_loop(proto.wait_closed()) 2069 2070 self.assertEqual(ret, 2000) 2071 self.assertEqual(self.file.tell(), 3000) 2072 self.assertEqual(proto.data, self.DATA[1000:3000]) 2073 2074 def test_blocking_socket(self): 2075 self.loop.set_debug(True) 2076 sock = self.make_socket(blocking=True) 2077 with self.assertRaisesRegex(ValueError, "must be non-blocking"): 2078 self.run_loop(self.loop.sock_sendfile(sock, self.file)) 2079 2080 def test_nonbinary_file(self): 2081 sock = self.make_socket() 2082 with open(os_helper.TESTFN, encoding="utf-8") as f: 2083 with self.assertRaisesRegex(ValueError, "binary mode"): 2084 self.run_loop(self.loop.sock_sendfile(sock, f)) 2085 2086 def test_nonstream_socket(self): 2087 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 2088 sock.setblocking(False) 2089 self.addCleanup(sock.close) 2090 with self.assertRaisesRegex(ValueError, "only SOCK_STREAM type"): 2091 self.run_loop(self.loop.sock_sendfile(sock, self.file)) 2092 2093 def test_notint_count(self): 2094 sock = self.make_socket() 2095 with self.assertRaisesRegex(TypeError, 2096 "count must be a positive integer"): 2097 self.run_loop(self.loop.sock_sendfile(sock, self.file, 0, 'count')) 2098 2099 def test_negative_count(self): 2100 sock = self.make_socket() 2101 with self.assertRaisesRegex(ValueError, 2102 "count must be a positive integer"): 2103 self.run_loop(self.loop.sock_sendfile(sock, self.file, 0, -1)) 2104 2105 def test_notint_offset(self): 2106 sock = self.make_socket() 2107 with self.assertRaisesRegex(TypeError, 2108 "offset must be a non-negative integer"): 2109 self.run_loop(self.loop.sock_sendfile(sock, self.file, 'offset')) 2110 2111 def test_negative_offset(self): 2112 sock = self.make_socket() 2113 with self.assertRaisesRegex(ValueError, 2114 "offset must be a non-negative integer"): 2115 self.run_loop(self.loop.sock_sendfile(sock, self.file, -1)) 2116 2117 2118class TestSelectorUtils(test_utils.TestCase): 2119 def check_set_nodelay(self, sock): 2120 opt = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) 2121 self.assertFalse(opt) 2122 2123 base_events._set_nodelay(sock) 2124 2125 opt = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) 2126 self.assertTrue(opt) 2127 2128 @unittest.skipUnless(hasattr(socket, 'TCP_NODELAY'), 2129 'need socket.TCP_NODELAY') 2130 def test_set_nodelay(self): 2131 sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM, 2132 proto=socket.IPPROTO_TCP) 2133 with sock: 2134 self.check_set_nodelay(sock) 2135 2136 sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM, 2137 proto=socket.IPPROTO_TCP) 2138 with sock: 2139 sock.setblocking(False) 2140 self.check_set_nodelay(sock) 2141 2142 2143 2144if __name__ == '__main__': 2145 unittest.main() 2146