xref: /aosp_15_r20/external/cronet/ipc/ipc_mojo_bootstrap.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2014 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "ipc/ipc_mojo_bootstrap.h"
6 
#include <inttypes.h>
#include <stdint.h>

#include <map>
#include <memory>
#include <optional>
#include <set>
#include <unordered_map>
#include <utility>
#include <vector>
16 
17 #include "base/check_op.h"
18 #include "base/containers/circular_deque.h"
19 #include "base/containers/contains.h"
20 #include "base/feature_list.h"
21 #include "base/functional/bind.h"
22 #include "base/functional/callback.h"
23 #include "base/memory/ptr_util.h"
24 #include "base/memory/raw_ptr.h"
25 #include "base/no_destructor.h"
26 #include "base/ranges/algorithm.h"
27 #include "base/sequence_checker.h"
28 #include "base/strings/stringprintf.h"
29 #include "base/synchronization/lock.h"
30 #include "base/synchronization/waitable_event.h"
31 #include "base/task/common/task_annotator.h"
32 #include "base/task/sequenced_task_runner.h"
33 #include "base/task/single_thread_task_runner.h"
34 #include "base/thread_annotations.h"
35 #include "base/trace_event/memory_allocator_dump.h"
36 #include "base/trace_event/memory_dump_manager.h"
37 #include "base/trace_event/memory_dump_provider.h"
38 #include "base/trace_event/typed_macros.h"
39 #include "ipc/ipc_channel.h"
40 #include "ipc/urgent_message_observer.h"
41 #include "mojo/public/cpp/bindings/associated_group.h"
42 #include "mojo/public/cpp/bindings/associated_group_controller.h"
43 #include "mojo/public/cpp/bindings/connector.h"
44 #include "mojo/public/cpp/bindings/features.h"
45 #include "mojo/public/cpp/bindings/interface_endpoint_client.h"
46 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h"
47 #include "mojo/public/cpp/bindings/interface_id.h"
48 #include "mojo/public/cpp/bindings/message.h"
49 #include "mojo/public/cpp/bindings/message_header_validator.h"
50 #include "mojo/public/cpp/bindings/mojo_buildflags.h"
51 #include "mojo/public/cpp/bindings/pipe_control_message_handler.h"
52 #include "mojo/public/cpp/bindings/pipe_control_message_handler_delegate.h"
53 #include "mojo/public/cpp/bindings/pipe_control_message_proxy.h"
54 #include "mojo/public/cpp/bindings/sequence_local_sync_event_watcher.h"
55 #include "mojo/public/cpp/bindings/tracing_helpers.h"
56 #include "third_party/abseil-cpp/absl/base/attributes.h"
57 
58 namespace IPC {
59 
60 class ChannelAssociatedGroupController;
61 
62 namespace {
63 
64 ABSL_CONST_INIT thread_local bool off_sequence_binding_allowed = false;
65 
66 BASE_FEATURE(kMojoChannelAssociatedSendUsesRunOrPostTask,
67              "MojoChannelAssociatedSendUsesRunOrPostTask",
68              base::FEATURE_DISABLED_BY_DEFAULT);
69 
70 // Used to track some internal Channel state in pursuit of message leaks.
71 //
72 // TODO(https://crbug.com/813045): Remove this.
class ControllerMemoryDumpProvider
    : public base::trace_event::MemoryDumpProvider {
 public:
  // Registers with the global MemoryDumpManager under the "IPCChannel" name.
  // The nullptr task runner means dumps are produced on whatever thread the
  // manager invokes the provider from.
  ControllerMemoryDumpProvider() {
    base::trace_event::MemoryDumpManager::GetInstance()->RegisterDumpProvider(
        this, "IPCChannel", nullptr);
  }

  ControllerMemoryDumpProvider(const ControllerMemoryDumpProvider&) = delete;
  ControllerMemoryDumpProvider& operator=(const ControllerMemoryDumpProvider&) =
      delete;

  ~ControllerMemoryDumpProvider() override {
    base::trace_event::MemoryDumpManager::GetInstance()->UnregisterDumpProvider(
        this);
  }

  // Starts tracking `controller` so its queued messages appear in memory
  // dumps. Thread-safe.
  void AddController(ChannelAssociatedGroupController* controller) {
    base::AutoLock lock(lock_);
    controllers_.insert(controller);
  }

  // Stops tracking `controller`. Must be called before the controller is
  // destroyed. Thread-safe.
  void RemoveController(ChannelAssociatedGroupController* controller) {
    base::AutoLock lock(lock_);
    controllers_.erase(controller);
  }

  // base::trace_event::MemoryDumpProvider:
  bool OnMemoryDump(const base::trace_event::MemoryDumpArgs& args,
                    base::trace_event::ProcessMemoryDump* pmd) override;

 private:
  // Guards `controllers_`; Add/Remove may race from multiple threads.
  base::Lock lock_;
  std::set<raw_ptr<ChannelAssociatedGroupController, SetExperimental>>
      controllers_;
};
109 
GetMemoryDumpProvider()110 ControllerMemoryDumpProvider& GetMemoryDumpProvider() {
111   static base::NoDestructor<ControllerMemoryDumpProvider> provider;
112   return *provider;
113 }
114 
115 // Messages are grouped by this info when recording memory metrics.
116 struct MessageMemoryDumpInfo {
MessageMemoryDumpInfoIPC::__anon2063248d0111::MessageMemoryDumpInfo117   MessageMemoryDumpInfo(const mojo::Message& message)
118       : id(message.name()), profiler_tag(message.heap_profiler_tag()) {}
119   MessageMemoryDumpInfo() = default;
120 
operator ==IPC::__anon2063248d0111::MessageMemoryDumpInfo121   bool operator==(const MessageMemoryDumpInfo& other) const {
122     return other.id == id && other.profiler_tag == profiler_tag;
123   }
124 
125   uint32_t id = 0;
126   const char* profiler_tag = nullptr;
127 };
128 
129 struct MessageMemoryDumpInfoHash {
operator ()IPC::__anon2063248d0111::MessageMemoryDumpInfoHash130   size_t operator()(const MessageMemoryDumpInfo& info) const {
131     return base::HashInts(
132         info.id, info.profiler_tag ? base::FastHash(info.profiler_tag) : 0);
133   }
134 };
135 
136 class ScopedUrgentMessageNotification {
137  public:
ScopedUrgentMessageNotification(UrgentMessageObserver * observer=nullptr)138   explicit ScopedUrgentMessageNotification(
139       UrgentMessageObserver* observer = nullptr)
140       : observer_(observer) {
141     if (observer_) {
142       observer_->OnUrgentMessageReceived();
143     }
144   }
145 
~ScopedUrgentMessageNotification()146   ~ScopedUrgentMessageNotification() {
147     if (observer_) {
148       observer_->OnUrgentMessageProcessed();
149     }
150   }
151 
ScopedUrgentMessageNotification(ScopedUrgentMessageNotification && other)152   ScopedUrgentMessageNotification(ScopedUrgentMessageNotification&& other)
153       : observer_(std::exchange(other.observer_, nullptr)) {}
154 
operator =(ScopedUrgentMessageNotification && other)155   ScopedUrgentMessageNotification& operator=(
156       ScopedUrgentMessageNotification&& other) {
157     observer_ = std::exchange(other.observer_, nullptr);
158     return *this;
159   }
160 
161  private:
162   raw_ptr<UrgentMessageObserver> observer_;
163 };
164 
165 }  // namespace
166 
167 class ChannelAssociatedGroupController
168     : public mojo::AssociatedGroupController,
169       public mojo::MessageReceiver,
170       public mojo::PipeControlMessageHandlerDelegate {
171  public:
  // `set_interface_id_namespace_bit` selects which half of the interface-id
  // space this end allocates from (the bit must differ between the two ends
  // of the pipe). `ipc_task_runner` is where the Connector reads the pipe;
  // `proxy_task_runner` is where proxies dispatch and the only place sync
  // waits are legal (see Endpoint::SyncWatch()).
  ChannelAssociatedGroupController(
      bool set_interface_id_namespace_bit,
      const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
      const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner)
      : task_runner_(ipc_task_runner),
        proxy_task_runner_(proxy_task_runner),
        set_interface_id_namespace_bit_(set_interface_id_namespace_bit),
        dispatcher_(this),
        control_message_handler_(this),
        control_message_proxy_thunk_(this),
        control_message_proxy_(&control_message_proxy_thunk_) {
    control_message_handler_.SetDescription(
        "IPC::mojom::Bootstrap [primary] PipeControlMessageHandler");
    dispatcher_.SetValidator(std::make_unique<mojo::MessageHeaderValidator>(
        "IPC::mojom::Bootstrap [primary] MessageHeaderValidator"));

    GetMemoryDumpProvider().AddController(this);

    // The controller may be constructed on a different sequence than the one
    // that later binds and uses it.
    DETACH_FROM_SEQUENCE(sequence_checker_);
  }
192 
193   ChannelAssociatedGroupController(const ChannelAssociatedGroupController&) =
194       delete;
195   ChannelAssociatedGroupController& operator=(
196       const ChannelAssociatedGroupController&) = delete;
197 
GetQueuedMessageCount()198   size_t GetQueuedMessageCount() {
199     base::AutoLock lock(outgoing_messages_lock_);
200     return outgoing_messages_.size();
201   }
202 
GetTopQueuedMessageMemoryDumpInfo(MessageMemoryDumpInfo * info,size_t * count)203   void GetTopQueuedMessageMemoryDumpInfo(MessageMemoryDumpInfo* info,
204                                          size_t* count) {
205     std::unordered_map<MessageMemoryDumpInfo, size_t, MessageMemoryDumpInfoHash>
206         counts;
207     std::pair<MessageMemoryDumpInfo, size_t> top_message_info_and_count = {
208         MessageMemoryDumpInfo(), 0};
209     base::AutoLock lock(outgoing_messages_lock_);
210     for (const auto& message : outgoing_messages_) {
211       auto it_and_inserted = counts.emplace(MessageMemoryDumpInfo(message), 0);
212       it_and_inserted.first->second++;
213       if (it_and_inserted.first->second > top_message_info_and_count.second)
214         top_message_info_and_count = *it_and_inserted.first;
215     }
216     *info = top_message_info_and_count.first;
217     *count = top_message_info_and_count.second;
218   }
219 
  // Marks the channel paused. Legal only after Bind()/first send and when not
  // already paused. NOTE(review): `paused_` is presumably consulted by
  // SendMessage() (outside this excerpt) to queue rather than write; confirm.
  void Pause() {
    DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
    CHECK(was_bound_or_message_sent_);
    CHECK(!paused_);
    paused_ = true;
  }
226 
  // Clears the paused state set by Pause(). Messages queued while paused are
  // not sent here; callers follow up with FlushOutgoingMessages().
  void Unpause() {
    DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
    CHECK(was_bound_or_message_sent_);
    CHECK(paused_);
    paused_ = false;
  }
233 
FlushOutgoingMessages()234   void FlushOutgoingMessages() {
235     DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
236     CHECK(was_bound_or_message_sent_);
237 
238     std::vector<mojo::Message> outgoing_messages;
239     {
240       base::AutoLock lock(outgoing_messages_lock_);
241       std::swap(outgoing_messages, outgoing_messages_);
242     }
243 
244     for (auto& message : outgoing_messages)
245       SendMessage(&message);
246   }
247 
  // Creates the Connector over `handle` and manufactures the primary
  // endpoint pair (mojom::Channel sender/receiver). Which side of the pair
  // carries the namespace bit depends on `set_interface_id_namespace_bit_`,
  // so the two processes never collide on interface id 1.
  void Bind(mojo::ScopedMessagePipeHandle handle,
            mojo::PendingAssociatedRemote<mojom::Channel>* sender,
            mojo::PendingAssociatedReceiver<mojom::Channel>* receiver) {
    DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);

    connector_ = std::make_unique<mojo::Connector>(
        std::move(handle), mojo::Connector::SINGLE_THREADED_SEND,
        "IPC Channel");
    connector_->set_incoming_receiver(&dispatcher_);
    connector_->set_connection_error_handler(
        base::BindOnce(&ChannelAssociatedGroupController::OnPipeError,
                       base::Unretained(this)));
    connector_->set_enforce_errors_from_incoming_receiver(false);

    // Don't let the Connector do any sort of queuing on our behalf. Individual
    // messages bound for the IPC::ChannelProxy thread (i.e. that vast majority
    // of messages received by this Connector) are already individually
    // scheduled for dispatch by ChannelProxy, so Connector's normal mode of
    // operation would only introduce a redundant scheduling step for most
    // messages.
    connector_->set_force_immediate_dispatch(true);

    mojo::InterfaceId sender_id, receiver_id;
    if (set_interface_id_namespace_bit_) {
      sender_id = 1 | mojo::kInterfaceIdNamespaceMask;
      receiver_id = 1;
    } else {
      sender_id = 1;
      receiver_id = 1 | mojo::kInterfaceIdNamespaceMask;
    }

    {
      // Pre-register both endpoints as handle-created so the
      // CreateScopedInterfaceEndpointHandle() calls below find them.
      base::AutoLock locker(lock_);
      Endpoint* sender_endpoint = new Endpoint(this, sender_id);
      Endpoint* receiver_endpoint = new Endpoint(this, receiver_id);
      endpoints_.insert({ sender_id, sender_endpoint });
      endpoints_.insert({ receiver_id, receiver_endpoint });
      sender_endpoint->set_handle_created();
      receiver_endpoint->set_handle_created();
    }

    mojo::ScopedInterfaceEndpointHandle sender_handle =
        CreateScopedInterfaceEndpointHandle(sender_id);
    mojo::ScopedInterfaceEndpointHandle receiver_handle =
        CreateScopedInterfaceEndpointHandle(receiver_id);

    *sender = mojo::PendingAssociatedRemote<mojom::Channel>(
        std::move(sender_handle), 0);
    *receiver = mojo::PendingAssociatedReceiver<mojom::Channel>(
        std::move(receiver_handle));

    if (!was_bound_or_message_sent_) {
      was_bound_or_message_sent_ = true;
      // Re-detach so later calls may arrive on a different sequence than the
      // one that performed this first Bind().
      DETACH_FROM_SEQUENCE(sequence_checker_);
    }
  }
304 
  // Starts reading incoming messages on the IPC task runner. Only valid
  // after Bind() has created `connector_`.
  void StartReceiving() {
    DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
    CHECK(was_bound_or_message_sent_);
    connector_->StartReceiving(task_runner_);
  }
310 
  // Tears the channel down: closes the pipe, propagates the error to all
  // endpoints via OnPipeError(), and drops any still-queued outgoing
  // messages.
  void ShutDown() {
    DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
    shut_down_ = true;
    if (connector_)
      connector_->CloseMessagePipe();
    OnPipeError();
    connector_.reset();

    base::AutoLock lock(outgoing_messages_lock_);
    outgoing_messages_.clear();
  }
322 
323   // mojo::AssociatedGroupController:
  // Allocates a fresh interface id for a pending-association handle and
  // registers its local endpoint. If the handle's peer was already closed,
  // immediately marks the endpoint closed and tells the remote side.
  mojo::InterfaceId AssociateInterface(
      mojo::ScopedInterfaceEndpointHandle handle_to_send) override {
    if (!handle_to_send.pending_association())
      return mojo::kInvalidInterfaceId;

    uint32_t id = 0;
    {
      base::AutoLock locker(lock_);
      do {
        // Ids 0 and 1 are reserved (invalid / primary), so wrap the counter
        // back to 2 when it reaches the namespace-bit range.
        if (next_interface_id_ >= mojo::kInterfaceIdNamespaceMask)
          next_interface_id_ = 2;
        id = next_interface_id_++;
        if (set_interface_id_namespace_bit_)
          id |= mojo::kInterfaceIdNamespaceMask;
      } while (base::Contains(endpoints_, id));  // Skip ids still in use.

      Endpoint* endpoint = new Endpoint(this, id);
      // Endpoints created after a pipe error start out peer-closed.
      if (encountered_error_)
        endpoint->set_peer_closed();
      endpoint->set_handle_created();
      endpoints_.insert({id, endpoint});
    }

    if (!NotifyAssociation(&handle_to_send, id)) {
      // The peer handle of |handle_to_send|, which is supposed to join this
      // associated group, has been closed.
      {
        base::AutoLock locker(lock_);
        Endpoint* endpoint = FindEndpoint(id);
        if (endpoint)
          MarkClosedAndMaybeRemove(endpoint);
      }

      control_message_proxy_.NotifyPeerEndpointClosed(
          id, handle_to_send.disconnect_reason());
    }
    return id;
  }
362 
  // Materializes the local handle for interface id `id`, which (except for
  // the primary id) originated on the remote side. Returns an invalid handle
  // for malformed ids, ids in the wrong namespace, or ids whose handle was
  // already created once.
  mojo::ScopedInterfaceEndpointHandle CreateLocalEndpointHandle(
      mojo::InterfaceId id) override {
    if (!mojo::IsValidInterfaceId(id))
      return mojo::ScopedInterfaceEndpointHandle();

    // Unless it is the primary ID, |id| is from the remote side and therefore
    // its namespace bit is supposed to be different than the value that this
    // router would use.
    if (!mojo::IsPrimaryInterfaceId(id) &&
        set_interface_id_namespace_bit_ ==
            mojo::HasInterfaceIdNamespaceBitSet(id)) {
      return mojo::ScopedInterfaceEndpointHandle();
    }

    base::AutoLock locker(lock_);
    bool inserted = false;
    Endpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
    if (inserted) {
      DCHECK(!endpoint->handle_created());
      // A brand-new endpoint on an errored pipe starts out peer-closed.
      if (encountered_error_)
        endpoint->set_peer_closed();
    } else {
      // Each id may have its handle created at most once.
      if (endpoint->handle_created())
        return mojo::ScopedInterfaceEndpointHandle();
    }

    endpoint->set_handle_created();
    return CreateScopedInterfaceEndpointHandle(id);
  }
392 
  // Closes the local endpoint for `id` and notifies the peer — except that
  // the primary endpoint's closure is only reported when it carries an
  // explicit disconnect reason.
  void CloseEndpointHandle(
      mojo::InterfaceId id,
      const std::optional<mojo::DisconnectReason>& reason) override {
    if (!mojo::IsValidInterfaceId(id))
      return;
    {
      base::AutoLock locker(lock_);
      DCHECK(base::Contains(endpoints_, id));
      Endpoint* endpoint = endpoints_[id].get();
      // Handles are only closed while no client is attached.
      DCHECK(!endpoint->client());
      DCHECK(!endpoint->closed());
      MarkClosedAndMaybeRemove(endpoint);
    }

    if (!mojo::IsPrimaryInterfaceId(id) || reason)
      control_message_proxy_.NotifyPeerEndpointClosed(id, reason);
  }
410 
  // Feature-gated fix for leaked associated endpoints: delivers a synthetic
  // peer-closed notification for `id`, hopping to the IPC task runner first.
  // The posted task holds a ref to `this` (WrapRefCounted) so the controller
  // survives until it runs.
  void NotifyLocalEndpointOfPeerClosure(mojo::InterfaceId id) override {
    if (!base::FeatureList::IsEnabled(
            mojo::features::kMojoFixAssociatedHandleLeak)) {
      return;
    }

    if (!task_runner_->RunsTasksInCurrentSequence()) {
      task_runner_->PostTask(
          FROM_HERE, base::BindOnce(&ChannelAssociatedGroupController::
                                        NotifyLocalEndpointOfPeerClosure,
                                    base::WrapRefCounted(this), id));
      return;
    }
    OnPeerAssociatedEndpointClosed(id, std::nullopt);
  }
426 
  // Attaches `client` (dispatching on `runner`) to the endpoint behind
  // `handle` and returns that endpoint as the client's controller. If the
  // peer is already closed, schedules an asynchronous error notification so
  // the new client learns about it.
  mojo::InterfaceEndpointController* AttachEndpointClient(
      const mojo::ScopedInterfaceEndpointHandle& handle,
      mojo::InterfaceEndpointClient* client,
      scoped_refptr<base::SequencedTaskRunner> runner) override {
    const mojo::InterfaceId id = handle.id();

    DCHECK(mojo::IsValidInterfaceId(id));
    DCHECK(client);

    base::AutoLock locker(lock_);
    DCHECK(base::Contains(endpoints_, id));

    Endpoint* endpoint = endpoints_[id].get();
    endpoint->AttachClient(client, std::move(runner));

    if (endpoint->peer_closed())
      NotifyEndpointOfError(endpoint, true /* force_async */);

    return endpoint;
  }
447 
DetachEndpointClient(const mojo::ScopedInterfaceEndpointHandle & handle)448   void DetachEndpointClient(
449       const mojo::ScopedInterfaceEndpointHandle& handle) override {
450     const mojo::InterfaceId id = handle.id();
451 
452     DCHECK(mojo::IsValidInterfaceId(id));
453 
454     base::AutoLock locker(lock_);
455     DCHECK(base::Contains(endpoints_, id));
456 
457     Endpoint* endpoint = endpoints_[id].get();
458     endpoint->DetachClient();
459   }
460 
  void RaiseError() override {
    // Deliberately a no-op.
    // We ignore errors on channel endpoints, leaving the pipe open. There are
    // good reasons for this:
    //
    //   * We should never close a channel endpoint in either process as long as
    //     the child process is still alive. The child's endpoint should only be
    //     closed implicitly by process death, and the browser's endpoint should
    //     only be closed after the child process is confirmed to be dead. Crash
    //     reporting logic in Chrome relies on this behavior in order to do the
    //     right thing.
    //
    //   * There are two interesting conditions under which RaiseError() can be
    //     implicitly reached: an incoming message fails validation, or the
    //     local endpoint drops a response callback without calling it.
    //
    //   * In the validation case, we also report the message as bad, and this
    //     will imminently trigger the common bad-IPC path in the browser,
    //     causing the browser to kill the offending renderer.
    //
    //   * In the dropped response callback case, the net result of ignoring the
    //     issue is generally innocuous. While indicative of programmer error,
    //     it's not a severe failure and is already covered by separate DCHECKs.
    //
    // See https://crbug.com/861607 for additional discussion.
  }
486 
  // Always true: messages cross a real pipe, so proxies should serialize
  // eagerly rather than hand over unserialized message objects.
  bool PrefersSerializedMessages() override { return true; }
488 
  // Sets the observer notified around urgent-message dispatch (see
  // ScopedUrgentMessageNotification). Must be called before the channel is
  // bound or any message is sent.
  void SetUrgentMessageObserver(UrgentMessageObserver* observer) {
    DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
    CHECK(!was_bound_or_message_sent_);
    urgent_message_observer_ = observer;
    // Allow the subsequent Bind()/send sequence to differ from the caller's.
    DETACH_FROM_SEQUENCE(sequence_checker_);
  }
495 
496  private:
497   class Endpoint;
498   class ControlMessageProxyThunk;
499   friend class Endpoint;
500   friend class ControlMessageProxyThunk;
501 
502   // MessageWrapper objects are always destroyed under the controller's lock. On
503   // destruction, if the message it wrappers contains
504   // ScopedInterfaceEndpointHandles (which cannot be destructed under the
505   // controller's lock), the wrapper unlocks to clean them up.
  class MessageWrapper {
   public:
    MessageWrapper() = default;

    MessageWrapper(ChannelAssociatedGroupController* controller,
                   mojo::Message message)
        : controller_(controller), value_(std::move(message)) {}

    MessageWrapper(MessageWrapper&& other)
        : controller_(other.controller_), value_(std::move(other.value_)) {}

    MessageWrapper(const MessageWrapper&) = delete;
    MessageWrapper& operator=(const MessageWrapper&) = delete;

    ~MessageWrapper() {
      // Fast path: no associated handles, nothing special to clean up.
      if (value_.associated_endpoint_handles()->empty())
        return;

      // Per the class comment, ScopedInterfaceEndpointHandles must not be
      // destroyed while holding the controller's lock — so drop it for the
      // duration of the cleanup.
      controller_->lock_.AssertAcquired();
      {
        base::AutoUnlock unlocker(controller_->lock_);
        value_.mutable_associated_endpoint_handles()->clear();
      }
    }

    MessageWrapper& operator=(MessageWrapper&& other) {
      controller_ = other.controller_;
      value_ = std::move(other.value_);
      return *this;
    }

    // True if `value_` is a v1+ message carrying `request_id`; used to match
    // incoming sync replies to their waiters.
    bool HasRequestId(uint64_t request_id) {
      return !value_.IsNull() && value_.version() >= 1 &&
             value_.header_v1()->request_id == request_id;
    }

    mojo::Message& value() { return value_; }

   private:
    raw_ptr<ChannelAssociatedGroupController> controller_ = nullptr;
    mojo::Message value_;
  };
548 
549   class Endpoint : public base::RefCountedThreadSafe<Endpoint>,
550                    public mojo::InterfaceEndpointController {
551    public:
    Endpoint(ChannelAssociatedGroupController* controller, mojo::InterfaceId id)
        : controller_(controller), id_(id) {}

    Endpoint(const Endpoint&) = delete;
    Endpoint& operator=(const Endpoint&) = delete;

    mojo::InterfaceId id() const { return id_; }

    // The closed/peer_closed/handle_created bits are guarded by the
    // controller's lock; each accessor asserts the caller holds it.
    bool closed() const {
      controller_->lock_.AssertAcquired();
      return closed_;
    }

    void set_closed() {
      controller_->lock_.AssertAcquired();
      closed_ = true;
    }

    bool peer_closed() const {
      controller_->lock_.AssertAcquired();
      return peer_closed_;
    }

    void set_peer_closed() {
      controller_->lock_.AssertAcquired();
      peer_closed_ = true;
    }

    bool handle_created() const {
      controller_->lock_.AssertAcquired();
      return handle_created_;
    }

    void set_handle_created() {
      controller_->lock_.AssertAcquired();
      handle_created_ = true;
    }

    const std::optional<mojo::DisconnectReason>& disconnect_reason() const {
      return disconnect_reason_;
    }

    void set_disconnect_reason(
        const std::optional<mojo::DisconnectReason>& disconnect_reason) {
      disconnect_reason_ = disconnect_reason;
    }

    // Task runner the attached client dispatches on; null when no client is
    // attached (see AttachClient()/DetachClient()).
    base::SequencedTaskRunner* task_runner() const {
      return task_runner_.get();
    }

    bool was_bound_off_sequence() const { return was_bound_off_sequence_; }

    mojo::InterfaceEndpointClient* client() const {
      controller_->lock_.AssertAcquired();
      return client_;
    }
609 
    // Binds `client` (dispatching on `runner`) to this endpoint. Caller must
    // hold the controller's lock. Records whether the binding happened under
    // the off_sequence_binding_allowed thread-local allowance.
    void AttachClient(mojo::InterfaceEndpointClient* client,
                      scoped_refptr<base::SequencedTaskRunner> runner) {
      controller_->lock_.AssertAcquired();
      DCHECK(!client_);
      DCHECK(!closed_);

      task_runner_ = std::move(runner);
      client_ = client;

      if (off_sequence_binding_allowed) {
        was_bound_off_sequence_ = true;
      }
    }
623 
    // Unbinds the current client and tears down any sync watcher. Caller
    // must hold the controller's lock; the endpoint must not be closed yet.
    void DetachClient() {
      controller_->lock_.AssertAcquired();
      DCHECK(client_);
      DCHECK(!closed_);

      task_runner_ = nullptr;
      client_ = nullptr;
      sync_watcher_.reset();
    }
633 
    // Queues an incoming sync message. If a thread is blocked in an
    // exclusive wait for exactly this reply, the message is handed straight
    // to that waiter and nullopt is returned; otherwise returns the queue id
    // under which the message was stored. Caller must hold the lock.
    std::optional<uint32_t> EnqueueSyncMessage(MessageWrapper message) {
      controller_->lock_.AssertAcquired();
      if (exclusive_wait_ && exclusive_wait_->TryFulfillingWith(message)) {
        exclusive_wait_ = nullptr;
        return std::nullopt;
      }

      uint32_t id = GenerateSyncMessageId();
      sync_messages_.emplace_back(id, std::move(message));
      SignalSyncMessageEvent();
      return id;
    }
646 
    // Wakes the sync watcher (if one exists) so queued sync messages get
    // processed. Caller must hold the controller's lock.
    void SignalSyncMessageEvent() {
      controller_->lock_.AssertAcquired();

      if (sync_watcher_)
        sync_watcher_->SignalEvent();
    }
653 
PopSyncMessage(uint32_t id)654     MessageWrapper PopSyncMessage(uint32_t id) {
655       controller_->lock_.AssertAcquired();
656       if (sync_messages_.empty() || sync_messages_.front().first != id)
657         return MessageWrapper();
658       MessageWrapper message = std::move(sync_messages_.front().second);
659       sync_messages_.pop_front();
660       return message;
661     }
662 
663     // mojo::InterfaceEndpointController:
    // mojo::InterfaceEndpointController:
    bool SendMessage(mojo::Message* message) override {
      DCHECK(task_runner_->RunsTasksInCurrentSequence());
      // Stamp the message with this endpoint's interface id before routing
      // it through the shared controller.
      message->set_interface_id(id_);
      return controller_->SendMessage(message);
    }
669 
    // Opts this endpoint into being woken while other endpoints on the same
    // sequence are sync-watching.
    void AllowWokenUpBySyncWatchOnSameThread() override {
      DCHECK(task_runner_->RunsTasksInCurrentSequence());

      EnsureSyncWatcherExists();
      sync_watcher_->AllowWokenUpBySyncWatchOnSameSequence();
    }
676 
    // Blocks processing only sync-relevant events until `should_stop`
    // becomes true, dispatching queued sync messages via the watcher.
    bool SyncWatch(const bool& should_stop) override {
      DCHECK(task_runner_->RunsTasksInCurrentSequence());

      // It's not legal to make sync calls from the primary endpoint's thread,
      // and in fact they must only happen from the proxy task runner.
      DCHECK(!controller_->task_runner_->BelongsToCurrentThread());
      DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread());

      EnsureSyncWatcherExists();
      {
        base::AutoLock locker(controller_->lock_);
        // If the peer is already gone, fire the watcher at least once so the
        // wait can observe the closure and bail out.
        if (peer_closed_) {
          SignalSyncMessageEvent();
        }
      }
      return sync_watcher_->SyncWatch(&should_stop);
    }
694 
    // Blocks the calling thread until the sync reply for `request_id`
    // arrives. First drains an already-queued match; otherwise registers an
    // ExclusiveSyncWait and waits on its event, which is signaled either by
    // EnqueueSyncMessage() or by endpoint destruction (yielding a null
    // message).
    MessageWrapper WaitForIncomingSyncReply(uint64_t request_id) {
      std::optional<ExclusiveSyncWait> wait;
      {
        base::AutoLock lock(controller_->lock_);
        for (auto& [id, message] : sync_messages_) {
          if (message.HasRequestId(request_id)) {
            return std::move(message);
          }
        }

        DCHECK(!exclusive_wait_);
        wait.emplace(request_id);
        exclusive_wait_ = &wait.value();
      }

      wait->event.Wait();
      return std::move(wait->message);
    }
713 
    // Blocks until the reply for `request_id` arrives, then dispatches it to
    // the client. Returns false if the wait was abandoned (null message) or
    // dispatch failed. NOTE(review): `client_` is read here without the
    // controller's lock — presumably safe because attach/detach happen on
    // this sequence; confirm before relying on it.
    bool SyncWatchExclusive(uint64_t request_id) override {
      MessageWrapper message = WaitForIncomingSyncReply(request_id);
      if (message.value().IsNull() || !client_) {
        return false;
      }

      if (!client_->HandleIncomingMessage(&message.value())) {
        base::AutoLock locker(controller_->lock_);
        controller_->RaiseError();
        return false;
      }

      return true;
    }
728 
    // Intentionally a no-op for channel-associated endpoints.
    void RegisterExternalSyncWaiter(uint64_t request_id) override {}
730 
731    private:
732     friend class base::RefCountedThreadSafe<Endpoint>;
733 
    // Always destroyed under the controller's lock, after both sides are
    // closed and no client is attached.
    ~Endpoint() override {
      controller_->lock_.AssertAcquired();
      DCHECK(!client_);
      DCHECK(closed_);
      DCHECK(peer_closed_);
      DCHECK(!sync_watcher_);
      // Release any thread still blocked in WaitForIncomingSyncReply(); it
      // will observe a null message and fail the sync wait.
      if (exclusive_wait_) {
        exclusive_wait_->event.Signal();
      }
    }
744 
    // Invoked on the endpoint's bound sequence when the sync event is
    // signaled; dispatches at most one queued sync message per invocation,
    // re-arming the watcher while more remain.
    void OnSyncMessageEventReady() {
      DCHECK(task_runner_->RunsTasksInCurrentSequence());

      // SUBTLE: The order of these scoped_refptrs matters.
      // `controller_keepalive` MUST outlive `keepalive` because the Endpoint
      // holds raw pointer to the AssociatedGroupController.
      scoped_refptr<AssociatedGroupController> controller_keepalive(
          controller_.get());
      scoped_refptr<Endpoint> keepalive(this);
      base::AutoLock locker(controller_->lock_);
      bool more_to_process = false;
      if (!sync_messages_.empty()) {
        MessageWrapper message_wrapper =
            std::move(sync_messages_.front().second);
        sync_messages_.pop_front();

        bool dispatch_succeeded;
        mojo::InterfaceEndpointClient* client = client_;
        {
          // Dispatch outside the lock: handling may re-enter the controller.
          base::AutoUnlock unlocker(controller_->lock_);
          dispatch_succeeded =
              client->HandleIncomingMessage(&message_wrapper.value());
        }

        if (!sync_messages_.empty())
          more_to_process = true;

        if (!dispatch_succeeded)
          controller_->RaiseError();
      }

      if (!more_to_process)
        sync_watcher_->ResetEvent();

      // If there are no queued sync messages and the peer has closed, then
      // there won't be incoming sync messages in the future. If any
      // SyncWatch() calls are on the stack for this endpoint, resetting the
      // watcher will allow them to exit as the stack unwinds.
      if (!more_to_process && peer_closed_)
        sync_watcher_.reset();
    }
786 
    // Lazily creates the sequence-local sync watcher, and signals it right
    // away if work already exists (queued messages or a closed peer).
    void EnsureSyncWatcherExists() {
      DCHECK(task_runner_->RunsTasksInCurrentSequence());
      if (sync_watcher_)
        return;

      base::AutoLock locker(controller_->lock_);
      sync_watcher_ = std::make_unique<mojo::SequenceLocalSyncEventWatcher>(
          base::BindRepeating(&Endpoint::OnSyncMessageEventReady,
                              base::Unretained(this)));
      if (peer_closed_ || !sync_messages_.empty())
        SignalSyncMessageEvent();
    }
799 
GenerateSyncMessageId()800     uint32_t GenerateSyncMessageId() {
801       // Overflow is fine.
802       uint32_t id = next_sync_message_id_++;
803       DCHECK(sync_messages_.empty() || sync_messages_.front().first != id);
804       return id;
805     }
806 
807     // Tracks the state of a pending sync wait which excludes all other incoming
808     // IPC on the waiting thread.
809     struct ExclusiveSyncWait {
ExclusiveSyncWaitIPC::ChannelAssociatedGroupController::Endpoint::ExclusiveSyncWait810       explicit ExclusiveSyncWait(uint64_t request_id)
811           : request_id(request_id) {}
812       ~ExclusiveSyncWait() = default;
813 
TryFulfillingWithIPC::ChannelAssociatedGroupController::Endpoint::ExclusiveSyncWait814       bool TryFulfillingWith(MessageWrapper& wrapper) {
815         if (!wrapper.HasRequestId(request_id)) {
816           return false;
817         }
818 
819         message = std::move(wrapper);
820         event.Signal();
821         return true;
822       }
823 
824       uint64_t request_id;
825       base::WaitableEvent event;
826       MessageWrapper message;
827     };
828 
    // Owning controller. Raw pointer; callers that may drop the last external
    // reference take explicit keepalives (see OnSyncMessageEventReady).
    const raw_ptr<ChannelAssociatedGroupController> controller_;
    const mojo::InterfaceId id_;

    // Lifecycle flags; accessed under the controller's `lock_`.
    bool closed_ = false;
    bool peer_closed_ = false;
    bool handle_created_ = false;
    bool was_bound_off_sequence_ = false;
    std::optional<mojo::DisconnectReason> disconnect_reason_;
    // Bound client (null while unbound) and the sequence it dispatches on.
    raw_ptr<mojo::InterfaceEndpointClient> client_ = nullptr;
    scoped_refptr<base::SequencedTaskRunner> task_runner_;
    // Created lazily on the endpoint's sequence; see EnsureSyncWatcherExists.
    std::unique_ptr<mojo::SequenceLocalSyncEventWatcher> sync_watcher_;
    // Queued sync messages, each paired with an id from
    // GenerateSyncMessageId().
    base::circular_deque<std::pair<uint32_t, MessageWrapper>> sync_messages_;
    // Non-null while a thread is blocked in an exclusive sync wait on this
    // endpoint; signaled on destruction (see the destructor's DCHECKs).
    raw_ptr<ExclusiveSyncWait> exclusive_wait_ = nullptr;
    uint32_t next_sync_message_id_ = 0;
  };
