1# 2# Module providing the `Pool` class for managing a process pool 3# 4# multiprocessing/pool.py 5# 6# Copyright (c) 2006-2008, R Oudkerk 7# All rights reserved. 8# 9# Redistribution and use in source and binary forms, with or without 10# modification, are permitted provided that the following conditions 11# are met: 12# 13# 1. Redistributions of source code must retain the above copyright 14# notice, this list of conditions and the following disclaimer. 15# 2. Redistributions in binary form must reproduce the above copyright 16# notice, this list of conditions and the following disclaimer in the 17# documentation and/or other materials provided with the distribution. 18# 3. Neither the name of author nor the names of any contributors may be 19# used to endorse or promote products derived from this software 20# without specific prior written permission. 21# 22# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND 23# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 24# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 25# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 26# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 27# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 28# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 29# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 30# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 31# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32# SUCH DAMAGE. 33# 34 35__all__ = ['Pool'] 36 37# 38# Imports 39# 40 41import threading 42import Queue 43import itertools 44import collections 45import time 46 47from multiprocessing import Process, cpu_count, TimeoutError 48from multiprocessing.util import Finalize, debug 49 50# 51# Constants representing the state of a pool 52# 53 54RUN = 0 55CLOSE = 1 56TERMINATE = 2 57 58# 59# Miscellaneous 60# 61 62job_counter = itertools.count() 63 64def mapstar(args): 65 return map(*args) 66 67# 68# Code run by worker processes 69# 70 71class MaybeEncodingError(Exception): 72 """Wraps possible unpickleable errors, so they can be 73 safely sent through the socket.""" 74 75 def __init__(self, exc, value): 76 self.exc = repr(exc) 77 self.value = repr(value) 78 super(MaybeEncodingError, self).__init__(self.exc, self.value) 79 80 def __str__(self): 81 return "Error sending result: '%s'. Reason: '%s'" % (self.value, 82 self.exc) 83 84 def __repr__(self): 85 return "<MaybeEncodingError: %s>" % str(self) 86 87 88def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None): 89 assert maxtasks is None or (type(maxtasks) in (int, long) and maxtasks > 0) 90 put = outqueue.put 91 get = inqueue.get 92 if hasattr(inqueue, '_writer'): 93 inqueue._writer.close() 94 outqueue._reader.close() 95 96 if initializer is not None: 97 initializer(*initargs) 98 99 completed = 0 100 while maxtasks is None or (maxtasks and completed < maxtasks): 101 try: 102 task = get() 103 except (EOFError, IOError): 104 debug('worker got EOFError or IOError -- exiting') 105 break 106 107 if task is None: 108 debug('worker got sentinel -- exiting') 109 break 110 111 job, i, func, args, kwds = task 112 try: 113 result = (True, func(*args, **kwds)) 114 except Exception, e: 115 result = (False, e) 116 try: 117 put((job, i, result)) 118 except Exception as e: 119 wrapped = MaybeEncodingError(e, result[1]) 120 debug("Possible encoding error while sending result: %s" % ( 121 wrapped)) 122 put((job, i, (False, wrapped))) 123 124 task = job = result = func = args = kwds = None 125 completed += 1 126 debug('worker exiting after %d tasks' % completed) 127 128# 129# Class representing a process pool 130# 131 132class Pool(object): 133 ''' 134 Class which supports an async version of the `apply()` builtin 135 ''' 136 Process = Process 137 138 def __init__(self, processes=None, initializer=None, initargs=(), 139 maxtasksperchild=None): 140 self._setup_queues() 141 self._taskqueue = Queue.Queue() 142 self._cache = {} 143 self._state = RUN 144 self._maxtasksperchild = maxtasksperchild 145 self._initializer = initializer 146 self._initargs = initargs 147 148 if processes is None: 149 try: 150 processes = cpu_count() 151 except NotImplementedError: 152 processes = 1 153 if processes < 1: 154 raise ValueError("Number of processes must be at least 1") 155 156 if initializer is not None and not hasattr(initializer, '__call__'): 157 raise TypeError('initializer must be a callable') 158 159 self._processes = processes 160 self._pool = [] 161 self._repopulate_pool() 162 163 self._worker_handler = threading.Thread( 164 target=Pool._handle_workers, 165 args=(self, ) 166 ) 167 self._worker_handler.daemon = True 168 self._worker_handler._state = RUN 169 self._worker_handler.start() 170 171 172 self._task_handler = threading.Thread( 173 target=Pool._handle_tasks, 174 args=(self._taskqueue, self._quick_put, self._outqueue, 175 self._pool, self._cache) 176 ) 177 self._task_handler.daemon = True 178 self._task_handler._state = RUN 179 self._task_handler.start() 180 181 self._result_handler = threading.Thread( 182 target=Pool._handle_results, 183 args=(self._outqueue, self._quick_get, self._cache) 184 ) 185 self._result_handler.daemon = True 186 self._result_handler._state = RUN 187 self._result_handler.start() 188 189 self._terminate = Finalize( 190 self, self._terminate_pool, 191 args=(self._taskqueue, self._inqueue, self._outqueue, self._pool, 192 self._worker_handler, self._task_handler, 193 self._result_handler, self._cache), 194 exitpriority=15 195 ) 196 197 def _join_exited_workers(self): 198 """Cleanup after any worker processes which have exited due to reaching 199 their specified lifetime. Returns True if any workers were cleaned up. 200 """ 201 cleaned = False 202 for i in reversed(range(len(self._pool))): 203 worker = self._pool[i] 204 if worker.exitcode is not None: 205 # worker exited 206 debug('cleaning up worker %d' % i) 207 worker.join() 208 cleaned = True 209 del self._pool[i] 210 return cleaned 211 212 def _repopulate_pool(self): 213 """Bring the number of pool processes up to the specified number, 214 for use after reaping workers which have exited. 215 """ 216 for i in range(self._processes - len(self._pool)): 217 w = self.Process(target=worker, 218 args=(self._inqueue, self._outqueue, 219 self._initializer, 220 self._initargs, self._maxtasksperchild) 221 ) 222 self._pool.append(w) 223 w.name = w.name.replace('Process', 'PoolWorker') 224 w.daemon = True 225 w.start() 226 debug('added worker') 227 228 def _maintain_pool(self): 229 """Clean up any exited workers and start replacements for them. 230 """ 231 if self._join_exited_workers(): 232 self._repopulate_pool() 233 234 def _setup_queues(self): 235 from .queues import SimpleQueue 236 self._inqueue = SimpleQueue() 237 self._outqueue = SimpleQueue() 238 self._quick_put = self._inqueue._writer.send 239 self._quick_get = self._outqueue._reader.recv 240 241 def apply(self, func, args=(), kwds={}): 242 ''' 243 Equivalent of `apply()` builtin 244 ''' 245 assert self._state == RUN 246 return self.apply_async(func, args, kwds).get() 247 248 def map(self, func, iterable, chunksize=None): 249 ''' 250 Equivalent of `map()` builtin 251 ''' 252 assert self._state == RUN 253 return self.map_async(func, iterable, chunksize).get() 254 255 def imap(self, func, iterable, chunksize=1): 256 ''' 257 Equivalent of `itertools.imap()` -- can be MUCH slower than `Pool.map()` 258 ''' 259 assert self._state == RUN 260 if chunksize == 1: 261 result = IMapIterator(self._cache) 262 self._taskqueue.put((((result._job, i, func, (x,), {}) 263 for i, x in enumerate(iterable)), result._set_length)) 264 return result 265 else: 266 assert chunksize > 1 267 task_batches = Pool._get_tasks(func, iterable, chunksize) 268 result = IMapIterator(self._cache) 269 self._taskqueue.put((((result._job, i, mapstar, (x,), {}) 270 for i, x in enumerate(task_batches)), result._set_length)) 271 return (item for chunk in result for item in chunk) 272 273 def imap_unordered(self, func, iterable, chunksize=1): 274 ''' 275 Like `imap()` method but ordering of results is arbitrary 276 ''' 277 assert self._state == RUN 278 if chunksize == 1: 279 result = IMapUnorderedIterator(self._cache) 280 self._taskqueue.put((((result._job, i, func, (x,), {}) 281 for i, x in enumerate(iterable)), result._set_length)) 282 return result 283 else: 284 assert chunksize > 1 285 task_batches = Pool._get_tasks(func, iterable, chunksize) 286 result = IMapUnorderedIterator(self._cache) 287 self._taskqueue.put((((result._job, i, mapstar, (x,), {}) 288 for i, x in enumerate(task_batches)), result._set_length)) 289 return (item for chunk in result for item in chunk) 290 291 def apply_async(self, func, args=(), kwds={}, callback=None): 292 ''' 293 Asynchronous equivalent of `apply()` builtin 294 ''' 295 assert self._state == RUN 296 result = ApplyResult(self._cache, callback) 297 self._taskqueue.put(([(result._job, None, func, args, kwds)], None)) 298 return result 299 300 def map_async(self, func, iterable, chunksize=None, callback=None): 301 ''' 302 Asynchronous equivalent of `map()` builtin 303 ''' 304 assert self._state == RUN 305 if not hasattr(iterable, '__len__'): 306 iterable = list(iterable) 307 308 if chunksize is None: 309 chunksize, extra = divmod(len(iterable), len(self._pool) * 4) 310 if extra: 311 chunksize += 1 312 if len(iterable) == 0: 313 chunksize = 0 314 315 task_batches = Pool._get_tasks(func, iterable, chunksize) 316 result = MapResult(self._cache, chunksize, len(iterable), callback) 317 self._taskqueue.put((((result._job, i, mapstar, (x,), {}) 318 for i, x in enumerate(task_batches)), None)) 319 return result 320 321 @staticmethod 322 def _handle_workers(pool): 323 thread = threading.current_thread() 324 325 # Keep maintaining workers until the cache gets drained, unless the pool 326 # is terminated. 327 while thread._state == RUN or (pool._cache and thread._state != TERMINATE): 328 pool._maintain_pool() 329 time.sleep(0.1) 330 # send sentinel to stop workers 331 pool._taskqueue.put(None) 332 debug('worker handler exiting') 333 334 @staticmethod 335 def _handle_tasks(taskqueue, put, outqueue, pool, cache): 336 thread = threading.current_thread() 337 338 for taskseq, set_length in iter(taskqueue.get, None): 339 task = None 340 i = -1 341 try: 342 for i, task in enumerate(taskseq): 343 if thread._state: 344 debug('task handler found thread._state != RUN') 345 break 346 try: 347 put(task) 348 except Exception as e: 349 job, ind = task[:2] 350 try: 351 cache[job]._set(ind, (False, e)) 352 except KeyError: 353 pass 354 else: 355 if set_length: 356 debug('doing set_length()') 357 set_length(i+1) 358 continue 359 break 360 except Exception as ex: 361 job, ind = task[:2] if task else (0, 0) 362 if job in cache: 363 cache[job]._set(ind + 1, (False, ex)) 364 if set_length: 365 debug('doing set_length()') 366 set_length(i+1) 367 finally: 368 task = taskseq = job = None 369 else: 370 debug('task handler got sentinel') 371 372 try: 373 # tell result handler to finish when cache is empty 374 debug('task handler sending sentinel to result handler') 375 outqueue.put(None) 376 377 # tell workers there is no more work 378 debug('task handler sending sentinel to workers') 379 for p in pool: 380 put(None) 381 except IOError: 382 debug('task handler got IOError when sending sentinels') 383 384 debug('task handler exiting') 385 386 @staticmethod 387 def _handle_results(outqueue, get, cache): 388 thread = threading.current_thread() 389 390 while 1: 391 try: 392 task = get() 393 except (IOError, EOFError): 394 debug('result handler got EOFError/IOError -- exiting') 395 return 396 397 if thread._state: 398 assert thread._state == TERMINATE 399 debug('result handler found thread._state=TERMINATE') 400 break 401 402 if task is None: 403 debug('result handler got sentinel') 404 break 405 406 job, i, obj = task 407 try: 408 cache[job]._set(i, obj) 409 except KeyError: 410 pass 411 task = job = obj = None 412 413 while cache and thread._state != TERMINATE: 414 try: 415 task = get() 416 except (IOError, EOFError): 417 debug('result handler got EOFError/IOError -- exiting') 418 return 419 420 if task is None: 421 debug('result handler ignoring extra sentinel') 422 continue 423 job, i, obj = task 424 try: 425 cache[job]._set(i, obj) 426 except KeyError: 427 pass 428 task = job = obj = None 429 430 if hasattr(outqueue, '_reader'): 431 debug('ensuring that outqueue is not full') 432 # If we don't make room available in outqueue then 433 # attempts to add the sentinel (None) to outqueue may 434 # block. There is guaranteed to be no more than 2 sentinels. 435 try: 436 for i in range(10): 437 if not outqueue._reader.poll(): 438 break 439 get() 440 except (IOError, EOFError): 441 pass 442 443 debug('result handler exiting: len(cache)=%s, thread._state=%s', 444 len(cache), thread._state) 445 446 @staticmethod 447 def _get_tasks(func, it, size): 448 it = iter(it) 449 while 1: 450 x = tuple(itertools.islice(it, size)) 451 if not x: 452 return 453 yield (func, x) 454 455 def __reduce__(self): 456 raise NotImplementedError( 457 'pool objects cannot be passed between processes or pickled' 458 ) 459 460 def close(self): 461 debug('closing pool') 462 if self._state == RUN: 463 self._state = CLOSE 464 self._worker_handler._state = CLOSE 465 466 def terminate(self): 467 debug('terminating pool') 468 self._state = TERMINATE 469 self._worker_handler._state = TERMINATE 470 self._terminate() 471 472 def join(self): 473 debug('joining pool') 474 assert self._state in (CLOSE, TERMINATE) 475 self._worker_handler.join() 476 self._task_handler.join() 477 self._result_handler.join() 478 for p in self._pool: 479 p.join() 480 481 @staticmethod 482 def _help_stuff_finish(inqueue, task_handler, size): 483 # task_handler may be blocked trying to put items on inqueue 484 debug('removing tasks from inqueue until task handler finished') 485 inqueue._rlock.acquire() 486 while task_handler.is_alive() and inqueue._reader.poll(): 487 inqueue._reader.recv() 488 time.sleep(0) 489 490 @classmethod 491 def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, 492 worker_handler, task_handler, result_handler, cache): 493 # this is guaranteed to only be called once 494 debug('finalizing pool') 495 496 worker_handler._state = TERMINATE 497 task_handler._state = TERMINATE 498 499 debug('helping task handler/workers to finish') 500 cls._help_stuff_finish(inqueue, task_handler, len(pool)) 501 502 assert result_handler.is_alive() or len(cache) == 0 503 504 result_handler._state = TERMINATE 505 outqueue.put(None) # sentinel 506 507 # We must wait for the worker handler to exit before terminating 508 # workers because we don't want workers to be restarted behind our back. 509 debug('joining worker handler') 510 if threading.current_thread() is not worker_handler: 511 worker_handler.join(1e100) 512 513 # Terminate workers which haven't already finished. 514 if pool and hasattr(pool[0], 'terminate'): 515 debug('terminating workers') 516 for p in pool: 517 if p.exitcode is None: 518 p.terminate() 519 520 debug('joining task handler') 521 if threading.current_thread() is not task_handler: 522 task_handler.join(1e100) 523 524 debug('joining result handler') 525 if threading.current_thread() is not result_handler: 526 result_handler.join(1e100) 527 528 if pool and hasattr(pool[0], 'terminate'): 529 debug('joining pool workers') 530 for p in pool: 531 if p.is_alive(): 532 # worker has not yet exited 533 debug('cleaning up worker %d' % p.pid) 534 p.join() 535 536# 537# Class whose instances are returned by `Pool.apply_async()` 538# 539 540class ApplyResult(object): 541 542 def __init__(self, cache, callback): 543 self._cond = threading.Condition(threading.Lock()) 544 self._job = job_counter.next() 545 self._cache = cache 546 self._ready = False 547 self._callback = callback 548 cache[self._job] = self 549 550 def ready(self): 551 return self._ready 552 553 def successful(self): 554 assert self._ready 555 return self._success 556 557 def wait(self, timeout=None): 558 self._cond.acquire() 559 try: 560 if not self._ready: 561 self._cond.wait(timeout) 562 finally: 563 self._cond.release() 564 565 def get(self, timeout=None): 566 self.wait(timeout) 567 if not self._ready: 568 raise TimeoutError 569 if self._success: 570 return self._value 571 else: 572 raise self._value 573 574 def _set(self, i, obj): 575 self._success, self._value = obj 576 if self._callback and self._success: 577 self._callback(self._value) 578 self._cond.acquire() 579 try: 580 self._ready = True 581 self._cond.notify() 582 finally: 583 self._cond.release() 584 del self._cache[self._job] 585 586AsyncResult = ApplyResult # create alias -- see #17805 587 588# 589# Class whose instances are returned by `Pool.map_async()` 590# 591 592class MapResult(ApplyResult): 593 594 def __init__(self, cache, chunksize, length, callback): 595 ApplyResult.__init__(self, cache, callback) 596 self._success = True 597 self._value = [None] * length 598 self._chunksize = chunksize 599 if chunksize <= 0: 600 self._number_left = 0 601 self._ready = True 602 del cache[self._job] 603 else: 604 self._number_left = length//chunksize + bool(length % chunksize) 605 606 def _set(self, i, success_result): 607 success, result = success_result 608 if success: 609 self._value[i*self._chunksize:(i+1)*self._chunksize] = result 610 self._number_left -= 1 611 if self._number_left == 0: 612 if self._callback: 613 self._callback(self._value) 614 del self._cache[self._job] 615 self._cond.acquire() 616 try: 617 self._ready = True 618 self._cond.notify() 619 finally: 620 self._cond.release() 621 622 else: 623 self._success = False 624 self._value = result 625 del self._cache[self._job] 626 self._cond.acquire() 627 try: 628 self._ready = True 629 self._cond.notify() 630 finally: 631 self._cond.release() 632 633# 634# Class whose instances are returned by `Pool.imap()` 635# 636 637class IMapIterator(object): 638 639 def __init__(self, cache): 640 self._cond = threading.Condition(threading.Lock()) 641 self._job = job_counter.next() 642 self._cache = cache 643 self._items = collections.deque() 644 self._index = 0 645 self._length = None 646 self._unsorted = {} 647 cache[self._job] = self 648 649 def __iter__(self): 650 return self 651 652 def next(self, timeout=None): 653 self._cond.acquire() 654 try: 655 try: 656 item = self._items.popleft() 657 except IndexError: 658 if self._index == self._length: 659 raise StopIteration 660 self._cond.wait(timeout) 661 try: 662 item = self._items.popleft() 663 except IndexError: 664 if self._index == self._length: 665 raise StopIteration 666 raise TimeoutError 667 finally: 668 self._cond.release() 669 670 success, value = item 671 if success: 672 return value 673 raise value 674 675 __next__ = next # XXX 676 677 def _set(self, i, obj): 678 self._cond.acquire() 679 try: 680 if self._index == i: 681 self._items.append(obj) 682 self._index += 1 683 while self._index in self._unsorted: 684 obj = self._unsorted.pop(self._index) 685 self._items.append(obj) 686 self._index += 1 687 self._cond.notify() 688 else: 689 self._unsorted[i] = obj 690 691 if self._index == self._length: 692 del self._cache[self._job] 693 finally: 694 self._cond.release() 695 696 def _set_length(self, length): 697 self._cond.acquire() 698 try: 699 self._length = length 700 if self._index == self._length: 701 self._cond.notify() 702 del self._cache[self._job] 703 finally: 704 self._cond.release() 705 706# 707# Class whose instances are returned by `Pool.imap_unordered()` 708# 709 710class IMapUnorderedIterator(IMapIterator): 711 712 def _set(self, i, obj): 713 self._cond.acquire() 714 try: 715 self._items.append(obj) 716 self._index += 1 717 self._cond.notify() 718 if self._index == self._length: 719 del self._cache[self._job] 720 finally: 721 self._cond.release() 722 723# 724# 725# 726 727class ThreadPool(Pool): 728 729 from .dummy import Process 730 731 def __init__(self, processes=None, initializer=None, initargs=()): 732 Pool.__init__(self, processes, initializer, initargs) 733 734 def _setup_queues(self): 735 self._inqueue = Queue.Queue() 736 self._outqueue = Queue.Queue() 737 self._quick_put = self._inqueue.put 738 self._quick_get = self._outqueue.get 739 740 @staticmethod 741 def _help_stuff_finish(inqueue, task_handler, size): 742 # put sentinels at head of inqueue to make workers finish 743 inqueue.not_empty.acquire() 744 try: 745 inqueue.queue.clear() 746 inqueue.queue.extend([None] * size) 747 inqueue.not_empty.notify_all() 748 finally: 749 inqueue.not_empty.release() 750