# xref: /aosp_15_r20/prebuilts/build-tools/common/py3-stdlib/asyncio/tasks.py (revision cda5da8d549138a6648c5ee6d7a49cf8f4a657be)
"""Support for tasks, coroutines and the scheduler."""

# Names exported by ``from asyncio.tasks import *``.  Note that the four
# underscore-prefixed task bookkeeping hooks are exported as well.
__all__ = (
    'Task', 'create_task',
    'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
    'wait', 'wait_for', 'as_completed', 'sleep',
    'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
    'current_task', 'all_tasks',
    '_register_task', '_unregister_task', '_enter_task', '_leave_task',
)

import concurrent.futures
import contextvars
import functools
import inspect
import itertools
import types
import warnings
import weakref
from types import GenericAlias

from . import base_tasks
from . import coroutines
from . import events
from . import exceptions
from . import futures
from .coroutines import _is_coroutine

# Helper to generate new task names
# This uses itertools.count() instead of a "+= 1" operation because the latter
# is not thread safe. See bpo-11866 for a longer explanation.
_task_name_counter = itertools.count(1).__next__
33*cda5da8dSAndroid Build Coastguard Worker
34*cda5da8dSAndroid Build Coastguard Worker
def current_task(loop=None):
    """Return the task currently being executed on *loop*, or None.

    When *loop* is None the running event loop is used; in that case
    events.get_running_loop() raises RuntimeError if no loop is running.
    None is returned when no task is recorded for the loop.
    """
    if loop is None:
        loop = events.get_running_loop()
    return _current_tasks.get(loop)
40*cda5da8dSAndroid Build Coastguard Worker
41*cda5da8dSAndroid Build Coastguard Worker
def all_tasks(loop=None):
    """Return a set of all not-yet-done tasks for the loop.

    When *loop* is None the running event loop is used; in that case
    events.get_running_loop() raises RuntimeError if no loop is running.
    Tasks that belong to another loop, or that are already done, are
    left out of the result.
    """
    if loop is None:
        loop = events.get_running_loop()
    # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from
    # another thread while we do so. Therefore we cast it to list prior to
    # filtering. The list cast itself requires iteration, so we repeat it
    # several times ignoring RuntimeErrors (which are not very likely to
    # occur). See issues 34970 and 36607 for details.
    failures = 0
    while True:
        try:
            tasks = list(_all_tasks)
        except RuntimeError:
            failures += 1
            if failures >= 1000:
                # Give up: propagate the RuntimeError from the last attempt.
                raise
        else:
            break
    return {t for t in tasks
            if futures._get_loop(t) is loop and not t.done()}
63*cda5da8dSAndroid Build Coastguard Worker
64*cda5da8dSAndroid Build Coastguard Worker
def _set_task_name(task, name):
    """Apply *name* to *task* through its set_name() method.

    Nothing happens when *name* is None.  If *task* has no ``set_name``
    attribute a DeprecationWarning is emitted and the name is dropped.
    """
    if name is None:
        return
    try:
        set_name = task.set_name
    except AttributeError:
        # stacklevel=3 attributes the warning two frames above this helper.
        warnings.warn("Task.set_name() was added in Python 3.8, "
                      "the method support will be mandatory for third-party "
                      "task implementations since 3.13.",
                      DeprecationWarning, stacklevel=3)
    else:
        set_name(name)