844 
  // Thin MessageReceiver adapter that lets `control_message_proxy_` emit pipe
  // control messages through the controller's regular SendMessage() path.
  class ControlMessageProxyThunk : public MessageReceiver {
   public:
    explicit ControlMessageProxyThunk(
        ChannelAssociatedGroupController* controller)
        : controller_(controller) {}

    ControlMessageProxyThunk(const ControlMessageProxyThunk&) = delete;
    ControlMessageProxyThunk& operator=(const ControlMessageProxyThunk&) =
        delete;

   private:
    // MessageReceiver:
    bool Accept(mojo::Message* message) override {
      return controller_->SendMessage(message);
    }

    raw_ptr<ChannelAssociatedGroupController> controller_;
  };
863 
  // Tears down every remaining endpoint. The connector must already be gone
  // by this point (presumably reset by ShutDown — see MojoBootstrapImpl's
  // destructor, which calls controller_->ShutDown()).
  ~ChannelAssociatedGroupController() override {
    DCHECK(!connector_);

    base::AutoLock locker(lock_);
    for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
      Endpoint* endpoint = iter->second.get();
      // Advance before touching the endpoint; the whole map is cleared below
      // regardless, so per-entry erasure is not needed here.
      ++iter;

      if (!endpoint->closed()) {
        // This happens when a NotifyPeerEndpointClosed message has been
        // received, but the interface ID hasn't been used to create local
        // endpoint handle.
        DCHECK(!endpoint->client());
        DCHECK(endpoint->peer_closed());
        MarkClosed(endpoint);
      } else {
        MarkPeerClosed(endpoint);
      }
    }
    endpoints_.clear();

    GetMemoryDumpProvider().RemoveController(this);
  }
