1 // Copyright 2014 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "ipc/ipc_mojo_bootstrap.h"
6
7 #include <inttypes.h>
8 #include <stdint.h>
9
10 #include <map>
11 #include <memory>
12 #include <optional>
13 #include <set>
14 #include <utility>
15 #include <vector>
16
17 #include "base/check_op.h"
18 #include "base/containers/circular_deque.h"
19 #include "base/containers/contains.h"
20 #include "base/feature_list.h"
21 #include "base/functional/bind.h"
22 #include "base/functional/callback.h"
23 #include "base/memory/ptr_util.h"
24 #include "base/memory/raw_ptr.h"
25 #include "base/no_destructor.h"
26 #include "base/ranges/algorithm.h"
27 #include "base/sequence_checker.h"
28 #include "base/strings/stringprintf.h"
29 #include "base/synchronization/lock.h"
30 #include "base/synchronization/waitable_event.h"
31 #include "base/task/common/task_annotator.h"
32 #include "base/task/sequenced_task_runner.h"
33 #include "base/task/single_thread_task_runner.h"
34 #include "base/thread_annotations.h"
35 #include "base/trace_event/memory_allocator_dump.h"
36 #include "base/trace_event/memory_dump_manager.h"
37 #include "base/trace_event/memory_dump_provider.h"
38 #include "base/trace_event/typed_macros.h"
39 #include "ipc/ipc_channel.h"
40 #include "ipc/urgent_message_observer.h"
41 #include "mojo/public/cpp/bindings/associated_group.h"
42 #include "mojo/public/cpp/bindings/associated_group_controller.h"
43 #include "mojo/public/cpp/bindings/connector.h"
44 #include "mojo/public/cpp/bindings/features.h"
45 #include "mojo/public/cpp/bindings/interface_endpoint_client.h"
46 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h"
47 #include "mojo/public/cpp/bindings/interface_id.h"
48 #include "mojo/public/cpp/bindings/message.h"
49 #include "mojo/public/cpp/bindings/message_header_validator.h"
50 #include "mojo/public/cpp/bindings/mojo_buildflags.h"
51 #include "mojo/public/cpp/bindings/pipe_control_message_handler.h"
52 #include "mojo/public/cpp/bindings/pipe_control_message_handler_delegate.h"
53 #include "mojo/public/cpp/bindings/pipe_control_message_proxy.h"
54 #include "mojo/public/cpp/bindings/sequence_local_sync_event_watcher.h"
55 #include "mojo/public/cpp/bindings/tracing_helpers.h"
56 #include "third_party/abseil-cpp/absl/base/attributes.h"
57
58 namespace IPC {
59
60 class ChannelAssociatedGroupController;
61
62 namespace {
63
64 ABSL_CONST_INIT thread_local bool off_sequence_binding_allowed = false;
65
66 BASE_FEATURE(kMojoChannelAssociatedSendUsesRunOrPostTask,
67 "MojoChannelAssociatedSendUsesRunOrPostTask",
68 base::FEATURE_DISABLED_BY_DEFAULT);
69
70 // Used to track some internal Channel state in pursuit of message leaks.
71 //
72 // TODO(https://crbug.com/813045): Remove this.
73 class ControllerMemoryDumpProvider
74 : public base::trace_event::MemoryDumpProvider {
75 public:
ControllerMemoryDumpProvider()76 ControllerMemoryDumpProvider() {
77 base::trace_event::MemoryDumpManager::GetInstance()->RegisterDumpProvider(
78 this, "IPCChannel", nullptr);
79 }
80
81 ControllerMemoryDumpProvider(const ControllerMemoryDumpProvider&) = delete;
82 ControllerMemoryDumpProvider& operator=(const ControllerMemoryDumpProvider&) =
83 delete;
84
~ControllerMemoryDumpProvider()85 ~ControllerMemoryDumpProvider() override {
86 base::trace_event::MemoryDumpManager::GetInstance()->UnregisterDumpProvider(
87 this);
88 }
89
AddController(ChannelAssociatedGroupController * controller)90 void AddController(ChannelAssociatedGroupController* controller) {
91 base::AutoLock lock(lock_);
92 controllers_.insert(controller);
93 }
94
RemoveController(ChannelAssociatedGroupController * controller)95 void RemoveController(ChannelAssociatedGroupController* controller) {
96 base::AutoLock lock(lock_);
97 controllers_.erase(controller);
98 }
99
100 // base::trace_event::MemoryDumpProvider:
101 bool OnMemoryDump(const base::trace_event::MemoryDumpArgs& args,
102 base::trace_event::ProcessMemoryDump* pmd) override;
103
104 private:
105 base::Lock lock_;
106 std::set<raw_ptr<ChannelAssociatedGroupController, SetExperimental>>
107 controllers_;
108 };
109
GetMemoryDumpProvider()110 ControllerMemoryDumpProvider& GetMemoryDumpProvider() {
111 static base::NoDestructor<ControllerMemoryDumpProvider> provider;
112 return *provider;
113 }
114
115 // Messages are grouped by this info when recording memory metrics.
116 struct MessageMemoryDumpInfo {
MessageMemoryDumpInfoIPC::__anon2063248d0111::MessageMemoryDumpInfo117 MessageMemoryDumpInfo(const mojo::Message& message)
118 : id(message.name()), profiler_tag(message.heap_profiler_tag()) {}
119 MessageMemoryDumpInfo() = default;
120
operator ==IPC::__anon2063248d0111::MessageMemoryDumpInfo121 bool operator==(const MessageMemoryDumpInfo& other) const {
122 return other.id == id && other.profiler_tag == profiler_tag;
123 }
124
125 uint32_t id = 0;
126 const char* profiler_tag = nullptr;
127 };
128
129 struct MessageMemoryDumpInfoHash {
operator ()IPC::__anon2063248d0111::MessageMemoryDumpInfoHash130 size_t operator()(const MessageMemoryDumpInfo& info) const {
131 return base::HashInts(
132 info.id, info.profiler_tag ? base::FastHash(info.profiler_tag) : 0);
133 }
134 };
135
136 class ScopedUrgentMessageNotification {
137 public:
ScopedUrgentMessageNotification(UrgentMessageObserver * observer=nullptr)138 explicit ScopedUrgentMessageNotification(
139 UrgentMessageObserver* observer = nullptr)
140 : observer_(observer) {
141 if (observer_) {
142 observer_->OnUrgentMessageReceived();
143 }
144 }
145
~ScopedUrgentMessageNotification()146 ~ScopedUrgentMessageNotification() {
147 if (observer_) {
148 observer_->OnUrgentMessageProcessed();
149 }
150 }
151
ScopedUrgentMessageNotification(ScopedUrgentMessageNotification && other)152 ScopedUrgentMessageNotification(ScopedUrgentMessageNotification&& other)
153 : observer_(std::exchange(other.observer_, nullptr)) {}
154
operator =(ScopedUrgentMessageNotification && other)155 ScopedUrgentMessageNotification& operator=(
156 ScopedUrgentMessageNotification&& other) {
157 observer_ = std::exchange(other.observer_, nullptr);
158 return *this;
159 }
160
161 private:
162 raw_ptr<UrgentMessageObserver> observer_;
163 };
164
165 } // namespace
166
167 class ChannelAssociatedGroupController
168 : public mojo::AssociatedGroupController,
169 public mojo::MessageReceiver,
170 public mojo::PipeControlMessageHandlerDelegate {
171 public:
ChannelAssociatedGroupController(bool set_interface_id_namespace_bit,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & proxy_task_runner)172 ChannelAssociatedGroupController(
173 bool set_interface_id_namespace_bit,
174 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
175 const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner)
176 : task_runner_(ipc_task_runner),
177 proxy_task_runner_(proxy_task_runner),
178 set_interface_id_namespace_bit_(set_interface_id_namespace_bit),
179 dispatcher_(this),
180 control_message_handler_(this),
181 control_message_proxy_thunk_(this),
182 control_message_proxy_(&control_message_proxy_thunk_) {
183 control_message_handler_.SetDescription(
184 "IPC::mojom::Bootstrap [primary] PipeControlMessageHandler");
185 dispatcher_.SetValidator(std::make_unique<mojo::MessageHeaderValidator>(
186 "IPC::mojom::Bootstrap [primary] MessageHeaderValidator"));
187
188 GetMemoryDumpProvider().AddController(this);
189
190 DETACH_FROM_SEQUENCE(sequence_checker_);
191 }
192
193 ChannelAssociatedGroupController(const ChannelAssociatedGroupController&) =
194 delete;
195 ChannelAssociatedGroupController& operator=(
196 const ChannelAssociatedGroupController&) = delete;
197
GetQueuedMessageCount()198 size_t GetQueuedMessageCount() {
199 base::AutoLock lock(outgoing_messages_lock_);
200 return outgoing_messages_.size();
201 }
202
GetTopQueuedMessageMemoryDumpInfo(MessageMemoryDumpInfo * info,size_t * count)203 void GetTopQueuedMessageMemoryDumpInfo(MessageMemoryDumpInfo* info,
204 size_t* count) {
205 std::unordered_map<MessageMemoryDumpInfo, size_t, MessageMemoryDumpInfoHash>
206 counts;
207 std::pair<MessageMemoryDumpInfo, size_t> top_message_info_and_count = {
208 MessageMemoryDumpInfo(), 0};
209 base::AutoLock lock(outgoing_messages_lock_);
210 for (const auto& message : outgoing_messages_) {
211 auto it_and_inserted = counts.emplace(MessageMemoryDumpInfo(message), 0);
212 it_and_inserted.first->second++;
213 if (it_and_inserted.first->second > top_message_info_and_count.second)
214 top_message_info_and_count = *it_and_inserted.first;
215 }
216 *info = top_message_info_and_count.first;
217 *count = top_message_info_and_count.second;
218 }
219
Pause()220 void Pause() {
221 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
222 CHECK(was_bound_or_message_sent_);
223 CHECK(!paused_);
224 paused_ = true;
225 }
226
Unpause()227 void Unpause() {
228 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
229 CHECK(was_bound_or_message_sent_);
230 CHECK(paused_);
231 paused_ = false;
232 }
233
FlushOutgoingMessages()234 void FlushOutgoingMessages() {
235 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
236 CHECK(was_bound_or_message_sent_);
237
238 std::vector<mojo::Message> outgoing_messages;
239 {
240 base::AutoLock lock(outgoing_messages_lock_);
241 std::swap(outgoing_messages, outgoing_messages_);
242 }
243
244 for (auto& message : outgoing_messages)
245 SendMessage(&message);
246 }
247
Bind(mojo::ScopedMessagePipeHandle handle,mojo::PendingAssociatedRemote<mojom::Channel> * sender,mojo::PendingAssociatedReceiver<mojom::Channel> * receiver)248 void Bind(mojo::ScopedMessagePipeHandle handle,
249 mojo::PendingAssociatedRemote<mojom::Channel>* sender,
250 mojo::PendingAssociatedReceiver<mojom::Channel>* receiver) {
251 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
252
253 connector_ = std::make_unique<mojo::Connector>(
254 std::move(handle), mojo::Connector::SINGLE_THREADED_SEND,
255 "IPC Channel");
256 connector_->set_incoming_receiver(&dispatcher_);
257 connector_->set_connection_error_handler(
258 base::BindOnce(&ChannelAssociatedGroupController::OnPipeError,
259 base::Unretained(this)));
260 connector_->set_enforce_errors_from_incoming_receiver(false);
261
262 // Don't let the Connector do any sort of queuing on our behalf. Individual
263 // messages bound for the IPC::ChannelProxy thread (i.e. that vast majority
264 // of messages received by this Connector) are already individually
265 // scheduled for dispatch by ChannelProxy, so Connector's normal mode of
266 // operation would only introduce a redundant scheduling step for most
267 // messages.
268 connector_->set_force_immediate_dispatch(true);
269
270 mojo::InterfaceId sender_id, receiver_id;
271 if (set_interface_id_namespace_bit_) {
272 sender_id = 1 | mojo::kInterfaceIdNamespaceMask;
273 receiver_id = 1;
274 } else {
275 sender_id = 1;
276 receiver_id = 1 | mojo::kInterfaceIdNamespaceMask;
277 }
278
279 {
280 base::AutoLock locker(lock_);
281 Endpoint* sender_endpoint = new Endpoint(this, sender_id);
282 Endpoint* receiver_endpoint = new Endpoint(this, receiver_id);
283 endpoints_.insert({ sender_id, sender_endpoint });
284 endpoints_.insert({ receiver_id, receiver_endpoint });
285 sender_endpoint->set_handle_created();
286 receiver_endpoint->set_handle_created();
287 }
288
289 mojo::ScopedInterfaceEndpointHandle sender_handle =
290 CreateScopedInterfaceEndpointHandle(sender_id);
291 mojo::ScopedInterfaceEndpointHandle receiver_handle =
292 CreateScopedInterfaceEndpointHandle(receiver_id);
293
294 *sender = mojo::PendingAssociatedRemote<mojom::Channel>(
295 std::move(sender_handle), 0);
296 *receiver = mojo::PendingAssociatedReceiver<mojom::Channel>(
297 std::move(receiver_handle));
298
299 if (!was_bound_or_message_sent_) {
300 was_bound_or_message_sent_ = true;
301 DETACH_FROM_SEQUENCE(sequence_checker_);
302 }
303 }
304
StartReceiving()305 void StartReceiving() {
306 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
307 CHECK(was_bound_or_message_sent_);
308 connector_->StartReceiving(task_runner_);
309 }
310
ShutDown()311 void ShutDown() {
312 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
313 shut_down_ = true;
314 if (connector_)
315 connector_->CloseMessagePipe();
316 OnPipeError();
317 connector_.reset();
318
319 base::AutoLock lock(outgoing_messages_lock_);
320 outgoing_messages_.clear();
321 }
322
323 // mojo::AssociatedGroupController:
AssociateInterface(mojo::ScopedInterfaceEndpointHandle handle_to_send)324 mojo::InterfaceId AssociateInterface(
325 mojo::ScopedInterfaceEndpointHandle handle_to_send) override {
326 if (!handle_to_send.pending_association())
327 return mojo::kInvalidInterfaceId;
328
329 uint32_t id = 0;
330 {
331 base::AutoLock locker(lock_);
332 do {
333 if (next_interface_id_ >= mojo::kInterfaceIdNamespaceMask)
334 next_interface_id_ = 2;
335 id = next_interface_id_++;
336 if (set_interface_id_namespace_bit_)
337 id |= mojo::kInterfaceIdNamespaceMask;
338 } while (base::Contains(endpoints_, id));
339
340 Endpoint* endpoint = new Endpoint(this, id);
341 if (encountered_error_)
342 endpoint->set_peer_closed();
343 endpoint->set_handle_created();
344 endpoints_.insert({id, endpoint});
345 }
346
347 if (!NotifyAssociation(&handle_to_send, id)) {
348 // The peer handle of |handle_to_send|, which is supposed to join this
349 // associated group, has been closed.
350 {
351 base::AutoLock locker(lock_);
352 Endpoint* endpoint = FindEndpoint(id);
353 if (endpoint)
354 MarkClosedAndMaybeRemove(endpoint);
355 }
356
357 control_message_proxy_.NotifyPeerEndpointClosed(
358 id, handle_to_send.disconnect_reason());
359 }
360 return id;
361 }
362
CreateLocalEndpointHandle(mojo::InterfaceId id)363 mojo::ScopedInterfaceEndpointHandle CreateLocalEndpointHandle(
364 mojo::InterfaceId id) override {
365 if (!mojo::IsValidInterfaceId(id))
366 return mojo::ScopedInterfaceEndpointHandle();
367
368 // Unless it is the primary ID, |id| is from the remote side and therefore
369 // its namespace bit is supposed to be different than the value that this
370 // router would use.
371 if (!mojo::IsPrimaryInterfaceId(id) &&
372 set_interface_id_namespace_bit_ ==
373 mojo::HasInterfaceIdNamespaceBitSet(id)) {
374 return mojo::ScopedInterfaceEndpointHandle();
375 }
376
377 base::AutoLock locker(lock_);
378 bool inserted = false;
379 Endpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
380 if (inserted) {
381 DCHECK(!endpoint->handle_created());
382 if (encountered_error_)
383 endpoint->set_peer_closed();
384 } else {
385 if (endpoint->handle_created())
386 return mojo::ScopedInterfaceEndpointHandle();
387 }
388
389 endpoint->set_handle_created();
390 return CreateScopedInterfaceEndpointHandle(id);
391 }
392
CloseEndpointHandle(mojo::InterfaceId id,const std::optional<mojo::DisconnectReason> & reason)393 void CloseEndpointHandle(
394 mojo::InterfaceId id,
395 const std::optional<mojo::DisconnectReason>& reason) override {
396 if (!mojo::IsValidInterfaceId(id))
397 return;
398 {
399 base::AutoLock locker(lock_);
400 DCHECK(base::Contains(endpoints_, id));
401 Endpoint* endpoint = endpoints_[id].get();
402 DCHECK(!endpoint->client());
403 DCHECK(!endpoint->closed());
404 MarkClosedAndMaybeRemove(endpoint);
405 }
406
407 if (!mojo::IsPrimaryInterfaceId(id) || reason)
408 control_message_proxy_.NotifyPeerEndpointClosed(id, reason);
409 }
410
NotifyLocalEndpointOfPeerClosure(mojo::InterfaceId id)411 void NotifyLocalEndpointOfPeerClosure(mojo::InterfaceId id) override {
412 if (!base::FeatureList::IsEnabled(
413 mojo::features::kMojoFixAssociatedHandleLeak)) {
414 return;
415 }
416
417 if (!task_runner_->RunsTasksInCurrentSequence()) {
418 task_runner_->PostTask(
419 FROM_HERE, base::BindOnce(&ChannelAssociatedGroupController::
420 NotifyLocalEndpointOfPeerClosure,
421 base::WrapRefCounted(this), id));
422 return;
423 }
424 OnPeerAssociatedEndpointClosed(id, std::nullopt);
425 }
426
AttachEndpointClient(const mojo::ScopedInterfaceEndpointHandle & handle,mojo::InterfaceEndpointClient * client,scoped_refptr<base::SequencedTaskRunner> runner)427 mojo::InterfaceEndpointController* AttachEndpointClient(
428 const mojo::ScopedInterfaceEndpointHandle& handle,
429 mojo::InterfaceEndpointClient* client,
430 scoped_refptr<base::SequencedTaskRunner> runner) override {
431 const mojo::InterfaceId id = handle.id();
432
433 DCHECK(mojo::IsValidInterfaceId(id));
434 DCHECK(client);
435
436 base::AutoLock locker(lock_);
437 DCHECK(base::Contains(endpoints_, id));
438
439 Endpoint* endpoint = endpoints_[id].get();
440 endpoint->AttachClient(client, std::move(runner));
441
442 if (endpoint->peer_closed())
443 NotifyEndpointOfError(endpoint, true /* force_async */);
444
445 return endpoint;
446 }
447
DetachEndpointClient(const mojo::ScopedInterfaceEndpointHandle & handle)448 void DetachEndpointClient(
449 const mojo::ScopedInterfaceEndpointHandle& handle) override {
450 const mojo::InterfaceId id = handle.id();
451
452 DCHECK(mojo::IsValidInterfaceId(id));
453
454 base::AutoLock locker(lock_);
455 DCHECK(base::Contains(endpoints_, id));
456
457 Endpoint* endpoint = endpoints_[id].get();
458 endpoint->DetachClient();
459 }
460
RaiseError()461 void RaiseError() override {
462 // We ignore errors on channel endpoints, leaving the pipe open. There are
463 // good reasons for this:
464 //
465 // * We should never close a channel endpoint in either process as long as
466 // the child process is still alive. The child's endpoint should only be
467 // closed implicitly by process death, and the browser's endpoint should
468 // only be closed after the child process is confirmed to be dead. Crash
469 // reporting logic in Chrome relies on this behavior in order to do the
470 // right thing.
471 //
472 // * There are two interesting conditions under which RaiseError() can be
473 // implicitly reached: an incoming message fails validation, or the
474 // local endpoint drops a response callback without calling it.
475 //
476 // * In the validation case, we also report the message as bad, and this
477 // will imminently trigger the common bad-IPC path in the browser,
478 // causing the browser to kill the offending renderer.
479 //
480 // * In the dropped response callback case, the net result of ignoring the
481 // issue is generally innocuous. While indicative of programmer error,
482 // it's not a severe failure and is already covered by separate DCHECKs.
483 //
484 // See https://crbug.com/861607 for additional discussion.
485 }
486
PrefersSerializedMessages()487 bool PrefersSerializedMessages() override { return true; }
488
SetUrgentMessageObserver(UrgentMessageObserver * observer)489 void SetUrgentMessageObserver(UrgentMessageObserver* observer) {
490 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
491 CHECK(!was_bound_or_message_sent_);
492 urgent_message_observer_ = observer;
493 DETACH_FROM_SEQUENCE(sequence_checker_);
494 }
495
496 private:
497 class Endpoint;
498 class ControlMessageProxyThunk;
499 friend class Endpoint;
500 friend class ControlMessageProxyThunk;
501
502 // MessageWrapper objects are always destroyed under the controller's lock. On
503 // destruction, if the message it wrappers contains
504 // ScopedInterfaceEndpointHandles (which cannot be destructed under the
505 // controller's lock), the wrapper unlocks to clean them up.
506 class MessageWrapper {
507 public:
508 MessageWrapper() = default;
509
MessageWrapper(ChannelAssociatedGroupController * controller,mojo::Message message)510 MessageWrapper(ChannelAssociatedGroupController* controller,
511 mojo::Message message)
512 : controller_(controller), value_(std::move(message)) {}
513
MessageWrapper(MessageWrapper && other)514 MessageWrapper(MessageWrapper&& other)
515 : controller_(other.controller_), value_(std::move(other.value_)) {}
516
517 MessageWrapper(const MessageWrapper&) = delete;
518 MessageWrapper& operator=(const MessageWrapper&) = delete;
519
~MessageWrapper()520 ~MessageWrapper() {
521 if (value_.associated_endpoint_handles()->empty())
522 return;
523
524 controller_->lock_.AssertAcquired();
525 {
526 base::AutoUnlock unlocker(controller_->lock_);
527 value_.mutable_associated_endpoint_handles()->clear();
528 }
529 }
530
operator =(MessageWrapper && other)531 MessageWrapper& operator=(MessageWrapper&& other) {
532 controller_ = other.controller_;
533 value_ = std::move(other.value_);
534 return *this;
535 }
536
HasRequestId(uint64_t request_id)537 bool HasRequestId(uint64_t request_id) {
538 return !value_.IsNull() && value_.version() >= 1 &&
539 value_.header_v1()->request_id == request_id;
540 }
541
value()542 mojo::Message& value() { return value_; }
543
544 private:
545 raw_ptr<ChannelAssociatedGroupController> controller_ = nullptr;
546 mojo::Message value_;
547 };
548
549 class Endpoint : public base::RefCountedThreadSafe<Endpoint>,
550 public mojo::InterfaceEndpointController {
551 public:
Endpoint(ChannelAssociatedGroupController * controller,mojo::InterfaceId id)552 Endpoint(ChannelAssociatedGroupController* controller, mojo::InterfaceId id)
553 : controller_(controller), id_(id) {}
554
555 Endpoint(const Endpoint&) = delete;
556 Endpoint& operator=(const Endpoint&) = delete;
557
id() const558 mojo::InterfaceId id() const { return id_; }
559
closed() const560 bool closed() const {
561 controller_->lock_.AssertAcquired();
562 return closed_;
563 }
564
set_closed()565 void set_closed() {
566 controller_->lock_.AssertAcquired();
567 closed_ = true;
568 }
569
peer_closed() const570 bool peer_closed() const {
571 controller_->lock_.AssertAcquired();
572 return peer_closed_;
573 }
574
set_peer_closed()575 void set_peer_closed() {
576 controller_->lock_.AssertAcquired();
577 peer_closed_ = true;
578 }
579
handle_created() const580 bool handle_created() const {
581 controller_->lock_.AssertAcquired();
582 return handle_created_;
583 }
584
set_handle_created()585 void set_handle_created() {
586 controller_->lock_.AssertAcquired();
587 handle_created_ = true;
588 }
589
disconnect_reason() const590 const std::optional<mojo::DisconnectReason>& disconnect_reason() const {
591 return disconnect_reason_;
592 }
593
set_disconnect_reason(const std::optional<mojo::DisconnectReason> & disconnect_reason)594 void set_disconnect_reason(
595 const std::optional<mojo::DisconnectReason>& disconnect_reason) {
596 disconnect_reason_ = disconnect_reason;
597 }
598
task_runner() const599 base::SequencedTaskRunner* task_runner() const {
600 return task_runner_.get();
601 }
602
was_bound_off_sequence() const603 bool was_bound_off_sequence() const { return was_bound_off_sequence_; }
604
client() const605 mojo::InterfaceEndpointClient* client() const {
606 controller_->lock_.AssertAcquired();
607 return client_;
608 }
609
AttachClient(mojo::InterfaceEndpointClient * client,scoped_refptr<base::SequencedTaskRunner> runner)610 void AttachClient(mojo::InterfaceEndpointClient* client,
611 scoped_refptr<base::SequencedTaskRunner> runner) {
612 controller_->lock_.AssertAcquired();
613 DCHECK(!client_);
614 DCHECK(!closed_);
615
616 task_runner_ = std::move(runner);
617 client_ = client;
618
619 if (off_sequence_binding_allowed) {
620 was_bound_off_sequence_ = true;
621 }
622 }
623
DetachClient()624 void DetachClient() {
625 controller_->lock_.AssertAcquired();
626 DCHECK(client_);
627 DCHECK(!closed_);
628
629 task_runner_ = nullptr;
630 client_ = nullptr;
631 sync_watcher_.reset();
632 }
633
EnqueueSyncMessage(MessageWrapper message)634 std::optional<uint32_t> EnqueueSyncMessage(MessageWrapper message) {
635 controller_->lock_.AssertAcquired();
636 if (exclusive_wait_ && exclusive_wait_->TryFulfillingWith(message)) {
637 exclusive_wait_ = nullptr;
638 return std::nullopt;
639 }
640
641 uint32_t id = GenerateSyncMessageId();
642 sync_messages_.emplace_back(id, std::move(message));
643 SignalSyncMessageEvent();
644 return id;
645 }
646
SignalSyncMessageEvent()647 void SignalSyncMessageEvent() {
648 controller_->lock_.AssertAcquired();
649
650 if (sync_watcher_)
651 sync_watcher_->SignalEvent();
652 }
653
PopSyncMessage(uint32_t id)654 MessageWrapper PopSyncMessage(uint32_t id) {
655 controller_->lock_.AssertAcquired();
656 if (sync_messages_.empty() || sync_messages_.front().first != id)
657 return MessageWrapper();
658 MessageWrapper message = std::move(sync_messages_.front().second);
659 sync_messages_.pop_front();
660 return message;
661 }
662
663 // mojo::InterfaceEndpointController:
SendMessage(mojo::Message * message)664 bool SendMessage(mojo::Message* message) override {
665 DCHECK(task_runner_->RunsTasksInCurrentSequence());
666 message->set_interface_id(id_);
667 return controller_->SendMessage(message);
668 }
669
AllowWokenUpBySyncWatchOnSameThread()670 void AllowWokenUpBySyncWatchOnSameThread() override {
671 DCHECK(task_runner_->RunsTasksInCurrentSequence());
672
673 EnsureSyncWatcherExists();
674 sync_watcher_->AllowWokenUpBySyncWatchOnSameSequence();
675 }
676
SyncWatch(const bool & should_stop)677 bool SyncWatch(const bool& should_stop) override {
678 DCHECK(task_runner_->RunsTasksInCurrentSequence());
679
680 // It's not legal to make sync calls from the primary endpoint's thread,
681 // and in fact they must only happen from the proxy task runner.
682 DCHECK(!controller_->task_runner_->BelongsToCurrentThread());
683 DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread());
684
685 EnsureSyncWatcherExists();
686 {
687 base::AutoLock locker(controller_->lock_);
688 if (peer_closed_) {
689 SignalSyncMessageEvent();
690 }
691 }
692 return sync_watcher_->SyncWatch(&should_stop);
693 }
694
WaitForIncomingSyncReply(uint64_t request_id)695 MessageWrapper WaitForIncomingSyncReply(uint64_t request_id) {
696 std::optional<ExclusiveSyncWait> wait;
697 {
698 base::AutoLock lock(controller_->lock_);
699 for (auto& [id, message] : sync_messages_) {
700 if (message.HasRequestId(request_id)) {
701 return std::move(message);
702 }
703 }
704
705 DCHECK(!exclusive_wait_);
706 wait.emplace(request_id);
707 exclusive_wait_ = &wait.value();
708 }
709
710 wait->event.Wait();
711 return std::move(wait->message);
712 }
713
SyncWatchExclusive(uint64_t request_id)714 bool SyncWatchExclusive(uint64_t request_id) override {
715 MessageWrapper message = WaitForIncomingSyncReply(request_id);
716 if (message.value().IsNull() || !client_) {
717 return false;
718 }
719
720 if (!client_->HandleIncomingMessage(&message.value())) {
721 base::AutoLock locker(controller_->lock_);
722 controller_->RaiseError();
723 return false;
724 }
725
726 return true;
727 }
728
RegisterExternalSyncWaiter(uint64_t request_id)729 void RegisterExternalSyncWaiter(uint64_t request_id) override {}
730
731 private:
732 friend class base::RefCountedThreadSafe<Endpoint>;
733
~Endpoint()734 ~Endpoint() override {
735 controller_->lock_.AssertAcquired();
736 DCHECK(!client_);
737 DCHECK(closed_);
738 DCHECK(peer_closed_);
739 DCHECK(!sync_watcher_);
740 if (exclusive_wait_) {
741 exclusive_wait_->event.Signal();
742 }
743 }
744
OnSyncMessageEventReady()745 void OnSyncMessageEventReady() {
746 DCHECK(task_runner_->RunsTasksInCurrentSequence());
747
748 // SUBTLE: The order of these scoped_refptrs matters.
749 // `controller_keepalive` MUST outlive `keepalive` because the Endpoint
750 // holds raw pointer to the AssociatedGroupController.
751 scoped_refptr<AssociatedGroupController> controller_keepalive(
752 controller_.get());
753 scoped_refptr<Endpoint> keepalive(this);
754 base::AutoLock locker(controller_->lock_);
755 bool more_to_process = false;
756 if (!sync_messages_.empty()) {
757 MessageWrapper message_wrapper =
758 std::move(sync_messages_.front().second);
759 sync_messages_.pop_front();
760
761 bool dispatch_succeeded;
762 mojo::InterfaceEndpointClient* client = client_;
763 {
764 base::AutoUnlock unlocker(controller_->lock_);
765 dispatch_succeeded =
766 client->HandleIncomingMessage(&message_wrapper.value());
767 }
768
769 if (!sync_messages_.empty())
770 more_to_process = true;
771
772 if (!dispatch_succeeded)
773 controller_->RaiseError();
774 }
775
776 if (!more_to_process)
777 sync_watcher_->ResetEvent();
778
779 // If there are no queued sync messages and the peer has closed, there
780 // there won't be incoming sync messages in the future. If any
781 // SyncWatch() calls are on the stack for this endpoint, resetting the
782 // watcher will allow them to exit as the stack undwinds.
783 if (!more_to_process && peer_closed_)
784 sync_watcher_.reset();
785 }
786
EnsureSyncWatcherExists()787 void EnsureSyncWatcherExists() {
788 DCHECK(task_runner_->RunsTasksInCurrentSequence());
789 if (sync_watcher_)
790 return;
791
792 base::AutoLock locker(controller_->lock_);
793 sync_watcher_ = std::make_unique<mojo::SequenceLocalSyncEventWatcher>(
794 base::BindRepeating(&Endpoint::OnSyncMessageEventReady,
795 base::Unretained(this)));
796 if (peer_closed_ || !sync_messages_.empty())
797 SignalSyncMessageEvent();
798 }
799
GenerateSyncMessageId()800 uint32_t GenerateSyncMessageId() {
801 // Overflow is fine.
802 uint32_t id = next_sync_message_id_++;
803 DCHECK(sync_messages_.empty() || sync_messages_.front().first != id);
804 return id;
805 }
806
807 // Tracks the state of a pending sync wait which excludes all other incoming
808 // IPC on the waiting thread.
809 struct ExclusiveSyncWait {
ExclusiveSyncWaitIPC::ChannelAssociatedGroupController::Endpoint::ExclusiveSyncWait810 explicit ExclusiveSyncWait(uint64_t request_id)
811 : request_id(request_id) {}
812 ~ExclusiveSyncWait() = default;
813
TryFulfillingWithIPC::ChannelAssociatedGroupController::Endpoint::ExclusiveSyncWait814 bool TryFulfillingWith(MessageWrapper& wrapper) {
815 if (!wrapper.HasRequestId(request_id)) {
816 return false;
817 }
818
819 message = std::move(wrapper);
820 event.Signal();
821 return true;
822 }
823
824 uint64_t request_id;
825 base::WaitableEvent event;
826 MessageWrapper message;
827 };
828
829 const raw_ptr<ChannelAssociatedGroupController> controller_;
830 const mojo::InterfaceId id_;
831
832 bool closed_ = false;
833 bool peer_closed_ = false;
834 bool handle_created_ = false;
835 bool was_bound_off_sequence_ = false;
836 std::optional<mojo::DisconnectReason> disconnect_reason_;
837 raw_ptr<mojo::InterfaceEndpointClient> client_ = nullptr;
838 scoped_refptr<base::SequencedTaskRunner> task_runner_;
839 std::unique_ptr<mojo::SequenceLocalSyncEventWatcher> sync_watcher_;
840 base::circular_deque<std::pair<uint32_t, MessageWrapper>> sync_messages_;
841 raw_ptr<ExclusiveSyncWait> exclusive_wait_ = nullptr;
842 uint32_t next_sync_message_id_ = 0;
843 };
844
845 class ControlMessageProxyThunk : public MessageReceiver {
846 public:
ControlMessageProxyThunk(ChannelAssociatedGroupController * controller)847 explicit ControlMessageProxyThunk(
848 ChannelAssociatedGroupController* controller)
849 : controller_(controller) {}
850
851 ControlMessageProxyThunk(const ControlMessageProxyThunk&) = delete;
852 ControlMessageProxyThunk& operator=(const ControlMessageProxyThunk&) =
853 delete;
854
855 private:
856 // MessageReceiver:
Accept(mojo::Message * message)857 bool Accept(mojo::Message* message) override {
858 return controller_->SendMessage(message);
859 }
860
861 raw_ptr<ChannelAssociatedGroupController> controller_;
862 };
863
~ChannelAssociatedGroupController()864 ~ChannelAssociatedGroupController() override {
865 DCHECK(!connector_);
866
867 base::AutoLock locker(lock_);
868 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
869 Endpoint* endpoint = iter->second.get();
870 ++iter;
871
872 if (!endpoint->closed()) {
873 // This happens when a NotifyPeerEndpointClosed message been received,
874 // but the interface ID hasn't been used to create local endpoint
875 // handle.
876 DCHECK(!endpoint->client());
877 DCHECK(endpoint->peer_closed());
878 MarkClosed(endpoint);
879 } else {
880 MarkPeerClosed(endpoint);
881 }
882 }
883 endpoints_.clear();
884
885 GetMemoryDumpProvider().RemoveController(this);
886 }
887
SendMessage(mojo::Message * message)888 bool SendMessage(mojo::Message* message) {
889 DCHECK(message->heap_profiler_tag());
890 if (task_runner_->BelongsToCurrentThread()) {
891 return SendMessageOnSequence(message);
892 }
893
894 // PostTask (or RunOrPostTask) so that `message` is sent after messages from
895 // tasks that are already queued (e.g. by `IPC::ChannelProxy::Send`).
896 auto callback = base::BindOnce(
897 &ChannelAssociatedGroupController::SendMessageOnSequenceViaTask, this,
898 std::move(*message));
899 if (base::FeatureList::IsEnabled(
900 kMojoChannelAssociatedSendUsesRunOrPostTask)) {
901 task_runner_->RunOrPostTask(base::subtle::RunOrPostTaskPassKey(),
902 FROM_HERE, std::move(callback));
903 } else {
904 task_runner_->PostTask(FROM_HERE, std::move(callback));
905 }
906
907 return true;
908 }
909
SendMessageOnSequence(mojo::Message * message)910 bool SendMessageOnSequence(mojo::Message* message) {
911 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
912 was_bound_or_message_sent_ = true;
913
914 if (!connector_ || paused_) {
915 if (!shut_down_) {
916 base::AutoLock lock(outgoing_messages_lock_);
917 outgoing_messages_.emplace_back(std::move(*message));
918 }
919 return true;
920 }
921 return connector_->Accept(message);
922 }
923
SendMessageOnSequenceViaTask(mojo::Message message)924 void SendMessageOnSequenceViaTask(mojo::Message message) {
925 if (!SendMessageOnSequence(&message)) {
926 RaiseError();
927 }
928 }
929
OnPipeError()930 void OnPipeError() {
931 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
932
933 // We keep |this| alive here because it's possible for the notifications
934 // below to release all other references.
935 scoped_refptr<ChannelAssociatedGroupController> keepalive(this);
936
937 base::AutoLock locker(lock_);
938 encountered_error_ = true;
939
940 std::vector<uint32_t> endpoints_to_remove;
941 std::vector<scoped_refptr<Endpoint>> endpoints_to_notify;
942 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
943 Endpoint* endpoint = iter->second.get();
944 ++iter;
945
946 if (endpoint->client()) {
947 endpoints_to_notify.push_back(endpoint);
948 }
949
950 if (MarkPeerClosed(endpoint)) {
951 endpoints_to_remove.push_back(endpoint->id());
952 }
953 }
954
955 for (auto& endpoint : endpoints_to_notify) {
956 // Because a notification may in turn detach any endpoint, we have to
957 // check each client again here.
958 if (endpoint->client())
959 NotifyEndpointOfError(endpoint.get(), false /* force_async */);
960 }
961
962 for (uint32_t id : endpoints_to_remove) {
963 endpoints_.erase(id);
964 }
965 }
966
NotifyEndpointOfError(Endpoint * endpoint,bool force_async)967 void NotifyEndpointOfError(Endpoint* endpoint, bool force_async)
968 EXCLUSIVE_LOCKS_REQUIRED(lock_) {
969 DCHECK(endpoint->task_runner() && endpoint->client());
970 if (endpoint->task_runner()->RunsTasksInCurrentSequence() && !force_async) {
971 mojo::InterfaceEndpointClient* client = endpoint->client();
972 std::optional<mojo::DisconnectReason> reason(
973 endpoint->disconnect_reason());
974
975 base::AutoUnlock unlocker(lock_);
976 client->NotifyError(reason);
977 } else {
978 endpoint->task_runner()->PostTask(
979 FROM_HERE,
980 base::BindOnce(&ChannelAssociatedGroupController::
981 NotifyEndpointOfErrorOnEndpointThread,
982 this, endpoint->id(),
983 // This is safe as `endpoint` is verified to be in
984 // `endpoints_` (a map with ownership) before use.
985 base::UnsafeDangling(endpoint)));
986 }
987 }
988
989 // `endpoint` might be a dangling ptr and must be checked before dereference.
NotifyEndpointOfErrorOnEndpointThread(mojo::InterfaceId id,MayBeDangling<Endpoint> endpoint)990 void NotifyEndpointOfErrorOnEndpointThread(mojo::InterfaceId id,
991 MayBeDangling<Endpoint> endpoint) {
992 base::AutoLock locker(lock_);
993 auto iter = endpoints_.find(id);
994 if (iter == endpoints_.end() || iter->second.get() != endpoint)
995 return;
996 if (!endpoint->client())
997 return;
998
999 DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence());
1000 NotifyEndpointOfError(endpoint, false /* force_async */);
1001 }
1002
1003 // Marks `endpoint` as closed and returns true if and only if its peer was
1004 // also already closed.
MarkClosed(Endpoint * endpoint)1005 bool MarkClosed(Endpoint* endpoint) EXCLUSIVE_LOCKS_REQUIRED(lock_) {
1006 endpoint->set_closed();
1007 return endpoint->peer_closed();
1008 }
1009
1010 // Marks `endpoint` as having a closed peer and returns true if and only if
1011 // `endpoint` itself was also already closed.
MarkPeerClosed(Endpoint * endpoint)1012 bool MarkPeerClosed(Endpoint* endpoint) EXCLUSIVE_LOCKS_REQUIRED(lock_) {
1013 endpoint->set_peer_closed();
1014 endpoint->SignalSyncMessageEvent();
1015 return endpoint->closed();
1016 }
1017
MarkClosedAndMaybeRemove(Endpoint * endpoint)1018 void MarkClosedAndMaybeRemove(Endpoint* endpoint)
1019 EXCLUSIVE_LOCKS_REQUIRED(lock_) {
1020 if (MarkClosed(endpoint)) {
1021 endpoints_.erase(endpoint->id());
1022 }
1023 }
1024
MarkPeerClosedAndMaybeRemove(Endpoint * endpoint)1025 void MarkPeerClosedAndMaybeRemove(Endpoint* endpoint)
1026 EXCLUSIVE_LOCKS_REQUIRED(lock_) {
1027 if (MarkPeerClosed(endpoint)) {
1028 endpoints_.erase(endpoint->id());
1029 }
1030 }
1031
FindOrInsertEndpoint(mojo::InterfaceId id,bool * inserted)1032 Endpoint* FindOrInsertEndpoint(mojo::InterfaceId id, bool* inserted)
1033 EXCLUSIVE_LOCKS_REQUIRED(lock_) {
1034 DCHECK(!inserted || !*inserted);
1035
1036 Endpoint* endpoint = FindEndpoint(id);
1037 if (!endpoint) {
1038 endpoint = new Endpoint(this, id);
1039 endpoints_.insert({id, endpoint});
1040 if (inserted)
1041 *inserted = true;
1042 }
1043 return endpoint;
1044 }
1045
FindEndpoint(mojo::InterfaceId id)1046 Endpoint* FindEndpoint(mojo::InterfaceId id) EXCLUSIVE_LOCKS_REQUIRED(lock_) {
1047 auto iter = endpoints_.find(id);
1048 return iter != endpoints_.end() ? iter->second.get() : nullptr;
1049 }
1050
1051 // mojo::MessageReceiver:
Accept(mojo::Message * message)1052 bool Accept(mojo::Message* message) override {
1053 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
1054
1055 if (!message->DeserializeAssociatedEndpointHandles(this))
1056 return false;
1057
1058 if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message))
1059 return control_message_handler_.Accept(message);
1060
1061 mojo::InterfaceId id = message->interface_id();
1062 if (!mojo::IsValidInterfaceId(id))
1063 return false;
1064
1065 base::ReleasableAutoLock locker(&lock_);
1066 Endpoint* endpoint = FindEndpoint(id);
1067 if (!endpoint)
1068 return true;
1069
1070 mojo::InterfaceEndpointClient* client = endpoint->client();
1071 if (!client || !endpoint->task_runner()->RunsTasksInCurrentSequence()) {
1072 // The ChannelProxy for this channel is bound to `proxy_task_runner_` and
1073 // by default legacy IPCs must dispatch to either the IO thread or the
1074 // proxy task runner. We generally impose the same constraint on
1075 // associated interface endpoints so that FIFO can be guaranteed across
1076 // all interfaces without stalling any of them to wait for a pending
1077 // endpoint to be bound.
1078 //
1079 // This allows us to assume that if an endpoint is not yet bound when we
1080 // receive a message targeting it, it *will* be bound on the proxy task
1081 // runner by the time a newly posted task runs there. Hence we simply post
1082 // a hopeful dispatch task to that task runner.
1083 //
1084 // As it turns out, there are even some instances of endpoints binding to
1085 // alternative (non-IO-thread, non-proxy) task runners, but still
1086 // ultimately relying on the fact that we schedule their messages on the
1087 // proxy task runner. So even if the endpoint is already bound, we
1088 // default to scheduling it on the proxy task runner as long as it's not
1089 // bound specifically to the IO task runner.
1090 // TODO(rockot): Try to sort out these cases and maybe eliminate them.
1091 //
1092 // Finally, it's also possible that an endpoint was bound to an
1093 // alternative task runner and it really does want its messages to
1094 // dispatch there. In that case `was_bound_off_sequence()` will be true to
1095 // signal that we should really use that task runner.
1096 const scoped_refptr<base::SequencedTaskRunner> task_runner =
1097 client && endpoint->was_bound_off_sequence()
1098 ? endpoint->task_runner()
1099 : proxy_task_runner_.get();
1100
1101 ScopedUrgentMessageNotification scoped_urgent_message_notification(
1102 message->has_flag(mojo::Message::kFlagIsUrgent)
1103 ? urgent_message_observer_
1104 : nullptr);
1105
1106 if (message->has_flag(mojo::Message::kFlagIsSync)) {
1107 MessageWrapper message_wrapper(this, std::move(*message));
1108 // Sync messages may need to be handled by the endpoint if it's blocking
1109 // on a sync reply. We pass ownership of the message to the endpoint's
1110 // sync message queue. If the endpoint was blocking, it will dequeue the
1111 // message and dispatch it. Otherwise the posted |AcceptSyncMessage()|
1112 // call will dequeue the message and dispatch it.
1113 std::optional<uint32_t> message_id =
1114 endpoint->EnqueueSyncMessage(std::move(message_wrapper));
1115 if (message_id) {
1116 task_runner->PostTask(
1117 FROM_HERE,
1118 base::BindOnce(
1119 &ChannelAssociatedGroupController::AcceptSyncMessage, this,
1120 id, *message_id,
1121 std::move(scoped_urgent_message_notification)));
1122 }
1123 return true;
1124 }
1125
1126 // If |task_runner| has been torn down already, this PostTask will fail
1127 // and destroy |message|. That operation may need to in turn destroy
1128 // in-transit associated endpoints and thus acquire |lock_|. We no longer
1129 // need the lock to be held now, so we can release it before the PostTask.
1130 {
1131 // Grab interface name from |client| before releasing the lock to ensure
1132 // that |client| is safe to access.
1133 base::TaskAnnotator::ScopedSetIpcHash scoped_set_ipc_hash(
1134 client ? client->interface_name() : "unknown interface");
1135 locker.Release();
1136 task_runner->PostTask(
1137 FROM_HERE,
1138 base::BindOnce(
1139 &ChannelAssociatedGroupController::AcceptOnEndpointThread, this,
1140 std::move(*message),
1141 std::move(scoped_urgent_message_notification)));
1142 }
1143 return true;
1144 }
1145
1146 locker.Release();
1147 // It's safe to access |client| here without holding a lock, because this
1148 // code runs on a proxy thread and |client| can't be destroyed from any
1149 // thread.
1150 return client->HandleIncomingMessage(message);
1151 }
1152
AcceptOnEndpointThread(mojo::Message message,ScopedUrgentMessageNotification scoped_urgent_message_notification)1153 void AcceptOnEndpointThread(
1154 mojo::Message message,
1155 ScopedUrgentMessageNotification scoped_urgent_message_notification) {
1156 TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("mojom"),
1157 "ChannelAssociatedGroupController::AcceptOnEndpointThread");
1158
1159 mojo::InterfaceId id = message.interface_id();
1160 DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsPrimaryInterfaceId(id));
1161
1162 base::AutoLock locker(lock_);
1163 Endpoint* endpoint = FindEndpoint(id);
1164 if (!endpoint)
1165 return;
1166
1167 mojo::InterfaceEndpointClient* client = endpoint->client();
1168 if (!client)
1169 return;
1170
1171 if (!endpoint->task_runner()->RunsTasksInCurrentSequence() &&
1172 !proxy_task_runner_->RunsTasksInCurrentSequence()) {
1173 return;
1174 }
1175
1176 // TODO(altimin): This event is temporarily kept as a debug fallback. Remove
1177 // it once the new implementation proves to be stable.
1178 TRACE_EVENT(
1179 TRACE_DISABLED_BY_DEFAULT("mojom"),
1180 // Using client->interface_name() is safe here because this is a static
1181 // string defined for each mojo interface.
1182 perfetto::StaticString(client->interface_name()),
1183 [&](perfetto::EventContext& ctx) {
1184 static const uint8_t* toplevel_flow_enabled =
1185 TRACE_EVENT_API_GET_CATEGORY_GROUP_ENABLED("toplevel.flow");
1186 if (!*toplevel_flow_enabled)
1187 return;
1188
1189 perfetto::Flow::Global(message.GetTraceId())(ctx);
1190 });
1191
1192 // Sync messages should never make their way to this method.
1193 DCHECK(!message.has_flag(mojo::Message::kFlagIsSync));
1194
1195 bool result = false;
1196 {
1197 base::AutoUnlock unlocker(lock_);
1198 result = client->HandleIncomingMessage(&message);
1199 }
1200
1201 if (!result)
1202 RaiseError();
1203 }
1204
AcceptSyncMessage(mojo::InterfaceId interface_id,uint32_t message_id,ScopedUrgentMessageNotification scoped_urgent_message_notification)1205 void AcceptSyncMessage(
1206 mojo::InterfaceId interface_id,
1207 uint32_t message_id,
1208 ScopedUrgentMessageNotification scoped_urgent_message_notification) {
1209 TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("mojom"),
1210 "ChannelAssociatedGroupController::AcceptSyncMessage");
1211
1212 base::AutoLock locker(lock_);
1213 Endpoint* endpoint = FindEndpoint(interface_id);
1214 if (!endpoint)
1215 return;
1216
1217 // Careful, if the endpoint is detached its members are cleared. Check for
1218 // that before dereferencing.
1219 mojo::InterfaceEndpointClient* client = endpoint->client();
1220 if (!client)
1221 return;
1222
1223 if (!endpoint->task_runner()->RunsTasksInCurrentSequence() &&
1224 !proxy_task_runner_->RunsTasksInCurrentSequence()) {
1225 return;
1226 }
1227
1228 // Using client->interface_name() is safe here because this is a static
1229 // string defined for each mojo interface.
1230 TRACE_EVENT0("mojom", client->interface_name());
1231 MessageWrapper message_wrapper = endpoint->PopSyncMessage(message_id);
1232
1233 // The message must have already been dequeued by the endpoint waking up
1234 // from a sync wait. Nothing to do.
1235 if (message_wrapper.value().IsNull())
1236 return;
1237
1238 bool result = false;
1239 {
1240 base::AutoUnlock unlocker(lock_);
1241 result = client->HandleIncomingMessage(&message_wrapper.value());
1242 }
1243
1244 if (!result)
1245 RaiseError();
1246 }
1247
1248 // mojo::PipeControlMessageHandlerDelegate:
OnPeerAssociatedEndpointClosed(mojo::InterfaceId id,const std::optional<mojo::DisconnectReason> & reason)1249 bool OnPeerAssociatedEndpointClosed(
1250 mojo::InterfaceId id,
1251 const std::optional<mojo::DisconnectReason>& reason) override {
1252 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
1253
1254 scoped_refptr<ChannelAssociatedGroupController> keepalive(this);
1255 base::AutoLock locker(lock_);
1256 scoped_refptr<Endpoint> endpoint = FindOrInsertEndpoint(id, nullptr);
1257 if (reason)
1258 endpoint->set_disconnect_reason(reason);
1259 if (!endpoint->peer_closed()) {
1260 if (endpoint->client())
1261 NotifyEndpointOfError(endpoint.get(), false /* force_async */);
1262 MarkPeerClosedAndMaybeRemove(endpoint.get());
1263 }
1264
1265 return true;
1266 }
1267
WaitForFlushToComplete(mojo::ScopedMessagePipeHandle flush_pipe)1268 bool WaitForFlushToComplete(
1269 mojo::ScopedMessagePipeHandle flush_pipe) override {
1270 // We don't support async flushing on the IPC Channel pipe.
1271 return false;
1272 }
1273
1274 const scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
1275 const scoped_refptr<base::SingleThreadTaskRunner> proxy_task_runner_;
1276 const bool set_interface_id_namespace_bit_;
1277
1278 // Ensures sequenced access to members below.
1279 SEQUENCE_CHECKER(sequence_checker_);
1280
1281 // Whether `Bind()` or `SendMessageOnSequence()` was called.
1282 // `sequence_checker_` can be detached when this is `false`.
1283 bool was_bound_or_message_sent_ GUARDED_BY_CONTEXT(sequence_checker_) = false;
1284
1285 bool paused_ GUARDED_BY_CONTEXT(sequence_checker_) = false;
1286 bool shut_down_ GUARDED_BY_CONTEXT(sequence_checker_) = false;
1287 std::unique_ptr<mojo::Connector> connector_
1288 GUARDED_BY_CONTEXT(sequence_checker_);
1289 mojo::MessageDispatcher dispatcher_ GUARDED_BY_CONTEXT(sequence_checker_);
1290 mojo::PipeControlMessageHandler control_message_handler_
1291 GUARDED_BY_CONTEXT(sequence_checker_);
1292 ControlMessageProxyThunk control_message_proxy_thunk_
1293 GUARDED_BY_CONTEXT(sequence_checker_);
1294 raw_ptr<UrgentMessageObserver> urgent_message_observer_
1295 GUARDED_BY_CONTEXT(sequence_checker_) = nullptr;
1296
1297 // NOTE: It is unsafe to call into this object while holding |lock_|.
1298 mojo::PipeControlMessageProxy control_message_proxy_;
1299
1300 // Outgoing messages sent before this controller Bound() to a pipe or while it
1301 // was paused. Protected by a lock to support memory dumps from any thread.
1302 base::Lock outgoing_messages_lock_;
1303 std::vector<mojo::Message> outgoing_messages_
1304 GUARDED_BY(outgoing_messages_lock_);
1305
1306 // Guards the fields below for thread-safe access.
1307 base::Lock lock_;
1308
1309 bool encountered_error_ GUARDED_BY(lock_) = false;
1310
1311 // ID #1 is reserved for the mojom::Channel interface.
1312 uint32_t next_interface_id_ GUARDED_BY(lock_) = 2;
1313
1314 std::map<uint32_t, scoped_refptr<Endpoint>> endpoints_ GUARDED_BY(lock_);
1315 };
1316
1317 namespace {
1318
OnMemoryDump(const base::trace_event::MemoryDumpArgs & args,base::trace_event::ProcessMemoryDump * pmd)1319 bool ControllerMemoryDumpProvider::OnMemoryDump(
1320 const base::trace_event::MemoryDumpArgs& args,
1321 base::trace_event::ProcessMemoryDump* pmd) {
1322 base::AutoLock lock(lock_);
1323 for (ChannelAssociatedGroupController* controller : controllers_) {
1324 base::trace_event::MemoryAllocatorDump* dump = pmd->CreateAllocatorDump(
1325 base::StringPrintf("mojo/queued_ipc_channel_message/0x%" PRIxPTR,
1326 reinterpret_cast<uintptr_t>(controller)));
1327 dump->AddScalar(base::trace_event::MemoryAllocatorDump::kNameObjectCount,
1328 base::trace_event::MemoryAllocatorDump::kUnitsObjects,
1329 controller->GetQueuedMessageCount());
1330 MessageMemoryDumpInfo info;
1331 size_t count = 0;
1332 controller->GetTopQueuedMessageMemoryDumpInfo(&info, &count);
1333 dump->AddScalar("top_message_name", "id", info.id);
1334 dump->AddScalar("top_message_count",
1335 base::trace_event::MemoryAllocatorDump::kUnitsObjects,
1336 count);
1337
1338 if (info.profiler_tag) {
1339 // TODO(ssid): Memory dumps currently do not support adding string
1340 // arguments in background dumps. So, add this value as a trace event for
1341 // now.
1342 TRACE_EVENT2(base::trace_event::MemoryDumpManager::kTraceCategory,
1343 "ControllerMemoryDumpProvider::OnMemoryDump",
1344 "top_queued_message_tag", info.profiler_tag,
1345 "count", count);
1346 }
1347 }
1348
1349 return true;
1350 }
1351
1352 class MojoBootstrapImpl : public MojoBootstrap {
1353 public:
MojoBootstrapImpl(mojo::ScopedMessagePipeHandle handle,const scoped_refptr<ChannelAssociatedGroupController> controller)1354 MojoBootstrapImpl(
1355 mojo::ScopedMessagePipeHandle handle,
1356 const scoped_refptr<ChannelAssociatedGroupController> controller)
1357 : controller_(controller),
1358 associated_group_(controller),
1359 handle_(std::move(handle)) {}
1360
1361 MojoBootstrapImpl(const MojoBootstrapImpl&) = delete;
1362 MojoBootstrapImpl& operator=(const MojoBootstrapImpl&) = delete;
1363
~MojoBootstrapImpl()1364 ~MojoBootstrapImpl() override {
1365 controller_->ShutDown();
1366 }
1367
1368 private:
Connect(mojo::PendingAssociatedRemote<mojom::Channel> * sender,mojo::PendingAssociatedReceiver<mojom::Channel> * receiver)1369 void Connect(
1370 mojo::PendingAssociatedRemote<mojom::Channel>* sender,
1371 mojo::PendingAssociatedReceiver<mojom::Channel>* receiver) override {
1372 controller_->Bind(std::move(handle_), sender, receiver);
1373 }
1374
StartReceiving()1375 void StartReceiving() override { controller_->StartReceiving(); }
1376
Pause()1377 void Pause() override {
1378 controller_->Pause();
1379 }
1380
Unpause()1381 void Unpause() override {
1382 controller_->Unpause();
1383 }
1384
Flush()1385 void Flush() override {
1386 controller_->FlushOutgoingMessages();
1387 }
1388
GetAssociatedGroup()1389 mojo::AssociatedGroup* GetAssociatedGroup() override {
1390 return &associated_group_;
1391 }
1392
SetUrgentMessageObserver(UrgentMessageObserver * observer)1393 void SetUrgentMessageObserver(UrgentMessageObserver* observer) override {
1394 controller_->SetUrgentMessageObserver(observer);
1395 }
1396
1397 scoped_refptr<ChannelAssociatedGroupController> controller_;
1398 mojo::AssociatedGroup associated_group_;
1399
1400 mojo::ScopedMessagePipeHandle handle_;
1401 };
1402
1403 } // namespace
1404
1405 ScopedAllowOffSequenceChannelAssociatedBindings::
ScopedAllowOffSequenceChannelAssociatedBindings()1406 ScopedAllowOffSequenceChannelAssociatedBindings()
1407 : resetter_(&off_sequence_binding_allowed, true) {}
1408
1409 ScopedAllowOffSequenceChannelAssociatedBindings::
1410 ~ScopedAllowOffSequenceChannelAssociatedBindings() = default;
1411
1412 // static
Create(mojo::ScopedMessagePipeHandle handle,Channel::Mode mode,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & proxy_task_runner)1413 std::unique_ptr<MojoBootstrap> MojoBootstrap::Create(
1414 mojo::ScopedMessagePipeHandle handle,
1415 Channel::Mode mode,
1416 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
1417 const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner) {
1418 return std::make_unique<MojoBootstrapImpl>(
1419 std::move(handle),
1420 base::MakeRefCounted<ChannelAssociatedGroupController>(
1421 mode == Channel::MODE_SERVER, ipc_task_runner, proxy_task_runner));
1422 }
1423
1424 } // namespace IPC
1425