# xref: /aosp_15_r20/prebuilts/build-tools/common/py3-stdlib/asyncio/windows_events.py (revision cda5da8d549138a6648c5ee6d7a49cf8f4a657be)
"""Selector and proactor event loops for Windows."""

import sys

# Everything below relies on Windows-only modules (_overlapped, _winapi,
# msvcrt), so refuse to import on any other platform before touching them.
if sys.platform != 'win32':  # pragma: no cover
    raise ImportError('win32 only')

import _overlapped
import _winapi
import errno
import math
import msvcrt
import socket
import struct
import time
import weakref

from . import events
from . import base_subprocess
from . import futures
from . import exceptions
from . import proactor_events
from . import selector_events
from . import tasks
from . import windows_utils
from .log import logger


# Names exported by a star-import of this module.
__all__ = (
    'SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
    'DefaultEventLoopPolicy', 'WindowsSelectorEventLoopPolicy',
    'WindowsProactorEventLoopPolicy',
)


# Short aliases for the corresponding _winapi constants.
NULL = _winapi.NULL
INFINITE = _winapi.INFINITE
# Numeric error codes; presumably compared against OSError.winerror by code
# elsewhere in this module -- verify against the users of these names.
ERROR_CONNECTION_REFUSED = 1225
ERROR_CONNECTION_ABORTED = 1236

# Initial delay in seconds for connect_pipe() before retrying to connect
CONNECT_PIPE_INIT_DELAY = 0.001

# Maximum delay in seconds for connect_pipe() before retrying to connect
CONNECT_PIPE_MAX_DELAY = 0.100