887 
SendMessage(mojo::Message * message)888   bool SendMessage(mojo::Message* message) {
889     DCHECK(message->heap_profiler_tag());
890     if (task_runner_->BelongsToCurrentThread()) {
891       return SendMessageOnSequence(message);
892     }
893 
894     // PostTask (or RunOrPostTask) so that `message` is sent after messages from
895     // tasks that are already queued (e.g. by `IPC::ChannelProxy::Send`).
896     auto callback = base::BindOnce(
897         &ChannelAssociatedGroupController::SendMessageOnSequenceViaTask, this,
898         std::move(*message));
899     if (base::FeatureList::IsEnabled(
900             kMojoChannelAssociatedSendUsesRunOrPostTask)) {
901       task_runner_->RunOrPostTask(base::subtle::RunOrPostTaskPassKey(),
902                                   FROM_HERE, std::move(callback));
903     } else {
904       task_runner_->PostTask(FROM_HERE, std::move(callback));
905     }
906 
907     return true;
908   }
909 
SendMessageOnSequence(mojo::Message * message)910   bool SendMessageOnSequence(mojo::Message* message) {
911     DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
912     was_bound_or_message_sent_ = true;
913 
914     if (!connector_ || paused_) {
915       if (!shut_down_) {
916         base::AutoLock lock(outgoing_messages_lock_);
917         outgoing_messages_.emplace_back(std::move(*message));
918       }
919       return true;
920     }
921     return connector_->Accept(message);
922   }
923 
SendMessageOnSequenceViaTask(mojo::Message message)924   void SendMessageOnSequenceViaTask(mojo::Message message) {
925     if (!SendMessageOnSequence(&message)) {
926       RaiseError();
927     }
928   }
929 
  // Invoked on the IPC sequence when the underlying pipe fails. Marks every
  // endpoint's peer as closed and notifies all bound clients of the error.
  void OnPipeError() {
    DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);

    // We keep |this| alive here because it's possible for the notifications
    // below to release all other references.
    scoped_refptr<ChannelAssociatedGroupController> keepalive(this);

    base::AutoLock locker(lock_);
    encountered_error_ = true;

    // Collect first, notify/erase afterwards: NotifyEndpointOfError may
    // release `lock_`, allowing `endpoints_` to be mutated re-entrantly, so
    // the map must not be iterated while notifying.
    std::vector<uint32_t> endpoints_to_remove;
    std::vector<scoped_refptr<Endpoint>> endpoints_to_notify;
    for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
      Endpoint* endpoint = iter->second.get();
      ++iter;

      if (endpoint->client()) {
        endpoints_to_notify.push_back(endpoint);
      }

      if (MarkPeerClosed(endpoint)) {
        endpoints_to_remove.push_back(endpoint->id());
      }
    }

    for (auto& endpoint : endpoints_to_notify) {
      // Because a notification may in turn detach any endpoint, we have to
      // check each client again here.
      if (endpoint->client())
        NotifyEndpointOfError(endpoint.get(), false /* force_async */);
    }

    for (uint32_t id : endpoints_to_remove) {
      endpoints_.erase(id);
    }
  }
