tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

NodeController.cpp (32434B)


      1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
      2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
      3 /* This Source Code Form is subject to the terms of the Mozilla Public
      4 * License, v. 2.0. If a copy of the MPL was not distributed with this file,
      5 * You can obtain one at http://mozilla.org/MPL/2.0/. */
      6 
      7 #include "mozilla/ipc/NodeController.h"
      8 #include "MainThreadUtils.h"
      9 #include "base/process_util.h"
     10 #include "chrome/common/ipc_message.h"
     11 #include "mojo/core/ports/name.h"
     12 #include "mojo/core/ports/node.h"
     13 #include "mojo/core/ports/port_locker.h"
     14 #include "mozilla/AlreadyAddRefed.h"
     15 #include "mozilla/RandomNum.h"
     16 #include "mozilla/StaticPtr.h"
     17 #include "mozilla/Assertions.h"
     18 #include "mozilla/ToString.h"
     19 #include "mozilla/ipc/IOThread.h"
     20 #include "mozilla/ipc/ProtocolUtils.h"
     21 #include "mozilla/mozalloc.h"
     22 #include "nsISerialEventTarget.h"
     23 #include "nsTArray.h"
     24 #include "nsXULAppAPI.h"
     25 #include "nsPrintfCString.h"
     26 
// Wraps `expr` in MOZ_ALWAYS_TRUE, checking that it evaluates to
// `mojo::core::ports::OK`. Used for ports-library calls whose failure is
// treated as a programming error rather than a recoverable condition.
#define PORTS_ALWAYS_OK(expr) MOZ_ALWAYS_TRUE(mojo::core::ports::OK == (expr))
     28 
     29 namespace mozilla::ipc {
     30 
// The NodeController instance returned by `NodeController::GetSingleton()`.
static StaticRefPtr<NodeController> gNodeController;

// Log module targeted by the NODECONTROLLER_LOG and NODECONTROLLER_WARNING
// macros below.
static LazyLogModule gNodeControllerLog{"NodeController"};
     34 
// Helper logger macro which prefixes every message with the name of the `this`
// NodeController. It reads `mName` directly, so it can only be used where
// `mName` is in scope (i.e. inside NodeController member functions).
#define NODECONTROLLER_LOG(level_, fmt_, ...) \
  MOZ_LOG(gNodeControllerLog, level_,         \
          ("[%s]: " fmt_, ToString(mName).c_str(), ##__VA_ARGS__))
     40 
// Helper warning macro. Like NODECONTROLLER_LOG, it prefixes the message with
// the name of the `this` NodeController and therefore requires `mName` to be in
// scope.
//
// In DEBUG builds the formatted message is emitted through NS_WARNING and is
// additionally sent to the log module at LogLevel::Debug. In other builds it is
// only sent to the log module, at LogLevel::Warning.
#ifdef DEBUG
#  define NODECONTROLLER_WARNING(fmt_, ...)                                \
    do {                                                                   \
      nsPrintfCString warning("[%s]: " fmt_, ToString(mName).c_str(),      \
                              ##__VA_ARGS__);                              \
      NS_WARNING(warning.get());                                           \
      MOZ_LOG(gNodeControllerLog, LogLevel::Debug, ("%s", warning.get())); \
    } while (0)
#else
#  define NODECONTROLLER_WARNING(fmt_, ...) \
    NODECONTROLLER_LOG(LogLevel::Warning, fmt_, ##__VA_ARGS__)
#endif
     55 
     56 NodeController::NodeController(const NodeName& aName,
     57                               const IPC::Channel::ChannelKind* aChannelKind)
     58    : mName(aName),
     59      mNode(MakeUnique<Node>(aName, this)),
     60      mChannelKind(aChannelKind) {}
     61 
     62 NodeController::~NodeController() {
     63  auto state = mState.Lock();
     64  MOZ_RELEASE_ASSERT(state->mPeers.IsEmpty(),
     65                     "Destroying NodeController before closing all peers");
     66  MOZ_RELEASE_ASSERT(state->mInvites.IsEmpty(),
     67                     "Destroying NodeController before closing all invites");
     68 };
     69 
     70 // FIXME: Actually provide some way to create the thing.
     71 /* static */ NodeController* NodeController::GetSingleton() {
     72  MOZ_ASSERT(gNodeController);
     73  return gNodeController;
     74 }
     75 
     76 std::pair<ScopedPort, ScopedPort> NodeController::CreatePortPair() {
     77  PortRef port0, port1;
     78  PORTS_ALWAYS_OK(mNode->CreatePortPair(&port0, &port1));
     79  return {ScopedPort{std::move(port0), this},
     80          ScopedPort{std::move(port1), this}};
     81 }
     82 
     83 auto NodeController::GetPort(const PortName& aName) -> PortRef {
     84  PortRef port;
     85  int rv = mNode->GetPort(aName, &port);
     86  if (NS_WARN_IF(rv != mojo::core::ports::OK)) {
     87    NODECONTROLLER_WARNING("Call to GetPort(%s) Failed",
     88                           ToString(aName).c_str());
     89    return {};
     90  }
     91  return port;
     92 }
     93 
// Attaches `aObserver` to `aPort` as the port's user data. The observer is
// later retrieved through the node's user-data lookup (see
// `PortStatusChanged`). Failure is treated as fatal via PORTS_ALWAYS_OK.
void NodeController::SetPortObserver(const PortRef& aPort,
                                     PortObserver* aObserver) {
  PORTS_ALWAYS_OK(mNode->SetUserData(aPort, aObserver));
}
     98 
     99 auto NodeController::GetStatus(const PortRef& aPort) -> Maybe<PortStatus> {
    100  PortStatus status{};
    101  int rv = mNode->GetStatus(aPort, &status);
    102  if (rv != mojo::core::ports::OK) {
    103    return Nothing();
    104  }
    105  return Some(status);
    106 }
    107 
// Closes `aPort` on the underlying node. Failure is treated as fatal via
// PORTS_ALWAYS_OK.
void NodeController::ClosePort(const PortRef& aPort) {
  PORTS_ALWAYS_OK(mNode->ClosePort(aPort));
}
    111 
    112 bool NodeController::GetMessage(const PortRef& aPort,
    113                                UniquePtr<IPC::Message>* aMessage) {
    114  UniquePtr<UserMessageEvent> messageEvent;
    115  int rv = mNode->GetMessage(aPort, &messageEvent, nullptr);
    116  if (rv != mojo::core::ports::OK) {
    117    if (rv == mojo::core::ports::ERROR_PORT_PEER_CLOSED) {
    118      return false;
    119    }
    120    MOZ_CRASH("GetMessage on port in invalid state");
    121  }
    122 
    123  if (messageEvent) {
    124    UniquePtr<IPC::Message> message = messageEvent->TakeMessage<IPC::Message>();
    125 
    126    // If our UserMessageEvent has any ports directly attached to it, fetch them
    127    // from our node and attach them to the IPC::Message we extracted.
    128    //
    129    // It's important to only do this if we have nonempty set of ports on the
    130    // event, as we may have never serialized our IPC::Message's ports onto the
    131    // event if it was routed in-process.
    132    if (messageEvent->num_ports() > 0) {
    133      nsTArray<ScopedPort> attachedPorts(messageEvent->num_ports());
    134      for (size_t i = 0; i < messageEvent->num_ports(); ++i) {
    135        attachedPorts.AppendElement(
    136            ScopedPort{GetPort(messageEvent->ports()[i]), this});
    137      }
    138      message->SetAttachedPorts(std::move(attachedPorts));
    139    }
    140 
    141    *aMessage = std::move(message);
    142  } else {
    143    *aMessage = nullptr;
    144  }
    145  return true;
    146 }
    147 
    148 bool NodeController::SendUserMessage(const PortRef& aPort,
    149                                     UniquePtr<IPC::Message> aMessage) {
    150  auto messageEvent = MakeUnique<UserMessageEvent>(0);
    151  messageEvent->AttachMessage(std::move(aMessage));
    152 
    153  int rv = mNode->SendUserMessage(aPort, std::move(messageEvent));
    154  if (rv == mojo::core::ports::OK) {
    155    return true;
    156  }
    157  if (rv == mojo::core::ports::ERROR_PORT_PEER_CLOSED) {
    158    NODECONTROLLER_LOG(LogLevel::Debug,
    159                       "Ignoring message to port %s as peer was closed",
    160                       ToString(aPort.name()).c_str());
    161    return true;
    162  }
    163  NODECONTROLLER_WARNING("Failed to send message to port %s",
    164                         ToString(aPort.name()).c_str());
    165  return false;
    166 }
    167 
    168 auto NodeController::SerializeEventMessage(UniquePtr<Event> aEvent,
    169                                           const NodeName* aRelayTarget,
    170                                           uint32_t aType)
    171    -> UniquePtr<IPC::Message> {
    172  UniquePtr<IPC::Message> message;
    173  if (aEvent->type() == Event::kUserMessage) {
    174    MOZ_DIAGNOSTIC_ASSERT(
    175        aType == EVENT_MESSAGE_TYPE,
    176        "Can only send a UserMessage in an EVENT_MESSAGE_TYPE");
    177    message = static_cast<UserMessageEvent*>(aEvent.get())
    178                  ->TakeMessage<IPC::Message>();
    179  } else {
    180    message = MakeUnique<IPC::Message>(MSG_ROUTING_CONTROL, aType);
    181  }
    182 
    183  message->set_relay(aRelayTarget != nullptr);
    184 
    185  size_t length = aEvent->GetSerializedSize();
    186  if (aRelayTarget) {
    187    length += sizeof(NodeName);
    188  }
    189 
    190  // Use an intermediate buffer to serialize to avoid potential issues with the
    191  // segmented `IPC::Message` bufferlist. This should be fairly cheap, as the
    192  // majority of events are fairly small.
    193  Vector<char, 256, InfallibleAllocPolicy> buffer;
    194  (void)buffer.initLengthUninitialized(length);
    195  if (aRelayTarget) {
    196    memcpy(buffer.begin(), aRelayTarget, sizeof(NodeName));
    197    aEvent->Serialize(buffer.begin() + sizeof(NodeName));
    198  } else {
    199    aEvent->Serialize(buffer.begin());
    200  }
    201 
    202  message->WriteFooter(buffer.begin(), buffer.length());
    203  message->set_event_footer_size(buffer.length());
    204 
    205 #ifdef DEBUG
    206  // Debug-assert that we can read the same data back out of the buffer.
    207  MOZ_ASSERT(message->event_footer_size() == length);
    208  Vector<char, 256, InfallibleAllocPolicy> buffer2;
    209  (void)buffer2.initLengthUninitialized(message->event_footer_size());
    210  MOZ_ASSERT(message->ReadFooter(buffer2.begin(), buffer2.length(),
    211                                 /* truncate */ false));
    212  MOZ_ASSERT(!memcmp(buffer2.begin(), buffer.begin(), buffer.length()));
    213 #endif
    214 
    215  return message;
    216 }
    217 
    218 auto NodeController::DeserializeEventMessage(UniquePtr<IPC::Message> aMessage,
    219                                             NodeName* aRelayTarget)
    220    -> UniquePtr<Event> {
    221  if (aMessage->is_relay() && !aRelayTarget) {
    222    NODECONTROLLER_WARNING("Unexpected relay message '%s'", aMessage->name());
    223    return nullptr;
    224  }
    225 
    226  Vector<char, 256, InfallibleAllocPolicy> buffer;
    227  (void)buffer.initLengthUninitialized(aMessage->event_footer_size());
    228  // Truncate the message when reading the footer, so that the extra footer data
    229  // is no longer present in the message. This allows future code to eventually
    230  // send the same `IPC::Message` to another process.
    231  if (!aMessage->ReadFooter(buffer.begin(), buffer.length(),
    232                            /* truncate */ true)) {
    233    NODECONTROLLER_WARNING("Call to ReadFooter for message '%s' Failed",
    234                           aMessage->name());
    235    return nullptr;
    236  }
    237  aMessage->set_event_footer_size(0);
    238 
    239  UniquePtr<Event> event;
    240  if (aRelayTarget) {
    241    MOZ_ASSERT(aMessage->is_relay());
    242    if (buffer.length() < sizeof(NodeName)) {
    243      NODECONTROLLER_WARNING(
    244          "Insufficient space in message footer for message '%s'",
    245          aMessage->name());
    246      return nullptr;
    247    }
    248    memcpy(aRelayTarget, buffer.begin(), sizeof(NodeName));
    249    event = Event::Deserialize(buffer.begin() + sizeof(NodeName),
    250                               buffer.length() - sizeof(NodeName));
    251  } else {
    252    event = Event::Deserialize(buffer.begin(), buffer.length());
    253  }
    254 
    255  if (!event) {
    256    NODECONTROLLER_WARNING("Call to Event::Deserialize for message '%s' Failed",
    257                           aMessage->name());
    258    return nullptr;
    259  }
    260 
    261  if (event->type() == Event::kUserMessage) {
    262    static_cast<UserMessageEvent*>(event.get())
    263        ->AttachMessage(std::move(aMessage));
    264  }
    265  return event;
    266 }
    267 
    268 already_AddRefed<NodeChannel> NodeController::GetNodeChannel(
    269    const NodeName& aName) {
    270  auto state = mState.Lock();
    271  return do_AddRef(state->mPeers.Get(aName));
    272 }
    273 
    274 void NodeController::DropPeer(NodeName aNodeName) {
    275  AssertIOThread();
    276 
    277 #ifdef FUZZING_SNAPSHOT
    278  MOZ_FUZZING_IPC_DROP_PEER("NodeController::DropPeer");
    279 #endif
    280 
    281  Invite invite;
    282  RefPtr<NodeChannel> channel;
    283  nsTArray<PortRef> pendingMerges;
    284  {
    285    auto state = mState.Lock();
    286    state->mPeers.Remove(aNodeName, &channel);
    287    state->mPendingMessages.Remove(aNodeName);
    288    state->mInvites.Remove(aNodeName, &invite);
    289    state->mPendingMerges.Remove(aNodeName, &pendingMerges);
    290  }
    291 
    292  NODECONTROLLER_LOG(LogLevel::Info, "Dropping Peer %s (pid: %" PRIPID ")",
    293                     ToString(aNodeName).c_str(),
    294                     channel ? channel->OtherPid() : base::kInvalidProcessId);
    295 
    296  if (channel) {
    297    channel->Close();
    298  }
    299  if (invite.mChannel) {
    300    invite.mChannel->Close();
    301  }
    302  if (invite.mToMerge.is_valid()) {
    303    // Ignore any possible errors here.
    304    (void)mNode->ClosePort(invite.mToMerge);
    305  }
    306  for (auto& port : pendingMerges) {
    307    // Ignore any possible errors here.
    308    (void)mNode->ClosePort(port);
    309  }
    310  mNode->LostConnectionToNode(aNodeName);
    311 }
    312 
    313 void NodeController::ContactRemotePeer(const NodeName& aNode,
    314                                       UniquePtr<Event> aEvent) {
    315  // On Windows and macOS, messages holding HANDLEs or mach ports must be
    316  // relayed via the broker process so it can transfer ownership.
    317  bool needsRelay = false;
    318 #if defined(XP_WIN) || defined(XP_DARWIN)
    319  if (aEvent && !IsBroker() && aNode != kBrokerNodeName &&
    320      aEvent->type() == Event::kUserMessage) {
    321    auto* userEvent = static_cast<UserMessageEvent*>(aEvent.get());
    322    needsRelay = userEvent->HasMessage() &&
    323                 mChannelKind->num_relayed_attachments(
    324                     *userEvent->GetMessage<IPC::Message>()) > 0;
    325  }
    326 #endif
    327 
    328  UniquePtr<IPC::Message> message;
    329  if (aEvent) {
    330    message =
    331        SerializeEventMessage(std::move(aEvent), needsRelay ? &aNode : nullptr);
    332    MOZ_ASSERT(message->is_relay() == needsRelay,
    333               "Message relay status set incorrectly");
    334  }
    335 
    336  RefPtr<NodeChannel> peer;
    337  RefPtr<NodeChannel> broker;
    338  bool needsIntroduction = false;
    339  bool needsBroker = needsRelay;
    340  {
    341    auto state = mState.Lock();
    342 
    343    // Check if we know this peer. If we don't, we'll need to request an
    344    // introduction.
    345    peer = state->mPeers.Get(aNode);
    346    if (!peer) {
    347      // We don't know the peer, check if we've already requested an
    348      // introduction, or if we need to request a new one.
    349      auto& queue = state->mPendingMessages.LookupOrInsertWith(aNode, [&]() {
    350        needsIntroduction = true;
    351        needsBroker = true;
    352        return Queue<UniquePtr<IPC::Message>, 64>{};
    353      });
    354      // If we aren't relaying, queue up the message to be sent.
    355      if (message && !needsRelay) {
    356        queue.Push(std::move(message));
    357      }
    358    }
    359 
    360    if (needsBroker && !IsBroker()) {
    361      broker = state->mPeers.Get(kBrokerNodeName);
    362    }
    363  }
    364 
    365  if (needsBroker && !broker) {
    366    NODECONTROLLER_WARNING(
    367        "Dropping message '%s'; no connection to unknown peer %s",
    368        message ? message->name() : "<null>", ToString(aNode).c_str());
    369    if (needsIntroduction) {
    370      // We have no broker and will never be able to be introduced to this node.
    371      // Queue a task to clean up any ports connected to it.
    372      XRE_GetAsyncIOEventTarget()->Dispatch(NewRunnableMethod<NodeName>(
    373          "NodeController::DropPeer", this, &NodeController::DropPeer, aNode));
    374    }
    375    return;
    376  }
    377 
    378  if (needsIntroduction) {
    379    NODECONTROLLER_LOG(LogLevel::Info, "Requesting introduction to peer %s",
    380                       ToString(aNode).c_str());
    381    broker->RequestIntroduction(aNode);
    382  }
    383 
    384  if (message) {
    385    if (needsRelay) {
    386      NODECONTROLLER_LOG(LogLevel::Info,
    387                         "Relaying message '%s' for peer %s due to %" PRIu32
    388                         " attachments",
    389                         message->name(), ToString(aNode).c_str(),
    390                         mChannelKind->num_relayed_attachments(*message));
    391      MOZ_ASSERT(mChannelKind->num_relayed_attachments(*message) > 0 && broker);
    392      broker->SendEventMessage(std::move(message));
    393    } else if (peer) {
    394      peer->SendEventMessage(std::move(message));
    395    }
    396  }
    397 }
    398 
    399 void NodeController::ForwardEvent(const NodeName& aNode,
    400                                  UniquePtr<Event> aEvent) {
    401  MOZ_ASSERT(aEvent, "cannot forward null event");
    402  if (aNode == mName) {
    403    (void)mNode->AcceptEvent(mName, std::move(aEvent));
    404  } else {
    405    ContactRemotePeer(aNode, std::move(aEvent));
    406  }
    407 }
    408 
    409 void NodeController::BroadcastEvent(UniquePtr<Event> aEvent) {
    410  UniquePtr<IPC::Message> message =
    411      SerializeEventMessage(std::move(aEvent), nullptr, BROADCAST_MESSAGE_TYPE);
    412 
    413  if (IsBroker()) {
    414    OnBroadcast(mName, std::move(message));
    415  } else if (RefPtr<NodeChannel> broker = GetNodeChannel(kBrokerNodeName)) {
    416    broker->Broadcast(std::move(message));
    417  } else {
    418    NODECONTROLLER_WARNING(
    419        "Trying to broadcast event, but no connection to broker");
    420  }
    421 }
    422 
    423 void NodeController::PortStatusChanged(const PortRef& aPortRef) {
    424  RefPtr<UserData> userData;
    425  int rv = mNode->GetUserData(aPortRef, &userData);
    426  if (rv != mojo::core::ports::OK) {
    427    NODECONTROLLER_WARNING("GetUserData call for port '%s' failed",
    428                           ToString(aPortRef.name()).c_str());
    429    return;
    430  }
    431  if (userData) {
    432    // All instances of `UserData` attached to ports in this node must be of
    433    // type `PortObserver`, so we can call `OnPortStatusChanged` directly on
    434    // them.
    435    static_cast<PortObserver*>(userData.get())->OnPortStatusChanged();
    436  }
    437 }
    438 
// Starts contacting the remote node `aNode` without sending it an event: it
// calls `ContactRemotePeer` with a null event. `aNode` must not be this node.
void NodeController::ObserveRemoteNode(const NodeName& aNode) {
  MOZ_ASSERT(aNode != mName);
  ContactRemotePeer(aNode, nullptr);
}
    443 
    444 void NodeController::OnEventMessage(const NodeName& aFromNode,
    445                                    UniquePtr<IPC::Message> aMessage) {
    446  AssertIOThread();
    447 
    448  bool isRelay = aMessage->is_relay();
    449  if (isRelay && mChannelKind->num_relayed_attachments(*aMessage) == 0) {
    450    NODECONTROLLER_WARNING(
    451        "Invalid relay message without relayed attachments from peer %s!",
    452        ToString(aFromNode).c_str());
    453    DropPeer(aFromNode);
    454    return;
    455  }
    456 
    457  NodeName relayTarget;
    458  UniquePtr<Event> event = DeserializeEventMessage(
    459      std::move(aMessage), isRelay ? &relayTarget : nullptr);
    460  if (!event) {
    461    NODECONTROLLER_WARNING("Invalid EventMessage from peer %s!",
    462                           ToString(aFromNode).c_str());
    463    DropPeer(aFromNode);
    464    return;
    465  }
    466 
    467  NodeName fromNode = aFromNode;
    468 #if defined(XP_WIN) || defined(XP_DARWIN)
    469  if (isRelay) {
    470    if (event->type() != Event::kUserMessage) {
    471      NODECONTROLLER_WARNING(
    472          "Unexpected relay of non-UserMessage event from peer %s!",
    473          ToString(aFromNode).c_str());
    474      DropPeer(aFromNode);
    475      return;
    476    }
    477 
    478    // If we're the broker, then we'll need to forward this message on to the
    479    // true recipient. To do this, we re-serialize the message, passing along
    480    // the original source node, and send it to the final node.
    481    if (IsBroker()) {
    482      UniquePtr<IPC::Message> message =
    483          SerializeEventMessage(std::move(event), &aFromNode);
    484      if (!message) {
    485        NODECONTROLLER_WARNING(
    486            "Relaying EventMessage from peer %s failed to re-serialize!",
    487            ToString(aFromNode).c_str());
    488        DropPeer(aFromNode);
    489        return;
    490      }
    491      MOZ_ASSERT(message->is_relay(), "Message stopped being a relay message?");
    492      MOZ_ASSERT(mChannelKind->num_relayed_attachments(*message) > 0,
    493                 "Message doesn't have relayed attachments?");
    494 
    495      NODECONTROLLER_LOG(
    496          LogLevel::Info,
    497          "Relaying message '%s' from peer %s to peer %s (%" PRIu32
    498          " attachments)",
    499          message->name(), ToString(aFromNode).c_str(),
    500          ToString(relayTarget).c_str(),
    501          mChannelKind->num_relayed_attachments(*message));
    502 
    503      RefPtr<NodeChannel> peer;
    504      {
    505        auto state = mState.Lock();
    506        peer = state->mPeers.Get(relayTarget);
    507      }
    508      if (!peer) {
    509        NODECONTROLLER_WARNING(
    510            "Dropping relayed message from %s to unknown peer %s",
    511            ToString(aFromNode).c_str(), ToString(relayTarget).c_str());
    512        return;
    513      }
    514 
    515      peer->SendEventMessage(std::move(message));
    516      return;
    517    }
    518 
    519    // Otherwise, we're the final recipient, so we can continue & process the
    520    // message as usual.
    521    if (aFromNode != kBrokerNodeName) {
    522      NODECONTROLLER_WARNING(
    523          "Unexpected relayed EventMessage from non-broker peer %s!",
    524          ToString(aFromNode).c_str());
    525      DropPeer(aFromNode);
    526      return;
    527    }
    528    fromNode = relayTarget;
    529 
    530    NODECONTROLLER_LOG(LogLevel::Info, "Got relayed message from peer %s",
    531                       ToString(fromNode).c_str());
    532  }
    533 #endif
    534 
    535  // If we're getting a requested port merge from another process, check to make
    536  // sure that we're expecting the request, and record that the merge has
    537  // arrived so we don't try to close the port on error.
    538  if (event->type() == Event::kMergePort) {
    539    // Check that the target port for the merge actually exists.
    540    auto targetPort = GetPort(event->port_name());
    541    if (!targetPort.is_valid()) {
    542      NODECONTROLLER_WARNING(
    543          "Unexpected MergePortEvent from peer %s for unknown port %s",
    544          ToString(fromNode).c_str(), ToString(event->port_name()).c_str());
    545      DropPeer(fromNode);
    546      return;
    547    }
    548 
    549    // Check if `targetPort` is in our pending merges entry for the given source
    550    // node. If this makes the `mPendingMerges` entry empty, remove it.
    551    bool expectingMerge = [&] {
    552      auto state = mState.Lock();
    553      auto pendingMerges = state->mPendingMerges.Lookup(aFromNode);
    554      if (!pendingMerges) {
    555        return false;
    556      }
    557      size_t removed = pendingMerges->RemoveElementsBy(
    558          [&](auto& port) { return port.name() == targetPort.name(); });
    559      if (removed != 0 && pendingMerges->IsEmpty()) {
    560        pendingMerges.Remove();
    561      }
    562      return removed != 0;
    563    }();
    564 
    565    if (!expectingMerge) {
    566      NODECONTROLLER_WARNING(
    567          "Unexpected MergePortEvent from peer %s for port %s",
    568          ToString(fromNode).c_str(), ToString(event->port_name()).c_str());
    569      DropPeer(fromNode);
    570      return;
    571    }
    572  }
    573 
    574  (void)mNode->AcceptEvent(fromNode, std::move(event));
    575 }
    576 
    577 void NodeController::OnBroadcast(const NodeName& aFromNode,
    578                                 UniquePtr<IPC::Message> aMessage) {
    579  MOZ_DIAGNOSTIC_ASSERT(aMessage->type() == BROADCAST_MESSAGE_TYPE);
    580 
    581  // NOTE: This method may be called off of the IO thread by the
    582  // `BroadcastEvent` node callback.
    583  if (!IsBroker()) {
    584    NODECONTROLLER_WARNING("Broadcast request received by non-broker node");
    585    return;
    586  }
    587 
    588  UniquePtr<Event> event = DeserializeEventMessage(std::move(aMessage));
    589  if (!event) {
    590    NODECONTROLLER_WARNING("Invalid broadcast message from peer");
    591    return;
    592  }
    593 
    594  nsTArray<RefPtr<NodeChannel>> peers;
    595  {
    596    auto state = mState.Lock();
    597    peers.SetCapacity(state->mPeers.Count());
    598    for (const auto& peer : state->mPeers.Values()) {
    599      peers.AppendElement(peer);
    600    }
    601  }
    602  for (const auto& peer : peers) {
    603    // NOTE: This `clone` operation is only supported for a limited number of
    604    // message types by the ports API, which provides some extra security by
    605    // only allowing those specific types of messages to be broadcasted.
    606    // Messages which don't support `CloneForBroadcast` cannot be broadcast, and
    607    // the ports library will not attempt to broadcast them.
    608    auto clone = event->CloneForBroadcast();
    609    if (!clone) {
    610      NODECONTROLLER_WARNING("Attempt to broadcast unsupported message");
    611      break;
    612    }
    613 
    614    peer->SendEventMessage(SerializeEventMessage(std::move(clone)));
    615  }
    616 }
    617 
    618 void NodeController::OnIntroduce(const NodeName& aFromNode,
    619                                 NodeChannel::Introduction aIntroduction) {
    620  AssertIOThread();
    621 
    622  if (aFromNode != kBrokerNodeName) {
    623    NODECONTROLLER_WARNING("Introduction received from non-broker node");
    624    DropPeer(aFromNode);
    625    return;
    626  }
    627 
    628  MOZ_ASSERT(aIntroduction.mMyPid == base::GetCurrentProcId(),
    629             "We're the wrong process to receive this?");
    630 
    631  if (!mChannelKind->is_valid_handle(aIntroduction.mHandle)) {
    632    NODECONTROLLER_WARNING("Could not be introduced to peer %s",
    633                           ToString(aIntroduction.mName).c_str());
    634    mNode->LostConnectionToNode(aIntroduction.mName);
    635 
    636    auto state = mState.Lock();
    637    state->mPendingMessages.Remove(aIntroduction.mName);
    638    return;
    639  }
    640 
    641  RefPtr<IPC::Channel> channel =
    642      IPC::Channel::Create(std::move(aIntroduction.mHandle),
    643                           IPC::Channel::MODE_PEER, aIntroduction.mOtherPid);
    644  MOZ_ASSERT(channel->GetKind() == mChannelKind);
    645 
    646  auto nodeChannel = MakeRefPtr<NodeChannel>(aIntroduction.mName, channel, this,
    647                                             aIntroduction.mOtherPid);
    648 
    649  {
    650    auto state = mState.Lock();
    651    bool isNew = false;
    652    state->mPeers.LookupOrInsertWith(aIntroduction.mName, [&]() {
    653      isNew = true;
    654      return nodeChannel;
    655    });
    656    if (!isNew) {
    657      // We got a duplicate introduction. This can happen during normal
    658      // execution if both sides request an introduction at the same time. We
    659      // can just ignore the second one, as they'll arrive in the same order in
    660      // both processes.
    661      nodeChannel->Close();
    662      return;
    663    }
    664 
    665    // Deliver any pending messages, then remove the entry from our table. We do
    666    // this while `mState` is still held to ensure that these messages are
    667    // all sent before another thread can observe the newly created channel.
    668    // As the channel hasn't been `Connect()`-ed yet, this will only queue the
    669    // messages up to be sent, so is OK to do with the mutex held.  These
    670    // messages will be processed to be sent during `Start()` below, which is
    671    // performed outside of the lock.
    672    if (auto pending = state->mPendingMessages.Lookup(aIntroduction.mName)) {
    673      while (!pending->IsEmpty()) {
    674        nodeChannel->SendEventMessage(pending->Pop());
    675      }
    676      pending.Remove();
    677    }
    678  }
    679 
    680  // NodeChannel::Start must be called with the lock not held, as it may lead to
    681  // callbacks being made into `OnChannelError` or `OnMessageReceived`, which
    682  // will attempt to re-acquire our lock.
    683  nodeChannel->Start();
    684 }
    685 
    686 void NodeController::OnRequestIntroduction(const NodeName& aFromNode,
    687                                           const NodeName& aName) {
    688  AssertIOThread();
    689  if (NS_WARN_IF(!IsBroker())) {
    690    return;
    691  }
    692 
    693  RefPtr<NodeChannel> peerA = GetNodeChannel(aFromNode);
    694  if (!peerA || aName == mojo::core::ports::kInvalidNodeName) {
    695    NODECONTROLLER_WARNING("Invalid OnRequestIntroduction message from node %s",
    696                           ToString(aFromNode).c_str());
    697    DropPeer(aFromNode);
    698    return;
    699  }
    700 
    701  RefPtr<NodeChannel> peerB = GetNodeChannel(aName);
    702  IPC::Channel::ChannelHandle handleA, handleB;
    703  if (!peerB || !mChannelKind->create_raw_pipe(&handleA, &handleB)) {
    704    NODECONTROLLER_WARNING(
    705        "Rejecting introduction request from '%s' for unknown peer '%s'",
    706        ToString(aFromNode).c_str(), ToString(aName).c_str());
    707 
    708    // We don't know this peer, or ran into issues creating the descriptor! Send
    709    // an invalid introduction to content to clean up any pending outbound
    710    // messages.
    711    NodeChannel::Introduction intro{aName, IPC::Channel::ChannelHandle{},
    712                                    peerA->OtherPid(), base::kInvalidProcessId};
    713    peerA->Introduce(std::move(intro));
    714    return;
    715  }
    716 
    717  NodeChannel::Introduction introA{aName, std::move(handleA), peerA->OtherPid(),
    718                                   peerB->OtherPid()};
    719  NodeChannel::Introduction introB{aFromNode, std::move(handleB),
    720                                   peerB->OtherPid(), peerA->OtherPid()};
    721  peerA->Introduce(std::move(introA));
    722  peerB->Introduce(std::move(introB));
    723 }
    724 
    725 void NodeController::OnAcceptInvite(const NodeName& aFromNode,
    726                                    const NodeName& aRealName,
    727                                    const PortName& aInitialPort) {
    728  AssertIOThread();
    729  if (!IsBroker()) {
    730    NODECONTROLLER_WARNING("Ignoring AcceptInvite message as non-broker");
    731    return;
    732  }
    733 
    734  if (aRealName == mojo::core::ports::kInvalidNodeName ||
    735      aInitialPort == mojo::core::ports::kInvalidPortName) {
    736    NODECONTROLLER_WARNING("Invalid name in AcceptInvite message");
    737    DropPeer(aFromNode);
    738    return;
    739  }
    740 
    741  bool inserted = false;
    742  Invite invite;
    743  {
    744    auto state = mState.Lock();
    745 
    746    // Try to remove the source node from our invites list and insert it into
    747    // our peers map under the new name.
    748    if (state->mInvites.Remove(aFromNode, &invite)) {
    749      MOZ_ASSERT(invite.mChannel && invite.mToMerge.is_valid());
    750      state->mPeers.LookupOrInsertWith(aRealName, [&]() {
    751        inserted = true;
    752        return invite.mChannel;
    753      });
    754    }
    755  }
    756  if (!inserted) {
    757    NODECONTROLLER_WARNING("Invalid AcceptInvite message from node %s",
    758                           ToString(aFromNode).c_str());
    759    DropPeer(aFromNode);
    760    return;
    761  }
    762 
    763  // Update the name of the node. This field is only accessed from the IO
    764  // thread, so it's safe to update it without a lock held.
    765  invite.mChannel->SetName(aRealName);
    766 
    767  // Start the port merge to allow our existing initial port to begin
    768  // communicating with the remote port.
    769  PORTS_ALWAYS_OK(mNode->MergePorts(invite.mToMerge, aRealName, aInitialPort));
    770 }
    771 
    772 void NodeController::OnChannelError(const NodeName& aFromNode) {
    773  AssertIOThread();
    774  DropPeer(aFromNode);
    775 }
    776 
    777 static mojo::core::ports::NodeName RandomNodeName() {
    778  return {RandomUint64OrDie(), RandomUint64OrDie()};
    779 }
    780 
    781 bool NodeController::InviteChildProcess(
    782    GeckoChildProcessHost* aChildProcessHost,
    783    IPC::Channel::ChannelHandle* aClientHandle, ScopedPort* aInitialPort,
    784    NodeChannel** aNodeChannel) {
    785  MOZ_ASSERT(IsBroker());
    786  AssertIOThread();
    787 
    788  IPC::Channel::ChannelHandle serverHandle;
    789  if (!mChannelKind->create_raw_pipe(&serverHandle, aClientHandle)) {
    790    return false;
    791  }
    792 
    793  RefPtr<IPC::Channel> channel = IPC::Channel::Create(
    794      std::move(serverHandle), IPC::Channel::MODE_BROKER_SERVER,
    795      base::kInvalidProcessId);
    796  MOZ_ASSERT(channel->GetKind() == mChannelKind);
    797 
    798  // Create the peer with a randomly generated name, and store it in `mInvites`.
    799  // This channel and name will be used for communication with the node until it
    800  // sends us its' real name in an `AcceptInvite` message.
    801  auto ports = CreatePortPair();
    802  auto inviteName = RandomNodeName();
    803  auto nodeChannel = MakeRefPtr<NodeChannel>(
    804      inviteName, channel, this, base::kInvalidProcessId, aChildProcessHost);
    805  {
    806    auto state = mState.Lock();
    807    MOZ_DIAGNOSTIC_ASSERT(!state->mPeers.Contains(inviteName),
    808                          "UUID conflict?");
    809    MOZ_DIAGNOSTIC_ASSERT(!state->mInvites.Contains(inviteName),
    810                          "UUID conflict?");
    811    state->mInvites.InsertOrUpdate(inviteName,
    812                                   Invite{nodeChannel, ports.second.Release()});
    813  }
    814 
    815  nodeChannel->Start();
    816 
    817  *aInitialPort = std::move(ports.first);
    818  nodeChannel.forget(aNodeChannel);
    819  return true;
    820 }
    821 
    822 void NodeController::InitBrokerProcess(
    823    const IPC::Channel::ChannelKind* aChannelKind) {
    824  AssertIOThread();
    825  MOZ_ASSERT(!gNodeController);
    826  gNodeController = new NodeController(kBrokerNodeName, aChannelKind);
    827 }
    828 
    829 ScopedPort NodeController::InitChildProcess(
    830    IPC::Channel::ChannelHandle&& aChannelHandle, base::ProcessId aParentPid) {
    831  AssertIOThread();
    832  MOZ_ASSERT(!gNodeController);
    833 
    834  RefPtr<IPC::Channel> channel = IPC::Channel::Create(
    835      std::move(aChannelHandle), IPC::Channel::MODE_BROKER_CLIENT, aParentPid);
    836 
    837  auto nodeName = RandomNodeName();
    838  gNodeController = new NodeController(nodeName, channel->GetKind());
    839 
    840  auto ports = gNodeController->CreatePortPair();
    841  PortRef toMerge = ports.second.Release();
    842 
    843  // Mark the port as expecting a pending merge. This is a duplicate of the
    844  // information tracked by `mPendingMerges`, and was added by upstream
    845  // chromium.
    846  // See https://chromium-review.googlesource.com/c/chromium/src/+/3289065
    847  {
    848    mojo::core::ports::SinglePortLocker locker(&toMerge);
    849    locker.port()->pending_merge_peer = true;
    850  }
    851 
    852  auto nodeChannel = MakeRefPtr<NodeChannel>(kBrokerNodeName, channel,
    853                                             gNodeController, aParentPid);
    854  {
    855    auto state = gNodeController->mState.Lock();
    856    MOZ_DIAGNOSTIC_ASSERT(!state->mPeers.Contains(kBrokerNodeName));
    857    state->mPeers.InsertOrUpdate(kBrokerNodeName, nodeChannel);
    858    MOZ_DIAGNOSTIC_ASSERT(!state->mPendingMerges.Contains(kBrokerNodeName));
    859    state->mPendingMerges.LookupOrInsert(kBrokerNodeName)
    860        .AppendElement(toMerge);
    861  }
    862 
    863  nodeChannel->Start();
    864  nodeChannel->AcceptInvite(nodeName, toMerge.name());
    865  return std::move(ports.first);
    866 }
    867 
    868 void NodeController::CleanUp() {
    869  AssertIOThread();
    870  MOZ_ASSERT(gNodeController);
    871 
    872  RefPtr<NodeController> nodeController = gNodeController;
    873  gNodeController = nullptr;
    874 
    875  // Collect all objects from our state which need to be cleaned up.
    876  nsTArray<NodeName> lostConnections;
    877  nsTArray<RefPtr<NodeChannel>> channelsToClose;
    878  nsTArray<PortRef> portsToClose;
    879  {
    880    auto state = nodeController->mState.Lock();
    881    for (const auto& chan : state->mPeers) {
    882      lostConnections.AppendElement(chan.GetKey());
    883      channelsToClose.AppendElement(chan.GetData());
    884    }
    885    for (const auto& pending : state->mPendingMessages.Keys()) {
    886      lostConnections.AppendElement(pending);
    887    }
    888    for (const auto& invite : state->mInvites.Values()) {
    889      channelsToClose.AppendElement(invite.mChannel);
    890      portsToClose.AppendElement(invite.mToMerge);
    891    }
    892    for (const auto& pendingPorts : state->mPendingMerges.Values()) {
    893      portsToClose.AppendElements(pendingPorts);
    894    }
    895    state->mPeers.Clear();
    896    state->mPendingMessages.Clear();
    897    state->mInvites.Clear();
    898    state->mPendingMerges.Clear();
    899  }
    900  for (auto& nodeChannel : channelsToClose) {
    901    nodeChannel->Close();
    902  }
    903  for (auto& port : portsToClose) {
    904    nodeController->mNode->ClosePort(port);
    905  }
    906  for (auto& name : lostConnections) {
    907    nodeController->mNode->LostConnectionToNode(name);
    908  }
    909 }
    910 
    911 #undef NODECONTROLLER_LOG
    912 #undef NODECONTROLLER_WARNING
    913 
    914 }  // namespace mozilla::ipc