1 /*
2  * Copyright 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "os/reactor.h"
18 
19 #include <bluetooth/log.h>
20 #include <sys/eventfd.h>
21 
22 #include <chrono>
23 #include <future>
24 #include <thread>
25 
26 #include "common/bind.h"
27 #include "common/callback.h"
28 #include "gtest/gtest.h"
29 
30 namespace bluetooth {
31 namespace os {
32 namespace {
33 
34 constexpr int kReadReadyValue = 100;
35 
36 using common::Bind;
37 
38 std::promise<int>* g_promise;
39 
40 class ReactorTest : public ::testing::Test {
41 protected:
SetUp()42   void SetUp() override {
43     g_promise = new std::promise<int>;
44     reactor_ = new Reactor;
45   }
46 
TearDown()47   void TearDown() override {
48     delete g_promise;
49     g_promise = nullptr;
50     delete reactor_;
51     reactor_ = nullptr;
52   }
53 
54   Reactor* reactor_;
55 };
56 
57 class SampleReactable {
58 public:
SampleReactable()59   SampleReactable() : fd_(eventfd(0, EFD_NONBLOCK)) { EXPECT_NE(fd_, -1); }
60 
~SampleReactable()61   ~SampleReactable() { close(fd_); }
62 
OnReadReady()63   void OnReadReady() {}
64 
OnWriteReady()65   void OnWriteReady() {}
66 
67   int fd_;
68 };
69 
70 class FakeReactable {
71 public:
72   enum EventFdValue {
73     kSetPromise = 1,
74     kRegisterSampleReactable,
75     kUnregisterSampleReactable,
76     kSampleOutputValue,
77   };
FakeReactable()78   FakeReactable() : fd_(eventfd(0, 0)), reactor_(nullptr) { EXPECT_NE(fd_, -1); }
79 
FakeReactable(Reactor * reactor)80   FakeReactable(Reactor* reactor) : fd_(eventfd(0, 0)), reactor_(reactor) { EXPECT_NE(fd_, -1); }
81 
~FakeReactable()82   ~FakeReactable() { close(fd_); }
83 
OnReadReady()84   void OnReadReady() {
85     log::info("");
86     uint64_t value = 0;
87     auto read_result = eventfd_read(fd_, &value);
88     log::info("value = {}", (int)value);
89     EXPECT_EQ(read_result, 0);
90     if (value == kSetPromise && g_promise != nullptr) {
91       g_promise->set_value(kReadReadyValue);
92     }
93     if (value == kRegisterSampleReactable) {
94       reactable_ = reactor_->Register(
95               sample_reactable_.fd_, Bind(&FakeReactable::OnReadReady, common::Unretained(this)),
96               Bind(&FakeReactable::OnWriteReadyNoOp, common::Unretained(this)));
97       g_promise->set_value(kReadReadyValue);
98     }
99     if (value == kUnregisterSampleReactable) {
100       reactor_->Unregister(reactable_);
101       g_promise->set_value(kReadReadyValue);
102     }
103   }
104 
OnWriteReady()105   void OnWriteReady() {
106     auto write_result = eventfd_write(fd_, output_data_);
107     output_data_ = 0;
108     EXPECT_EQ(write_result, 0);
109   }
110 
OnWriteReadyNoOp()111   void OnWriteReadyNoOp() {}
112 
UnregisterInCallback()113   void UnregisterInCallback() {
114     uint64_t value = 0;
115     auto read_result = eventfd_read(fd_, &value);
116     EXPECT_EQ(read_result, 0);
117     g_promise->set_value(kReadReadyValue);
118     reactor_->Unregister(reactable_);
119   }
120 
121   SampleReactable sample_reactable_;
122   Reactor::Reactable* reactable_ = nullptr;
123   int fd_;
124 
125 private:
126   Reactor* reactor_;
127   uint64_t output_data_ = kSampleOutputValue;
128 };
129 
130 class FakeRunningReactable {
131 public:
FakeRunningReactable()132   FakeRunningReactable() : fd_(eventfd(0, 0)) { EXPECT_NE(fd_, -1); }
133 
~FakeRunningReactable()134   ~FakeRunningReactable() { close(fd_); }
135 
OnReadReady()136   void OnReadReady() {
137     uint64_t value = 0;
138     auto read_result = eventfd_read(fd_, &value);
139     ASSERT_EQ(read_result, 0);
140     started.set_value();
141     can_finish.get_future().wait();
142     finished.set_value();
143   }
144 
145   Reactor::Reactable* reactable_ = nullptr;
146   int fd_;
147 
148   std::promise<void> started;
149   std::promise<void> can_finish;
150   std::promise<void> finished;
151 };
152 
TEST_F(ReactorTest,start_and_stop)153 TEST_F(ReactorTest, start_and_stop) {
154   auto reactor_thread = std::thread(&Reactor::Run, reactor_);
155   reactor_->Stop();
156   reactor_thread.join();
157 }
158 
TEST_F(ReactorTest,stop_and_start)159 TEST_F(ReactorTest, stop_and_start) {
160   auto reactor_thread = std::thread(&Reactor::Stop, reactor_);
161   auto another_thread = std::thread(&Reactor::Run, reactor_);
162   reactor_thread.join();
163   another_thread.join();
164 }
165 
TEST_F(ReactorTest,stop_multi_times)166 TEST_F(ReactorTest, stop_multi_times) {
167   auto reactor_thread = std::thread(&Reactor::Run, reactor_);
168   for (int i = 0; i < 5; i++) {
169     reactor_->Stop();
170   }
171   reactor_thread.join();
172 }
173 
TEST_F(ReactorTest,cold_register_only)174 TEST_F(ReactorTest, cold_register_only) {
175   FakeReactable fake_reactable;
176   auto* reactable =
177           reactor_->Register(fake_reactable.fd_,
178                              Bind(&FakeReactable::OnReadReady, common::Unretained(&fake_reactable)),
179                              common::Closure());
180 
181   reactor_->Unregister(reactable);
182 }
183 
TEST_F(ReactorTest,cold_register)184 TEST_F(ReactorTest, cold_register) {
185   FakeReactable fake_reactable;
186   auto* reactable =
187           reactor_->Register(fake_reactable.fd_,
188                              Bind(&FakeReactable::OnReadReady, common::Unretained(&fake_reactable)),
189                              common::Closure());
190   auto reactor_thread = std::thread(&Reactor::Run, reactor_);
191   auto future = g_promise->get_future();
192 
193   auto write_result = eventfd_write(fake_reactable.fd_, FakeReactable::kSetPromise);
194   EXPECT_EQ(write_result, 0);
195   EXPECT_EQ(future.get(), kReadReadyValue);
196   reactor_->Stop();
197   reactor_thread.join();
198   reactor_->Unregister(reactable);
199 }
200 
TEST_F(ReactorTest,hot_register_from_different_thread)201 TEST_F(ReactorTest, hot_register_from_different_thread) {
202   auto reactor_thread = std::thread(&Reactor::Run, reactor_);
203   auto future = g_promise->get_future();
204 
205   FakeReactable fake_reactable;
206   auto* reactable =
207           reactor_->Register(fake_reactable.fd_,
208                              Bind(&FakeReactable::OnReadReady, common::Unretained(&fake_reactable)),
209                              common::Closure());
210   auto write_result = eventfd_write(fake_reactable.fd_, FakeReactable::kSetPromise);
211   EXPECT_EQ(write_result, 0);
212   EXPECT_EQ(future.get(), kReadReadyValue);
213   reactor_->Stop();
214   reactor_thread.join();
215 
216   reactor_->Unregister(reactable);
217 }
218 
TEST_F(ReactorTest,unregister_from_different_thread_while_task_is_executing_)219 TEST_F(ReactorTest, unregister_from_different_thread_while_task_is_executing_) {
220   FakeRunningReactable fake_reactable;
221   auto* reactable = reactor_->Register(
222           fake_reactable.fd_,
223           Bind(&FakeRunningReactable::OnReadReady, common::Unretained(&fake_reactable)),
224           common::Closure());
225   auto reactor_thread = std::thread(&Reactor::Run, reactor_);
226   auto write_result = eventfd_write(fake_reactable.fd_, 1);
227   ASSERT_EQ(write_result, 0);
228   fake_reactable.started.get_future().wait();
229   reactor_->Unregister(reactable);
230   fake_reactable.can_finish.set_value();
231   fake_reactable.finished.get_future().wait();
232 
233   reactor_->Stop();
234   reactor_thread.join();
235 }
236 
TEST_F(ReactorTest,unregister_from_different_thread_while_task_is_executing_wait_fails)237 TEST_F(ReactorTest, unregister_from_different_thread_while_task_is_executing_wait_fails) {
238   FakeRunningReactable fake_reactable;
239   auto* reactable = reactor_->Register(
240           fake_reactable.fd_,
241           common::Bind(&FakeRunningReactable::OnReadReady, common::Unretained(&fake_reactable)),
242           common::Closure());
243   auto reactor_thread = std::thread(&Reactor::Run, reactor_);
244   auto write_result = eventfd_write(fake_reactable.fd_, 1);
245   ASSERT_EQ(write_result, 0);
246   fake_reactable.started.get_future().wait();
247   reactor_->Unregister(reactable);
248   ASSERT_FALSE(reactor_->WaitForUnregisteredReactable(std::chrono::milliseconds(1)));
249   fake_reactable.can_finish.set_value();
250   fake_reactable.finished.get_future().wait();
251 
252   reactor_->Stop();
253   reactor_thread.join();
254 }
255 
TEST_F(ReactorTest,unregister_from_different_thread_while_task_is_executing_wait_succeeds)256 TEST_F(ReactorTest, unregister_from_different_thread_while_task_is_executing_wait_succeeds) {
257   FakeRunningReactable fake_reactable;
258   auto* reactable = reactor_->Register(
259           fake_reactable.fd_,
260           common::Bind(&FakeRunningReactable::OnReadReady, common::Unretained(&fake_reactable)),
261           common::Closure());
262   auto reactor_thread = std::thread(&Reactor::Run, reactor_);
263   auto write_result = eventfd_write(fake_reactable.fd_, 1);
264   ASSERT_EQ(write_result, 0);
265   fake_reactable.started.get_future().wait();
266   reactor_->Unregister(reactable);
267   fake_reactable.can_finish.set_value();
268   fake_reactable.finished.get_future().wait();
269   ASSERT_TRUE(reactor_->WaitForUnregisteredReactable(std::chrono::milliseconds(1)));
270 
271   reactor_->Stop();
272   reactor_thread.join();
273 }
274 
TEST_F(ReactorTest,hot_unregister_from_different_thread)275 TEST_F(ReactorTest, hot_unregister_from_different_thread) {
276   FakeReactable fake_reactable;
277   auto* reactable =
278           reactor_->Register(fake_reactable.fd_,
279                              Bind(&FakeReactable::OnReadReady, common::Unretained(&fake_reactable)),
280                              common::Closure());
281   auto reactor_thread = std::thread(&Reactor::Run, reactor_);
282   reactor_->Unregister(reactable);
283   auto future = g_promise->get_future();
284 
285   auto write_result = eventfd_write(fake_reactable.fd_, FakeReactable::kSetPromise);
286   EXPECT_EQ(write_result, 0);
287   future.wait_for(std::chrono::milliseconds(10));
288   g_promise->set_value(2);
289   EXPECT_EQ(future.get(), 2);
290   reactor_->Stop();
291   reactor_thread.join();
292 }
293 
TEST_F(ReactorTest,hot_register_from_same_thread)294 TEST_F(ReactorTest, hot_register_from_same_thread) {
295   auto reactor_thread = std::thread(&Reactor::Run, reactor_);
296   auto future = g_promise->get_future();
297 
298   FakeReactable fake_reactable(reactor_);
299   auto* reactable =
300           reactor_->Register(fake_reactable.fd_,
301                              Bind(&FakeReactable::OnReadReady, common::Unretained(&fake_reactable)),
302                              common::Closure());
303   auto write_result = eventfd_write(fake_reactable.fd_, FakeReactable::kRegisterSampleReactable);
304   EXPECT_EQ(write_result, 0);
305   EXPECT_EQ(future.get(), kReadReadyValue);
306   delete g_promise;
307   g_promise = new std::promise<int>;
308   future = g_promise->get_future();
309   write_result = eventfd_write(fake_reactable.fd_, FakeReactable::kUnregisterSampleReactable);
310   EXPECT_EQ(write_result, 0);
311   reactor_->Stop();
312   reactor_thread.join();
313 
314   reactor_->Unregister(reactable);
315 }
316 
TEST_F(ReactorTest,hot_unregister_from_same_thread)317 TEST_F(ReactorTest, hot_unregister_from_same_thread) {
318   auto reactor_thread = std::thread(&Reactor::Run, reactor_);
319   auto future = g_promise->get_future();
320 
321   FakeReactable fake_reactable(reactor_);
322   auto* reactable =
323           reactor_->Register(fake_reactable.fd_,
324                              Bind(&FakeReactable::OnReadReady, common::Unretained(&fake_reactable)),
325                              common::Closure());
326   auto write_result = eventfd_write(fake_reactable.fd_, FakeReactable::kRegisterSampleReactable);
327   EXPECT_EQ(write_result, 0);
328   EXPECT_EQ(future.get(), kReadReadyValue);
329   log::info("");
330   delete g_promise;
331   g_promise = new std::promise<int>;
332   future = g_promise->get_future();
333   write_result = eventfd_write(fake_reactable.fd_, FakeReactable::kUnregisterSampleReactable);
334   EXPECT_EQ(write_result, 0);
335   EXPECT_EQ(future.get(), kReadReadyValue);
336   log::info("");
337   reactor_->Stop();
338   reactor_thread.join();
339 
340   reactor_->Unregister(reactable);
341 }
342 
TEST_F(ReactorTest,hot_unregister_from_callback)343 TEST_F(ReactorTest, hot_unregister_from_callback) {
344   auto reactor_thread = std::thread(&Reactor::Run, reactor_);
345 
346   FakeReactable fake_reactable1(reactor_);
347   auto* reactable1 = reactor_->Register(
348           fake_reactable1.fd_,
349           Bind(&FakeReactable::OnReadReady, common::Unretained(&fake_reactable1)),
350           common::Closure());
351 
352   FakeReactable fake_reactable2(reactor_);
353   auto* reactable2 = reactor_->Register(
354           fake_reactable2.fd_,
355           Bind(&FakeReactable::UnregisterInCallback, common::Unretained(&fake_reactable2)),
356           common::Closure());
357   fake_reactable2.reactable_ = reactable2;
358   auto write_result = eventfd_write(fake_reactable2.fd_, 1);
359   EXPECT_EQ(write_result, 0);
360   reactor_->Stop();
361   reactor_thread.join();
362 
363   reactor_->Unregister(reactable1);
364 }
365 
TEST_F(ReactorTest,hot_unregister_during_unregister_from_callback)366 TEST_F(ReactorTest, hot_unregister_during_unregister_from_callback) {
367   auto reactor_thread = std::thread(&Reactor::Run, reactor_);
368   auto future = g_promise->get_future();
369 
370   FakeReactable fake_reactable1(reactor_);
371   auto* reactable1 = reactor_->Register(
372           fake_reactable1.fd_,
373           Bind(&FakeReactable::OnReadReady, common::Unretained(&fake_reactable1)),
374           common::Closure());
375 
376   FakeReactable fake_reactable2(reactor_);
377   auto* reactable2 = reactor_->Register(
378           fake_reactable2.fd_,
379           Bind(&FakeReactable::UnregisterInCallback, common::Unretained(&fake_reactable2)),
380           common::Closure());
381   fake_reactable2.reactable_ = reactable2;
382   auto write_result = eventfd_write(fake_reactable2.fd_, 1);
383   EXPECT_EQ(write_result, 0);
384   EXPECT_EQ(future.get(), kReadReadyValue);
385   reactor_->Unregister(reactable1);
386 
387   reactor_->Stop();
388   reactor_thread.join();
389 }
390 
TEST_F(ReactorTest,start_and_stop_multi_times)391 TEST_F(ReactorTest, start_and_stop_multi_times) {
392   auto reactor_thread = std::thread(&Reactor::Run, reactor_);
393   reactor_->Stop();
394   reactor_thread.join();
395   for (int i = 0; i < 5; i++) {
396     reactor_thread = std::thread(&Reactor::Run, reactor_);
397     reactor_->Stop();
398     reactor_thread.join();
399   }
400 }
401 
TEST_F(ReactorTest,on_write_ready)402 TEST_F(ReactorTest, on_write_ready) {
403   FakeReactable fake_reactable;
404   auto* reactable = reactor_->Register(
405           fake_reactable.fd_, common::Closure(),
406           Bind(&FakeReactable::OnWriteReady, common::Unretained(&fake_reactable)));
407   auto reactor_thread = std::thread(&Reactor::Run, reactor_);
408   uint64_t value = 0;
409   auto read_result = eventfd_read(fake_reactable.fd_, &value);
410   EXPECT_EQ(read_result, 0);
411   EXPECT_EQ(value, FakeReactable::kSampleOutputValue);
412 
413   reactor_->Stop();
414   reactor_thread.join();
415 
416   reactor_->Unregister(reactable);
417 }
418 
TEST_F(ReactorTest,modify_registration)419 TEST_F(ReactorTest, modify_registration) {
420   FakeReactable fake_reactable;
421   auto* reactable = reactor_->Register(
422           fake_reactable.fd_,
423           Bind(&FakeReactable::OnReadReady, common::Unretained(&fake_reactable)),
424           Bind(&FakeReactable::OnWriteReady, common::Unretained(&fake_reactable)));
425 
426   auto reactor_thread = std::thread(&Reactor::Run, reactor_);
427 
428   using namespace std::chrono_literals;
429   auto future = g_promise->get_future();
430 
431   auto write_result = eventfd_write(fake_reactable.fd_, FakeReactable::kSetPromise);
432   ASSERT_EQ(write_result, 0);
433   ASSERT_EQ(future.wait_for(10ms), std::future_status::ready);
434   ASSERT_EQ(future.get(), kReadReadyValue);
435 
436   /* Disable on_read callback */
437   reactor_->ModifyRegistration(reactable, Reactor::REACT_ON_WRITE_ONLY);
438 
439   delete g_promise;
440   g_promise = new std::promise<int>;
441   future = g_promise->get_future();
442 
443   write_result = eventfd_write(fake_reactable.fd_, FakeReactable::kSetPromise);
444   ASSERT_EQ(write_result, 0);
445   ASSERT_NE(future.wait_for(10ms), std::future_status::ready);
446 
447   reactor_->Stop();
448   reactor_thread.join();
449 
450   reactor_->Unregister(reactable);
451 }
452 
453 }  // namespace
454 }  // namespace os
455 }  // namespace bluetooth
456