966 
  // Delivers an error notification to `endpoint`'s client — synchronously
  // when already on the endpoint's sequence (unless `force_async`), otherwise
  // via a posted task. Requires `lock_`, and temporarily releases it around
  // the synchronous client call.
  void NotifyEndpointOfError(Endpoint* endpoint, bool force_async)
      EXCLUSIVE_LOCKS_REQUIRED(lock_) {
    DCHECK(endpoint->task_runner() && endpoint->client());
    if (endpoint->task_runner()->RunsTasksInCurrentSequence() && !force_async) {
      // Snapshot what we need before dropping the lock.
      mojo::InterfaceEndpointClient* client = endpoint->client();
      std::optional<mojo::DisconnectReason> reason(
          endpoint->disconnect_reason());

      // NotifyError must not run under `lock_`; it may re-enter the
      // controller.
      base::AutoUnlock unlocker(lock_);
      client->NotifyError(reason);
    } else {
      endpoint->task_runner()->PostTask(
          FROM_HERE,
          base::BindOnce(&ChannelAssociatedGroupController::
                             NotifyEndpointOfErrorOnEndpointThread,
                         this, endpoint->id(),
                         // This is safe as `endpoint` is verified to be in
                         // `endpoints_` (a map with ownership) before use.
                         base::UnsafeDangling(endpoint)));
    }
  }
