1*cda5da8dSAndroid Build Coastguard Worker"""Selector and proactor event loops for Windows.""" 2*cda5da8dSAndroid Build Coastguard Worker 3*cda5da8dSAndroid Build Coastguard Workerimport sys 4*cda5da8dSAndroid Build Coastguard Worker 5*cda5da8dSAndroid Build Coastguard Workerif sys.platform != 'win32': # pragma: no cover 6*cda5da8dSAndroid Build Coastguard Worker raise ImportError('win32 only') 7*cda5da8dSAndroid Build Coastguard Worker 8*cda5da8dSAndroid Build Coastguard Workerimport _overlapped 9*cda5da8dSAndroid Build Coastguard Workerimport _winapi 10*cda5da8dSAndroid Build Coastguard Workerimport errno 11*cda5da8dSAndroid Build Coastguard Workerimport math 12*cda5da8dSAndroid Build Coastguard Workerimport msvcrt 13*cda5da8dSAndroid Build Coastguard Workerimport socket 14*cda5da8dSAndroid Build Coastguard Workerimport struct 15*cda5da8dSAndroid Build Coastguard Workerimport time 16*cda5da8dSAndroid Build Coastguard Workerimport weakref 17*cda5da8dSAndroid Build Coastguard Worker 18*cda5da8dSAndroid Build Coastguard Workerfrom . import events 19*cda5da8dSAndroid Build Coastguard Workerfrom . import base_subprocess 20*cda5da8dSAndroid Build Coastguard Workerfrom . import futures 21*cda5da8dSAndroid Build Coastguard Workerfrom . import exceptions 22*cda5da8dSAndroid Build Coastguard Workerfrom . import proactor_events 23*cda5da8dSAndroid Build Coastguard Workerfrom . import selector_events 24*cda5da8dSAndroid Build Coastguard Workerfrom . import tasks 25*cda5da8dSAndroid Build Coastguard Workerfrom . import windows_utils 26*cda5da8dSAndroid Build Coastguard Workerfrom .log import logger 27*cda5da8dSAndroid Build Coastguard Worker 28*cda5da8dSAndroid Build Coastguard Worker 29*cda5da8dSAndroid Build Coastguard Worker__all__ = ( 30*cda5da8dSAndroid Build Coastguard Worker 'SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor', 31*cda5da8dSAndroid Build Coastguard Worker 'DefaultEventLoopPolicy', 'WindowsSelectorEventLoopPolicy', 32*cda5da8dSAndroid Build Coastguard Worker 'WindowsProactorEventLoopPolicy', 33*cda5da8dSAndroid Build Coastguard Worker) 34*cda5da8dSAndroid Build Coastguard Worker 35*cda5da8dSAndroid Build Coastguard Worker 36*cda5da8dSAndroid Build Coastguard WorkerNULL = _winapi.NULL 37*cda5da8dSAndroid Build Coastguard WorkerINFINITE = _winapi.INFINITE 38*cda5da8dSAndroid Build Coastguard WorkerERROR_CONNECTION_REFUSED = 1225 39*cda5da8dSAndroid Build Coastguard WorkerERROR_CONNECTION_ABORTED = 1236 40*cda5da8dSAndroid Build Coastguard Worker 41*cda5da8dSAndroid Build Coastguard Worker# Initial delay in seconds for connect_pipe() before retrying to connect 42*cda5da8dSAndroid Build Coastguard WorkerCONNECT_PIPE_INIT_DELAY = 0.001 43*cda5da8dSAndroid Build Coastguard Worker 44*cda5da8dSAndroid Build Coastguard Worker# Maximum delay in seconds for connect_pipe() before retrying to connect 45*cda5da8dSAndroid Build Coastguard WorkerCONNECT_PIPE_MAX_DELAY = 0.100 46*cda5da8dSAndroid Build Coastguard Worker 47*cda5da8dSAndroid Build Coastguard Worker 48*cda5da8dSAndroid Build Coastguard Workerclass _OverlappedFuture(futures.Future): 49*cda5da8dSAndroid Build Coastguard Worker """Subclass of Future which represents an overlapped operation. 50*cda5da8dSAndroid Build Coastguard Worker 51*cda5da8dSAndroid Build Coastguard Worker Cancelling it will immediately cancel the overlapped operation. 52*cda5da8dSAndroid Build Coastguard Worker """ 53*cda5da8dSAndroid Build Coastguard Worker 54*cda5da8dSAndroid Build Coastguard Worker def __init__(self, ov, *, loop=None): 55*cda5da8dSAndroid Build Coastguard Worker super().__init__(loop=loop) 56*cda5da8dSAndroid Build Coastguard Worker if self._source_traceback: 57*cda5da8dSAndroid Build Coastguard Worker del self._source_traceback[-1] 58*cda5da8dSAndroid Build Coastguard Worker self._ov = ov 59*cda5da8dSAndroid Build Coastguard Worker 60*cda5da8dSAndroid Build Coastguard Worker def _repr_info(self): 61*cda5da8dSAndroid Build Coastguard Worker info = super()._repr_info() 62*cda5da8dSAndroid Build Coastguard Worker if self._ov is not None: 63*cda5da8dSAndroid Build Coastguard Worker state = 'pending' if self._ov.pending else 'completed' 64*cda5da8dSAndroid Build Coastguard Worker info.insert(1, f'overlapped=<{state}, {self._ov.address:#x}>') 65*cda5da8dSAndroid Build Coastguard Worker return info 66*cda5da8dSAndroid Build Coastguard Worker 67*cda5da8dSAndroid Build Coastguard Worker def _cancel_overlapped(self): 68*cda5da8dSAndroid Build Coastguard Worker if self._ov is None: 69*cda5da8dSAndroid Build Coastguard Worker return 70*cda5da8dSAndroid Build Coastguard Worker try: 71*cda5da8dSAndroid Build Coastguard Worker self._ov.cancel() 72*cda5da8dSAndroid Build Coastguard Worker except OSError as exc: 73*cda5da8dSAndroid Build Coastguard Worker context = { 74*cda5da8dSAndroid Build Coastguard Worker 'message': 'Cancelling an overlapped future failed', 75*cda5da8dSAndroid Build Coastguard Worker 'exception': exc, 76*cda5da8dSAndroid Build Coastguard Worker 'future': self, 77*cda5da8dSAndroid Build Coastguard Worker } 78*cda5da8dSAndroid Build Coastguard Worker if self._source_traceback: 79*cda5da8dSAndroid Build Coastguard Worker context['source_traceback'] = self._source_traceback 80*cda5da8dSAndroid Build Coastguard Worker self._loop.call_exception_handler(context) 81*cda5da8dSAndroid Build Coastguard Worker self._ov = None 82*cda5da8dSAndroid Build Coastguard Worker 83*cda5da8dSAndroid Build Coastguard Worker def cancel(self, msg=None): 84*cda5da8dSAndroid Build Coastguard Worker self._cancel_overlapped() 85*cda5da8dSAndroid Build Coastguard Worker return super().cancel(msg=msg) 86*cda5da8dSAndroid Build Coastguard Worker 87*cda5da8dSAndroid Build Coastguard Worker def set_exception(self, exception): 88*cda5da8dSAndroid Build Coastguard Worker super().set_exception(exception) 89*cda5da8dSAndroid Build Coastguard Worker self._cancel_overlapped() 90*cda5da8dSAndroid Build Coastguard Worker 91*cda5da8dSAndroid Build Coastguard Worker def set_result(self, result): 92*cda5da8dSAndroid Build Coastguard Worker super().set_result(result) 93*cda5da8dSAndroid Build Coastguard Worker self._ov = None 94*cda5da8dSAndroid Build Coastguard Worker 95*cda5da8dSAndroid Build Coastguard Worker 96*cda5da8dSAndroid Build Coastguard Workerclass _BaseWaitHandleFuture(futures.Future): 97*cda5da8dSAndroid Build Coastguard Worker """Subclass of Future which represents a wait handle.""" 98*cda5da8dSAndroid Build Coastguard Worker 99*cda5da8dSAndroid Build Coastguard Worker def __init__(self, ov, handle, wait_handle, *, loop=None): 100*cda5da8dSAndroid Build Coastguard Worker super().__init__(loop=loop) 101*cda5da8dSAndroid Build Coastguard Worker if self._source_traceback: 102*cda5da8dSAndroid Build Coastguard Worker del self._source_traceback[-1] 103*cda5da8dSAndroid Build Coastguard Worker # Keep a reference to the Overlapped object to keep it alive until the 104*cda5da8dSAndroid Build Coastguard Worker # wait is unregistered 105*cda5da8dSAndroid Build Coastguard Worker self._ov = ov 106*cda5da8dSAndroid Build Coastguard Worker self._handle = handle 107*cda5da8dSAndroid Build Coastguard Worker self._wait_handle = wait_handle 108*cda5da8dSAndroid Build Coastguard Worker 109*cda5da8dSAndroid Build Coastguard Worker # Should we call UnregisterWaitEx() if the wait completes 110*cda5da8dSAndroid Build Coastguard Worker # or is cancelled? 111*cda5da8dSAndroid Build Coastguard Worker self._registered = True 112*cda5da8dSAndroid Build Coastguard Worker 113*cda5da8dSAndroid Build Coastguard Worker def _poll(self): 114*cda5da8dSAndroid Build Coastguard Worker # non-blocking wait: use a timeout of 0 millisecond 115*cda5da8dSAndroid Build Coastguard Worker return (_winapi.WaitForSingleObject(self._handle, 0) == 116*cda5da8dSAndroid Build Coastguard Worker _winapi.WAIT_OBJECT_0) 117*cda5da8dSAndroid Build Coastguard Worker 118*cda5da8dSAndroid Build Coastguard Worker def _repr_info(self): 119*cda5da8dSAndroid Build Coastguard Worker info = super()._repr_info() 120*cda5da8dSAndroid Build Coastguard Worker info.append(f'handle={self._handle:#x}') 121*cda5da8dSAndroid Build Coastguard Worker if self._handle is not None: 122*cda5da8dSAndroid Build Coastguard Worker state = 'signaled' if self._poll() else 'waiting' 123*cda5da8dSAndroid Build Coastguard Worker info.append(state) 124*cda5da8dSAndroid Build Coastguard Worker if self._wait_handle is not None: 125*cda5da8dSAndroid Build Coastguard Worker info.append(f'wait_handle={self._wait_handle:#x}') 126*cda5da8dSAndroid Build Coastguard Worker return info 127*cda5da8dSAndroid Build Coastguard Worker 128*cda5da8dSAndroid Build Coastguard Worker def _unregister_wait_cb(self, fut): 129*cda5da8dSAndroid Build Coastguard Worker # The wait was unregistered: it's not safe to destroy the Overlapped 130*cda5da8dSAndroid Build Coastguard Worker # object 131*cda5da8dSAndroid Build Coastguard Worker self._ov = None 132*cda5da8dSAndroid Build Coastguard Worker 133*cda5da8dSAndroid Build Coastguard Worker def _unregister_wait(self): 134*cda5da8dSAndroid Build Coastguard Worker if not self._registered: 135*cda5da8dSAndroid Build Coastguard Worker return 136*cda5da8dSAndroid Build Coastguard Worker self._registered = False 137*cda5da8dSAndroid Build Coastguard Worker 138*cda5da8dSAndroid Build Coastguard Worker wait_handle = self._wait_handle 139*cda5da8dSAndroid Build Coastguard Worker self._wait_handle = None 140*cda5da8dSAndroid Build Coastguard Worker try: 141*cda5da8dSAndroid Build Coastguard Worker _overlapped.UnregisterWait(wait_handle) 142*cda5da8dSAndroid Build Coastguard Worker except OSError as exc: 143*cda5da8dSAndroid Build Coastguard Worker if exc.winerror != _overlapped.ERROR_IO_PENDING: 144*cda5da8dSAndroid Build Coastguard Worker context = { 145*cda5da8dSAndroid Build Coastguard Worker 'message': 'Failed to unregister the wait handle', 146*cda5da8dSAndroid Build Coastguard Worker 'exception': exc, 147*cda5da8dSAndroid Build Coastguard Worker 'future': self, 148*cda5da8dSAndroid Build Coastguard Worker } 149*cda5da8dSAndroid Build Coastguard Worker if self._source_traceback: 150*cda5da8dSAndroid Build Coastguard Worker context['source_traceback'] = self._source_traceback 151*cda5da8dSAndroid Build Coastguard Worker self._loop.call_exception_handler(context) 152*cda5da8dSAndroid Build Coastguard Worker return 153*cda5da8dSAndroid Build Coastguard Worker # ERROR_IO_PENDING means that the unregister is pending 154*cda5da8dSAndroid Build Coastguard Worker 155*cda5da8dSAndroid Build Coastguard Worker self._unregister_wait_cb(None) 156*cda5da8dSAndroid Build Coastguard Worker 157*cda5da8dSAndroid Build Coastguard Worker def cancel(self, msg=None): 158*cda5da8dSAndroid Build Coastguard Worker self._unregister_wait() 159*cda5da8dSAndroid Build Coastguard Worker return super().cancel(msg=msg) 160*cda5da8dSAndroid Build Coastguard Worker 161*cda5da8dSAndroid Build Coastguard Worker def set_exception(self, exception): 162*cda5da8dSAndroid Build Coastguard Worker self._unregister_wait() 163*cda5da8dSAndroid Build Coastguard Worker super().set_exception(exception) 164*cda5da8dSAndroid Build Coastguard Worker 165*cda5da8dSAndroid Build Coastguard Worker def set_result(self, result): 166*cda5da8dSAndroid Build Coastguard Worker self._unregister_wait() 167*cda5da8dSAndroid Build Coastguard Worker super().set_result(result) 168*cda5da8dSAndroid Build Coastguard Worker 169*cda5da8dSAndroid Build Coastguard Worker 170*cda5da8dSAndroid Build Coastguard Workerclass _WaitCancelFuture(_BaseWaitHandleFuture): 171*cda5da8dSAndroid Build Coastguard Worker """Subclass of Future which represents a wait for the cancellation of a 172*cda5da8dSAndroid Build Coastguard Worker _WaitHandleFuture using an event. 173*cda5da8dSAndroid Build Coastguard Worker """ 174*cda5da8dSAndroid Build Coastguard Worker 175*cda5da8dSAndroid Build Coastguard Worker def __init__(self, ov, event, wait_handle, *, loop=None): 176*cda5da8dSAndroid Build Coastguard Worker super().__init__(ov, event, wait_handle, loop=loop) 177*cda5da8dSAndroid Build Coastguard Worker 178*cda5da8dSAndroid Build Coastguard Worker self._done_callback = None 179*cda5da8dSAndroid Build Coastguard Worker 180*cda5da8dSAndroid Build Coastguard Worker def cancel(self): 181*cda5da8dSAndroid Build Coastguard Worker raise RuntimeError("_WaitCancelFuture must not be cancelled") 182*cda5da8dSAndroid Build Coastguard Worker 183*cda5da8dSAndroid Build Coastguard Worker def set_result(self, result): 184*cda5da8dSAndroid Build Coastguard Worker super().set_result(result) 185*cda5da8dSAndroid Build Coastguard Worker if self._done_callback is not None: 186*cda5da8dSAndroid Build Coastguard Worker self._done_callback(self) 187*cda5da8dSAndroid Build Coastguard Worker 188*cda5da8dSAndroid Build Coastguard Worker def set_exception(self, exception): 189*cda5da8dSAndroid Build Coastguard Worker super().set_exception(exception) 190*cda5da8dSAndroid Build Coastguard Worker if self._done_callback is not None: 191*cda5da8dSAndroid Build Coastguard Worker self._done_callback(self) 192*cda5da8dSAndroid Build Coastguard Worker 193*cda5da8dSAndroid Build Coastguard Worker 194*cda5da8dSAndroid Build Coastguard Workerclass _WaitHandleFuture(_BaseWaitHandleFuture): 195*cda5da8dSAndroid Build Coastguard Worker def __init__(self, ov, handle, wait_handle, proactor, *, loop=None): 196*cda5da8dSAndroid Build Coastguard Worker super().__init__(ov, handle, wait_handle, loop=loop) 197*cda5da8dSAndroid Build Coastguard Worker self._proactor = proactor 198*cda5da8dSAndroid Build Coastguard Worker self._unregister_proactor = True 199*cda5da8dSAndroid Build Coastguard Worker self._event = _overlapped.CreateEvent(None, True, False, None) 200*cda5da8dSAndroid Build Coastguard Worker self._event_fut = None 201*cda5da8dSAndroid Build Coastguard Worker 202*cda5da8dSAndroid Build Coastguard Worker def _unregister_wait_cb(self, fut): 203*cda5da8dSAndroid Build Coastguard Worker if self._event is not None: 204*cda5da8dSAndroid Build Coastguard Worker _winapi.CloseHandle(self._event) 205*cda5da8dSAndroid Build Coastguard Worker self._event = None 206*cda5da8dSAndroid Build Coastguard Worker self._event_fut = None 207*cda5da8dSAndroid Build Coastguard Worker 208*cda5da8dSAndroid Build Coastguard Worker # If the wait was cancelled, the wait may never be signalled, so 209*cda5da8dSAndroid Build Coastguard Worker # it's required to unregister it. Otherwise, IocpProactor.close() will 210*cda5da8dSAndroid Build Coastguard Worker # wait forever for an event which will never come. 211*cda5da8dSAndroid Build Coastguard Worker # 212*cda5da8dSAndroid Build Coastguard Worker # If the IocpProactor already received the event, it's safe to call 213*cda5da8dSAndroid Build Coastguard Worker # _unregister() because we kept a reference to the Overlapped object 214*cda5da8dSAndroid Build Coastguard Worker # which is used as a unique key. 215*cda5da8dSAndroid Build Coastguard Worker self._proactor._unregister(self._ov) 216*cda5da8dSAndroid Build Coastguard Worker self._proactor = None 217*cda5da8dSAndroid Build Coastguard Worker 218*cda5da8dSAndroid Build Coastguard Worker super()._unregister_wait_cb(fut) 219*cda5da8dSAndroid Build Coastguard Worker 220*cda5da8dSAndroid Build Coastguard Worker def _unregister_wait(self): 221*cda5da8dSAndroid Build Coastguard Worker if not self._registered: 222*cda5da8dSAndroid Build Coastguard Worker return 223*cda5da8dSAndroid Build Coastguard Worker self._registered = False 224*cda5da8dSAndroid Build Coastguard Worker 225*cda5da8dSAndroid Build Coastguard Worker wait_handle = self._wait_handle 226*cda5da8dSAndroid Build Coastguard Worker self._wait_handle = None 227*cda5da8dSAndroid Build Coastguard Worker try: 228*cda5da8dSAndroid Build Coastguard Worker _overlapped.UnregisterWaitEx(wait_handle, self._event) 229*cda5da8dSAndroid Build Coastguard Worker except OSError as exc: 230*cda5da8dSAndroid Build Coastguard Worker if exc.winerror != _overlapped.ERROR_IO_PENDING: 231*cda5da8dSAndroid Build Coastguard Worker context = { 232*cda5da8dSAndroid Build Coastguard Worker 'message': 'Failed to unregister the wait handle', 233*cda5da8dSAndroid Build Coastguard Worker 'exception': exc, 234*cda5da8dSAndroid Build Coastguard Worker 'future': self, 235*cda5da8dSAndroid Build Coastguard Worker } 236*cda5da8dSAndroid Build Coastguard Worker if self._source_traceback: 237*cda5da8dSAndroid Build Coastguard Worker context['source_traceback'] = self._source_traceback 238*cda5da8dSAndroid Build Coastguard Worker self._loop.call_exception_handler(context) 239*cda5da8dSAndroid Build Coastguard Worker return 240*cda5da8dSAndroid Build Coastguard Worker # ERROR_IO_PENDING is not an error, the wait was unregistered 241*cda5da8dSAndroid Build Coastguard Worker 242*cda5da8dSAndroid Build Coastguard Worker self._event_fut = self._proactor._wait_cancel(self._event, 243*cda5da8dSAndroid Build Coastguard Worker self._unregister_wait_cb) 244*cda5da8dSAndroid Build Coastguard Worker 245*cda5da8dSAndroid Build Coastguard Worker 246*cda5da8dSAndroid Build Coastguard Workerclass PipeServer(object): 247*cda5da8dSAndroid Build Coastguard Worker """Class representing a pipe server. 248*cda5da8dSAndroid Build Coastguard Worker 249*cda5da8dSAndroid Build Coastguard Worker This is much like a bound, listening socket. 250*cda5da8dSAndroid Build Coastguard Worker """ 251*cda5da8dSAndroid Build Coastguard Worker def __init__(self, address): 252*cda5da8dSAndroid Build Coastguard Worker self._address = address 253*cda5da8dSAndroid Build Coastguard Worker self._free_instances = weakref.WeakSet() 254*cda5da8dSAndroid Build Coastguard Worker # initialize the pipe attribute before calling _server_pipe_handle() 255*cda5da8dSAndroid Build Coastguard Worker # because this function can raise an exception and the destructor calls 256*cda5da8dSAndroid Build Coastguard Worker # the close() method 257*cda5da8dSAndroid Build Coastguard Worker self._pipe = None 258*cda5da8dSAndroid Build Coastguard Worker self._accept_pipe_future = None 259*cda5da8dSAndroid Build Coastguard Worker self._pipe = self._server_pipe_handle(True) 260*cda5da8dSAndroid Build Coastguard Worker 261*cda5da8dSAndroid Build Coastguard Worker def _get_unconnected_pipe(self): 262*cda5da8dSAndroid Build Coastguard Worker # Create new instance and return previous one. This ensures 263*cda5da8dSAndroid Build Coastguard Worker # that (until the server is closed) there is always at least 264*cda5da8dSAndroid Build Coastguard Worker # one pipe handle for address. Therefore if a client attempt 265*cda5da8dSAndroid Build Coastguard Worker # to connect it will not fail with FileNotFoundError. 266*cda5da8dSAndroid Build Coastguard Worker tmp, self._pipe = self._pipe, self._server_pipe_handle(False) 267*cda5da8dSAndroid Build Coastguard Worker return tmp 268*cda5da8dSAndroid Build Coastguard Worker 269*cda5da8dSAndroid Build Coastguard Worker def _server_pipe_handle(self, first): 270*cda5da8dSAndroid Build Coastguard Worker # Return a wrapper for a new pipe handle. 271*cda5da8dSAndroid Build Coastguard Worker if self.closed(): 272*cda5da8dSAndroid Build Coastguard Worker return None 273*cda5da8dSAndroid Build Coastguard Worker flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED 274*cda5da8dSAndroid Build Coastguard Worker if first: 275*cda5da8dSAndroid Build Coastguard Worker flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE 276*cda5da8dSAndroid Build Coastguard Worker h = _winapi.CreateNamedPipe( 277*cda5da8dSAndroid Build Coastguard Worker self._address, flags, 278*cda5da8dSAndroid Build Coastguard Worker _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE | 279*cda5da8dSAndroid Build Coastguard Worker _winapi.PIPE_WAIT, 280*cda5da8dSAndroid Build Coastguard Worker _winapi.PIPE_UNLIMITED_INSTANCES, 281*cda5da8dSAndroid Build Coastguard Worker windows_utils.BUFSIZE, windows_utils.BUFSIZE, 282*cda5da8dSAndroid Build Coastguard Worker _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL) 283*cda5da8dSAndroid Build Coastguard Worker pipe = windows_utils.PipeHandle(h) 284*cda5da8dSAndroid Build Coastguard Worker self._free_instances.add(pipe) 285*cda5da8dSAndroid Build Coastguard Worker return pipe 286*cda5da8dSAndroid Build Coastguard Worker 287*cda5da8dSAndroid Build Coastguard Worker def closed(self): 288*cda5da8dSAndroid Build Coastguard Worker return (self._address is None) 289*cda5da8dSAndroid Build Coastguard Worker 290*cda5da8dSAndroid Build Coastguard Worker def close(self): 291*cda5da8dSAndroid Build Coastguard Worker if self._accept_pipe_future is not None: 292*cda5da8dSAndroid Build Coastguard Worker self._accept_pipe_future.cancel() 293*cda5da8dSAndroid Build Coastguard Worker self._accept_pipe_future = None 294*cda5da8dSAndroid Build Coastguard Worker # Close all instances which have not been connected to by a client. 295*cda5da8dSAndroid Build Coastguard Worker if self._address is not None: 296*cda5da8dSAndroid Build Coastguard Worker for pipe in self._free_instances: 297*cda5da8dSAndroid Build Coastguard Worker pipe.close() 298*cda5da8dSAndroid Build Coastguard Worker self._pipe = None 299*cda5da8dSAndroid Build Coastguard Worker self._address = None 300*cda5da8dSAndroid Build Coastguard Worker self._free_instances.clear() 301*cda5da8dSAndroid Build Coastguard Worker 302*cda5da8dSAndroid Build Coastguard Worker __del__ = close 303*cda5da8dSAndroid Build Coastguard Worker 304*cda5da8dSAndroid Build Coastguard Worker 305*cda5da8dSAndroid Build Coastguard Workerclass _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop): 306*cda5da8dSAndroid Build Coastguard Worker """Windows version of selector event loop.""" 307*cda5da8dSAndroid Build Coastguard Worker 308*cda5da8dSAndroid Build Coastguard Worker 309*cda5da8dSAndroid Build Coastguard Workerclass ProactorEventLoop(proactor_events.BaseProactorEventLoop): 310*cda5da8dSAndroid Build Coastguard Worker """Windows version of proactor event loop using IOCP.""" 311*cda5da8dSAndroid Build Coastguard Worker 312*cda5da8dSAndroid Build Coastguard Worker def __init__(self, proactor=None): 313*cda5da8dSAndroid Build Coastguard Worker if proactor is None: 314*cda5da8dSAndroid Build Coastguard Worker proactor = IocpProactor() 315*cda5da8dSAndroid Build Coastguard Worker super().__init__(proactor) 316*cda5da8dSAndroid Build Coastguard Worker 317*cda5da8dSAndroid Build Coastguard Worker def run_forever(self): 318*cda5da8dSAndroid Build Coastguard Worker try: 319*cda5da8dSAndroid Build Coastguard Worker assert self._self_reading_future is None 320*cda5da8dSAndroid Build Coastguard Worker self.call_soon(self._loop_self_reading) 321*cda5da8dSAndroid Build Coastguard Worker super().run_forever() 322*cda5da8dSAndroid Build Coastguard Worker finally: 323*cda5da8dSAndroid Build Coastguard Worker if self._self_reading_future is not None: 324*cda5da8dSAndroid Build Coastguard Worker ov = self._self_reading_future._ov 325*cda5da8dSAndroid Build Coastguard Worker self._self_reading_future.cancel() 326*cda5da8dSAndroid Build Coastguard Worker # self_reading_future was just cancelled so if it hasn't been 327*cda5da8dSAndroid Build Coastguard Worker # finished yet, it never will be (it's possible that it has 328*cda5da8dSAndroid Build Coastguard Worker # already finished and its callback is waiting in the queue, 329*cda5da8dSAndroid Build Coastguard Worker # where it could still happen if the event loop is restarted). 330*cda5da8dSAndroid Build Coastguard Worker # Unregister it otherwise IocpProactor.close will wait for it 331*cda5da8dSAndroid Build Coastguard Worker # forever 332*cda5da8dSAndroid Build Coastguard Worker if ov is not None: 333*cda5da8dSAndroid Build Coastguard Worker self._proactor._unregister(ov) 334*cda5da8dSAndroid Build Coastguard Worker self._self_reading_future = None 335*cda5da8dSAndroid Build Coastguard Worker 336*cda5da8dSAndroid Build Coastguard Worker async def create_pipe_connection(self, protocol_factory, address): 337*cda5da8dSAndroid Build Coastguard Worker f = self._proactor.connect_pipe(address) 338*cda5da8dSAndroid Build Coastguard Worker pipe = await f 339*cda5da8dSAndroid Build Coastguard Worker protocol = protocol_factory() 340*cda5da8dSAndroid Build Coastguard Worker trans = self._make_duplex_pipe_transport(pipe, protocol, 341*cda5da8dSAndroid Build Coastguard Worker extra={'addr': address}) 342*cda5da8dSAndroid Build Coastguard Worker return trans, protocol 343*cda5da8dSAndroid Build Coastguard Worker 344*cda5da8dSAndroid Build Coastguard Worker async def start_serving_pipe(self, protocol_factory, address): 345*cda5da8dSAndroid Build Coastguard Worker server = PipeServer(address) 346*cda5da8dSAndroid Build Coastguard Worker 347*cda5da8dSAndroid Build Coastguard Worker def loop_accept_pipe(f=None): 348*cda5da8dSAndroid Build Coastguard Worker pipe = None 349*cda5da8dSAndroid Build Coastguard Worker try: 350*cda5da8dSAndroid Build Coastguard Worker if f: 351*cda5da8dSAndroid Build Coastguard Worker pipe = f.result() 352*cda5da8dSAndroid Build Coastguard Worker server._free_instances.discard(pipe) 353*cda5da8dSAndroid Build Coastguard Worker 354*cda5da8dSAndroid Build Coastguard Worker if server.closed(): 355*cda5da8dSAndroid Build Coastguard Worker # A client connected before the server was closed: 356*cda5da8dSAndroid Build Coastguard Worker # drop the client (close the pipe) and exit 357*cda5da8dSAndroid Build Coastguard Worker pipe.close() 358*cda5da8dSAndroid Build Coastguard Worker return 359*cda5da8dSAndroid Build Coastguard Worker 360*cda5da8dSAndroid Build Coastguard Worker protocol = protocol_factory() 361*cda5da8dSAndroid Build Coastguard Worker self._make_duplex_pipe_transport( 362*cda5da8dSAndroid Build Coastguard Worker pipe, protocol, extra={'addr': address}) 363*cda5da8dSAndroid Build Coastguard Worker 364*cda5da8dSAndroid Build Coastguard Worker pipe = server._get_unconnected_pipe() 365*cda5da8dSAndroid Build Coastguard Worker if pipe is None: 366*cda5da8dSAndroid Build Coastguard Worker return 367*cda5da8dSAndroid Build Coastguard Worker 368*cda5da8dSAndroid Build Coastguard Worker f = self._proactor.accept_pipe(pipe) 369*cda5da8dSAndroid Build Coastguard Worker except BrokenPipeError: 370*cda5da8dSAndroid Build Coastguard Worker if pipe and pipe.fileno() != -1: 371*cda5da8dSAndroid Build Coastguard Worker pipe.close() 372*cda5da8dSAndroid Build Coastguard Worker self.call_soon(loop_accept_pipe) 373*cda5da8dSAndroid Build Coastguard Worker except OSError as exc: 374*cda5da8dSAndroid Build Coastguard Worker if pipe and pipe.fileno() != -1: 375*cda5da8dSAndroid Build Coastguard Worker self.call_exception_handler({ 376*cda5da8dSAndroid Build Coastguard Worker 'message': 'Pipe accept failed', 377*cda5da8dSAndroid Build Coastguard Worker 'exception': exc, 378*cda5da8dSAndroid Build Coastguard Worker 'pipe': pipe, 379*cda5da8dSAndroid Build Coastguard Worker }) 380*cda5da8dSAndroid Build Coastguard Worker pipe.close() 381*cda5da8dSAndroid Build Coastguard Worker elif self._debug: 382*cda5da8dSAndroid Build Coastguard Worker logger.warning("Accept pipe failed on pipe %r", 383*cda5da8dSAndroid Build Coastguard Worker pipe, exc_info=True) 384*cda5da8dSAndroid Build Coastguard Worker self.call_soon(loop_accept_pipe) 385*cda5da8dSAndroid Build Coastguard Worker except exceptions.CancelledError: 386*cda5da8dSAndroid Build Coastguard Worker if pipe: 387*cda5da8dSAndroid Build Coastguard Worker pipe.close() 388*cda5da8dSAndroid Build Coastguard Worker else: 389*cda5da8dSAndroid Build Coastguard Worker server._accept_pipe_future = f 390*cda5da8dSAndroid Build Coastguard Worker f.add_done_callback(loop_accept_pipe) 391*cda5da8dSAndroid Build Coastguard Worker 392*cda5da8dSAndroid Build Coastguard Worker self.call_soon(loop_accept_pipe) 393*cda5da8dSAndroid Build Coastguard Worker return [server] 394*cda5da8dSAndroid Build Coastguard Worker 395*cda5da8dSAndroid Build Coastguard Worker async def _make_subprocess_transport(self, protocol, args, shell, 396*cda5da8dSAndroid Build Coastguard Worker stdin, stdout, stderr, bufsize, 397*cda5da8dSAndroid Build Coastguard Worker extra=None, **kwargs): 398*cda5da8dSAndroid Build Coastguard Worker waiter = self.create_future() 399*cda5da8dSAndroid Build Coastguard Worker transp = _WindowsSubprocessTransport(self, protocol, args, shell, 400*cda5da8dSAndroid Build Coastguard Worker stdin, stdout, stderr, bufsize, 401*cda5da8dSAndroid Build Coastguard Worker waiter=waiter, extra=extra, 402*cda5da8dSAndroid Build Coastguard Worker **kwargs) 403*cda5da8dSAndroid Build Coastguard Worker try: 404*cda5da8dSAndroid Build Coastguard Worker await waiter 405*cda5da8dSAndroid Build Coastguard Worker except (SystemExit, KeyboardInterrupt): 406*cda5da8dSAndroid Build Coastguard Worker raise 407*cda5da8dSAndroid Build Coastguard Worker except BaseException: 408*cda5da8dSAndroid Build Coastguard Worker transp.close() 409*cda5da8dSAndroid Build Coastguard Worker await transp._wait() 410*cda5da8dSAndroid Build Coastguard Worker raise 411*cda5da8dSAndroid Build Coastguard Worker 412*cda5da8dSAndroid Build Coastguard Worker return transp 413*cda5da8dSAndroid Build Coastguard Worker 414*cda5da8dSAndroid Build Coastguard Worker 415*cda5da8dSAndroid Build Coastguard Workerclass IocpProactor: 416*cda5da8dSAndroid Build Coastguard Worker """Proactor implementation using IOCP.""" 417*cda5da8dSAndroid Build Coastguard Worker 418*cda5da8dSAndroid Build Coastguard Worker def __init__(self, concurrency=INFINITE): 419*cda5da8dSAndroid Build Coastguard Worker self._loop = None 420*cda5da8dSAndroid Build Coastguard Worker self._results = [] 421*cda5da8dSAndroid Build Coastguard Worker self._iocp = _overlapped.CreateIoCompletionPort( 422*cda5da8dSAndroid Build Coastguard Worker _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency) 423*cda5da8dSAndroid Build Coastguard Worker self._cache = {} 424*cda5da8dSAndroid Build Coastguard Worker self._registered = weakref.WeakSet() 425*cda5da8dSAndroid Build Coastguard Worker self._unregistered = [] 426*cda5da8dSAndroid Build Coastguard Worker self._stopped_serving = weakref.WeakSet() 427*cda5da8dSAndroid Build Coastguard Worker 428*cda5da8dSAndroid Build Coastguard Worker def _check_closed(self): 429*cda5da8dSAndroid Build Coastguard Worker if self._iocp is None: 430*cda5da8dSAndroid Build Coastguard Worker raise RuntimeError('IocpProactor is closed') 431*cda5da8dSAndroid Build Coastguard Worker 432*cda5da8dSAndroid Build Coastguard Worker def __repr__(self): 433*cda5da8dSAndroid Build Coastguard Worker info = ['overlapped#=%s' % len(self._cache), 434*cda5da8dSAndroid Build Coastguard Worker 'result#=%s' % len(self._results)] 435*cda5da8dSAndroid Build Coastguard Worker if self._iocp is None: 436*cda5da8dSAndroid Build Coastguard Worker info.append('closed') 437*cda5da8dSAndroid Build Coastguard Worker return '<%s %s>' % (self.__class__.__name__, " ".join(info)) 438*cda5da8dSAndroid Build Coastguard Worker 439*cda5da8dSAndroid Build Coastguard Worker def set_loop(self, loop): 440*cda5da8dSAndroid Build Coastguard Worker self._loop = loop 441*cda5da8dSAndroid Build Coastguard Worker 442*cda5da8dSAndroid Build Coastguard Worker def select(self, timeout=None): 443*cda5da8dSAndroid Build Coastguard Worker if not self._results: 444*cda5da8dSAndroid Build Coastguard Worker self._poll(timeout) 445*cda5da8dSAndroid Build Coastguard Worker tmp = self._results 446*cda5da8dSAndroid Build Coastguard Worker self._results = [] 447*cda5da8dSAndroid Build Coastguard Worker try: 448*cda5da8dSAndroid Build Coastguard Worker return tmp 449*cda5da8dSAndroid Build Coastguard Worker finally: 450*cda5da8dSAndroid Build Coastguard Worker # Needed to break cycles when an exception occurs. 451*cda5da8dSAndroid Build Coastguard Worker tmp = None 452*cda5da8dSAndroid Build Coastguard Worker 453*cda5da8dSAndroid Build Coastguard Worker def _result(self, value): 454*cda5da8dSAndroid Build Coastguard Worker fut = self._loop.create_future() 455*cda5da8dSAndroid Build Coastguard Worker fut.set_result(value) 456*cda5da8dSAndroid Build Coastguard Worker return fut 457*cda5da8dSAndroid Build Coastguard Worker 458*cda5da8dSAndroid Build Coastguard Worker def recv(self, conn, nbytes, flags=0): 459*cda5da8dSAndroid Build Coastguard Worker self._register_with_iocp(conn) 460*cda5da8dSAndroid Build Coastguard Worker ov = _overlapped.Overlapped(NULL) 461*cda5da8dSAndroid Build Coastguard Worker try: 462*cda5da8dSAndroid Build Coastguard Worker if isinstance(conn, socket.socket): 463*cda5da8dSAndroid Build Coastguard Worker ov.WSARecv(conn.fileno(), nbytes, flags) 464*cda5da8dSAndroid Build Coastguard Worker else: 465*cda5da8dSAndroid Build Coastguard Worker ov.ReadFile(conn.fileno(), nbytes) 466*cda5da8dSAndroid Build Coastguard Worker except BrokenPipeError: 467*cda5da8dSAndroid Build Coastguard Worker return self._result(b'') 468*cda5da8dSAndroid Build Coastguard Worker 469*cda5da8dSAndroid Build Coastguard Worker def finish_recv(trans, key, ov): 470*cda5da8dSAndroid Build Coastguard Worker try: 471*cda5da8dSAndroid Build Coastguard Worker return ov.getresult() 472*cda5da8dSAndroid Build Coastguard Worker except OSError as exc: 473*cda5da8dSAndroid Build Coastguard Worker if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, 474*cda5da8dSAndroid Build Coastguard Worker _overlapped.ERROR_OPERATION_ABORTED): 475*cda5da8dSAndroid Build Coastguard Worker raise ConnectionResetError(*exc.args) 476*cda5da8dSAndroid Build Coastguard Worker else: 477*cda5da8dSAndroid Build Coastguard Worker raise 478*cda5da8dSAndroid Build Coastguard Worker 479*cda5da8dSAndroid Build Coastguard Worker return self._register(ov, conn, finish_recv) 480*cda5da8dSAndroid Build Coastguard Worker 481*cda5da8dSAndroid Build Coastguard Worker def recv_into(self, conn, buf, flags=0): 482*cda5da8dSAndroid Build Coastguard Worker self._register_with_iocp(conn) 483*cda5da8dSAndroid Build Coastguard Worker ov = _overlapped.Overlapped(NULL) 484*cda5da8dSAndroid Build Coastguard Worker try: 485*cda5da8dSAndroid Build Coastguard Worker if isinstance(conn, socket.socket): 486*cda5da8dSAndroid Build Coastguard Worker ov.WSARecvInto(conn.fileno(), buf, flags) 487*cda5da8dSAndroid Build Coastguard Worker else: 488*cda5da8dSAndroid Build Coastguard Worker ov.ReadFileInto(conn.fileno(), buf) 489*cda5da8dSAndroid Build Coastguard Worker except BrokenPipeError: 490*cda5da8dSAndroid Build Coastguard Worker return self._result(0) 491*cda5da8dSAndroid Build Coastguard Worker 492*cda5da8dSAndroid Build Coastguard Worker def finish_recv(trans, key, ov): 493*cda5da8dSAndroid Build Coastguard Worker try: 494*cda5da8dSAndroid Build Coastguard Worker return ov.getresult() 495*cda5da8dSAndroid Build Coastguard Worker except OSError as exc: 496*cda5da8dSAndroid Build Coastguard Worker if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, 497*cda5da8dSAndroid Build Coastguard Worker _overlapped.ERROR_OPERATION_ABORTED): 498*cda5da8dSAndroid Build Coastguard Worker raise ConnectionResetError(*exc.args) 499*cda5da8dSAndroid Build Coastguard Worker else: 500*cda5da8dSAndroid Build Coastguard Worker raise 501*cda5da8dSAndroid Build Coastguard Worker 502*cda5da8dSAndroid Build Coastguard Worker return self._register(ov, conn, finish_recv) 503*cda5da8dSAndroid Build Coastguard Worker 504*cda5da8dSAndroid Build Coastguard Worker def recvfrom(self, conn, nbytes, flags=0): 505*cda5da8dSAndroid Build Coastguard Worker self._register_with_iocp(conn) 506*cda5da8dSAndroid Build Coastguard Worker ov = _overlapped.Overlapped(NULL) 507*cda5da8dSAndroid Build Coastguard Worker try: 508*cda5da8dSAndroid Build Coastguard Worker ov.WSARecvFrom(conn.fileno(), nbytes, flags) 509*cda5da8dSAndroid Build Coastguard Worker except BrokenPipeError: 510*cda5da8dSAndroid Build Coastguard Worker return self._result((b'', None)) 511*cda5da8dSAndroid Build Coastguard Worker 512*cda5da8dSAndroid Build Coastguard Worker def finish_recv(trans, key, ov): 513*cda5da8dSAndroid Build Coastguard Worker try: 514*cda5da8dSAndroid Build Coastguard Worker return ov.getresult() 515*cda5da8dSAndroid Build Coastguard Worker except OSError as exc: 516*cda5da8dSAndroid Build Coastguard Worker if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, 517*cda5da8dSAndroid Build Coastguard Worker _overlapped.ERROR_OPERATION_ABORTED): 518*cda5da8dSAndroid Build Coastguard Worker raise ConnectionResetError(*exc.args) 519*cda5da8dSAndroid Build Coastguard Worker else: 520*cda5da8dSAndroid Build Coastguard Worker raise 521*cda5da8dSAndroid Build Coastguard Worker 522*cda5da8dSAndroid Build Coastguard Worker return self._register(ov, conn, finish_recv) 523*cda5da8dSAndroid Build Coastguard Worker 524*cda5da8dSAndroid Build Coastguard Worker def recvfrom_into(self, conn, buf, flags=0): 525*cda5da8dSAndroid Build Coastguard Worker self._register_with_iocp(conn) 526*cda5da8dSAndroid Build Coastguard Worker ov = _overlapped.Overlapped(NULL) 527*cda5da8dSAndroid Build Coastguard Worker try: 528*cda5da8dSAndroid Build Coastguard Worker ov.WSARecvFromInto(conn.fileno(), buf, flags) 529*cda5da8dSAndroid Build Coastguard Worker except BrokenPipeError: 530*cda5da8dSAndroid Build Coastguard Worker return self._result((0, None)) 531*cda5da8dSAndroid Build Coastguard Worker 532*cda5da8dSAndroid Build Coastguard Worker def finish_recv(trans, key, ov): 533*cda5da8dSAndroid Build Coastguard Worker try: 534*cda5da8dSAndroid Build Coastguard Worker return ov.getresult() 535*cda5da8dSAndroid Build Coastguard Worker except OSError as exc: 536*cda5da8dSAndroid Build Coastguard Worker if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, 537*cda5da8dSAndroid Build Coastguard Worker _overlapped.ERROR_OPERATION_ABORTED): 538*cda5da8dSAndroid Build Coastguard Worker raise ConnectionResetError(*exc.args) 539*cda5da8dSAndroid Build Coastguard Worker else: 540*cda5da8dSAndroid Build Coastguard Worker raise 541*cda5da8dSAndroid Build Coastguard Worker 542*cda5da8dSAndroid Build Coastguard Worker return self._register(ov, conn, finish_recv) 543*cda5da8dSAndroid Build Coastguard Worker 544*cda5da8dSAndroid Build Coastguard Worker def sendto(self, conn, buf, flags=0, addr=None): 545*cda5da8dSAndroid Build Coastguard Worker self._register_with_iocp(conn) 546*cda5da8dSAndroid Build Coastguard Worker ov = _overlapped.Overlapped(NULL) 547*cda5da8dSAndroid Build Coastguard Worker 548*cda5da8dSAndroid Build Coastguard Worker ov.WSASendTo(conn.fileno(), buf, flags, addr) 549*cda5da8dSAndroid Build Coastguard Worker 550*cda5da8dSAndroid Build Coastguard Worker def finish_send(trans, key, ov): 551*cda5da8dSAndroid Build Coastguard Worker try: 552*cda5da8dSAndroid Build Coastguard Worker return ov.getresult() 553*cda5da8dSAndroid Build Coastguard Worker except OSError as exc: 554*cda5da8dSAndroid Build Coastguard Worker if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, 555*cda5da8dSAndroid Build Coastguard Worker _overlapped.ERROR_OPERATION_ABORTED): 556*cda5da8dSAndroid Build Coastguard Worker raise ConnectionResetError(*exc.args) 557*cda5da8dSAndroid Build Coastguard Worker else: 558*cda5da8dSAndroid Build Coastguard Worker raise 559*cda5da8dSAndroid Build Coastguard Worker 560*cda5da8dSAndroid Build Coastguard Worker return self._register(ov, conn, finish_send) 561*cda5da8dSAndroid Build Coastguard Worker 562*cda5da8dSAndroid Build Coastguard Worker def send(self, conn, buf, flags=0): 563*cda5da8dSAndroid Build Coastguard Worker self._register_with_iocp(conn) 564*cda5da8dSAndroid Build Coastguard Worker ov = _overlapped.Overlapped(NULL) 565*cda5da8dSAndroid Build Coastguard Worker if isinstance(conn, socket.socket): 566*cda5da8dSAndroid Build Coastguard Worker ov.WSASend(conn.fileno(), buf, flags) 567*cda5da8dSAndroid Build Coastguard Worker else: 568*cda5da8dSAndroid Build Coastguard Worker ov.WriteFile(conn.fileno(), buf) 569*cda5da8dSAndroid Build Coastguard Worker 570*cda5da8dSAndroid Build Coastguard Worker def finish_send(trans, key, ov): 571*cda5da8dSAndroid Build Coastguard Worker try: 572*cda5da8dSAndroid Build Coastguard Worker return ov.getresult() 573*cda5da8dSAndroid Build Coastguard Worker except OSError as exc: 574*cda5da8dSAndroid Build Coastguard Worker if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, 575*cda5da8dSAndroid Build Coastguard Worker _overlapped.ERROR_OPERATION_ABORTED): 576*cda5da8dSAndroid Build Coastguard Worker raise ConnectionResetError(*exc.args) 577*cda5da8dSAndroid Build Coastguard Worker else: 578*cda5da8dSAndroid Build Coastguard Worker raise 579*cda5da8dSAndroid Build Coastguard Worker 580*cda5da8dSAndroid Build Coastguard Worker return self._register(ov, conn, finish_send) 581*cda5da8dSAndroid Build Coastguard Worker 582*cda5da8dSAndroid Build Coastguard Worker def accept(self, listener): 583*cda5da8dSAndroid Build Coastguard Worker self._register_with_iocp(listener) 584*cda5da8dSAndroid Build Coastguard Worker conn = self._get_accept_socket(listener.family) 585*cda5da8dSAndroid Build Coastguard Worker ov = _overlapped.Overlapped(NULL) 586*cda5da8dSAndroid Build Coastguard Worker ov.AcceptEx(listener.fileno(), conn.fileno()) 587*cda5da8dSAndroid Build Coastguard Worker 588*cda5da8dSAndroid Build Coastguard Worker def finish_accept(trans, key, ov): 589*cda5da8dSAndroid Build Coastguard Worker ov.getresult() 590*cda5da8dSAndroid Build Coastguard Worker # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work. 591*cda5da8dSAndroid Build Coastguard Worker buf = struct.pack('@P', listener.fileno()) 592*cda5da8dSAndroid Build Coastguard Worker conn.setsockopt(socket.SOL_SOCKET, 593*cda5da8dSAndroid Build Coastguard Worker _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf) 594*cda5da8dSAndroid Build Coastguard Worker conn.settimeout(listener.gettimeout()) 595*cda5da8dSAndroid Build Coastguard Worker return conn, conn.getpeername() 596*cda5da8dSAndroid Build Coastguard Worker 597*cda5da8dSAndroid Build Coastguard Worker async def accept_coro(future, conn): 598*cda5da8dSAndroid Build Coastguard Worker # Coroutine closing the accept socket if the future is cancelled 599*cda5da8dSAndroid Build Coastguard Worker try: 600*cda5da8dSAndroid Build Coastguard Worker await future 601*cda5da8dSAndroid Build Coastguard Worker except exceptions.CancelledError: 602*cda5da8dSAndroid Build Coastguard Worker conn.close() 603*cda5da8dSAndroid Build Coastguard Worker raise 604*cda5da8dSAndroid Build Coastguard Worker 605*cda5da8dSAndroid Build Coastguard Worker future = self._register(ov, listener, finish_accept) 606*cda5da8dSAndroid Build Coastguard Worker coro = accept_coro(future, conn) 607*cda5da8dSAndroid Build Coastguard Worker tasks.ensure_future(coro, loop=self._loop) 608*cda5da8dSAndroid Build Coastguard Worker return future 609*cda5da8dSAndroid Build Coastguard Worker 610*cda5da8dSAndroid Build Coastguard Worker def connect(self, conn, address): 611*cda5da8dSAndroid Build Coastguard Worker if conn.type == socket.SOCK_DGRAM: 612*cda5da8dSAndroid Build Coastguard Worker # WSAConnect will complete immediately for UDP sockets so we don't 613*cda5da8dSAndroid Build Coastguard Worker # need to register any IOCP operation 614*cda5da8dSAndroid Build Coastguard Worker _overlapped.WSAConnect(conn.fileno(), address) 615*cda5da8dSAndroid Build Coastguard Worker fut = self._loop.create_future() 616*cda5da8dSAndroid Build Coastguard Worker fut.set_result(None) 617*cda5da8dSAndroid Build Coastguard Worker return fut 618*cda5da8dSAndroid Build Coastguard Worker 619*cda5da8dSAndroid Build Coastguard Worker self._register_with_iocp(conn) 620*cda5da8dSAndroid Build Coastguard Worker # The socket needs to be locally bound before we call ConnectEx(). 621*cda5da8dSAndroid Build Coastguard Worker try: 622*cda5da8dSAndroid Build Coastguard Worker _overlapped.BindLocal(conn.fileno(), conn.family) 623*cda5da8dSAndroid Build Coastguard Worker except OSError as e: 624*cda5da8dSAndroid Build Coastguard Worker if e.winerror != errno.WSAEINVAL: 625*cda5da8dSAndroid Build Coastguard Worker raise 626*cda5da8dSAndroid Build Coastguard Worker # Probably already locally bound; check using getsockname(). 627*cda5da8dSAndroid Build Coastguard Worker if conn.getsockname()[1] == 0: 628*cda5da8dSAndroid Build Coastguard Worker raise 629*cda5da8dSAndroid Build Coastguard Worker ov = _overlapped.Overlapped(NULL) 630*cda5da8dSAndroid Build Coastguard Worker ov.ConnectEx(conn.fileno(), address) 631*cda5da8dSAndroid Build Coastguard Worker 632*cda5da8dSAndroid Build Coastguard Worker def finish_connect(trans, key, ov): 633*cda5da8dSAndroid Build Coastguard Worker ov.getresult() 634*cda5da8dSAndroid Build Coastguard Worker # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work. 635*cda5da8dSAndroid Build Coastguard Worker conn.setsockopt(socket.SOL_SOCKET, 636*cda5da8dSAndroid Build Coastguard Worker _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0) 637*cda5da8dSAndroid Build Coastguard Worker return conn 638*cda5da8dSAndroid Build Coastguard Worker 639*cda5da8dSAndroid Build Coastguard Worker return self._register(ov, conn, finish_connect) 640*cda5da8dSAndroid Build Coastguard Worker 641*cda5da8dSAndroid Build Coastguard Worker def sendfile(self, sock, file, offset, count): 642*cda5da8dSAndroid Build Coastguard Worker self._register_with_iocp(sock) 643*cda5da8dSAndroid Build Coastguard Worker ov = _overlapped.Overlapped(NULL) 644*cda5da8dSAndroid Build Coastguard Worker offset_low = offset & 0xffff_ffff 645*cda5da8dSAndroid Build Coastguard Worker offset_high = (offset >> 32) & 0xffff_ffff 646*cda5da8dSAndroid Build Coastguard Worker ov.TransmitFile(sock.fileno(), 647*cda5da8dSAndroid Build Coastguard Worker msvcrt.get_osfhandle(file.fileno()), 648*cda5da8dSAndroid Build Coastguard Worker offset_low, offset_high, 649*cda5da8dSAndroid Build Coastguard Worker count, 0, 0) 650*cda5da8dSAndroid Build Coastguard Worker 651*cda5da8dSAndroid Build Coastguard Worker def finish_sendfile(trans, key, ov): 652*cda5da8dSAndroid Build Coastguard Worker try: 653*cda5da8dSAndroid Build Coastguard Worker return ov.getresult() 654*cda5da8dSAndroid Build Coastguard Worker except OSError as exc: 655*cda5da8dSAndroid Build Coastguard Worker if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, 656*cda5da8dSAndroid Build Coastguard Worker _overlapped.ERROR_OPERATION_ABORTED): 657*cda5da8dSAndroid Build Coastguard Worker raise ConnectionResetError(*exc.args) 658*cda5da8dSAndroid Build Coastguard Worker else: 659*cda5da8dSAndroid Build Coastguard Worker raise 660*cda5da8dSAndroid Build Coastguard Worker return self._register(ov, sock, finish_sendfile) 661*cda5da8dSAndroid Build Coastguard Worker 662*cda5da8dSAndroid Build Coastguard Worker def accept_pipe(self, pipe): 663*cda5da8dSAndroid Build Coastguard Worker self._register_with_iocp(pipe) 664*cda5da8dSAndroid Build Coastguard Worker ov = _overlapped.Overlapped(NULL) 665*cda5da8dSAndroid Build Coastguard Worker connected = ov.ConnectNamedPipe(pipe.fileno()) 666*cda5da8dSAndroid Build Coastguard Worker 667*cda5da8dSAndroid Build Coastguard Worker if connected: 668*cda5da8dSAndroid Build Coastguard Worker # ConnectNamePipe() failed with ERROR_PIPE_CONNECTED which means 669*cda5da8dSAndroid Build Coastguard Worker # that the pipe is connected. There is no need to wait for the 670*cda5da8dSAndroid Build Coastguard Worker # completion of the connection. 671*cda5da8dSAndroid Build Coastguard Worker return self._result(pipe) 672*cda5da8dSAndroid Build Coastguard Worker 673*cda5da8dSAndroid Build Coastguard Worker def finish_accept_pipe(trans, key, ov): 674*cda5da8dSAndroid Build Coastguard Worker ov.getresult() 675*cda5da8dSAndroid Build Coastguard Worker return pipe 676*cda5da8dSAndroid Build Coastguard Worker 677*cda5da8dSAndroid Build Coastguard Worker return self._register(ov, pipe, finish_accept_pipe) 678*cda5da8dSAndroid Build Coastguard Worker 679*cda5da8dSAndroid Build Coastguard Worker async def connect_pipe(self, address): 680*cda5da8dSAndroid Build Coastguard Worker delay = CONNECT_PIPE_INIT_DELAY 681*cda5da8dSAndroid Build Coastguard Worker while True: 682*cda5da8dSAndroid Build Coastguard Worker # Unfortunately there is no way to do an overlapped connect to 683*cda5da8dSAndroid Build Coastguard Worker # a pipe. Call CreateFile() in a loop until it doesn't fail with 684*cda5da8dSAndroid Build Coastguard Worker # ERROR_PIPE_BUSY. 685*cda5da8dSAndroid Build Coastguard Worker try: 686*cda5da8dSAndroid Build Coastguard Worker handle = _overlapped.ConnectPipe(address) 687*cda5da8dSAndroid Build Coastguard Worker break 688*cda5da8dSAndroid Build Coastguard Worker except OSError as exc: 689*cda5da8dSAndroid Build Coastguard Worker if exc.winerror != _overlapped.ERROR_PIPE_BUSY: 690*cda5da8dSAndroid Build Coastguard Worker raise 691*cda5da8dSAndroid Build Coastguard Worker 692*cda5da8dSAndroid Build Coastguard Worker # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later 693*cda5da8dSAndroid Build Coastguard Worker delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY) 694*cda5da8dSAndroid Build Coastguard Worker await tasks.sleep(delay) 695*cda5da8dSAndroid Build Coastguard Worker 696*cda5da8dSAndroid Build Coastguard Worker return windows_utils.PipeHandle(handle) 697*cda5da8dSAndroid Build Coastguard Worker 698*cda5da8dSAndroid Build Coastguard Worker def wait_for_handle(self, handle, timeout=None): 699*cda5da8dSAndroid Build Coastguard Worker """Wait for a handle. 700*cda5da8dSAndroid Build Coastguard Worker 701*cda5da8dSAndroid Build Coastguard Worker Return a Future object. The result of the future is True if the wait 702*cda5da8dSAndroid Build Coastguard Worker completed, or False if the wait did not complete (on timeout). 703*cda5da8dSAndroid Build Coastguard Worker """ 704*cda5da8dSAndroid Build Coastguard Worker return self._wait_for_handle(handle, timeout, False) 705*cda5da8dSAndroid Build Coastguard Worker 706*cda5da8dSAndroid Build Coastguard Worker def _wait_cancel(self, event, done_callback): 707*cda5da8dSAndroid Build Coastguard Worker fut = self._wait_for_handle(event, None, True) 708*cda5da8dSAndroid Build Coastguard Worker # add_done_callback() cannot be used because the wait may only complete 709*cda5da8dSAndroid Build Coastguard Worker # in IocpProactor.close(), while the event loop is not running. 710*cda5da8dSAndroid Build Coastguard Worker fut._done_callback = done_callback 711*cda5da8dSAndroid Build Coastguard Worker return fut 712*cda5da8dSAndroid Build Coastguard Worker 713*cda5da8dSAndroid Build Coastguard Worker def _wait_for_handle(self, handle, timeout, _is_cancel): 714*cda5da8dSAndroid Build Coastguard Worker self._check_closed() 715*cda5da8dSAndroid Build Coastguard Worker 716*cda5da8dSAndroid Build Coastguard Worker if timeout is None: 717*cda5da8dSAndroid Build Coastguard Worker ms = _winapi.INFINITE 718*cda5da8dSAndroid Build Coastguard Worker else: 719*cda5da8dSAndroid Build Coastguard Worker # RegisterWaitForSingleObject() has a resolution of 1 millisecond, 720*cda5da8dSAndroid Build Coastguard Worker # round away from zero to wait *at least* timeout seconds. 721*cda5da8dSAndroid Build Coastguard Worker ms = math.ceil(timeout * 1e3) 722*cda5da8dSAndroid Build Coastguard Worker 723*cda5da8dSAndroid Build Coastguard Worker # We only create ov so we can use ov.address as a key for the cache. 724*cda5da8dSAndroid Build Coastguard Worker ov = _overlapped.Overlapped(NULL) 725*cda5da8dSAndroid Build Coastguard Worker wait_handle = _overlapped.RegisterWaitWithQueue( 726*cda5da8dSAndroid Build Coastguard Worker handle, self._iocp, ov.address, ms) 727*cda5da8dSAndroid Build Coastguard Worker if _is_cancel: 728*cda5da8dSAndroid Build Coastguard Worker f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop) 729*cda5da8dSAndroid Build Coastguard Worker else: 730*cda5da8dSAndroid Build Coastguard Worker f = _WaitHandleFuture(ov, handle, wait_handle, self, 731*cda5da8dSAndroid Build Coastguard Worker loop=self._loop) 732*cda5da8dSAndroid Build Coastguard Worker if f._source_traceback: 733*cda5da8dSAndroid Build Coastguard Worker del f._source_traceback[-1] 734*cda5da8dSAndroid Build Coastguard Worker 735*cda5da8dSAndroid Build Coastguard Worker def finish_wait_for_handle(trans, key, ov): 736*cda5da8dSAndroid Build Coastguard Worker # Note that this second wait means that we should only use 737*cda5da8dSAndroid Build Coastguard Worker # this with handles types where a successful wait has no 738*cda5da8dSAndroid Build Coastguard Worker # effect. So events or processes are all right, but locks 739*cda5da8dSAndroid Build Coastguard Worker # or semaphores are not. Also note if the handle is 740*cda5da8dSAndroid Build Coastguard Worker # signalled and then quickly reset, then we may return 741*cda5da8dSAndroid Build Coastguard Worker # False even though we have not timed out. 742*cda5da8dSAndroid Build Coastguard Worker return f._poll() 743*cda5da8dSAndroid Build Coastguard Worker 744*cda5da8dSAndroid Build Coastguard Worker self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle) 745*cda5da8dSAndroid Build Coastguard Worker return f 746*cda5da8dSAndroid Build Coastguard Worker 747*cda5da8dSAndroid Build Coastguard Worker def _register_with_iocp(self, obj): 748*cda5da8dSAndroid Build Coastguard Worker # To get notifications of finished ops on this objects sent to the 749*cda5da8dSAndroid Build Coastguard Worker # completion port, were must register the handle. 750*cda5da8dSAndroid Build Coastguard Worker if obj not in self._registered: 751*cda5da8dSAndroid Build Coastguard Worker self._registered.add(obj) 752*cda5da8dSAndroid Build Coastguard Worker _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0) 753*cda5da8dSAndroid Build Coastguard Worker # XXX We could also use SetFileCompletionNotificationModes() 754*cda5da8dSAndroid Build Coastguard Worker # to avoid sending notifications to completion port of ops 755*cda5da8dSAndroid Build Coastguard Worker # that succeed immediately. 756*cda5da8dSAndroid Build Coastguard Worker 757*cda5da8dSAndroid Build Coastguard Worker def _register(self, ov, obj, callback): 758*cda5da8dSAndroid Build Coastguard Worker self._check_closed() 759*cda5da8dSAndroid Build Coastguard Worker 760*cda5da8dSAndroid Build Coastguard Worker # Return a future which will be set with the result of the 761*cda5da8dSAndroid Build Coastguard Worker # operation when it completes. The future's value is actually 762*cda5da8dSAndroid Build Coastguard Worker # the value returned by callback(). 763*cda5da8dSAndroid Build Coastguard Worker f = _OverlappedFuture(ov, loop=self._loop) 764*cda5da8dSAndroid Build Coastguard Worker if f._source_traceback: 765*cda5da8dSAndroid Build Coastguard Worker del f._source_traceback[-1] 766*cda5da8dSAndroid Build Coastguard Worker if not ov.pending: 767*cda5da8dSAndroid Build Coastguard Worker # The operation has completed, so no need to postpone the 768*cda5da8dSAndroid Build Coastguard Worker # work. We cannot take this short cut if we need the 769*cda5da8dSAndroid Build Coastguard Worker # NumberOfBytes, CompletionKey values returned by 770*cda5da8dSAndroid Build Coastguard Worker # PostQueuedCompletionStatus(). 771*cda5da8dSAndroid Build Coastguard Worker try: 772*cda5da8dSAndroid Build Coastguard Worker value = callback(None, None, ov) 773*cda5da8dSAndroid Build Coastguard Worker except OSError as e: 774*cda5da8dSAndroid Build Coastguard Worker f.set_exception(e) 775*cda5da8dSAndroid Build Coastguard Worker else: 776*cda5da8dSAndroid Build Coastguard Worker f.set_result(value) 777*cda5da8dSAndroid Build Coastguard Worker # Even if GetOverlappedResult() was called, we have to wait for the 778*cda5da8dSAndroid Build Coastguard Worker # notification of the completion in GetQueuedCompletionStatus(). 779*cda5da8dSAndroid Build Coastguard Worker # Register the overlapped operation to keep a reference to the 780*cda5da8dSAndroid Build Coastguard Worker # OVERLAPPED object, otherwise the memory is freed and Windows may 781*cda5da8dSAndroid Build Coastguard Worker # read uninitialized memory. 782*cda5da8dSAndroid Build Coastguard Worker 783*cda5da8dSAndroid Build Coastguard Worker # Register the overlapped operation for later. Note that 784*cda5da8dSAndroid Build Coastguard Worker # we only store obj to prevent it from being garbage 785*cda5da8dSAndroid Build Coastguard Worker # collected too early. 786*cda5da8dSAndroid Build Coastguard Worker self._cache[ov.address] = (f, ov, obj, callback) 787*cda5da8dSAndroid Build Coastguard Worker return f 788*cda5da8dSAndroid Build Coastguard Worker 789*cda5da8dSAndroid Build Coastguard Worker def _unregister(self, ov): 790*cda5da8dSAndroid Build Coastguard Worker """Unregister an overlapped object. 791*cda5da8dSAndroid Build Coastguard Worker 792*cda5da8dSAndroid Build Coastguard Worker Call this method when its future has been cancelled. The event can 793*cda5da8dSAndroid Build Coastguard Worker already be signalled (pending in the proactor event queue). It is also 794*cda5da8dSAndroid Build Coastguard Worker safe if the event is never signalled (because it was cancelled). 795*cda5da8dSAndroid Build Coastguard Worker """ 796*cda5da8dSAndroid Build Coastguard Worker self._check_closed() 797*cda5da8dSAndroid Build Coastguard Worker self._unregistered.append(ov) 798*cda5da8dSAndroid Build Coastguard Worker 799*cda5da8dSAndroid Build Coastguard Worker def _get_accept_socket(self, family): 800*cda5da8dSAndroid Build Coastguard Worker s = socket.socket(family) 801*cda5da8dSAndroid Build Coastguard Worker s.settimeout(0) 802*cda5da8dSAndroid Build Coastguard Worker return s 803*cda5da8dSAndroid Build Coastguard Worker 804*cda5da8dSAndroid Build Coastguard Worker def _poll(self, timeout=None): 805*cda5da8dSAndroid Build Coastguard Worker if timeout is None: 806*cda5da8dSAndroid Build Coastguard Worker ms = INFINITE 807*cda5da8dSAndroid Build Coastguard Worker elif timeout < 0: 808*cda5da8dSAndroid Build Coastguard Worker raise ValueError("negative timeout") 809*cda5da8dSAndroid Build Coastguard Worker else: 810*cda5da8dSAndroid Build Coastguard Worker # GetQueuedCompletionStatus() has a resolution of 1 millisecond, 811*cda5da8dSAndroid Build Coastguard Worker # round away from zero to wait *at least* timeout seconds. 812*cda5da8dSAndroid Build Coastguard Worker ms = math.ceil(timeout * 1e3) 813*cda5da8dSAndroid Build Coastguard Worker if ms >= INFINITE: 814*cda5da8dSAndroid Build Coastguard Worker raise ValueError("timeout too big") 815*cda5da8dSAndroid Build Coastguard Worker 816*cda5da8dSAndroid Build Coastguard Worker while True: 817*cda5da8dSAndroid Build Coastguard Worker status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms) 818*cda5da8dSAndroid Build Coastguard Worker if status is None: 819*cda5da8dSAndroid Build Coastguard Worker break 820*cda5da8dSAndroid Build Coastguard Worker ms = 0 821*cda5da8dSAndroid Build Coastguard Worker 822*cda5da8dSAndroid Build Coastguard Worker err, transferred, key, address = status 823*cda5da8dSAndroid Build Coastguard Worker try: 824*cda5da8dSAndroid Build Coastguard Worker f, ov, obj, callback = self._cache.pop(address) 825*cda5da8dSAndroid Build Coastguard Worker except KeyError: 826*cda5da8dSAndroid Build Coastguard Worker if self._loop.get_debug(): 827*cda5da8dSAndroid Build Coastguard Worker self._loop.call_exception_handler({ 828*cda5da8dSAndroid Build Coastguard Worker 'message': ('GetQueuedCompletionStatus() returned an ' 829*cda5da8dSAndroid Build Coastguard Worker 'unexpected event'), 830*cda5da8dSAndroid Build Coastguard Worker 'status': ('err=%s transferred=%s key=%#x address=%#x' 831*cda5da8dSAndroid Build Coastguard Worker % (err, transferred, key, address)), 832*cda5da8dSAndroid Build Coastguard Worker }) 833*cda5da8dSAndroid Build Coastguard Worker 834*cda5da8dSAndroid Build Coastguard Worker # key is either zero, or it is used to return a pipe 835*cda5da8dSAndroid Build Coastguard Worker # handle which should be closed to avoid a leak. 836*cda5da8dSAndroid Build Coastguard Worker if key not in (0, _overlapped.INVALID_HANDLE_VALUE): 837*cda5da8dSAndroid Build Coastguard Worker _winapi.CloseHandle(key) 838*cda5da8dSAndroid Build Coastguard Worker continue 839*cda5da8dSAndroid Build Coastguard Worker 840*cda5da8dSAndroid Build Coastguard Worker if obj in self._stopped_serving: 841*cda5da8dSAndroid Build Coastguard Worker f.cancel() 842*cda5da8dSAndroid Build Coastguard Worker # Don't call the callback if _register() already read the result or 843*cda5da8dSAndroid Build Coastguard Worker # if the overlapped has been cancelled 844*cda5da8dSAndroid Build Coastguard Worker elif not f.done(): 845*cda5da8dSAndroid Build Coastguard Worker try: 846*cda5da8dSAndroid Build Coastguard Worker value = callback(transferred, key, ov) 847*cda5da8dSAndroid Build Coastguard Worker except OSError as e: 848*cda5da8dSAndroid Build Coastguard Worker f.set_exception(e) 849*cda5da8dSAndroid Build Coastguard Worker self._results.append(f) 850*cda5da8dSAndroid Build Coastguard Worker else: 851*cda5da8dSAndroid Build Coastguard Worker f.set_result(value) 852*cda5da8dSAndroid Build Coastguard Worker self._results.append(f) 853*cda5da8dSAndroid Build Coastguard Worker finally: 854*cda5da8dSAndroid Build Coastguard Worker f = None 855*cda5da8dSAndroid Build Coastguard Worker 856*cda5da8dSAndroid Build Coastguard Worker # Remove unregistered futures 857*cda5da8dSAndroid Build Coastguard Worker for ov in self._unregistered: 858*cda5da8dSAndroid Build Coastguard Worker self._cache.pop(ov.address, None) 859*cda5da8dSAndroid Build Coastguard Worker self._unregistered.clear() 860*cda5da8dSAndroid Build Coastguard Worker 861*cda5da8dSAndroid Build Coastguard Worker def _stop_serving(self, obj): 862*cda5da8dSAndroid Build Coastguard Worker # obj is a socket or pipe handle. It will be closed in 863*cda5da8dSAndroid Build Coastguard Worker # BaseProactorEventLoop._stop_serving() which will make any 864*cda5da8dSAndroid Build Coastguard Worker # pending operations fail quickly. 865*cda5da8dSAndroid Build Coastguard Worker self._stopped_serving.add(obj) 866*cda5da8dSAndroid Build Coastguard Worker 867*cda5da8dSAndroid Build Coastguard Worker def close(self): 868*cda5da8dSAndroid Build Coastguard Worker if self._iocp is None: 869*cda5da8dSAndroid Build Coastguard Worker # already closed 870*cda5da8dSAndroid Build Coastguard Worker return 871*cda5da8dSAndroid Build Coastguard Worker 872*cda5da8dSAndroid Build Coastguard Worker # Cancel remaining registered operations. 873*cda5da8dSAndroid Build Coastguard Worker for fut, ov, obj, callback in list(self._cache.values()): 874*cda5da8dSAndroid Build Coastguard Worker if fut.cancelled(): 875*cda5da8dSAndroid Build Coastguard Worker # Nothing to do with cancelled futures 876*cda5da8dSAndroid Build Coastguard Worker pass 877*cda5da8dSAndroid Build Coastguard Worker elif isinstance(fut, _WaitCancelFuture): 878*cda5da8dSAndroid Build Coastguard Worker # _WaitCancelFuture must not be cancelled 879*cda5da8dSAndroid Build Coastguard Worker pass 880*cda5da8dSAndroid Build Coastguard Worker else: 881*cda5da8dSAndroid Build Coastguard Worker try: 882*cda5da8dSAndroid Build Coastguard Worker fut.cancel() 883*cda5da8dSAndroid Build Coastguard Worker except OSError as exc: 884*cda5da8dSAndroid Build Coastguard Worker if self._loop is not None: 885*cda5da8dSAndroid Build Coastguard Worker context = { 886*cda5da8dSAndroid Build Coastguard Worker 'message': 'Cancelling a future failed', 887*cda5da8dSAndroid Build Coastguard Worker 'exception': exc, 888*cda5da8dSAndroid Build Coastguard Worker 'future': fut, 889*cda5da8dSAndroid Build Coastguard Worker } 890*cda5da8dSAndroid Build Coastguard Worker if fut._source_traceback: 891*cda5da8dSAndroid Build Coastguard Worker context['source_traceback'] = fut._source_traceback 892*cda5da8dSAndroid Build Coastguard Worker self._loop.call_exception_handler(context) 893*cda5da8dSAndroid Build Coastguard Worker 894*cda5da8dSAndroid Build Coastguard Worker # Wait until all cancelled overlapped complete: don't exit with running 895*cda5da8dSAndroid Build Coastguard Worker # overlapped to prevent a crash. Display progress every second if the 896*cda5da8dSAndroid Build Coastguard Worker # loop is still running. 897*cda5da8dSAndroid Build Coastguard Worker msg_update = 1.0 898*cda5da8dSAndroid Build Coastguard Worker start_time = time.monotonic() 899*cda5da8dSAndroid Build Coastguard Worker next_msg = start_time + msg_update 900*cda5da8dSAndroid Build Coastguard Worker while self._cache: 901*cda5da8dSAndroid Build Coastguard Worker if next_msg <= time.monotonic(): 902*cda5da8dSAndroid Build Coastguard Worker logger.debug('%r is running after closing for %.1f seconds', 903*cda5da8dSAndroid Build Coastguard Worker self, time.monotonic() - start_time) 904*cda5da8dSAndroid Build Coastguard Worker next_msg = time.monotonic() + msg_update 905*cda5da8dSAndroid Build Coastguard Worker 906*cda5da8dSAndroid Build Coastguard Worker # handle a few events, or timeout 907*cda5da8dSAndroid Build Coastguard Worker self._poll(msg_update) 908*cda5da8dSAndroid Build Coastguard Worker 909*cda5da8dSAndroid Build Coastguard Worker self._results = [] 910*cda5da8dSAndroid Build Coastguard Worker 911*cda5da8dSAndroid Build Coastguard Worker _winapi.CloseHandle(self._iocp) 912*cda5da8dSAndroid Build Coastguard Worker self._iocp = None 913*cda5da8dSAndroid Build Coastguard Worker 914*cda5da8dSAndroid Build Coastguard Worker def __del__(self): 915*cda5da8dSAndroid Build Coastguard Worker self.close() 916*cda5da8dSAndroid Build Coastguard Worker 917*cda5da8dSAndroid Build Coastguard Worker 918*cda5da8dSAndroid Build Coastguard Workerclass _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport): 919*cda5da8dSAndroid Build Coastguard Worker 920*cda5da8dSAndroid Build Coastguard Worker def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs): 921*cda5da8dSAndroid Build Coastguard Worker self._proc = windows_utils.Popen( 922*cda5da8dSAndroid Build Coastguard Worker args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr, 923*cda5da8dSAndroid Build Coastguard Worker bufsize=bufsize, **kwargs) 924*cda5da8dSAndroid Build Coastguard Worker 925*cda5da8dSAndroid Build Coastguard Worker def callback(f): 926*cda5da8dSAndroid Build Coastguard Worker returncode = self._proc.poll() 927*cda5da8dSAndroid Build Coastguard Worker self._process_exited(returncode) 928*cda5da8dSAndroid Build Coastguard Worker 929*cda5da8dSAndroid Build Coastguard Worker f = self._loop._proactor.wait_for_handle(int(self._proc._handle)) 930*cda5da8dSAndroid Build Coastguard Worker f.add_done_callback(callback) 931*cda5da8dSAndroid Build Coastguard Worker 932*cda5da8dSAndroid Build Coastguard Worker 933*cda5da8dSAndroid Build Coastguard WorkerSelectorEventLoop = _WindowsSelectorEventLoop 934*cda5da8dSAndroid Build Coastguard Worker 935*cda5da8dSAndroid Build Coastguard Worker 936*cda5da8dSAndroid Build Coastguard Workerclass WindowsSelectorEventLoopPolicy(events.BaseDefaultEventLoopPolicy): 937*cda5da8dSAndroid Build Coastguard Worker _loop_factory = SelectorEventLoop 938*cda5da8dSAndroid Build Coastguard Worker 939*cda5da8dSAndroid Build Coastguard Worker 940*cda5da8dSAndroid Build Coastguard Workerclass WindowsProactorEventLoopPolicy(events.BaseDefaultEventLoopPolicy): 941*cda5da8dSAndroid Build Coastguard Worker _loop_factory = ProactorEventLoop 942*cda5da8dSAndroid Build Coastguard Worker 943*cda5da8dSAndroid Build Coastguard Worker 944*cda5da8dSAndroid Build Coastguard WorkerDefaultEventLoopPolicy = WindowsProactorEventLoopPolicy 945