76*cda5da8dSAndroid Build Coastguard Worker
77*cda5da8dSAndroid Build Coastguard Worker
class Task(futures._PyFuture):  # Inherit Python Task implementation
                                # from a Python Future implementation.

    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task is not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # If False, don't log a message if the task is destroyed whereas its
    # status is still pending
    _log_destroy_pending = True

    def __init__(self, coro, *, loop=None, name=None, context=None):
        """Wrap *coro* and schedule its first step on the loop.

        *coro* must be a coroutine object, otherwise TypeError is raised.
        When *name* is None a 'Task-<n>' name is generated; any other
        value is stored as str(name).  When *context* is None a copy of
        the current contextvars context is used for running the steps.
        """
        super().__init__(loop=loop)
        if self._source_traceback:
            # Drop the newest captured entry (presumably the frame of this
            # constructor itself).
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            # raise after Future.__init__(), attrs are required for __del__
            # prevent logging for pending task in __del__
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        if name is None:
            self._name = f'Task-{_task_name_counter()}'
        else:
            self._name = str(name)

        self._num_cancels_requested = 0
        self._must_cancel = False
        self._fut_waiter = None
        self._coro = coro
        if context is None:
            self._context = contextvars.copy_context()
        else:
            self._context = context

        self._loop.call_soon(self.__step, context=self._context)
        _register_task(self)

    def __del__(self):
        """Report a still-pending task to the loop's exception handler."""
        if self._state == futures._PENDING and self._log_destroy_pending:
            context = {
                'task': self,
                'message': 'Task was destroyed but it is pending!',
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        super().__del__()

    __class_getitem__ = classmethod(GenericAlias)

    def __repr__(self):
        return base_tasks._task_repr(self)

    def get_coro(self):
        """Return the coroutine object wrapped by this task."""
        return self._coro

    def get_name(self):
        """Return the name of this task."""
        return self._name

    def set_name(self, value):
        """Set the name of this task to str(value)."""
        self._name = str(value)

    def set_result(self, result):
        """Always raise RuntimeError: the result comes from the coroutine."""
        raise RuntimeError('Task does not support set_result operation')

    def set_exception(self, exception):
        """Always raise RuntimeError: the exception comes from the coroutine."""
        raise RuntimeError('Task does not support set_exception operation')

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is not done, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        return base_tasks._task_get_stack(self, limit)

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output is written; by default output is written
        to sys.stderr.
        """
        return base_tasks._task_print_stack(self, limit, file)

    def cancel(self, msg=None):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Unlike Future.cancel, this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing
        cancellation completely.  The task may also return a value or
        raise a different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).

        This also increases the task's count of cancellation requests.

        Returns False if the task is already done, True otherwise.
        """
        self._log_traceback = False
        if self.done():
            return False
        self._num_cancels_requested += 1
        # These two lines are controversial.  See discussion starting at
        # https://github.com/python/cpython/pull/31394#issuecomment-1053545331
        # Also remember that this is duplicated in _asynciomodule.c.
        # if self._num_cancels_requested > 1:
        #     return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel(msg=msg):
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self.__step is already scheduled.
        self._must_cancel = True
        self._cancel_message = msg
        return True

    def cancelling(self):
        """Return the count of the task's cancellation requests.

        This count is incremented when .cancel() is called
        and may be decremented using .uncancel().
        """
        return self._num_cancels_requested

    def uncancel(self):
        """Decrement the task's count of cancellation requests.

        This should be called by the party that called `cancel()` on the task
        beforehand.

        The count never drops below zero.

        Returns the remaining number of cancellation requests.
        """
        if self._num_cancels_requested > 0:
            self._num_cancels_requested -= 1
        return self._num_cancels_requested

    def __step(self, exc=None):
        """Advance the wrapped coroutine by one step.

        With *exc* None the coroutine is resumed with send(None);
        otherwise *exc* is thrown into it.  A pending cancellation
        request (_must_cancel) replaces *exc* with a CancelledError unless
        *exc* already is one.  Depending on how the coroutine reacts, the
        task is completed, cancelled, parked on the yielded future, or
        rescheduled through loop.call_soon().

        Raises InvalidStateError if the task is already done.
        """
        if self.done():
            raise exceptions.InvalidStateError(
                f'_step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:
            if not isinstance(exc, exceptions.CancelledError):
                exc = self._make_cancelled_error()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        _enter_task(self._loop, self)
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().cancel(msg=self._cancel_message)
            else:
                super().set_result(exc.value)
        except exceptions.CancelledError as exc:
            # Save the original exception so we can chain it later.
            self._cancelled_exc = exc
            super().cancel()  # I.e., Future.cancel(self).
        except (KeyboardInterrupt, SystemExit) as exc:
            # Record the exception on the task, then let it propagate to
            # whoever invoked this step.
            super().set_exception(exc)
            raise
        except BaseException as exc:
            super().set_exception(exc)
        else:
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                elif blocking:
                    if result is self:
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)
                    else:
                        # Park on the future: __wakeup resumes the task
                        # once the future completes.
                        result._asyncio_future_blocking = False
                        result.add_done_callback(
                            self.__wakeup, context=self._context)
                        self._fut_waiter = result
                        if self._must_cancel:
                            # cancel() was requested while the coroutine
                            # was running; forward it to the new waiter.
                            if self._fut_waiter.cancel(
                                    msg=self._cancel_message):
                                self._must_cancel = False
                else:
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)

            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self.__step, context=self._context)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            else:
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
        finally:
            _leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs.

    def __wakeup(self, future):
        """Done-callback of the awaited future: resume the coroutine.

        If *future* finished with an exception (cancellation included),
        that exception is passed on to __step(); otherwise __step() is
        called without arguments.
        """
        try:
            future.result()
        except BaseException as exc:
            # This may also be a cancellation.
            self.__step(exc)
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `_step(value, None)` instead of `_step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self.__step()
        self = None  # Needed to break cycles when an exception occurs.
353*cda5da8dSAndroid Build Coastguard Worker
# Keep a handle on the pure-Python implementation; the public name ``Task``
# may be rebound to the C implementation just below.
_PyTask = Task


try:
    import _asyncio
except ImportError:
    # No C accelerator available: ``Task`` stays the pure-Python class.
    pass
else:
    # _CTask is needed for tests.
    Task = _CTask = _asyncio.Task
364*cda5da8dSAndroid Build Coastguard Worker
365*cda5da8dSAndroid Build Coastguard Worker
def create_task(coro, *, name=None, context=None):
    """Schedule the execution of a coroutine object in a new task.

    The task is created on the running event loop, so
    events.get_running_loop() raises RuntimeError when no loop is
    running.  *name*, when not None, is applied to the task afterwards;
    *context*, when not None, is forwarded to loop.create_task().

    Return a Task object.
    """
    loop = events.get_running_loop()
    if context is None:
        # Use legacy API if context is not needed
        task = loop.create_task(coro)
    else:
        task = loop.create_task(coro, context=context)

    _set_task_name(task, name)
    return task
380*cda5da8dSAndroid Build Coastguard Worker
381*cda5da8dSAndroid Build Coastguard Worker
# wait() and as_completed() similar to those in PEP 3148.

# The return_when constants are the very same objects that
# concurrent.futures exposes.
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
387*cda5da8dSAndroid Build Coastguard Worker
388*cda5da8dSAndroid Build Coastguard Worker
async def wait(fs, *, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the Futures or Tasks given by fs to complete.

    The fs iterable must not be empty.

    Bare coroutines are not accepted, neither as fs itself nor as its
    elements: wrap them in tasks explicitly first.

    Returns two sets of Future: (done, pending).

    Usage:

        done, pending = await asyncio.wait(fs)

    Note: This does not raise TimeoutError! Futures that aren't done
    when the timeout occurs are returned in the second set.

    Raises TypeError if fs is a single future or coroutine, or if it
    contains a coroutine.  Raises ValueError if fs is empty or if
    return_when is not one of FIRST_COMPLETED, FIRST_EXCEPTION and
    ALL_COMPLETED.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    if not fs:
        raise ValueError('Set of Tasks/Futures is empty.')
    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
        raise ValueError(f'Invalid return_when value: {return_when}')

    fs = set(fs)
    if not fs:
        # One-shot iterables (generators, iterators) are always truthy, so
        # the emptiness test above cannot see that they yield nothing.
        raise ValueError('Set of Tasks/Futures is empty.')

    if any(coroutines.iscoroutine(f) for f in fs):
        raise TypeError("Passing coroutines is forbidden, use tasks explicitly.")

    loop = events.get_running_loop()
    return await _wait(fs, timeout, return_when, loop)
419*cda5da8dSAndroid Build Coastguard Worker
420*cda5da8dSAndroid Build Coastguard Worker
421*cda5da8dSAndroid Build Coastguard Workerdef _release_waiter(waiter, *args):
422*cda5da8dSAndroid Build Coastguard Worker    if not waiter.done():
423*cda5da8dSAndroid Build Coastguard Worker        waiter.set_result(None)
424*cda5da8dSAndroid Build Coastguard Worker
425*cda5da8dSAndroid Build Coastguard Worker
async def wait_for(fut, timeout):
    """Wait for the single Future or coroutine to complete, with timeout.

    Coroutine will be wrapped in Task.

    Returns result of the Future or coroutine.  When a timeout occurs,
    it cancels the task and raises TimeoutError.  To avoid the task
    cancellation, wrap it in shield().

    If the wait is cancelled, the task is also cancelled.

    If timeout is None, fut is simply awaited with no time limit.  If
    timeout is zero or negative, the result of fut is returned when it
    is already done; otherwise fut is cancelled straight away.

    This function is a coroutine.
    """
    loop = events.get_running_loop()

    # No time limit: await the argument as-is.
    if timeout is None:
        return await fut

    # Non-positive timeout: no timer is armed on this path.
    if timeout <= 0:
        fut = ensure_future(fut, loop=loop)

        if fut.done():
            return fut.result()

        await _cancel_and_wait(fut, loop=loop)
        try:
            # If fut ended with a result or a non-cancellation
            # exception, that outcome is what the caller gets.
            return fut.result()
        except exceptions.CancelledError as exc:
            raise exceptions.TimeoutError() from exc

    # 'waiter' is released by whichever happens first: the timer
    # firing, or fut completing (through the done callback 'cb').
    waiter = loop.create_future()
    timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    cb = functools.partial(_release_waiter, waiter)

    fut = ensure_future(fut, loop=loop)
    fut.add_done_callback(cb)

    try:
        # wait until the future completes or the timeout
        try:
            await waiter
        except exceptions.CancelledError:
            # The wait itself was cancelled.
            if fut.done():
                # fut is already finished: its outcome is returned (or
                # raised) here instead of re-raising the CancelledError.
                return fut.result()
            else:
                fut.remove_done_callback(cb)
                # We must ensure that the task is not running
                # after wait_for() returns.
                # See https://bugs.python.org/issue32751
                await _cancel_and_wait(fut, loop=loop)
                raise

        # 'waiter' was released normally: fut is done unless the timer
        # fired first.
        if fut.done():
            return fut.result()
        else:
            fut.remove_done_callback(cb)
            # We must ensure that the task is not running
            # after wait_for() returns.
            # See https://bugs.python.org/issue32751
            await _cancel_and_wait(fut, loop=loop)
            # In case task cancellation failed with some
            # exception, we should re-raise it
            # See https://bugs.python.org/issue40607
            try:
                return fut.result()
            except exceptions.CancelledError as exc:
                raise exceptions.TimeoutError() from exc
    finally:
        # Runs on every exit from the block above, so the timer never
        # outlives this call.
        timeout_handle.cancel()
495*cda5da8dSAndroid Build Coastguard Worker
496*cda5da8dSAndroid Build Coastguard Worker
497*cda5da8dSAndroid Build Coastguard Workerasync def _wait(fs, timeout, return_when, loop):
498*cda5da8dSAndroid Build Coastguard Worker    """Internal helper for wait().
499*cda5da8dSAndroid Build Coastguard Worker
500*cda5da8dSAndroid Build Coastguard Worker    The fs argument must be a collection of Futures.
501*cda5da8dSAndroid Build Coastguard Worker    """
502*cda5da8dSAndroid Build Coastguard Worker    assert fs, 'Set of Futures is empty.'
503*cda5da8dSAndroid Build Coastguard Worker    waiter = loop.create_future()
504*cda5da8dSAndroid Build Coastguard Worker    timeout_handle = None
505*cda5da8dSAndroid Build Coastguard Worker    if timeout is not None:
506*cda5da8dSAndroid Build Coastguard Worker        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
507*cda5da8dSAndroid Build Coastguard Worker    counter = len(fs)
508*cda5da8dSAndroid Build Coastguard Worker
509*cda5da8dSAndroid Build Coastguard Worker    def _on_completion(f):
510*cda5da8dSAndroid Build Coastguard Worker        nonlocal counter
511*cda5da8dSAndroid Build Coastguard Worker        counter -= 1
512*cda5da8dSAndroid Build Coastguard Worker        if (counter <= 0 or
513*cda5da8dSAndroid Build Coastguard Worker            return_when == FIRST_COMPLETED or
514*cda5da8dSAndroid Build Coastguard Worker            return_when == FIRST_EXCEPTION and (not f.cancelled() and
515*cda5da8dSAndroid Build Coastguard Worker                                                f.exception() is not None)):
516*cda5da8dSAndroid Build Coastguard Worker            if timeout_handle is not None:
517*cda5da8dSAndroid Build Coastguard Worker                timeout_handle.cancel()
518*cda5da8dSAndroid Build Coastguard Worker            if not waiter.done():
519*cda5da8dSAndroid Build Coastguard Worker                waiter.set_result(None)
520*cda5da8dSAndroid Build Coastguard Worker
521*cda5da8dSAndroid Build Coastguard Worker    for f in fs:
522*cda5da8dSAndroid Build Coastguard Worker        f.add_done_callback(_on_completion)
523*cda5da8dSAndroid Build Coastguard Worker
524*cda5da8dSAndroid Build Coastguard Worker    try:
525*cda5da8dSAndroid Build Coastguard Worker        await waiter
526*cda5da8dSAndroid Build Coastguard Worker    finally:
527*cda5da8dSAndroid Build Coastguard Worker        if timeout_handle is not None:
528*cda5da8dSAndroid Build Coastguard Worker            timeout_handle.cancel()
529*cda5da8dSAndroid Build Coastguard Worker        for f in fs:
530*cda5da8dSAndroid Build Coastguard Worker            f.remove_done_callback(_on_completion)
531*cda5da8dSAndroid Build Coastguard Worker
532*cda5da8dSAndroid Build Coastguard Worker    done, pending = set(), set()
533*cda5da8dSAndroid Build Coastguard Worker    for f in fs:
534*cda5da8dSAndroid Build Coastguard Worker        if f.done():
535*cda5da8dSAndroid Build Coastguard Worker            done.add(f)
536*cda5da8dSAndroid Build Coastguard Worker        else:
537*cda5da8dSAndroid Build Coastguard Worker            pending.add(f)
538*cda5da8dSAndroid Build Coastguard Worker    return done, pending
539*cda5da8dSAndroid Build Coastguard Worker
540*cda5da8dSAndroid Build Coastguard Worker
async def _cancel_and_wait(fut, loop):
    """Request cancellation of *fut* and wait until it is done."""

    finished = loop.create_future()
    release = functools.partial(_release_waiter, finished)
    fut.add_done_callback(release)

    try:
        fut.cancel()
        # Await a separate future rather than *fut* itself, so that
        # _cancel_and_wait stays reliably cancellable.
        await finished
    finally:
        fut.remove_done_callback(release)
555*cda5da8dSAndroid Build Coastguard Worker
556*cda5da8dSAndroid Build Coastguard Worker
# This is *not* a @coroutine!  It is a plain generator; the items it
# yields are coroutine objects.
def as_completed(fs, *, timeout=None):
    """Return an iterator whose values are coroutines.

    Awaiting the yielded coroutines gives the results (or raises the
    exceptions!) of the original Futures (or coroutines), in the order
    in which they complete and as soon as they do.

    This differs from PEP 3148; the proper way to use this is:

        for f in as_completed(fs):
            result = await f  # The 'await' may raise.
            # Use result.

    If a timeout is specified, the 'await' will raise
    TimeoutError when the timeout occurs before all Futures are done.

    Note: The futures 'f' are not necessarily members of fs.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}")

    from .queues import Queue  # Import here to avoid circular import problem.
    finished = Queue()

    loop = events._get_event_loop()
    outstanding = {ensure_future(item, loop=loop) for item in set(fs)}
    timer = None

    def _on_timeout():
        for fut in outstanding:
            fut.remove_done_callback(_on_completion)
            # One dummy entry per unfinished future, for _wait_for_one().
            finished.put_nowait(None)
        # Emptied afterwards: the set cannot shrink while iterated.
        outstanding.clear()

    def _on_completion(fut):
        if not outstanding:
            return  # _on_timeout() was here first.
        outstanding.remove(fut)
        finished.put_nowait(fut)
        if not outstanding and timer is not None:
            timer.cancel()

    async def _wait_for_one():
        fut = await finished.get()
        if fut is None:
            # Dummy value from _on_timeout().
            raise exceptions.TimeoutError
        return fut.result()  # May raise fut.exception().

    for fut in outstanding:
        fut.add_done_callback(_on_completion)
    if outstanding and timeout is not None:
        timer = loop.call_later(timeout, _on_timeout)
    for _ in range(len(outstanding)):
        yield _wait_for_one()
613*cda5da8dSAndroid Build Coastguard Worker
614*cda5da8dSAndroid Build Coastguard Worker
615*cda5da8dSAndroid Build Coastguard Worker@types.coroutine
616*cda5da8dSAndroid Build Coastguard Workerdef __sleep0():
617*cda5da8dSAndroid Build Coastguard Worker    """Skip one event loop run cycle.
618*cda5da8dSAndroid Build Coastguard Worker
619*cda5da8dSAndroid Build Coastguard Worker    This is a private helper for 'asyncio.sleep()', used
620*cda5da8dSAndroid Build Coastguard Worker    when the 'delay' is set to 0.  It uses a bare 'yield'
621*cda5da8dSAndroid Build Coastguard Worker    expression (which Task.__step knows how to handle)
622*cda5da8dSAndroid Build Coastguard Worker    instead of creating a Future object.
623*cda5da8dSAndroid Build Coastguard Worker    """
624*cda5da8dSAndroid Build Coastguard Worker    yield
625*cda5da8dSAndroid Build Coastguard Worker
626*cda5da8dSAndroid Build Coastguard Worker
async def sleep(delay, result=None):
    """Coroutine that completes after a given time (in seconds).

    Returns *result* once the delay has elapsed.  A zero or negative
    *delay* only skips one event loop run cycle.

    Raises ValueError if *delay* is NaN.
    """
    if delay <= 0:
        await __sleep0()
        return result

    # NaN compares false against everything, so it slips past the
    # check above and would reach loop.call_later() as a meaningless
    # deadline.  NaN is the only value not equal to itself, which
    # makes this test safe for any numeric type.
    if delay != delay:
        raise ValueError("Invalid delay: NaN is not supported.")

    loop = events.get_running_loop()
    future = loop.create_future()
    h = loop.call_later(delay,
                        futures._set_result_unless_cancelled,
                        future, result)
    try:
        return await future
    finally:
        # Drop the timer if the sleep is cancelled before it fires.
        h.cancel()
642*cda5da8dSAndroid Build Coastguard Worker
643*cda5da8dSAndroid Build Coastguard Worker
def ensure_future(coro_or_future, *, loop=None):
    """Return a future wrapping the given coroutine or awaitable.

    If the argument is a Future, it is returned directly.  All the
    work is delegated to _ensure_future().
    """
    future = _ensure_future(coro_or_future, loop=loop)
    return future
650*cda5da8dSAndroid Build Coastguard Worker
651*cda5da8dSAndroid Build Coastguard Worker
652*cda5da8dSAndroid Build Coastguard Workerdef _ensure_future(coro_or_future, *, loop=None):
653*cda5da8dSAndroid Build Coastguard Worker    if futures.isfuture(coro_or_future):
654*cda5da8dSAndroid Build Coastguard Worker        if loop is not None and loop is not futures._get_loop(coro_or_future):
655*cda5da8dSAndroid Build Coastguard Worker            raise ValueError('The future belongs to a different loop than '
656*cda5da8dSAndroid Build Coastguard Worker                            'the one specified as the loop argument')
657*cda5da8dSAndroid Build Coastguard Worker        return coro_or_future
658*cda5da8dSAndroid Build Coastguard Worker    called_wrap_awaitable = False
659*cda5da8dSAndroid Build Coastguard Worker    if not coroutines.iscoroutine(coro_or_future):
660*cda5da8dSAndroid Build Coastguard Worker        if inspect.isawaitable(coro_or_future):
661*cda5da8dSAndroid Build Coastguard Worker            coro_or_future = _wrap_awaitable(coro_or_future)
662*cda5da8dSAndroid Build Coastguard Worker            called_wrap_awaitable = True
663*cda5da8dSAndroid Build Coastguard Worker        else:
664*cda5da8dSAndroid Build Coastguard Worker            raise TypeError('An asyncio.Future, a coroutine or an awaitable '
665*cda5da8dSAndroid Build Coastguard Worker                            'is required')
666*cda5da8dSAndroid Build Coastguard Worker
667*cda5da8dSAndroid Build Coastguard Worker    if loop is None:
668*cda5da8dSAndroid Build Coastguard Worker        loop = events._get_event_loop(stacklevel=4)
669*cda5da8dSAndroid Build Coastguard Worker    try:
670*cda5da8dSAndroid Build Coastguard Worker        return loop.create_task(coro_or_future)
671*cda5da8dSAndroid Build Coastguard Worker    except RuntimeError:
672*cda5da8dSAndroid Build Coastguard Worker        if not called_wrap_awaitable:
673*cda5da8dSAndroid Build Coastguard Worker            coro_or_future.close()
674*cda5da8dSAndroid Build Coastguard Worker        raise
675*cda5da8dSAndroid Build Coastguard Worker
676*cda5da8dSAndroid Build Coastguard Worker
677*cda5da8dSAndroid Build Coastguard Worker@types.coroutine
678*cda5da8dSAndroid Build Coastguard Workerdef _wrap_awaitable(awaitable):
679*cda5da8dSAndroid Build Coastguard Worker    """Helper for asyncio.ensure_future().
680*cda5da8dSAndroid Build Coastguard Worker
681*cda5da8dSAndroid Build Coastguard Worker    Wraps awaitable (an object with __await__) into a coroutine
682*cda5da8dSAndroid Build Coastguard Worker    that will later be wrapped in a Task by ensure_future().
683*cda5da8dSAndroid Build Coastguard Worker    """
684*cda5da8dSAndroid Build Coastguard Worker    return (yield from awaitable.__await__())
685*cda5da8dSAndroid Build Coastguard Worker
686*cda5da8dSAndroid Build Coastguard Worker_wrap_awaitable._is_coroutine = _is_coroutine
687*cda5da8dSAndroid Build Coastguard Worker
688*cda5da8dSAndroid Build Coastguard Worker
689*cda5da8dSAndroid Build Coastguard Workerclass _GatheringFuture(futures.Future):
690*cda5da8dSAndroid Build Coastguard Worker    """Helper for gather().
691*cda5da8dSAndroid Build Coastguard Worker
692*cda5da8dSAndroid Build Coastguard Worker    This overrides cancel() to cancel all the children and act more
693*cda5da8dSAndroid Build Coastguard Worker    like Task.cancel(), which doesn't immediately mark itself as
694*cda5da8dSAndroid Build Coastguard Worker    cancelled.
695*cda5da8dSAndroid Build Coastguard Worker    """
696*cda5da8dSAndroid Build Coastguard Worker
697*cda5da8dSAndroid Build Coastguard Worker    def __init__(self, children, *, loop):
698*cda5da8dSAndroid Build Coastguard Worker        assert loop is not None
699*cda5da8dSAndroid Build Coastguard Worker        super().__init__(loop=loop)
700*cda5da8dSAndroid Build Coastguard Worker        self._children = children
701*cda5da8dSAndroid Build Coastguard Worker        self._cancel_requested = False
702*cda5da8dSAndroid Build Coastguard Worker
703*cda5da8dSAndroid Build Coastguard Worker    def cancel(self, msg=None):
704*cda5da8dSAndroid Build Coastguard Worker        if self.done():
705*cda5da8dSAndroid Build Coastguard Worker            return False
706*cda5da8dSAndroid Build Coastguard Worker        ret = False
707*cda5da8dSAndroid Build Coastguard Worker        for child in self._children:
708*cda5da8dSAndroid Build Coastguard Worker            if child.cancel(msg=msg):
709*cda5da8dSAndroid Build Coastguard Worker                ret = True
710*cda5da8dSAndroid Build Coastguard Worker        if ret:
711*cda5da8dSAndroid Build Coastguard Worker            # If any child tasks were actually cancelled, we should
712*cda5da8dSAndroid Build Coastguard Worker            # propagate the cancellation request regardless of
713*cda5da8dSAndroid Build Coastguard Worker            # *return_exceptions* argument.  See issue 32684.
714*cda5da8dSAndroid Build Coastguard Worker            self._cancel_requested = True
715*cda5da8dSAndroid Build Coastguard Worker        return ret
716*cda5da8dSAndroid Build Coastguard Worker
717*cda5da8dSAndroid Build Coastguard Worker
718*cda5da8dSAndroid Build Coastguard Workerdef gather(*coros_or_futures, return_exceptions=False):
719*cda5da8dSAndroid Build Coastguard Worker    """Return a future aggregating results from the given coroutines/futures.
720*cda5da8dSAndroid Build Coastguard Worker
721*cda5da8dSAndroid Build Coastguard Worker    Coroutines will be wrapped in a future and scheduled in the event
722*cda5da8dSAndroid Build Coastguard Worker    loop. They will not necessarily be scheduled in the same order as
723*cda5da8dSAndroid Build Coastguard Worker    passed in.
724*cda5da8dSAndroid Build Coastguard Worker
725*cda5da8dSAndroid Build Coastguard Worker    All futures must share the same event loop.  If all the tasks are
726*cda5da8dSAndroid Build Coastguard Worker    done successfully, the returned future's result is the list of
727*cda5da8dSAndroid Build Coastguard Worker    results (in the order of the original sequence, not necessarily
728*cda5da8dSAndroid Build Coastguard Worker    the order of results arrival).  If *return_exceptions* is True,
729*cda5da8dSAndroid Build Coastguard Worker    exceptions in the tasks are treated the same as successful
730*cda5da8dSAndroid Build Coastguard Worker    results, and gathered in the result list; otherwise, the first
731*cda5da8dSAndroid Build Coastguard Worker    raised exception will be immediately propagated to the returned
732*cda5da8dSAndroid Build Coastguard Worker    future.
733*cda5da8dSAndroid Build Coastguard Worker
734*cda5da8dSAndroid Build Coastguard Worker    Cancellation: if the outer Future is cancelled, all children (that
735*cda5da8dSAndroid Build Coastguard Worker    have not completed yet) are also cancelled.  If any child is
736*cda5da8dSAndroid Build Coastguard Worker    cancelled, this is treated as if it raised CancelledError --
737*cda5da8dSAndroid Build Coastguard Worker    the outer Future is *not* cancelled in this case.  (This is to
738*cda5da8dSAndroid Build Coastguard Worker    prevent the cancellation of one child to cause other children to
739*cda5da8dSAndroid Build Coastguard Worker    be cancelled.)
740*cda5da8dSAndroid Build Coastguard Worker
741*cda5da8dSAndroid Build Coastguard Worker    If *return_exceptions* is False, cancelling gather() after it
742*cda5da8dSAndroid Build Coastguard Worker    has been marked done won't cancel any submitted awaitables.
743*cda5da8dSAndroid Build Coastguard Worker    For instance, gather can be marked done after propagating an
744*cda5da8dSAndroid Build Coastguard Worker    exception to the caller, therefore, calling ``gather.cancel()``
745*cda5da8dSAndroid Build Coastguard Worker    after catching an exception (raised by one of the awaitables) from
746*cda5da8dSAndroid Build Coastguard Worker    gather won't cancel any other awaitables.
747*cda5da8dSAndroid Build Coastguard Worker    """
748*cda5da8dSAndroid Build Coastguard Worker    if not coros_or_futures:
749*cda5da8dSAndroid Build Coastguard Worker        loop = events._get_event_loop()
750*cda5da8dSAndroid Build Coastguard Worker        outer = loop.create_future()
751*cda5da8dSAndroid Build Coastguard Worker        outer.set_result([])
752*cda5da8dSAndroid Build Coastguard Worker        return outer
753*cda5da8dSAndroid Build Coastguard Worker
754*cda5da8dSAndroid Build Coastguard Worker    def _done_callback(fut):
755*cda5da8dSAndroid Build Coastguard Worker        nonlocal nfinished
756*cda5da8dSAndroid Build Coastguard Worker        nfinished += 1
757*cda5da8dSAndroid Build Coastguard Worker
758*cda5da8dSAndroid Build Coastguard Worker        if outer is None or outer.done():
759*cda5da8dSAndroid Build Coastguard Worker            if not fut.cancelled():
760*cda5da8dSAndroid Build Coastguard Worker                # Mark exception retrieved.
761*cda5da8dSAndroid Build Coastguard Worker                fut.exception()
762*cda5da8dSAndroid Build Coastguard Worker            return
763*cda5da8dSAndroid Build Coastguard Worker
764*cda5da8dSAndroid Build Coastguard Worker        if not return_exceptions:
765*cda5da8dSAndroid Build Coastguard Worker            if fut.cancelled():
766*cda5da8dSAndroid Build Coastguard Worker                # Check if 'fut' is cancelled first, as
767*cda5da8dSAndroid Build Coastguard Worker                # 'fut.exception()' will *raise* a CancelledError
768*cda5da8dSAndroid Build Coastguard Worker                # instead of returning it.
769*cda5da8dSAndroid Build Coastguard Worker                exc = fut._make_cancelled_error()
770*cda5da8dSAndroid Build Coastguard Worker                outer.set_exception(exc)
771*cda5da8dSAndroid Build Coastguard Worker                return
772*cda5da8dSAndroid Build Coastguard Worker            else:
773*cda5da8dSAndroid Build Coastguard Worker                exc = fut.exception()
774*cda5da8dSAndroid Build Coastguard Worker                if exc is not None:
775*cda5da8dSAndroid Build Coastguard Worker                    outer.set_exception(exc)
776*cda5da8dSAndroid Build Coastguard Worker                    return
777*cda5da8dSAndroid Build Coastguard Worker
778*cda5da8dSAndroid Build Coastguard Worker        if nfinished == nfuts:
779*cda5da8dSAndroid Build Coastguard Worker            # All futures are done; create a list of results
780*cda5da8dSAndroid Build Coastguard Worker            # and set it to the 'outer' future.
781*cda5da8dSAndroid Build Coastguard Worker            results = []
782*cda5da8dSAndroid Build Coastguard Worker
783*cda5da8dSAndroid Build Coastguard Worker            for fut in children:
784*cda5da8dSAndroid Build Coastguard Worker                if fut.cancelled():
785*cda5da8dSAndroid Build Coastguard Worker                    # Check if 'fut' is cancelled first, as 'fut.exception()'
786*cda5da8dSAndroid Build Coastguard Worker                    # will *raise* a CancelledError instead of returning it.
787*cda5da8dSAndroid Build Coastguard Worker                    # Also, since we're adding the exception return value
788*cda5da8dSAndroid Build Coastguard Worker                    # to 'results' instead of raising it, don't bother
789*cda5da8dSAndroid Build Coastguard Worker                    # setting __context__.  This also lets us preserve
790*cda5da8dSAndroid Build Coastguard Worker                    # calling '_make_cancelled_error()' at most once.
791*cda5da8dSAndroid Build Coastguard Worker                    res = exceptions.CancelledError(
792*cda5da8dSAndroid Build Coastguard Worker                        '' if fut._cancel_message is None else
793*cda5da8dSAndroid Build Coastguard Worker                        fut._cancel_message)
794*cda5da8dSAndroid Build Coastguard Worker                else:
795*cda5da8dSAndroid Build Coastguard Worker                    res = fut.exception()
796*cda5da8dSAndroid Build Coastguard Worker                    if res is None:
797*cda5da8dSAndroid Build Coastguard Worker                        res = fut.result()
798*cda5da8dSAndroid Build Coastguard Worker                results.append(res)
799*cda5da8dSAndroid Build Coastguard Worker
800*cda5da8dSAndroid Build Coastguard Worker            if outer._cancel_requested:
801*cda5da8dSAndroid Build Coastguard Worker                # If gather is being cancelled we must propagate the
802*cda5da8dSAndroid Build Coastguard Worker                # cancellation regardless of *return_exceptions* argument.
803*cda5da8dSAndroid Build Coastguard Worker                # See issue 32684.
804*cda5da8dSAndroid Build Coastguard Worker                exc = fut._make_cancelled_error()
805*cda5da8dSAndroid Build Coastguard Worker                outer.set_exception(exc)
806*cda5da8dSAndroid Build Coastguard Worker            else:
807*cda5da8dSAndroid Build Coastguard Worker                outer.set_result(results)
808*cda5da8dSAndroid Build Coastguard Worker
809*cda5da8dSAndroid Build Coastguard Worker    arg_to_fut = {}
810*cda5da8dSAndroid Build Coastguard Worker    children = []
811*cda5da8dSAndroid Build Coastguard Worker    nfuts = 0
812*cda5da8dSAndroid Build Coastguard Worker    nfinished = 0
813*cda5da8dSAndroid Build Coastguard Worker    loop = None
814*cda5da8dSAndroid Build Coastguard Worker    outer = None  # bpo-46672
815*cda5da8dSAndroid Build Coastguard Worker    for arg in coros_or_futures:
816*cda5da8dSAndroid Build Coastguard Worker        if arg not in arg_to_fut:
817*cda5da8dSAndroid Build Coastguard Worker            fut = _ensure_future(arg, loop=loop)
818*cda5da8dSAndroid Build Coastguard Worker            if loop is None:
819*cda5da8dSAndroid Build Coastguard Worker                loop = futures._get_loop(fut)
820*cda5da8dSAndroid Build Coastguard Worker            if fut is not arg:
821*cda5da8dSAndroid Build Coastguard Worker                # 'arg' was not a Future, therefore, 'fut' is a new
822*cda5da8dSAndroid Build Coastguard Worker                # Future created specifically for 'arg'.  Since the caller
823*cda5da8dSAndroid Build Coastguard Worker                # can't control it, disable the "destroy pending task"
824*cda5da8dSAndroid Build Coastguard Worker                # warning.
825*cda5da8dSAndroid Build Coastguard Worker                fut._log_destroy_pending = False
826*cda5da8dSAndroid Build Coastguard Worker
827*cda5da8dSAndroid Build Coastguard Worker            nfuts += 1
828*cda5da8dSAndroid Build Coastguard Worker            arg_to_fut[arg] = fut
829*cda5da8dSAndroid Build Coastguard Worker            fut.add_done_callback(_done_callback)
830*cda5da8dSAndroid Build Coastguard Worker
831*cda5da8dSAndroid Build Coastguard Worker        else:
832*cda5da8dSAndroid Build Coastguard Worker            # There's a duplicate Future object in coros_or_futures.
833*cda5da8dSAndroid Build Coastguard Worker            fut = arg_to_fut[arg]
834*cda5da8dSAndroid Build Coastguard Worker
835*cda5da8dSAndroid Build Coastguard Worker        children.append(fut)
836*cda5da8dSAndroid Build Coastguard Worker
837*cda5da8dSAndroid Build Coastguard Worker    outer = _GatheringFuture(children, loop=loop)
838*cda5da8dSAndroid Build Coastguard Worker    return outer
839*cda5da8dSAndroid Build Coastguard Worker
840*cda5da8dSAndroid Build Coastguard Worker
def shield(arg):
    """Protect an awaitable from cancellation of the code awaiting it.

    Writing

        task = asyncio.create_task(something())
        res = await shield(task)

    behaves like

        res = await something()

    with one difference: cancelling the coroutine that contains the
    await does not cancel the task running something().  As far as
    something() can tell, no cancellation took place.  The awaiting
    coroutine is nevertheless cancelled, so the await expression still
    raises CancelledError.  Note: if something() is cancelled through
    any other route, shield() is cancelled as well.

    To ignore cancellation altogether (not recommended), wrap the
    await in a try/except clause:

        task = asyncio.create_task(something())
        try:
            res = await shield(task)
        except CancelledError:
            res = None

    Keep your own reference to any task passed to this function.  The
    event loop holds tasks only through weak references, so a task
    with no other referent may be garbage collected at any moment,
    even before it has finished.
    """
    inner = _ensure_future(arg)
    if inner.done():
        # Nothing left to protect: hand back the finished future itself.
        return inner

    outer = futures._get_loop(inner).create_future()

    def _inner_done_callback(inner):
        if outer.cancelled():
            # The waiter was cancelled.  Unless inner was cancelled too,
            # read its exception so its result counts as retrieved.
            if not inner.cancelled():
                inner.exception()
            return

        if inner.cancelled():
            outer.cancel()
            return

        # Mirror inner's outcome (exception or result) onto outer.
        exc = inner.exception()
        if exc is None:
            outer.set_result(inner.result())
        else:
            outer.set_exception(exc)

    def _outer_done_callback(outer):
        # Once outer is finished, a still-pending inner no longer has
        # to report back to it.
        if inner.done():
            return
        inner.remove_done_callback(_inner_done_callback)

    inner.add_done_callback(_inner_done_callback)
    outer.add_done_callback(_outer_done_callback)
    return outer
905*cda5da8dSAndroid Build Coastguard Worker
906*cda5da8dSAndroid Build Coastguard Worker
def run_coroutine_threadsafe(coro, loop):
    """Schedule a coroutine object on the given event loop.

    Returns a concurrent.futures.Future through which the result can
    be obtained.  Raises TypeError if *coro* is not a coroutine object.
    """
    if not coroutines.iscoroutine(coro):
        raise TypeError('A coroutine object is required')
    cf_future = concurrent.futures.Future()

    def callback():
        try:
            # Wrap the coroutine in a task on 'loop' and link the task's
            # outcome to the concurrent future handed to the caller.
            task = ensure_future(coro, loop=loop)
            futures._chain_future(task, cf_future)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            # Report the failure through the returned future (unless it
            # was already cancelled), then let it propagate.
            if cf_future.set_running_or_notify_cancel():
                cf_future.set_exception(exc)
            raise

    loop.call_soon_threadsafe(callback)
    return cf_future
928*cda5da8dSAndroid Build Coastguard Worker
929*cda5da8dSAndroid Build Coastguard Worker
# Task currently being executed by each running event loop,
# keyed by loop: {EventLoop: Task}.
_current_tasks = {}

# Weak set holding every task that is still alive.
_all_tasks = weakref.WeakSet()
936*cda5da8dSAndroid Build Coastguard Worker
937*cda5da8dSAndroid Build Coastguard Worker
def _register_task(task):
    """Add *task* to the module-wide collection of all tasks."""
    _all_tasks.add(task)
941*cda5da8dSAndroid Build Coastguard Worker
942*cda5da8dSAndroid Build Coastguard Worker
def _enter_task(loop, task):
    """Record *task* as the task currently executing on *loop*.

    Raises RuntimeError when *loop* already has a current task.
    """
    running = _current_tasks.get(loop)
    if running is None:
        _current_tasks[loop] = task
        return
    raise RuntimeError(f"Cannot enter into task {task!r} while another "
                       f"task {running!r} is being executed.")
949*cda5da8dSAndroid Build Coastguard Worker
950*cda5da8dSAndroid Build Coastguard Worker
def _leave_task(loop, task):
    """Clear the current-task entry for *loop*, which must be *task*.

    Raises RuntimeError when the task recorded for *loop* is not the
    very same object as *task*.
    """
    running = _current_tasks.get(loop)
    if running is task:
        del _current_tasks[loop]
        return
    raise RuntimeError(f"Leaving task {task!r} does not match "
                       f"the current task {running!r}.")
957*cda5da8dSAndroid Build Coastguard Worker
958*cda5da8dSAndroid Build Coastguard Worker
def _unregister_task(task):
    """Drop *task* from the collection of all tasks, if it is present."""
    _all_tasks.discard(task)
962*cda5da8dSAndroid Build Coastguard Worker
963*cda5da8dSAndroid Build Coastguard Worker
# Keep handles on the implementations defined in this file under _py_*
# names; this must happen before the import below can rebind the
# unprefixed names.
_py_register_task, _py_unregister_task = _register_task, _unregister_task
_py_enter_task, _py_leave_task = _enter_task, _leave_task


try:
    # When the _asyncio module is importable, its versions replace the
    # module-level functions and task containers defined above.
    from _asyncio import (_register_task, _unregister_task,
                          _enter_task, _leave_task,
                          _all_tasks, _current_tasks)
except ImportError:
    # _asyncio is unavailable: keep the definitions from this file.
    pass
else:
    # Also expose the imported functions under _c_* names.
    _c_register_task, _c_unregister_task = _register_task, _unregister_task
    _c_enter_task, _c_leave_task = _enter_task, _leave_task
981