988 
  // Posted-task counterpart of NotifyEndpointOfError(). `endpoint` might be a
  // dangling ptr and must be checked before dereference: the id lookup plus
  // the pointer-identity comparison ensure it is still the same live entry in
  // `endpoints_` before it is touched.
  void NotifyEndpointOfErrorOnEndpointThread(mojo::InterfaceId id,
                                             MayBeDangling<Endpoint> endpoint) {
    base::AutoLock locker(lock_);
    auto iter = endpoints_.find(id);
    if (iter == endpoints_.end() || iter->second.get() != endpoint)
      return;
    if (!endpoint->client())
      return;

    DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence());
    NotifyEndpointOfError(endpoint, false /* force_async */);
  }
1002 
  // Marks `endpoint` as closed and returns true if and only if its peer was
  // also already closed (i.e. the endpoint is now fully dead and may be
  // removed from `endpoints_`).
  bool MarkClosed(Endpoint* endpoint) EXCLUSIVE_LOCKS_REQUIRED(lock_) {
    endpoint->set_closed();
    return endpoint->peer_closed();
  }
1009 
  // Marks `endpoint` as having a closed peer and returns true if and only if
  // `endpoint` itself was also already closed. Signals the sync-message event
  // so any sync waiters wake up and observe the closure.
  bool MarkPeerClosed(Endpoint* endpoint) EXCLUSIVE_LOCKS_REQUIRED(lock_) {
    endpoint->set_peer_closed();
    endpoint->SignalSyncMessageEvent();
    return endpoint->closed();
  }
