tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

node.cc (76891B)


      1 // Copyright 2016 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/core/ports/node.h"
      6 
      7 #include <string.h>
      8 
      9 #include <algorithm>
     10 #include <atomic>
     11 #include <utility>
     12 #include <vector>
     13 
     14 #include "mozilla/Mutex.h"
     15 #include "mozilla/RandomNum.h"
     16 #include "nsTArray.h"
     17 
     18 #include "base/logging.h"
     19 #include "mojo/core/ports/event.h"
     20 #include "mojo/core/ports/node_delegate.h"
     21 #include "mojo/core/ports/port_locker.h"
     22 
     23 namespace mojo {
     24 namespace core {
     25 namespace ports {
     26 
     27 namespace {
     28 
// Streams "Oops: " followed by |message| into NOTREACHED() and then returns
// |error_code| unchanged, so a caller can flag an unexpected condition and
// propagate the error code in a single expression.
int DebugError(const char* message, int error_code) {
  NOTREACHED() << "Oops: " << message;
  return error_code;
}

// Convenience wrapper around DebugError(): the stringified spelling of |x| is
// used as the message and |x| itself as the returned error code.
#define OOPS(x) DebugError(#x, x)
     35 
     36 bool CanAcceptMoreMessages(const Port* port) {
     37  // Have we already doled out the last message (i.e., do we expect to NOT
     38  // receive further messages)?
     39  uint64_t next_sequence_num = port->message_queue.next_sequence_num();
     40  if (port->state == Port::kClosed) {
     41    return false;
     42  }
     43  if (port->peer_closed || port->remove_proxy_on_last_message) {
     44    if (port->peer_lost_unexpectedly) {
     45      return port->message_queue.HasNextMessage();
     46    }
     47    if (port->last_sequence_num_to_receive == next_sequence_num - 1) {
     48      return false;
     49    }
     50  }
     51  return true;
     52 }
     53 
     54 void GenerateRandomPortName(PortName* name) {
     55  // FIXME: Chrome uses a cache to avoid extra calls to the system RNG when
     56  // generating port names to keep this overhead down. If this method starts
     57  // showing up on profiles we should consider doing the same.
     58  *name = PortName{mozilla::RandomUint64OrDie(), mozilla::RandomUint64OrDie()};
     59 }
     60 
     61 }  // namespace
     62 
// Constructs a node identified by |name|. |delegate| is handed, together with
// this node, to the |delegate_| member; the constructor does nothing else.
Node::Node(const NodeName& name, NodeDelegate* delegate)
    : name_(name), delegate_(this, delegate) {}
     65 
     66 Node::~Node() {
     67  if (!ports_.empty()) {
     68    DLOG(WARNING) << "Unclean shutdown for node " << name_;
     69  }
     70 }
     71 
     72 bool Node::CanShutdownCleanly(ShutdownPolicy policy) {
     73  PortLocker::AssertNoPortsLockedOnCurrentThread();
     74  mozilla::MutexAutoLock ports_lock(ports_lock_);
     75 
     76  if (policy == ShutdownPolicy::DONT_ALLOW_LOCAL_PORTS) {
     77 #ifdef DEBUG
     78    for (auto& entry : ports_) {
     79      DVLOG(2) << "Port " << entry.first << " referencing node "
     80               << entry.second->peer_node_name << " is blocking shutdown of "
     81               << "node " << name_ << " (state=" << entry.second->state << ")";
     82    }
     83 #endif
     84    return ports_.empty();
     85  }
     86 
     87  DCHECK_EQ(policy, ShutdownPolicy::ALLOW_LOCAL_PORTS);
     88 
     89  // NOTE: This is not efficient, though it probably doesn't need to be since
     90  // relatively few ports should be open during shutdown and shutdown doesn't
     91  // need to be blazingly fast.
     92  bool can_shutdown = true;
     93  for (auto& entry : ports_) {
     94    PortRef port_ref(entry.first, entry.second);
     95    SinglePortLocker locker(&port_ref);
     96    auto* port = locker.port();
     97    if (port->peer_node_name != name_ && port->state != Port::kReceiving) {
     98      can_shutdown = false;
     99 #ifdef DEBUG
    100      DVLOG(2) << "Port " << entry.first << " referencing node "
    101               << port->peer_node_name << " is blocking shutdown of "
    102               << "node " << name_ << " (state=" << port->state << ")";
    103 #else
    104      // Exit early when not debugging.
    105      break;
    106 #endif
    107    }
    108  }
    109 
    110  return can_shutdown;
    111 }
    112 
    113 int Node::GetPort(const PortName& port_name, PortRef* port_ref) {
    114  PortLocker::AssertNoPortsLockedOnCurrentThread();
    115  mozilla::MutexAutoLock lock(ports_lock_);
    116  auto iter = ports_.find(port_name);
    117  if (iter == ports_.end()) {
    118    return ERROR_PORT_UNKNOWN;
    119  }
    120 
    121 #if defined(ANDROID) && defined(__aarch64__)
    122  // Workaround for https://crbug.com/665869.
    123  std::atomic_thread_fence(std::memory_order_seq_cst);
    124 #endif
    125 
    126  *port_ref = PortRef(port_name, iter->second);
    127  return OK;
    128 }
    129 
    130 int Node::CreateUninitializedPort(PortRef* port_ref) {
    131  PortName port_name;
    132  GenerateRandomPortName(&port_name);
    133 
    134  RefPtr<Port> port(new Port(kInitialSequenceNum, kInitialSequenceNum));
    135  int rv = AddPortWithName(port_name, port);
    136  if (rv != OK) {
    137    return rv;
    138  }
    139 
    140  *port_ref = PortRef(port_name, std::move(port));
    141  return OK;
    142 }
    143 
    144 int Node::InitializePort(const PortRef& port_ref,
    145                         const NodeName& peer_node_name,
    146                         const PortName& peer_port_name,
    147                         const NodeName& prev_node_name,
    148                         const PortName& prev_port_name) {
    149  {
    150    // Must be acquired for UpdatePortPeerAddress below.
    151    PortLocker::AssertNoPortsLockedOnCurrentThread();
    152    mozilla::MutexAutoLock ports_lock(ports_lock_);
    153 
    154    SinglePortLocker locker(&port_ref);
    155    auto* port = locker.port();
    156    if (port->state != Port::kUninitialized) {
    157      return ERROR_PORT_STATE_UNEXPECTED;
    158    }
    159 
    160    port->state = Port::kReceiving;
    161    UpdatePortPeerAddress(port_ref.name(), port, peer_node_name,
    162                          peer_port_name);
    163 
    164    port->prev_node_name = prev_node_name;
    165    port->prev_port_name = prev_port_name;
    166  }
    167 
    168  delegate_->PortStatusChanged(port_ref);
    169 
    170  return OK;
    171 }
    172 
    173 int Node::CreatePortPair(PortRef* port0_ref, PortRef* port1_ref) {
    174  int rv;
    175 
    176  rv = CreateUninitializedPort(port0_ref);
    177  if (rv != OK) {
    178    return rv;
    179  }
    180 
    181  rv = CreateUninitializedPort(port1_ref);
    182  if (rv != OK) {
    183    return rv;
    184  }
    185 
    186  rv = InitializePort(*port0_ref, name_, port1_ref->name(), name_,
    187                      port1_ref->name());
    188  if (rv != OK) {
    189    return rv;
    190  }
    191 
    192  rv = InitializePort(*port1_ref, name_, port0_ref->name(), name_,
    193                      port0_ref->name());
    194  if (rv != OK) {
    195    return rv;
    196  }
    197 
    198  return OK;
    199 }
    200 
    201 int Node::SetUserData(const PortRef& port_ref, RefPtr<UserData> user_data) {
    202  SinglePortLocker locker(&port_ref);
    203  auto* port = locker.port();
    204  if (port->state == Port::kClosed) {
    205    return ERROR_PORT_STATE_UNEXPECTED;
    206  }
    207 
    208  port->user_data = std::move(user_data);
    209 
    210  return OK;
    211 }
    212 
    213 int Node::GetUserData(const PortRef& port_ref, RefPtr<UserData>* user_data) {
    214  SinglePortLocker locker(&port_ref);
    215  auto* port = locker.port();
    216  if (port->state == Port::kClosed) {
    217    return ERROR_PORT_STATE_UNEXPECTED;
    218  }
    219 
    220  *user_data = port->user_data;
    221 
    222  return OK;
    223 }
    224 
    225 int Node::ClosePort(const PortRef& port_ref) {
    226  std::vector<mozilla::UniquePtr<UserMessageEvent>> undelivered_messages;
    227  NodeName peer_node_name;
    228  PortName peer_port_name;
    229  uint64_t sequence_num = 0;
    230  uint64_t last_sequence_num = 0;
    231  bool was_initialized = false;
    232  {
    233    SinglePortLocker locker(&port_ref);
    234    auto* port = locker.port();
    235    switch (port->state) {
    236      case Port::kUninitialized:
    237        break;
    238 
    239      case Port::kReceiving:
    240        was_initialized = true;
    241        port->state = Port::kClosed;
    242 
    243        // We pass along the sequence number of the last message sent from this
    244        // port to allow the peer to have the opportunity to consume all inbound
    245        // messages before notifying the embedder that this port is closed.
    246        last_sequence_num = port->next_sequence_num_to_send - 1;
    247 
    248        peer_node_name = port->peer_node_name;
    249        peer_port_name = port->peer_port_name;
    250 
    251        sequence_num = port->next_control_sequence_num_to_send++;
    252 
    253        // If the port being closed still has unread messages, then we need to
    254        // take care to close those ports so as to avoid leaking memory.
    255        port->message_queue.TakeAllMessages(&undelivered_messages);
    256        port->TakePendingMessages(undelivered_messages);
    257        break;
    258 
    259      default:
    260        return ERROR_PORT_STATE_UNEXPECTED;
    261    }
    262  }
    263 
    264  ErasePort(port_ref.name());
    265 
    266  if (was_initialized) {
    267    DVLOG(2) << "Sending ObserveClosure from " << port_ref.name() << "@"
    268             << name_ << " to " << peer_port_name << "@" << peer_node_name;
    269    delegate_->ForwardEvent(
    270        peer_node_name,
    271        mozilla::MakeUnique<ObserveClosureEvent>(
    272            peer_port_name, port_ref.name(), sequence_num, last_sequence_num));
    273    for (const auto& message : undelivered_messages) {
    274      for (size_t i = 0; i < message->num_ports(); ++i) {
    275        PortRef ref;
    276        if (GetPort(message->ports()[i], &ref) == OK) {
    277          ClosePort(ref);
    278        }
    279      }
    280    }
    281  }
    282  return OK;
    283 }
    284 
    285 int Node::GetStatus(const PortRef& port_ref, PortStatus* port_status) {
    286  SinglePortLocker locker(&port_ref);
    287  auto* port = locker.port();
    288  if (port->state != Port::kReceiving) {
    289    return ERROR_PORT_STATE_UNEXPECTED;
    290  }
    291 
    292  // TODO(sroettger): include messages pending sender verification here?
    293  port_status->has_messages = port->message_queue.HasNextMessage();
    294  port_status->receiving_messages = CanAcceptMoreMessages(port);
    295  port_status->peer_closed = port->peer_closed;
    296  port_status->peer_remote = port->peer_node_name != name_;
    297  port_status->queued_message_count =
    298      port->message_queue.queued_message_count();
    299  port_status->queued_num_bytes = port->message_queue.queued_num_bytes();
    300  port_status->unacknowledged_message_count =
    301      port->next_sequence_num_to_send - port->last_sequence_num_acknowledged -
    302      1;
    303 
    304 #ifdef FUZZING_SNAPSHOT
    305  port_status->peer_node_name = port->peer_node_name;
    306 #endif
    307 
    308  return OK;
    309 }
    310 
    311 int Node::GetMessage(const PortRef& port_ref,
    312                     mozilla::UniquePtr<UserMessageEvent>* message,
    313                     MessageFilter* filter) {
    314  *message = nullptr;
    315 
    316  DVLOG(4) << "GetMessage for " << port_ref.name() << "@" << name_;
    317 
    318  NodeName peer_node_name;
    319  ScopedEvent ack_event;
    320  {
    321    SinglePortLocker locker(&port_ref);
    322    auto* port = locker.port();
    323 
    324    // This could also be treated like the port being unknown since the
    325    // embedder should no longer be referring to a port that has been sent.
    326    if (port->state != Port::kReceiving) {
    327      return ERROR_PORT_STATE_UNEXPECTED;
    328    }
    329 
    330    // Let the embedder get messages until there are no more before reporting
    331    // that the peer closed its end.
    332    if (!CanAcceptMoreMessages(port)) {
    333      return ERROR_PORT_PEER_CLOSED;
    334    }
    335 
    336    port->message_queue.GetNextMessage(message, filter);
    337    if (*message &&
    338        (*message)->sequence_num() == port->sequence_num_to_acknowledge) {
    339      peer_node_name = port->peer_node_name;
    340      ack_event = mozilla::MakeUnique<UserMessageReadAckEvent>(
    341          port->peer_port_name, port_ref.name(),
    342          port->next_control_sequence_num_to_send++,
    343          port->sequence_num_to_acknowledge);
    344    }
    345    if (*message) {
    346      // Message will be passed to the user, no need to block the queue.
    347      port->message_queue.MessageProcessed();
    348    }
    349  }
    350 
    351  if (ack_event) {
    352    delegate_->ForwardEvent(peer_node_name, std::move(ack_event));
    353  }
    354 
    355  // Allow referenced ports to trigger PortStatusChanged calls.
    356  if (*message) {
    357    for (size_t i = 0; i < (*message)->num_ports(); ++i) {
    358      PortRef new_port_ref;
    359      int rv = GetPort((*message)->ports()[i], &new_port_ref);
    360 
    361      DCHECK_EQ(OK, rv) << "Port " << new_port_ref.name() << "@" << name_
    362                        << " does not exist!";
    363 
    364      SinglePortLocker locker(&new_port_ref);
    365      DCHECK_EQ(locker.port()->state, Port::kReceiving);
    366      locker.port()->message_queue.set_signalable(true);
    367    }
    368 
    369    // The user may retransmit this message from another port. We reset the
    370    // sequence number so that the message will get a new one if that happens.
    371    (*message)->set_sequence_num(0);
    372  }
    373 
    374  return OK;
    375 }
    376 
    377 int Node::SendUserMessage(const PortRef& port_ref,
    378                          mozilla::UniquePtr<UserMessageEvent> message) {
    379  int rv = SendUserMessageInternal(port_ref, &message);
    380  if (rv != OK) {
    381    // If send failed, close all carried ports. Note that we're careful not to
    382    // close the sending port itself if it happened to be one of the encoded
    383    // ports (an invalid but possible condition.)
    384    for (size_t i = 0; i < message->num_ports(); ++i) {
    385      if (message->ports()[i] == port_ref.name()) {
    386        continue;
    387      }
    388 
    389      PortRef port;
    390      if (GetPort(message->ports()[i], &port) == OK) {
    391        ClosePort(port);
    392      }
    393    }
    394  }
    395  return rv;
    396 }
    397 
    398 int Node::SetAcknowledgeRequestInterval(
    399    const PortRef& port_ref, uint64_t sequence_num_acknowledge_interval) {
    400  NodeName peer_node_name;
    401  PortName peer_port_name;
    402  uint64_t sequence_num_to_request_ack = 0;
    403  uint64_t sequence_num = 0;
    404  {
    405    SinglePortLocker locker(&port_ref);
    406    auto* port = locker.port();
    407    if (port->state != Port::kReceiving) {
    408      return ERROR_PORT_STATE_UNEXPECTED;
    409    }
    410 
    411    port->sequence_num_acknowledge_interval = sequence_num_acknowledge_interval;
    412    if (!sequence_num_acknowledge_interval) {
    413      return OK;
    414    }
    415 
    416    peer_node_name = port->peer_node_name;
    417    peer_port_name = port->peer_port_name;
    418 
    419    sequence_num_to_request_ack = port->last_sequence_num_acknowledged +
    420                                  sequence_num_acknowledge_interval;
    421    sequence_num = port->next_control_sequence_num_to_send++;
    422  }
    423 
    424  delegate_->ForwardEvent(peer_node_name,
    425                          mozilla::MakeUnique<UserMessageReadAckRequestEvent>(
    426                              peer_port_name, port_ref.name(), sequence_num,
    427                              sequence_num_to_request_ack));
    428  return OK;
    429 }
    430 
    431 bool Node::IsEventFromPreviousPeer(const Event& event) {
    432  switch (event.type()) {
    433    case Event::Type::kUserMessage:
    434      return true;
    435    case Event::Type::kPortAccepted:
    436      // PortAccepted is sent by the next peer
    437      return false;
    438    case Event::Type::kObserveProxy:
    439      // ObserveProxy with an invalid port name is a broadcast event
    440      return event.port_name() != kInvalidPortName;
    441    case Event::Type::kObserveProxyAck:
    442      return true;
    443    case Event::Type::kObserveClosure:
    444      return true;
    445    case Event::Type::kMergePort:
    446      // MergePort is not from the previous peer
    447      return false;
    448    case Event::Type::kUserMessageReadAckRequest:
    449      return true;
    450    case Event::Type::kUserMessageReadAck:
    451      return true;
    452    case Event::Type::kUpdatePreviousPeer:
    453      return true;
    454    default:
    455      // No need to check unknown message types since AcceptPeer will return
    456      // an error.
    457      return false;
    458  }
    459 }
    460 
    461 int Node::AcceptEventInternal(const PortRef& port_ref,
    462                              const NodeName& from_node, ScopedEvent event) {
    463  switch (event->type()) {
    464    case Event::Type::kUserMessage:
    465      return OnUserMessage(port_ref, from_node,
    466                           Event::Cast<UserMessageEvent>(&event));
    467    case Event::Type::kPortAccepted:
    468      return OnPortAccepted(port_ref, Event::Cast<PortAcceptedEvent>(&event));
    469    case Event::Type::kObserveProxy:
    470      return OnObserveProxy(port_ref, Event::Cast<ObserveProxyEvent>(&event));
    471    case Event::Type::kObserveProxyAck:
    472      return OnObserveProxyAck(port_ref,
    473                               Event::Cast<ObserveProxyAckEvent>(&event));
    474    case Event::Type::kObserveClosure:
    475      return OnObserveClosure(port_ref,
    476                              Event::Cast<ObserveClosureEvent>(&event));
    477    case Event::Type::kMergePort:
    478      return OnMergePort(port_ref, Event::Cast<MergePortEvent>(&event));
    479    case Event::Type::kUserMessageReadAckRequest:
    480      return OnUserMessageReadAckRequest(
    481          port_ref, Event::Cast<UserMessageReadAckRequestEvent>(&event));
    482    case Event::Type::kUserMessageReadAck:
    483      return OnUserMessageReadAck(port_ref,
    484                                  Event::Cast<UserMessageReadAckEvent>(&event));
    485    case Event::Type::kUpdatePreviousPeer:
    486      return OnUpdatePreviousPeer(port_ref,
    487                                  Event::Cast<UpdatePreviousPeerEvent>(&event));
    488  }
    489  return OOPS(ERROR_NOT_IMPLEMENTED);
    490 }
    491 
    492 int Node::AcceptEvent(const NodeName& from_node, ScopedEvent event) {
    493  PortRef port_ref;
    494  GetPort(event->port_name(), &port_ref);
    495 
    496  DVLOG(2) << "AcceptEvent type: " << event->type() << ", "
    497           << event->from_port() << "@" << from_node << " => "
    498           << port_ref.name() << "@" << name_
    499           << " seq nr: " << event->control_sequence_num() << " port valid? "
    500           << port_ref.is_valid();
    501 
    502  if (!IsEventFromPreviousPeer(*event)) {
    503    DCHECK_EQ(event->control_sequence_num(), kInvalidSequenceNum);
    504    // Some events are not coming from the previous peer, e.g. broadcasts or
    505    // PortAccepted events. No need to check the sequence number or sender.
    506    return AcceptEventInternal(port_ref, from_node, std::move(event));
    507  }
    508 
    509  DCHECK_NE(event->control_sequence_num(), kInvalidSequenceNum);
    510 
    511  if (!port_ref.is_valid()) {
    512    // If we don't have a valid port, there's nothing for us to check. However,
    513    // we pass the ref on to AcceptEventInternal to make sure there's no race
    514    // where it becomes valid and we skipped the peer check.
    515    return AcceptEventInternal(port_ref, from_node, std::move(event));
    516  }
    517 
    518 #ifndef FUZZING_SNAPSHOT
    519  // Before processing the event, verify the sender and sequence number.
    520  {
    521    SinglePortLocker locker(&port_ref);
    522    auto* port = locker.port();
    523    if (!port->IsNextEvent(from_node, *event)) {
    524      DVLOG(2) << "Buffering event (type " << event->type()
    525               << "): " << event->from_port() << "@" << from_node << " => "
    526               << port_ref.name() << "@" << name_
    527               << " seq nr: " << event->control_sequence_num() << " / "
    528               << port->next_control_sequence_num_to_receive << ", want "
    529               << port->prev_port_name << "@" << port->prev_node_name;
    530 
    531      port->BufferEvent(from_node, std::move(event));
    532      return OK;
    533    }
    534  }
    535 #endif
    536 
    537  int ret = AcceptEventInternal(port_ref, from_node, std::move(event));
    538 
    539  // More events might have been enqueued during processing.
    540  while (true) {
    541    ScopedEvent next_event;
    542    NodeName next_from_node;
    543    {
    544      SinglePortLocker locker(&port_ref);
    545      auto* port = locker.port();
    546      // We always increment the control sequence number after we finished
    547      // processing the event. That way we ensure that the events are handled
    548      // in order without keeping a lock the whole time.
    549      port->next_control_sequence_num_to_receive++;
    550      port->NextEvent(&next_from_node, &next_event);
    551 
    552      if (next_event) {
    553        DVLOG(2) << "Handling buffered event (type " << next_event->type()
    554                 << "): " << next_event->from_port() << "@" << next_from_node
    555                 << " => " << port_ref.name() << "@" << name_
    556                 << " seq nr: " << next_event->control_sequence_num() << " / "
    557                 << port->next_control_sequence_num_to_receive;
    558      }
    559    }
    560    if (!next_event) {
    561      break;
    562    }
    563    AcceptEventInternal(port_ref, next_from_node, std::move(next_event));
    564  }
    565 
    566  return ret;
    567 }
    568 
    569 int Node::MergePorts(const PortRef& port_ref,
    570                     const NodeName& destination_node_name,
    571                     const PortName& destination_port_name) {
    572  PortName new_port_name;
    573  Event::PortDescriptor new_port_descriptor;
    574  PendingUpdatePreviousPeer pending_update_event{.from_port = port_ref.name()};
    575  {
    576    // Must be held for ConvertToProxy.
    577    PortLocker::AssertNoPortsLockedOnCurrentThread();
    578    mozilla::MutexAutoLock ports_locker(ports_lock_);
    579 
    580    SinglePortLocker locker(&port_ref);
    581 
    582    DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_
    583             << " to " << destination_port_name << "@" << destination_node_name;
    584 
    585    // Send the port-to-merge over to the destination node so it can be merged
    586    // into the port cycle atomically there.
    587    new_port_name = port_ref.name();
    588    ConvertToProxy(locker.port(), destination_node_name, &new_port_name,
    589                   &new_port_descriptor, &pending_update_event);
    590  }
    591 
    592  delegate_->ForwardEvent(
    593      pending_update_event.receiver,
    594      mozilla::MakeUnique<UpdatePreviousPeerEvent>(
    595          pending_update_event.port, pending_update_event.from_port,
    596          pending_update_event.sequence_num, pending_update_event.new_prev_node,
    597          pending_update_event.new_prev_port));
    598 
    599  if (new_port_descriptor.peer_node_name == name_ &&
    600      destination_node_name != name_) {
    601    // Ensure that the locally retained peer of the new proxy gets a status
    602    // update so it notices that its peer is now remote.
    603    PortRef local_peer;
    604    if (GetPort(new_port_descriptor.peer_port_name, &local_peer) == OK) {
    605      delegate_->PortStatusChanged(local_peer);
    606    }
    607  }
    608 
    609  delegate_->ForwardEvent(
    610      destination_node_name,
    611      mozilla::MakeUnique<MergePortEvent>(destination_port_name,
    612                                          kInvalidPortName, kInvalidSequenceNum,
    613                                          new_port_name, new_port_descriptor));
    614  return OK;
    615 }
    616 
    617 int Node::MergeLocalPorts(const PortRef& port0_ref, const PortRef& port1_ref) {
    618  DVLOG(1) << "Merging local ports " << port0_ref.name() << "@" << name_
    619           << " and " << port1_ref.name() << "@" << name_;
    620  return MergePortsInternal(port0_ref, port1_ref,
    621                            true /* allow_close_on_bad_state */);
    622 }
    623 
// Handles the loss of the connection to |node_name| by calling
// DestroyAllPortsWithPeer() for that node with kInvalidPortName as the port
// argument. Always returns OK.
int Node::LostConnectionToNode(const NodeName& node_name) {
  // We can no longer send events to the given node. We also can't expect any
  // PortAccepted events.

  DVLOG(1) << "Observing lost connection from node " << name_ << " to node "
           << node_name;

  DestroyAllPortsWithPeer(node_name, kInvalidPortName);
  return OK;
}
    634 
    635 int Node::OnUserMessage(const PortRef& port_ref, const NodeName& from_node,
    636                        mozilla::UniquePtr<UserMessageEvent> message) {
    637 #ifdef DEBUG
    638  std::ostringstream ports_buf;
    639  for (size_t i = 0; i < message->num_ports(); ++i) {
    640    if (i > 0) {
    641      ports_buf << ",";
    642    }
    643    ports_buf << message->ports()[i];
    644  }
    645 
    646  DVLOG(4) << "OnUserMessage " << message->sequence_num()
    647           << " [ports=" << ports_buf.str() << "] at " << message->port_name()
    648           << "@" << name_;
    649 #endif
    650 
    651  // Even if this port does not exist, cannot receive anymore messages or is
    652  // buffering or proxying messages, we still need these ports to be bound to
    653  // this node. When the message is forwarded, these ports will get transferred
    654  // following the usual method. If the message cannot be accepted, then the
    655  // newly bound ports will simply be closed.
    656  if (from_node != name_) {
    657    for (size_t i = 0; i < message->num_ports(); ++i) {
    658      Event::PortDescriptor& descriptor = message->port_descriptors()[i];
    659      int rv = AcceptPort(message->ports()[i], descriptor);
    660      if (rv != OK) {
    661        return rv;
    662      }
    663    }
    664  }
    665 
    666  bool has_next_message = false;
    667  bool message_accepted = false;
    668  bool should_forward_messages = false;
    669  if (port_ref.is_valid()) {
    670    SinglePortLocker locker(&port_ref);
    671    auto* port = locker.port();
    672 
    673    // Reject spurious messages if we've already received the last expected
    674    // message.
    675    if (CanAcceptMoreMessages(port)) {
    676      message_accepted = true;
    677      port->message_queue.AcceptMessage(std::move(message), &has_next_message);
    678 
    679      if (port->state == Port::kBuffering) {
    680        has_next_message = false;
    681      } else if (port->state == Port::kProxying) {
    682        has_next_message = false;
    683        should_forward_messages = true;
    684      }
    685    }
    686  }
    687 
    688  if (should_forward_messages) {
    689    int rv = ForwardUserMessagesFromProxy(port_ref);
    690    if (rv != OK) {
    691      return rv;
    692    }
    693    TryRemoveProxy(port_ref);
    694  }
    695 
    696  if (!message_accepted) {
    697    DVLOG(2) << "Message not accepted!\n";
    698    // Close all newly accepted ports as they are effectively orphaned.
    699    for (size_t i = 0; i < message->num_ports(); ++i) {
    700      PortRef attached_port_ref;
    701      if (GetPort(message->ports()[i], &attached_port_ref) == OK) {
    702        ClosePort(attached_port_ref);
    703      } else {
    704        DLOG(WARNING) << "Cannot close non-existent port!\n";
    705      }
    706    }
    707  } else if (has_next_message) {
    708    delegate_->PortStatusChanged(port_ref);
    709  }
    710 
    711  return OK;
    712 }
    713 
    714 int Node::OnPortAccepted(const PortRef& port_ref,
    715                         mozilla::UniquePtr<PortAcceptedEvent> event) {
    716  if (!port_ref.is_valid()) {
    717    return ERROR_PORT_UNKNOWN;
    718  }
    719 
    720 #ifdef DEBUG
    721  {
    722    SinglePortLocker locker(&port_ref);
    723    DVLOG(2) << "PortAccepted at " << port_ref.name() << "@" << name_
    724             << " pointing to " << locker.port()->peer_port_name << "@"
    725             << locker.port()->peer_node_name;
    726  }
    727 #endif
    728 
    729  return BeginProxying(port_ref);
    730 }
    731 
    732 int Node::OnObserveProxy(const PortRef& port_ref,
    733                         mozilla::UniquePtr<ObserveProxyEvent> event) {
    734  if (event->port_name() == kInvalidPortName) {
    735    // An ObserveProxy with an invalid target port name is a broadcast used to
    736    // inform ports when their peer (which was itself a proxy) has become
    737    // defunct due to unexpected node disconnection.
    738    //
    739    // Receiving ports affected by this treat it as equivalent to peer closure.
    740    // Proxies affected by this can be removed and will in turn broadcast their
    741    // own death with a similar message.
    742    DCHECK_EQ(event->proxy_target_node_name(), kInvalidNodeName);
    743    DCHECK_EQ(event->proxy_target_port_name(), kInvalidPortName);
    744    DestroyAllPortsWithPeer(event->proxy_node_name(), event->proxy_port_name());
    745    return OK;
    746  }
    747 
    748  // The port may have already been closed locally, in which case the
    749  // ObserveClosure message will contain the last_sequence_num field.
    750  // We can then silently ignore this message.
    751  if (!port_ref.is_valid()) {
    752    DVLOG(1) << "ObserveProxy: " << event->port_name() << "@" << name_
    753             << " not found";
    754    return OK;
    755  }
    756 
    757  DVLOG(2) << "ObserveProxy at " << port_ref.name() << "@" << name_
    758           << ", proxy at " << event->proxy_port_name() << "@"
    759           << event->proxy_node_name() << " pointing to "
    760           << event->proxy_target_port_name() << "@"
    761           << event->proxy_target_node_name();
    762 
    763  bool peer_changed = false;
    764  ScopedEvent event_to_forward;
    765  NodeName event_target_node;
    766  {
    767    // Must be acquired for UpdatePortPeerAddress below.
    768    PortLocker::AssertNoPortsLockedOnCurrentThread();
    769    mozilla::MutexAutoLock ports_locker(ports_lock_);
    770 
    771    SinglePortLocker locker(&port_ref);
    772    auto* port = locker.port();
    773 
    774    if (port->peer_node_name == event->proxy_node_name() &&
    775        port->peer_port_name == event->proxy_port_name()) {
    776      if (port->state == Port::kReceiving) {
    777        // Updating the port peer will reset the sequence num. Grab it now;
    778        uint64_t sequence_num = port->next_control_sequence_num_to_send++;
    779        UpdatePortPeerAddress(port_ref.name(), port,
    780                              event->proxy_target_node_name(),
    781                              event->proxy_target_port_name());
    782        event_target_node = event->proxy_node_name();
    783        event_to_forward = mozilla::MakeUnique<ObserveProxyAckEvent>(
    784            event->proxy_port_name(), port_ref.name(), sequence_num,
    785            port->next_sequence_num_to_send - 1);
    786        peer_changed = true;
    787        DVLOG(2) << "Forwarding ObserveProxyAck from " << event->port_name()
    788                 << "@" << name_ << " to " << event->proxy_port_name() << "@"
    789                 << event_target_node;
    790      } else {
    791        // As a proxy ourselves, we don't know how to honor the ObserveProxy
    792        // event or to populate the last_sequence_num field of ObserveProxyAck.
    793        // Afterall, another port could be sending messages to our peer now
    794        // that we've sent out our own ObserveProxy event.  Instead, we will
    795        // send an ObserveProxyAck indicating that the ObserveProxy event
    796        // should be re-sent (last_sequence_num set to kInvalidSequenceNum).
    797        // However, this has to be done after we are removed as a proxy.
    798        // Otherwise, we might just find ourselves back here again, which
    799        // would be akin to a busy loop.
    800 
    801        DVLOG(2) << "Delaying ObserveProxyAck to " << event->proxy_port_name()
    802                 << "@" << event->proxy_node_name();
    803 
    804        port->send_on_proxy_removal =
    805            mozilla::MakeUnique<std::pair<NodeName, ScopedEvent>>(
    806                event->proxy_node_name(),
    807                mozilla::MakeUnique<ObserveProxyAckEvent>(
    808                    event->proxy_port_name(), port_ref.name(),
    809                    kInvalidSequenceNum, kInvalidSequenceNum));
    810      }
    811    } else {
    812      // Forward this event along to our peer. Eventually, it should find the
    813      // port referring to the proxy.
    814      event_target_node = port->peer_node_name;
    815      event->set_port_name(port->peer_port_name);
    816      event->set_from_port(port_ref.name());
    817      event->set_control_sequence_num(
    818          port->next_control_sequence_num_to_send++);
    819      if (port->state == Port::kBuffering) {
    820        port->control_message_queue.push_back(
    821            {event_target_node, std::move(event)});
    822      } else {
    823        event_to_forward = std::move(event);
    824      }
    825    }
    826  }
    827 
    828  if (event_to_forward) {
    829    delegate_->ForwardEvent(event_target_node, std::move(event_to_forward));
    830  }
    831 
    832  if (peer_changed) {
    833    // Re-send ack and/or ack requests, as the previous peer proxy may not have
    834    // forwarded the previous request before it died.
    835    MaybeResendAck(port_ref);
    836    MaybeResendAckRequest(port_ref);
    837 
    838    delegate_->PortStatusChanged(port_ref);
    839 
    840    if (event->proxy_target_node_name() != name_) {
    841      delegate_->ObserveRemoteNode(event->proxy_target_node_name());
    842    }
    843  }
    844 
    845  return OK;
    846 }
    847 
    848 int Node::OnObserveProxyAck(const PortRef& port_ref,
    849                            mozilla::UniquePtr<ObserveProxyAckEvent> event) {
    850  DVLOG(2) << "ObserveProxyAck at " << event->port_name() << "@" << name_
    851           << " (last_sequence_num=" << event->last_sequence_num() << ")";
    852 
    853  if (!port_ref.is_valid()) {
    854    return ERROR_PORT_UNKNOWN;  // The port may have observed closure first.
    855  }
    856 
    857  bool try_remove_proxy_immediately;
    858  bool erase_port = false;
    859  {
    860    SinglePortLocker locker(&port_ref);
    861    auto* port = locker.port();
    862 
    863    if (port->state == Port::kProxying) {
    864      // If the last sequence number is invalid, this is a signal that we need
    865      // to retransmit the ObserveProxy event for this port rather than flagging
    866      // the the proxy for removal ASAP.
    867      try_remove_proxy_immediately =
    868          event->last_sequence_num() != kInvalidSequenceNum;
    869      if (try_remove_proxy_immediately) {
    870        // We can now remove this port once we have received and forwarded the
    871        // last message addressed to this port.
    872        port->remove_proxy_on_last_message = true;
    873        port->last_sequence_num_to_receive = event->last_sequence_num();
    874      }
    875    } else if (port->state == Port::kClosed) {
    876      erase_port = true;
    877    } else {
    878      return OOPS(ERROR_PORT_STATE_UNEXPECTED);
    879    }
    880  }
    881 
    882  if (erase_port) {
    883    ErasePort(port_ref.name());
    884    return OK;
    885  }
    886 
    887  if (try_remove_proxy_immediately) {
    888    TryRemoveProxy(port_ref);
    889  } else {
    890    InitiateProxyRemoval(port_ref);
    891  }
    892 
    893  return OK;
    894 }
    895 
    896 int Node::OnObserveClosure(const PortRef& port_ref,
    897                           mozilla::UniquePtr<ObserveClosureEvent> event) {
    898  // OK if the port doesn't exist, as it may have been closed already.
    899  if (!port_ref.is_valid()) {
    900    return OK;
    901  }
    902 
    903  // This message tells the port that it should no longer expect more messages
    904  // beyond last_sequence_num. This message is forwarded along until we reach
    905  // the receiving end, and this message serves as an equivalent to
    906  // ObserveProxyAck.
    907 
    908  bool notify_delegate = false;
    909  NodeName peer_node_name;
    910  bool try_remove_proxy = false;
    911  bool erase_port = false;
    912  {
    913    SinglePortLocker locker(&port_ref);
    914    auto* port = locker.port();
    915 
    916    port->peer_closed = true;
    917    port->last_sequence_num_to_receive = event->last_sequence_num();
    918 
    919    DVLOG(2) << "ObserveClosure at " << port_ref.name() << "@" << name_
    920             << " (state=" << port->state << ") pointing to "
    921             << port->peer_port_name << "@" << port->peer_node_name
    922             << " (last_sequence_num=" << event->last_sequence_num() << ")";
    923 
    924    // We always forward ObserveClosure, even beyond the receiving port which
    925    // cares about it. This ensures that any dead-end proxies beyond that port
    926    // are notified to remove themselves.
    927 
    928    if (port->state == Port::kReceiving) {
    929      notify_delegate = true;
    930 
    931      // When forwarding along the other half of the port cycle, this will only
    932      // reach dead-end proxies. Tell them we've sent our last message so they
    933      // can go away.
    934      //
    935      // TODO: Repurposing ObserveClosure for this has the desired result but
    936      // may be semantically confusing since the forwarding port is not actually
    937      // closed. Consider replacing this with a new event type.
    938      event->set_last_sequence_num(port->next_sequence_num_to_send - 1);
    939 
    940      // Treat the closure as an acknowledge that all sent messages have been
    941      // read from the other end.
    942      port->last_sequence_num_acknowledged =
    943          port->next_sequence_num_to_send - 1;
    944    } else if (port->state == Port::kClosed) {
    945      // This is the ack for a closed proxy port notification. Now it's fine to
    946      // delete the port.
    947      erase_port = true;
    948    } else {
    949      // We haven't yet reached the receiving peer of the closed port, so we'll
    950      // forward the message along as-is.
    951      // See about removing the port if it is a proxy as our peer won't be able
    952      // to participate in proxy removal.
    953      port->remove_proxy_on_last_message = true;
    954      if (port->state == Port::kProxying) {
    955        try_remove_proxy = true;
    956      }
    957    }
    958 
    959    DVLOG(2) << "Forwarding ObserveClosure from " << port_ref.name() << "@"
    960             << name_ << " to peer " << port->peer_port_name << "@"
    961             << port->peer_node_name
    962             << " (last_sequence_num=" << event->last_sequence_num() << ")";
    963 
    964    event->set_port_name(port->peer_port_name);
    965    event->set_from_port(port_ref.name());
    966    event->set_control_sequence_num(port->next_control_sequence_num_to_send++);
    967    peer_node_name = port->peer_node_name;
    968 
    969    if (port->state == Port::kBuffering) {
    970      port->control_message_queue.push_back({peer_node_name, std::move(event)});
    971    }
    972  }
    973 
    974  if (try_remove_proxy) {
    975    TryRemoveProxy(port_ref);
    976  }
    977 
    978  if (erase_port) {
    979    ErasePort(port_ref.name());
    980  }
    981 
    982  if (event) {
    983    delegate_->ForwardEvent(peer_node_name, std::move(event));
    984  }
    985 
    986  if (notify_delegate) {
    987    delegate_->PortStatusChanged(port_ref);
    988  }
    989 
    990  return OK;
    991 }
    992 
    993 int Node::OnMergePort(const PortRef& port_ref,
    994                      mozilla::UniquePtr<MergePortEvent> event) {
    995  DVLOG(1) << "MergePort at " << port_ref.name() << "@" << name_
    996           << " merging with proxy " << event->new_port_name() << "@" << name_
    997           << " pointing to " << event->new_port_descriptor().peer_port_name
    998           << "@" << event->new_port_descriptor().peer_node_name
    999           << " referred by "
   1000           << event->new_port_descriptor().referring_port_name << "@"
   1001           << event->new_port_descriptor().referring_node_name;
   1002 
   1003  // Accept the new port. This is now the receiving end of the other port cycle
   1004  // to be merged with ours. Note that we always attempt to accept the new port
   1005  // first as otherwise its peer receiving port could be left stranded
   1006  // indefinitely.
   1007  if (AcceptPort(event->new_port_name(), event->new_port_descriptor()) != OK) {
   1008    if (port_ref.is_valid()) {
   1009      ClosePort(port_ref);
   1010    }
   1011    return ERROR_PORT_STATE_UNEXPECTED;
   1012  }
   1013 
   1014  PortRef new_port_ref;
   1015  GetPort(event->new_port_name(), &new_port_ref);
   1016  if (!port_ref.is_valid() && new_port_ref.is_valid()) {
   1017    ClosePort(new_port_ref);
   1018    return ERROR_PORT_UNKNOWN;
   1019  }
   1020  if (port_ref.is_valid() && !new_port_ref.is_valid()) {
   1021    ClosePort(port_ref);
   1022    return ERROR_PORT_UNKNOWN;
   1023  }
   1024 
   1025  bool peer_allowed = true;
   1026  {
   1027    SinglePortLocker locker(&port_ref);
   1028    auto* port = locker.port();
   1029    if (!port->pending_merge_peer) {
   1030      CHROMIUM_LOG(ERROR) << "MergePort called on unexpected port: "
   1031                          << event->port_name();
   1032      peer_allowed = false;
   1033    } else {
   1034      port->pending_merge_peer = false;
   1035    }
   1036  }
   1037  if (!peer_allowed) {
   1038    ClosePort(port_ref);
   1039    return ERROR_PORT_STATE_UNEXPECTED;
   1040  }
   1041 
   1042  return MergePortsInternal(port_ref, new_port_ref,
   1043                            false /* allow_close_on_bad_state */);
   1044 }
   1045 
   1046 int Node::OnUserMessageReadAckRequest(
   1047    const PortRef& port_ref,
   1048    mozilla::UniquePtr<UserMessageReadAckRequestEvent> event) {
   1049  DVLOG(1) << "AckRequest " << port_ref.name() << "@" << name_ << " sequence "
   1050           << event->sequence_num_to_acknowledge();
   1051 
   1052  if (!port_ref.is_valid()) {
   1053    return ERROR_PORT_UNKNOWN;
   1054  }
   1055 
   1056  NodeName peer_node_name;
   1057  mozilla::UniquePtr<Event> event_to_send;
   1058  {
   1059    SinglePortLocker locker(&port_ref);
   1060    auto* port = locker.port();
   1061 
   1062    peer_node_name = port->peer_node_name;
   1063    if (port->state == Port::kProxying) {
   1064      // Proxies simply forward the ack request to their peer.
   1065      event->set_port_name(port->peer_port_name);
   1066      event->set_from_port(port_ref.name());
   1067      event->set_control_sequence_num(
   1068          port->next_control_sequence_num_to_send++);
   1069      event_to_send = std::move(event);
   1070    } else {
   1071      uint64_t current_sequence_num =
   1072          port->message_queue.next_sequence_num() - 1;
   1073      // Either this is requesting an ack for a sequence number already read, or
   1074      // else for a sequence number that is yet to be read.
   1075      if (current_sequence_num >= event->sequence_num_to_acknowledge()) {
   1076        // If the current sequence number to read already exceeds the ack
   1077        // request, send an ack immediately.
   1078        event_to_send = mozilla::MakeUnique<UserMessageReadAckEvent>(
   1079            port->peer_port_name, port_ref.name(),
   1080            port->next_control_sequence_num_to_send++, current_sequence_num);
   1081 
   1082        if (port->state == Port::kBuffering) {
   1083          port->control_message_queue.push_back(
   1084              {peer_node_name, std::move(event_to_send)});
   1085        }
   1086 
   1087        // This might be a late or duplicate acknowledge request, that's
   1088        // requesting acknowledge for an already read message. There may already
   1089        // have been a request for future reads, so take care not to back up
   1090        // the requested acknowledge counter.
   1091        if (current_sequence_num > port->sequence_num_to_acknowledge) {
   1092          port->sequence_num_to_acknowledge = current_sequence_num;
   1093        }
   1094      } else {
   1095        // This is request to ack a sequence number that hasn't been read yet.
   1096        // The state of the port can either be that it already has a
   1097        // future-requested ack, or not. Because ack requests aren't guaranteed
   1098        // to arrive in order, store the earlier of the current  queued request
   1099        // and the new one, if one was already requested.
   1100        bool has_queued_ack_request =
   1101            port->sequence_num_to_acknowledge > current_sequence_num;
   1102        if (!has_queued_ack_request ||
   1103            port->sequence_num_to_acknowledge >
   1104                event->sequence_num_to_acknowledge()) {
   1105          port->sequence_num_to_acknowledge =
   1106              event->sequence_num_to_acknowledge();
   1107        }
   1108        return OK;
   1109      }
   1110    }
   1111  }
   1112 
   1113  if (event_to_send) {
   1114    delegate_->ForwardEvent(peer_node_name, std::move(event_to_send));
   1115  }
   1116 
   1117  return OK;
   1118 }
   1119 
   1120 int Node::OnUserMessageReadAck(
   1121    const PortRef& port_ref,
   1122    mozilla::UniquePtr<UserMessageReadAckEvent> event) {
   1123  DVLOG(1) << "Acknowledge " << port_ref.name() << "@" << name_ << " sequence "
   1124           << event->sequence_num_acknowledged();
   1125 
   1126  NodeName peer_node_name;
   1127  ScopedEvent ack_request_event;
   1128  if (port_ref.is_valid()) {
   1129    SinglePortLocker locker(&port_ref);
   1130    auto* port = locker.port();
   1131 
   1132    if (event->sequence_num_acknowledged() >= port->next_sequence_num_to_send) {
   1133      // TODO(http://crbug.com/980952): This is a malformed event.
   1134      //      This could return a new error "ERROR_MALFORMED_EVENT" which the
   1135      //      delegate could use as a signal to drop the peer node.
   1136      return OK;
   1137    }
   1138 
   1139    // Keep the largest acknowledge seen.
   1140    if (event->sequence_num_acknowledged() <=
   1141        port->last_sequence_num_acknowledged) {
   1142      // The acknowledge was late or a duplicate, it's safe to ignore it.
   1143      return OK;
   1144    }
   1145 
   1146    port->last_sequence_num_acknowledged = event->sequence_num_acknowledged();
   1147    // Send another ack request if the interval is non-zero and the peer has
   1148    // not been closed.
   1149    if (port->sequence_num_acknowledge_interval && !port->peer_closed) {
   1150      peer_node_name = port->peer_node_name;
   1151      ack_request_event = mozilla::MakeUnique<UserMessageReadAckRequestEvent>(
   1152          port->peer_port_name, port_ref.name(),
   1153          port->next_control_sequence_num_to_send++,
   1154          port->last_sequence_num_acknowledged +
   1155              port->sequence_num_acknowledge_interval);
   1156      DCHECK_NE(port->state, Port::kBuffering);
   1157    }
   1158  }
   1159  if (ack_request_event) {
   1160    delegate_->ForwardEvent(peer_node_name, std::move(ack_request_event));
   1161  }
   1162 
   1163  if (port_ref.is_valid()) {
   1164    delegate_->PortStatusChanged(port_ref);
   1165  }
   1166 
   1167  return OK;
   1168 }
   1169 
   1170 int Node::OnUpdatePreviousPeer(
   1171    const PortRef& port_ref,
   1172    mozilla::UniquePtr<UpdatePreviousPeerEvent> event) {
   1173  DVLOG(1) << "OnUpdatePreviousPeer port: " << event->port_name()
   1174           << " changing to " << event->new_node_name()
   1175           << ", port: " << event->from_port() << " => "
   1176           << event->new_port_name();
   1177 
   1178  if (!port_ref.is_valid()) {
   1179    return ERROR_PORT_UNKNOWN;
   1180  }
   1181 
   1182  const NodeName& new_node_name = event->new_node_name();
   1183  const PortName& new_port_name = event->new_port_name();
   1184  DCHECK_NE(new_node_name, kInvalidNodeName);
   1185  DCHECK_NE(new_port_name, kInvalidPortName);
   1186  if (new_node_name == kInvalidNodeName || new_port_name == kInvalidPortName) {
   1187    return ERROR_PORT_STATE_UNEXPECTED;
   1188  }
   1189 
   1190  {
   1191    SinglePortLocker locker(&port_ref);
   1192    auto* port = locker.port();
   1193 
   1194    port->prev_node_name = new_node_name;
   1195    port->prev_port_name = new_port_name;
   1196    // The sequence number will get incremented after this event has been
   1197    // handled.
   1198    port->next_control_sequence_num_to_receive = kInitialSequenceNum - 1;
   1199  }
   1200 
   1201  return OK;
   1202 }
   1203 
   1204 int Node::AddPortWithName(const PortName& port_name, RefPtr<Port> port) {
   1205  PortLocker::AssertNoPortsLockedOnCurrentThread();
   1206  mozilla::MutexAutoLock lock(ports_lock_);
   1207  if (port->peer_port_name != kInvalidPortName) {
   1208    DCHECK_NE(kInvalidNodeName, port->peer_node_name);
   1209    peer_port_maps_[port->peer_node_name][port->peer_port_name].emplace(
   1210        port_name, PortRef(port_name, port));
   1211  }
   1212  if (!ports_.emplace(port_name, std::move(port)).second) {
   1213    return OOPS(ERROR_PORT_EXISTS);  // Suggests a bad UUID generator.
   1214  }
   1215  DVLOG(2) << "Created port " << port_name << "@" << name_;
   1216  return OK;
   1217 }
   1218 
   1219 void Node::ErasePort(const PortName& port_name) {
   1220  PortLocker::AssertNoPortsLockedOnCurrentThread();
   1221  RefPtr<Port> port;
   1222  {
   1223    mozilla::MutexAutoLock lock(ports_lock_);
   1224    auto it = ports_.find(port_name);
   1225    if (it == ports_.end()) {
   1226      return;
   1227    }
   1228    port = std::move(it->second);
   1229    ports_.erase(it);
   1230 
   1231    RemoveFromPeerPortMap(port_name, port.get());
   1232  }
   1233  // NOTE: We are careful not to release the port's messages while holding any
   1234  // locks, since they may run arbitrary user code upon destruction.
   1235  std::vector<mozilla::UniquePtr<UserMessageEvent>> messages;
   1236  {
   1237    PortRef port_ref(port_name, std::move(port));
   1238    SinglePortLocker locker(&port_ref);
   1239    locker.port()->message_queue.TakeAllMessages(&messages);
   1240  }
   1241  DVLOG(2) << "Deleted port " << port_name << "@" << name_;
   1242 }
   1243 
   1244 int Node::SendUserMessageInternal(
   1245    const PortRef& port_ref, mozilla::UniquePtr<UserMessageEvent>* message) {
   1246  mozilla::UniquePtr<UserMessageEvent>& m = *message;
   1247 
   1248  m->set_from_port(port_ref.name());
   1249 
   1250  for (size_t i = 0; i < m->num_ports(); ++i) {
   1251    if (m->ports()[i] == port_ref.name()) {
   1252      return ERROR_PORT_CANNOT_SEND_SELF;
   1253    }
   1254  }
   1255 
   1256  NodeName target_node;
   1257  int rv = PrepareToForwardUserMessage(port_ref, Port::kReceiving,
   1258                                       false /* ignore_closed_peer */, m.get(),
   1259                                       &target_node);
   1260  if (rv != OK) {
   1261    return rv;
   1262  }
   1263 
   1264  // Beyond this point there's no sense in returning anything but OK. Even if
   1265  // message forwarding or acceptance fails, there's nothing the embedder can
   1266  // do to recover. Assume that failure beyond this point must be treated as a
   1267  // transport failure.
   1268 
   1269  DCHECK_NE(kInvalidNodeName, target_node);
   1270  if (target_node != name_) {
   1271    delegate_->ForwardEvent(target_node, std::move(m));
   1272    return OK;
   1273  }
   1274 
   1275  int accept_result = AcceptEvent(name_, std::move(m));
   1276  if (accept_result != OK) {
   1277    // See comment above for why we don't return an error in this case.
   1278    DVLOG(2) << "AcceptEvent failed: " << accept_result;
   1279  }
   1280 
   1281  return OK;
   1282 }
   1283 
   1284 int Node::MergePortsInternal(const PortRef& port0_ref, const PortRef& port1_ref,
   1285                             bool allow_close_on_bad_state) {
   1286  const PortRef* port_refs[2] = {&port0_ref, &port1_ref};
   1287  PendingUpdatePreviousPeer pending_update_events[2];
   1288  uint64_t original_sequence_number[2];
   1289  {
   1290    // Needed to swap peer map entries below.
   1291    PortLocker::AssertNoPortsLockedOnCurrentThread();
   1292    mozilla::ReleasableMutexAutoLock ports_locker(ports_lock_);
   1293 
   1294    mozilla::Maybe<PortLocker> locker(std::in_place, port_refs, size_t(2));
   1295    auto* port0 = locker->GetPort(port0_ref);
   1296    auto* port1 = locker->GetPort(port1_ref);
   1297 
   1298    // There are several conditions which must be met before we'll consider
   1299    // merging two ports:
   1300    //
   1301    // - They must both be in the kReceiving state
   1302    // - They must not be each other's peer
   1303    // - They must have never sent a user message
   1304    //
   1305    // If any of these criteria are not met, we fail early.
   1306    if (port0->state != Port::kReceiving || port1->state != Port::kReceiving ||
   1307        (port0->peer_node_name == name_ &&
   1308         port0->peer_port_name == port1_ref.name()) ||
   1309        (port1->peer_node_name == name_ &&
   1310         port1->peer_port_name == port0_ref.name()) ||
   1311        port0->next_sequence_num_to_send != kInitialSequenceNum ||
   1312        port1->next_sequence_num_to_send != kInitialSequenceNum) {
   1313      // On failure, we only close a port if it was at least properly in the
   1314      // |kReceiving| state. This avoids getting the system in an inconsistent
   1315      // state by e.g. closing a proxy abruptly.
   1316      //
   1317      // Note that we must release the port locks before closing ports.
   1318      const bool close_port0 =
   1319          port0->state == Port::kReceiving || allow_close_on_bad_state;
   1320      const bool close_port1 =
   1321          port1->state == Port::kReceiving || allow_close_on_bad_state;
   1322      locker.reset();
   1323      ports_locker.Unlock();
   1324      if (close_port0) {
   1325        ClosePort(port0_ref);
   1326      }
   1327      if (close_port1) {
   1328        ClosePort(port1_ref);
   1329      }
   1330      return ERROR_PORT_STATE_UNEXPECTED;
   1331    }
   1332 
   1333    pending_update_events[0] = {
   1334        .receiver = port0->peer_node_name,
   1335        .port = port0->peer_port_name,
   1336        .from_port = port0_ref.name(),
   1337        .sequence_num = port0->next_control_sequence_num_to_send++,
   1338        .new_prev_node = name_,
   1339        .new_prev_port = port1_ref.name()};
   1340    pending_update_events[1] = {
   1341        .receiver = port1->peer_node_name,
   1342        .port = port1->peer_port_name,
   1343        .from_port = port1_ref.name(),
   1344        .sequence_num = port1->next_control_sequence_num_to_send++,
   1345        .new_prev_node = name_,
   1346        .new_prev_port = port0_ref.name()};
   1347 
   1348    // Swap the ports' peer information and switch them both to proxying mode.
   1349    SwapPortPeers(port0_ref.name(), port0, port1_ref.name(), port1);
   1350    port0->state = Port::kProxying;
   1351    port1->state = Port::kProxying;
   1352    original_sequence_number[0] = port0->next_control_sequence_num_to_send;
   1353    original_sequence_number[1] = port1->next_control_sequence_num_to_send;
   1354    port0->next_control_sequence_num_to_send = kInitialSequenceNum;
   1355    port1->next_control_sequence_num_to_send = kInitialSequenceNum;
   1356    if (port0->peer_closed) {
   1357      port0->remove_proxy_on_last_message = true;
   1358    }
   1359    if (port1->peer_closed) {
   1360      port1->remove_proxy_on_last_message = true;
   1361    }
   1362  }
   1363 
   1364  // Flush any queued messages from the new proxies and, if successful, complete
   1365  // the merge by initiating proxy removals.
   1366  if (ForwardUserMessagesFromProxy(port0_ref) == OK &&
   1367      ForwardUserMessagesFromProxy(port1_ref) == OK) {
   1368    // Send the prev peer updates out after the forwarding the user messages
   1369    // succeeded. Otherwise, we won't be able to restore the previous state
   1370    // below.
   1371    for (const auto& pending_update_event : pending_update_events) {
   1372      delegate_->ForwardEvent(
   1373          pending_update_event.receiver,
   1374          mozilla::MakeUnique<UpdatePreviousPeerEvent>(
   1375              pending_update_event.port, pending_update_event.from_port,
   1376              pending_update_event.sequence_num,
   1377              pending_update_event.new_prev_node,
   1378              pending_update_event.new_prev_port));
   1379    }
   1380 
   1381    for (const auto* const port_ref : port_refs) {
   1382      bool try_remove_proxy_immediately = false;
   1383      ScopedEvent closure_event;
   1384      NodeName closure_event_target_node;
   1385      {
   1386        SinglePortLocker locker(port_ref);
   1387        auto* port = locker.port();
   1388        DCHECK_EQ(port->state, Port::kProxying);
   1389        try_remove_proxy_immediately = port->remove_proxy_on_last_message;
   1390        if (try_remove_proxy_immediately || port->peer_closed) {
   1391          // If either end of the port cycle is closed, we propagate an
   1392          // ObserveClosure event.
   1393          closure_event_target_node = port->peer_node_name;
   1394          closure_event = mozilla::MakeUnique<ObserveClosureEvent>(
   1395              port->peer_port_name, port_ref->name(),
   1396              port->next_control_sequence_num_to_send++,
   1397              port->last_sequence_num_to_receive);
   1398        }
   1399      }
   1400      if (try_remove_proxy_immediately) {
   1401        TryRemoveProxy(*port_ref);
   1402      } else {
   1403        InitiateProxyRemoval(*port_ref);
   1404      }
   1405 
   1406      if (closure_event) {
   1407        delegate_->ForwardEvent(closure_event_target_node,
   1408                                std::move(closure_event));
   1409      }
   1410    }
   1411 
   1412    return OK;
   1413  }
   1414 
   1415  // If we failed to forward proxied messages, we keep the system in a
   1416  // consistent state by undoing the peer swap and closing the ports.
   1417  {
   1418    PortLocker::AssertNoPortsLockedOnCurrentThread();
   1419    mozilla::MutexAutoLock ports_locker(ports_lock_);
   1420    PortLocker locker(port_refs, 2);
   1421    auto* port0 = locker.GetPort(port0_ref);
   1422    auto* port1 = locker.GetPort(port1_ref);
   1423    SwapPortPeers(port0_ref.name(), port0, port1_ref.name(), port1);
   1424    port0->remove_proxy_on_last_message = false;
   1425    port1->remove_proxy_on_last_message = false;
   1426    DCHECK_EQ(Port::kProxying, port0->state);
   1427    DCHECK_EQ(Port::kProxying, port1->state);
   1428    port0->state = Port::kReceiving;
   1429    port1->state = Port::kReceiving;
   1430    port0->next_control_sequence_num_to_send = original_sequence_number[0];
   1431    port1->next_control_sequence_num_to_send = original_sequence_number[1];
   1432  }
   1433 
   1434  ClosePort(port0_ref);
   1435  ClosePort(port1_ref);
   1436  return ERROR_PORT_STATE_UNEXPECTED;
   1437 }
   1438 
   1439 void Node::ConvertToProxy(Port* port, const NodeName& to_node_name,
   1440                          PortName* port_name,
   1441                          Event::PortDescriptor* port_descriptor,
   1442                          PendingUpdatePreviousPeer* pending_update) {
   1443  port->AssertLockAcquired();
   1444  PortName local_port_name = *port_name;
   1445 
   1446  PortName new_port_name;
   1447  GenerateRandomPortName(&new_port_name);
   1448 
   1449  pending_update->receiver = port->peer_node_name;
   1450  pending_update->port = port->peer_port_name;
   1451  pending_update->sequence_num = port->next_control_sequence_num_to_send++;
   1452  pending_update->new_prev_node = to_node_name;
   1453  pending_update->new_prev_port = new_port_name;
   1454 
   1455  // Make sure we don't send messages to the new peer until after we know it
   1456  // exists. In the meantime, just buffer messages locally.
   1457  DCHECK_EQ(port->state, Port::kReceiving);
   1458  port->state = Port::kBuffering;
   1459 
   1460  // If we already know our peer is closed, we already know this proxy can
   1461  // be removed once it receives and forwards its last expected message.
   1462  if (port->peer_closed) {
   1463    port->remove_proxy_on_last_message = true;
   1464  }
   1465 
   1466  *port_name = new_port_name;
   1467 
   1468  port_descriptor->peer_node_name = port->peer_node_name;
   1469  port_descriptor->peer_port_name = port->peer_port_name;
   1470  port_descriptor->referring_node_name = name_;
   1471  port_descriptor->referring_port_name = local_port_name;
   1472  port_descriptor->next_sequence_num_to_send = port->next_sequence_num_to_send;
   1473  port_descriptor->next_sequence_num_to_receive =
   1474      port->message_queue.next_sequence_num();
   1475  port_descriptor->last_sequence_num_to_receive =
   1476      port->last_sequence_num_to_receive;
   1477  port_descriptor->peer_closed = port->peer_closed;
   1478  memset(port_descriptor->padding, 0, sizeof(port_descriptor->padding));
   1479 
   1480  // Configure the local port to point to the new port.
   1481  UpdatePortPeerAddress(local_port_name, port, to_node_name, new_port_name);
   1482 }
   1483 
   1484 int Node::AcceptPort(const PortName& port_name,
   1485                     const Event::PortDescriptor& port_descriptor) {
   1486  RefPtr<Port> port =
   1487      mozilla::MakeRefPtr<Port>(port_descriptor.next_sequence_num_to_send,
   1488                                port_descriptor.next_sequence_num_to_receive);
   1489  port->state = Port::kReceiving;
   1490  port->peer_node_name = port_descriptor.peer_node_name;
   1491  port->peer_port_name = port_descriptor.peer_port_name;
   1492  port->next_control_sequence_num_to_send = kInitialSequenceNum;
   1493  port->next_control_sequence_num_to_receive = kInitialSequenceNum;
   1494  port->prev_node_name = port_descriptor.referring_node_name;
   1495  port->prev_port_name = port_descriptor.referring_port_name;
   1496  port->last_sequence_num_to_receive =
   1497      port_descriptor.last_sequence_num_to_receive;
   1498  port->peer_closed = port_descriptor.peer_closed;
   1499 
   1500  DVLOG(2) << "Accepting port " << port_name
   1501           << " [peer_closed=" << port->peer_closed
   1502           << "; last_sequence_num_to_receive="
   1503           << port->last_sequence_num_to_receive << "]";
   1504 
   1505  // A newly accepted port is not signalable until the message referencing the
   1506  // new port finds its way to the consumer (see GetMessage).
   1507  port->message_queue.set_signalable(false);
   1508 
   1509  int rv = AddPortWithName(port_name, std::move(port));
   1510  if (rv != OK) {
   1511    return rv;
   1512  }
   1513 
   1514  // Allow referring port to forward messages.
   1515  delegate_->ForwardEvent(port_descriptor.referring_node_name,
   1516                          mozilla::MakeUnique<PortAcceptedEvent>(
   1517                              port_descriptor.referring_port_name,
   1518                              kInvalidPortName, kInvalidSequenceNum));
   1519 
   1520  if (port_descriptor.peer_node_name != name_) {
   1521    delegate_->ObserveRemoteNode(port_descriptor.peer_node_name);
   1522  }
   1523 
   1524  return OK;
   1525 }
   1526 
   1527 int Node::PrepareToForwardUserMessage(const PortRef& forwarding_port_ref,
   1528                                      Port::State expected_port_state,
   1529                                      bool ignore_closed_peer,
   1530                                      UserMessageEvent* message,
   1531                                      NodeName* forward_to_node) {
   1532  bool target_is_remote = false;
   1533  std::vector<PendingUpdatePreviousPeer> peer_update_events;
   1534 
   1535  for (;;) {
   1536    NodeName target_node_name;
   1537    {
   1538      SinglePortLocker locker(&forwarding_port_ref);
   1539      target_node_name = locker.port()->peer_node_name;
   1540    }
   1541 
   1542    // NOTE: This may call out to arbitrary user code, so it's important to call
   1543    // it only while no port locks are held on the calling thread.
   1544    if (target_node_name != name_) {
   1545      if (!message->NotifyWillBeRoutedExternally()) {
   1546        CHROMIUM_LOG(ERROR)
   1547            << "NotifyWillBeRoutedExternally failed unexpectedly.";
   1548        return ERROR_PORT_STATE_UNEXPECTED;
   1549      }
   1550    }
   1551 
   1552    // Must be held because ConvertToProxy needs to update |peer_port_maps_|.
   1553    PortLocker::AssertNoPortsLockedOnCurrentThread();
   1554    mozilla::MutexAutoLock ports_locker(ports_lock_);
   1555 
   1556    // Simultaneously lock the forwarding port as well as all attached ports.
   1557    AutoTArray<PortRef, 4> attached_port_refs;
   1558    AutoTArray<const PortRef*, 5> ports_to_lock;
   1559    attached_port_refs.SetCapacity(message->num_ports());
   1560    ports_to_lock.SetCapacity(message->num_ports() + 1);
   1561    ports_to_lock.AppendElement(&forwarding_port_ref);
   1562    for (size_t i = 0; i < message->num_ports(); ++i) {
   1563      const PortName& attached_port_name = message->ports()[i];
   1564      auto iter = ports_.find(attached_port_name);
   1565      DCHECK(iter != ports_.end());
   1566      attached_port_refs.AppendElement(
   1567          PortRef(attached_port_name, iter->second));
   1568      ports_to_lock.AppendElement(&attached_port_refs[i]);
   1569    }
   1570    PortLocker locker(ports_to_lock.Elements(), ports_to_lock.Length());
   1571    auto* forwarding_port = locker.GetPort(forwarding_port_ref);
   1572 
   1573    if (forwarding_port->peer_node_name != target_node_name) {
   1574      // The target node has already changed since we last held the lock.
   1575      if (target_node_name == name_) {
   1576        // If the target node was previously this local node, we need to restart
   1577        // the loop, since that means we may now route the message externally.
   1578        continue;
   1579      }
   1580 
   1581      target_node_name = forwarding_port->peer_node_name;
   1582    }
   1583    target_is_remote = target_node_name != name_;
   1584 
   1585    if (forwarding_port->state != expected_port_state) {
   1586      return ERROR_PORT_STATE_UNEXPECTED;
   1587    }
   1588    if (forwarding_port->peer_closed && !ignore_closed_peer) {
   1589      return ERROR_PORT_PEER_CLOSED;
   1590    }
   1591 
   1592    // Messages may already have a sequence number if they're being forwarded by
   1593    // a proxy. Otherwise, use the next outgoing sequence number.
   1594    if (message->sequence_num() == 0) {
   1595      message->set_sequence_num(forwarding_port->next_sequence_num_to_send++);
   1596    }
   1597 #ifdef DEBUG
   1598    std::ostringstream ports_buf;
   1599    for (size_t i = 0; i < message->num_ports(); ++i) {
   1600      if (i > 0) {
   1601        ports_buf << ",";
   1602      }
   1603      ports_buf << message->ports()[i];
   1604    }
   1605 #endif
   1606 
   1607    if (message->num_ports() > 0) {
   1608      // Sanity check to make sure we can actually send all the attached ports.
   1609      // They must all be in the |kReceiving| state and must not be the sender's
   1610      // own peer.
   1611      DCHECK_EQ(message->num_ports(), attached_port_refs.Length());
   1612      for (size_t i = 0; i < message->num_ports(); ++i) {
   1613        auto* attached_port = locker.GetPort(attached_port_refs[i]);
   1614        int error = OK;
   1615        if (attached_port->state != Port::kReceiving) {
   1616          error = ERROR_PORT_STATE_UNEXPECTED;
   1617        } else if (attached_port_refs[i].name() ==
   1618                   forwarding_port->peer_port_name) {
   1619          error = ERROR_PORT_CANNOT_SEND_PEER;
   1620        }
   1621 
   1622        if (error != OK) {
   1623          // Not going to send. Backpedal on the sequence number.
   1624          forwarding_port->next_sequence_num_to_send--;
   1625          return error;
   1626        }
   1627      }
   1628 
   1629      if (target_is_remote) {
   1630        // We only bother to proxy and rewrite ports in the event if it's
   1631        // going to be routed to an external node. This substantially reduces
   1632        // the amount of port churn in the system, as many port-carrying
   1633        // events are routed at least 1 or 2 intra-node hops before (if ever)
   1634        // being routed externally.
   1635        Event::PortDescriptor* port_descriptors = message->port_descriptors();
   1636        for (size_t i = 0; i < message->num_ports(); ++i) {
   1637          auto* port = locker.GetPort(attached_port_refs[i]);
   1638          PendingUpdatePreviousPeer update_event = {
   1639              .from_port = attached_port_refs[i].name()};
   1640          ConvertToProxy(port, target_node_name, message->ports() + i,
   1641                         port_descriptors + i, &update_event);
   1642          peer_update_events.push_back(update_event);
   1643        }
   1644      }
   1645    }
   1646 
   1647 #ifdef DEBUG
   1648    DVLOG(4) << "Sending message " << message->sequence_num()
   1649             << " [ports=" << ports_buf.str() << "]"
   1650             << " from " << forwarding_port_ref.name() << "@" << name_ << " to "
   1651             << forwarding_port->peer_port_name << "@" << target_node_name;
   1652 #endif
   1653 
   1654    *forward_to_node = target_node_name;
   1655    message->set_port_name(forwarding_port->peer_port_name);
   1656    message->set_from_port(forwarding_port_ref.name());
   1657    message->set_control_sequence_num(
   1658        forwarding_port->next_control_sequence_num_to_send++);
   1659    break;
   1660  }
   1661 
   1662  for (auto& pending_update_event : peer_update_events) {
   1663    delegate_->ForwardEvent(
   1664        pending_update_event.receiver,
   1665        mozilla::MakeUnique<UpdatePreviousPeerEvent>(
   1666            pending_update_event.port, pending_update_event.from_port,
   1667            pending_update_event.sequence_num,
   1668            pending_update_event.new_prev_node,
   1669            pending_update_event.new_prev_port));
   1670  }
   1671 
   1672  if (target_is_remote) {
   1673    for (size_t i = 0; i < message->num_ports(); ++i) {
   1674      // For any ports that were converted to proxies above, make sure their
   1675      // prior local peer (if applicable) receives a status update so it can be
   1676      // made aware of its peer's location.
   1677      const Event::PortDescriptor& descriptor = message->port_descriptors()[i];
   1678      if (descriptor.peer_node_name == name_) {
   1679        PortRef local_peer;
   1680        if (GetPort(descriptor.peer_port_name, &local_peer) == OK) {
   1681          delegate_->PortStatusChanged(local_peer);
   1682        }
   1683      }
   1684    }
   1685  }
   1686 
   1687  return OK;
   1688 }
   1689 
   1690 int Node::BeginProxying(const PortRef& port_ref) {
   1691  std::vector<std::pair<NodeName, ScopedEvent>> control_message_queue;
   1692  {
   1693    SinglePortLocker locker(&port_ref);
   1694    auto* port = locker.port();
   1695    if (port->state != Port::kBuffering) {
   1696      return OOPS(ERROR_PORT_STATE_UNEXPECTED);
   1697    }
   1698    port->state = Port::kProxying;
   1699    std::swap(port->control_message_queue, control_message_queue);
   1700  }
   1701 
   1702  for (auto& [control_message_node_name, control_message_event] :
   1703       control_message_queue) {
   1704    delegate_->ForwardEvent(control_message_node_name,
   1705                            std::move(control_message_event));
   1706  }
   1707  control_message_queue.clear();
   1708 
   1709  int rv = ForwardUserMessagesFromProxy(port_ref);
   1710  if (rv != OK) {
   1711    return rv;
   1712  }
   1713 
   1714  // Forward any pending acknowledge request.
   1715  MaybeForwardAckRequest(port_ref);
   1716 
   1717  bool try_remove_proxy_immediately;
   1718  {
   1719    SinglePortLocker locker(&port_ref);
   1720    auto* port = locker.port();
   1721    if (port->state != Port::kProxying) {
   1722      return OOPS(ERROR_PORT_STATE_UNEXPECTED);
   1723    }
   1724 
   1725    try_remove_proxy_immediately = port->remove_proxy_on_last_message;
   1726  }
   1727 
   1728  if (try_remove_proxy_immediately) {
   1729    TryRemoveProxy(port_ref);
   1730  } else {
   1731    InitiateProxyRemoval(port_ref);
   1732  }
   1733 
   1734  return OK;
   1735 }
   1736 
   1737 int Node::ForwardUserMessagesFromProxy(const PortRef& port_ref) {
   1738  for (;;) {
   1739    // NOTE: We forward messages in sequential order here so that we maintain
   1740    // the message queue's notion of next sequence number. That's useful for the
   1741    // proxy removal process as we can tell when this port has seen all of the
   1742    // messages it is expected to see.
   1743    mozilla::UniquePtr<UserMessageEvent> message;
   1744    {
   1745      SinglePortLocker locker(&port_ref);
   1746      locker.port()->message_queue.GetNextMessage(&message, nullptr);
   1747      if (!message) {
   1748        break;
   1749      }
   1750    }
   1751 
   1752    NodeName target_node;
   1753    int rv = PrepareToForwardUserMessage(port_ref, Port::kProxying,
   1754                                         true /* ignore_closed_peer */,
   1755                                         message.get(), &target_node);
   1756    {
   1757      // Mark the message as processed after we ran PrepareToForwardUserMessage.
   1758      // This is important to prevent another thread from deleting the port
   1759      // before we grabbed a sequence number for the message.
   1760      SinglePortLocker locker(&port_ref);
   1761      locker.port()->message_queue.MessageProcessed();
   1762    }
   1763    if (rv != OK) {
   1764      return rv;
   1765    }
   1766 
   1767    delegate_->ForwardEvent(target_node, std::move(message));
   1768  }
   1769  return OK;
   1770 }
   1771 
   1772 void Node::InitiateProxyRemoval(const PortRef& port_ref) {
   1773  NodeName peer_node_name;
   1774  PortName peer_port_name;
   1775  uint64_t sequence_num;
   1776  {
   1777    SinglePortLocker locker(&port_ref);
   1778    auto* port = locker.port();
   1779    if (port->state == Port::kClosed) {
   1780      return;
   1781    }
   1782    peer_node_name = port->peer_node_name;
   1783    peer_port_name = port->peer_port_name;
   1784    sequence_num = port->next_control_sequence_num_to_send++;
   1785    DCHECK_EQ(port->state, Port::kProxying);
   1786  }
   1787 
   1788  // To remove this node, we start by notifying the connected graph that we are
   1789  // a proxy. This allows whatever port is referencing this node to skip it.
   1790  // Eventually, this node will receive ObserveProxyAck (or ObserveClosure if
   1791  // the peer was closed in the meantime).
   1792  delegate_->ForwardEvent(
   1793      peer_node_name, mozilla::MakeUnique<ObserveProxyEvent>(
   1794                          peer_port_name, port_ref.name(), sequence_num, name_,
   1795                          port_ref.name(), peer_node_name, peer_port_name));
   1796 }
   1797 
   1798 void Node::TryRemoveProxy(const PortRef& port_ref) {
   1799  bool should_erase = false;
   1800  NodeName removal_target_node;
   1801  ScopedEvent removal_event;
   1802  PendingUpdatePreviousPeer pending_update_event;
   1803 
   1804  {
   1805    SinglePortLocker locker(&port_ref);
   1806    auto* port = locker.port();
   1807    if (port->state == Port::kClosed) {
   1808      return;
   1809    }
   1810    DCHECK_EQ(port->state, Port::kProxying);
   1811 
   1812    // Make sure we have seen ObserveProxyAck before removing the port.
   1813    if (!port->remove_proxy_on_last_message) {
   1814      return;
   1815    }
   1816 
   1817    if (!CanAcceptMoreMessages(port)) {
   1818      DCHECK_EQ(port->message_queue.queued_message_count(), 0lu);
   1819      should_erase = true;
   1820      if (port->send_on_proxy_removal) {
   1821        removal_target_node = port->send_on_proxy_removal->first;
   1822        removal_event = std::move(port->send_on_proxy_removal->second);
   1823        if (removal_event) {
   1824          removal_event->set_control_sequence_num(
   1825              port->next_control_sequence_num_to_send++);
   1826          DCHECK_EQ(removal_target_node, port->peer_node_name);
   1827          DCHECK_EQ(removal_event->port_name(), port->peer_port_name);
   1828        }
   1829      }
   1830      // Tell the peer_node to accept messages from prev_node from now.
   1831      pending_update_event = {
   1832          .receiver = port->peer_node_name,
   1833          .port = port->peer_port_name,
   1834          .from_port = port_ref.name(),
   1835          .sequence_num = port->next_control_sequence_num_to_send++,
   1836          .new_prev_node = port->prev_node_name,
   1837          .new_prev_port = port->prev_port_name};
   1838    } else {
   1839      DVLOG(2) << "Cannot remove port " << port_ref.name() << "@" << name_
   1840               << " now; waiting for more messages";
   1841    }
   1842  }
   1843 
   1844  if (should_erase) {
   1845    delegate_->ForwardEvent(
   1846        pending_update_event.receiver,
   1847        mozilla::MakeUnique<UpdatePreviousPeerEvent>(
   1848            pending_update_event.port, pending_update_event.from_port,
   1849            pending_update_event.sequence_num,
   1850            pending_update_event.new_prev_node,
   1851            pending_update_event.new_prev_port));
   1852    ErasePort(port_ref.name());
   1853  }
   1854 
   1855  if (removal_event) {
   1856    delegate_->ForwardEvent(removal_target_node, std::move(removal_event));
   1857  }
   1858 }
   1859 
   1860 void Node::DestroyAllPortsWithPeer(const NodeName& node_name,
   1861                                   const PortName& port_name) {
   1862  // Wipes out all ports whose peer node matches |node_name| and whose peer port
   1863  // matches |port_name|. If |port_name| is |kInvalidPortName|, only the peer
   1864  // node is matched.
   1865 
   1866  std::vector<PortRef> ports_to_notify;
   1867  std::vector<PortName> dead_proxies_to_broadcast;
   1868  std::vector<mozilla::UniquePtr<UserMessageEvent>> undelivered_messages;
   1869  std::vector<std::pair<NodeName, ScopedEvent>> closure_events;
   1870 
   1871  {
   1872    PortLocker::AssertNoPortsLockedOnCurrentThread();
   1873    mozilla::MutexAutoLock ports_lock(ports_lock_);
   1874 
   1875    auto node_peer_port_map_iter = peer_port_maps_.find(node_name);
   1876    if (node_peer_port_map_iter == peer_port_maps_.end()) {
   1877      return;
   1878    }
   1879 
   1880    auto& node_peer_port_map = node_peer_port_map_iter->second;
   1881    auto peer_ports_begin = node_peer_port_map.begin();
   1882    auto peer_ports_end = node_peer_port_map.end();
   1883    if (port_name != kInvalidPortName) {
   1884      // If |port_name| is given, we limit the set of local ports to the ones
   1885      // with that specific port as their peer.
   1886      peer_ports_begin = node_peer_port_map.find(port_name);
   1887      if (peer_ports_begin == node_peer_port_map.end()) {
   1888        return;
   1889      }
   1890 
   1891      peer_ports_end = peer_ports_begin;
   1892      ++peer_ports_end;
   1893    }
   1894 
   1895    for (auto peer_port_iter = peer_ports_begin;
   1896         peer_port_iter != peer_ports_end; ++peer_port_iter) {
   1897      auto& local_ports = peer_port_iter->second;
   1898      // NOTE: This inner loop almost always has only one element. There are
   1899      // relatively short-lived cases where more than one local port points to
   1900      // the same peer, and this only happens when extra ports are bypassed
   1901      // proxies waiting to be torn down.
   1902      for (auto& local_port : local_ports) {
   1903        auto& local_port_ref = local_port.second;
   1904 
   1905        SinglePortLocker locker(&local_port_ref);
   1906        auto* port = locker.port();
   1907 
   1908        if (port_name != kInvalidPortName) {
   1909          // If this is a targeted observe dead proxy event, send out an
   1910          // ObserveClosure to acknowledge it.
   1911          closure_events.push_back(
   1912              std::pair{port->peer_node_name,
   1913                        mozilla::MakeUnique<ObserveClosureEvent>(
   1914                            port->peer_port_name, local_port_ref.name(),
   1915                            port->next_control_sequence_num_to_send++,
   1916                            port->last_sequence_num_to_receive)});
   1917        }
   1918 
   1919        if (!port->peer_closed) {
   1920          // Treat this as immediate peer closure. It's an exceptional
   1921          // condition akin to a broken pipe, so we don't care about losing
   1922          // messages.
   1923 
   1924          port->peer_closed = true;
   1925          port->peer_lost_unexpectedly = true;
   1926          if (port->state == Port::kReceiving) {
   1927            ports_to_notify.push_back(local_port_ref);
   1928          }
   1929        }
   1930 
   1931        // We don't expect to forward any further messages, and we don't
   1932        // expect to receive a Port{Accepted,Rejected} event. Because we're
   1933        // a proxy with no active peer, we cannot use the normal proxy removal
   1934        // procedure of forward-propagating an ObserveProxy. Instead we
   1935        // broadcast our own death so it can be back-propagated. This is
   1936        // inefficient but rare.
   1937        if (port->state == Port::kBuffering || port->state == Port::kProxying) {
   1938          port->state = Port::kClosed;
   1939          dead_proxies_to_broadcast.push_back(local_port_ref.name());
   1940          std::vector<mozilla::UniquePtr<UserMessageEvent>> messages;
   1941          port->message_queue.TakeAllMessages(&messages);
   1942          port->TakePendingMessages(messages);
   1943          for (auto& message : messages) {
   1944            undelivered_messages.emplace_back(std::move(message));
   1945          }
   1946        }
   1947      }
   1948    }
   1949  }
   1950 
   1951  for (auto& [closure_event_target_node, closure_event] : closure_events) {
   1952    delegate_->ForwardEvent(closure_event_target_node,
   1953                            std::move(closure_event));
   1954  }
   1955 
   1956  // Wake up any receiving ports who have just observed simulated peer closure.
   1957  for (const auto& port : ports_to_notify) {
   1958    delegate_->PortStatusChanged(port);
   1959  }
   1960 
   1961  for (const auto& proxy_name : dead_proxies_to_broadcast) {
   1962    // Broadcast an event signifying that this proxy is no longer functioning.
   1963    delegate_->BroadcastEvent(mozilla::MakeUnique<ObserveProxyEvent>(
   1964        kInvalidPortName, kInvalidPortName, kInvalidSequenceNum, name_,
   1965        proxy_name, kInvalidNodeName, kInvalidPortName));
   1966 
   1967    // Also process death locally since the port that points this closed one
   1968    // could be on the current node.
   1969    // Note: Although this is recursive, only a single port is involved which
   1970    // limits the expected branching to 1.
   1971    DestroyAllPortsWithPeer(name_, proxy_name);
   1972  }
   1973 
   1974  // Close any ports referenced by undelivered messages.
   1975  for (const auto& message : undelivered_messages) {
   1976    for (size_t i = 0; i < message->num_ports(); ++i) {
   1977      PortRef ref;
   1978      if (GetPort(message->ports()[i], &ref) == OK) {
   1979        ClosePort(ref);
   1980      }
   1981    }
   1982  }
   1983 }
   1984 
   1985 void Node::UpdatePortPeerAddress(const PortName& local_port_name,
   1986                                 Port* local_port,
   1987                                 const NodeName& new_peer_node,
   1988                                 const PortName& new_peer_port) {
   1989  ports_lock_.AssertCurrentThreadOwns();
   1990  local_port->AssertLockAcquired();
   1991 
   1992  RemoveFromPeerPortMap(local_port_name, local_port);
   1993  local_port->peer_node_name = new_peer_node;
   1994  local_port->peer_port_name = new_peer_port;
   1995  local_port->next_control_sequence_num_to_send = kInitialSequenceNum;
   1996  if (new_peer_port != kInvalidPortName) {
   1997    peer_port_maps_[new_peer_node][new_peer_port].emplace(
   1998        local_port_name, PortRef(local_port_name, RefPtr<Port>{local_port}));
   1999  }
   2000 }
   2001 
   2002 void Node::RemoveFromPeerPortMap(const PortName& local_port_name,
   2003                                 Port* local_port) {
   2004  if (local_port->peer_port_name == kInvalidPortName) {
   2005    return;
   2006  }
   2007 
   2008  auto node_iter = peer_port_maps_.find(local_port->peer_node_name);
   2009  if (node_iter == peer_port_maps_.end()) {
   2010    return;
   2011  }
   2012 
   2013  auto& node_peer_port_map = node_iter->second;
   2014  auto ports_iter = node_peer_port_map.find(local_port->peer_port_name);
   2015  if (ports_iter == node_peer_port_map.end()) {
   2016    return;
   2017  }
   2018 
   2019  auto& local_ports_with_this_peer = ports_iter->second;
   2020  local_ports_with_this_peer.erase(local_port_name);
   2021  if (local_ports_with_this_peer.empty()) {
   2022    node_peer_port_map.erase(ports_iter);
   2023  }
   2024  if (node_peer_port_map.empty()) {
   2025    peer_port_maps_.erase(node_iter);
   2026  }
   2027 }
   2028 
   2029 void Node::SwapPortPeers(const PortName& port0_name, Port* port0,
   2030                         const PortName& port1_name, Port* port1) {
   2031  ports_lock_.AssertCurrentThreadOwns();
   2032  port0->AssertLockAcquired();
   2033  port1->AssertLockAcquired();
   2034 
   2035  auto& peer0_ports =
   2036      peer_port_maps_[port0->peer_node_name][port0->peer_port_name];
   2037  auto& peer1_ports =
   2038      peer_port_maps_[port1->peer_node_name][port1->peer_port_name];
   2039  peer0_ports.erase(port0_name);
   2040  peer1_ports.erase(port1_name);
   2041  peer0_ports.emplace(port1_name, PortRef(port1_name, RefPtr<Port>{port1}));
   2042  peer1_ports.emplace(port0_name, PortRef(port0_name, RefPtr<Port>{port0}));
   2043 
   2044  std::swap(port0->peer_node_name, port1->peer_node_name);
   2045  std::swap(port0->peer_port_name, port1->peer_port_name);
   2046 }
   2047 
   2048 void Node::MaybeResendAckRequest(const PortRef& port_ref) {
   2049  NodeName peer_node_name;
   2050  ScopedEvent ack_request_event;
   2051  {
   2052    SinglePortLocker locker(&port_ref);
   2053    auto* port = locker.port();
   2054    if (port->state != Port::kReceiving) {
   2055      return;
   2056    }
   2057 
   2058    if (!port->sequence_num_acknowledge_interval) {
   2059      return;
   2060    }
   2061 
   2062    peer_node_name = port->peer_node_name;
   2063    ack_request_event = mozilla::MakeUnique<UserMessageReadAckRequestEvent>(
   2064        port->peer_port_name, port_ref.name(),
   2065        port->next_control_sequence_num_to_send++,
   2066        port->last_sequence_num_acknowledged +
   2067            port->sequence_num_acknowledge_interval);
   2068  }
   2069 
   2070  delegate_->ForwardEvent(peer_node_name, std::move(ack_request_event));
   2071 }
   2072 
   2073 void Node::MaybeForwardAckRequest(const PortRef& port_ref) {
   2074  NodeName peer_node_name;
   2075  ScopedEvent ack_request_event;
   2076  {
   2077    SinglePortLocker locker(&port_ref);
   2078    auto* port = locker.port();
   2079    if (port->state != Port::kProxying) {
   2080      return;
   2081    }
   2082 
   2083    if (!port->sequence_num_to_acknowledge) {
   2084      return;
   2085    }
   2086 
   2087    peer_node_name = port->peer_node_name;
   2088    ack_request_event = mozilla::MakeUnique<UserMessageReadAckRequestEvent>(
   2089        port->peer_port_name, port_ref.name(),
   2090        port->next_control_sequence_num_to_send++,
   2091        port->sequence_num_to_acknowledge);
   2092 
   2093    port->sequence_num_to_acknowledge = 0;
   2094  }
   2095 
   2096  delegate_->ForwardEvent(peer_node_name, std::move(ack_request_event));
   2097 }
   2098 
   2099 void Node::MaybeResendAck(const PortRef& port_ref) {
   2100  NodeName peer_node_name;
   2101  ScopedEvent ack_event;
   2102  {
   2103    SinglePortLocker locker(&port_ref);
   2104    auto* port = locker.port();
   2105    if (port->state != Port::kReceiving) {
   2106      return;
   2107    }
   2108 
   2109    uint64_t last_sequence_num_read =
   2110        port->message_queue.next_sequence_num() - 1;
   2111    if (!port->sequence_num_to_acknowledge || !last_sequence_num_read) {
   2112      return;
   2113    }
   2114 
   2115    peer_node_name = port->peer_node_name;
   2116    ack_event = mozilla::MakeUnique<UserMessageReadAckEvent>(
   2117        port->peer_port_name, port_ref.name(),
   2118        port->next_control_sequence_num_to_send++, last_sequence_num_read);
   2119  }
   2120 
   2121  delegate_->ForwardEvent(peer_node_name, std::move(ack_event));
   2122 }
   2123 
   2124 Node::DelegateHolder::DelegateHolder(Node* node, NodeDelegate* delegate)
   2125    : node_(node), delegate_(delegate) {
   2126  DCHECK(node_);
   2127 }
   2128 
   2129 Node::DelegateHolder::~DelegateHolder() = default;
   2130 
   2131 #ifdef DEBUG
   2132 void Node::DelegateHolder::EnsureSafeDelegateAccess() const {
   2133  PortLocker::AssertNoPortsLockedOnCurrentThread();
   2134  mozilla::MutexAutoLock lock(node_->ports_lock_);
   2135 }
   2136 #endif
   2137 
   2138 }  // namespace ports
   2139 }  // namespace core
   2140 }  // namespace mojo