1*cda5da8dSAndroid Build Coastguard Worker# 2*cda5da8dSAndroid Build Coastguard Worker# Module which deals with pickling of objects. 3*cda5da8dSAndroid Build Coastguard Worker# 4*cda5da8dSAndroid Build Coastguard Worker# multiprocessing/reduction.py 5*cda5da8dSAndroid Build Coastguard Worker# 6*cda5da8dSAndroid Build Coastguard Worker# Copyright (c) 2006-2008, R Oudkerk 7*cda5da8dSAndroid Build Coastguard Worker# Licensed to PSF under a Contributor Agreement. 8*cda5da8dSAndroid Build Coastguard Worker# 9*cda5da8dSAndroid Build Coastguard Worker 10*cda5da8dSAndroid Build Coastguard Workerfrom abc import ABCMeta 11*cda5da8dSAndroid Build Coastguard Workerimport copyreg 12*cda5da8dSAndroid Build Coastguard Workerimport functools 13*cda5da8dSAndroid Build Coastguard Workerimport io 14*cda5da8dSAndroid Build Coastguard Workerimport os 15*cda5da8dSAndroid Build Coastguard Workerimport pickle 16*cda5da8dSAndroid Build Coastguard Workerimport socket 17*cda5da8dSAndroid Build Coastguard Workerimport sys 18*cda5da8dSAndroid Build Coastguard Worker 19*cda5da8dSAndroid Build Coastguard Workerfrom . import context 20*cda5da8dSAndroid Build Coastguard Worker 21*cda5da8dSAndroid Build Coastguard Worker__all__ = ['send_handle', 'recv_handle', 'ForkingPickler', 'register', 'dump'] 22*cda5da8dSAndroid Build Coastguard Worker 23*cda5da8dSAndroid Build Coastguard Worker 24*cda5da8dSAndroid Build Coastguard WorkerHAVE_SEND_HANDLE = (sys.platform == 'win32' or 25*cda5da8dSAndroid Build Coastguard Worker (hasattr(socket, 'CMSG_LEN') and 26*cda5da8dSAndroid Build Coastguard Worker hasattr(socket, 'SCM_RIGHTS') and 27*cda5da8dSAndroid Build Coastguard Worker hasattr(socket.socket, 'sendmsg'))) 28*cda5da8dSAndroid Build Coastguard Worker 29*cda5da8dSAndroid Build Coastguard Worker# 30*cda5da8dSAndroid Build Coastguard Worker# Pickler subclass 31*cda5da8dSAndroid Build Coastguard Worker# 32*cda5da8dSAndroid Build Coastguard Worker 33*cda5da8dSAndroid Build Coastguard Workerclass ForkingPickler(pickle.Pickler): 34*cda5da8dSAndroid Build Coastguard Worker '''Pickler subclass used by multiprocessing.''' 35*cda5da8dSAndroid Build Coastguard Worker _extra_reducers = {} 36*cda5da8dSAndroid Build Coastguard Worker _copyreg_dispatch_table = copyreg.dispatch_table 37*cda5da8dSAndroid Build Coastguard Worker 38*cda5da8dSAndroid Build Coastguard Worker def __init__(self, *args): 39*cda5da8dSAndroid Build Coastguard Worker super().__init__(*args) 40*cda5da8dSAndroid Build Coastguard Worker self.dispatch_table = self._copyreg_dispatch_table.copy() 41*cda5da8dSAndroid Build Coastguard Worker self.dispatch_table.update(self._extra_reducers) 42*cda5da8dSAndroid Build Coastguard Worker 43*cda5da8dSAndroid Build Coastguard Worker @classmethod 44*cda5da8dSAndroid Build Coastguard Worker def register(cls, type, reduce): 45*cda5da8dSAndroid Build Coastguard Worker '''Register a reduce function for a type.''' 46*cda5da8dSAndroid Build Coastguard Worker cls._extra_reducers[type] = reduce 47*cda5da8dSAndroid Build Coastguard Worker 48*cda5da8dSAndroid Build Coastguard Worker @classmethod 49*cda5da8dSAndroid Build Coastguard Worker def dumps(cls, obj, protocol=None): 50*cda5da8dSAndroid Build Coastguard Worker buf = io.BytesIO() 51*cda5da8dSAndroid Build Coastguard Worker cls(buf, protocol).dump(obj) 52*cda5da8dSAndroid Build Coastguard Worker return buf.getbuffer() 53*cda5da8dSAndroid Build Coastguard Worker 54*cda5da8dSAndroid Build Coastguard Worker loads = pickle.loads 55*cda5da8dSAndroid Build Coastguard Worker 56*cda5da8dSAndroid Build Coastguard Workerregister = ForkingPickler.register 57*cda5da8dSAndroid Build Coastguard Worker 58*cda5da8dSAndroid Build Coastguard Workerdef dump(obj, file, protocol=None): 59*cda5da8dSAndroid Build Coastguard Worker '''Replacement for pickle.dump() using ForkingPickler.''' 60*cda5da8dSAndroid Build Coastguard Worker ForkingPickler(file, protocol).dump(obj) 61*cda5da8dSAndroid Build Coastguard Worker 62*cda5da8dSAndroid Build Coastguard Worker# 63*cda5da8dSAndroid Build Coastguard Worker# Platform specific definitions 64*cda5da8dSAndroid Build Coastguard Worker# 65*cda5da8dSAndroid Build Coastguard Worker 66*cda5da8dSAndroid Build Coastguard Workerif sys.platform == 'win32': 67*cda5da8dSAndroid Build Coastguard Worker # Windows 68*cda5da8dSAndroid Build Coastguard Worker __all__ += ['DupHandle', 'duplicate', 'steal_handle'] 69*cda5da8dSAndroid Build Coastguard Worker import _winapi 70*cda5da8dSAndroid Build Coastguard Worker 71*cda5da8dSAndroid Build Coastguard Worker def duplicate(handle, target_process=None, inheritable=False, 72*cda5da8dSAndroid Build Coastguard Worker *, source_process=None): 73*cda5da8dSAndroid Build Coastguard Worker '''Duplicate a handle. (target_process is a handle not a pid!)''' 74*cda5da8dSAndroid Build Coastguard Worker current_process = _winapi.GetCurrentProcess() 75*cda5da8dSAndroid Build Coastguard Worker if source_process is None: 76*cda5da8dSAndroid Build Coastguard Worker source_process = current_process 77*cda5da8dSAndroid Build Coastguard Worker if target_process is None: 78*cda5da8dSAndroid Build Coastguard Worker target_process = current_process 79*cda5da8dSAndroid Build Coastguard Worker return _winapi.DuplicateHandle( 80*cda5da8dSAndroid Build Coastguard Worker source_process, handle, target_process, 81*cda5da8dSAndroid Build Coastguard Worker 0, inheritable, _winapi.DUPLICATE_SAME_ACCESS) 82*cda5da8dSAndroid Build Coastguard Worker 83*cda5da8dSAndroid Build Coastguard Worker def steal_handle(source_pid, handle): 84*cda5da8dSAndroid Build Coastguard Worker '''Steal a handle from process identified by source_pid.''' 85*cda5da8dSAndroid Build Coastguard Worker source_process_handle = _winapi.OpenProcess( 86*cda5da8dSAndroid Build Coastguard Worker _winapi.PROCESS_DUP_HANDLE, False, source_pid) 87*cda5da8dSAndroid Build Coastguard Worker try: 88*cda5da8dSAndroid Build Coastguard Worker return _winapi.DuplicateHandle( 89*cda5da8dSAndroid Build Coastguard Worker source_process_handle, handle, 90*cda5da8dSAndroid Build Coastguard Worker _winapi.GetCurrentProcess(), 0, False, 91*cda5da8dSAndroid Build Coastguard Worker _winapi.DUPLICATE_SAME_ACCESS | _winapi.DUPLICATE_CLOSE_SOURCE) 92*cda5da8dSAndroid Build Coastguard Worker finally: 93*cda5da8dSAndroid Build Coastguard Worker _winapi.CloseHandle(source_process_handle) 94*cda5da8dSAndroid Build Coastguard Worker 95*cda5da8dSAndroid Build Coastguard Worker def send_handle(conn, handle, destination_pid): 96*cda5da8dSAndroid Build Coastguard Worker '''Send a handle over a local connection.''' 97*cda5da8dSAndroid Build Coastguard Worker dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid) 98*cda5da8dSAndroid Build Coastguard Worker conn.send(dh) 99*cda5da8dSAndroid Build Coastguard Worker 100*cda5da8dSAndroid Build Coastguard Worker def recv_handle(conn): 101*cda5da8dSAndroid Build Coastguard Worker '''Receive a handle over a local connection.''' 102*cda5da8dSAndroid Build Coastguard Worker return conn.recv().detach() 103*cda5da8dSAndroid Build Coastguard Worker 104*cda5da8dSAndroid Build Coastguard Worker class DupHandle(object): 105*cda5da8dSAndroid Build Coastguard Worker '''Picklable wrapper for a handle.''' 106*cda5da8dSAndroid Build Coastguard Worker def __init__(self, handle, access, pid=None): 107*cda5da8dSAndroid Build Coastguard Worker if pid is None: 108*cda5da8dSAndroid Build Coastguard Worker # We just duplicate the handle in the current process and 109*cda5da8dSAndroid Build Coastguard Worker # let the receiving process steal the handle. 110*cda5da8dSAndroid Build Coastguard Worker pid = os.getpid() 111*cda5da8dSAndroid Build Coastguard Worker proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid) 112*cda5da8dSAndroid Build Coastguard Worker try: 113*cda5da8dSAndroid Build Coastguard Worker self._handle = _winapi.DuplicateHandle( 114*cda5da8dSAndroid Build Coastguard Worker _winapi.GetCurrentProcess(), 115*cda5da8dSAndroid Build Coastguard Worker handle, proc, access, False, 0) 116*cda5da8dSAndroid Build Coastguard Worker finally: 117*cda5da8dSAndroid Build Coastguard Worker _winapi.CloseHandle(proc) 118*cda5da8dSAndroid Build Coastguard Worker self._access = access 119*cda5da8dSAndroid Build Coastguard Worker self._pid = pid 120*cda5da8dSAndroid Build Coastguard Worker 121*cda5da8dSAndroid Build Coastguard Worker def detach(self): 122*cda5da8dSAndroid Build Coastguard Worker '''Get the handle. This should only be called once.''' 123*cda5da8dSAndroid Build Coastguard Worker # retrieve handle from process which currently owns it 124*cda5da8dSAndroid Build Coastguard Worker if self._pid == os.getpid(): 125*cda5da8dSAndroid Build Coastguard Worker # The handle has already been duplicated for this process. 126*cda5da8dSAndroid Build Coastguard Worker return self._handle 127*cda5da8dSAndroid Build Coastguard Worker # We must steal the handle from the process whose pid is self._pid. 128*cda5da8dSAndroid Build Coastguard Worker proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, 129*cda5da8dSAndroid Build Coastguard Worker self._pid) 130*cda5da8dSAndroid Build Coastguard Worker try: 131*cda5da8dSAndroid Build Coastguard Worker return _winapi.DuplicateHandle( 132*cda5da8dSAndroid Build Coastguard Worker proc, self._handle, _winapi.GetCurrentProcess(), 133*cda5da8dSAndroid Build Coastguard Worker self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE) 134*cda5da8dSAndroid Build Coastguard Worker finally: 135*cda5da8dSAndroid Build Coastguard Worker _winapi.CloseHandle(proc) 136*cda5da8dSAndroid Build Coastguard Worker 137*cda5da8dSAndroid Build Coastguard Workerelse: 138*cda5da8dSAndroid Build Coastguard Worker # Unix 139*cda5da8dSAndroid Build Coastguard Worker __all__ += ['DupFd', 'sendfds', 'recvfds'] 140*cda5da8dSAndroid Build Coastguard Worker import array 141*cda5da8dSAndroid Build Coastguard Worker 142*cda5da8dSAndroid Build Coastguard Worker # On MacOSX we should acknowledge receipt of fds -- see Issue14669 143*cda5da8dSAndroid Build Coastguard Worker ACKNOWLEDGE = sys.platform == 'darwin' 144*cda5da8dSAndroid Build Coastguard Worker 145*cda5da8dSAndroid Build Coastguard Worker def sendfds(sock, fds): 146*cda5da8dSAndroid Build Coastguard Worker '''Send an array of fds over an AF_UNIX socket.''' 147*cda5da8dSAndroid Build Coastguard Worker fds = array.array('i', fds) 148*cda5da8dSAndroid Build Coastguard Worker msg = bytes([len(fds) % 256]) 149*cda5da8dSAndroid Build Coastguard Worker sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)]) 150*cda5da8dSAndroid Build Coastguard Worker if ACKNOWLEDGE and sock.recv(1) != b'A': 151*cda5da8dSAndroid Build Coastguard Worker raise RuntimeError('did not receive acknowledgement of fd') 152*cda5da8dSAndroid Build Coastguard Worker 153*cda5da8dSAndroid Build Coastguard Worker def recvfds(sock, size): 154*cda5da8dSAndroid Build Coastguard Worker '''Receive an array of fds over an AF_UNIX socket.''' 155*cda5da8dSAndroid Build Coastguard Worker a = array.array('i') 156*cda5da8dSAndroid Build Coastguard Worker bytes_size = a.itemsize * size 157*cda5da8dSAndroid Build Coastguard Worker msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_SPACE(bytes_size)) 158*cda5da8dSAndroid Build Coastguard Worker if not msg and not ancdata: 159*cda5da8dSAndroid Build Coastguard Worker raise EOFError 160*cda5da8dSAndroid Build Coastguard Worker try: 161*cda5da8dSAndroid Build Coastguard Worker if ACKNOWLEDGE: 162*cda5da8dSAndroid Build Coastguard Worker sock.send(b'A') 163*cda5da8dSAndroid Build Coastguard Worker if len(ancdata) != 1: 164*cda5da8dSAndroid Build Coastguard Worker raise RuntimeError('received %d items of ancdata' % 165*cda5da8dSAndroid Build Coastguard Worker len(ancdata)) 166*cda5da8dSAndroid Build Coastguard Worker cmsg_level, cmsg_type, cmsg_data = ancdata[0] 167*cda5da8dSAndroid Build Coastguard Worker if (cmsg_level == socket.SOL_SOCKET and 168*cda5da8dSAndroid Build Coastguard Worker cmsg_type == socket.SCM_RIGHTS): 169*cda5da8dSAndroid Build Coastguard Worker if len(cmsg_data) % a.itemsize != 0: 170*cda5da8dSAndroid Build Coastguard Worker raise ValueError 171*cda5da8dSAndroid Build Coastguard Worker a.frombytes(cmsg_data) 172*cda5da8dSAndroid Build Coastguard Worker if len(a) % 256 != msg[0]: 173*cda5da8dSAndroid Build Coastguard Worker raise AssertionError( 174*cda5da8dSAndroid Build Coastguard Worker "Len is {0:n} but msg[0] is {1!r}".format( 175*cda5da8dSAndroid Build Coastguard Worker len(a), msg[0])) 176*cda5da8dSAndroid Build Coastguard Worker return list(a) 177*cda5da8dSAndroid Build Coastguard Worker except (ValueError, IndexError): 178*cda5da8dSAndroid Build Coastguard Worker pass 179*cda5da8dSAndroid Build Coastguard Worker raise RuntimeError('Invalid data received') 180*cda5da8dSAndroid Build Coastguard Worker 181*cda5da8dSAndroid Build Coastguard Worker def send_handle(conn, handle, destination_pid): 182*cda5da8dSAndroid Build Coastguard Worker '''Send a handle over a local connection.''' 183*cda5da8dSAndroid Build Coastguard Worker with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: 184*cda5da8dSAndroid Build Coastguard Worker sendfds(s, [handle]) 185*cda5da8dSAndroid Build Coastguard Worker 186*cda5da8dSAndroid Build Coastguard Worker def recv_handle(conn): 187*cda5da8dSAndroid Build Coastguard Worker '''Receive a handle over a local connection.''' 188*cda5da8dSAndroid Build Coastguard Worker with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: 189*cda5da8dSAndroid Build Coastguard Worker return recvfds(s, 1)[0] 190*cda5da8dSAndroid Build Coastguard Worker 191*cda5da8dSAndroid Build Coastguard Worker def DupFd(fd): 192*cda5da8dSAndroid Build Coastguard Worker '''Return a wrapper for an fd.''' 193*cda5da8dSAndroid Build Coastguard Worker popen_obj = context.get_spawning_popen() 194*cda5da8dSAndroid Build Coastguard Worker if popen_obj is not None: 195*cda5da8dSAndroid Build Coastguard Worker return popen_obj.DupFd(popen_obj.duplicate_for_child(fd)) 196*cda5da8dSAndroid Build Coastguard Worker elif HAVE_SEND_HANDLE: 197*cda5da8dSAndroid Build Coastguard Worker from . import resource_sharer 198*cda5da8dSAndroid Build Coastguard Worker return resource_sharer.DupFd(fd) 199*cda5da8dSAndroid Build Coastguard Worker else: 200*cda5da8dSAndroid Build Coastguard Worker raise ValueError('SCM_RIGHTS appears not to be available') 201*cda5da8dSAndroid Build Coastguard Worker 202*cda5da8dSAndroid Build Coastguard Worker# 203*cda5da8dSAndroid Build Coastguard Worker# Try making some callable types picklable 204*cda5da8dSAndroid Build Coastguard Worker# 205*cda5da8dSAndroid Build Coastguard Worker 206*cda5da8dSAndroid Build Coastguard Workerdef _reduce_method(m): 207*cda5da8dSAndroid Build Coastguard Worker if m.__self__ is None: 208*cda5da8dSAndroid Build Coastguard Worker return getattr, (m.__class__, m.__func__.__name__) 209*cda5da8dSAndroid Build Coastguard Worker else: 210*cda5da8dSAndroid Build Coastguard Worker return getattr, (m.__self__, m.__func__.__name__) 211*cda5da8dSAndroid Build Coastguard Workerclass _C: 212*cda5da8dSAndroid Build Coastguard Worker def f(self): 213*cda5da8dSAndroid Build Coastguard Worker pass 214*cda5da8dSAndroid Build Coastguard Workerregister(type(_C().f), _reduce_method) 215*cda5da8dSAndroid Build Coastguard Worker 216*cda5da8dSAndroid Build Coastguard Worker 217*cda5da8dSAndroid Build Coastguard Workerdef _reduce_method_descriptor(m): 218*cda5da8dSAndroid Build Coastguard Worker return getattr, (m.__objclass__, m.__name__) 219*cda5da8dSAndroid Build Coastguard Workerregister(type(list.append), _reduce_method_descriptor) 220*cda5da8dSAndroid Build Coastguard Workerregister(type(int.__add__), _reduce_method_descriptor) 221*cda5da8dSAndroid Build Coastguard Worker 222*cda5da8dSAndroid Build Coastguard Worker 223*cda5da8dSAndroid Build Coastguard Workerdef _reduce_partial(p): 224*cda5da8dSAndroid Build Coastguard Worker return _rebuild_partial, (p.func, p.args, p.keywords or {}) 225*cda5da8dSAndroid Build Coastguard Workerdef _rebuild_partial(func, args, keywords): 226*cda5da8dSAndroid Build Coastguard Worker return functools.partial(func, *args, **keywords) 227*cda5da8dSAndroid Build Coastguard Workerregister(functools.partial, _reduce_partial) 228*cda5da8dSAndroid Build Coastguard Worker 229*cda5da8dSAndroid Build Coastguard Worker# 230*cda5da8dSAndroid Build Coastguard Worker# Make sockets picklable 231*cda5da8dSAndroid Build Coastguard Worker# 232*cda5da8dSAndroid Build Coastguard Worker 233*cda5da8dSAndroid Build Coastguard Workerif sys.platform == 'win32': 234*cda5da8dSAndroid Build Coastguard Worker def _reduce_socket(s): 235*cda5da8dSAndroid Build Coastguard Worker from .resource_sharer import DupSocket 236*cda5da8dSAndroid Build Coastguard Worker return _rebuild_socket, (DupSocket(s),) 237*cda5da8dSAndroid Build Coastguard Worker def _rebuild_socket(ds): 238*cda5da8dSAndroid Build Coastguard Worker return ds.detach() 239*cda5da8dSAndroid Build Coastguard Worker register(socket.socket, _reduce_socket) 240*cda5da8dSAndroid Build Coastguard Worker 241*cda5da8dSAndroid Build Coastguard Workerelse: 242*cda5da8dSAndroid Build Coastguard Worker def _reduce_socket(s): 243*cda5da8dSAndroid Build Coastguard Worker df = DupFd(s.fileno()) 244*cda5da8dSAndroid Build Coastguard Worker return _rebuild_socket, (df, s.family, s.type, s.proto) 245*cda5da8dSAndroid Build Coastguard Worker def _rebuild_socket(df, family, type, proto): 246*cda5da8dSAndroid Build Coastguard Worker fd = df.detach() 247*cda5da8dSAndroid Build Coastguard Worker return socket.socket(family, type, proto, fileno=fd) 248*cda5da8dSAndroid Build Coastguard Worker register(socket.socket, _reduce_socket) 249*cda5da8dSAndroid Build Coastguard Worker 250*cda5da8dSAndroid Build Coastguard Worker 251*cda5da8dSAndroid Build Coastguard Workerclass AbstractReducer(metaclass=ABCMeta): 252*cda5da8dSAndroid Build Coastguard Worker '''Abstract base class for use in implementing a Reduction class 253*cda5da8dSAndroid Build Coastguard Worker suitable for use in replacing the standard reduction mechanism 254*cda5da8dSAndroid Build Coastguard Worker used in multiprocessing.''' 255*cda5da8dSAndroid Build Coastguard Worker ForkingPickler = ForkingPickler 256*cda5da8dSAndroid Build Coastguard Worker register = register 257*cda5da8dSAndroid Build Coastguard Worker dump = dump 258*cda5da8dSAndroid Build Coastguard Worker send_handle = send_handle 259*cda5da8dSAndroid Build Coastguard Worker recv_handle = recv_handle 260*cda5da8dSAndroid Build Coastguard Worker 261*cda5da8dSAndroid Build Coastguard Worker if sys.platform == 'win32': 262*cda5da8dSAndroid Build Coastguard Worker steal_handle = steal_handle 263*cda5da8dSAndroid Build Coastguard Worker duplicate = duplicate 264*cda5da8dSAndroid Build Coastguard Worker DupHandle = DupHandle 265*cda5da8dSAndroid Build Coastguard Worker else: 266*cda5da8dSAndroid Build Coastguard Worker sendfds = sendfds 267*cda5da8dSAndroid Build Coastguard Worker recvfds = recvfds 268*cda5da8dSAndroid Build Coastguard Worker DupFd = DupFd 269*cda5da8dSAndroid Build Coastguard Worker 270*cda5da8dSAndroid Build Coastguard Worker _reduce_method = _reduce_method 271*cda5da8dSAndroid Build Coastguard Worker _reduce_method_descriptor = _reduce_method_descriptor 272*cda5da8dSAndroid Build Coastguard Worker _rebuild_partial = _rebuild_partial 273*cda5da8dSAndroid Build Coastguard Worker _reduce_socket = _reduce_socket 274*cda5da8dSAndroid Build Coastguard Worker _rebuild_socket = _rebuild_socket 275*cda5da8dSAndroid Build Coastguard Worker 276*cda5da8dSAndroid Build Coastguard Worker def __init__(self, *args): 277*cda5da8dSAndroid Build Coastguard Worker register(type(_C().f), _reduce_method) 278*cda5da8dSAndroid Build Coastguard Worker register(type(list.append), _reduce_method_descriptor) 279*cda5da8dSAndroid Build Coastguard Worker register(type(int.__add__), _reduce_method_descriptor) 280*cda5da8dSAndroid Build Coastguard Worker register(functools.partial, _reduce_partial) 281*cda5da8dSAndroid Build Coastguard Worker register(socket.socket, _reduce_socket) 282