1017 
MarkClosedAndMaybeRemove(Endpoint * endpoint)1018   void MarkClosedAndMaybeRemove(Endpoint* endpoint)
1019       EXCLUSIVE_LOCKS_REQUIRED(lock_) {
1020     if (MarkClosed(endpoint)) {
1021       endpoints_.erase(endpoint->id());
1022     }
1023   }
1024 
MarkPeerClosedAndMaybeRemove(Endpoint * endpoint)1025   void MarkPeerClosedAndMaybeRemove(Endpoint* endpoint)
1026       EXCLUSIVE_LOCKS_REQUIRED(lock_) {
1027     if (MarkPeerClosed(endpoint)) {
1028       endpoints_.erase(endpoint->id());
1029     }
1030   }
1031 
FindOrInsertEndpoint(mojo::InterfaceId id,bool * inserted)1032   Endpoint* FindOrInsertEndpoint(mojo::InterfaceId id, bool* inserted)
1033       EXCLUSIVE_LOCKS_REQUIRED(lock_) {
1034     DCHECK(!inserted || !*inserted);
1035 
1036     Endpoint* endpoint = FindEndpoint(id);
1037     if (!endpoint) {
1038       endpoint = new Endpoint(this, id);
1039       endpoints_.insert({id, endpoint});
1040       if (inserted)
1041         *inserted = true;
1042     }
1043     return endpoint;
1044   }
1045 
FindEndpoint(mojo::InterfaceId id)1046   Endpoint* FindEndpoint(mojo::InterfaceId id) EXCLUSIVE_LOCKS_REQUIRED(lock_) {
1047     auto iter = endpoints_.find(id);
1048     return iter != endpoints_.end() ? iter->second.get() : nullptr;
1049   }
1050 
  // mojo::MessageReceiver:
  // Entry point for every message arriving on the channel pipe; runs on the
  // IPC sequence. Control messages go to `control_message_handler_`; messages
  // for an endpoint bound to this sequence are dispatched inline; everything
  // else is posted to the appropriate task runner (with special queueing for
  // sync messages).
  bool Accept(mojo::Message* message) override {
    DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);

    if (!message->DeserializeAssociatedEndpointHandles(this))
      return false;

    if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message))
      return control_message_handler_.Accept(message);

    mojo::InterfaceId id = message->interface_id();
    if (!mojo::IsValidInterfaceId(id))
      return false;

    base::ReleasableAutoLock locker(&lock_);
    Endpoint* endpoint = FindEndpoint(id);
    if (!endpoint)
      return true;

    mojo::InterfaceEndpointClient* client = endpoint->client();
    if (!client || !endpoint->task_runner()->RunsTasksInCurrentSequence()) {
      // The ChannelProxy for this channel is bound to `proxy_task_runner_` and
      // by default legacy IPCs must dispatch to either the IO thread or the
      // proxy task runner. We generally impose the same constraint on
      // associated interface endpoints so that FIFO can be guaranteed across
      // all interfaces without stalling any of them to wait for a pending
      // endpoint to be bound.
      //
      // This allows us to assume that if an endpoint is not yet bound when we
      // receive a message targeting it, it *will* be bound on the proxy task
      // runner by the time a newly posted task runs there. Hence we simply post
      // a hopeful dispatch task to that task runner.
      //
      // As it turns out, there are even some instances of endpoints binding to
      // alternative (non-IO-thread, non-proxy) task runners, but still
      // ultimately relying on the fact that we schedule their messages on the
      // proxy task runner. So even if the endpoint is already bound, we
      // default to scheduling it on the proxy task runner as long as it's not
      // bound specifically to the IO task runner.
      // TODO(rockot): Try to sort out these cases and maybe eliminate them.
      //
      // Finally, it's also possible that an endpoint was bound to an
      // alternative task runner and it really does want its messages to
      // dispatch there. In that case `was_bound_off_sequence()` will be true to
      // signal that we should really use that task runner.
      const scoped_refptr<base::SequencedTaskRunner> task_runner =
          client && endpoint->was_bound_off_sequence()
              ? endpoint->task_runner()
              : proxy_task_runner_.get();

      ScopedUrgentMessageNotification scoped_urgent_message_notification(
          message->has_flag(mojo::Message::kFlagIsUrgent)
              ? urgent_message_observer_
              : nullptr);

      if (message->has_flag(mojo::Message::kFlagIsSync)) {
        MessageWrapper message_wrapper(this, std::move(*message));
        // Sync messages may need to be handled by the endpoint if it's blocking
        // on a sync reply. We pass ownership of the message to the endpoint's
        // sync message queue. If the endpoint was blocking, it will dequeue the
        // message and dispatch it. Otherwise the posted |AcceptSyncMessage()|
        // call will dequeue the message and dispatch it.
        std::optional<uint32_t> message_id =
            endpoint->EnqueueSyncMessage(std::move(message_wrapper));
        if (message_id) {
          task_runner->PostTask(
              FROM_HERE,
              base::BindOnce(
                  &ChannelAssociatedGroupController::AcceptSyncMessage, this,
                  id, *message_id,
                  std::move(scoped_urgent_message_notification)));
        }
        return true;
      }

      // If |task_runner| has been torn down already, this PostTask will fail
      // and destroy |message|. That operation may need to in turn destroy
      // in-transit associated endpoints and thus acquire |lock_|. We no longer
      // need the lock to be held now, so we can release it before the PostTask.
      {
        // Grab interface name from |client| before releasing the lock to ensure
        // that |client| is safe to access.
        base::TaskAnnotator::ScopedSetIpcHash scoped_set_ipc_hash(
            client ? client->interface_name() : "unknown interface");
        locker.Release();
        task_runner->PostTask(
            FROM_HERE,
            base::BindOnce(
                &ChannelAssociatedGroupController::AcceptOnEndpointThread, this,
                std::move(*message),
                std::move(scoped_urgent_message_notification)));
      }
      return true;
    }

    locker.Release();
    // It's safe to access |client| here without holding a lock, because this
    // code runs on a proxy thread and |client| can't be destroyed from any
    // thread.
    return client->HandleIncomingMessage(message);
  }
