1*cda5da8dSAndroid Build Coastguard Worker"""Support for tasks, coroutines and the scheduler.""" 2*cda5da8dSAndroid Build Coastguard Worker 3*cda5da8dSAndroid Build Coastguard Worker__all__ = ( 4*cda5da8dSAndroid Build Coastguard Worker 'Task', 'create_task', 5*cda5da8dSAndroid Build Coastguard Worker 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED', 6*cda5da8dSAndroid Build Coastguard Worker 'wait', 'wait_for', 'as_completed', 'sleep', 7*cda5da8dSAndroid Build Coastguard Worker 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe', 8*cda5da8dSAndroid Build Coastguard Worker 'current_task', 'all_tasks', 9*cda5da8dSAndroid Build Coastguard Worker '_register_task', '_unregister_task', '_enter_task', '_leave_task', 10*cda5da8dSAndroid Build Coastguard Worker) 11*cda5da8dSAndroid Build Coastguard Worker 12*cda5da8dSAndroid Build Coastguard Workerimport concurrent.futures 13*cda5da8dSAndroid Build Coastguard Workerimport contextvars 14*cda5da8dSAndroid Build Coastguard Workerimport functools 15*cda5da8dSAndroid Build Coastguard Workerimport inspect 16*cda5da8dSAndroid Build Coastguard Workerimport itertools 17*cda5da8dSAndroid Build Coastguard Workerimport types 18*cda5da8dSAndroid Build Coastguard Workerimport warnings 19*cda5da8dSAndroid Build Coastguard Workerimport weakref 20*cda5da8dSAndroid Build Coastguard Workerfrom types import GenericAlias 21*cda5da8dSAndroid Build Coastguard Worker 22*cda5da8dSAndroid Build Coastguard Workerfrom . import base_tasks 23*cda5da8dSAndroid Build Coastguard Workerfrom . import coroutines 24*cda5da8dSAndroid Build Coastguard Workerfrom . import events 25*cda5da8dSAndroid Build Coastguard Workerfrom . import exceptions 26*cda5da8dSAndroid Build Coastguard Workerfrom . 
import futures 27*cda5da8dSAndroid Build Coastguard Workerfrom .coroutines import _is_coroutine 28*cda5da8dSAndroid Build Coastguard Worker 29*cda5da8dSAndroid Build Coastguard Worker# Helper to generate new task names 30*cda5da8dSAndroid Build Coastguard Worker# This uses itertools.count() instead of a "+= 1" operation because the latter 31*cda5da8dSAndroid Build Coastguard Worker# is not thread safe. See bpo-11866 for a longer explanation. 32*cda5da8dSAndroid Build Coastguard Worker_task_name_counter = itertools.count(1).__next__ 33*cda5da8dSAndroid Build Coastguard Worker 34*cda5da8dSAndroid Build Coastguard Worker 35*cda5da8dSAndroid Build Coastguard Workerdef current_task(loop=None): 36*cda5da8dSAndroid Build Coastguard Worker """Return a currently executed task.""" 37*cda5da8dSAndroid Build Coastguard Worker if loop is None: 38*cda5da8dSAndroid Build Coastguard Worker loop = events.get_running_loop() 39*cda5da8dSAndroid Build Coastguard Worker return _current_tasks.get(loop) 40*cda5da8dSAndroid Build Coastguard Worker 41*cda5da8dSAndroid Build Coastguard Worker 42*cda5da8dSAndroid Build Coastguard Workerdef all_tasks(loop=None): 43*cda5da8dSAndroid Build Coastguard Worker """Return a set of all tasks for the loop.""" 44*cda5da8dSAndroid Build Coastguard Worker if loop is None: 45*cda5da8dSAndroid Build Coastguard Worker loop = events.get_running_loop() 46*cda5da8dSAndroid Build Coastguard Worker # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another 47*cda5da8dSAndroid Build Coastguard Worker # thread while we do so. Therefore we cast it to list prior to filtering. The list 48*cda5da8dSAndroid Build Coastguard Worker # cast itself requires iteration, so we repeat it several times ignoring 49*cda5da8dSAndroid Build Coastguard Worker # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for 50*cda5da8dSAndroid Build Coastguard Worker # details. 
51*cda5da8dSAndroid Build Coastguard Worker i = 0 52*cda5da8dSAndroid Build Coastguard Worker while True: 53*cda5da8dSAndroid Build Coastguard Worker try: 54*cda5da8dSAndroid Build Coastguard Worker tasks = list(_all_tasks) 55*cda5da8dSAndroid Build Coastguard Worker except RuntimeError: 56*cda5da8dSAndroid Build Coastguard Worker i += 1 57*cda5da8dSAndroid Build Coastguard Worker if i >= 1000: 58*cda5da8dSAndroid Build Coastguard Worker raise 59*cda5da8dSAndroid Build Coastguard Worker else: 60*cda5da8dSAndroid Build Coastguard Worker break 61*cda5da8dSAndroid Build Coastguard Worker return {t for t in tasks 62*cda5da8dSAndroid Build Coastguard Worker if futures._get_loop(t) is loop and not t.done()} 63*cda5da8dSAndroid Build Coastguard Worker 64*cda5da8dSAndroid Build Coastguard Worker 65*cda5da8dSAndroid Build Coastguard Workerdef _set_task_name(task, name): 66*cda5da8dSAndroid Build Coastguard Worker if name is not None: 67*cda5da8dSAndroid Build Coastguard Worker try: 68*cda5da8dSAndroid Build Coastguard Worker set_name = task.set_name 69*cda5da8dSAndroid Build Coastguard Worker except AttributeError: 70*cda5da8dSAndroid Build Coastguard Worker warnings.warn("Task.set_name() was added in Python 3.8, " 71*cda5da8dSAndroid Build Coastguard Worker "the method support will be mandatory for third-party " 72*cda5da8dSAndroid Build Coastguard Worker "task implementations since 3.13.", 73*cda5da8dSAndroid Build Coastguard Worker DeprecationWarning, stacklevel=3) 74*cda5da8dSAndroid Build Coastguard Worker else: 75*cda5da8dSAndroid Build Coastguard Worker set_name(name) 76*cda5da8dSAndroid Build Coastguard Worker 77*cda5da8dSAndroid Build Coastguard Worker 78*cda5da8dSAndroid Build Coastguard Workerclass Task(futures._PyFuture): # Inherit Python Task implementation 79*cda5da8dSAndroid Build Coastguard Worker # from a Python Future implementation. 
80*cda5da8dSAndroid Build Coastguard Worker 81*cda5da8dSAndroid Build Coastguard Worker """A coroutine wrapped in a Future.""" 82*cda5da8dSAndroid Build Coastguard Worker 83*cda5da8dSAndroid Build Coastguard Worker # An important invariant maintained while a Task not done: 84*cda5da8dSAndroid Build Coastguard Worker # 85*cda5da8dSAndroid Build Coastguard Worker # - Either _fut_waiter is None, and _step() is scheduled; 86*cda5da8dSAndroid Build Coastguard Worker # - or _fut_waiter is some Future, and _step() is *not* scheduled. 87*cda5da8dSAndroid Build Coastguard Worker # 88*cda5da8dSAndroid Build Coastguard Worker # The only transition from the latter to the former is through 89*cda5da8dSAndroid Build Coastguard Worker # _wakeup(). When _fut_waiter is not None, one of its callbacks 90*cda5da8dSAndroid Build Coastguard Worker # must be _wakeup(). 91*cda5da8dSAndroid Build Coastguard Worker 92*cda5da8dSAndroid Build Coastguard Worker # If False, don't log a message if the task is destroyed whereas its 93*cda5da8dSAndroid Build Coastguard Worker # status is still pending 94*cda5da8dSAndroid Build Coastguard Worker _log_destroy_pending = True 95*cda5da8dSAndroid Build Coastguard Worker 96*cda5da8dSAndroid Build Coastguard Worker def __init__(self, coro, *, loop=None, name=None, context=None): 97*cda5da8dSAndroid Build Coastguard Worker super().__init__(loop=loop) 98*cda5da8dSAndroid Build Coastguard Worker if self._source_traceback: 99*cda5da8dSAndroid Build Coastguard Worker del self._source_traceback[-1] 100*cda5da8dSAndroid Build Coastguard Worker if not coroutines.iscoroutine(coro): 101*cda5da8dSAndroid Build Coastguard Worker # raise after Future.__init__(), attrs are required for __del__ 102*cda5da8dSAndroid Build Coastguard Worker # prevent logging for pending task in __del__ 103*cda5da8dSAndroid Build Coastguard Worker self._log_destroy_pending = False 104*cda5da8dSAndroid Build Coastguard Worker raise TypeError(f"a coroutine was expected, got {coro!r}") 
105*cda5da8dSAndroid Build Coastguard Worker 106*cda5da8dSAndroid Build Coastguard Worker if name is None: 107*cda5da8dSAndroid Build Coastguard Worker self._name = f'Task-{_task_name_counter()}' 108*cda5da8dSAndroid Build Coastguard Worker else: 109*cda5da8dSAndroid Build Coastguard Worker self._name = str(name) 110*cda5da8dSAndroid Build Coastguard Worker 111*cda5da8dSAndroid Build Coastguard Worker self._num_cancels_requested = 0 112*cda5da8dSAndroid Build Coastguard Worker self._must_cancel = False 113*cda5da8dSAndroid Build Coastguard Worker self._fut_waiter = None 114*cda5da8dSAndroid Build Coastguard Worker self._coro = coro 115*cda5da8dSAndroid Build Coastguard Worker if context is None: 116*cda5da8dSAndroid Build Coastguard Worker self._context = contextvars.copy_context() 117*cda5da8dSAndroid Build Coastguard Worker else: 118*cda5da8dSAndroid Build Coastguard Worker self._context = context 119*cda5da8dSAndroid Build Coastguard Worker 120*cda5da8dSAndroid Build Coastguard Worker self._loop.call_soon(self.__step, context=self._context) 121*cda5da8dSAndroid Build Coastguard Worker _register_task(self) 122*cda5da8dSAndroid Build Coastguard Worker 123*cda5da8dSAndroid Build Coastguard Worker def __del__(self): 124*cda5da8dSAndroid Build Coastguard Worker if self._state == futures._PENDING and self._log_destroy_pending: 125*cda5da8dSAndroid Build Coastguard Worker context = { 126*cda5da8dSAndroid Build Coastguard Worker 'task': self, 127*cda5da8dSAndroid Build Coastguard Worker 'message': 'Task was destroyed but it is pending!', 128*cda5da8dSAndroid Build Coastguard Worker } 129*cda5da8dSAndroid Build Coastguard Worker if self._source_traceback: 130*cda5da8dSAndroid Build Coastguard Worker context['source_traceback'] = self._source_traceback 131*cda5da8dSAndroid Build Coastguard Worker self._loop.call_exception_handler(context) 132*cda5da8dSAndroid Build Coastguard Worker super().__del__() 133*cda5da8dSAndroid Build Coastguard Worker 134*cda5da8dSAndroid Build 
Coastguard Worker __class_getitem__ = classmethod(GenericAlias) 135*cda5da8dSAndroid Build Coastguard Worker 136*cda5da8dSAndroid Build Coastguard Worker def __repr__(self): 137*cda5da8dSAndroid Build Coastguard Worker return base_tasks._task_repr(self) 138*cda5da8dSAndroid Build Coastguard Worker 139*cda5da8dSAndroid Build Coastguard Worker def get_coro(self): 140*cda5da8dSAndroid Build Coastguard Worker return self._coro 141*cda5da8dSAndroid Build Coastguard Worker 142*cda5da8dSAndroid Build Coastguard Worker def get_name(self): 143*cda5da8dSAndroid Build Coastguard Worker return self._name 144*cda5da8dSAndroid Build Coastguard Worker 145*cda5da8dSAndroid Build Coastguard Worker def set_name(self, value): 146*cda5da8dSAndroid Build Coastguard Worker self._name = str(value) 147*cda5da8dSAndroid Build Coastguard Worker 148*cda5da8dSAndroid Build Coastguard Worker def set_result(self, result): 149*cda5da8dSAndroid Build Coastguard Worker raise RuntimeError('Task does not support set_result operation') 150*cda5da8dSAndroid Build Coastguard Worker 151*cda5da8dSAndroid Build Coastguard Worker def set_exception(self, exception): 152*cda5da8dSAndroid Build Coastguard Worker raise RuntimeError('Task does not support set_exception operation') 153*cda5da8dSAndroid Build Coastguard Worker 154*cda5da8dSAndroid Build Coastguard Worker def get_stack(self, *, limit=None): 155*cda5da8dSAndroid Build Coastguard Worker """Return the list of stack frames for this task's coroutine. 156*cda5da8dSAndroid Build Coastguard Worker 157*cda5da8dSAndroid Build Coastguard Worker If the coroutine is not done, this returns the stack where it is 158*cda5da8dSAndroid Build Coastguard Worker suspended. If the coroutine has completed successfully or was 159*cda5da8dSAndroid Build Coastguard Worker cancelled, this returns an empty list. 
If the coroutine was 160*cda5da8dSAndroid Build Coastguard Worker terminated by an exception, this returns the list of traceback 161*cda5da8dSAndroid Build Coastguard Worker frames. 162*cda5da8dSAndroid Build Coastguard Worker 163*cda5da8dSAndroid Build Coastguard Worker The frames are always ordered from oldest to newest. 164*cda5da8dSAndroid Build Coastguard Worker 165*cda5da8dSAndroid Build Coastguard Worker The optional limit gives the maximum number of frames to 166*cda5da8dSAndroid Build Coastguard Worker return; by default all available frames are returned. Its 167*cda5da8dSAndroid Build Coastguard Worker meaning differs depending on whether a stack or a traceback is 168*cda5da8dSAndroid Build Coastguard Worker returned: the newest frames of a stack are returned, but the 169*cda5da8dSAndroid Build Coastguard Worker oldest frames of a traceback are returned. (This matches the 170*cda5da8dSAndroid Build Coastguard Worker behavior of the traceback module.) 171*cda5da8dSAndroid Build Coastguard Worker 172*cda5da8dSAndroid Build Coastguard Worker For reasons beyond our control, only one stack frame is 173*cda5da8dSAndroid Build Coastguard Worker returned for a suspended coroutine. 174*cda5da8dSAndroid Build Coastguard Worker """ 175*cda5da8dSAndroid Build Coastguard Worker return base_tasks._task_get_stack(self, limit) 176*cda5da8dSAndroid Build Coastguard Worker 177*cda5da8dSAndroid Build Coastguard Worker def print_stack(self, *, limit=None, file=None): 178*cda5da8dSAndroid Build Coastguard Worker """Print the stack or traceback for this task's coroutine. 179*cda5da8dSAndroid Build Coastguard Worker 180*cda5da8dSAndroid Build Coastguard Worker This produces output similar to that of the traceback module, 181*cda5da8dSAndroid Build Coastguard Worker for the frames retrieved by get_stack(). The limit argument 182*cda5da8dSAndroid Build Coastguard Worker is passed to get_stack(). 
The file argument is an I/O stream 183*cda5da8dSAndroid Build Coastguard Worker to which the output is written; by default output is written 184*cda5da8dSAndroid Build Coastguard Worker to sys.stderr. 185*cda5da8dSAndroid Build Coastguard Worker """ 186*cda5da8dSAndroid Build Coastguard Worker return base_tasks._task_print_stack(self, limit, file) 187*cda5da8dSAndroid Build Coastguard Worker 188*cda5da8dSAndroid Build Coastguard Worker def cancel(self, msg=None): 189*cda5da8dSAndroid Build Coastguard Worker """Request that this task cancel itself. 190*cda5da8dSAndroid Build Coastguard Worker 191*cda5da8dSAndroid Build Coastguard Worker This arranges for a CancelledError to be thrown into the 192*cda5da8dSAndroid Build Coastguard Worker wrapped coroutine on the next cycle through the event loop. 193*cda5da8dSAndroid Build Coastguard Worker The coroutine then has a chance to clean up or even deny 194*cda5da8dSAndroid Build Coastguard Worker the request using try/except/finally. 195*cda5da8dSAndroid Build Coastguard Worker 196*cda5da8dSAndroid Build Coastguard Worker Unlike Future.cancel, this does not guarantee that the 197*cda5da8dSAndroid Build Coastguard Worker task will be cancelled: the exception might be caught and 198*cda5da8dSAndroid Build Coastguard Worker acted upon, delaying cancellation of the task or preventing 199*cda5da8dSAndroid Build Coastguard Worker cancellation completely. The task may also return a value or 200*cda5da8dSAndroid Build Coastguard Worker raise a different exception. 201*cda5da8dSAndroid Build Coastguard Worker 202*cda5da8dSAndroid Build Coastguard Worker Immediately after this method is called, Task.cancelled() will 203*cda5da8dSAndroid Build Coastguard Worker not return True (unless the task was already cancelled). 
A 204*cda5da8dSAndroid Build Coastguard Worker task will be marked as cancelled when the wrapped coroutine 205*cda5da8dSAndroid Build Coastguard Worker terminates with a CancelledError exception (even if cancel() 206*cda5da8dSAndroid Build Coastguard Worker was not called). 207*cda5da8dSAndroid Build Coastguard Worker 208*cda5da8dSAndroid Build Coastguard Worker This also increases the task's count of cancellation requests. 209*cda5da8dSAndroid Build Coastguard Worker """ 210*cda5da8dSAndroid Build Coastguard Worker self._log_traceback = False 211*cda5da8dSAndroid Build Coastguard Worker if self.done(): 212*cda5da8dSAndroid Build Coastguard Worker return False 213*cda5da8dSAndroid Build Coastguard Worker self._num_cancels_requested += 1 214*cda5da8dSAndroid Build Coastguard Worker # These two lines are controversial. See discussion starting at 215*cda5da8dSAndroid Build Coastguard Worker # https://github.com/python/cpython/pull/31394#issuecomment-1053545331 216*cda5da8dSAndroid Build Coastguard Worker # Also remember that this is duplicated in _asynciomodule.c. 217*cda5da8dSAndroid Build Coastguard Worker # if self._num_cancels_requested > 1: 218*cda5da8dSAndroid Build Coastguard Worker # return False 219*cda5da8dSAndroid Build Coastguard Worker if self._fut_waiter is not None: 220*cda5da8dSAndroid Build Coastguard Worker if self._fut_waiter.cancel(msg=msg): 221*cda5da8dSAndroid Build Coastguard Worker # Leave self._fut_waiter; it may be a Task that 222*cda5da8dSAndroid Build Coastguard Worker # catches and ignores the cancellation so we may have 223*cda5da8dSAndroid Build Coastguard Worker # to cancel it again later. 224*cda5da8dSAndroid Build Coastguard Worker return True 225*cda5da8dSAndroid Build Coastguard Worker # It must be the case that self.__step is already scheduled. 
226*cda5da8dSAndroid Build Coastguard Worker self._must_cancel = True 227*cda5da8dSAndroid Build Coastguard Worker self._cancel_message = msg 228*cda5da8dSAndroid Build Coastguard Worker return True 229*cda5da8dSAndroid Build Coastguard Worker 230*cda5da8dSAndroid Build Coastguard Worker def cancelling(self): 231*cda5da8dSAndroid Build Coastguard Worker """Return the count of the task's cancellation requests. 232*cda5da8dSAndroid Build Coastguard Worker 233*cda5da8dSAndroid Build Coastguard Worker This count is incremented when .cancel() is called 234*cda5da8dSAndroid Build Coastguard Worker and may be decremented using .uncancel(). 235*cda5da8dSAndroid Build Coastguard Worker """ 236*cda5da8dSAndroid Build Coastguard Worker return self._num_cancels_requested 237*cda5da8dSAndroid Build Coastguard Worker 238*cda5da8dSAndroid Build Coastguard Worker def uncancel(self): 239*cda5da8dSAndroid Build Coastguard Worker """Decrement the task's count of cancellation requests. 240*cda5da8dSAndroid Build Coastguard Worker 241*cda5da8dSAndroid Build Coastguard Worker This should be called by the party that called `cancel()` on the task 242*cda5da8dSAndroid Build Coastguard Worker beforehand. 243*cda5da8dSAndroid Build Coastguard Worker 244*cda5da8dSAndroid Build Coastguard Worker Returns the remaining number of cancellation requests. 
245*cda5da8dSAndroid Build Coastguard Worker """ 246*cda5da8dSAndroid Build Coastguard Worker if self._num_cancels_requested > 0: 247*cda5da8dSAndroid Build Coastguard Worker self._num_cancels_requested -= 1 248*cda5da8dSAndroid Build Coastguard Worker return self._num_cancels_requested 249*cda5da8dSAndroid Build Coastguard Worker 250*cda5da8dSAndroid Build Coastguard Worker def __step(self, exc=None): 251*cda5da8dSAndroid Build Coastguard Worker if self.done(): 252*cda5da8dSAndroid Build Coastguard Worker raise exceptions.InvalidStateError( 253*cda5da8dSAndroid Build Coastguard Worker f'_step(): already done: {self!r}, {exc!r}') 254*cda5da8dSAndroid Build Coastguard Worker if self._must_cancel: 255*cda5da8dSAndroid Build Coastguard Worker if not isinstance(exc, exceptions.CancelledError): 256*cda5da8dSAndroid Build Coastguard Worker exc = self._make_cancelled_error() 257*cda5da8dSAndroid Build Coastguard Worker self._must_cancel = False 258*cda5da8dSAndroid Build Coastguard Worker coro = self._coro 259*cda5da8dSAndroid Build Coastguard Worker self._fut_waiter = None 260*cda5da8dSAndroid Build Coastguard Worker 261*cda5da8dSAndroid Build Coastguard Worker _enter_task(self._loop, self) 262*cda5da8dSAndroid Build Coastguard Worker # Call either coro.throw(exc) or coro.send(None). 263*cda5da8dSAndroid Build Coastguard Worker try: 264*cda5da8dSAndroid Build Coastguard Worker if exc is None: 265*cda5da8dSAndroid Build Coastguard Worker # We use the `send` method directly, because coroutines 266*cda5da8dSAndroid Build Coastguard Worker # don't have `__iter__` and `__next__` methods. 
267*cda5da8dSAndroid Build Coastguard Worker result = coro.send(None) 268*cda5da8dSAndroid Build Coastguard Worker else: 269*cda5da8dSAndroid Build Coastguard Worker result = coro.throw(exc) 270*cda5da8dSAndroid Build Coastguard Worker except StopIteration as exc: 271*cda5da8dSAndroid Build Coastguard Worker if self._must_cancel: 272*cda5da8dSAndroid Build Coastguard Worker # Task is cancelled right before coro stops. 273*cda5da8dSAndroid Build Coastguard Worker self._must_cancel = False 274*cda5da8dSAndroid Build Coastguard Worker super().cancel(msg=self._cancel_message) 275*cda5da8dSAndroid Build Coastguard Worker else: 276*cda5da8dSAndroid Build Coastguard Worker super().set_result(exc.value) 277*cda5da8dSAndroid Build Coastguard Worker except exceptions.CancelledError as exc: 278*cda5da8dSAndroid Build Coastguard Worker # Save the original exception so we can chain it later. 279*cda5da8dSAndroid Build Coastguard Worker self._cancelled_exc = exc 280*cda5da8dSAndroid Build Coastguard Worker super().cancel() # I.e., Future.cancel(self). 281*cda5da8dSAndroid Build Coastguard Worker except (KeyboardInterrupt, SystemExit) as exc: 282*cda5da8dSAndroid Build Coastguard Worker super().set_exception(exc) 283*cda5da8dSAndroid Build Coastguard Worker raise 284*cda5da8dSAndroid Build Coastguard Worker except BaseException as exc: 285*cda5da8dSAndroid Build Coastguard Worker super().set_exception(exc) 286*cda5da8dSAndroid Build Coastguard Worker else: 287*cda5da8dSAndroid Build Coastguard Worker blocking = getattr(result, '_asyncio_future_blocking', None) 288*cda5da8dSAndroid Build Coastguard Worker if blocking is not None: 289*cda5da8dSAndroid Build Coastguard Worker # Yielded Future must come from Future.__iter__(). 
290*cda5da8dSAndroid Build Coastguard Worker if futures._get_loop(result) is not self._loop: 291*cda5da8dSAndroid Build Coastguard Worker new_exc = RuntimeError( 292*cda5da8dSAndroid Build Coastguard Worker f'Task {self!r} got Future ' 293*cda5da8dSAndroid Build Coastguard Worker f'{result!r} attached to a different loop') 294*cda5da8dSAndroid Build Coastguard Worker self._loop.call_soon( 295*cda5da8dSAndroid Build Coastguard Worker self.__step, new_exc, context=self._context) 296*cda5da8dSAndroid Build Coastguard Worker elif blocking: 297*cda5da8dSAndroid Build Coastguard Worker if result is self: 298*cda5da8dSAndroid Build Coastguard Worker new_exc = RuntimeError( 299*cda5da8dSAndroid Build Coastguard Worker f'Task cannot await on itself: {self!r}') 300*cda5da8dSAndroid Build Coastguard Worker self._loop.call_soon( 301*cda5da8dSAndroid Build Coastguard Worker self.__step, new_exc, context=self._context) 302*cda5da8dSAndroid Build Coastguard Worker else: 303*cda5da8dSAndroid Build Coastguard Worker result._asyncio_future_blocking = False 304*cda5da8dSAndroid Build Coastguard Worker result.add_done_callback( 305*cda5da8dSAndroid Build Coastguard Worker self.__wakeup, context=self._context) 306*cda5da8dSAndroid Build Coastguard Worker self._fut_waiter = result 307*cda5da8dSAndroid Build Coastguard Worker if self._must_cancel: 308*cda5da8dSAndroid Build Coastguard Worker if self._fut_waiter.cancel( 309*cda5da8dSAndroid Build Coastguard Worker msg=self._cancel_message): 310*cda5da8dSAndroid Build Coastguard Worker self._must_cancel = False 311*cda5da8dSAndroid Build Coastguard Worker else: 312*cda5da8dSAndroid Build Coastguard Worker new_exc = RuntimeError( 313*cda5da8dSAndroid Build Coastguard Worker f'yield was used instead of yield from ' 314*cda5da8dSAndroid Build Coastguard Worker f'in task {self!r} with {result!r}') 315*cda5da8dSAndroid Build Coastguard Worker self._loop.call_soon( 316*cda5da8dSAndroid Build Coastguard Worker self.__step, new_exc, 
context=self._context) 317*cda5da8dSAndroid Build Coastguard Worker 318*cda5da8dSAndroid Build Coastguard Worker elif result is None: 319*cda5da8dSAndroid Build Coastguard Worker # Bare yield relinquishes control for one event loop iteration. 320*cda5da8dSAndroid Build Coastguard Worker self._loop.call_soon(self.__step, context=self._context) 321*cda5da8dSAndroid Build Coastguard Worker elif inspect.isgenerator(result): 322*cda5da8dSAndroid Build Coastguard Worker # Yielding a generator is just wrong. 323*cda5da8dSAndroid Build Coastguard Worker new_exc = RuntimeError( 324*cda5da8dSAndroid Build Coastguard Worker f'yield was used instead of yield from for ' 325*cda5da8dSAndroid Build Coastguard Worker f'generator in task {self!r} with {result!r}') 326*cda5da8dSAndroid Build Coastguard Worker self._loop.call_soon( 327*cda5da8dSAndroid Build Coastguard Worker self.__step, new_exc, context=self._context) 328*cda5da8dSAndroid Build Coastguard Worker else: 329*cda5da8dSAndroid Build Coastguard Worker # Yielding something else is an error. 330*cda5da8dSAndroid Build Coastguard Worker new_exc = RuntimeError(f'Task got bad yield: {result!r}') 331*cda5da8dSAndroid Build Coastguard Worker self._loop.call_soon( 332*cda5da8dSAndroid Build Coastguard Worker self.__step, new_exc, context=self._context) 333*cda5da8dSAndroid Build Coastguard Worker finally: 334*cda5da8dSAndroid Build Coastguard Worker _leave_task(self._loop, self) 335*cda5da8dSAndroid Build Coastguard Worker self = None # Needed to break cycles when an exception occurs. 336*cda5da8dSAndroid Build Coastguard Worker 337*cda5da8dSAndroid Build Coastguard Worker def __wakeup(self, future): 338*cda5da8dSAndroid Build Coastguard Worker try: 339*cda5da8dSAndroid Build Coastguard Worker future.result() 340*cda5da8dSAndroid Build Coastguard Worker except BaseException as exc: 341*cda5da8dSAndroid Build Coastguard Worker # This may also be a cancellation. 
342*cda5da8dSAndroid Build Coastguard Worker self.__step(exc) 343*cda5da8dSAndroid Build Coastguard Worker else: 344*cda5da8dSAndroid Build Coastguard Worker # Don't pass the value of `future.result()` explicitly, 345*cda5da8dSAndroid Build Coastguard Worker # as `Future.__iter__` and `Future.__await__` don't need it. 346*cda5da8dSAndroid Build Coastguard Worker # If we call `_step(value, None)` instead of `_step()`, 347*cda5da8dSAndroid Build Coastguard Worker # Python eval loop would use `.send(value)` method call, 348*cda5da8dSAndroid Build Coastguard Worker # instead of `__next__()`, which is slower for futures 349*cda5da8dSAndroid Build Coastguard Worker # that return non-generator iterators from their `__iter__`. 350*cda5da8dSAndroid Build Coastguard Worker self.__step() 351*cda5da8dSAndroid Build Coastguard Worker self = None # Needed to break cycles when an exception occurs. 352*cda5da8dSAndroid Build Coastguard Worker 353*cda5da8dSAndroid Build Coastguard Worker 354*cda5da8dSAndroid Build Coastguard Worker_PyTask = Task 355*cda5da8dSAndroid Build Coastguard Worker 356*cda5da8dSAndroid Build Coastguard Worker 357*cda5da8dSAndroid Build Coastguard Workertry: 358*cda5da8dSAndroid Build Coastguard Worker import _asyncio 359*cda5da8dSAndroid Build Coastguard Workerexcept ImportError: 360*cda5da8dSAndroid Build Coastguard Worker pass 361*cda5da8dSAndroid Build Coastguard Workerelse: 362*cda5da8dSAndroid Build Coastguard Worker # _CTask is needed for tests. 363*cda5da8dSAndroid Build Coastguard Worker Task = _CTask = _asyncio.Task 364*cda5da8dSAndroid Build Coastguard Worker 365*cda5da8dSAndroid Build Coastguard Worker 366*cda5da8dSAndroid Build Coastguard Workerdef create_task(coro, *, name=None, context=None): 367*cda5da8dSAndroid Build Coastguard Worker """Schedule the execution of a coroutine object in a spawn task. 368*cda5da8dSAndroid Build Coastguard Worker 369*cda5da8dSAndroid Build Coastguard Worker Return a Task object. 
370*cda5da8dSAndroid Build Coastguard Worker """ 371*cda5da8dSAndroid Build Coastguard Worker loop = events.get_running_loop() 372*cda5da8dSAndroid Build Coastguard Worker if context is None: 373*cda5da8dSAndroid Build Coastguard Worker # Use legacy API if context is not needed 374*cda5da8dSAndroid Build Coastguard Worker task = loop.create_task(coro) 375*cda5da8dSAndroid Build Coastguard Worker else: 376*cda5da8dSAndroid Build Coastguard Worker task = loop.create_task(coro, context=context) 377*cda5da8dSAndroid Build Coastguard Worker 378*cda5da8dSAndroid Build Coastguard Worker _set_task_name(task, name) 379*cda5da8dSAndroid Build Coastguard Worker return task 380*cda5da8dSAndroid Build Coastguard Worker 381*cda5da8dSAndroid Build Coastguard Worker 382*cda5da8dSAndroid Build Coastguard Worker# wait() and as_completed() similar to those in PEP 3148. 383*cda5da8dSAndroid Build Coastguard Worker 384*cda5da8dSAndroid Build Coastguard WorkerFIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED 385*cda5da8dSAndroid Build Coastguard WorkerFIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION 386*cda5da8dSAndroid Build Coastguard WorkerALL_COMPLETED = concurrent.futures.ALL_COMPLETED 387*cda5da8dSAndroid Build Coastguard Worker 388*cda5da8dSAndroid Build Coastguard Worker 389*cda5da8dSAndroid Build Coastguard Workerasync def wait(fs, *, timeout=None, return_when=ALL_COMPLETED): 390*cda5da8dSAndroid Build Coastguard Worker """Wait for the Futures or Tasks given by fs to complete. 391*cda5da8dSAndroid Build Coastguard Worker 392*cda5da8dSAndroid Build Coastguard Worker The fs iterable must not be empty. 393*cda5da8dSAndroid Build Coastguard Worker 394*cda5da8dSAndroid Build Coastguard Worker Coroutines will be wrapped in Tasks. 395*cda5da8dSAndroid Build Coastguard Worker 396*cda5da8dSAndroid Build Coastguard Worker Returns two sets of Future: (done, pending). 
397*cda5da8dSAndroid Build Coastguard Worker 398*cda5da8dSAndroid Build Coastguard Worker Usage: 399*cda5da8dSAndroid Build Coastguard Worker 400*cda5da8dSAndroid Build Coastguard Worker done, pending = await asyncio.wait(fs) 401*cda5da8dSAndroid Build Coastguard Worker 402*cda5da8dSAndroid Build Coastguard Worker Note: This does not raise TimeoutError! Futures that aren't done 403*cda5da8dSAndroid Build Coastguard Worker when the timeout occurs are returned in the second set. 404*cda5da8dSAndroid Build Coastguard Worker """ 405*cda5da8dSAndroid Build Coastguard Worker if futures.isfuture(fs) or coroutines.iscoroutine(fs): 406*cda5da8dSAndroid Build Coastguard Worker raise TypeError(f"expect a list of futures, not {type(fs).__name__}") 407*cda5da8dSAndroid Build Coastguard Worker if not fs: 408*cda5da8dSAndroid Build Coastguard Worker raise ValueError('Set of Tasks/Futures is empty.') 409*cda5da8dSAndroid Build Coastguard Worker if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED): 410*cda5da8dSAndroid Build Coastguard Worker raise ValueError(f'Invalid return_when value: {return_when}') 411*cda5da8dSAndroid Build Coastguard Worker 412*cda5da8dSAndroid Build Coastguard Worker fs = set(fs) 413*cda5da8dSAndroid Build Coastguard Worker 414*cda5da8dSAndroid Build Coastguard Worker if any(coroutines.iscoroutine(f) for f in fs): 415*cda5da8dSAndroid Build Coastguard Worker raise TypeError("Passing coroutines is forbidden, use tasks explicitly.") 416*cda5da8dSAndroid Build Coastguard Worker 417*cda5da8dSAndroid Build Coastguard Worker loop = events.get_running_loop() 418*cda5da8dSAndroid Build Coastguard Worker return await _wait(fs, timeout, return_when, loop) 419*cda5da8dSAndroid Build Coastguard Worker 420*cda5da8dSAndroid Build Coastguard Worker 421*cda5da8dSAndroid Build Coastguard Workerdef _release_waiter(waiter, *args): 422*cda5da8dSAndroid Build Coastguard Worker if not waiter.done(): 423*cda5da8dSAndroid Build Coastguard Worker 
waiter.set_result(None) 424*cda5da8dSAndroid Build Coastguard Worker 425*cda5da8dSAndroid Build Coastguard Worker 426*cda5da8dSAndroid Build Coastguard Workerasync def wait_for(fut, timeout): 427*cda5da8dSAndroid Build Coastguard Worker """Wait for the single Future or coroutine to complete, with timeout. 428*cda5da8dSAndroid Build Coastguard Worker 429*cda5da8dSAndroid Build Coastguard Worker Coroutine will be wrapped in Task. 430*cda5da8dSAndroid Build Coastguard Worker 431*cda5da8dSAndroid Build Coastguard Worker Returns result of the Future or coroutine. When a timeout occurs, 432*cda5da8dSAndroid Build Coastguard Worker it cancels the task and raises TimeoutError. To avoid the task 433*cda5da8dSAndroid Build Coastguard Worker cancellation, wrap it in shield(). 434*cda5da8dSAndroid Build Coastguard Worker 435*cda5da8dSAndroid Build Coastguard Worker If the wait is cancelled, the task is also cancelled. 436*cda5da8dSAndroid Build Coastguard Worker 437*cda5da8dSAndroid Build Coastguard Worker This function is a coroutine. 
438*cda5da8dSAndroid Build Coastguard Worker """ 439*cda5da8dSAndroid Build Coastguard Worker loop = events.get_running_loop() 440*cda5da8dSAndroid Build Coastguard Worker 441*cda5da8dSAndroid Build Coastguard Worker if timeout is None: 442*cda5da8dSAndroid Build Coastguard Worker return await fut 443*cda5da8dSAndroid Build Coastguard Worker 444*cda5da8dSAndroid Build Coastguard Worker if timeout <= 0: 445*cda5da8dSAndroid Build Coastguard Worker fut = ensure_future(fut, loop=loop) 446*cda5da8dSAndroid Build Coastguard Worker 447*cda5da8dSAndroid Build Coastguard Worker if fut.done(): 448*cda5da8dSAndroid Build Coastguard Worker return fut.result() 449*cda5da8dSAndroid Build Coastguard Worker 450*cda5da8dSAndroid Build Coastguard Worker await _cancel_and_wait(fut, loop=loop) 451*cda5da8dSAndroid Build Coastguard Worker try: 452*cda5da8dSAndroid Build Coastguard Worker return fut.result() 453*cda5da8dSAndroid Build Coastguard Worker except exceptions.CancelledError as exc: 454*cda5da8dSAndroid Build Coastguard Worker raise exceptions.TimeoutError() from exc 455*cda5da8dSAndroid Build Coastguard Worker 456*cda5da8dSAndroid Build Coastguard Worker waiter = loop.create_future() 457*cda5da8dSAndroid Build Coastguard Worker timeout_handle = loop.call_later(timeout, _release_waiter, waiter) 458*cda5da8dSAndroid Build Coastguard Worker cb = functools.partial(_release_waiter, waiter) 459*cda5da8dSAndroid Build Coastguard Worker 460*cda5da8dSAndroid Build Coastguard Worker fut = ensure_future(fut, loop=loop) 461*cda5da8dSAndroid Build Coastguard Worker fut.add_done_callback(cb) 462*cda5da8dSAndroid Build Coastguard Worker 463*cda5da8dSAndroid Build Coastguard Worker try: 464*cda5da8dSAndroid Build Coastguard Worker # wait until the future completes or the timeout 465*cda5da8dSAndroid Build Coastguard Worker try: 466*cda5da8dSAndroid Build Coastguard Worker await waiter 467*cda5da8dSAndroid Build Coastguard Worker except exceptions.CancelledError: 468*cda5da8dSAndroid Build 
Coastguard Worker if fut.done(): 469*cda5da8dSAndroid Build Coastguard Worker return fut.result() 470*cda5da8dSAndroid Build Coastguard Worker else: 471*cda5da8dSAndroid Build Coastguard Worker fut.remove_done_callback(cb) 472*cda5da8dSAndroid Build Coastguard Worker # We must ensure that the task is not running 473*cda5da8dSAndroid Build Coastguard Worker # after wait_for() returns. 474*cda5da8dSAndroid Build Coastguard Worker # See https://bugs.python.org/issue32751 475*cda5da8dSAndroid Build Coastguard Worker await _cancel_and_wait(fut, loop=loop) 476*cda5da8dSAndroid Build Coastguard Worker raise 477*cda5da8dSAndroid Build Coastguard Worker 478*cda5da8dSAndroid Build Coastguard Worker if fut.done(): 479*cda5da8dSAndroid Build Coastguard Worker return fut.result() 480*cda5da8dSAndroid Build Coastguard Worker else: 481*cda5da8dSAndroid Build Coastguard Worker fut.remove_done_callback(cb) 482*cda5da8dSAndroid Build Coastguard Worker # We must ensure that the task is not running 483*cda5da8dSAndroid Build Coastguard Worker # after wait_for() returns. 
484*cda5da8dSAndroid Build Coastguard Worker # See https://bugs.python.org/issue32751 485*cda5da8dSAndroid Build Coastguard Worker await _cancel_and_wait(fut, loop=loop) 486*cda5da8dSAndroid Build Coastguard Worker # In case task cancellation failed with some 487*cda5da8dSAndroid Build Coastguard Worker # exception, we should re-raise it 488*cda5da8dSAndroid Build Coastguard Worker # See https://bugs.python.org/issue40607 489*cda5da8dSAndroid Build Coastguard Worker try: 490*cda5da8dSAndroid Build Coastguard Worker return fut.result() 491*cda5da8dSAndroid Build Coastguard Worker except exceptions.CancelledError as exc: 492*cda5da8dSAndroid Build Coastguard Worker raise exceptions.TimeoutError() from exc 493*cda5da8dSAndroid Build Coastguard Worker finally: 494*cda5da8dSAndroid Build Coastguard Worker timeout_handle.cancel() 495*cda5da8dSAndroid Build Coastguard Worker 496*cda5da8dSAndroid Build Coastguard Worker 497*cda5da8dSAndroid Build Coastguard Workerasync def _wait(fs, timeout, return_when, loop): 498*cda5da8dSAndroid Build Coastguard Worker """Internal helper for wait(). 499*cda5da8dSAndroid Build Coastguard Worker 500*cda5da8dSAndroid Build Coastguard Worker The fs argument must be a collection of Futures. 501*cda5da8dSAndroid Build Coastguard Worker """ 502*cda5da8dSAndroid Build Coastguard Worker assert fs, 'Set of Futures is empty.' 
503*cda5da8dSAndroid Build Coastguard Worker waiter = loop.create_future() 504*cda5da8dSAndroid Build Coastguard Worker timeout_handle = None 505*cda5da8dSAndroid Build Coastguard Worker if timeout is not None: 506*cda5da8dSAndroid Build Coastguard Worker timeout_handle = loop.call_later(timeout, _release_waiter, waiter) 507*cda5da8dSAndroid Build Coastguard Worker counter = len(fs) 508*cda5da8dSAndroid Build Coastguard Worker 509*cda5da8dSAndroid Build Coastguard Worker def _on_completion(f): 510*cda5da8dSAndroid Build Coastguard Worker nonlocal counter 511*cda5da8dSAndroid Build Coastguard Worker counter -= 1 512*cda5da8dSAndroid Build Coastguard Worker if (counter <= 0 or 513*cda5da8dSAndroid Build Coastguard Worker return_when == FIRST_COMPLETED or 514*cda5da8dSAndroid Build Coastguard Worker return_when == FIRST_EXCEPTION and (not f.cancelled() and 515*cda5da8dSAndroid Build Coastguard Worker f.exception() is not None)): 516*cda5da8dSAndroid Build Coastguard Worker if timeout_handle is not None: 517*cda5da8dSAndroid Build Coastguard Worker timeout_handle.cancel() 518*cda5da8dSAndroid Build Coastguard Worker if not waiter.done(): 519*cda5da8dSAndroid Build Coastguard Worker waiter.set_result(None) 520*cda5da8dSAndroid Build Coastguard Worker 521*cda5da8dSAndroid Build Coastguard Worker for f in fs: 522*cda5da8dSAndroid Build Coastguard Worker f.add_done_callback(_on_completion) 523*cda5da8dSAndroid Build Coastguard Worker 524*cda5da8dSAndroid Build Coastguard Worker try: 525*cda5da8dSAndroid Build Coastguard Worker await waiter 526*cda5da8dSAndroid Build Coastguard Worker finally: 527*cda5da8dSAndroid Build Coastguard Worker if timeout_handle is not None: 528*cda5da8dSAndroid Build Coastguard Worker timeout_handle.cancel() 529*cda5da8dSAndroid Build Coastguard Worker for f in fs: 530*cda5da8dSAndroid Build Coastguard Worker f.remove_done_callback(_on_completion) 531*cda5da8dSAndroid Build Coastguard Worker 532*cda5da8dSAndroid Build Coastguard Worker done, 
pending = set(), set() 533*cda5da8dSAndroid Build Coastguard Worker for f in fs: 534*cda5da8dSAndroid Build Coastguard Worker if f.done(): 535*cda5da8dSAndroid Build Coastguard Worker done.add(f) 536*cda5da8dSAndroid Build Coastguard Worker else: 537*cda5da8dSAndroid Build Coastguard Worker pending.add(f) 538*cda5da8dSAndroid Build Coastguard Worker return done, pending 539*cda5da8dSAndroid Build Coastguard Worker 540*cda5da8dSAndroid Build Coastguard Worker 541*cda5da8dSAndroid Build Coastguard Workerasync def _cancel_and_wait(fut, loop): 542*cda5da8dSAndroid Build Coastguard Worker """Cancel the *fut* future or task and wait until it completes.""" 543*cda5da8dSAndroid Build Coastguard Worker 544*cda5da8dSAndroid Build Coastguard Worker waiter = loop.create_future() 545*cda5da8dSAndroid Build Coastguard Worker cb = functools.partial(_release_waiter, waiter) 546*cda5da8dSAndroid Build Coastguard Worker fut.add_done_callback(cb) 547*cda5da8dSAndroid Build Coastguard Worker 548*cda5da8dSAndroid Build Coastguard Worker try: 549*cda5da8dSAndroid Build Coastguard Worker fut.cancel() 550*cda5da8dSAndroid Build Coastguard Worker # We cannot wait on *fut* directly to make 551*cda5da8dSAndroid Build Coastguard Worker # sure _cancel_and_wait itself is reliably cancellable. 552*cda5da8dSAndroid Build Coastguard Worker await waiter 553*cda5da8dSAndroid Build Coastguard Worker finally: 554*cda5da8dSAndroid Build Coastguard Worker fut.remove_done_callback(cb) 555*cda5da8dSAndroid Build Coastguard Worker 556*cda5da8dSAndroid Build Coastguard Worker 557*cda5da8dSAndroid Build Coastguard Worker# This is *not* a @coroutine! It is just an iterator (yielding Futures). 558*cda5da8dSAndroid Build Coastguard Workerdef as_completed(fs, *, timeout=None): 559*cda5da8dSAndroid Build Coastguard Worker """Return an iterator whose values are coroutines. 
560*cda5da8dSAndroid Build Coastguard Worker 561*cda5da8dSAndroid Build Coastguard Worker When waiting for the yielded coroutines you'll get the results (or 562*cda5da8dSAndroid Build Coastguard Worker exceptions!) of the original Futures (or coroutines), in the order 563*cda5da8dSAndroid Build Coastguard Worker in which and as soon as they complete. 564*cda5da8dSAndroid Build Coastguard Worker 565*cda5da8dSAndroid Build Coastguard Worker This differs from PEP 3148; the proper way to use this is: 566*cda5da8dSAndroid Build Coastguard Worker 567*cda5da8dSAndroid Build Coastguard Worker for f in as_completed(fs): 568*cda5da8dSAndroid Build Coastguard Worker result = await f # The 'await' may raise. 569*cda5da8dSAndroid Build Coastguard Worker # Use result. 570*cda5da8dSAndroid Build Coastguard Worker 571*cda5da8dSAndroid Build Coastguard Worker If a timeout is specified, the 'await' will raise 572*cda5da8dSAndroid Build Coastguard Worker TimeoutError when the timeout occurs before all Futures are done. 573*cda5da8dSAndroid Build Coastguard Worker 574*cda5da8dSAndroid Build Coastguard Worker Note: The futures 'f' are not necessarily members of fs. 575*cda5da8dSAndroid Build Coastguard Worker """ 576*cda5da8dSAndroid Build Coastguard Worker if futures.isfuture(fs) or coroutines.iscoroutine(fs): 577*cda5da8dSAndroid Build Coastguard Worker raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}") 578*cda5da8dSAndroid Build Coastguard Worker 579*cda5da8dSAndroid Build Coastguard Worker from .queues import Queue # Import here to avoid circular import problem. 
580*cda5da8dSAndroid Build Coastguard Worker done = Queue() 581*cda5da8dSAndroid Build Coastguard Worker 582*cda5da8dSAndroid Build Coastguard Worker loop = events._get_event_loop() 583*cda5da8dSAndroid Build Coastguard Worker todo = {ensure_future(f, loop=loop) for f in set(fs)} 584*cda5da8dSAndroid Build Coastguard Worker timeout_handle = None 585*cda5da8dSAndroid Build Coastguard Worker 586*cda5da8dSAndroid Build Coastguard Worker def _on_timeout(): 587*cda5da8dSAndroid Build Coastguard Worker for f in todo: 588*cda5da8dSAndroid Build Coastguard Worker f.remove_done_callback(_on_completion) 589*cda5da8dSAndroid Build Coastguard Worker done.put_nowait(None) # Queue a dummy value for _wait_for_one(). 590*cda5da8dSAndroid Build Coastguard Worker todo.clear() # Can't do todo.remove(f) in the loop. 591*cda5da8dSAndroid Build Coastguard Worker 592*cda5da8dSAndroid Build Coastguard Worker def _on_completion(f): 593*cda5da8dSAndroid Build Coastguard Worker if not todo: 594*cda5da8dSAndroid Build Coastguard Worker return # _on_timeout() was here first. 595*cda5da8dSAndroid Build Coastguard Worker todo.remove(f) 596*cda5da8dSAndroid Build Coastguard Worker done.put_nowait(f) 597*cda5da8dSAndroid Build Coastguard Worker if not todo and timeout_handle is not None: 598*cda5da8dSAndroid Build Coastguard Worker timeout_handle.cancel() 599*cda5da8dSAndroid Build Coastguard Worker 600*cda5da8dSAndroid Build Coastguard Worker async def _wait_for_one(): 601*cda5da8dSAndroid Build Coastguard Worker f = await done.get() 602*cda5da8dSAndroid Build Coastguard Worker if f is None: 603*cda5da8dSAndroid Build Coastguard Worker # Dummy value from _on_timeout(). 604*cda5da8dSAndroid Build Coastguard Worker raise exceptions.TimeoutError 605*cda5da8dSAndroid Build Coastguard Worker return f.result() # May raise f.exception(). 
606*cda5da8dSAndroid Build Coastguard Worker 607*cda5da8dSAndroid Build Coastguard Worker for f in todo: 608*cda5da8dSAndroid Build Coastguard Worker f.add_done_callback(_on_completion) 609*cda5da8dSAndroid Build Coastguard Worker if todo and timeout is not None: 610*cda5da8dSAndroid Build Coastguard Worker timeout_handle = loop.call_later(timeout, _on_timeout) 611*cda5da8dSAndroid Build Coastguard Worker for _ in range(len(todo)): 612*cda5da8dSAndroid Build Coastguard Worker yield _wait_for_one() 613*cda5da8dSAndroid Build Coastguard Worker 614*cda5da8dSAndroid Build Coastguard Worker 615*cda5da8dSAndroid Build Coastguard Worker@types.coroutine 616*cda5da8dSAndroid Build Coastguard Workerdef __sleep0(): 617*cda5da8dSAndroid Build Coastguard Worker """Skip one event loop run cycle. 618*cda5da8dSAndroid Build Coastguard Worker 619*cda5da8dSAndroid Build Coastguard Worker This is a private helper for 'asyncio.sleep()', used 620*cda5da8dSAndroid Build Coastguard Worker when the 'delay' is set to 0. It uses a bare 'yield' 621*cda5da8dSAndroid Build Coastguard Worker expression (which Task.__step knows how to handle) 622*cda5da8dSAndroid Build Coastguard Worker instead of creating a Future object. 
623*cda5da8dSAndroid Build Coastguard Worker """ 624*cda5da8dSAndroid Build Coastguard Worker yield 625*cda5da8dSAndroid Build Coastguard Worker 626*cda5da8dSAndroid Build Coastguard Worker 627*cda5da8dSAndroid Build Coastguard Workerasync def sleep(delay, result=None): 628*cda5da8dSAndroid Build Coastguard Worker """Coroutine that completes after a given time (in seconds).""" 629*cda5da8dSAndroid Build Coastguard Worker if delay <= 0: 630*cda5da8dSAndroid Build Coastguard Worker await __sleep0() 631*cda5da8dSAndroid Build Coastguard Worker return result 632*cda5da8dSAndroid Build Coastguard Worker 633*cda5da8dSAndroid Build Coastguard Worker loop = events.get_running_loop() 634*cda5da8dSAndroid Build Coastguard Worker future = loop.create_future() 635*cda5da8dSAndroid Build Coastguard Worker h = loop.call_later(delay, 636*cda5da8dSAndroid Build Coastguard Worker futures._set_result_unless_cancelled, 637*cda5da8dSAndroid Build Coastguard Worker future, result) 638*cda5da8dSAndroid Build Coastguard Worker try: 639*cda5da8dSAndroid Build Coastguard Worker return await future 640*cda5da8dSAndroid Build Coastguard Worker finally: 641*cda5da8dSAndroid Build Coastguard Worker h.cancel() 642*cda5da8dSAndroid Build Coastguard Worker 643*cda5da8dSAndroid Build Coastguard Worker 644*cda5da8dSAndroid Build Coastguard Workerdef ensure_future(coro_or_future, *, loop=None): 645*cda5da8dSAndroid Build Coastguard Worker """Wrap a coroutine or an awaitable in a future. 646*cda5da8dSAndroid Build Coastguard Worker 647*cda5da8dSAndroid Build Coastguard Worker If the argument is a Future, it is returned directly. 
648*cda5da8dSAndroid Build Coastguard Worker """ 649*cda5da8dSAndroid Build Coastguard Worker return _ensure_future(coro_or_future, loop=loop) 650*cda5da8dSAndroid Build Coastguard Worker 651*cda5da8dSAndroid Build Coastguard Worker 652*cda5da8dSAndroid Build Coastguard Workerdef _ensure_future(coro_or_future, *, loop=None): 653*cda5da8dSAndroid Build Coastguard Worker if futures.isfuture(coro_or_future): 654*cda5da8dSAndroid Build Coastguard Worker if loop is not None and loop is not futures._get_loop(coro_or_future): 655*cda5da8dSAndroid Build Coastguard Worker raise ValueError('The future belongs to a different loop than ' 656*cda5da8dSAndroid Build Coastguard Worker 'the one specified as the loop argument') 657*cda5da8dSAndroid Build Coastguard Worker return coro_or_future 658*cda5da8dSAndroid Build Coastguard Worker called_wrap_awaitable = False 659*cda5da8dSAndroid Build Coastguard Worker if not coroutines.iscoroutine(coro_or_future): 660*cda5da8dSAndroid Build Coastguard Worker if inspect.isawaitable(coro_or_future): 661*cda5da8dSAndroid Build Coastguard Worker coro_or_future = _wrap_awaitable(coro_or_future) 662*cda5da8dSAndroid Build Coastguard Worker called_wrap_awaitable = True 663*cda5da8dSAndroid Build Coastguard Worker else: 664*cda5da8dSAndroid Build Coastguard Worker raise TypeError('An asyncio.Future, a coroutine or an awaitable ' 665*cda5da8dSAndroid Build Coastguard Worker 'is required') 666*cda5da8dSAndroid Build Coastguard Worker 667*cda5da8dSAndroid Build Coastguard Worker if loop is None: 668*cda5da8dSAndroid Build Coastguard Worker loop = events._get_event_loop(stacklevel=4) 669*cda5da8dSAndroid Build Coastguard Worker try: 670*cda5da8dSAndroid Build Coastguard Worker return loop.create_task(coro_or_future) 671*cda5da8dSAndroid Build Coastguard Worker except RuntimeError: 672*cda5da8dSAndroid Build Coastguard Worker if not called_wrap_awaitable: 673*cda5da8dSAndroid Build Coastguard Worker coro_or_future.close() 674*cda5da8dSAndroid Build 
Coastguard Worker raise 675*cda5da8dSAndroid Build Coastguard Worker 676*cda5da8dSAndroid Build Coastguard Worker 677*cda5da8dSAndroid Build Coastguard Worker@types.coroutine 678*cda5da8dSAndroid Build Coastguard Workerdef _wrap_awaitable(awaitable): 679*cda5da8dSAndroid Build Coastguard Worker """Helper for asyncio.ensure_future(). 680*cda5da8dSAndroid Build Coastguard Worker 681*cda5da8dSAndroid Build Coastguard Worker Wraps awaitable (an object with __await__) into a coroutine 682*cda5da8dSAndroid Build Coastguard Worker that will later be wrapped in a Task by ensure_future(). 683*cda5da8dSAndroid Build Coastguard Worker """ 684*cda5da8dSAndroid Build Coastguard Worker return (yield from awaitable.__await__()) 685*cda5da8dSAndroid Build Coastguard Worker 686*cda5da8dSAndroid Build Coastguard Worker_wrap_awaitable._is_coroutine = _is_coroutine 687*cda5da8dSAndroid Build Coastguard Worker 688*cda5da8dSAndroid Build Coastguard Worker 689*cda5da8dSAndroid Build Coastguard Workerclass _GatheringFuture(futures.Future): 690*cda5da8dSAndroid Build Coastguard Worker """Helper for gather(). 691*cda5da8dSAndroid Build Coastguard Worker 692*cda5da8dSAndroid Build Coastguard Worker This overrides cancel() to cancel all the children and act more 693*cda5da8dSAndroid Build Coastguard Worker like Task.cancel(), which doesn't immediately mark itself as 694*cda5da8dSAndroid Build Coastguard Worker cancelled. 
695*cda5da8dSAndroid Build Coastguard Worker """ 696*cda5da8dSAndroid Build Coastguard Worker 697*cda5da8dSAndroid Build Coastguard Worker def __init__(self, children, *, loop): 698*cda5da8dSAndroid Build Coastguard Worker assert loop is not None 699*cda5da8dSAndroid Build Coastguard Worker super().__init__(loop=loop) 700*cda5da8dSAndroid Build Coastguard Worker self._children = children 701*cda5da8dSAndroid Build Coastguard Worker self._cancel_requested = False 702*cda5da8dSAndroid Build Coastguard Worker 703*cda5da8dSAndroid Build Coastguard Worker def cancel(self, msg=None): 704*cda5da8dSAndroid Build Coastguard Worker if self.done(): 705*cda5da8dSAndroid Build Coastguard Worker return False 706*cda5da8dSAndroid Build Coastguard Worker ret = False 707*cda5da8dSAndroid Build Coastguard Worker for child in self._children: 708*cda5da8dSAndroid Build Coastguard Worker if child.cancel(msg=msg): 709*cda5da8dSAndroid Build Coastguard Worker ret = True 710*cda5da8dSAndroid Build Coastguard Worker if ret: 711*cda5da8dSAndroid Build Coastguard Worker # If any child tasks were actually cancelled, we should 712*cda5da8dSAndroid Build Coastguard Worker # propagate the cancellation request regardless of 713*cda5da8dSAndroid Build Coastguard Worker # *return_exceptions* argument. See issue 32684. 714*cda5da8dSAndroid Build Coastguard Worker self._cancel_requested = True 715*cda5da8dSAndroid Build Coastguard Worker return ret 716*cda5da8dSAndroid Build Coastguard Worker 717*cda5da8dSAndroid Build Coastguard Worker 718*cda5da8dSAndroid Build Coastguard Workerdef gather(*coros_or_futures, return_exceptions=False): 719*cda5da8dSAndroid Build Coastguard Worker """Return a future aggregating results from the given coroutines/futures. 720*cda5da8dSAndroid Build Coastguard Worker 721*cda5da8dSAndroid Build Coastguard Worker Coroutines will be wrapped in a future and scheduled in the event 722*cda5da8dSAndroid Build Coastguard Worker loop. 
They will not necessarily be scheduled in the same order as 723*cda5da8dSAndroid Build Coastguard Worker passed in. 724*cda5da8dSAndroid Build Coastguard Worker 725*cda5da8dSAndroid Build Coastguard Worker All futures must share the same event loop. If all the tasks are 726*cda5da8dSAndroid Build Coastguard Worker done successfully, the returned future's result is the list of 727*cda5da8dSAndroid Build Coastguard Worker results (in the order of the original sequence, not necessarily 728*cda5da8dSAndroid Build Coastguard Worker the order of results arrival). If *return_exceptions* is True, 729*cda5da8dSAndroid Build Coastguard Worker exceptions in the tasks are treated the same as successful 730*cda5da8dSAndroid Build Coastguard Worker results, and gathered in the result list; otherwise, the first 731*cda5da8dSAndroid Build Coastguard Worker raised exception will be immediately propagated to the returned 732*cda5da8dSAndroid Build Coastguard Worker future. 733*cda5da8dSAndroid Build Coastguard Worker 734*cda5da8dSAndroid Build Coastguard Worker Cancellation: if the outer Future is cancelled, all children (that 735*cda5da8dSAndroid Build Coastguard Worker have not completed yet) are also cancelled. If any child is 736*cda5da8dSAndroid Build Coastguard Worker cancelled, this is treated as if it raised CancelledError -- 737*cda5da8dSAndroid Build Coastguard Worker the outer Future is *not* cancelled in this case. (This is to 738*cda5da8dSAndroid Build Coastguard Worker prevent the cancellation of one child to cause other children to 739*cda5da8dSAndroid Build Coastguard Worker be cancelled.) 740*cda5da8dSAndroid Build Coastguard Worker 741*cda5da8dSAndroid Build Coastguard Worker If *return_exceptions* is False, cancelling gather() after it 742*cda5da8dSAndroid Build Coastguard Worker has been marked done won't cancel any submitted awaitables. 
743*cda5da8dSAndroid Build Coastguard Worker For instance, gather can be marked done after propagating an 744*cda5da8dSAndroid Build Coastguard Worker exception to the caller, therefore, calling ``gather.cancel()`` 745*cda5da8dSAndroid Build Coastguard Worker after catching an exception (raised by one of the awaitables) from 746*cda5da8dSAndroid Build Coastguard Worker gather won't cancel any other awaitables. 747*cda5da8dSAndroid Build Coastguard Worker """ 748*cda5da8dSAndroid Build Coastguard Worker if not coros_or_futures: 749*cda5da8dSAndroid Build Coastguard Worker loop = events._get_event_loop() 750*cda5da8dSAndroid Build Coastguard Worker outer = loop.create_future() 751*cda5da8dSAndroid Build Coastguard Worker outer.set_result([]) 752*cda5da8dSAndroid Build Coastguard Worker return outer 753*cda5da8dSAndroid Build Coastguard Worker 754*cda5da8dSAndroid Build Coastguard Worker def _done_callback(fut): 755*cda5da8dSAndroid Build Coastguard Worker nonlocal nfinished 756*cda5da8dSAndroid Build Coastguard Worker nfinished += 1 757*cda5da8dSAndroid Build Coastguard Worker 758*cda5da8dSAndroid Build Coastguard Worker if outer is None or outer.done(): 759*cda5da8dSAndroid Build Coastguard Worker if not fut.cancelled(): 760*cda5da8dSAndroid Build Coastguard Worker # Mark exception retrieved. 761*cda5da8dSAndroid Build Coastguard Worker fut.exception() 762*cda5da8dSAndroid Build Coastguard Worker return 763*cda5da8dSAndroid Build Coastguard Worker 764*cda5da8dSAndroid Build Coastguard Worker if not return_exceptions: 765*cda5da8dSAndroid Build Coastguard Worker if fut.cancelled(): 766*cda5da8dSAndroid Build Coastguard Worker # Check if 'fut' is cancelled first, as 767*cda5da8dSAndroid Build Coastguard Worker # 'fut.exception()' will *raise* a CancelledError 768*cda5da8dSAndroid Build Coastguard Worker # instead of returning it. 
769*cda5da8dSAndroid Build Coastguard Worker exc = fut._make_cancelled_error() 770*cda5da8dSAndroid Build Coastguard Worker outer.set_exception(exc) 771*cda5da8dSAndroid Build Coastguard Worker return 772*cda5da8dSAndroid Build Coastguard Worker else: 773*cda5da8dSAndroid Build Coastguard Worker exc = fut.exception() 774*cda5da8dSAndroid Build Coastguard Worker if exc is not None: 775*cda5da8dSAndroid Build Coastguard Worker outer.set_exception(exc) 776*cda5da8dSAndroid Build Coastguard Worker return 777*cda5da8dSAndroid Build Coastguard Worker 778*cda5da8dSAndroid Build Coastguard Worker if nfinished == nfuts: 779*cda5da8dSAndroid Build Coastguard Worker # All futures are done; create a list of results 780*cda5da8dSAndroid Build Coastguard Worker # and set it to the 'outer' future. 781*cda5da8dSAndroid Build Coastguard Worker results = [] 782*cda5da8dSAndroid Build Coastguard Worker 783*cda5da8dSAndroid Build Coastguard Worker for fut in children: 784*cda5da8dSAndroid Build Coastguard Worker if fut.cancelled(): 785*cda5da8dSAndroid Build Coastguard Worker # Check if 'fut' is cancelled first, as 'fut.exception()' 786*cda5da8dSAndroid Build Coastguard Worker # will *raise* a CancelledError instead of returning it. 787*cda5da8dSAndroid Build Coastguard Worker # Also, since we're adding the exception return value 788*cda5da8dSAndroid Build Coastguard Worker # to 'results' instead of raising it, don't bother 789*cda5da8dSAndroid Build Coastguard Worker # setting __context__. This also lets us preserve 790*cda5da8dSAndroid Build Coastguard Worker # calling '_make_cancelled_error()' at most once. 
791*cda5da8dSAndroid Build Coastguard Worker res = exceptions.CancelledError( 792*cda5da8dSAndroid Build Coastguard Worker '' if fut._cancel_message is None else 793*cda5da8dSAndroid Build Coastguard Worker fut._cancel_message) 794*cda5da8dSAndroid Build Coastguard Worker else: 795*cda5da8dSAndroid Build Coastguard Worker res = fut.exception() 796*cda5da8dSAndroid Build Coastguard Worker if res is None: 797*cda5da8dSAndroid Build Coastguard Worker res = fut.result() 798*cda5da8dSAndroid Build Coastguard Worker results.append(res) 799*cda5da8dSAndroid Build Coastguard Worker 800*cda5da8dSAndroid Build Coastguard Worker if outer._cancel_requested: 801*cda5da8dSAndroid Build Coastguard Worker # If gather is being cancelled we must propagate the 802*cda5da8dSAndroid Build Coastguard Worker # cancellation regardless of *return_exceptions* argument. 803*cda5da8dSAndroid Build Coastguard Worker # See issue 32684. 804*cda5da8dSAndroid Build Coastguard Worker exc = fut._make_cancelled_error() 805*cda5da8dSAndroid Build Coastguard Worker outer.set_exception(exc) 806*cda5da8dSAndroid Build Coastguard Worker else: 807*cda5da8dSAndroid Build Coastguard Worker outer.set_result(results) 808*cda5da8dSAndroid Build Coastguard Worker 809*cda5da8dSAndroid Build Coastguard Worker arg_to_fut = {} 810*cda5da8dSAndroid Build Coastguard Worker children = [] 811*cda5da8dSAndroid Build Coastguard Worker nfuts = 0 812*cda5da8dSAndroid Build Coastguard Worker nfinished = 0 813*cda5da8dSAndroid Build Coastguard Worker loop = None 814*cda5da8dSAndroid Build Coastguard Worker outer = None # bpo-46672 815*cda5da8dSAndroid Build Coastguard Worker for arg in coros_or_futures: 816*cda5da8dSAndroid Build Coastguard Worker if arg not in arg_to_fut: 817*cda5da8dSAndroid Build Coastguard Worker fut = _ensure_future(arg, loop=loop) 818*cda5da8dSAndroid Build Coastguard Worker if loop is None: 819*cda5da8dSAndroid Build Coastguard Worker loop = futures._get_loop(fut) 820*cda5da8dSAndroid Build Coastguard 
Worker if fut is not arg: 821*cda5da8dSAndroid Build Coastguard Worker # 'arg' was not a Future, therefore, 'fut' is a new 822*cda5da8dSAndroid Build Coastguard Worker # Future created specifically for 'arg'. Since the caller 823*cda5da8dSAndroid Build Coastguard Worker # can't control it, disable the "destroy pending task" 824*cda5da8dSAndroid Build Coastguard Worker # warning. 825*cda5da8dSAndroid Build Coastguard Worker fut._log_destroy_pending = False 826*cda5da8dSAndroid Build Coastguard Worker 827*cda5da8dSAndroid Build Coastguard Worker nfuts += 1 828*cda5da8dSAndroid Build Coastguard Worker arg_to_fut[arg] = fut 829*cda5da8dSAndroid Build Coastguard Worker fut.add_done_callback(_done_callback) 830*cda5da8dSAndroid Build Coastguard Worker 831*cda5da8dSAndroid Build Coastguard Worker else: 832*cda5da8dSAndroid Build Coastguard Worker # There's a duplicate Future object in coros_or_futures. 833*cda5da8dSAndroid Build Coastguard Worker fut = arg_to_fut[arg] 834*cda5da8dSAndroid Build Coastguard Worker 835*cda5da8dSAndroid Build Coastguard Worker children.append(fut) 836*cda5da8dSAndroid Build Coastguard Worker 837*cda5da8dSAndroid Build Coastguard Worker outer = _GatheringFuture(children, loop=loop) 838*cda5da8dSAndroid Build Coastguard Worker return outer 839*cda5da8dSAndroid Build Coastguard Worker 840*cda5da8dSAndroid Build Coastguard Worker 841*cda5da8dSAndroid Build Coastguard Workerdef shield(arg): 842*cda5da8dSAndroid Build Coastguard Worker """Wait for a future, shielding it from cancellation. 
843*cda5da8dSAndroid Build Coastguard Worker 844*cda5da8dSAndroid Build Coastguard Worker The statement 845*cda5da8dSAndroid Build Coastguard Worker 846*cda5da8dSAndroid Build Coastguard Worker task = asyncio.create_task(something()) 847*cda5da8dSAndroid Build Coastguard Worker res = await shield(task) 848*cda5da8dSAndroid Build Coastguard Worker 849*cda5da8dSAndroid Build Coastguard Worker is exactly equivalent to the statement 850*cda5da8dSAndroid Build Coastguard Worker 851*cda5da8dSAndroid Build Coastguard Worker res = await something() 852*cda5da8dSAndroid Build Coastguard Worker 853*cda5da8dSAndroid Build Coastguard Worker *except* that if the coroutine containing it is cancelled, the 854*cda5da8dSAndroid Build Coastguard Worker task running in something() is not cancelled. From the POV of 855*cda5da8dSAndroid Build Coastguard Worker something(), the cancellation did not happen. But its caller is 856*cda5da8dSAndroid Build Coastguard Worker still cancelled, so the yield-from expression still raises 857*cda5da8dSAndroid Build Coastguard Worker CancelledError. Note: If something() is cancelled by other means 858*cda5da8dSAndroid Build Coastguard Worker this will still cancel shield(). 
859*cda5da8dSAndroid Build Coastguard Worker 860*cda5da8dSAndroid Build Coastguard Worker If you want to completely ignore cancellation (not recommended) 861*cda5da8dSAndroid Build Coastguard Worker you can combine shield() with a try/except clause, as follows: 862*cda5da8dSAndroid Build Coastguard Worker 863*cda5da8dSAndroid Build Coastguard Worker task = asyncio.create_task(something()) 864*cda5da8dSAndroid Build Coastguard Worker try: 865*cda5da8dSAndroid Build Coastguard Worker res = await shield(task) 866*cda5da8dSAndroid Build Coastguard Worker except CancelledError: 867*cda5da8dSAndroid Build Coastguard Worker res = None 868*cda5da8dSAndroid Build Coastguard Worker 869*cda5da8dSAndroid Build Coastguard Worker Save a reference to tasks passed to this function, to avoid 870*cda5da8dSAndroid Build Coastguard Worker a task disappearing mid-execution. The event loop only keeps 871*cda5da8dSAndroid Build Coastguard Worker weak references to tasks. A task that isn't referenced elsewhere 872*cda5da8dSAndroid Build Coastguard Worker may get garbage collected at any time, even before it's done. 873*cda5da8dSAndroid Build Coastguard Worker """ 874*cda5da8dSAndroid Build Coastguard Worker inner = _ensure_future(arg) 875*cda5da8dSAndroid Build Coastguard Worker if inner.done(): 876*cda5da8dSAndroid Build Coastguard Worker # Shortcut. 877*cda5da8dSAndroid Build Coastguard Worker return inner 878*cda5da8dSAndroid Build Coastguard Worker loop = futures._get_loop(inner) 879*cda5da8dSAndroid Build Coastguard Worker outer = loop.create_future() 880*cda5da8dSAndroid Build Coastguard Worker 881*cda5da8dSAndroid Build Coastguard Worker def _inner_done_callback(inner): 882*cda5da8dSAndroid Build Coastguard Worker if outer.cancelled(): 883*cda5da8dSAndroid Build Coastguard Worker if not inner.cancelled(): 884*cda5da8dSAndroid Build Coastguard Worker # Mark inner's result as retrieved. 
885*cda5da8dSAndroid Build Coastguard Worker inner.exception() 886*cda5da8dSAndroid Build Coastguard Worker return 887*cda5da8dSAndroid Build Coastguard Worker 888*cda5da8dSAndroid Build Coastguard Worker if inner.cancelled(): 889*cda5da8dSAndroid Build Coastguard Worker outer.cancel() 890*cda5da8dSAndroid Build Coastguard Worker else: 891*cda5da8dSAndroid Build Coastguard Worker exc = inner.exception() 892*cda5da8dSAndroid Build Coastguard Worker if exc is not None: 893*cda5da8dSAndroid Build Coastguard Worker outer.set_exception(exc) 894*cda5da8dSAndroid Build Coastguard Worker else: 895*cda5da8dSAndroid Build Coastguard Worker outer.set_result(inner.result()) 896*cda5da8dSAndroid Build Coastguard Worker 897*cda5da8dSAndroid Build Coastguard Worker 898*cda5da8dSAndroid Build Coastguard Worker def _outer_done_callback(outer): 899*cda5da8dSAndroid Build Coastguard Worker if not inner.done(): 900*cda5da8dSAndroid Build Coastguard Worker inner.remove_done_callback(_inner_done_callback) 901*cda5da8dSAndroid Build Coastguard Worker 902*cda5da8dSAndroid Build Coastguard Worker inner.add_done_callback(_inner_done_callback) 903*cda5da8dSAndroid Build Coastguard Worker outer.add_done_callback(_outer_done_callback) 904*cda5da8dSAndroid Build Coastguard Worker return outer 905*cda5da8dSAndroid Build Coastguard Worker 906*cda5da8dSAndroid Build Coastguard Worker 907*cda5da8dSAndroid Build Coastguard Workerdef run_coroutine_threadsafe(coro, loop): 908*cda5da8dSAndroid Build Coastguard Worker """Submit a coroutine object to a given event loop. 909*cda5da8dSAndroid Build Coastguard Worker 910*cda5da8dSAndroid Build Coastguard Worker Return a concurrent.futures.Future to access the result. 
911*cda5da8dSAndroid Build Coastguard Worker """ 912*cda5da8dSAndroid Build Coastguard Worker if not coroutines.iscoroutine(coro): 913*cda5da8dSAndroid Build Coastguard Worker raise TypeError('A coroutine object is required') 914*cda5da8dSAndroid Build Coastguard Worker future = concurrent.futures.Future() 915*cda5da8dSAndroid Build Coastguard Worker 916*cda5da8dSAndroid Build Coastguard Worker def callback(): 917*cda5da8dSAndroid Build Coastguard Worker try: 918*cda5da8dSAndroid Build Coastguard Worker futures._chain_future(ensure_future(coro, loop=loop), future) 919*cda5da8dSAndroid Build Coastguard Worker except (SystemExit, KeyboardInterrupt): 920*cda5da8dSAndroid Build Coastguard Worker raise 921*cda5da8dSAndroid Build Coastguard Worker except BaseException as exc: 922*cda5da8dSAndroid Build Coastguard Worker if future.set_running_or_notify_cancel(): 923*cda5da8dSAndroid Build Coastguard Worker future.set_exception(exc) 924*cda5da8dSAndroid Build Coastguard Worker raise 925*cda5da8dSAndroid Build Coastguard Worker 926*cda5da8dSAndroid Build Coastguard Worker loop.call_soon_threadsafe(callback) 927*cda5da8dSAndroid Build Coastguard Worker return future 928*cda5da8dSAndroid Build Coastguard Worker 929*cda5da8dSAndroid Build Coastguard Worker 930*cda5da8dSAndroid Build Coastguard Worker# WeakSet containing all alive tasks. 931*cda5da8dSAndroid Build Coastguard Worker_all_tasks = weakref.WeakSet() 932*cda5da8dSAndroid Build Coastguard Worker 933*cda5da8dSAndroid Build Coastguard Worker# Dictionary containing tasks that are currently active in 934*cda5da8dSAndroid Build Coastguard Worker# all running event loops. 
# Mapping of the form {EventLoop: Task}.
_current_tasks = {}


def _register_task(task):
    """Add *task* to the ``_all_tasks`` collection."""
    _all_tasks.add(task)


def _enter_task(loop, task):
    """Record *task* as the task currently executing on *loop*.

    Raises RuntimeError if some task is already recorded for *loop*.
    """
    running = _current_tasks.get(loop)
    if running is None:
        _current_tasks[loop] = task
        return
    raise RuntimeError(f"Cannot enter into task {task!r} while another "
                       f"task {running!r} is being executed.")


def _leave_task(loop, task):
    """Remove the current-task entry for *loop*.

    Raises RuntimeError if *task* is not the task recorded for *loop*.
    """
    running = _current_tasks.get(loop)
    if running is task:
        del _current_tasks[loop]
        return
    raise RuntimeError(f"Leaving task {task!r} does not match "
                       f"the current task {running!r}.")


def _unregister_task(task):
    """Remove *task* from the ``_all_tasks`` collection if present."""
    _all_tasks.discard(task)
# Keep references to the definitions made earlier in this module under
# ``_py_*`` names, because the import below may rebind the plain names.
_py_register_task, _py_unregister_task = _register_task, _unregister_task
_py_enter_task, _py_leave_task = _enter_task, _leave_task


try:
    from _asyncio import (
        _register_task,
        _unregister_task,
        _enter_task,
        _leave_task,
        _all_tasks,
        _current_tasks,
    )
except ImportError:
    # ``_asyncio`` could not be imported: the definitions above stay in effect.
    pass
else:
    # The import succeeded: also expose the imported versions as ``_c_*``.
    _c_register_task, _c_unregister_task = _register_task, _unregister_task
    _c_enter_task, _c_leave_task = _enter_task, _leave_task