1 import os
2 import unittest
3 import random
4 from test import support
5 from test.support import threading_helper
6 import _thread as thread
7 import time
8 import weakref
9 
10 from test import lock_tests
11 
12 threading_helper.requires_working_threading(module=True)
13 
14 NUMTASKS = 10
15 NUMTRIPS = 3
16 POLL_SLEEP = 0.010 # seconds = 10 ms
17 
18 _print_mutex = thread.allocate_lock()
19 
20 def verbose_print(arg):
21     """Helper function for printing out debugging output."""
22     if support.verbose:
23         with _print_mutex:
24             print(arg)
25 
26 
27 class BasicThreadTest(unittest.TestCase):
28 
29     def setUp(self):
30         self.done_mutex = thread.allocate_lock()
31         self.done_mutex.acquire()
32         self.running_mutex = thread.allocate_lock()
33         self.random_mutex = thread.allocate_lock()
34         self.created = 0
35         self.running = 0
36         self.next_ident = 0
37 
38         key = threading_helper.threading_setup()
39         self.addCleanup(threading_helper.threading_cleanup, *key)
40 
41 
42 class ThreadRunningTests(BasicThreadTest):
43 
44     def newtask(self):
45         with self.running_mutex:
46             self.next_ident += 1
47             verbose_print("creating task %s" % self.next_ident)
48             thread.start_new_thread(self.task, (self.next_ident,))
49             self.created += 1
50             self.running += 1
51 
52     def task(self, ident):
53         with self.random_mutex:
54             delay = random.random() / 10000.0
55         verbose_print("task %s will run for %sus" % (ident, round(delay*1e6)))
56         time.sleep(delay)
57         verbose_print("task %s done" % ident)
58         with self.running_mutex:
59             self.running -= 1
60             if self.created == NUMTASKS and self.running == 0:
61                 self.done_mutex.release()
62 
63     def test_starting_threads(self):
64         with threading_helper.wait_threads_exit():
65             # Basic test for thread creation.
66             for i in range(NUMTASKS):
67                 self.newtask()
68             verbose_print("waiting for tasks to complete...")
69             self.done_mutex.acquire()
70             verbose_print("all tasks done")
71 
72     def test_stack_size(self):
73         # Various stack size tests.
74         self.assertEqual(thread.stack_size(), 0, "initial stack size is not 0")
75 
76         thread.stack_size(0)
77         self.assertEqual(thread.stack_size(), 0, "stack_size not reset to default")
78 
79     @unittest.skipIf(os.name not in ("nt", "posix"), 'test meant for nt and posix')
80     def test_nt_and_posix_stack_size(self):
81         try:
82             thread.stack_size(4096)
83         except ValueError:
84             verbose_print("caught expected ValueError setting "
85                             "stack_size(4096)")
86         except thread.error:
87             self.skipTest("platform does not support changing thread stack "
88                           "size")
89 
90         fail_msg = "stack_size(%d) failed - should succeed"
91         for tss in (262144, 0x100000, 0):
92             thread.stack_size(tss)
93             self.assertEqual(thread.stack_size(), tss, fail_msg % tss)
94             verbose_print("successfully set stack_size(%d)" % tss)
95 
96         for tss in (262144, 0x100000):
97             verbose_print("trying stack_size = (%d)" % tss)
98             self.next_ident = 0
99             self.created = 0
100             with threading_helper.wait_threads_exit():
101                 for i in range(NUMTASKS):
102                     self.newtask()
103 
104                 verbose_print("waiting for all tasks to complete")
105                 self.done_mutex.acquire()
106                 verbose_print("all tasks done")
107 
108         thread.stack_size(0)
109 
110     def test__count(self):
111         # Test the _count() function.
112         orig = thread._count()
113         mut = thread.allocate_lock()
114         mut.acquire()
115         started = []
116 
117         def task():
118             started.append(None)
119             mut.acquire()
120             mut.release()
121 
122         with threading_helper.wait_threads_exit():
123             thread.start_new_thread(task, ())
124             while not started:
125                 time.sleep(POLL_SLEEP)
126             self.assertEqual(thread._count(), orig + 1)
127             # Allow the task to finish.
128             mut.release()
129             # The only reliable way to be sure that the thread ended from the
130             # interpreter's point of view is to wait for the function object to be
131             # destroyed.
132             done = []
133             wr = weakref.ref(task, lambda _: done.append(None))
134             del task
135             while not done:
136                 time.sleep(POLL_SLEEP)
137                 support.gc_collect()  # For PyPy or other GCs.
138             self.assertEqual(thread._count(), orig)
139 
140     def test_unraisable_exception(self):
141         def task():
142             started.release()
143             raise ValueError("task failed")
144 
145         started = thread.allocate_lock()
146         with support.catch_unraisable_exception() as cm:
147             with threading_helper.wait_threads_exit():
148                 started.acquire()
149                 thread.start_new_thread(task, ())
150                 started.acquire()
151 
152             self.assertEqual(str(cm.unraisable.exc_value), "task failed")
153             self.assertIs(cm.unraisable.object, task)
154             self.assertEqual(cm.unraisable.err_msg,
155                              "Exception ignored in thread started by")
156             self.assertIsNotNone(cm.unraisable.exc_traceback)
157 
158 
159 class Barrier:
160     def __init__(self, num_threads):
161         self.num_threads = num_threads
162         self.waiting = 0
163         self.checkin_mutex  = thread.allocate_lock()
164         self.checkout_mutex = thread.allocate_lock()
165         self.checkout_mutex.acquire()
166 
167     def enter(self):
168         self.checkin_mutex.acquire()
169         self.waiting = self.waiting + 1
170         if self.waiting == self.num_threads:
171             self.waiting = self.num_threads - 1
172             self.checkout_mutex.release()
173             return
174         self.checkin_mutex.release()
175 
176         self.checkout_mutex.acquire()
177         self.waiting = self.waiting - 1
178         if self.waiting == 0:
179             self.checkin_mutex.release()
180             return
181         self.checkout_mutex.release()
182 
183 
184 class BarrierTest(BasicThreadTest):
185 
186     def test_barrier(self):
187         with threading_helper.wait_threads_exit():
188             self.bar = Barrier(NUMTASKS)
189             self.running = NUMTASKS
190             for i in range(NUMTASKS):
191                 thread.start_new_thread(self.task2, (i,))
192             verbose_print("waiting for tasks to end")
193             self.done_mutex.acquire()
194             verbose_print("tasks done")
195 
196     def task2(self, ident):
197         for i in range(NUMTRIPS):
198             if ident == 0:
199                 # give it a good chance to enter the next
200                 # barrier before the others are all out
201                 # of the current one
202                 delay = 0
203             else:
204                 with self.random_mutex:
205                     delay = random.random() / 10000.0
206             verbose_print("task %s will run for %sus" %
207                           (ident, round(delay * 1e6)))
208             time.sleep(delay)
209             verbose_print("task %s entering %s" % (ident, i))
210             self.bar.enter()
211             verbose_print("task %s leaving barrier" % ident)
212         with self.running_mutex:
213             self.running -= 1
214             # Must release mutex before releasing done, else the main thread can
215             # exit and set mutex to None as part of global teardown; then
216             # mutex.release() raises AttributeError.
217             finished = self.running == 0
218         if finished:
219             self.done_mutex.release()
220 
221 class LockTests(lock_tests.LockTests):
222     locktype = thread.allocate_lock
223 
224 
225 class TestForkInThread(unittest.TestCase):
226     def setUp(self):
227         self.read_fd, self.write_fd = os.pipe()
228 
229     @support.requires_fork()
230     @threading_helper.reap_threads
231     def test_forkinthread(self):
232         pid = None
233 
234         def fork_thread(read_fd, write_fd):
235             nonlocal pid
236 
237             # fork in a thread
238             pid = os.fork()
239             if pid:
240                 # parent process
241                 return
242 
243             # child process
244             try:
245                 os.close(read_fd)
246                 os.write(write_fd, b"OK")
247             finally:
248                 os._exit(0)
249 
250         with threading_helper.wait_threads_exit():
251             thread.start_new_thread(fork_thread, (self.read_fd, self.write_fd))
252             self.assertEqual(os.read(self.read_fd, 2), b"OK")
253             os.close(self.write_fd)
254 
255         self.assertIsNotNone(pid)
256         support.wait_process(pid, exitcode=0)
257 
258     def tearDown(self):
259         try:
260             os.close(self.read_fd)
261         except OSError:
262             pass
263 
264         try:
265             os.close(self.write_fd)
266         except OSError:
267             pass
268 
269 
270 if __name__ == "__main__":
271     unittest.main()
272