1152 
  // Dispatches a non-sync message that Accept() posted to an endpoint's task
  // runner. The endpoint may have been detached or re-bound since the post,
  // so everything is re-validated under `lock_` before dispatch.
  void AcceptOnEndpointThread(
      mojo::Message message,
      ScopedUrgentMessageNotification scoped_urgent_message_notification) {
    TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("mojom"),
                 "ChannelAssociatedGroupController::AcceptOnEndpointThread");

    mojo::InterfaceId id = message.interface_id();
    DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsPrimaryInterfaceId(id));

    base::AutoLock locker(lock_);
    Endpoint* endpoint = FindEndpoint(id);
    if (!endpoint)
      return;

    mojo::InterfaceEndpointClient* client = endpoint->client();
    if (!client)
      return;

    // NOTE(review): the message is silently dropped when this task runs on
    // neither the endpoint's sequence nor the proxy sequence — presumably a
    // guard against racing re-binds; confirm before relying on delivery here.
    if (!endpoint->task_runner()->RunsTasksInCurrentSequence() &&
        !proxy_task_runner_->RunsTasksInCurrentSequence()) {
      return;
    }

    // TODO(altimin): This event is temporarily kept as a debug fallback. Remove
    // it once the new implementation proves to be stable.
    TRACE_EVENT(
        TRACE_DISABLED_BY_DEFAULT("mojom"),
        // Using client->interface_name() is safe here because this is a static
        // string defined for each mojo interface.
        perfetto::StaticString(client->interface_name()),
        [&](perfetto::EventContext& ctx) {
          static const uint8_t* toplevel_flow_enabled =
              TRACE_EVENT_API_GET_CATEGORY_GROUP_ENABLED("toplevel.flow");
          if (!*toplevel_flow_enabled)
            return;

          perfetto::Flow::Global(message.GetTraceId())(ctx);
        });

    // Sync messages should never make their way to this method.
    DCHECK(!message.has_flag(mojo::Message::kFlagIsSync));

    bool result = false;
    {
      // Dispatch without holding `lock_`; the client may re-enter the
      // controller.
      base::AutoUnlock unlocker(lock_);
      result = client->HandleIncomingMessage(&message);
    }

    if (!result)
      RaiseError();
  }
1204 
  // Dispatches a sync message previously enqueued on the endpoint by
  // Accept(). The endpoint may have already consumed the message while waking
  // from a sync wait, in which case there is nothing left to do.
  void AcceptSyncMessage(
      mojo::InterfaceId interface_id,
      uint32_t message_id,
      ScopedUrgentMessageNotification scoped_urgent_message_notification) {
    TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("mojom"),
                 "ChannelAssociatedGroupController::AcceptSyncMessage");

    base::AutoLock locker(lock_);
    Endpoint* endpoint = FindEndpoint(interface_id);
    if (!endpoint)
      return;

    // Careful, if the endpoint is detached its members are cleared. Check for
    // that before dereferencing.
    mojo::InterfaceEndpointClient* client = endpoint->client();
    if (!client)
      return;

    // NOTE(review): as in AcceptOnEndpointThread, the message is dropped when
    // this task runs on neither the endpoint's sequence nor the proxy
    // sequence — presumably guarding against racing re-binds.
    if (!endpoint->task_runner()->RunsTasksInCurrentSequence() &&
        !proxy_task_runner_->RunsTasksInCurrentSequence()) {
      return;
    }

    // Using client->interface_name() is safe here because this is a static
    // string defined for each mojo interface.
    TRACE_EVENT0("mojom", client->interface_name());
    MessageWrapper message_wrapper = endpoint->PopSyncMessage(message_id);

    // The message must have already been dequeued by the endpoint waking up
    // from a sync wait. Nothing to do.
    if (message_wrapper.value().IsNull())
      return;

    bool result = false;
    {
      // Dispatch without holding `lock_`; the client may re-enter the
      // controller.
      base::AutoUnlock unlocker(lock_);
      result = client->HandleIncomingMessage(&message_wrapper.value());
    }

    if (!result)
      RaiseError();
  }
1247 
  // mojo::PipeControlMessageHandlerDelegate:
  // Handles a peer-side closure notification for interface `id`: records the
  // disconnect reason, notifies any bound client, and removes the endpoint
  // when both sides are now closed. The endpoint is created here if this is
  // the first time the ID is seen.
  bool OnPeerAssociatedEndpointClosed(
      mojo::InterfaceId id,
      const std::optional<mojo::DisconnectReason>& reason) override {
    DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);

    // Notifications below can release all other references to |this|.
    scoped_refptr<ChannelAssociatedGroupController> keepalive(this);
    base::AutoLock locker(lock_);
    scoped_refptr<Endpoint> endpoint = FindOrInsertEndpoint(id, nullptr);
    if (reason)
      endpoint->set_disconnect_reason(reason);
    if (!endpoint->peer_closed()) {
      if (endpoint->client())
        NotifyEndpointOfError(endpoint.get(), false /* force_async */);
      MarkPeerClosedAndMaybeRemove(endpoint.get());
    }

    return true;
  }
