1"""Tests for base_events.py"""
2
3import concurrent.futures
4import errno
5import math
6import socket
7import sys
8import threading
9import time
10import unittest
11from unittest import mock
12
13import asyncio
14from asyncio import base_events
15from asyncio import constants
16from test.test_asyncio import utils as test_utils
17from test import support
18from test.support.script_helper import assert_python_ok
19from test.support import os_helper
20from test.support import socket_helper
21import warnings
22
23MOCK_ANY = mock.ANY
24
25
26def tearDownModule():
27    asyncio.set_event_loop_policy(None)
28
29
30def mock_socket_module():
31    m_socket = mock.MagicMock(spec=socket)
32    for name in (
33        'AF_INET', 'AF_INET6', 'AF_UNSPEC', 'IPPROTO_TCP', 'IPPROTO_UDP',
34        'SOCK_STREAM', 'SOCK_DGRAM', 'SOL_SOCKET', 'SO_REUSEADDR', 'inet_pton'
35    ):
36        if hasattr(socket, name):
37            setattr(m_socket, name, getattr(socket, name))
38        else:
39            delattr(m_socket, name)
40
41    m_socket.socket = mock.MagicMock()
42    m_socket.socket.return_value = test_utils.mock_nonblocking_socket()
43
44    return m_socket
45
46
47def patch_socket(f):
48    return mock.patch('asyncio.base_events.socket',
49                      new_callable=mock_socket_module)(f)
50
51
52class BaseEventTests(test_utils.TestCase):
53
54    def test_ipaddr_info(self):
55        UNSPEC = socket.AF_UNSPEC
56        INET = socket.AF_INET
57        INET6 = socket.AF_INET6
58        STREAM = socket.SOCK_STREAM
59        DGRAM = socket.SOCK_DGRAM
60        TCP = socket.IPPROTO_TCP
61        UDP = socket.IPPROTO_UDP
62
63        self.assertEqual(
64            (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
65            base_events._ipaddr_info('1.2.3.4', 1, INET, STREAM, TCP))
66
67        self.assertEqual(
68            (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
69            base_events._ipaddr_info(b'1.2.3.4', 1, INET, STREAM, TCP))
70
71        self.assertEqual(
72            (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
73            base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, STREAM, TCP))
74
75        self.assertEqual(
76            (INET, DGRAM, UDP, '', ('1.2.3.4', 1)),
77            base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, DGRAM, UDP))
78
79        # Socket type STREAM implies TCP protocol.
80        self.assertEqual(
81            (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
82            base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, STREAM, 0))
83
84        # Socket type DGRAM implies UDP protocol.
85        self.assertEqual(
86            (INET, DGRAM, UDP, '', ('1.2.3.4', 1)),
87            base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, DGRAM, 0))
88
89        # No socket type.
90        self.assertIsNone(
91            base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, 0, 0))
92
93        if socket_helper.IPV6_ENABLED:
94            # IPv4 address with family IPv6.
95            self.assertIsNone(
96                base_events._ipaddr_info('1.2.3.4', 1, INET6, STREAM, TCP))
97
98            self.assertEqual(
99                (INET6, STREAM, TCP, '', ('::3', 1, 0, 0)),
100                base_events._ipaddr_info('::3', 1, INET6, STREAM, TCP))
101
102            self.assertEqual(
103                (INET6, STREAM, TCP, '', ('::3', 1, 0, 0)),
104                base_events._ipaddr_info('::3', 1, UNSPEC, STREAM, TCP))
105
106            # IPv6 address with family IPv4.
107            self.assertIsNone(
108                base_events._ipaddr_info('::3', 1, INET, STREAM, TCP))
109
110            # IPv6 address with zone index.
111            self.assertIsNone(
112                base_events._ipaddr_info('::3%lo0', 1, INET6, STREAM, TCP))
113
114    def test_port_parameter_types(self):
115        # Test obscure kinds of arguments for "port".
116        INET = socket.AF_INET
117        STREAM = socket.SOCK_STREAM
118        TCP = socket.IPPROTO_TCP
119
120        self.assertEqual(
121            (INET, STREAM, TCP, '', ('1.2.3.4', 0)),
122            base_events._ipaddr_info('1.2.3.4', None, INET, STREAM, TCP))
123
124        self.assertEqual(
125            (INET, STREAM, TCP, '', ('1.2.3.4', 0)),
126            base_events._ipaddr_info('1.2.3.4', b'', INET, STREAM, TCP))
127
128        self.assertEqual(
129            (INET, STREAM, TCP, '', ('1.2.3.4', 0)),
130            base_events._ipaddr_info('1.2.3.4', '', INET, STREAM, TCP))
131
132        self.assertEqual(
133            (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
134            base_events._ipaddr_info('1.2.3.4', '1', INET, STREAM, TCP))
135
136        self.assertEqual(
137            (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
138            base_events._ipaddr_info('1.2.3.4', b'1', INET, STREAM, TCP))
139
140    @patch_socket
141    def test_ipaddr_info_no_inet_pton(self, m_socket):
142        del m_socket.inet_pton
143        self.assertIsNone(base_events._ipaddr_info('1.2.3.4', 1,
144                                                   socket.AF_INET,
145                                                   socket.SOCK_STREAM,
146                                                   socket.IPPROTO_TCP))
147
148
149class BaseEventLoopTests(test_utils.TestCase):
150
151    def setUp(self):
152        super().setUp()
153        self.loop = base_events.BaseEventLoop()
154        self.loop._selector = mock.Mock()
155        self.loop._selector.select.return_value = ()
156        self.set_event_loop(self.loop)
157
158    def test_not_implemented(self):
159        m = mock.Mock()
160        self.assertRaises(
161            NotImplementedError,
162            self.loop._make_socket_transport, m, m)
163        self.assertRaises(
164            NotImplementedError,
165            self.loop._make_ssl_transport, m, m, m, m)
166        self.assertRaises(
167            NotImplementedError,
168            self.loop._make_datagram_transport, m, m)
169        self.assertRaises(
170            NotImplementedError, self.loop._process_events, [])
171        self.assertRaises(
172            NotImplementedError, self.loop._write_to_self)
173        self.assertRaises(
174            NotImplementedError,
175            self.loop._make_read_pipe_transport, m, m)
176        self.assertRaises(
177            NotImplementedError,
178            self.loop._make_write_pipe_transport, m, m)
179        gen = self.loop._make_subprocess_transport(m, m, m, m, m, m, m)
180        with self.assertRaises(NotImplementedError):
181            gen.send(None)
182
183    def test_close(self):
184        self.assertFalse(self.loop.is_closed())
185        self.loop.close()
186        self.assertTrue(self.loop.is_closed())
187
188        # it should be possible to call close() more than once
189        self.loop.close()
190        self.loop.close()
191
192        # operation blocked when the loop is closed
193        f = self.loop.create_future()
194        self.assertRaises(RuntimeError, self.loop.run_forever)
195        self.assertRaises(RuntimeError, self.loop.run_until_complete, f)
196
197    def test__add_callback_handle(self):
198        h = asyncio.Handle(lambda: False, (), self.loop, None)
199
200        self.loop._add_callback(h)
201        self.assertFalse(self.loop._scheduled)
202        self.assertIn(h, self.loop._ready)
203
204    def test__add_callback_cancelled_handle(self):
205        h = asyncio.Handle(lambda: False, (), self.loop, None)
206        h.cancel()
207
208        self.loop._add_callback(h)
209        self.assertFalse(self.loop._scheduled)
210        self.assertFalse(self.loop._ready)
211
212    def test_set_default_executor(self):
213        class DummyExecutor(concurrent.futures.ThreadPoolExecutor):
214            def submit(self, fn, *args, **kwargs):
215                raise NotImplementedError(
216                    'cannot submit into a dummy executor')
217
218        self.loop._process_events = mock.Mock()
219        self.loop._write_to_self = mock.Mock()
220
221        executor = DummyExecutor()
222        self.loop.set_default_executor(executor)
223        self.assertIs(executor, self.loop._default_executor)
224
225    def test_set_default_executor_error(self):
226        executor = mock.Mock()
227
228        msg = 'executor must be ThreadPoolExecutor instance'
229        with self.assertRaisesRegex(TypeError, msg):
230            self.loop.set_default_executor(executor)
231
232        self.assertIsNone(self.loop._default_executor)
233
234    def test_call_soon(self):
235        def cb():
236            pass
237
238        h = self.loop.call_soon(cb)
239        self.assertEqual(h._callback, cb)
240        self.assertIsInstance(h, asyncio.Handle)
241        self.assertIn(h, self.loop._ready)
242
243    def test_call_soon_non_callable(self):
244        self.loop.set_debug(True)
245        with self.assertRaisesRegex(TypeError, 'a callable object'):
246            self.loop.call_soon(1)
247
248    def test_call_later(self):
249        def cb():
250            pass
251
252        h = self.loop.call_later(10.0, cb)
253        self.assertIsInstance(h, asyncio.TimerHandle)
254        self.assertIn(h, self.loop._scheduled)
255        self.assertNotIn(h, self.loop._ready)
256        with self.assertRaises(TypeError, msg="delay must not be None"):
257            self.loop.call_later(None, cb)
258
259    def test_call_later_negative_delays(self):
260        calls = []
261
262        def cb(arg):
263            calls.append(arg)
264
265        self.loop._process_events = mock.Mock()
266        self.loop.call_later(-1, cb, 'a')
267        self.loop.call_later(-2, cb, 'b')
268        test_utils.run_briefly(self.loop)
269        self.assertEqual(calls, ['b', 'a'])
270
271    def test_time_and_call_at(self):
272        def cb():
273            self.loop.stop()
274
275        self.loop._process_events = mock.Mock()
276        delay = 0.1
277
278        when = self.loop.time() + delay
279        self.loop.call_at(when, cb)
280        t0 = self.loop.time()
281        self.loop.run_forever()
282        dt = self.loop.time() - t0
283
284        # 50 ms: maximum granularity of the event loop
285        self.assertGreaterEqual(dt, delay - 0.050, dt)
286        # tolerate a difference of +800 ms because some Python buildbots
287        # are really slow
288        self.assertLessEqual(dt, 0.9, dt)
289        with self.assertRaises(TypeError, msg="when cannot be None"):
290            self.loop.call_at(None, cb)
291
292    def check_thread(self, loop, debug):
293        def cb():
294            pass
295
296        loop.set_debug(debug)
297        if debug:
298            msg = ("Non-thread-safe operation invoked on an event loop other "
299                   "than the current one")
300            with self.assertRaisesRegex(RuntimeError, msg):
301                loop.call_soon(cb)
302            with self.assertRaisesRegex(RuntimeError, msg):
303                loop.call_later(60, cb)
304            with self.assertRaisesRegex(RuntimeError, msg):
305                loop.call_at(loop.time() + 60, cb)
306        else:
307            loop.call_soon(cb)
308            loop.call_later(60, cb)
309            loop.call_at(loop.time() + 60, cb)
310
311    def test_check_thread(self):
312        def check_in_thread(loop, event, debug, create_loop, fut):
313            # wait until the event loop is running
314            event.wait()
315
316            try:
317                if create_loop:
318                    loop2 = base_events.BaseEventLoop()
319                    try:
320                        asyncio.set_event_loop(loop2)
321                        self.check_thread(loop, debug)
322                    finally:
323                        asyncio.set_event_loop(None)
324                        loop2.close()
325                else:
326                    self.check_thread(loop, debug)
327            except Exception as exc:
328                loop.call_soon_threadsafe(fut.set_exception, exc)
329            else:
330                loop.call_soon_threadsafe(fut.set_result, None)
331
332        def test_thread(loop, debug, create_loop=False):
333            event = threading.Event()
334            fut = loop.create_future()
335            loop.call_soon(event.set)
336            args = (loop, event, debug, create_loop, fut)
337            thread = threading.Thread(target=check_in_thread, args=args)
338            thread.start()
339            loop.run_until_complete(fut)
340            thread.join()
341
342        self.loop._process_events = mock.Mock()
343        self.loop._write_to_self = mock.Mock()
344
345        # raise RuntimeError if the thread has no event loop
346        test_thread(self.loop, True)
347
348        # check disabled if debug mode is disabled
349        test_thread(self.loop, False)
350
351        # raise RuntimeError if the event loop of the thread is not the called
352        # event loop
353        test_thread(self.loop, True, create_loop=True)
354
355        # check disabled if debug mode is disabled
356        test_thread(self.loop, False, create_loop=True)
357
358    def test__run_once(self):
359        h1 = asyncio.TimerHandle(time.monotonic() + 5.0, lambda: True, (),
360                                 self.loop, None)
361        h2 = asyncio.TimerHandle(time.monotonic() + 10.0, lambda: True, (),
362                                 self.loop, None)
363
364        h1.cancel()
365
366        self.loop._process_events = mock.Mock()
367        self.loop._scheduled.append(h1)
368        self.loop._scheduled.append(h2)
369        self.loop._run_once()
370
371        t = self.loop._selector.select.call_args[0][0]
372        self.assertTrue(9.5 < t < 10.5, t)
373        self.assertEqual([h2], self.loop._scheduled)
374        self.assertTrue(self.loop._process_events.called)
375
376    def test_set_debug(self):
377        self.loop.set_debug(True)
378        self.assertTrue(self.loop.get_debug())
379        self.loop.set_debug(False)
380        self.assertFalse(self.loop.get_debug())
381
382    def test__run_once_schedule_handle(self):
383        handle = None
384        processed = False
385
386        def cb(loop):
387            nonlocal processed, handle
388            processed = True
389            handle = loop.call_soon(lambda: True)
390
391        h = asyncio.TimerHandle(time.monotonic() - 1, cb, (self.loop,),
392                                self.loop, None)
393
394        self.loop._process_events = mock.Mock()
395        self.loop._scheduled.append(h)
396        self.loop._run_once()
397
398        self.assertTrue(processed)
399        self.assertEqual([handle], list(self.loop._ready))
400
401    def test__run_once_cancelled_event_cleanup(self):
402        self.loop._process_events = mock.Mock()
403
404        self.assertTrue(
405            0 < base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION < 1.0)
406
407        def cb():
408            pass
409
410        # Set up one "blocking" event that will not be cancelled to
411        # ensure later cancelled events do not make it to the head
412        # of the queue and get cleaned.
413        not_cancelled_count = 1
414        self.loop.call_later(3000, cb)
415
416        # Add less than threshold (base_events._MIN_SCHEDULED_TIMER_HANDLES)
417        # cancelled handles, ensure they aren't removed
418
419        cancelled_count = 2
420        for x in range(2):
421            h = self.loop.call_later(3600, cb)
422            h.cancel()
423
424        # Add some cancelled events that will be at head and removed
425        cancelled_count += 2
426        for x in range(2):
427            h = self.loop.call_later(100, cb)
428            h.cancel()
429
430        # This test is invalid if _MIN_SCHEDULED_TIMER_HANDLES is too low
431        self.assertLessEqual(cancelled_count + not_cancelled_count,
432            base_events._MIN_SCHEDULED_TIMER_HANDLES)
433
434        self.assertEqual(self.loop._timer_cancelled_count, cancelled_count)
435
436        self.loop._run_once()
437
438        cancelled_count -= 2
439
440        self.assertEqual(self.loop._timer_cancelled_count, cancelled_count)
441
442        self.assertEqual(len(self.loop._scheduled),
443            cancelled_count + not_cancelled_count)
444
445        # Need enough events to pass _MIN_CANCELLED_TIMER_HANDLES_FRACTION
446        # so that deletion of cancelled events will occur on next _run_once
447        add_cancel_count = int(math.ceil(
448            base_events._MIN_SCHEDULED_TIMER_HANDLES *
449            base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION)) + 1
450
451        add_not_cancel_count = max(base_events._MIN_SCHEDULED_TIMER_HANDLES -
452            add_cancel_count, 0)
453
454        # Add some events that will not be cancelled
455        not_cancelled_count += add_not_cancel_count
456        for x in range(add_not_cancel_count):
457            self.loop.call_later(3600, cb)
458
459        # Add enough cancelled events
460        cancelled_count += add_cancel_count
461        for x in range(add_cancel_count):
462            h = self.loop.call_later(3600, cb)
463            h.cancel()
464
465        # Ensure all handles are still scheduled
466        self.assertEqual(len(self.loop._scheduled),
467            cancelled_count + not_cancelled_count)
468
469        self.loop._run_once()
470
471        # Ensure cancelled events were removed
472        self.assertEqual(len(self.loop._scheduled), not_cancelled_count)
473
474        # Ensure only uncancelled events remain scheduled
475        self.assertTrue(all([not x._cancelled for x in self.loop._scheduled]))
476
477    def test_run_until_complete_type_error(self):
478        self.assertRaises(TypeError,
479            self.loop.run_until_complete, 'blah')
480
481    def test_run_until_complete_loop(self):
482        task = self.loop.create_future()
483        other_loop = self.new_test_loop()
484        self.addCleanup(other_loop.close)
485        self.assertRaises(ValueError,
486            other_loop.run_until_complete, task)
487
488    def test_run_until_complete_loop_orphan_future_close_loop(self):
489        class ShowStopper(SystemExit):
490            pass
491
492        async def foo(delay):
493            await asyncio.sleep(delay)
494
495        def throw():
496            raise ShowStopper
497
498        self.loop._process_events = mock.Mock()
499        self.loop.call_soon(throw)
500        with self.assertRaises(ShowStopper):
501            self.loop.run_until_complete(foo(0.1))
502
503        # This call fails if run_until_complete does not clean up
504        # done-callback for the previous future.
505        self.loop.run_until_complete(foo(0.2))
506
507    def test_subprocess_exec_invalid_args(self):
508        args = [sys.executable, '-c', 'pass']
509
510        # missing program parameter (empty args)
511        self.assertRaises(TypeError,
512            self.loop.run_until_complete, self.loop.subprocess_exec,
513            asyncio.SubprocessProtocol)
514
515        # expected multiple arguments, not a list
516        self.assertRaises(TypeError,
517            self.loop.run_until_complete, self.loop.subprocess_exec,
518            asyncio.SubprocessProtocol, args)
519
520        # program arguments must be strings, not int
521        self.assertRaises(TypeError,
522            self.loop.run_until_complete, self.loop.subprocess_exec,
523            asyncio.SubprocessProtocol, sys.executable, 123)
524
525        # universal_newlines, shell, bufsize must not be set
526        self.assertRaises(TypeError,
527        self.loop.run_until_complete, self.loop.subprocess_exec,
528            asyncio.SubprocessProtocol, *args, universal_newlines=True)
529        self.assertRaises(TypeError,
530            self.loop.run_until_complete, self.loop.subprocess_exec,
531            asyncio.SubprocessProtocol, *args, shell=True)
532        self.assertRaises(TypeError,
533            self.loop.run_until_complete, self.loop.subprocess_exec,
534            asyncio.SubprocessProtocol, *args, bufsize=4096)
535
536    def test_subprocess_shell_invalid_args(self):
537        # expected a string, not an int or a list
538        self.assertRaises(TypeError,
539            self.loop.run_until_complete, self.loop.subprocess_shell,
540            asyncio.SubprocessProtocol, 123)
541        self.assertRaises(TypeError,
542            self.loop.run_until_complete, self.loop.subprocess_shell,
543            asyncio.SubprocessProtocol, [sys.executable, '-c', 'pass'])
544
545        # universal_newlines, shell, bufsize must not be set
546        self.assertRaises(TypeError,
547            self.loop.run_until_complete, self.loop.subprocess_shell,
548            asyncio.SubprocessProtocol, 'exit 0', universal_newlines=True)
549        self.assertRaises(TypeError,
550            self.loop.run_until_complete, self.loop.subprocess_shell,
551            asyncio.SubprocessProtocol, 'exit 0', shell=True)
552        self.assertRaises(TypeError,
553            self.loop.run_until_complete, self.loop.subprocess_shell,
554            asyncio.SubprocessProtocol, 'exit 0', bufsize=4096)
555
556    def test_default_exc_handler_callback(self):
557        self.loop._process_events = mock.Mock()
558
559        def zero_error(fut):
560            fut.set_result(True)
561            1/0
562
563        # Test call_soon (events.Handle)
564        with mock.patch('asyncio.base_events.logger') as log:
565            fut = self.loop.create_future()
566            self.loop.call_soon(zero_error, fut)
567            fut.add_done_callback(lambda fut: self.loop.stop())
568            self.loop.run_forever()
569            log.error.assert_called_with(
570                test_utils.MockPattern('Exception in callback.*zero'),
571                exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
572
573        # Test call_later (events.TimerHandle)
574        with mock.patch('asyncio.base_events.logger') as log:
575            fut = self.loop.create_future()
576            self.loop.call_later(0.01, zero_error, fut)
577            fut.add_done_callback(lambda fut: self.loop.stop())
578            self.loop.run_forever()
579            log.error.assert_called_with(
580                test_utils.MockPattern('Exception in callback.*zero'),
581                exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
582
583    def test_default_exc_handler_coro(self):
584        self.loop._process_events = mock.Mock()
585
586        async def zero_error_coro():
587            await asyncio.sleep(0.01)
588            1/0
589
590        # Test Future.__del__
591        with mock.patch('asyncio.base_events.logger') as log:
592            fut = asyncio.ensure_future(zero_error_coro(), loop=self.loop)
593            fut.add_done_callback(lambda *args: self.loop.stop())
594            self.loop.run_forever()
595            fut = None # Trigger Future.__del__ or futures._TracebackLogger
596            support.gc_collect()
597            # Future.__del__ in logs error with an actual exception context
598            log.error.assert_called_with(
599                test_utils.MockPattern('.*exception was never retrieved'),
600                exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
601
602    def test_set_exc_handler_invalid(self):
603        with self.assertRaisesRegex(TypeError, 'A callable object or None'):
604            self.loop.set_exception_handler('spam')
605
606    def test_set_exc_handler_custom(self):
607        def zero_error():
608            1/0
609
610        def run_loop():
611            handle = self.loop.call_soon(zero_error)
612            self.loop._run_once()
613            return handle
614
615        self.loop.set_debug(True)
616        self.loop._process_events = mock.Mock()
617
618        self.assertIsNone(self.loop.get_exception_handler())
619        mock_handler = mock.Mock()
620        self.loop.set_exception_handler(mock_handler)
621        self.assertIs(self.loop.get_exception_handler(), mock_handler)
622        handle = run_loop()
623        mock_handler.assert_called_with(self.loop, {
624            'exception': MOCK_ANY,
625            'message': test_utils.MockPattern(
626                                'Exception in callback.*zero_error'),
627            'handle': handle,
628            'source_traceback': handle._source_traceback,
629        })
630        mock_handler.reset_mock()
631
632        self.loop.set_exception_handler(None)
633        with mock.patch('asyncio.base_events.logger') as log:
634            run_loop()
635            log.error.assert_called_with(
636                        test_utils.MockPattern(
637                                'Exception in callback.*zero'),
638                        exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
639
640        self.assertFalse(mock_handler.called)
641
642    def test_set_exc_handler_broken(self):
643        def run_loop():
644            def zero_error():
645                1/0
646            self.loop.call_soon(zero_error)
647            self.loop._run_once()
648
649        def handler(loop, context):
650            raise AttributeError('spam')
651
652        self.loop._process_events = mock.Mock()
653
654        self.loop.set_exception_handler(handler)
655
656        with mock.patch('asyncio.base_events.logger') as log:
657            run_loop()
658            log.error.assert_called_with(
659                test_utils.MockPattern(
660                    'Unhandled error in exception handler'),
661                exc_info=(AttributeError, MOCK_ANY, MOCK_ANY))
662
663    def test_default_exc_handler_broken(self):
664        _context = None
665
666        class Loop(base_events.BaseEventLoop):
667
668            _selector = mock.Mock()
669            _process_events = mock.Mock()
670
671            def default_exception_handler(self, context):
672                nonlocal _context
673                _context = context
674                # Simulates custom buggy "default_exception_handler"
675                raise ValueError('spam')
676
677        loop = Loop()
678        self.addCleanup(loop.close)
679        asyncio.set_event_loop(loop)
680
681        def run_loop():
682            def zero_error():
683                1/0
684            loop.call_soon(zero_error)
685            loop._run_once()
686
687        with mock.patch('asyncio.base_events.logger') as log:
688            run_loop()
689            log.error.assert_called_with(
690                'Exception in default exception handler',
691                exc_info=True)
692
693        def custom_handler(loop, context):
694            raise ValueError('ham')
695
696        _context = None
697        loop.set_exception_handler(custom_handler)
698        with mock.patch('asyncio.base_events.logger') as log:
699            run_loop()
700            log.error.assert_called_with(
701                test_utils.MockPattern('Exception in default exception.*'
702                                       'while handling.*in custom'),
703                exc_info=True)
704
705            # Check that original context was passed to default
706            # exception handler.
707            self.assertIn('context', _context)
708            self.assertIs(type(_context['context']['exception']),
709                          ZeroDivisionError)
710
711    def test_set_task_factory_invalid(self):
712        with self.assertRaisesRegex(
713            TypeError, 'task factory must be a callable or None'):
714
715            self.loop.set_task_factory(1)
716
717        self.assertIsNone(self.loop.get_task_factory())
718
719    def test_set_task_factory(self):
720        self.loop._process_events = mock.Mock()
721
722        class MyTask(asyncio.Task):
723            pass
724
725        async def coro():
726            pass
727
728        factory = lambda loop, coro: MyTask(coro, loop=loop)
729
730        self.assertIsNone(self.loop.get_task_factory())
731        self.loop.set_task_factory(factory)
732        self.assertIs(self.loop.get_task_factory(), factory)
733
734        task = self.loop.create_task(coro())
735        self.assertTrue(isinstance(task, MyTask))
736        self.loop.run_until_complete(task)
737
738        self.loop.set_task_factory(None)
739        self.assertIsNone(self.loop.get_task_factory())
740
741        task = self.loop.create_task(coro())
742        self.assertTrue(isinstance(task, asyncio.Task))
743        self.assertFalse(isinstance(task, MyTask))
744        self.loop.run_until_complete(task)
745
746    def test_env_var_debug(self):
747        code = '\n'.join((
748            'import asyncio',
749            'loop = asyncio.new_event_loop()',
750            'print(loop.get_debug())'))
751
752        # Test with -E to not fail if the unit test was run with
753        # PYTHONASYNCIODEBUG set to a non-empty string
754        sts, stdout, stderr = assert_python_ok('-E', '-c', code)
755        self.assertEqual(stdout.rstrip(), b'False')
756
757        sts, stdout, stderr = assert_python_ok('-c', code,
758                                               PYTHONASYNCIODEBUG='',
759                                               PYTHONDEVMODE='')
760        self.assertEqual(stdout.rstrip(), b'False')
761
762        sts, stdout, stderr = assert_python_ok('-c', code,
763                                               PYTHONASYNCIODEBUG='1',
764                                               PYTHONDEVMODE='')
765        self.assertEqual(stdout.rstrip(), b'True')
766
767        sts, stdout, stderr = assert_python_ok('-E', '-c', code,
768                                               PYTHONASYNCIODEBUG='1')
769        self.assertEqual(stdout.rstrip(), b'False')
770
771        # -X dev
772        sts, stdout, stderr = assert_python_ok('-E', '-X', 'dev',
773                                               '-c', code)
774        self.assertEqual(stdout.rstrip(), b'True')
775
776    def test_create_task(self):
777        class MyTask(asyncio.Task):
778            pass
779
780        async def test():
781            pass
782
783        class EventLoop(base_events.BaseEventLoop):
784            def create_task(self, coro):
785                return MyTask(coro, loop=loop)
786
787        loop = EventLoop()
788        self.set_event_loop(loop)
789
790        coro = test()
791        task = asyncio.ensure_future(coro, loop=loop)
792        self.assertIsInstance(task, MyTask)
793
794        # make warnings quiet
795        task._log_destroy_pending = False
796        coro.close()
797
798    def test_create_task_error_closes_coro(self):
799        async def test():
800            pass
801        loop = asyncio.new_event_loop()
802        loop.close()
803        with warnings.catch_warnings(record=True) as w:
804            with self.assertRaises(RuntimeError):
805                asyncio.ensure_future(test(), loop=loop)
806            self.assertEqual(len(w), 0)
807
808
809    def test_create_named_task_with_default_factory(self):
810        async def test():
811            pass
812
813        loop = asyncio.new_event_loop()
814        task = loop.create_task(test(), name='test_task')
815        try:
816            self.assertEqual(task.get_name(), 'test_task')
817        finally:
818            loop.run_until_complete(task)
819            loop.close()
820
821    def test_create_named_task_with_custom_factory(self):
822        def task_factory(loop, coro):
823            return asyncio.Task(coro, loop=loop)
824
825        async def test():
826            pass
827
828        loop = asyncio.new_event_loop()
829        loop.set_task_factory(task_factory)
830        task = loop.create_task(test(), name='test_task')
831        try:
832            self.assertEqual(task.get_name(), 'test_task')
833        finally:
834            loop.run_until_complete(task)
835            loop.close()
836
837    def test_run_forever_keyboard_interrupt(self):
838        # Python issue #22601: ensure that the temporary task created by
839        # run_forever() consumes the KeyboardInterrupt and so don't log
840        # a warning
841        async def raise_keyboard_interrupt():
842            raise KeyboardInterrupt
843
844        self.loop._process_events = mock.Mock()
845        self.loop.call_exception_handler = mock.Mock()
846
847        try:
848            self.loop.run_until_complete(raise_keyboard_interrupt())
849        except KeyboardInterrupt:
850            pass
851        self.loop.close()
852        support.gc_collect()
853
854        self.assertFalse(self.loop.call_exception_handler.called)
855
856    def test_run_until_complete_baseexception(self):
857        # Python issue #22429: run_until_complete() must not schedule a pending
858        # call to stop() if the future raised a BaseException
859        async def raise_keyboard_interrupt():
860            raise KeyboardInterrupt
861
862        self.loop._process_events = mock.Mock()
863
864        try:
865            self.loop.run_until_complete(raise_keyboard_interrupt())
866        except KeyboardInterrupt:
867            pass
868
869        def func():
870            self.loop.stop()
871            func.called = True
872        func.called = False
873        try:
874            self.loop.call_soon(func)
875            self.loop.run_forever()
876        except KeyboardInterrupt:
877            pass
878        self.assertTrue(func.called)
879
880    def test_single_selecter_event_callback_after_stopping(self):
881        # Python issue #25593: A stopped event loop may cause event callbacks
882        # to run more than once.
883        event_sentinel = object()
884        callcount = 0
885        doer = None
886
887        def proc_events(event_list):
888            nonlocal doer
889            if event_sentinel in event_list:
890                doer = self.loop.call_soon(do_event)
891
892        def do_event():
893            nonlocal callcount
894            callcount += 1
895            self.loop.call_soon(clear_selector)
896
897        def clear_selector():
898            doer.cancel()
899            self.loop._selector.select.return_value = ()
900
901        self.loop._process_events = proc_events
902        self.loop._selector.select.return_value = (event_sentinel,)
903
904        for i in range(1, 3):
905            with self.subTest('Loop %d/2' % i):
906                self.loop.call_soon(self.loop.stop)
907                self.loop.run_forever()
908                self.assertEqual(callcount, 1)
909
910    def test_run_once(self):
911        # Simple test for test_utils.run_once().  It may seem strange
912        # to have a test for this (the function isn't even used!) but
913        # it's a de-factor standard API for library tests.  This tests
914        # the idiom: loop.call_soon(loop.stop); loop.run_forever().
915        count = 0
916
917        def callback():
918            nonlocal count
919            count += 1
920
921        self.loop._process_events = mock.Mock()
922        self.loop.call_soon(callback)
923        test_utils.run_once(self.loop)
924        self.assertEqual(count, 1)
925
926    def test_run_forever_pre_stopped(self):
927        # Test that the old idiom for pre-stopping the loop works.
928        self.loop._process_events = mock.Mock()
929        self.loop.stop()
930        self.loop.run_forever()
931        self.loop._selector.select.assert_called_once_with(0)
932
933    async def leave_unfinalized_asyncgen(self):
934        # Create an async generator, iterate it partially, and leave it
935        # to be garbage collected.
936        # Used in async generator finalization tests.
937        # Depends on implementation details of garbage collector. Changes
938        # in gc may break this function.
939        status = {'started': False,
940                  'stopped': False,
941                  'finalized': False}
942
943        async def agen():
944            status['started'] = True
945            try:
946                for item in ['ZERO', 'ONE', 'TWO', 'THREE', 'FOUR']:
947                    yield item
948            finally:
949                status['finalized'] = True
950
951        ag = agen()
952        ai = ag.__aiter__()
953
954        async def iter_one():
955            try:
956                item = await ai.__anext__()
957            except StopAsyncIteration:
958                return
959            if item == 'THREE':
960                status['stopped'] = True
961                return
962            asyncio.create_task(iter_one())
963
964        asyncio.create_task(iter_one())
965        return status
966
967    def test_asyncgen_finalization_by_gc(self):
968        # Async generators should be finalized when garbage collected.
969        self.loop._process_events = mock.Mock()
970        self.loop._write_to_self = mock.Mock()
971        with support.disable_gc():
972            status = self.loop.run_until_complete(self.leave_unfinalized_asyncgen())
973            while not status['stopped']:
974                test_utils.run_briefly(self.loop)
975            self.assertTrue(status['started'])
976            self.assertTrue(status['stopped'])
977            self.assertFalse(status['finalized'])
978            support.gc_collect()
979            test_utils.run_briefly(self.loop)
980            self.assertTrue(status['finalized'])
981
982    def test_asyncgen_finalization_by_gc_in_other_thread(self):
983        # Python issue 34769: If garbage collector runs in another
984        # thread, async generators will not finalize in debug
985        # mode.
986        self.loop._process_events = mock.Mock()
987        self.loop._write_to_self = mock.Mock()
988        self.loop.set_debug(True)
989        with support.disable_gc():
990            status = self.loop.run_until_complete(self.leave_unfinalized_asyncgen())
991            while not status['stopped']:
992                test_utils.run_briefly(self.loop)
993            self.assertTrue(status['started'])
994            self.assertTrue(status['stopped'])
995            self.assertFalse(status['finalized'])
996            self.loop.run_until_complete(
997                self.loop.run_in_executor(None, support.gc_collect))
998            test_utils.run_briefly(self.loop)
999            self.assertTrue(status['finalized'])
1000
1001
1002class MyProto(asyncio.Protocol):
1003    done = None
1004
1005    def __init__(self, create_future=False):
1006        self.state = 'INITIAL'
1007        self.nbytes = 0
1008        if create_future:
1009            self.done = asyncio.get_running_loop().create_future()
1010
1011    def _assert_state(self, *expected):
1012        if self.state not in expected:
1013            raise AssertionError(f'state: {self.state!r}, expected: {expected!r}')
1014
1015    def connection_made(self, transport):
1016        self.transport = transport
1017        self._assert_state('INITIAL')
1018        self.state = 'CONNECTED'
1019        transport.write(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n')
1020
1021    def data_received(self, data):
1022        self._assert_state('CONNECTED')
1023        self.nbytes += len(data)
1024
1025    def eof_received(self):
1026        self._assert_state('CONNECTED')
1027        self.state = 'EOF'
1028
1029    def connection_lost(self, exc):
1030        self._assert_state('CONNECTED', 'EOF')
1031        self.state = 'CLOSED'
1032        if self.done:
1033            self.done.set_result(None)
1034
1035
1036class MyDatagramProto(asyncio.DatagramProtocol):
1037    done = None
1038
1039    def __init__(self, create_future=False, loop=None):
1040        self.state = 'INITIAL'
1041        self.nbytes = 0
1042        if create_future:
1043            self.done = loop.create_future()
1044
1045    def _assert_state(self, expected):
1046        if self.state != expected:
1047            raise AssertionError(f'state: {self.state!r}, expected: {expected!r}')
1048
1049    def connection_made(self, transport):
1050        self.transport = transport
1051        self._assert_state('INITIAL')
1052        self.state = 'INITIALIZED'
1053
1054    def datagram_received(self, data, addr):
1055        self._assert_state('INITIALIZED')
1056        self.nbytes += len(data)
1057
1058    def error_received(self, exc):
1059        self._assert_state('INITIALIZED')
1060
1061    def connection_lost(self, exc):
1062        self._assert_state('INITIALIZED')
1063        self.state = 'CLOSED'
1064        if self.done:
1065            self.done.set_result(None)
1066
1067
1068class BaseEventLoopWithSelectorTests(test_utils.TestCase):
1069
1070    def setUp(self):
1071        super().setUp()
1072        self.loop = asyncio.SelectorEventLoop()
1073        self.set_event_loop(self.loop)
1074
1075    @mock.patch('socket.getnameinfo')
1076    def test_getnameinfo(self, m_gai):
1077        m_gai.side_effect = lambda *args: 42
1078        r = self.loop.run_until_complete(self.loop.getnameinfo(('abc', 123)))
1079        self.assertEqual(r, 42)
1080
1081    @patch_socket
1082    def test_create_connection_multiple_errors(self, m_socket):
1083
1084        class MyProto(asyncio.Protocol):
1085            pass
1086
1087        async def getaddrinfo(*args, **kw):
1088            return [(2, 1, 6, '', ('107.6.106.82', 80)),
1089                    (2, 1, 6, '', ('107.6.106.82', 80))]
1090
1091        def getaddrinfo_task(*args, **kwds):
1092            return self.loop.create_task(getaddrinfo(*args, **kwds))
1093
1094        idx = -1
1095        errors = ['err1', 'err2']
1096
1097        def _socket(*args, **kw):
1098            nonlocal idx, errors
1099            idx += 1
1100            raise OSError(errors[idx])
1101
1102        m_socket.socket = _socket
1103
1104        self.loop.getaddrinfo = getaddrinfo_task
1105
1106        coro = self.loop.create_connection(MyProto, 'example.com', 80)
1107        with self.assertRaises(OSError) as cm:
1108            self.loop.run_until_complete(coro)
1109
1110        self.assertEqual(str(cm.exception), 'Multiple exceptions: err1, err2')
1111
1112    @patch_socket
1113    def test_create_connection_timeout(self, m_socket):
1114        # Ensure that the socket is closed on timeout
1115        sock = mock.Mock()
1116        m_socket.socket.return_value = sock
1117
1118        def getaddrinfo(*args, **kw):
1119            fut = self.loop.create_future()
1120            addr = (socket.AF_INET, socket.SOCK_STREAM, 0, '',
1121                    ('127.0.0.1', 80))
1122            fut.set_result([addr])
1123            return fut
1124        self.loop.getaddrinfo = getaddrinfo
1125
1126        with mock.patch.object(self.loop, 'sock_connect',
1127                               side_effect=asyncio.TimeoutError):
1128            coro = self.loop.create_connection(MyProto, '127.0.0.1', 80)
1129            with self.assertRaises(asyncio.TimeoutError):
1130                self.loop.run_until_complete(coro)
1131            self.assertTrue(sock.close.called)
1132
1133    def test_create_connection_host_port_sock(self):
1134        coro = self.loop.create_connection(
1135            MyProto, 'example.com', 80, sock=object())
1136        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1137
1138    def test_create_connection_wrong_sock(self):
1139        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
1140        with sock:
1141            coro = self.loop.create_connection(MyProto, sock=sock)
1142            with self.assertRaisesRegex(ValueError,
1143                                        'A Stream Socket was expected'):
1144                self.loop.run_until_complete(coro)
1145
1146    def test_create_server_wrong_sock(self):
1147        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
1148        with sock:
1149            coro = self.loop.create_server(MyProto, sock=sock)
1150            with self.assertRaisesRegex(ValueError,
1151                                        'A Stream Socket was expected'):
1152                self.loop.run_until_complete(coro)
1153
1154    def test_create_server_ssl_timeout_for_plain_socket(self):
1155        coro = self.loop.create_server(
1156            MyProto, 'example.com', 80, ssl_handshake_timeout=1)
1157        with self.assertRaisesRegex(
1158                ValueError,
1159                'ssl_handshake_timeout is only meaningful with ssl'):
1160            self.loop.run_until_complete(coro)
1161
1162    @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'),
1163                         'no socket.SOCK_NONBLOCK (linux only)')
1164    def test_create_server_stream_bittype(self):
1165        sock = socket.socket(
1166            socket.AF_INET, socket.SOCK_STREAM | socket.SOCK_NONBLOCK)
1167        with sock:
1168            coro = self.loop.create_server(lambda: None, sock=sock)
1169            srv = self.loop.run_until_complete(coro)
1170            srv.close()
1171            self.loop.run_until_complete(srv.wait_closed())
1172
1173    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'no IPv6 support')
1174    def test_create_server_ipv6(self):
1175        async def main():
1176            srv = await asyncio.start_server(lambda: None, '::1', 0)
1177            try:
1178                self.assertGreater(len(srv.sockets), 0)
1179            finally:
1180                srv.close()
1181                await srv.wait_closed()
1182
1183        try:
1184            self.loop.run_until_complete(main())
1185        except OSError as ex:
1186            if (hasattr(errno, 'EADDRNOTAVAIL') and
1187                    ex.errno == errno.EADDRNOTAVAIL):
1188                self.skipTest('failed to bind to ::1')
1189            else:
1190                raise
1191
1192    def test_create_datagram_endpoint_wrong_sock(self):
1193        sock = socket.socket(socket.AF_INET)
1194        with sock:
1195            coro = self.loop.create_datagram_endpoint(MyProto, sock=sock)
1196            with self.assertRaisesRegex(ValueError,
1197                                        'A UDP Socket was expected'):
1198                self.loop.run_until_complete(coro)
1199
1200    def test_create_connection_no_host_port_sock(self):
1201        coro = self.loop.create_connection(MyProto)
1202        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1203
1204    def test_create_connection_no_getaddrinfo(self):
1205        async def getaddrinfo(*args, **kw):
1206            return []
1207
1208        def getaddrinfo_task(*args, **kwds):
1209            return self.loop.create_task(getaddrinfo(*args, **kwds))
1210
1211        self.loop.getaddrinfo = getaddrinfo_task
1212        coro = self.loop.create_connection(MyProto, 'example.com', 80)
1213        self.assertRaises(
1214            OSError, self.loop.run_until_complete, coro)
1215
1216    def test_create_connection_connect_err(self):
1217        async def getaddrinfo(*args, **kw):
1218            return [(2, 1, 6, '', ('107.6.106.82', 80))]
1219
1220        def getaddrinfo_task(*args, **kwds):
1221            return self.loop.create_task(getaddrinfo(*args, **kwds))
1222
1223        self.loop.getaddrinfo = getaddrinfo_task
1224        self.loop.sock_connect = mock.Mock()
1225        self.loop.sock_connect.side_effect = OSError
1226
1227        coro = self.loop.create_connection(MyProto, 'example.com', 80)
1228        self.assertRaises(
1229            OSError, self.loop.run_until_complete, coro)
1230
1231    def test_create_connection_multiple(self):
1232        async def getaddrinfo(*args, **kw):
1233            return [(2, 1, 6, '', ('0.0.0.1', 80)),
1234                    (2, 1, 6, '', ('0.0.0.2', 80))]
1235
1236        def getaddrinfo_task(*args, **kwds):
1237            return self.loop.create_task(getaddrinfo(*args, **kwds))
1238
1239        self.loop.getaddrinfo = getaddrinfo_task
1240        self.loop.sock_connect = mock.Mock()
1241        self.loop.sock_connect.side_effect = OSError
1242
1243        coro = self.loop.create_connection(
1244            MyProto, 'example.com', 80, family=socket.AF_INET)
1245        with self.assertRaises(OSError):
1246            self.loop.run_until_complete(coro)
1247
1248    @patch_socket
1249    def test_create_connection_multiple_errors_local_addr(self, m_socket):
1250
1251        def bind(addr):
1252            if addr[0] == '0.0.0.1':
1253                err = OSError('Err')
1254                err.strerror = 'Err'
1255                raise err
1256
1257        m_socket.socket.return_value.bind = bind
1258
1259        async def getaddrinfo(*args, **kw):
1260            return [(2, 1, 6, '', ('0.0.0.1', 80)),
1261                    (2, 1, 6, '', ('0.0.0.2', 80))]
1262
1263        def getaddrinfo_task(*args, **kwds):
1264            return self.loop.create_task(getaddrinfo(*args, **kwds))
1265
1266        self.loop.getaddrinfo = getaddrinfo_task
1267        self.loop.sock_connect = mock.Mock()
1268        self.loop.sock_connect.side_effect = OSError('Err2')
1269
1270        coro = self.loop.create_connection(
1271            MyProto, 'example.com', 80, family=socket.AF_INET,
1272            local_addr=(None, 8080))
1273        with self.assertRaises(OSError) as cm:
1274            self.loop.run_until_complete(coro)
1275
1276        self.assertTrue(str(cm.exception).startswith('Multiple exceptions: '))
1277        self.assertTrue(m_socket.socket.return_value.close.called)
1278
1279    def _test_create_connection_ip_addr(self, m_socket, allow_inet_pton):
1280        # Test the fallback code, even if this system has inet_pton.
1281        if not allow_inet_pton:
1282            del m_socket.inet_pton
1283
1284        m_socket.getaddrinfo = socket.getaddrinfo
1285        sock = m_socket.socket.return_value
1286
1287        self.loop._add_reader = mock.Mock()
1288        self.loop._add_writer = mock.Mock()
1289
1290        coro = self.loop.create_connection(asyncio.Protocol, '1.2.3.4', 80)
1291        t, p = self.loop.run_until_complete(coro)
1292        try:
1293            sock.connect.assert_called_with(('1.2.3.4', 80))
1294            _, kwargs = m_socket.socket.call_args
1295            self.assertEqual(kwargs['family'], m_socket.AF_INET)
1296            self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM)
1297        finally:
1298            t.close()
1299            test_utils.run_briefly(self.loop)  # allow transport to close
1300
1301        if socket_helper.IPV6_ENABLED:
1302            sock.family = socket.AF_INET6
1303            coro = self.loop.create_connection(asyncio.Protocol, '::1', 80)
1304            t, p = self.loop.run_until_complete(coro)
1305            try:
1306                # Without inet_pton we use getaddrinfo, which transforms
1307                # ('::1', 80) to ('::1', 80, 0, 0). The last 0s are flow info,
1308                # scope id.
1309                [address] = sock.connect.call_args[0]
1310                host, port = address[:2]
1311                self.assertRegex(host, r'::(0\.)*1')
1312                self.assertEqual(port, 80)
1313                _, kwargs = m_socket.socket.call_args
1314                self.assertEqual(kwargs['family'], m_socket.AF_INET6)
1315                self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM)
1316            finally:
1317                t.close()
1318                test_utils.run_briefly(self.loop)  # allow transport to close
1319
1320    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'no IPv6 support')
1321    @unittest.skipIf(sys.platform.startswith('aix'),
1322                    "bpo-25545: IPv6 scope id and getaddrinfo() behave differently on AIX")
1323    @patch_socket
1324    def test_create_connection_ipv6_scope(self, m_socket):
1325        m_socket.getaddrinfo = socket.getaddrinfo
1326        sock = m_socket.socket.return_value
1327        sock.family = socket.AF_INET6
1328
1329        self.loop._add_reader = mock.Mock()
1330        self.loop._add_writer = mock.Mock()
1331
1332        coro = self.loop.create_connection(asyncio.Protocol, 'fe80::1%1', 80)
1333        t, p = self.loop.run_until_complete(coro)
1334        try:
1335            sock.connect.assert_called_with(('fe80::1', 80, 0, 1))
1336            _, kwargs = m_socket.socket.call_args
1337            self.assertEqual(kwargs['family'], m_socket.AF_INET6)
1338            self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM)
1339        finally:
1340            t.close()
1341            test_utils.run_briefly(self.loop)  # allow transport to close
1342
1343    @patch_socket
1344    def test_create_connection_ip_addr(self, m_socket):
1345        self._test_create_connection_ip_addr(m_socket, True)
1346
1347    @patch_socket
1348    def test_create_connection_no_inet_pton(self, m_socket):
1349        self._test_create_connection_ip_addr(m_socket, False)
1350
1351    @patch_socket
1352    def test_create_connection_service_name(self, m_socket):
1353        m_socket.getaddrinfo = socket.getaddrinfo
1354        sock = m_socket.socket.return_value
1355
1356        self.loop._add_reader = mock.Mock()
1357        self.loop._add_writer = mock.Mock()
1358
1359        for service, port in ('http', 80), (b'http', 80):
1360            coro = self.loop.create_connection(asyncio.Protocol,
1361                                               '127.0.0.1', service)
1362
1363            t, p = self.loop.run_until_complete(coro)
1364            try:
1365                sock.connect.assert_called_with(('127.0.0.1', port))
1366                _, kwargs = m_socket.socket.call_args
1367                self.assertEqual(kwargs['family'], m_socket.AF_INET)
1368                self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM)
1369            finally:
1370                t.close()
1371                test_utils.run_briefly(self.loop)  # allow transport to close
1372
1373        for service in 'nonsense', b'nonsense':
1374            coro = self.loop.create_connection(asyncio.Protocol,
1375                                               '127.0.0.1', service)
1376
1377            with self.assertRaises(OSError):
1378                self.loop.run_until_complete(coro)
1379
1380    def test_create_connection_no_local_addr(self):
1381        async def getaddrinfo(host, *args, **kw):
1382            if host == 'example.com':
1383                return [(2, 1, 6, '', ('107.6.106.82', 80)),
1384                        (2, 1, 6, '', ('107.6.106.82', 80))]
1385            else:
1386                return []
1387
1388        def getaddrinfo_task(*args, **kwds):
1389            return self.loop.create_task(getaddrinfo(*args, **kwds))
1390        self.loop.getaddrinfo = getaddrinfo_task
1391
1392        coro = self.loop.create_connection(
1393            MyProto, 'example.com', 80, family=socket.AF_INET,
1394            local_addr=(None, 8080))
1395        self.assertRaises(
1396            OSError, self.loop.run_until_complete, coro)
1397
1398    @patch_socket
1399    def test_create_connection_bluetooth(self, m_socket):
1400        # See http://bugs.python.org/issue27136, fallback to getaddrinfo when
1401        # we can't recognize an address is resolved, e.g. a Bluetooth address.
1402        addr = ('00:01:02:03:04:05', 1)
1403
1404        def getaddrinfo(host, port, *args, **kw):
1405            self.assertEqual((host, port), addr)
1406            return [(999, 1, 999, '', (addr, 1))]
1407
1408        m_socket.getaddrinfo = getaddrinfo
1409        sock = m_socket.socket()
1410        coro = self.loop.sock_connect(sock, addr)
1411        self.loop.run_until_complete(coro)
1412
1413    def test_create_connection_ssl_server_hostname_default(self):
1414        self.loop.getaddrinfo = mock.Mock()
1415
1416        def mock_getaddrinfo(*args, **kwds):
1417            f = self.loop.create_future()
1418            f.set_result([(socket.AF_INET, socket.SOCK_STREAM,
1419                           socket.SOL_TCP, '', ('1.2.3.4', 80))])
1420            return f
1421
1422        self.loop.getaddrinfo.side_effect = mock_getaddrinfo
1423        self.loop.sock_connect = mock.Mock()
1424        self.loop.sock_connect.return_value = self.loop.create_future()
1425        self.loop.sock_connect.return_value.set_result(None)
1426        self.loop._make_ssl_transport = mock.Mock()
1427
1428        class _SelectorTransportMock:
1429            _sock = None
1430
1431            def get_extra_info(self, key):
1432                return mock.Mock()
1433
1434            def close(self):
1435                self._sock.close()
1436
1437        def mock_make_ssl_transport(sock, protocol, sslcontext, waiter,
1438                                    **kwds):
1439            waiter.set_result(None)
1440            transport = _SelectorTransportMock()
1441            transport._sock = sock
1442            return transport
1443
1444        self.loop._make_ssl_transport.side_effect = mock_make_ssl_transport
1445        ANY = mock.ANY
1446        handshake_timeout = object()
1447        shutdown_timeout = object()
1448        # First try the default server_hostname.
1449        self.loop._make_ssl_transport.reset_mock()
1450        coro = self.loop.create_connection(
1451                MyProto, 'python.org', 80, ssl=True,
1452                ssl_handshake_timeout=handshake_timeout,
1453                ssl_shutdown_timeout=shutdown_timeout)
1454        transport, _ = self.loop.run_until_complete(coro)
1455        transport.close()
1456        self.loop._make_ssl_transport.assert_called_with(
1457            ANY, ANY, ANY, ANY,
1458            server_side=False,
1459            server_hostname='python.org',
1460            ssl_handshake_timeout=handshake_timeout,
1461            ssl_shutdown_timeout=shutdown_timeout)
1462        # Next try an explicit server_hostname.
1463        self.loop._make_ssl_transport.reset_mock()
1464        coro = self.loop.create_connection(
1465                MyProto, 'python.org', 80, ssl=True,
1466                server_hostname='perl.com',
1467                ssl_handshake_timeout=handshake_timeout,
1468                ssl_shutdown_timeout=shutdown_timeout)
1469        transport, _ = self.loop.run_until_complete(coro)
1470        transport.close()
1471        self.loop._make_ssl_transport.assert_called_with(
1472            ANY, ANY, ANY, ANY,
1473            server_side=False,
1474            server_hostname='perl.com',
1475            ssl_handshake_timeout=handshake_timeout,
1476            ssl_shutdown_timeout=shutdown_timeout)
1477        # Finally try an explicit empty server_hostname.
1478        self.loop._make_ssl_transport.reset_mock()
1479        coro = self.loop.create_connection(
1480                MyProto, 'python.org', 80, ssl=True,
1481                server_hostname='',
1482                ssl_handshake_timeout=handshake_timeout,
1483                ssl_shutdown_timeout=shutdown_timeout)
1484        transport, _ = self.loop.run_until_complete(coro)
1485        transport.close()
1486        self.loop._make_ssl_transport.assert_called_with(
1487                ANY, ANY, ANY, ANY,
1488                server_side=False,
1489                server_hostname='',
1490                ssl_handshake_timeout=handshake_timeout,
1491                ssl_shutdown_timeout=shutdown_timeout)
1492
1493    def test_create_connection_no_ssl_server_hostname_errors(self):
1494        # When not using ssl, server_hostname must be None.
1495        coro = self.loop.create_connection(MyProto, 'python.org', 80,
1496                                           server_hostname='')
1497        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1498        coro = self.loop.create_connection(MyProto, 'python.org', 80,
1499                                           server_hostname='python.org')
1500        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1501
1502    def test_create_connection_ssl_server_hostname_errors(self):
1503        # When using ssl, server_hostname may be None if host is non-empty.
1504        coro = self.loop.create_connection(MyProto, '', 80, ssl=True)
1505        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1506        coro = self.loop.create_connection(MyProto, None, 80, ssl=True)
1507        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1508        sock = socket.socket()
1509        coro = self.loop.create_connection(MyProto, None, None,
1510                                           ssl=True, sock=sock)
1511        self.addCleanup(sock.close)
1512        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1513
1514    def test_create_connection_ssl_timeout_for_plain_socket(self):
1515        coro = self.loop.create_connection(
1516            MyProto, 'example.com', 80, ssl_handshake_timeout=1)
1517        with self.assertRaisesRegex(
1518                ValueError,
1519                'ssl_handshake_timeout is only meaningful with ssl'):
1520            self.loop.run_until_complete(coro)
1521
1522    def test_create_server_empty_host(self):
1523        # if host is empty string use None instead
1524        host = object()
1525
1526        async def getaddrinfo(*args, **kw):
1527            nonlocal host
1528            host = args[0]
1529            return []
1530
1531        def getaddrinfo_task(*args, **kwds):
1532            return self.loop.create_task(getaddrinfo(*args, **kwds))
1533
1534        self.loop.getaddrinfo = getaddrinfo_task
1535        fut = self.loop.create_server(MyProto, '', 0)
1536        self.assertRaises(OSError, self.loop.run_until_complete, fut)
1537        self.assertIsNone(host)
1538
1539    def test_create_server_host_port_sock(self):
1540        fut = self.loop.create_server(
1541            MyProto, '0.0.0.0', 0, sock=object())
1542        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1543
1544    def test_create_server_no_host_port_sock(self):
1545        fut = self.loop.create_server(MyProto)
1546        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1547
1548    def test_create_server_no_getaddrinfo(self):
1549        getaddrinfo = self.loop.getaddrinfo = mock.Mock()
1550        getaddrinfo.return_value = self.loop.create_future()
1551        getaddrinfo.return_value.set_result(None)
1552
1553        f = self.loop.create_server(MyProto, 'python.org', 0)
1554        self.assertRaises(OSError, self.loop.run_until_complete, f)
1555
1556    @patch_socket
1557    def test_create_server_nosoreuseport(self, m_socket):
1558        m_socket.getaddrinfo = socket.getaddrinfo
1559        del m_socket.SO_REUSEPORT
1560        m_socket.socket.return_value = mock.Mock()
1561
1562        f = self.loop.create_server(
1563            MyProto, '0.0.0.0', 0, reuse_port=True)
1564
1565        self.assertRaises(ValueError, self.loop.run_until_complete, f)
1566
1567    @patch_socket
1568    def test_create_server_soreuseport_only_defined(self, m_socket):
1569        m_socket.getaddrinfo = socket.getaddrinfo
1570        m_socket.socket.return_value = mock.Mock()
1571        m_socket.SO_REUSEPORT = -1
1572
1573        f = self.loop.create_server(
1574            MyProto, '0.0.0.0', 0, reuse_port=True)
1575
1576        self.assertRaises(ValueError, self.loop.run_until_complete, f)
1577
1578    @patch_socket
1579    def test_create_server_cant_bind(self, m_socket):
1580
1581        class Err(OSError):
1582            strerror = 'error'
1583
1584        m_socket.getaddrinfo.return_value = [
1585            (2, 1, 6, '', ('127.0.0.1', 10100))]
1586        m_sock = m_socket.socket.return_value = mock.Mock()
1587        m_sock.bind.side_effect = Err
1588
1589        fut = self.loop.create_server(MyProto, '0.0.0.0', 0)
1590        self.assertRaises(OSError, self.loop.run_until_complete, fut)
1591        self.assertTrue(m_sock.close.called)
1592
1593    @patch_socket
1594    def test_create_datagram_endpoint_no_addrinfo(self, m_socket):
1595        m_socket.getaddrinfo.return_value = []
1596
1597        coro = self.loop.create_datagram_endpoint(
1598            MyDatagramProto, local_addr=('localhost', 0))
1599        self.assertRaises(
1600            OSError, self.loop.run_until_complete, coro)
1601
1602    def test_create_datagram_endpoint_addr_error(self):
1603        coro = self.loop.create_datagram_endpoint(
1604            MyDatagramProto, local_addr='localhost')
1605        self.assertRaises(
1606            TypeError, self.loop.run_until_complete, coro)
1607        coro = self.loop.create_datagram_endpoint(
1608            MyDatagramProto, local_addr=('localhost', 1, 2, 3))
1609        self.assertRaises(
1610            TypeError, self.loop.run_until_complete, coro)
1611
1612    def test_create_datagram_endpoint_connect_err(self):
1613        self.loop.sock_connect = mock.Mock()
1614        self.loop.sock_connect.side_effect = OSError
1615
1616        coro = self.loop.create_datagram_endpoint(
1617            asyncio.DatagramProtocol, remote_addr=('127.0.0.1', 0))
1618        self.assertRaises(
1619            OSError, self.loop.run_until_complete, coro)
1620
1621    def test_create_datagram_endpoint_allow_broadcast(self):
1622        protocol = MyDatagramProto(create_future=True, loop=self.loop)
1623        self.loop.sock_connect = sock_connect = mock.Mock()
1624        sock_connect.return_value = []
1625
1626        coro = self.loop.create_datagram_endpoint(
1627            lambda: protocol,
1628            remote_addr=('127.0.0.1', 0),
1629            allow_broadcast=True)
1630
1631        transport, _ = self.loop.run_until_complete(coro)
1632        self.assertFalse(sock_connect.called)
1633
1634        transport.close()
1635        self.loop.run_until_complete(protocol.done)
1636        self.assertEqual('CLOSED', protocol.state)
1637
1638    @patch_socket
1639    def test_create_datagram_endpoint_socket_err(self, m_socket):
1640        m_socket.getaddrinfo = socket.getaddrinfo
1641        m_socket.socket.side_effect = OSError
1642
1643        coro = self.loop.create_datagram_endpoint(
1644            asyncio.DatagramProtocol, family=socket.AF_INET)
1645        self.assertRaises(
1646            OSError, self.loop.run_until_complete, coro)
1647
1648        coro = self.loop.create_datagram_endpoint(
1649            asyncio.DatagramProtocol, local_addr=('127.0.0.1', 0))
1650        self.assertRaises(
1651            OSError, self.loop.run_until_complete, coro)
1652
1653    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 not supported or enabled')
1654    def test_create_datagram_endpoint_no_matching_family(self):
1655        coro = self.loop.create_datagram_endpoint(
1656            asyncio.DatagramProtocol,
1657            remote_addr=('127.0.0.1', 0), local_addr=('::1', 0))
1658        self.assertRaises(
1659            ValueError, self.loop.run_until_complete, coro)
1660
1661    @patch_socket
1662    def test_create_datagram_endpoint_setblk_err(self, m_socket):
1663        m_socket.socket.return_value.setblocking.side_effect = OSError
1664
1665        coro = self.loop.create_datagram_endpoint(
1666            asyncio.DatagramProtocol, family=socket.AF_INET)
1667        self.assertRaises(
1668            OSError, self.loop.run_until_complete, coro)
1669        self.assertTrue(
1670            m_socket.socket.return_value.close.called)
1671
1672    def test_create_datagram_endpoint_noaddr_nofamily(self):
1673        coro = self.loop.create_datagram_endpoint(
1674            asyncio.DatagramProtocol)
1675        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1676
1677    @patch_socket
1678    def test_create_datagram_endpoint_cant_bind(self, m_socket):
1679        class Err(OSError):
1680            pass
1681
1682        m_socket.getaddrinfo = socket.getaddrinfo
1683        m_sock = m_socket.socket.return_value = mock.Mock()
1684        m_sock.bind.side_effect = Err
1685
1686        fut = self.loop.create_datagram_endpoint(
1687            MyDatagramProto,
1688            local_addr=('127.0.0.1', 0), family=socket.AF_INET)
1689        self.assertRaises(Err, self.loop.run_until_complete, fut)
1690        self.assertTrue(m_sock.close.called)
1691
1692    def test_create_datagram_endpoint_sock(self):
1693        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
1694        sock.bind(('127.0.0.1', 0))
1695        fut = self.loop.create_datagram_endpoint(
1696            lambda: MyDatagramProto(create_future=True, loop=self.loop),
1697            sock=sock)
1698        transport, protocol = self.loop.run_until_complete(fut)
1699        transport.close()
1700        self.loop.run_until_complete(protocol.done)
1701        self.assertEqual('CLOSED', protocol.state)
1702
1703    @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
1704    def test_create_datagram_endpoint_sock_unix(self):
1705        fut = self.loop.create_datagram_endpoint(
1706            lambda: MyDatagramProto(create_future=True, loop=self.loop),
1707            family=socket.AF_UNIX)
1708        transport, protocol = self.loop.run_until_complete(fut)
1709        self.assertEqual(transport._sock.family, socket.AF_UNIX)
1710        transport.close()
1711        self.loop.run_until_complete(protocol.done)
1712        self.assertEqual('CLOSED', protocol.state)
1713
1714    @socket_helper.skip_unless_bind_unix_socket
1715    def test_create_datagram_endpoint_existing_sock_unix(self):
1716        with test_utils.unix_socket_path() as path:
1717            sock = socket.socket(socket.AF_UNIX, type=socket.SOCK_DGRAM)
1718            sock.bind(path)
1719            sock.close()
1720
1721            coro = self.loop.create_datagram_endpoint(
1722                lambda: MyDatagramProto(create_future=True, loop=self.loop),
1723                path, family=socket.AF_UNIX)
1724            transport, protocol = self.loop.run_until_complete(coro)
1725            transport.close()
1726            self.loop.run_until_complete(protocol.done)
1727
1728    def test_create_datagram_endpoint_sock_sockopts(self):
1729        class FakeSock:
1730            type = socket.SOCK_DGRAM
1731
1732        fut = self.loop.create_datagram_endpoint(
1733            MyDatagramProto, local_addr=('127.0.0.1', 0), sock=FakeSock())
1734        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1735
1736        fut = self.loop.create_datagram_endpoint(
1737            MyDatagramProto, remote_addr=('127.0.0.1', 0), sock=FakeSock())
1738        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1739
1740        fut = self.loop.create_datagram_endpoint(
1741            MyDatagramProto, family=1, sock=FakeSock())
1742        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1743
1744        fut = self.loop.create_datagram_endpoint(
1745            MyDatagramProto, proto=1, sock=FakeSock())
1746        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1747
1748        fut = self.loop.create_datagram_endpoint(
1749            MyDatagramProto, flags=1, sock=FakeSock())
1750        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1751
1752        fut = self.loop.create_datagram_endpoint(
1753            MyDatagramProto, reuse_port=True, sock=FakeSock())
1754        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1755
1756        fut = self.loop.create_datagram_endpoint(
1757            MyDatagramProto, allow_broadcast=True, sock=FakeSock())
1758        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1759
1760    @unittest.skipIf(sys.platform == 'vxworks',
1761                    "SO_BROADCAST is enabled by default on VxWorks")
1762    def test_create_datagram_endpoint_sockopts(self):
1763        # Socket options should not be applied unless asked for.
1764        # SO_REUSEPORT is not available on all platforms.
1765
1766        coro = self.loop.create_datagram_endpoint(
1767            lambda: MyDatagramProto(create_future=True, loop=self.loop),
1768            local_addr=('127.0.0.1', 0))
1769        transport, protocol = self.loop.run_until_complete(coro)
1770        sock = transport.get_extra_info('socket')
1771
1772        reuseport_supported = hasattr(socket, 'SO_REUSEPORT')
1773
1774        if reuseport_supported:
1775            self.assertFalse(
1776                sock.getsockopt(
1777                    socket.SOL_SOCKET, socket.SO_REUSEPORT))
1778        self.assertFalse(
1779            sock.getsockopt(
1780                socket.SOL_SOCKET, socket.SO_BROADCAST))
1781
1782        transport.close()
1783        self.loop.run_until_complete(protocol.done)
1784        self.assertEqual('CLOSED', protocol.state)
1785
1786        coro = self.loop.create_datagram_endpoint(
1787            lambda: MyDatagramProto(create_future=True, loop=self.loop),
1788            local_addr=('127.0.0.1', 0),
1789            reuse_port=reuseport_supported,
1790            allow_broadcast=True)
1791        transport, protocol = self.loop.run_until_complete(coro)
1792        sock = transport.get_extra_info('socket')
1793
1794        self.assertFalse(
1795            sock.getsockopt(
1796                socket.SOL_SOCKET, socket.SO_REUSEADDR))
1797        if reuseport_supported:
1798            self.assertTrue(
1799                sock.getsockopt(
1800                    socket.SOL_SOCKET, socket.SO_REUSEPORT))
1801        self.assertTrue(
1802            sock.getsockopt(
1803                socket.SOL_SOCKET, socket.SO_BROADCAST))
1804
1805        transport.close()
1806        self.loop.run_until_complete(protocol.done)
1807        self.assertEqual('CLOSED', protocol.state)
1808
1809    @patch_socket
1810    def test_create_datagram_endpoint_nosoreuseport(self, m_socket):
1811        del m_socket.SO_REUSEPORT
1812        m_socket.socket.return_value = mock.Mock()
1813
1814        coro = self.loop.create_datagram_endpoint(
1815            lambda: MyDatagramProto(loop=self.loop),
1816            local_addr=('127.0.0.1', 0),
1817            reuse_port=True)
1818
1819        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1820
1821    @patch_socket
1822    def test_create_datagram_endpoint_ip_addr(self, m_socket):
1823        def getaddrinfo(*args, **kw):
1824            self.fail('should not have called getaddrinfo')
1825
1826        m_socket.getaddrinfo = getaddrinfo
1827        m_socket.socket.return_value.bind = bind = mock.Mock()
1828        self.loop._add_reader = mock.Mock()
1829
1830        reuseport_supported = hasattr(socket, 'SO_REUSEPORT')
1831        coro = self.loop.create_datagram_endpoint(
1832            lambda: MyDatagramProto(loop=self.loop),
1833            local_addr=('1.2.3.4', 0),
1834            reuse_port=reuseport_supported)
1835
1836        t, p = self.loop.run_until_complete(coro)
1837        try:
1838            bind.assert_called_with(('1.2.3.4', 0))
1839            m_socket.socket.assert_called_with(family=m_socket.AF_INET,
1840                                               proto=m_socket.IPPROTO_UDP,
1841                                               type=m_socket.SOCK_DGRAM)
1842        finally:
1843            t.close()
1844            test_utils.run_briefly(self.loop)  # allow transport to close
1845
1846    def test_accept_connection_retry(self):
1847        sock = mock.Mock()
1848        sock.accept.side_effect = BlockingIOError()
1849
1850        self.loop._accept_connection(MyProto, sock)
1851        self.assertFalse(sock.close.called)
1852
1853    @mock.patch('asyncio.base_events.logger')
1854    def test_accept_connection_exception(self, m_log):
1855        sock = mock.Mock()
1856        sock.fileno.return_value = 10
1857        sock.accept.side_effect = OSError(errno.EMFILE, 'Too many open files')
1858        self.loop._remove_reader = mock.Mock()
1859        self.loop.call_later = mock.Mock()
1860
1861        self.loop._accept_connection(MyProto, sock)
1862        self.assertTrue(m_log.error.called)
1863        self.assertFalse(sock.close.called)
1864        self.loop._remove_reader.assert_called_with(10)
1865        self.loop.call_later.assert_called_with(
1866            constants.ACCEPT_RETRY_DELAY,
1867            # self.loop._start_serving
1868            mock.ANY,
1869            MyProto, sock, None, None, mock.ANY, mock.ANY, mock.ANY)
1870
1871    def test_call_coroutine(self):
1872        async def simple_coroutine():
1873            pass
1874
1875        self.loop.set_debug(True)
1876        coro_func = simple_coroutine
1877        coro_obj = coro_func()
1878        self.addCleanup(coro_obj.close)
1879        for func in (coro_func, coro_obj):
1880            with self.assertRaises(TypeError):
1881                self.loop.call_soon(func)
1882            with self.assertRaises(TypeError):
1883                self.loop.call_soon_threadsafe(func)
1884            with self.assertRaises(TypeError):
1885                self.loop.call_later(60, func)
1886            with self.assertRaises(TypeError):
1887                self.loop.call_at(self.loop.time() + 60, func)
1888            with self.assertRaises(TypeError):
1889                self.loop.run_until_complete(
1890                    self.loop.run_in_executor(None, func))
1891
1892    @mock.patch('asyncio.base_events.logger')
1893    def test_log_slow_callbacks(self, m_logger):
1894        def stop_loop_cb(loop):
1895            loop.stop()
1896
1897        async def stop_loop_coro(loop):
1898            loop.stop()
1899
1900        asyncio.set_event_loop(self.loop)
1901        self.loop.set_debug(True)
1902        self.loop.slow_callback_duration = 0.0
1903
1904        # slow callback
1905        self.loop.call_soon(stop_loop_cb, self.loop)
1906        self.loop.run_forever()
1907        fmt, *args = m_logger.warning.call_args[0]
1908        self.assertRegex(fmt % tuple(args),
1909                         "^Executing <Handle.*stop_loop_cb.*> "
1910                         "took .* seconds$")
1911
1912        # slow task
1913        asyncio.ensure_future(stop_loop_coro(self.loop), loop=self.loop)
1914        self.loop.run_forever()
1915        fmt, *args = m_logger.warning.call_args[0]
1916        self.assertRegex(fmt % tuple(args),
1917                         "^Executing <Task.*stop_loop_coro.*> "
1918                         "took .* seconds$")
1919
1920
1921class RunningLoopTests(unittest.TestCase):
1922
1923    def test_running_loop_within_a_loop(self):
1924        async def runner(loop):
1925            loop.run_forever()
1926
1927        loop = asyncio.new_event_loop()
1928        outer_loop = asyncio.new_event_loop()
1929        try:
1930            with self.assertRaisesRegex(RuntimeError,
1931                                        'while another loop is running'):
1932                outer_loop.run_until_complete(runner(loop))
1933        finally:
1934            loop.close()
1935            outer_loop.close()
1936
1937
1938class BaseLoopSockSendfileTests(test_utils.TestCase):
1939
1940    DATA = b"12345abcde" * 16 * 1024  # 160 KiB
1941
1942    class MyProto(asyncio.Protocol):
1943
1944        def __init__(self, loop):
1945            self.started = False
1946            self.closed = False
1947            self.data = bytearray()
1948            self.fut = loop.create_future()
1949            self.transport = None
1950
1951        def connection_made(self, transport):
1952            self.started = True
1953            self.transport = transport
1954
1955        def data_received(self, data):
1956            self.data.extend(data)
1957
1958        def connection_lost(self, exc):
1959            self.closed = True
1960            self.fut.set_result(None)
1961            self.transport = None
1962
1963        async def wait_closed(self):
1964            await self.fut
1965
1966    @classmethod
1967    def setUpClass(cls):
1968        cls.__old_bufsize = constants.SENDFILE_FALLBACK_READBUFFER_SIZE
1969        constants.SENDFILE_FALLBACK_READBUFFER_SIZE = 1024 * 16
1970        with open(os_helper.TESTFN, 'wb') as fp:
1971            fp.write(cls.DATA)
1972        super().setUpClass()
1973
1974    @classmethod
1975    def tearDownClass(cls):
1976        constants.SENDFILE_FALLBACK_READBUFFER_SIZE = cls.__old_bufsize
1977        os_helper.unlink(os_helper.TESTFN)
1978        super().tearDownClass()
1979
1980    def setUp(self):
1981        from asyncio.selector_events import BaseSelectorEventLoop
1982        # BaseSelectorEventLoop() has no native implementation
1983        self.loop = BaseSelectorEventLoop()
1984        self.set_event_loop(self.loop)
1985        self.file = open(os_helper.TESTFN, 'rb')
1986        self.addCleanup(self.file.close)
1987        super().setUp()
1988
1989    def make_socket(self, blocking=False):
1990        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1991        sock.setblocking(blocking)
1992        self.addCleanup(sock.close)
1993        return sock
1994
1995    def run_loop(self, coro):
1996        return self.loop.run_until_complete(coro)
1997
1998    def prepare(self):
1999        sock = self.make_socket()
2000        proto = self.MyProto(self.loop)
2001        server = self.run_loop(self.loop.create_server(
2002            lambda: proto, socket_helper.HOST, 0, family=socket.AF_INET))
2003        addr = server.sockets[0].getsockname()
2004
2005        for _ in range(10):
2006            try:
2007                self.run_loop(self.loop.sock_connect(sock, addr))
2008            except OSError:
2009                self.run_loop(asyncio.sleep(0.5))
2010                continue
2011            else:
2012                break
2013        else:
2014            # One last try, so we get the exception
2015            self.run_loop(self.loop.sock_connect(sock, addr))
2016
2017        def cleanup():
2018            server.close()
2019            self.run_loop(server.wait_closed())
2020            sock.close()
2021            if proto.transport is not None:
2022                proto.transport.close()
2023                self.run_loop(proto.wait_closed())
2024
2025        self.addCleanup(cleanup)
2026
2027        return sock, proto
2028
2029    def test__sock_sendfile_native_failure(self):
2030        sock, proto = self.prepare()
2031
2032        with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
2033                                    "sendfile is not available"):
2034            self.run_loop(self.loop._sock_sendfile_native(sock, self.file,
2035                                                          0, None))
2036
2037        self.assertEqual(proto.data, b'')
2038        self.assertEqual(self.file.tell(), 0)
2039
2040    def test_sock_sendfile_no_fallback(self):
2041        sock, proto = self.prepare()
2042
2043        with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
2044                                    "sendfile is not available"):
2045            self.run_loop(self.loop.sock_sendfile(sock, self.file,
2046                                                  fallback=False))
2047
2048        self.assertEqual(self.file.tell(), 0)
2049        self.assertEqual(proto.data, b'')
2050
2051    def test_sock_sendfile_fallback(self):
2052        sock, proto = self.prepare()
2053
2054        ret = self.run_loop(self.loop.sock_sendfile(sock, self.file))
2055        sock.close()
2056        self.run_loop(proto.wait_closed())
2057
2058        self.assertEqual(ret, len(self.DATA))
2059        self.assertEqual(self.file.tell(), len(self.DATA))
2060        self.assertEqual(proto.data, self.DATA)
2061
2062    def test_sock_sendfile_fallback_offset_and_count(self):
2063        sock, proto = self.prepare()
2064
2065        ret = self.run_loop(self.loop.sock_sendfile(sock, self.file,
2066                                                    1000, 2000))
2067        sock.close()
2068        self.run_loop(proto.wait_closed())
2069
2070        self.assertEqual(ret, 2000)
2071        self.assertEqual(self.file.tell(), 3000)
2072        self.assertEqual(proto.data, self.DATA[1000:3000])
2073
2074    def test_blocking_socket(self):
2075        self.loop.set_debug(True)
2076        sock = self.make_socket(blocking=True)
2077        with self.assertRaisesRegex(ValueError, "must be non-blocking"):
2078            self.run_loop(self.loop.sock_sendfile(sock, self.file))
2079
2080    def test_nonbinary_file(self):
2081        sock = self.make_socket()
2082        with open(os_helper.TESTFN, encoding="utf-8") as f:
2083            with self.assertRaisesRegex(ValueError, "binary mode"):
2084                self.run_loop(self.loop.sock_sendfile(sock, f))
2085
2086    def test_nonstream_socket(self):
2087        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
2088        sock.setblocking(False)
2089        self.addCleanup(sock.close)
2090        with self.assertRaisesRegex(ValueError, "only SOCK_STREAM type"):
2091            self.run_loop(self.loop.sock_sendfile(sock, self.file))
2092
2093    def test_notint_count(self):
2094        sock = self.make_socket()
2095        with self.assertRaisesRegex(TypeError,
2096                                    "count must be a positive integer"):
2097            self.run_loop(self.loop.sock_sendfile(sock, self.file, 0, 'count'))
2098
2099    def test_negative_count(self):
2100        sock = self.make_socket()
2101        with self.assertRaisesRegex(ValueError,
2102                                    "count must be a positive integer"):
2103            self.run_loop(self.loop.sock_sendfile(sock, self.file, 0, -1))
2104
2105    def test_notint_offset(self):
2106        sock = self.make_socket()
2107        with self.assertRaisesRegex(TypeError,
2108                                    "offset must be a non-negative integer"):
2109            self.run_loop(self.loop.sock_sendfile(sock, self.file, 'offset'))
2110
2111    def test_negative_offset(self):
2112        sock = self.make_socket()
2113        with self.assertRaisesRegex(ValueError,
2114                                    "offset must be a non-negative integer"):
2115            self.run_loop(self.loop.sock_sendfile(sock, self.file, -1))
2116
2117
2118class TestSelectorUtils(test_utils.TestCase):
2119    def check_set_nodelay(self, sock):
2120        opt = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)
2121        self.assertFalse(opt)
2122
2123        base_events._set_nodelay(sock)
2124
2125        opt = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)
2126        self.assertTrue(opt)
2127
2128    @unittest.skipUnless(hasattr(socket, 'TCP_NODELAY'),
2129                         'need socket.TCP_NODELAY')
2130    def test_set_nodelay(self):
2131        sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM,
2132                             proto=socket.IPPROTO_TCP)
2133        with sock:
2134            self.check_set_nodelay(sock)
2135
2136        sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM,
2137                             proto=socket.IPPROTO_TCP)
2138        with sock:
2139            sock.setblocking(False)
2140            self.check_set_nodelay(sock)
2141
2142
2143
2144if __name__ == '__main__':
2145    unittest.main()
2146