class _OverlappedFuture(futures.Future):
    """Subclass of Future which represents an overlapped operation.

    Cancelling it will immediately cancel the overlapped operation.

    The overlapped object is kept in ``self._ov`` while the operation is
    outstanding.  It is reset to None as soon as the future is cancelled or
    receives a result or an exception.
    """

    def __init__(self, ov, *, loop=None):
        """Create a future bound to *loop* that tracks the overlapped *ov*.

        *ov* must provide ``cancel()``, ``pending`` and ``address``, which
        are the members this class uses.
        """
        super().__init__(loop=loop)
        if self._source_traceback:
            # Drop the last recorded entry; presumably the frame of this
            # constructor, so the traceback points at the caller.
            del self._source_traceback[-1]
        self._ov = ov

    def _repr_info(self):
        """Return the base repr info plus the overlapped state, if any."""
        info = super()._repr_info()
        if self._ov is not None:
            state = 'pending' if self._ov.pending else 'completed'
            info.insert(1, f'overlapped=<{state}, {self._ov.address:#x}>')
        return info

    def _cancel_overlapped(self):
        """Cancel the overlapped operation (if still held) and forget it.

        An OSError raised by the cancellation is not propagated: it is
        reported through the loop's exception handler instead.
        """
        if self._ov is None:
            return
        try:
            self._ov.cancel()
        except OSError as exc:
            context = {
                'message': 'Cancelling an overlapped future failed',
                'exception': exc,
                'future': self,
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        # Whether or not the cancellation succeeded, the operation is no
        # longer tracked by this future.
        self._ov = None

    def cancel(self, msg=None):
        """Cancel the overlapped operation, then cancel the future."""
        self._cancel_overlapped()
        return super().cancel(msg=msg)

    def set_exception(self, exception):
        """Store *exception*, then cancel the overlapped operation."""
        super().set_exception(exception)
        self._cancel_overlapped()

    def set_result(self, result):
        """Store *result* and release the overlapped object."""
        super().set_result(result)
        self._ov = None


class _BaseWaitHandleFuture(futures.Future):
    """Subclass of Future which represents a wait handle.

    The registered wait is unregistered whenever the future is cancelled
    or receives a result or an exception.
    """

    def __init__(self, ov, handle, wait_handle, *, loop=None):
        """Create a future for the wait *wait_handle* on *handle*.

        *ov* is the Overlapped object associated with the wait and *loop*
        is the event loop the future is bound to.
        """
        super().__init__(loop=loop)
        if self._source_traceback:
            # Drop the last recorded entry; presumably the frame of this
            # constructor, so the traceback points at the caller.
            del self._source_traceback[-1]
        # Keep a reference to the Overlapped object to keep it alive until the
        # wait is unregistered
        self._ov = ov
        self._handle = handle
        self._wait_handle = wait_handle

        # Should we call UnregisterWaitEx() if the wait completes
        # or is cancelled?
        self._registered = True

    def _poll(self):
        """Return True if the waited handle is currently signaled."""
        # non-blocking wait: use a timeout of 0 millisecond
        return (_winapi.WaitForSingleObject(self._handle, 0) ==
                _winapi.WAIT_OBJECT_0)

    def _repr_info(self):
        """Return the base repr info plus handle and wait-handle details."""
        info = super()._repr_info()
        # Format the handle only when there is one: applying ':#x' to None
        # would raise TypeError and break repr().
        if self._handle is not None:
            info.append(f'handle={self._handle:#x}')
            state = 'signaled' if self._poll() else 'waiting'
            info.append(state)
        if self._wait_handle is not None:
            info.append(f'wait_handle={self._wait_handle:#x}')
        return info

    def _unregister_wait_cb(self, fut):
        """Release the Overlapped object once the wait is unregistered.

        *fut* is unused here; subclasses may override this hook.
        """
        # The wait was unregistered: it's now safe to destroy the Overlapped
        # object
        self._ov = None

    def _unregister_wait(self):
        """Unregister the wait handle; do nothing if already unregistered.

        An OSError other than ERROR_IO_PENDING is reported through the
        loop's exception handler, and the Overlapped reference is then kept.
        """
        if not self._registered:
            return
        self._registered = False

        wait_handle = self._wait_handle
        self._wait_handle = None
        try:
            _overlapped.UnregisterWait(wait_handle)
        except OSError as exc:
            if exc.winerror != _overlapped.ERROR_IO_PENDING:
                context = {
                    'message': 'Failed to unregister the wait handle',
                    'exception': exc,
                    'future': self,
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
                return
            # ERROR_IO_PENDING means that the unregister is pending

        self._unregister_wait_cb(None)

    def cancel(self, msg=None):
        """Unregister the wait, then cancel the future."""
        self._unregister_wait()
        return super().cancel(msg=msg)

    def set_exception(self, exception):
        """Unregister the wait, then store *exception*."""
        self._unregister_wait()
        super().set_exception(exception)

    def set_result(self, result):
        """Unregister the wait, then store *result*."""
        self._unregister_wait()
        super().set_result(result)


class _WaitCancelFuture(_BaseWaitHandleFuture):
    """Subclass of Future which represents a wait for the cancellation of a
    _WaitHandleFuture using an event.

    ``_done_callback``, when set, is invoked synchronously with this future
    as its argument right after a result or an exception is stored.
    """

    def __init__(self, ov, event, wait_handle, *, loop=None):
        """Create the future; *event* is the handle being waited on."""
        super().__init__(ov, event, wait_handle, loop=loop)

        # Optional callable taking this future; assigned by the creator.
        self._done_callback = None

    def cancel(self, msg=None):
        """Always raise RuntimeError: this future must not be cancelled.

        *msg* is accepted only to match the signature of Future.cancel(),
        so that a caller passing a message still gets the RuntimeError
        rather than a TypeError.
        """
        raise RuntimeError("_WaitCancelFuture must not be cancelled")

    def set_result(self, result):
        """Store *result*, then run the done callback if there is one."""
        super().set_result(result)
        if self._done_callback is not None:
            self._done_callback(self)

    def set_exception(self, exception):
        """Store *exception*, then run the done callback if there is one."""
        super().set_exception(exception)
        if self._done_callback is not None:
            self._done_callback(self)


class _WaitHandleFuture(_BaseWaitHandleFuture):
    """Future for a wait handle whose unregistration goes through an event.

    The wait is unregistered with UnregisterWaitEx() and a private event.
    The proactor is then asked, through ``_wait_cancel()``, to call
    ``_unregister_wait_cb()`` for that event, which performs the final
    cleanup.
    """

    def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
        """Create the future; *proactor* is the proactor that owns the wait."""
        super().__init__(ov, handle, wait_handle, loop=loop)
        self._proactor = proactor
        # NOTE(review): not read anywhere in the visible part of the file.
        self._unregister_proactor = True
        # Event handed to UnregisterWaitEx().  Arguments presumably follow
        # the Win32 CreateEvent order (attributes, manual reset, initial
        # state, name) -- confirm against _overlapped.
        self._event = _overlapped.CreateEvent(None, True, False, None)
        # Future returned by the proactor's _wait_cancel(), once requested.
        self._event_fut = None

    def _unregister_wait_cb(self, fut):
        """Finish unregistration: close the event and leave the proactor."""
        if self._event is not None:
            _winapi.CloseHandle(self._event)
            self._event = None
            self._event_fut = None

        # If the wait was cancelled, the wait may never be signalled, so
        # it's required to unregister it. Otherwise, IocpProactor.close() will
        # wait forever for an event which will never come.
        #
        # If the IocpProactor already received the event, it's safe to call
        # _unregister() because we kept a reference to the Overlapped object
        # which is used as a unique key.
        self._proactor._unregister(self._ov)
        self._proactor = None

        super()._unregister_wait_cb(fut)

    def _unregister_wait(self):
        """Start unregistering the wait; do nothing if already done.

        An OSError other than ERROR_IO_PENDING is reported through the
        loop's exception handler and no further cleanup is scheduled.
        """
        if not self._registered:
            return
        self._registered = False

        wait_handle = self._wait_handle
        self._wait_handle = None
        try:
            _overlapped.UnregisterWaitEx(wait_handle, self._event)
        except OSError as exc:
            if exc.winerror != _overlapped.ERROR_IO_PENDING:
                context = {
                    'message': 'Failed to unregister the wait handle',
                    'exception': exc,
                    'future': self,
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
                return
            # ERROR_IO_PENDING is not an error, the wait was unregistered

        # Let the proactor watch the event and run the cleanup callback.
        self._event_fut = self._proactor._wait_cancel(self._event,
                                                      self._unregister_wait_cb)


class PipeServer:
    """Class representing a pipe server.

    This is much like a bound, listening socket.

    ``_free_instances`` weakly tracks the pipe handles created by this
    server; close() closes every handle still in that set.
    """

    def __init__(self, address):
        """Create the server and its first pipe instance for *address*."""
        self._address = address
        self._free_instances = weakref.WeakSet()
        # initialize the pipe attribute before calling _server_pipe_handle()
        # because this function can raise an exception and the destructor calls
        # the close() method
        self._pipe = None
        self._accept_pipe_future = None
        self._pipe = self._server_pipe_handle(True)

    def _get_unconnected_pipe(self):
        """Return the current pipe instance and replace it with a new one."""
        # Create new instance and return previous one.  This ensures
        # that (until the server is closed) there is always at least
        # one pipe handle for address.  Therefore if a client attempt
        # to connect it will not fail with FileNotFoundError.
        tmp, self._pipe = self._pipe, self._server_pipe_handle(False)
        return tmp

    def _server_pipe_handle(self, first):
        """Return a wrapper for a new pipe handle, or None if closed.

        *first* adds FILE_FLAG_FIRST_PIPE_INSTANCE to the open flags; it is
        passed as True only for the instance created by the constructor.
        """
        if self.closed():
            return None
        flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
        if first:
            flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
        h = _winapi.CreateNamedPipe(
            self._address, flags,
            _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
            _winapi.PIPE_WAIT,
            _winapi.PIPE_UNLIMITED_INSTANCES,
            windows_utils.BUFSIZE, windows_utils.BUFSIZE,
            _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
        pipe = windows_utils.PipeHandle(h)
        self._free_instances.add(pipe)
        return pipe

    def closed(self):
        """Return True once close() has cleared the address."""
        return (self._address is None)

    def close(self):
        """Cancel the pending accept and close all tracked pipe instances.

        Safe to call more than once; also used as the destructor.
        """
        if self._accept_pipe_future is not None:
            self._accept_pipe_future.cancel()
            self._accept_pipe_future = None
        # Close all instances which have not been connected to by a client.
        if self._address is not None:
            for pipe in self._free_instances:
                pipe.close()
            self._pipe = None
            self._address = None
            self._free_instances.clear()

    __del__ = close


class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Windows version of selector event loop.

    Adds nothing to BaseSelectorEventLoop; it only gives the Windows
    selector loop a class of its own.
    """


class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
    """Windows version of proactor event loop using IOCP."""

    def __init__(self, proactor=None):
        """Create the loop, using a fresh IocpProactor unless one is given."""
        if proactor is None:
            proactor = IocpProactor()
        super().__init__(proactor)

    def run_forever(self):
        """Run the loop, managing the self-reading future around the run."""
        try:
            assert self._self_reading_future is None
            self.call_soon(self._loop_self_reading)
            super().run_forever()
        finally:
            if self._self_reading_future is not None:
                # Grab the overlapped object first: cancel() clears it on
                # the future.
                ov = self._self_reading_future._ov
                self._self_reading_future.cancel()
                # self_reading_future was just cancelled so if it hasn't been
                # finished yet, it never will be (it's possible that it has
                # already finished and its callback is waiting in the queue,
                # where it could still happen if the event loop is restarted).
                # Unregister it otherwise IocpProactor.close will wait for it
                # forever
                if ov is not None:
                    self._proactor._unregister(ov)
                self._self_reading_future = None

    async def create_pipe_connection(self, protocol_factory, address):
        """Connect to the pipe at *address*.

        Return a ``(transport, protocol)`` pair, where the protocol is the
        object returned by calling *protocol_factory*.
        """
        f = self._proactor.connect_pipe(address)
        pipe = await f
        protocol = protocol_factory()
        trans = self._make_duplex_pipe_transport(pipe, protocol,
                                                 extra={'addr': address})
        return trans, protocol

    async def start_serving_pipe(self, protocol_factory, address):
        """Start accepting clients on the pipe at *address*.

        Each accepted client gets a protocol from *protocol_factory* and a
        duplex pipe transport.  Return a one-element list holding the
        PipeServer; closing that server stops the accept loop.
        """
        server = PipeServer(address)

        def loop_accept_pipe(f=None):
            # Invoked once without a future to start accepting, then as the
            # done callback of every accept future.
            pipe = None
            try:
                if f is not None:
                    pipe = f.result()
                    server._free_instances.discard(pipe)

                    if server.closed():
                        # A client connected before the server was closed:
                        # drop the client (close the pipe) and exit
                        pipe.close()
                        return

                    protocol = protocol_factory()
                    self._make_duplex_pipe_transport(
                        pipe, protocol, extra={'addr': address})

                pipe = server._get_unconnected_pipe()
                if pipe is None:
                    # The server refused to create a new instance (closed).
                    return

                f = self._proactor.accept_pipe(pipe)
            except BrokenPipeError:
                if pipe and pipe.fileno() != -1:
                    pipe.close()
                # Retry with a new pipe instance.
                self.call_soon(loop_accept_pipe)
            except OSError as exc:
                if pipe and pipe.fileno() != -1:
                    self.call_exception_handler({
                        'message': 'Pipe accept failed',
                        'exception': exc,
                        'pipe': pipe,
                    })
                    pipe.close()
                elif self._debug:
                    logger.warning("Accept pipe failed on pipe %r",
                                   pipe, exc_info=True)
                # Retry with a new pipe instance.
                self.call_soon(loop_accept_pipe)
            except exceptions.CancelledError:
                # The accept was cancelled: stop the accept loop.
                if pipe:
                    pipe.close()
            else:
                # Remember the pending accept so the server can cancel it,
                # and come back here when it completes.
                server._accept_pipe_future = f
                f.add_done_callback(loop_accept_pipe)

        self.call_soon(loop_accept_pipe)
        return [server]

    async def _make_subprocess_transport(self, protocol, args, shell,
                                         stdin, stdout, stderr, bufsize,
                                         extra=None, **kwargs):
        """Create a subprocess transport and wait until its waiter is done.

        If waiting fails, the transport is closed and awaited before the
        exception is re-raised.  SystemExit and KeyboardInterrupt propagate
        without that cleanup.
        """
        waiter = self.create_future()
        transp = _WindowsSubprocessTransport(self, protocol, args, shell,
                                             stdin, stdout, stderr, bufsize,
                                             waiter=waiter, extra=extra,
                                             **kwargs)
        try:
            await waiter
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException:
            transp.close()
            await transp._wait()
            raise

        return transp


415*cda5da8dSAndroid Build Coastguard Workerclass IocpProactor:
416*cda5da8dSAndroid Build Coastguard Worker    """Proactor implementation using IOCP."""
417*cda5da8dSAndroid Build Coastguard Worker
418*cda5da8dSAndroid Build Coastguard Worker    def __init__(self, concurrency=INFINITE):
419*cda5da8dSAndroid Build Coastguard Worker        self._loop = None
420*cda5da8dSAndroid Build Coastguard Worker        self._results = []
421*cda5da8dSAndroid Build Coastguard Worker        self._iocp = _overlapped.CreateIoCompletionPort(
422*cda5da8dSAndroid Build Coastguard Worker            _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
423*cda5da8dSAndroid Build Coastguard Worker        self._cache = {}
424*cda5da8dSAndroid Build Coastguard Worker        self._registered = weakref.WeakSet()
425*cda5da8dSAndroid Build Coastguard Worker        self._unregistered = []
426*cda5da8dSAndroid Build Coastguard Worker        self._stopped_serving = weakref.WeakSet()
427*cda5da8dSAndroid Build Coastguard Worker
428*cda5da8dSAndroid Build Coastguard Worker    def _check_closed(self):
429*cda5da8dSAndroid Build Coastguard Worker        if self._iocp is None:
430*cda5da8dSAndroid Build Coastguard Worker            raise RuntimeError('IocpProactor is closed')
431*cda5da8dSAndroid Build Coastguard Worker
432*cda5da8dSAndroid Build Coastguard Worker    def __repr__(self):
433*cda5da8dSAndroid Build Coastguard Worker        info = ['overlapped#=%s' % len(self._cache),
434*cda5da8dSAndroid Build Coastguard Worker                'result#=%s' % len(self._results)]
435*cda5da8dSAndroid Build Coastguard Worker        if self._iocp is None:
436*cda5da8dSAndroid Build Coastguard Worker            info.append('closed')
437*cda5da8dSAndroid Build Coastguard Worker        return '<%s %s>' % (self.__class__.__name__, " ".join(info))
438*cda5da8dSAndroid Build Coastguard Worker
439*cda5da8dSAndroid Build Coastguard Worker    def set_loop(self, loop):
440*cda5da8dSAndroid Build Coastguard Worker        self._loop = loop
441*cda5da8dSAndroid Build Coastguard Worker
442*cda5da8dSAndroid Build Coastguard Worker    def select(self, timeout=None):
443*cda5da8dSAndroid Build Coastguard Worker        if not self._results:
444*cda5da8dSAndroid Build Coastguard Worker            self._poll(timeout)
445*cda5da8dSAndroid Build Coastguard Worker        tmp = self._results
446*cda5da8dSAndroid Build Coastguard Worker        self._results = []
447*cda5da8dSAndroid Build Coastguard Worker        try:
448*cda5da8dSAndroid Build Coastguard Worker            return tmp
449*cda5da8dSAndroid Build Coastguard Worker        finally:
450*cda5da8dSAndroid Build Coastguard Worker            # Needed to break cycles when an exception occurs.
451*cda5da8dSAndroid Build Coastguard Worker            tmp = None
452*cda5da8dSAndroid Build Coastguard Worker
453*cda5da8dSAndroid Build Coastguard Worker    def _result(self, value):
454*cda5da8dSAndroid Build Coastguard Worker        fut = self._loop.create_future()
455*cda5da8dSAndroid Build Coastguard Worker        fut.set_result(value)
456*cda5da8dSAndroid Build Coastguard Worker        return fut
457*cda5da8dSAndroid Build Coastguard Worker
458*cda5da8dSAndroid Build Coastguard Worker    def recv(self, conn, nbytes, flags=0):
459*cda5da8dSAndroid Build Coastguard Worker        self._register_with_iocp(conn)
460*cda5da8dSAndroid Build Coastguard Worker        ov = _overlapped.Overlapped(NULL)
461*cda5da8dSAndroid Build Coastguard Worker        try:
462*cda5da8dSAndroid Build Coastguard Worker            if isinstance(conn, socket.socket):
463*cda5da8dSAndroid Build Coastguard Worker                ov.WSARecv(conn.fileno(), nbytes, flags)
464*cda5da8dSAndroid Build Coastguard Worker            else:
465*cda5da8dSAndroid Build Coastguard Worker                ov.ReadFile(conn.fileno(), nbytes)
466*cda5da8dSAndroid Build Coastguard Worker        except BrokenPipeError:
467*cda5da8dSAndroid Build Coastguard Worker            return self._result(b'')
468*cda5da8dSAndroid Build Coastguard Worker
469*cda5da8dSAndroid Build Coastguard Worker        def finish_recv(trans, key, ov):
470*cda5da8dSAndroid Build Coastguard Worker            try:
471*cda5da8dSAndroid Build Coastguard Worker                return ov.getresult()
472*cda5da8dSAndroid Build Coastguard Worker            except OSError as exc:
473*cda5da8dSAndroid Build Coastguard Worker                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
474*cda5da8dSAndroid Build Coastguard Worker                                    _overlapped.ERROR_OPERATION_ABORTED):
475*cda5da8dSAndroid Build Coastguard Worker                    raise ConnectionResetError(*exc.args)
476*cda5da8dSAndroid Build Coastguard Worker                else:
477*cda5da8dSAndroid Build Coastguard Worker                    raise
478*cda5da8dSAndroid Build Coastguard Worker
479*cda5da8dSAndroid Build Coastguard Worker        return self._register(ov, conn, finish_recv)
480*cda5da8dSAndroid Build Coastguard Worker
481*cda5da8dSAndroid Build Coastguard Worker    def recv_into(self, conn, buf, flags=0):
482*cda5da8dSAndroid Build Coastguard Worker        self._register_with_iocp(conn)
483*cda5da8dSAndroid Build Coastguard Worker        ov = _overlapped.Overlapped(NULL)
484*cda5da8dSAndroid Build Coastguard Worker        try:
485*cda5da8dSAndroid Build Coastguard Worker            if isinstance(conn, socket.socket):
486*cda5da8dSAndroid Build Coastguard Worker                ov.WSARecvInto(conn.fileno(), buf, flags)
487*cda5da8dSAndroid Build Coastguard Worker            else:
488*cda5da8dSAndroid Build Coastguard Worker                ov.ReadFileInto(conn.fileno(), buf)
489*cda5da8dSAndroid Build Coastguard Worker        except BrokenPipeError:
490*cda5da8dSAndroid Build Coastguard Worker            return self._result(0)
491*cda5da8dSAndroid Build Coastguard Worker
492*cda5da8dSAndroid Build Coastguard Worker        def finish_recv(trans, key, ov):
493*cda5da8dSAndroid Build Coastguard Worker            try:
494*cda5da8dSAndroid Build Coastguard Worker                return ov.getresult()
495*cda5da8dSAndroid Build Coastguard Worker            except OSError as exc:
496*cda5da8dSAndroid Build Coastguard Worker                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
497*cda5da8dSAndroid Build Coastguard Worker                                    _overlapped.ERROR_OPERATION_ABORTED):
498*cda5da8dSAndroid Build Coastguard Worker                    raise ConnectionResetError(*exc.args)
499*cda5da8dSAndroid Build Coastguard Worker                else:
500*cda5da8dSAndroid Build Coastguard Worker                    raise
501*cda5da8dSAndroid Build Coastguard Worker
502*cda5da8dSAndroid Build Coastguard Worker        return self._register(ov, conn, finish_recv)
503*cda5da8dSAndroid Build Coastguard Worker
504*cda5da8dSAndroid Build Coastguard Worker    def recvfrom(self, conn, nbytes, flags=0):
505*cda5da8dSAndroid Build Coastguard Worker        self._register_with_iocp(conn)
506*cda5da8dSAndroid Build Coastguard Worker        ov = _overlapped.Overlapped(NULL)
507*cda5da8dSAndroid Build Coastguard Worker        try:
508*cda5da8dSAndroid Build Coastguard Worker            ov.WSARecvFrom(conn.fileno(), nbytes, flags)
509*cda5da8dSAndroid Build Coastguard Worker        except BrokenPipeError:
510*cda5da8dSAndroid Build Coastguard Worker            return self._result((b'', None))
511*cda5da8dSAndroid Build Coastguard Worker
512*cda5da8dSAndroid Build Coastguard Worker        def finish_recv(trans, key, ov):
513*cda5da8dSAndroid Build Coastguard Worker            try:
514*cda5da8dSAndroid Build Coastguard Worker                return ov.getresult()
515*cda5da8dSAndroid Build Coastguard Worker            except OSError as exc:
516*cda5da8dSAndroid Build Coastguard Worker                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
517*cda5da8dSAndroid Build Coastguard Worker                                    _overlapped.ERROR_OPERATION_ABORTED):
518*cda5da8dSAndroid Build Coastguard Worker                    raise ConnectionResetError(*exc.args)
519*cda5da8dSAndroid Build Coastguard Worker                else:
520*cda5da8dSAndroid Build Coastguard Worker                    raise
521*cda5da8dSAndroid Build Coastguard Worker
522*cda5da8dSAndroid Build Coastguard Worker        return self._register(ov, conn, finish_recv)
523*cda5da8dSAndroid Build Coastguard Worker
524*cda5da8dSAndroid Build Coastguard Worker    def recvfrom_into(self, conn, buf, flags=0):
525*cda5da8dSAndroid Build Coastguard Worker        self._register_with_iocp(conn)
526*cda5da8dSAndroid Build Coastguard Worker        ov = _overlapped.Overlapped(NULL)
527*cda5da8dSAndroid Build Coastguard Worker        try:
528*cda5da8dSAndroid Build Coastguard Worker            ov.WSARecvFromInto(conn.fileno(), buf, flags)
529*cda5da8dSAndroid Build Coastguard Worker        except BrokenPipeError:
530*cda5da8dSAndroid Build Coastguard Worker            return self._result((0, None))
531*cda5da8dSAndroid Build Coastguard Worker
532*cda5da8dSAndroid Build Coastguard Worker        def finish_recv(trans, key, ov):
533*cda5da8dSAndroid Build Coastguard Worker            try:
534*cda5da8dSAndroid Build Coastguard Worker                return ov.getresult()
535*cda5da8dSAndroid Build Coastguard Worker            except OSError as exc:
536*cda5da8dSAndroid Build Coastguard Worker                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
537*cda5da8dSAndroid Build Coastguard Worker                                    _overlapped.ERROR_OPERATION_ABORTED):
538*cda5da8dSAndroid Build Coastguard Worker                    raise ConnectionResetError(*exc.args)
539*cda5da8dSAndroid Build Coastguard Worker                else:
540*cda5da8dSAndroid Build Coastguard Worker                    raise
541*cda5da8dSAndroid Build Coastguard Worker
542*cda5da8dSAndroid Build Coastguard Worker        return self._register(ov, conn, finish_recv)
543*cda5da8dSAndroid Build Coastguard Worker
544*cda5da8dSAndroid Build Coastguard Worker    def sendto(self, conn, buf, flags=0, addr=None):
545*cda5da8dSAndroid Build Coastguard Worker        self._register_with_iocp(conn)
546*cda5da8dSAndroid Build Coastguard Worker        ov = _overlapped.Overlapped(NULL)
547*cda5da8dSAndroid Build Coastguard Worker
548*cda5da8dSAndroid Build Coastguard Worker        ov.WSASendTo(conn.fileno(), buf, flags, addr)
549*cda5da8dSAndroid Build Coastguard Worker
550*cda5da8dSAndroid Build Coastguard Worker        def finish_send(trans, key, ov):
551*cda5da8dSAndroid Build Coastguard Worker            try:
552*cda5da8dSAndroid Build Coastguard Worker                return ov.getresult()
553*cda5da8dSAndroid Build Coastguard Worker            except OSError as exc:
554*cda5da8dSAndroid Build Coastguard Worker                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
555*cda5da8dSAndroid Build Coastguard Worker                                    _overlapped.ERROR_OPERATION_ABORTED):
556*cda5da8dSAndroid Build Coastguard Worker                    raise ConnectionResetError(*exc.args)
557*cda5da8dSAndroid Build Coastguard Worker                else:
558*cda5da8dSAndroid Build Coastguard Worker                    raise
559*cda5da8dSAndroid Build Coastguard Worker
560*cda5da8dSAndroid Build Coastguard Worker        return self._register(ov, conn, finish_send)
561*cda5da8dSAndroid Build Coastguard Worker
562*cda5da8dSAndroid Build Coastguard Worker    def send(self, conn, buf, flags=0):
563*cda5da8dSAndroid Build Coastguard Worker        self._register_with_iocp(conn)
564*cda5da8dSAndroid Build Coastguard Worker        ov = _overlapped.Overlapped(NULL)
565*cda5da8dSAndroid Build Coastguard Worker        if isinstance(conn, socket.socket):
566*cda5da8dSAndroid Build Coastguard Worker            ov.WSASend(conn.fileno(), buf, flags)
567*cda5da8dSAndroid Build Coastguard Worker        else:
568*cda5da8dSAndroid Build Coastguard Worker            ov.WriteFile(conn.fileno(), buf)
569*cda5da8dSAndroid Build Coastguard Worker
570*cda5da8dSAndroid Build Coastguard Worker        def finish_send(trans, key, ov):
571*cda5da8dSAndroid Build Coastguard Worker            try:
572*cda5da8dSAndroid Build Coastguard Worker                return ov.getresult()
573*cda5da8dSAndroid Build Coastguard Worker            except OSError as exc:
574*cda5da8dSAndroid Build Coastguard Worker                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
575*cda5da8dSAndroid Build Coastguard Worker                                    _overlapped.ERROR_OPERATION_ABORTED):
576*cda5da8dSAndroid Build Coastguard Worker                    raise ConnectionResetError(*exc.args)
577*cda5da8dSAndroid Build Coastguard Worker                else:
578*cda5da8dSAndroid Build Coastguard Worker                    raise
579*cda5da8dSAndroid Build Coastguard Worker
580*cda5da8dSAndroid Build Coastguard Worker        return self._register(ov, conn, finish_send)
581*cda5da8dSAndroid Build Coastguard Worker
582*cda5da8dSAndroid Build Coastguard Worker    def accept(self, listener):
583*cda5da8dSAndroid Build Coastguard Worker        self._register_with_iocp(listener)
584*cda5da8dSAndroid Build Coastguard Worker        conn = self._get_accept_socket(listener.family)
585*cda5da8dSAndroid Build Coastguard Worker        ov = _overlapped.Overlapped(NULL)
586*cda5da8dSAndroid Build Coastguard Worker        ov.AcceptEx(listener.fileno(), conn.fileno())
587*cda5da8dSAndroid Build Coastguard Worker
588*cda5da8dSAndroid Build Coastguard Worker        def finish_accept(trans, key, ov):
589*cda5da8dSAndroid Build Coastguard Worker            ov.getresult()
590*cda5da8dSAndroid Build Coastguard Worker            # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
591*cda5da8dSAndroid Build Coastguard Worker            buf = struct.pack('@P', listener.fileno())
592*cda5da8dSAndroid Build Coastguard Worker            conn.setsockopt(socket.SOL_SOCKET,
593*cda5da8dSAndroid Build Coastguard Worker                            _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
594*cda5da8dSAndroid Build Coastguard Worker            conn.settimeout(listener.gettimeout())
595*cda5da8dSAndroid Build Coastguard Worker            return conn, conn.getpeername()
596*cda5da8dSAndroid Build Coastguard Worker
597*cda5da8dSAndroid Build Coastguard Worker        async def accept_coro(future, conn):
598*cda5da8dSAndroid Build Coastguard Worker            # Coroutine closing the accept socket if the future is cancelled
599*cda5da8dSAndroid Build Coastguard Worker            try:
600*cda5da8dSAndroid Build Coastguard Worker                await future
601*cda5da8dSAndroid Build Coastguard Worker            except exceptions.CancelledError:
602*cda5da8dSAndroid Build Coastguard Worker                conn.close()
603*cda5da8dSAndroid Build Coastguard Worker                raise
604*cda5da8dSAndroid Build Coastguard Worker
605*cda5da8dSAndroid Build Coastguard Worker        future = self._register(ov, listener, finish_accept)
606*cda5da8dSAndroid Build Coastguard Worker        coro = accept_coro(future, conn)
607*cda5da8dSAndroid Build Coastguard Worker        tasks.ensure_future(coro, loop=self._loop)
608*cda5da8dSAndroid Build Coastguard Worker        return future
609*cda5da8dSAndroid Build Coastguard Worker
610*cda5da8dSAndroid Build Coastguard Worker    def connect(self, conn, address):
611*cda5da8dSAndroid Build Coastguard Worker        if conn.type == socket.SOCK_DGRAM:
612*cda5da8dSAndroid Build Coastguard Worker            # WSAConnect will complete immediately for UDP sockets so we don't
613*cda5da8dSAndroid Build Coastguard Worker            # need to register any IOCP operation
614*cda5da8dSAndroid Build Coastguard Worker            _overlapped.WSAConnect(conn.fileno(), address)
615*cda5da8dSAndroid Build Coastguard Worker            fut = self._loop.create_future()
616*cda5da8dSAndroid Build Coastguard Worker            fut.set_result(None)
617*cda5da8dSAndroid Build Coastguard Worker            return fut
618*cda5da8dSAndroid Build Coastguard Worker
619*cda5da8dSAndroid Build Coastguard Worker        self._register_with_iocp(conn)
620*cda5da8dSAndroid Build Coastguard Worker        # The socket needs to be locally bound before we call ConnectEx().
621*cda5da8dSAndroid Build Coastguard Worker        try:
622*cda5da8dSAndroid Build Coastguard Worker            _overlapped.BindLocal(conn.fileno(), conn.family)
623*cda5da8dSAndroid Build Coastguard Worker        except OSError as e:
624*cda5da8dSAndroid Build Coastguard Worker            if e.winerror != errno.WSAEINVAL:
625*cda5da8dSAndroid Build Coastguard Worker                raise
626*cda5da8dSAndroid Build Coastguard Worker            # Probably already locally bound; check using getsockname().
627*cda5da8dSAndroid Build Coastguard Worker            if conn.getsockname()[1] == 0:
628*cda5da8dSAndroid Build Coastguard Worker                raise
629*cda5da8dSAndroid Build Coastguard Worker        ov = _overlapped.Overlapped(NULL)
630*cda5da8dSAndroid Build Coastguard Worker        ov.ConnectEx(conn.fileno(), address)
631*cda5da8dSAndroid Build Coastguard Worker
632*cda5da8dSAndroid Build Coastguard Worker        def finish_connect(trans, key, ov):
633*cda5da8dSAndroid Build Coastguard Worker            ov.getresult()
634*cda5da8dSAndroid Build Coastguard Worker            # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
635*cda5da8dSAndroid Build Coastguard Worker            conn.setsockopt(socket.SOL_SOCKET,
636*cda5da8dSAndroid Build Coastguard Worker                            _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
637*cda5da8dSAndroid Build Coastguard Worker            return conn
638*cda5da8dSAndroid Build Coastguard Worker
639*cda5da8dSAndroid Build Coastguard Worker        return self._register(ov, conn, finish_connect)
640*cda5da8dSAndroid Build Coastguard Worker
641*cda5da8dSAndroid Build Coastguard Worker    def sendfile(self, sock, file, offset, count):
642*cda5da8dSAndroid Build Coastguard Worker        self._register_with_iocp(sock)
643*cda5da8dSAndroid Build Coastguard Worker        ov = _overlapped.Overlapped(NULL)
644*cda5da8dSAndroid Build Coastguard Worker        offset_low = offset & 0xffff_ffff
645*cda5da8dSAndroid Build Coastguard Worker        offset_high = (offset >> 32) & 0xffff_ffff
646*cda5da8dSAndroid Build Coastguard Worker        ov.TransmitFile(sock.fileno(),
647*cda5da8dSAndroid Build Coastguard Worker                        msvcrt.get_osfhandle(file.fileno()),
648*cda5da8dSAndroid Build Coastguard Worker                        offset_low, offset_high,
649*cda5da8dSAndroid Build Coastguard Worker                        count, 0, 0)
650*cda5da8dSAndroid Build Coastguard Worker
651*cda5da8dSAndroid Build Coastguard Worker        def finish_sendfile(trans, key, ov):
652*cda5da8dSAndroid Build Coastguard Worker            try:
653*cda5da8dSAndroid Build Coastguard Worker                return ov.getresult()
654*cda5da8dSAndroid Build Coastguard Worker            except OSError as exc:
655*cda5da8dSAndroid Build Coastguard Worker                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
656*cda5da8dSAndroid Build Coastguard Worker                                    _overlapped.ERROR_OPERATION_ABORTED):
657*cda5da8dSAndroid Build Coastguard Worker                    raise ConnectionResetError(*exc.args)
658*cda5da8dSAndroid Build Coastguard Worker                else:
659*cda5da8dSAndroid Build Coastguard Worker                    raise
660*cda5da8dSAndroid Build Coastguard Worker        return self._register(ov, sock, finish_sendfile)
661*cda5da8dSAndroid Build Coastguard Worker
662*cda5da8dSAndroid Build Coastguard Worker    def accept_pipe(self, pipe):
663*cda5da8dSAndroid Build Coastguard Worker        self._register_with_iocp(pipe)
664*cda5da8dSAndroid Build Coastguard Worker        ov = _overlapped.Overlapped(NULL)
665*cda5da8dSAndroid Build Coastguard Worker        connected = ov.ConnectNamedPipe(pipe.fileno())
666*cda5da8dSAndroid Build Coastguard Worker
667*cda5da8dSAndroid Build Coastguard Worker        if connected:
668*cda5da8dSAndroid Build Coastguard Worker            # ConnectNamePipe() failed with ERROR_PIPE_CONNECTED which means
669*cda5da8dSAndroid Build Coastguard Worker            # that the pipe is connected. There is no need to wait for the
670*cda5da8dSAndroid Build Coastguard Worker            # completion of the connection.
671*cda5da8dSAndroid Build Coastguard Worker            return self._result(pipe)
672*cda5da8dSAndroid Build Coastguard Worker
673*cda5da8dSAndroid Build Coastguard Worker        def finish_accept_pipe(trans, key, ov):
674*cda5da8dSAndroid Build Coastguard Worker            ov.getresult()
675*cda5da8dSAndroid Build Coastguard Worker            return pipe
676*cda5da8dSAndroid Build Coastguard Worker
677*cda5da8dSAndroid Build Coastguard Worker        return self._register(ov, pipe, finish_accept_pipe)
678*cda5da8dSAndroid Build Coastguard Worker
679*cda5da8dSAndroid Build Coastguard Worker    async def connect_pipe(self, address):
680*cda5da8dSAndroid Build Coastguard Worker        delay = CONNECT_PIPE_INIT_DELAY
681*cda5da8dSAndroid Build Coastguard Worker        while True:
682*cda5da8dSAndroid Build Coastguard Worker            # Unfortunately there is no way to do an overlapped connect to
683*cda5da8dSAndroid Build Coastguard Worker            # a pipe.  Call CreateFile() in a loop until it doesn't fail with
684*cda5da8dSAndroid Build Coastguard Worker            # ERROR_PIPE_BUSY.
685*cda5da8dSAndroid Build Coastguard Worker            try:
686*cda5da8dSAndroid Build Coastguard Worker                handle = _overlapped.ConnectPipe(address)
687*cda5da8dSAndroid Build Coastguard Worker                break
688*cda5da8dSAndroid Build Coastguard Worker            except OSError as exc:
689*cda5da8dSAndroid Build Coastguard Worker                if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
690*cda5da8dSAndroid Build Coastguard Worker                    raise
691*cda5da8dSAndroid Build Coastguard Worker
692*cda5da8dSAndroid Build Coastguard Worker            # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
693*cda5da8dSAndroid Build Coastguard Worker            delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
694*cda5da8dSAndroid Build Coastguard Worker            await tasks.sleep(delay)
695*cda5da8dSAndroid Build Coastguard Worker
696*cda5da8dSAndroid Build Coastguard Worker        return windows_utils.PipeHandle(handle)
697*cda5da8dSAndroid Build Coastguard Worker
698*cda5da8dSAndroid Build Coastguard Worker    def wait_for_handle(self, handle, timeout=None):
699*cda5da8dSAndroid Build Coastguard Worker        """Wait for a handle.
700*cda5da8dSAndroid Build Coastguard Worker
701*cda5da8dSAndroid Build Coastguard Worker        Return a Future object. The result of the future is True if the wait
702*cda5da8dSAndroid Build Coastguard Worker        completed, or False if the wait did not complete (on timeout).
703*cda5da8dSAndroid Build Coastguard Worker        """
704*cda5da8dSAndroid Build Coastguard Worker        return self._wait_for_handle(handle, timeout, False)
705*cda5da8dSAndroid Build Coastguard Worker
706*cda5da8dSAndroid Build Coastguard Worker    def _wait_cancel(self, event, done_callback):
707*cda5da8dSAndroid Build Coastguard Worker        fut = self._wait_for_handle(event, None, True)
708*cda5da8dSAndroid Build Coastguard Worker        # add_done_callback() cannot be used because the wait may only complete
709*cda5da8dSAndroid Build Coastguard Worker        # in IocpProactor.close(), while the event loop is not running.
710*cda5da8dSAndroid Build Coastguard Worker        fut._done_callback = done_callback
711*cda5da8dSAndroid Build Coastguard Worker        return fut
712*cda5da8dSAndroid Build Coastguard Worker
713*cda5da8dSAndroid Build Coastguard Worker    def _wait_for_handle(self, handle, timeout, _is_cancel):
714*cda5da8dSAndroid Build Coastguard Worker        self._check_closed()
715*cda5da8dSAndroid Build Coastguard Worker
716*cda5da8dSAndroid Build Coastguard Worker        if timeout is None:
717*cda5da8dSAndroid Build Coastguard Worker            ms = _winapi.INFINITE
718*cda5da8dSAndroid Build Coastguard Worker        else:
719*cda5da8dSAndroid Build Coastguard Worker            # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
720*cda5da8dSAndroid Build Coastguard Worker            # round away from zero to wait *at least* timeout seconds.
721*cda5da8dSAndroid Build Coastguard Worker            ms = math.ceil(timeout * 1e3)
722*cda5da8dSAndroid Build Coastguard Worker
723*cda5da8dSAndroid Build Coastguard Worker        # We only create ov so we can use ov.address as a key for the cache.
724*cda5da8dSAndroid Build Coastguard Worker        ov = _overlapped.Overlapped(NULL)
725*cda5da8dSAndroid Build Coastguard Worker        wait_handle = _overlapped.RegisterWaitWithQueue(
726*cda5da8dSAndroid Build Coastguard Worker            handle, self._iocp, ov.address, ms)
727*cda5da8dSAndroid Build Coastguard Worker        if _is_cancel:
728*cda5da8dSAndroid Build Coastguard Worker            f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
729*cda5da8dSAndroid Build Coastguard Worker        else:
730*cda5da8dSAndroid Build Coastguard Worker            f = _WaitHandleFuture(ov, handle, wait_handle, self,
731*cda5da8dSAndroid Build Coastguard Worker                                  loop=self._loop)
732*cda5da8dSAndroid Build Coastguard Worker        if f._source_traceback:
733*cda5da8dSAndroid Build Coastguard Worker            del f._source_traceback[-1]
734*cda5da8dSAndroid Build Coastguard Worker
735*cda5da8dSAndroid Build Coastguard Worker        def finish_wait_for_handle(trans, key, ov):
736*cda5da8dSAndroid Build Coastguard Worker            # Note that this second wait means that we should only use
737*cda5da8dSAndroid Build Coastguard Worker            # this with handles types where a successful wait has no
738*cda5da8dSAndroid Build Coastguard Worker            # effect.  So events or processes are all right, but locks
739*cda5da8dSAndroid Build Coastguard Worker            # or semaphores are not.  Also note if the handle is
740*cda5da8dSAndroid Build Coastguard Worker            # signalled and then quickly reset, then we may return
741*cda5da8dSAndroid Build Coastguard Worker            # False even though we have not timed out.
742*cda5da8dSAndroid Build Coastguard Worker            return f._poll()
743*cda5da8dSAndroid Build Coastguard Worker
744*cda5da8dSAndroid Build Coastguard Worker        self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
745*cda5da8dSAndroid Build Coastguard Worker        return f
746*cda5da8dSAndroid Build Coastguard Worker
747*cda5da8dSAndroid Build Coastguard Worker    def _register_with_iocp(self, obj):
748*cda5da8dSAndroid Build Coastguard Worker        # To get notifications of finished ops on this objects sent to the
749*cda5da8dSAndroid Build Coastguard Worker        # completion port, were must register the handle.
750*cda5da8dSAndroid Build Coastguard Worker        if obj not in self._registered:
751*cda5da8dSAndroid Build Coastguard Worker            self._registered.add(obj)
752*cda5da8dSAndroid Build Coastguard Worker            _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
753*cda5da8dSAndroid Build Coastguard Worker            # XXX We could also use SetFileCompletionNotificationModes()
754*cda5da8dSAndroid Build Coastguard Worker            # to avoid sending notifications to completion port of ops
755*cda5da8dSAndroid Build Coastguard Worker            # that succeed immediately.
756*cda5da8dSAndroid Build Coastguard Worker
757*cda5da8dSAndroid Build Coastguard Worker    def _register(self, ov, obj, callback):
758*cda5da8dSAndroid Build Coastguard Worker        self._check_closed()
759*cda5da8dSAndroid Build Coastguard Worker
760*cda5da8dSAndroid Build Coastguard Worker        # Return a future which will be set with the result of the
761*cda5da8dSAndroid Build Coastguard Worker        # operation when it completes.  The future's value is actually
762*cda5da8dSAndroid Build Coastguard Worker        # the value returned by callback().
763*cda5da8dSAndroid Build Coastguard Worker        f = _OverlappedFuture(ov, loop=self._loop)
764*cda5da8dSAndroid Build Coastguard Worker        if f._source_traceback:
765*cda5da8dSAndroid Build Coastguard Worker            del f._source_traceback[-1]
766*cda5da8dSAndroid Build Coastguard Worker        if not ov.pending:
767*cda5da8dSAndroid Build Coastguard Worker            # The operation has completed, so no need to postpone the
768*cda5da8dSAndroid Build Coastguard Worker            # work.  We cannot take this short cut if we need the
769*cda5da8dSAndroid Build Coastguard Worker            # NumberOfBytes, CompletionKey values returned by
770*cda5da8dSAndroid Build Coastguard Worker            # PostQueuedCompletionStatus().
771*cda5da8dSAndroid Build Coastguard Worker            try:
772*cda5da8dSAndroid Build Coastguard Worker                value = callback(None, None, ov)
773*cda5da8dSAndroid Build Coastguard Worker            except OSError as e:
774*cda5da8dSAndroid Build Coastguard Worker                f.set_exception(e)
775*cda5da8dSAndroid Build Coastguard Worker            else:
776*cda5da8dSAndroid Build Coastguard Worker                f.set_result(value)
777*cda5da8dSAndroid Build Coastguard Worker            # Even if GetOverlappedResult() was called, we have to wait for the
778*cda5da8dSAndroid Build Coastguard Worker            # notification of the completion in GetQueuedCompletionStatus().
779*cda5da8dSAndroid Build Coastguard Worker            # Register the overlapped operation to keep a reference to the
780*cda5da8dSAndroid Build Coastguard Worker            # OVERLAPPED object, otherwise the memory is freed and Windows may
781*cda5da8dSAndroid Build Coastguard Worker            # read uninitialized memory.
782*cda5da8dSAndroid Build Coastguard Worker
783*cda5da8dSAndroid Build Coastguard Worker        # Register the overlapped operation for later.  Note that
784*cda5da8dSAndroid Build Coastguard Worker        # we only store obj to prevent it from being garbage
785*cda5da8dSAndroid Build Coastguard Worker        # collected too early.
786*cda5da8dSAndroid Build Coastguard Worker        self._cache[ov.address] = (f, ov, obj, callback)
787*cda5da8dSAndroid Build Coastguard Worker        return f
788*cda5da8dSAndroid Build Coastguard Worker
789*cda5da8dSAndroid Build Coastguard Worker    def _unregister(self, ov):
790*cda5da8dSAndroid Build Coastguard Worker        """Unregister an overlapped object.
791*cda5da8dSAndroid Build Coastguard Worker
792*cda5da8dSAndroid Build Coastguard Worker        Call this method when its future has been cancelled. The event can
793*cda5da8dSAndroid Build Coastguard Worker        already be signalled (pending in the proactor event queue). It is also
794*cda5da8dSAndroid Build Coastguard Worker        safe if the event is never signalled (because it was cancelled).
795*cda5da8dSAndroid Build Coastguard Worker        """
796*cda5da8dSAndroid Build Coastguard Worker        self._check_closed()
797*cda5da8dSAndroid Build Coastguard Worker        self._unregistered.append(ov)
798*cda5da8dSAndroid Build Coastguard Worker
799*cda5da8dSAndroid Build Coastguard Worker    def _get_accept_socket(self, family):
800*cda5da8dSAndroid Build Coastguard Worker        s = socket.socket(family)
801*cda5da8dSAndroid Build Coastguard Worker        s.settimeout(0)
802*cda5da8dSAndroid Build Coastguard Worker        return s
803*cda5da8dSAndroid Build Coastguard Worker
804*cda5da8dSAndroid Build Coastguard Worker    def _poll(self, timeout=None):
805*cda5da8dSAndroid Build Coastguard Worker        if timeout is None:
806*cda5da8dSAndroid Build Coastguard Worker            ms = INFINITE
807*cda5da8dSAndroid Build Coastguard Worker        elif timeout < 0:
808*cda5da8dSAndroid Build Coastguard Worker            raise ValueError("negative timeout")
809*cda5da8dSAndroid Build Coastguard Worker        else:
810*cda5da8dSAndroid Build Coastguard Worker            # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
811*cda5da8dSAndroid Build Coastguard Worker            # round away from zero to wait *at least* timeout seconds.
812*cda5da8dSAndroid Build Coastguard Worker            ms = math.ceil(timeout * 1e3)
813*cda5da8dSAndroid Build Coastguard Worker            if ms >= INFINITE:
814*cda5da8dSAndroid Build Coastguard Worker                raise ValueError("timeout too big")
815*cda5da8dSAndroid Build Coastguard Worker
816*cda5da8dSAndroid Build Coastguard Worker        while True:
817*cda5da8dSAndroid Build Coastguard Worker            status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
818*cda5da8dSAndroid Build Coastguard Worker            if status is None:
819*cda5da8dSAndroid Build Coastguard Worker                break
820*cda5da8dSAndroid Build Coastguard Worker            ms = 0
821*cda5da8dSAndroid Build Coastguard Worker
822*cda5da8dSAndroid Build Coastguard Worker            err, transferred, key, address = status
823*cda5da8dSAndroid Build Coastguard Worker            try:
824*cda5da8dSAndroid Build Coastguard Worker                f, ov, obj, callback = self._cache.pop(address)
825*cda5da8dSAndroid Build Coastguard Worker            except KeyError:
826*cda5da8dSAndroid Build Coastguard Worker                if self._loop.get_debug():
827*cda5da8dSAndroid Build Coastguard Worker                    self._loop.call_exception_handler({
828*cda5da8dSAndroid Build Coastguard Worker                        'message': ('GetQueuedCompletionStatus() returned an '
829*cda5da8dSAndroid Build Coastguard Worker                                    'unexpected event'),
830*cda5da8dSAndroid Build Coastguard Worker                        'status': ('err=%s transferred=%s key=%#x address=%#x'
831*cda5da8dSAndroid Build Coastguard Worker                                   % (err, transferred, key, address)),
832*cda5da8dSAndroid Build Coastguard Worker                    })
833*cda5da8dSAndroid Build Coastguard Worker
834*cda5da8dSAndroid Build Coastguard Worker                # key is either zero, or it is used to return a pipe
835*cda5da8dSAndroid Build Coastguard Worker                # handle which should be closed to avoid a leak.
836*cda5da8dSAndroid Build Coastguard Worker                if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
837*cda5da8dSAndroid Build Coastguard Worker                    _winapi.CloseHandle(key)
838*cda5da8dSAndroid Build Coastguard Worker                continue
839*cda5da8dSAndroid Build Coastguard Worker
840*cda5da8dSAndroid Build Coastguard Worker            if obj in self._stopped_serving:
841*cda5da8dSAndroid Build Coastguard Worker                f.cancel()
842*cda5da8dSAndroid Build Coastguard Worker            # Don't call the callback if _register() already read the result or
843*cda5da8dSAndroid Build Coastguard Worker            # if the overlapped has been cancelled
844*cda5da8dSAndroid Build Coastguard Worker            elif not f.done():
845*cda5da8dSAndroid Build Coastguard Worker                try:
846*cda5da8dSAndroid Build Coastguard Worker                    value = callback(transferred, key, ov)
847*cda5da8dSAndroid Build Coastguard Worker                except OSError as e:
848*cda5da8dSAndroid Build Coastguard Worker                    f.set_exception(e)
849*cda5da8dSAndroid Build Coastguard Worker                    self._results.append(f)
850*cda5da8dSAndroid Build Coastguard Worker                else:
851*cda5da8dSAndroid Build Coastguard Worker                    f.set_result(value)
852*cda5da8dSAndroid Build Coastguard Worker                    self._results.append(f)
853*cda5da8dSAndroid Build Coastguard Worker                finally:
854*cda5da8dSAndroid Build Coastguard Worker                    f = None
855*cda5da8dSAndroid Build Coastguard Worker
856*cda5da8dSAndroid Build Coastguard Worker        # Remove unregistered futures
857*cda5da8dSAndroid Build Coastguard Worker        for ov in self._unregistered:
858*cda5da8dSAndroid Build Coastguard Worker            self._cache.pop(ov.address, None)
859*cda5da8dSAndroid Build Coastguard Worker        self._unregistered.clear()
860*cda5da8dSAndroid Build Coastguard Worker
861*cda5da8dSAndroid Build Coastguard Worker    def _stop_serving(self, obj):
862*cda5da8dSAndroid Build Coastguard Worker        # obj is a socket or pipe handle.  It will be closed in
863*cda5da8dSAndroid Build Coastguard Worker        # BaseProactorEventLoop._stop_serving() which will make any
864*cda5da8dSAndroid Build Coastguard Worker        # pending operations fail quickly.
865*cda5da8dSAndroid Build Coastguard Worker        self._stopped_serving.add(obj)
866*cda5da8dSAndroid Build Coastguard Worker
867*cda5da8dSAndroid Build Coastguard Worker    def close(self):
868*cda5da8dSAndroid Build Coastguard Worker        if self._iocp is None:
869*cda5da8dSAndroid Build Coastguard Worker            # already closed
870*cda5da8dSAndroid Build Coastguard Worker            return
871*cda5da8dSAndroid Build Coastguard Worker
872*cda5da8dSAndroid Build Coastguard Worker        # Cancel remaining registered operations.
873*cda5da8dSAndroid Build Coastguard Worker        for fut, ov, obj, callback in list(self._cache.values()):
874*cda5da8dSAndroid Build Coastguard Worker            if fut.cancelled():
875*cda5da8dSAndroid Build Coastguard Worker                # Nothing to do with cancelled futures
876*cda5da8dSAndroid Build Coastguard Worker                pass
877*cda5da8dSAndroid Build Coastguard Worker            elif isinstance(fut, _WaitCancelFuture):
878*cda5da8dSAndroid Build Coastguard Worker                # _WaitCancelFuture must not be cancelled
879*cda5da8dSAndroid Build Coastguard Worker                pass
880*cda5da8dSAndroid Build Coastguard Worker            else:
881*cda5da8dSAndroid Build Coastguard Worker                try:
882*cda5da8dSAndroid Build Coastguard Worker                    fut.cancel()
883*cda5da8dSAndroid Build Coastguard Worker                except OSError as exc:
884*cda5da8dSAndroid Build Coastguard Worker                    if self._loop is not None:
885*cda5da8dSAndroid Build Coastguard Worker                        context = {
886*cda5da8dSAndroid Build Coastguard Worker                            'message': 'Cancelling a future failed',
887*cda5da8dSAndroid Build Coastguard Worker                            'exception': exc,
888*cda5da8dSAndroid Build Coastguard Worker                            'future': fut,
889*cda5da8dSAndroid Build Coastguard Worker                        }
890*cda5da8dSAndroid Build Coastguard Worker                        if fut._source_traceback:
891*cda5da8dSAndroid Build Coastguard Worker                            context['source_traceback'] = fut._source_traceback
892*cda5da8dSAndroid Build Coastguard Worker                        self._loop.call_exception_handler(context)
893*cda5da8dSAndroid Build Coastguard Worker
894*cda5da8dSAndroid Build Coastguard Worker        # Wait until all cancelled overlapped complete: don't exit with running
895*cda5da8dSAndroid Build Coastguard Worker        # overlapped to prevent a crash. Display progress every second if the
896*cda5da8dSAndroid Build Coastguard Worker        # loop is still running.
897*cda5da8dSAndroid Build Coastguard Worker        msg_update = 1.0
898*cda5da8dSAndroid Build Coastguard Worker        start_time = time.monotonic()
899*cda5da8dSAndroid Build Coastguard Worker        next_msg = start_time + msg_update
900*cda5da8dSAndroid Build Coastguard Worker        while self._cache:
901*cda5da8dSAndroid Build Coastguard Worker            if next_msg <= time.monotonic():
902*cda5da8dSAndroid Build Coastguard Worker                logger.debug('%r is running after closing for %.1f seconds',
903*cda5da8dSAndroid Build Coastguard Worker                             self, time.monotonic() - start_time)
904*cda5da8dSAndroid Build Coastguard Worker                next_msg = time.monotonic() + msg_update
905*cda5da8dSAndroid Build Coastguard Worker
906*cda5da8dSAndroid Build Coastguard Worker            # handle a few events, or timeout
907*cda5da8dSAndroid Build Coastguard Worker            self._poll(msg_update)
908*cda5da8dSAndroid Build Coastguard Worker
909*cda5da8dSAndroid Build Coastguard Worker        self._results = []
910*cda5da8dSAndroid Build Coastguard Worker
911*cda5da8dSAndroid Build Coastguard Worker        _winapi.CloseHandle(self._iocp)
912*cda5da8dSAndroid Build Coastguard Worker        self._iocp = None
913*cda5da8dSAndroid Build Coastguard Worker
914*cda5da8dSAndroid Build Coastguard Worker    def __del__(self):
915*cda5da8dSAndroid Build Coastguard Worker        self.close()
916*cda5da8dSAndroid Build Coastguard Worker
917*cda5da8dSAndroid Build Coastguard Worker
class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that learns about process exit via the proactor."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the child process and arrange to be told when it exits.

        The child is created with windows_utils.Popen() and stored in
        self._proc.  The proactor of the loop is then asked to wait on the
        process handle; when that wait finishes, the return code is polled
        and passed on to self._process_exited().
        """
        self._proc = windows_utils.Popen(
            args,
            shell=shell,
            stdin=stdin,
            stdout=stdout,
            stderr=stderr,
            bufsize=bufsize,
            **kwargs)

        def _on_handle_signalled(waiter):
            # The wait on the process handle is over: report the exit status.
            self._process_exited(self._proc.poll())

        process_handle = int(self._proc._handle)
        exit_waiter = self._loop._proactor.wait_for_handle(process_handle)
        exit_waiter.add_done_callback(_on_handle_signalled)
931*cda5da8dSAndroid Build Coastguard Worker
932*cda5da8dSAndroid Build Coastguard Worker
# Public alias for the selector-based event loop class of this module.
SelectorEventLoop = _WindowsSelectorEventLoop


class WindowsSelectorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """Event loop policy whose loop factory is SelectorEventLoop."""

    _loop_factory = SelectorEventLoop
938*cda5da8dSAndroid Build Coastguard Worker
939*cda5da8dSAndroid Build Coastguard Worker
class WindowsProactorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """Event loop policy whose loop factory is ProactorEventLoop."""

    _loop_factory = ProactorEventLoop


# The proactor-based policy is the default one exported by this module.
DefaultEventLoopPolicy = WindowsProactorEventLoopPolicy
945