1267 
  // Always declines: async flush is not implemented for this pipe.
  bool WaitForFlushToComplete(
      mojo::ScopedMessagePipeHandle flush_pipe) override {
    // We don't support async flushing on the IPC Channel pipe.
    return false;
  }
1273 
  // The IPC (channel) sequence; SendMessage() hops here before touching the
  // connector.
  const scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
  // Default dispatch sequence for endpoint messages; see Accept().
  const scoped_refptr<base::SingleThreadTaskRunner> proxy_task_runner_;
  const bool set_interface_id_namespace_bit_;

  // Ensures sequenced access to members below.
  SEQUENCE_CHECKER(sequence_checker_);

  // Whether `Bind()` or `SendMessageOnSequence()` was called.
  // `sequence_checker_` can be detached when this is `false`.
  bool was_bound_or_message_sent_ GUARDED_BY_CONTEXT(sequence_checker_) = false;

  bool paused_ GUARDED_BY_CONTEXT(sequence_checker_) = false;
  bool shut_down_ GUARDED_BY_CONTEXT(sequence_checker_) = false;
  // Null until bound to a pipe; null again after shutdown (see the DCHECK in
  // the destructor).
  std::unique_ptr<mojo::Connector> connector_
      GUARDED_BY_CONTEXT(sequence_checker_);
  mojo::MessageDispatcher dispatcher_ GUARDED_BY_CONTEXT(sequence_checker_);
  mojo::PipeControlMessageHandler control_message_handler_
      GUARDED_BY_CONTEXT(sequence_checker_);
  ControlMessageProxyThunk control_message_proxy_thunk_
      GUARDED_BY_CONTEXT(sequence_checker_);
  raw_ptr<UrgentMessageObserver> urgent_message_observer_
      GUARDED_BY_CONTEXT(sequence_checker_) = nullptr;

  // NOTE: It is unsafe to call into this object while holding |lock_|.
  mojo::PipeControlMessageProxy control_message_proxy_;

  // Outgoing messages sent before this controller Bound() to a pipe or while
  // it was paused. Protected by a lock to support memory dumps from any
  // thread.
  base::Lock outgoing_messages_lock_;
  std::vector<mojo::Message> outgoing_messages_
      GUARDED_BY(outgoing_messages_lock_);

  // Guards the fields below for thread-safe access.
  base::Lock lock_;

  bool encountered_error_ GUARDED_BY(lock_) = false;

  // ID #1 is reserved for the mojom::Channel interface.
  uint32_t next_interface_id_ GUARDED_BY(lock_) = 2;

  // All live endpoints keyed by interface ID; entries are erased once both
  // sides are closed (see Mark*AndMaybeRemove).
  std::map<uint32_t, scoped_refptr<Endpoint>> endpoints_ GUARDED_BY(lock_);
};
1316 
1317 namespace {
1318 
OnMemoryDump(const base::trace_event::MemoryDumpArgs & args,base::trace_event::ProcessMemoryDump * pmd)1319 bool ControllerMemoryDumpProvider::OnMemoryDump(
1320     const base::trace_event::MemoryDumpArgs& args,
1321     base::trace_event::ProcessMemoryDump* pmd) {
1322   base::AutoLock lock(lock_);
1323   for (ChannelAssociatedGroupController* controller : controllers_) {
1324     base::trace_event::MemoryAllocatorDump* dump = pmd->CreateAllocatorDump(
1325         base::StringPrintf("mojo/queued_ipc_channel_message/0x%" PRIxPTR,
1326                            reinterpret_cast<uintptr_t>(controller)));
1327     dump->AddScalar(base::trace_event::MemoryAllocatorDump::kNameObjectCount,
1328                     base::trace_event::MemoryAllocatorDump::kUnitsObjects,
1329                     controller->GetQueuedMessageCount());
1330     MessageMemoryDumpInfo info;
1331     size_t count = 0;
1332     controller->GetTopQueuedMessageMemoryDumpInfo(&info, &count);
1333     dump->AddScalar("top_message_name", "id", info.id);
1334     dump->AddScalar("top_message_count",
1335                     base::trace_event::MemoryAllocatorDump::kUnitsObjects,
1336                     count);
1337 
1338     if (info.profiler_tag) {
1339       // TODO(ssid): Memory dumps currently do not support adding string
1340       // arguments in background dumps. So, add this value as a trace event for
1341       // now.
1342       TRACE_EVENT2(base::trace_event::MemoryDumpManager::kTraceCategory,
1343                    "ControllerMemoryDumpProvider::OnMemoryDump",
1344                    "top_queued_message_tag", info.profiler_tag,
1345                    "count", count);
1346     }
1347   }
1348 
1349   return true;
1350 }
1351 
1352 class MojoBootstrapImpl : public MojoBootstrap {
1353  public:
MojoBootstrapImpl(mojo::ScopedMessagePipeHandle handle,const scoped_refptr<ChannelAssociatedGroupController> controller)1354   MojoBootstrapImpl(
1355       mojo::ScopedMessagePipeHandle handle,
1356       const scoped_refptr<ChannelAssociatedGroupController> controller)
1357       : controller_(controller),
1358         associated_group_(controller),
1359         handle_(std::move(handle)) {}
1360 
1361   MojoBootstrapImpl(const MojoBootstrapImpl&) = delete;
1362   MojoBootstrapImpl& operator=(const MojoBootstrapImpl&) = delete;
1363 
~MojoBootstrapImpl()1364   ~MojoBootstrapImpl() override {
1365     controller_->ShutDown();
1366   }
1367 
1368  private:
Connect(mojo::PendingAssociatedRemote<mojom::Channel> * sender,mojo::PendingAssociatedReceiver<mojom::Channel> * receiver)1369   void Connect(
1370       mojo::PendingAssociatedRemote<mojom::Channel>* sender,
1371       mojo::PendingAssociatedReceiver<mojom::Channel>* receiver) override {
1372     controller_->Bind(std::move(handle_), sender, receiver);
1373   }
1374 
StartReceiving()1375   void StartReceiving() override { controller_->StartReceiving(); }
1376 
Pause()1377   void Pause() override {
1378     controller_->Pause();
1379   }
1380 
Unpause()1381   void Unpause() override {
1382     controller_->Unpause();
1383   }
1384 
Flush()1385   void Flush() override {
1386     controller_->FlushOutgoingMessages();
1387   }
1388 
GetAssociatedGroup()1389   mojo::AssociatedGroup* GetAssociatedGroup() override {
1390     return &associated_group_;
1391   }
1392 
SetUrgentMessageObserver(UrgentMessageObserver * observer)1393   void SetUrgentMessageObserver(UrgentMessageObserver* observer) override {
1394     controller_->SetUrgentMessageObserver(observer);
1395   }
1396 
1397   scoped_refptr<ChannelAssociatedGroupController> controller_;
1398   mojo::AssociatedGroup associated_group_;
1399 
1400   mojo::ScopedMessagePipeHandle handle_;
1401 };
1402 
1403 }  // namespace
1404 
// While an instance is alive, channel-associated endpoints may be bound off
// the channel's own sequence. `off_sequence_binding_allowed` is presumably a
// sequence/thread-local flag declared earlier in this file — the resetter
// restores its previous value on destruction.
ScopedAllowOffSequenceChannelAssociatedBindings::
    ScopedAllowOffSequenceChannelAssociatedBindings()
    : resetter_(&off_sequence_binding_allowed, true) {}

ScopedAllowOffSequenceChannelAssociatedBindings::
    ~ScopedAllowOffSequenceChannelAssociatedBindings() = default;
1411 
1412 // static
Create(mojo::ScopedMessagePipeHandle handle,Channel::Mode mode,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & proxy_task_runner)1413 std::unique_ptr<MojoBootstrap> MojoBootstrap::Create(
1414     mojo::ScopedMessagePipeHandle handle,
1415     Channel::Mode mode,
1416     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
1417     const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner) {
1418   return std::make_unique<MojoBootstrapImpl>(
1419       std::move(handle),
1420       base::MakeRefCounted<ChannelAssociatedGroupController>(
1421           mode == Channel::MODE_SERVER, ipc_task_runner, proxy_task_runner));
1422 }
1423 
1424 }  // namespace IPC
1425