1 #
2 # Simple benchmarks for the multiprocessing package
3 #
4 # Copyright (c) 2006-2008, R Oudkerk
5 # All rights reserved.
6 #
7 
8 import time, sys, multiprocessing, threading, Queue, gc
9 
10 if sys.platform == 'win32':
11     _timer = time.clock
12 else:
13     _timer = time.time
14 
15 delta = 1
16 
17 
18 #### TEST_QUEUESPEED
19 
20 def queuespeed_func(q, c, iterations):
21     a = '0' * 256
22     c.acquire()
23     c.notify()
24     c.release()
25 
26     for i in xrange(iterations):
27         q.put(a)
28 
29     q.put('STOP')
30 
31 def test_queuespeed(Process, q, c):
32     elapsed = 0
33     iterations = 1
34 
35     while elapsed < delta:
36         iterations *= 2
37 
38         p = Process(target=queuespeed_func, args=(q, c, iterations))
39         c.acquire()
40         p.start()
41         c.wait()
42         c.release()
43 
44         result = None
45         t = _timer()
46 
47         while result != 'STOP':
48             result = q.get()
49 
50         elapsed = _timer() - t
51 
52         p.join()
53 
54     print iterations, 'objects passed through the queue in', elapsed, 'seconds'
55     print 'average number/sec:', iterations/elapsed
56 
57 
58 #### TEST_PIPESPEED
59 
60 def pipe_func(c, cond, iterations):
61     a = '0' * 256
62     cond.acquire()
63     cond.notify()
64     cond.release()
65 
66     for i in xrange(iterations):
67         c.send(a)
68 
69     c.send('STOP')
70 
71 def test_pipespeed():
72     c, d = multiprocessing.Pipe()
73     cond = multiprocessing.Condition()
74     elapsed = 0
75     iterations = 1
76 
77     while elapsed < delta:
78         iterations *= 2
79 
80         p = multiprocessing.Process(target=pipe_func,
81                                     args=(d, cond, iterations))
82         cond.acquire()
83         p.start()
84         cond.wait()
85         cond.release()
86 
87         result = None
88         t = _timer()
89 
90         while result != 'STOP':
91             result = c.recv()
92 
93         elapsed = _timer() - t
94         p.join()
95 
96     print iterations, 'objects passed through connection in',elapsed,'seconds'
97     print 'average number/sec:', iterations/elapsed
98 
99 
100 #### TEST_SEQSPEED
101 
102 def test_seqspeed(seq):
103     elapsed = 0
104     iterations = 1
105 
106     while elapsed < delta:
107         iterations *= 2
108 
109         t = _timer()
110 
111         for i in xrange(iterations):
112             a = seq[5]
113 
114         elapsed = _timer()-t
115 
116     print iterations, 'iterations in', elapsed, 'seconds'
117     print 'average number/sec:', iterations/elapsed
118 
119 
120 #### TEST_LOCK
121 
122 def test_lockspeed(l):
123     elapsed = 0
124     iterations = 1
125 
126     while elapsed < delta:
127         iterations *= 2
128 
129         t = _timer()
130 
131         for i in xrange(iterations):
132             l.acquire()
133             l.release()
134 
135         elapsed = _timer()-t
136 
137     print iterations, 'iterations in', elapsed, 'seconds'
138     print 'average number/sec:', iterations/elapsed
139 
140 
141 #### TEST_CONDITION
142 
143 def conditionspeed_func(c, N):
144     c.acquire()
145     c.notify()
146 
147     for i in xrange(N):
148         c.wait()
149         c.notify()
150 
151     c.release()
152 
153 def test_conditionspeed(Process, c):
154     elapsed = 0
155     iterations = 1
156 
157     while elapsed < delta:
158         iterations *= 2
159 
160         c.acquire()
161         p = Process(target=conditionspeed_func, args=(c, iterations))
162         p.start()
163 
164         c.wait()
165 
166         t = _timer()
167 
168         for i in xrange(iterations):
169             c.notify()
170             c.wait()
171 
172         elapsed = _timer()-t
173 
174         c.release()
175         p.join()
176 
177     print iterations * 2, 'waits in', elapsed, 'seconds'
178     print 'average number/sec:', iterations * 2 / elapsed
179 
180 ####
181 
182 def test():
183     manager = multiprocessing.Manager()
184 
185     gc.disable()
186 
187     print '\n\t######## testing Queue.Queue\n'
188     test_queuespeed(threading.Thread, Queue.Queue(),
189                     threading.Condition())
190     print '\n\t######## testing multiprocessing.Queue\n'
191     test_queuespeed(multiprocessing.Process, multiprocessing.Queue(),
192                     multiprocessing.Condition())
193     print '\n\t######## testing Queue managed by server process\n'
194     test_queuespeed(multiprocessing.Process, manager.Queue(),
195                     manager.Condition())
196     print '\n\t######## testing multiprocessing.Pipe\n'
197     test_pipespeed()
198 
199     print
200 
201     print '\n\t######## testing list\n'
202     test_seqspeed(range(10))
203     print '\n\t######## testing list managed by server process\n'
204     test_seqspeed(manager.list(range(10)))
205     print '\n\t######## testing Array("i", ..., lock=False)\n'
206     test_seqspeed(multiprocessing.Array('i', range(10), lock=False))
207     print '\n\t######## testing Array("i", ..., lock=True)\n'
208     test_seqspeed(multiprocessing.Array('i', range(10), lock=True))
209 
210     print
211 
212     print '\n\t######## testing threading.Lock\n'
213     test_lockspeed(threading.Lock())
214     print '\n\t######## testing threading.RLock\n'
215     test_lockspeed(threading.RLock())
216     print '\n\t######## testing multiprocessing.Lock\n'
217     test_lockspeed(multiprocessing.Lock())
218     print '\n\t######## testing multiprocessing.RLock\n'
219     test_lockspeed(multiprocessing.RLock())
220     print '\n\t######## testing lock managed by server process\n'
221     test_lockspeed(manager.Lock())
222     print '\n\t######## testing rlock managed by server process\n'
223     test_lockspeed(manager.RLock())
224 
225     print
226 
227     print '\n\t######## testing threading.Condition\n'
228     test_conditionspeed(threading.Thread, threading.Condition())
229     print '\n\t######## testing multiprocessing.Condition\n'
230     test_conditionspeed(multiprocessing.Process, multiprocessing.Condition())
231     print '\n\t######## testing condition managed by a server process\n'
232     test_conditionspeed(multiprocessing.Process, manager.Condition())
233 
234     gc.enable()
235 
236 if __name__ == '__main__':
237     multiprocessing.freeze_support()
238     test()
239