1 /*
2 * Copyright 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "os/reactor.h"
18
19 #include <bluetooth/log.h>
20 #include <sys/eventfd.h>
21
22 #include <chrono>
23 #include <future>
24 #include <thread>
25
26 #include "common/bind.h"
27 #include "common/callback.h"
28 #include "gtest/gtest.h"
29
30 namespace bluetooth {
31 namespace os {
32 namespace {
33
34 constexpr int kReadReadyValue = 100;
35
36 using common::Bind;
37
38 std::promise<int>* g_promise;
39
40 class ReactorTest : public ::testing::Test {
41 protected:
SetUp()42 void SetUp() override {
43 g_promise = new std::promise<int>;
44 reactor_ = new Reactor;
45 }
46
TearDown()47 void TearDown() override {
48 delete g_promise;
49 g_promise = nullptr;
50 delete reactor_;
51 reactor_ = nullptr;
52 }
53
54 Reactor* reactor_;
55 };
56
57 class SampleReactable {
58 public:
SampleReactable()59 SampleReactable() : fd_(eventfd(0, EFD_NONBLOCK)) { EXPECT_NE(fd_, -1); }
60
~SampleReactable()61 ~SampleReactable() { close(fd_); }
62
OnReadReady()63 void OnReadReady() {}
64
OnWriteReady()65 void OnWriteReady() {}
66
67 int fd_;
68 };
69
70 class FakeReactable {
71 public:
72 enum EventFdValue {
73 kSetPromise = 1,
74 kRegisterSampleReactable,
75 kUnregisterSampleReactable,
76 kSampleOutputValue,
77 };
FakeReactable()78 FakeReactable() : fd_(eventfd(0, 0)), reactor_(nullptr) { EXPECT_NE(fd_, -1); }
79
FakeReactable(Reactor * reactor)80 FakeReactable(Reactor* reactor) : fd_(eventfd(0, 0)), reactor_(reactor) { EXPECT_NE(fd_, -1); }
81
~FakeReactable()82 ~FakeReactable() { close(fd_); }
83
OnReadReady()84 void OnReadReady() {
85 log::info("");
86 uint64_t value = 0;
87 auto read_result = eventfd_read(fd_, &value);
88 log::info("value = {}", (int)value);
89 EXPECT_EQ(read_result, 0);
90 if (value == kSetPromise && g_promise != nullptr) {
91 g_promise->set_value(kReadReadyValue);
92 }
93 if (value == kRegisterSampleReactable) {
94 reactable_ = reactor_->Register(
95 sample_reactable_.fd_, Bind(&FakeReactable::OnReadReady, common::Unretained(this)),
96 Bind(&FakeReactable::OnWriteReadyNoOp, common::Unretained(this)));
97 g_promise->set_value(kReadReadyValue);
98 }
99 if (value == kUnregisterSampleReactable) {
100 reactor_->Unregister(reactable_);
101 g_promise->set_value(kReadReadyValue);
102 }
103 }
104
OnWriteReady()105 void OnWriteReady() {
106 auto write_result = eventfd_write(fd_, output_data_);
107 output_data_ = 0;
108 EXPECT_EQ(write_result, 0);
109 }
110
OnWriteReadyNoOp()111 void OnWriteReadyNoOp() {}
112
UnregisterInCallback()113 void UnregisterInCallback() {
114 uint64_t value = 0;
115 auto read_result = eventfd_read(fd_, &value);
116 EXPECT_EQ(read_result, 0);
117 g_promise->set_value(kReadReadyValue);
118 reactor_->Unregister(reactable_);
119 }
120
121 SampleReactable sample_reactable_;
122 Reactor::Reactable* reactable_ = nullptr;
123 int fd_;
124
125 private:
126 Reactor* reactor_;
127 uint64_t output_data_ = kSampleOutputValue;
128 };
129
130 class FakeRunningReactable {
131 public:
FakeRunningReactable()132 FakeRunningReactable() : fd_(eventfd(0, 0)) { EXPECT_NE(fd_, -1); }
133
~FakeRunningReactable()134 ~FakeRunningReactable() { close(fd_); }
135
OnReadReady()136 void OnReadReady() {
137 uint64_t value = 0;
138 auto read_result = eventfd_read(fd_, &value);
139 ASSERT_EQ(read_result, 0);
140 started.set_value();
141 can_finish.get_future().wait();
142 finished.set_value();
143 }
144
145 Reactor::Reactable* reactable_ = nullptr;
146 int fd_;
147
148 std::promise<void> started;
149 std::promise<void> can_finish;
150 std::promise<void> finished;
151 };
152
// Runs the reactor on its own thread and stops it right away; the test finishes only once Run()
// has returned.
TEST_F(ReactorTest, start_and_stop) {
  std::thread runner(&Reactor::Run, reactor_);
  reactor_->Stop();
  runner.join();
}
158
// Launches Stop() and Run() on two separate threads, Stop() first; the test finishes only once
// both calls have returned.
TEST_F(ReactorTest, stop_and_start) {
  std::thread stopper(&Reactor::Stop, reactor_);
  std::thread runner(&Reactor::Run, reactor_);
  stopper.join();
  runner.join();
}
165
// Calls Stop() several times in a row against a single Run().
TEST_F(ReactorTest, stop_multi_times) {
  constexpr int kStopCalls = 5;
  std::thread runner(&Reactor::Run, reactor_);
  for (int call = 0; call < kStopCalls; ++call) {
    reactor_->Stop();
  }
  runner.join();
}
173
// Registers and unregisters a reactable without ever running the reactor.
TEST_F(ReactorTest, cold_register_only) {
  FakeReactable fake_reactable;
  auto* registration = reactor_->Register(
          fake_reactable.fd_,
          Bind(&FakeReactable::OnReadReady, common::Unretained(&fake_reactable)),
          common::Closure());

  reactor_->Unregister(registration);
}
183
// Registers a reactable before the reactor starts, then checks that a write to its fd reaches
// the read callback.
TEST_F(ReactorTest, cold_register) {
  FakeReactable fake_reactable;
  auto* registration = reactor_->Register(
          fake_reactable.fd_,
          Bind(&FakeReactable::OnReadReady, common::Unretained(&fake_reactable)),
          common::Closure());
  std::thread runner(&Reactor::Run, reactor_);
  auto callback_done = g_promise->get_future();

  const auto rc = eventfd_write(fake_reactable.fd_, FakeReactable::kSetPromise);
  EXPECT_EQ(rc, 0);
  EXPECT_EQ(callback_done.get(), kReadReadyValue);

  reactor_->Stop();
  runner.join();
  reactor_->Unregister(registration);
}
200
// Registers a reactable from the test thread while the reactor is already running on another
// thread, then checks that a write to its fd reaches the read callback.
TEST_F(ReactorTest, hot_register_from_different_thread) {
  std::thread runner(&Reactor::Run, reactor_);
  auto callback_done = g_promise->get_future();

  FakeReactable fake_reactable;
  auto* registration = reactor_->Register(
          fake_reactable.fd_,
          Bind(&FakeReactable::OnReadReady, common::Unretained(&fake_reactable)),
          common::Closure());
  const auto rc = eventfd_write(fake_reactable.fd_, FakeReactable::kSetPromise);
  EXPECT_EQ(rc, 0);
  EXPECT_EQ(callback_done.get(), kReadReadyValue);

  reactor_->Stop();
  runner.join();

  reactor_->Unregister(registration);
}
218
// Unregisters a reactable from the test thread while its read callback is still blocked inside
// the reactor thread, then lets the callback finish.
TEST_F(ReactorTest, unregister_from_different_thread_while_task_is_executing_) {
  FakeRunningReactable fake_reactable;
  auto* registration = reactor_->Register(
          fake_reactable.fd_,
          Bind(&FakeRunningReactable::OnReadReady, common::Unretained(&fake_reactable)),
          common::Closure());
  std::thread runner(&Reactor::Run, reactor_);

  const auto rc = eventfd_write(fake_reactable.fd_, 1);
  ASSERT_EQ(rc, 0);

  // The callback is now in flight and waiting on can_finish.
  auto callback_started = fake_reactable.started.get_future();
  callback_started.wait();
  reactor_->Unregister(registration);

  fake_reactable.can_finish.set_value();
  auto callback_finished = fake_reactable.finished.get_future();
  callback_finished.wait();

  reactor_->Stop();
  runner.join();
}
236
// Same scenario as above, but checks that WaitForUnregisteredReactable() reports false while the
// callback of the unregistered reactable is still blocked.
TEST_F(ReactorTest, unregister_from_different_thread_while_task_is_executing_wait_fails) {
  FakeRunningReactable fake_reactable;
  auto* registration = reactor_->Register(
          fake_reactable.fd_,
          common::Bind(&FakeRunningReactable::OnReadReady, common::Unretained(&fake_reactable)),
          common::Closure());
  std::thread runner(&Reactor::Run, reactor_);

  const auto rc = eventfd_write(fake_reactable.fd_, 1);
  ASSERT_EQ(rc, 0);

  auto callback_started = fake_reactable.started.get_future();
  callback_started.wait();
  reactor_->Unregister(registration);

  // can_finish has not been fulfilled yet, so the callback cannot have returned.
  const std::chrono::milliseconds kWait(1);
  ASSERT_FALSE(reactor_->WaitForUnregisteredReactable(kWait));

  fake_reactable.can_finish.set_value();
  auto callback_finished = fake_reactable.finished.get_future();
  callback_finished.wait();

  reactor_->Stop();
  runner.join();
}
255
// Same scenario as above, but checks that WaitForUnregisteredReactable() reports true once the
// callback of the unregistered reactable has signalled that it is finishing.
TEST_F(ReactorTest, unregister_from_different_thread_while_task_is_executing_wait_succeeds) {
  FakeRunningReactable fake_reactable;
  auto* registration = reactor_->Register(
          fake_reactable.fd_,
          common::Bind(&FakeRunningReactable::OnReadReady, common::Unretained(&fake_reactable)),
          common::Closure());
  std::thread runner(&Reactor::Run, reactor_);

  const auto rc = eventfd_write(fake_reactable.fd_, 1);
  ASSERT_EQ(rc, 0);

  auto callback_started = fake_reactable.started.get_future();
  callback_started.wait();
  reactor_->Unregister(registration);

  fake_reactable.can_finish.set_value();
  auto callback_finished = fake_reactable.finished.get_future();
  callback_finished.wait();

  const std::chrono::milliseconds kWait(1);
  ASSERT_TRUE(reactor_->WaitForUnregisteredReactable(kWait));

  reactor_->Stop();
  runner.join();
}
274
// Unregisters a reactable from the test thread while the reactor is running, then verifies that
// a later write to its fd no longer reaches the read callback.
TEST_F(ReactorTest, hot_unregister_from_different_thread) {
  FakeReactable fake_reactable;
  auto* reactable =
          reactor_->Register(fake_reactable.fd_,
                             Bind(&FakeReactable::OnReadReady, common::Unretained(&fake_reactable)),
                             common::Closure());
  auto reactor_thread = std::thread(&Reactor::Run, reactor_);
  reactor_->Unregister(reactable);
  auto future = g_promise->get_future();

  auto write_result = eventfd_write(fake_reactable.fd_, FakeReactable::kSetPromise);
  EXPECT_EQ(write_result, 0);

  // The callback must not run any more, so the promise has to stay unfulfilled for the whole
  // grace period. Checking the status reports a regression as a normal test failure.
  const auto status = future.wait_for(std::chrono::milliseconds(10));
  EXPECT_EQ(status, std::future_status::timeout);
  if (status == std::future_status::timeout) {
    // Only fulfil the promise ourselves when the callback has not already done so; setting a
    // value twice is an error.
    g_promise->set_value(2);
    EXPECT_EQ(future.get(), 2);
  }

  reactor_->Stop();
  reactor_thread.join();
}
293
// Makes a read callback register a second fd from inside the reactor thread, then makes it
// unregister that fd again before the reactor is stopped.
TEST_F(ReactorTest, hot_register_from_same_thread) {
  auto reactor_thread = std::thread(&Reactor::Run, reactor_);
  auto future = g_promise->get_future();

  FakeReactable fake_reactable(reactor_);
  auto* reactable =
          reactor_->Register(fake_reactable.fd_,
                             Bind(&FakeReactable::OnReadReady, common::Unretained(&fake_reactable)),
                             common::Closure());
  auto write_result = eventfd_write(fake_reactable.fd_, FakeReactable::kRegisterSampleReactable);
  EXPECT_EQ(write_result, 0);
  EXPECT_EQ(future.get(), kReadReadyValue);

  // A promise can only be fulfilled once, so install a fresh one for the second command.
  delete g_promise;
  g_promise = new std::promise<int>;
  future = g_promise->get_future();
  write_result = eventfd_write(fake_reactable.fd_, FakeReactable::kUnregisterSampleReactable);
  EXPECT_EQ(write_result, 0);
  // Wait until the callback has unregistered the sample fd. Stopping the reactor without waiting
  // would race with that callback and could leave the sample fd registered.
  EXPECT_EQ(future.get(), kReadReadyValue);

  reactor_->Stop();
  reactor_thread.join();

  reactor_->Unregister(reactable);
}
316
// Makes a read callback register a second fd and later unregister it again, both from inside the
// reactor thread, waiting for each step to complete.
TEST_F(ReactorTest, hot_unregister_from_same_thread) {
  std::thread runner(&Reactor::Run, reactor_);
  auto step_done = g_promise->get_future();

  FakeReactable fake_reactable(reactor_);
  auto* registration = reactor_->Register(
          fake_reactable.fd_,
          Bind(&FakeReactable::OnReadReady, common::Unretained(&fake_reactable)),
          common::Closure());

  // Step 1: register the sample fd from within the callback.
  auto rc = eventfd_write(fake_reactable.fd_, FakeReactable::kRegisterSampleReactable);
  EXPECT_EQ(rc, 0);
  EXPECT_EQ(step_done.get(), kReadReadyValue);
  log::info("");

  // A promise can only be fulfilled once, so install a fresh one for the second step.
  delete g_promise;
  g_promise = new std::promise<int>;
  step_done = g_promise->get_future();

  // Step 2: unregister the sample fd from within the callback.
  rc = eventfd_write(fake_reactable.fd_, FakeReactable::kUnregisterSampleReactable);
  EXPECT_EQ(rc, 0);
  EXPECT_EQ(step_done.get(), kReadReadyValue);
  log::info("");

  reactor_->Stop();
  runner.join();

  reactor_->Unregister(registration);
}
342
// Triggers a reactable whose read callback unregisters itself, then stops the reactor without
// waiting for that callback.
TEST_F(ReactorTest, hot_unregister_from_callback) {
  std::thread runner(&Reactor::Run, reactor_);

  FakeReactable first(reactor_);
  auto* first_registration = reactor_->Register(
          first.fd_, Bind(&FakeReactable::OnReadReady, common::Unretained(&first)),
          common::Closure());

  FakeReactable second(reactor_);
  auto* second_registration = reactor_->Register(
          second.fd_, Bind(&FakeReactable::UnregisterInCallback, common::Unretained(&second)),
          common::Closure());
  // UnregisterInCallback() unregisters whatever reactable_ points at.
  second.reactable_ = second_registration;

  const auto rc = eventfd_write(second.fd_, 1);
  EXPECT_EQ(rc, 0);

  reactor_->Stop();
  runner.join();

  reactor_->Unregister(first_registration);
}
365
// Triggers a reactable whose read callback unregisters itself, and unregisters a second
// reactable from the test thread once that callback has fulfilled the promise.
TEST_F(ReactorTest, hot_unregister_during_unregister_from_callback) {
  std::thread runner(&Reactor::Run, reactor_);
  auto callback_signalled = g_promise->get_future();

  FakeReactable first(reactor_);
  auto* first_registration = reactor_->Register(
          first.fd_, Bind(&FakeReactable::OnReadReady, common::Unretained(&first)),
          common::Closure());

  FakeReactable second(reactor_);
  auto* second_registration = reactor_->Register(
          second.fd_, Bind(&FakeReactable::UnregisterInCallback, common::Unretained(&second)),
          common::Closure());
  // UnregisterInCallback() unregisters whatever reactable_ points at.
  second.reactable_ = second_registration;

  const auto rc = eventfd_write(second.fd_, 1);
  EXPECT_EQ(rc, 0);
  // The callback fulfils the promise just before it calls Unregister() on itself.
  EXPECT_EQ(callback_signalled.get(), kReadReadyValue);
  reactor_->Unregister(first_registration);

  reactor_->Stop();
  runner.join();
}
390
// Runs and stops the same Reactor repeatedly: one initial cycle plus five more.
TEST_F(ReactorTest, start_and_stop_multi_times) {
  constexpr int kCycles = 1 + 5;
  for (int cycle = 0; cycle < kCycles; ++cycle) {
    std::thread runner(&Reactor::Run, reactor_);
    reactor_->Stop();
    runner.join();
  }
}
401
// Registers only a write callback and checks that the value it writes to the eventfd can be read
// back by the test thread.
TEST_F(ReactorTest, on_write_ready) {
  FakeReactable fake_reactable;
  auto* registration = reactor_->Register(
          fake_reactable.fd_, common::Closure(),
          Bind(&FakeReactable::OnWriteReady, common::Unretained(&fake_reactable)));
  std::thread runner(&Reactor::Run, reactor_);

  // fd_ is a blocking eventfd, so this read returns once the write callback has written to it.
  uint64_t received = 0;
  const auto rc = eventfd_read(fake_reactable.fd_, &received);
  EXPECT_EQ(rc, 0);
  EXPECT_EQ(received, FakeReactable::kSampleOutputValue);

  reactor_->Stop();
  runner.join();

  reactor_->Unregister(registration);
}
418
// Checks that switching a registration to REACT_ON_WRITE_ONLY stops the read callback from being
// invoked.
TEST_F(ReactorTest, modify_registration) {
  FakeReactable fake_reactable;
  auto* registration = reactor_->Register(
          fake_reactable.fd_,
          Bind(&FakeReactable::OnReadReady, common::Unretained(&fake_reactable)),
          Bind(&FakeReactable::OnWriteReady, common::Unretained(&fake_reactable)));

  std::thread runner(&Reactor::Run, reactor_);

  const std::chrono::milliseconds kGracePeriod(10);
  auto read_seen = g_promise->get_future();

  // With both callbacks active, a write must reach the read callback within the grace period.
  auto rc = eventfd_write(fake_reactable.fd_, FakeReactable::kSetPromise);
  ASSERT_EQ(rc, 0);
  ASSERT_EQ(read_seen.wait_for(kGracePeriod), std::future_status::ready);
  ASSERT_EQ(read_seen.get(), kReadReadyValue);

  // Disable the on_read callback.
  reactor_->ModifyRegistration(registration, Reactor::REACT_ON_WRITE_ONLY);

  // A promise can only be fulfilled once, so install a fresh one for the second half.
  delete g_promise;
  g_promise = new std::promise<int>;
  read_seen = g_promise->get_future();

  // The same write must now leave the promise unfulfilled.
  rc = eventfd_write(fake_reactable.fd_, FakeReactable::kSetPromise);
  ASSERT_EQ(rc, 0);
  ASSERT_NE(read_seen.wait_for(kGracePeriod), std::future_status::ready);

  reactor_->Stop();
  runner.join();

  reactor_->Unregister(registration);
}
452
453 } // namespace
454 } // namespace os
455 } // namespace bluetooth
456