tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

ports_unittest.cc (53595B)


      1 // Copyright 2016 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include <inttypes.h>
      6 #include <stdio.h>
      7 
      8 #include <map>
      9 #include <utility>
     10 
     11 #include "base/logging.h"
     12 #include "base/waitable_event.h"
     13 #include "base/thread.h"
     14 #include "base/string_util.h"
     15 #include "mojo/core/ports/event.h"
     16 #include "mojo/core/ports/node.h"
     17 #include "mojo/core/ports/node_delegate.h"
     18 #include "mojo/core/ports/port_locker.h"
     19 #include "mojo/core/ports/user_message.h"
     20 #include "testing/gtest/include/gtest/gtest.h"
     21 
     22 #include "mozilla/Mutex.h"
     23 
     24 namespace mojo {
     25 namespace core {
     26 namespace ports {
     27 namespace test {
     28 
     29 namespace {
     30 
     31 // TODO(rockot): Remove this unnecessary alias.
     32 using ScopedMessage = mozilla::UniquePtr<UserMessageEvent>;
     33 
     34 class TestMessage : public UserMessage {
     35 public:
     36  static const TypeInfo kUserMessageTypeInfo;
     37 
     38  explicit TestMessage(const std::string& payload)
     39      : UserMessage(&kUserMessageTypeInfo), payload_(payload) {}
     40  ~TestMessage() override = default;
     41 
     42  const std::string& payload() const { return payload_; }
     43 
     44 private:
     45  std::string payload_;
     46 };
     47 
     48 const UserMessage::TypeInfo TestMessage::kUserMessageTypeInfo = {};
     49 
     50 ScopedMessage NewUserMessageEvent(const std::string& payload,
     51                                  size_t num_ports) {
     52  auto event = mozilla::MakeUnique<UserMessageEvent>(num_ports);
     53  event->AttachMessage(mozilla::MakeUnique<TestMessage>(payload));
     54  return event;
     55 }
     56 
     57 bool MessageEquals(const ScopedMessage& message, const std::string& s) {
     58  return message->GetMessage<TestMessage>()->payload() == s;
     59 }
     60 
     61 class TestNode;
     62 
     63 class MessageRouter {
     64 public:
     65  virtual ~MessageRouter() = default;
     66 
     67  virtual void ForwardEvent(TestNode* from_node, const NodeName& node_name,
     68                            ScopedEvent event) = 0;
     69  virtual void BroadcastEvent(TestNode* from_node, ScopedEvent event) = 0;
     70 };
     71 
     72 class TestNode : public NodeDelegate {
     73 public:
     74  explicit TestNode(uint64_t id)
     75      : node_name_(id, 1),
     76        node_(node_name_, this),
     77        node_thread_(StringPrintf("Node %" PRIu64 " thread", id).c_str()),
     78        events_available_event_(/* manual_reset */ false,
     79                                /* initially_signaled */ false),
     80        idle_event_(/* manual_reset */ true, /* initially_signaled */ true) {}
     81 
     82  ~TestNode() override {
     83    StopWhenIdle();
     84    node_thread_.Stop();
     85  }
     86 
     87  const NodeName& name() const { return node_name_; }
     88 
     89  // NOTE: Node is thread-safe.
     90  Node& node() { return node_; }
     91 
     92  base::WaitableEvent& idle_event() { return idle_event_; }
     93 
     94  bool IsIdle() {
     95    mozilla::MutexAutoLock lock(lock_);
     96    return started_ && !dispatching_ &&
     97           (incoming_events_.empty() || (block_on_event_ && blocked_));
     98  }
     99 
    100  void BlockOnEvent(Event::Type type) {
    101    mozilla::MutexAutoLock lock(lock_);
    102    blocked_event_type_ = type;
    103    block_on_event_ = true;
    104  }
    105 
    106  void Unblock() {
    107    mozilla::MutexAutoLock lock(lock_);
    108    block_on_event_ = false;
    109    events_available_event_.Signal();
    110  }
    111 
    112  void Start(MessageRouter* router) {
    113    router_ = router;
    114    node_thread_.Start();
    115    node_thread_.message_loop()->PostTask(mozilla::NewNonOwningRunnableMethod(
    116        "TestNode::ProcessEvents", this, &TestNode::ProcessEvents));
    117  }
    118 
    119  void StopWhenIdle() {
    120    mozilla::MutexAutoLock lock(lock_);
    121    should_quit_ = true;
    122    events_available_event_.Signal();
    123  }
    124 
    125  void WakeUp() { events_available_event_.Signal(); }
    126 
    127  int SendStringMessage(const PortRef& port, const std::string& s) {
    128    return node_.SendUserMessage(port, NewUserMessageEvent(s, 0));
    129  }
    130 
    131  int SendMultipleMessages(const PortRef& port, size_t num_messages) {
    132    for (size_t i = 0; i < num_messages; ++i) {
    133      int result = SendStringMessage(port, "");
    134      if (result != OK) {
    135        return result;
    136      }
    137    }
    138    return OK;
    139  }
    140 
    141  int SendStringMessageWithPort(const PortRef& port, const std::string& s,
    142                                const PortName& sent_port_name) {
    143    auto event = NewUserMessageEvent(s, 1);
    144    event->ports()[0] = sent_port_name;
    145    return node_.SendUserMessage(port, std::move(event));
    146  }
    147 
    148  int SendStringMessageWithPort(const PortRef& port, const std::string& s,
    149                                const PortRef& sent_port) {
    150    return SendStringMessageWithPort(port, s, sent_port.name());
    151  }
    152 
    153  void set_drop_messages(bool value) {
    154    mozilla::MutexAutoLock lock(lock_);
    155    drop_messages_ = value;
    156  }
    157 
    158  void set_save_messages(bool value) {
    159    mozilla::MutexAutoLock lock(lock_);
    160    save_messages_ = value;
    161  }
    162 
    163  bool ReadMessage(const PortRef& port, ScopedMessage* message) {
    164    return node_.GetMessage(port, message, nullptr) == OK && *message;
    165  }
    166 
    167  bool ReadMultipleMessages(const PortRef& port, size_t num_messages) {
    168    for (size_t i = 0; i < num_messages; ++i) {
    169      ScopedMessage message;
    170      if (!ReadMessage(port, &message)) {
    171        return false;
    172      }
    173    }
    174    return true;
    175  }
    176 
    177  bool GetSavedMessage(ScopedMessage* message) {
    178    mozilla::MutexAutoLock lock(lock_);
    179    if (saved_messages_.empty()) {
    180      message->reset();
    181      return false;
    182    }
    183    std::swap(*message, saved_messages_.front());
    184    saved_messages_.pop();
    185    return true;
    186  }
    187 
    188  void EnqueueEvent(const NodeName& from_node, ScopedEvent event) {
    189    idle_event_.Reset();
    190 
    191    // NOTE: This may be called from ForwardMessage and thus must not reenter
    192    // |node_|.
    193    mozilla::MutexAutoLock lock(lock_);
    194    incoming_events_.push({from_node, std::move(event)});
    195    events_available_event_.Signal();
    196  }
    197 
    198  void ForwardEvent(const NodeName& node_name, ScopedEvent event) override {
    199    {
    200      mozilla::MutexAutoLock lock(lock_);
    201      if (drop_messages_) {
    202        DVLOG(1) << "Dropping ForwardMessage from node " << node_name_ << " to "
    203                 << node_name;
    204 
    205        mozilla::MutexAutoUnlock unlock(lock_);
    206        ClosePortsInEvent(event.get());
    207        return;
    208      }
    209    }
    210 
    211    DCHECK(router_);
    212    DVLOG(1) << "ForwardEvent from node " << node_name_ << " to " << node_name;
    213    router_->ForwardEvent(this, node_name, std::move(event));
    214  }
    215 
    216  void BroadcastEvent(ScopedEvent event) override {
    217    router_->BroadcastEvent(this, std::move(event));
    218  }
    219 
    220  void PortStatusChanged(const PortRef& port) override {
    221    // The port may be closed, in which case we ignore the notification.
    222    mozilla::MutexAutoLock lock(lock_);
    223    if (!save_messages_) {
    224      return;
    225    }
    226 
    227    for (;;) {
    228      ScopedMessage message;
    229      {
    230        mozilla::MutexAutoUnlock unlock(lock_);
    231        if (!ReadMessage(port, &message)) {
    232          break;
    233        }
    234      }
    235 
    236      saved_messages_.emplace(std::move(message));
    237    }
    238  }
    239 
    240  void ObserveRemoteNode(const NodeName& node) override {
    241    DCHECK(node != node_name_);
    242  }
    243 
    244  void ClosePortsInEvent(Event* event) {
    245    if (event->type() != Event::Type::kUserMessage) {
    246      return;
    247    }
    248 
    249    UserMessageEvent* message_event = static_cast<UserMessageEvent*>(event);
    250    for (size_t i = 0; i < message_event->num_ports(); ++i) {
    251      PortRef port;
    252      ASSERT_EQ(OK, node_.GetPort(message_event->ports()[i], &port));
    253      EXPECT_EQ(OK, node_.ClosePort(port));
    254    }
    255  }
    256 
    257  uint64_t GetUnacknowledgedMessageCount(const PortRef& port_ref) {
    258    PortStatus status{};
    259    if (node_.GetStatus(port_ref, &status) != OK) {
    260      return 0;
    261    }
    262 
    263    return status.unacknowledged_message_count;
    264  }
    265 
    266  void AllowPortMerge(const PortRef& port_ref) {
    267    SinglePortLocker locker(&port_ref);
    268    locker.port()->pending_merge_peer = true;
    269  }
    270 
    271 private:
    272  void ProcessEvents() {
    273    for (;;) {
    274      events_available_event_.Wait();
    275      mozilla::MutexAutoLock lock(lock_);
    276 
    277      if (should_quit_) {
    278        return;
    279      }
    280 
    281      dispatching_ = true;
    282      while (!incoming_events_.empty()) {
    283        if (block_on_event_ &&
    284            incoming_events_.front().second->type() == blocked_event_type_) {
    285          blocked_ = true;
    286          // Go idle if we hit a blocked event type.
    287          break;
    288        }
    289        blocked_ = false;
    290 
    291        auto node_event_pair = std::move(incoming_events_.front());
    292        incoming_events_.pop();
    293 
    294        // NOTE: AcceptMessage() can re-enter this object to call any of the
    295        // NodeDelegate interface methods.
    296        mozilla::MutexAutoUnlock unlock(lock_);
    297        node_.AcceptEvent(node_event_pair.first,
    298                          std::move(node_event_pair.second));
    299      }
    300 
    301      dispatching_ = false;
    302      started_ = true;
    303      idle_event_.Signal();
    304    };
    305  }
    306 
    307  const NodeName node_name_;
    308  Node node_;
    309  MessageRouter* router_ = nullptr;
    310 
    311  base::Thread node_thread_;
    312  base::WaitableEvent events_available_event_;
    313  base::WaitableEvent idle_event_;
    314 
    315  // Guards fields below.
    316  mozilla::Mutex lock_ MOZ_UNANNOTATED{"TestNode"};
    317  bool started_ = false;
    318  bool dispatching_ = false;
    319  bool should_quit_ = false;
    320  bool drop_messages_ = false;
    321  bool save_messages_ = false;
    322  bool blocked_ = false;
    323  bool block_on_event_ = false;
    324  Event::Type blocked_event_type_{};
    325  std::queue<std::pair<NodeName, ScopedEvent>> incoming_events_;
    326  std::queue<ScopedMessage> saved_messages_;
    327 };
    328 
    329 class PortsTest : public testing::Test, public MessageRouter {
    330 public:
    331  void AddNode(TestNode* node) {
    332    {
    333      mozilla::MutexAutoLock lock(lock_);
    334      nodes_[node->name()] = node;
    335    }
    336    node->Start(this);
    337  }
    338 
    339  void RemoveNode(TestNode* node) {
    340    {
    341      mozilla::MutexAutoLock lock(lock_);
    342      nodes_.erase(node->name());
    343    }
    344 
    345    for (const auto& entry : nodes_) {
    346      entry.second->node().LostConnectionToNode(node->name());
    347    }
    348  }
    349 
    350  // Waits until all known Nodes are idle. Message forwarding and processing
    351  // is handled in such a way that idleness is a stable state: once all nodes in
    352  // the system are idle, they will remain idle until the test explicitly
    353  // initiates some further event (e.g. sending a message, closing a port, or
    354  // removing a Node).
    355  void WaitForIdle() {
    356    for (;;) {
    357      mozilla::MutexAutoLock global_lock(global_lock_);
    358      bool all_nodes_idle = true;
    359      for (const auto& entry : nodes_) {
    360        if (!entry.second->IsIdle()) {
    361          all_nodes_idle = false;
    362        }
    363        entry.second->WakeUp();
    364      }
    365      if (all_nodes_idle) {
    366        return;
    367      }
    368 
    369      // Wait for any Node to signal that it's idle.
    370      mozilla::MutexAutoUnlock global_unlock(global_lock_);
    371      std::vector<base::WaitableEvent*> events;
    372      for (const auto& entry : nodes_) {
    373        events.push_back(&entry.second->idle_event());
    374      }
    375      base::WaitableEvent::WaitMany(events.data(), events.size());
    376    }
    377  }
    378 
    379  void CreatePortPair(TestNode* node0, PortRef* port0, TestNode* node1,
    380                      PortRef* port1) {
    381    if (node0 == node1) {
    382      EXPECT_EQ(OK, node0->node().CreatePortPair(port0, port1));
    383    } else {
    384      EXPECT_EQ(OK, node0->node().CreateUninitializedPort(port0));
    385      EXPECT_EQ(OK, node1->node().CreateUninitializedPort(port1));
    386      EXPECT_EQ(
    387          OK, node0->node().InitializePort(*port0, node1->name(), port1->name(),
    388                                           node1->name(), port1->name()));
    389      EXPECT_EQ(
    390          OK, node1->node().InitializePort(*port1, node0->name(), port0->name(),
    391                                           node0->name(), port0->name()));
    392    }
    393  }
    394 
    395 private:
    396  // MessageRouter:
    397  void ForwardEvent(TestNode* from_node, const NodeName& node_name,
    398                    ScopedEvent event) override {
    399    mozilla::MutexAutoLock global_lock(global_lock_);
    400    mozilla::MutexAutoLock lock(lock_);
    401    // Drop messages from nodes that have been removed.
    402    if (nodes_.find(from_node->name()) == nodes_.end()) {
    403      from_node->ClosePortsInEvent(event.get());
    404      return;
    405    }
    406 
    407    auto it = nodes_.find(node_name);
    408    if (it == nodes_.end()) {
    409      DVLOG(1) << "Node not found: " << node_name;
    410      return;
    411    }
    412 
    413    // Serialize and de-serialize all forwarded events.
    414    size_t buf_size = event->GetSerializedSize();
    415    mozilla::UniquePtr<char[]> buf(new char[buf_size]);
    416    event->Serialize(buf.get());
    417    ScopedEvent copy = Event::Deserialize(buf.get(), buf_size);
    418    // This should always succeed unless serialization or deserialization
    419    // is broken. In that case, the loss of events should cause a test failure.
    420    ASSERT_TRUE(copy);
    421 
    422    // Also copy the payload for user messages.
    423    if (event->type() == Event::Type::kUserMessage) {
    424      UserMessageEvent* message_event =
    425          static_cast<UserMessageEvent*>(event.get());
    426      UserMessageEvent* message_copy =
    427          static_cast<UserMessageEvent*>(copy.get());
    428 
    429      message_copy->AttachMessage(mozilla::MakeUnique<TestMessage>(
    430          message_event->GetMessage<TestMessage>()->payload()));
    431    }
    432 
    433    it->second->EnqueueEvent(from_node->name(), std::move(event));
    434  }
    435 
    436  void BroadcastEvent(TestNode* from_node, ScopedEvent event) override {
    437    mozilla::MutexAutoLock global_lock(global_lock_);
    438    mozilla::MutexAutoLock lock(lock_);
    439 
    440    // Drop messages from nodes that have been removed.
    441    if (nodes_.find(from_node->name()) == nodes_.end()) {
    442      return;
    443    }
    444 
    445    for (const auto& entry : nodes_) {
    446      TestNode* node = entry.second;
    447      // Broadcast doesn't deliver to the local node.
    448      if (node == from_node) {
    449        continue;
    450      }
    451      node->EnqueueEvent(from_node->name(), event->CloneForBroadcast());
    452    }
    453  }
    454 
    455  // Acquired before any operation which makes a Node busy, and before testing
    456  // if all nodes are idle.
    457  mozilla::Mutex global_lock_ MOZ_UNANNOTATED{"PortsTest Global Lock"};
    458 
    459  mozilla::Mutex lock_ MOZ_UNANNOTATED{"PortsTest Lock"};
    460  std::map<NodeName, TestNode*> nodes_;
    461 };
    462 
    463 }  // namespace
    464 
    465 TEST_F(PortsTest, Basic1) {
    466  TestNode node0(0);
    467  AddNode(&node0);
    468 
    469  TestNode node1(1);
    470  AddNode(&node1);
    471 
    472  PortRef x0, x1;
    473  CreatePortPair(&node0, &x0, &node1, &x1);
    474 
    475  PortRef a0, a1;
    476  EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
    477  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1));
    478  EXPECT_EQ(OK, node0.node().ClosePort(a0));
    479 
    480  EXPECT_EQ(OK, node0.node().ClosePort(x0));
    481  EXPECT_EQ(OK, node1.node().ClosePort(x1));
    482 
    483  WaitForIdle();
    484 
    485  EXPECT_TRUE(node0.node().CanShutdownCleanly());
    486  EXPECT_TRUE(node1.node().CanShutdownCleanly());
    487 }
    488 
    489 TEST_F(PortsTest, Basic2) {
    490  TestNode node0(0);
    491  AddNode(&node0);
    492 
    493  TestNode node1(1);
    494  AddNode(&node1);
    495 
    496  PortRef x0, x1;
    497  CreatePortPair(&node0, &x0, &node1, &x1);
    498 
    499  PortRef b0, b1;
    500  EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1));
    501  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", b1));
    502  EXPECT_EQ(OK, node0.SendStringMessage(b0, "hello again"));
    503 
    504  EXPECT_EQ(OK, node0.node().ClosePort(b0));
    505 
    506  EXPECT_EQ(OK, node0.node().ClosePort(x0));
    507  EXPECT_EQ(OK, node1.node().ClosePort(x1));
    508 
    509  WaitForIdle();
    510 
    511  EXPECT_TRUE(node0.node().CanShutdownCleanly());
    512  EXPECT_TRUE(node1.node().CanShutdownCleanly());
    513 }
    514 
    515 TEST_F(PortsTest, Basic3) {
    516  TestNode node0(0);
    517  AddNode(&node0);
    518 
    519  TestNode node1(1);
    520  AddNode(&node1);
    521 
    522  PortRef x0, x1;
    523  CreatePortPair(&node0, &x0, &node1, &x1);
    524 
    525  PortRef a0, a1;
    526  EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
    527 
    528  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1));
    529  EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello again"));
    530 
    531  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a0));
    532 
    533  PortRef b0, b1;
    534  EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1));
    535  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "bar", b1));
    536  EXPECT_EQ(OK, node0.SendStringMessage(b0, "baz"));
    537 
    538  EXPECT_EQ(OK, node0.node().ClosePort(b0));
    539 
    540  EXPECT_EQ(OK, node0.node().ClosePort(x0));
    541  EXPECT_EQ(OK, node1.node().ClosePort(x1));
    542 
    543  WaitForIdle();
    544 
    545  EXPECT_TRUE(node0.node().CanShutdownCleanly());
    546  EXPECT_TRUE(node1.node().CanShutdownCleanly());
    547 }
    548 
    549 TEST_F(PortsTest, LostConnectionToNode1) {
    550  TestNode node0(0);
    551  AddNode(&node0);
    552 
    553  TestNode node1(1);
    554  AddNode(&node1);
    555  node1.set_drop_messages(true);
    556 
    557  PortRef x0, x1;
    558  CreatePortPair(&node0, &x0, &node1, &x1);
    559 
    560  // Transfer a port to node1 and simulate a lost connection to node1.
    561 
    562  PortRef a0, a1;
    563  EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
    564  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a1));
    565 
    566  WaitForIdle();
    567 
    568  RemoveNode(&node1);
    569 
    570  WaitForIdle();
    571 
    572  EXPECT_EQ(OK, node0.node().ClosePort(a0));
    573  EXPECT_EQ(OK, node0.node().ClosePort(x0));
    574  EXPECT_EQ(OK, node1.node().ClosePort(x1));
    575 
    576  WaitForIdle();
    577 
    578  EXPECT_TRUE(node0.node().CanShutdownCleanly());
    579  EXPECT_TRUE(node1.node().CanShutdownCleanly());
    580 }
    581 
    582 TEST_F(PortsTest, LostConnectionToNode2) {
    583  TestNode node0(0);
    584  AddNode(&node0);
    585 
    586  TestNode node1(1);
    587  AddNode(&node1);
    588 
    589  PortRef x0, x1;
    590  CreatePortPair(&node0, &x0, &node1, &x1);
    591 
    592  PortRef a0, a1;
    593  EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
    594  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "take a1", a1));
    595 
    596  WaitForIdle();
    597 
    598  node1.set_drop_messages(true);
    599 
    600  RemoveNode(&node1);
    601 
    602  WaitForIdle();
    603 
    604  // a0 should have eventually detected peer closure after node loss.
    605  ScopedMessage message;
    606  EXPECT_EQ(ERROR_PORT_PEER_CLOSED,
    607            node0.node().GetMessage(a0, &message, nullptr));
    608  EXPECT_FALSE(message);
    609 
    610  EXPECT_EQ(OK, node0.node().ClosePort(a0));
    611 
    612  EXPECT_EQ(OK, node0.node().ClosePort(x0));
    613 
    614  EXPECT_EQ(OK, node1.node().GetMessage(x1, &message, nullptr));
    615  EXPECT_TRUE(message);
    616  node1.ClosePortsInEvent(message.get());
    617 
    618  EXPECT_EQ(OK, node1.node().ClosePort(x1));
    619 
    620  WaitForIdle();
    621 
    622  EXPECT_TRUE(node0.node().CanShutdownCleanly());
    623  EXPECT_TRUE(node1.node().CanShutdownCleanly());
    624 }
    625 
    626 TEST_F(PortsTest, LostConnectionToNodeWithSecondaryProxy) {
    627  // Tests that a proxy gets cleaned up when its indirect peer lives on a lost
    628  // node.
    629 
    630  TestNode node0(0);
    631  AddNode(&node0);
    632 
    633  TestNode node1(1);
    634  AddNode(&node1);
    635 
    636  TestNode node2(2);
    637  AddNode(&node2);
    638 
    639  // Create A-B spanning nodes 0 and 1 and C-D spanning 1 and 2.
    640  PortRef A, B, C, D;
    641  CreatePortPair(&node0, &A, &node1, &B);
    642  CreatePortPair(&node1, &C, &node2, &D);
    643 
    644  // Create E-F and send F over A to node 1.
    645  PortRef E, F;
    646  EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F));
    647  EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", F));
    648 
    649  WaitForIdle();
    650 
    651  ScopedMessage message;
    652  ASSERT_TRUE(node1.ReadMessage(B, &message));
    653  ASSERT_EQ(1u, message->num_ports());
    654 
    655  EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &F));
    656 
    657  // Send F over C to node 2 and then simulate node 2 loss from node 1. Node 1
    658  // will trivially become aware of the loss, and this test verifies that the
    659  // port A on node 0 will eventually also become aware of it.
    660 
    661  // Make sure node2 stops processing events when it encounters an ObserveProxy.
    662  node2.BlockOnEvent(Event::Type::kObserveProxy);
    663 
    664  EXPECT_EQ(OK, node1.SendStringMessageWithPort(C, ".", F));
    665  WaitForIdle();
    666 
    667  // Simulate node 1 and 2 disconnecting.
    668  EXPECT_EQ(OK, node1.node().LostConnectionToNode(node2.name()));
    669 
    670  // Let node2 continue processing events and wait for everyone to go idle.
    671  node2.Unblock();
    672  WaitForIdle();
    673 
    674  // Port F should be gone.
    675  EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(F.name(), &F));
    676 
    677  // Port E should have detected peer closure despite the fact that there is
    678  // no longer a continuous route from F to E over which the event could travel.
    679  PortStatus status{};
    680  EXPECT_EQ(OK, node0.node().GetStatus(E, &status));
    681  EXPECT_TRUE(status.peer_closed);
    682 
    683  EXPECT_EQ(OK, node0.node().ClosePort(A));
    684  EXPECT_EQ(OK, node1.node().ClosePort(B));
    685  EXPECT_EQ(OK, node1.node().ClosePort(C));
    686  EXPECT_EQ(OK, node0.node().ClosePort(E));
    687 
    688  WaitForIdle();
    689 
    690  EXPECT_TRUE(node0.node().CanShutdownCleanly());
    691  EXPECT_TRUE(node1.node().CanShutdownCleanly());
    692 }
    693 
    694 TEST_F(PortsTest, LostConnectionToNodeAfterSendingObserveProxy) {
    695  // Tests that a proxy gets cleaned up after a node disconnect if the
    696  // previous port already received the ObserveProxy event.
    697 
    698  TestNode node0(0);
    699  AddNode(&node0);
    700 
    701  TestNode node1(1);
    702  AddNode(&node1);
    703 
    704  TestNode node2(2);
    705  AddNode(&node2);
    706 
    707  // Create A-B spanning nodes 0 and 1 and C-D spanning 1 and 2.
    708  PortRef A, B, C, D;
    709  CreatePortPair(&node0, &A, &node1, &B);
    710  CreatePortPair(&node1, &C, &node2, &D);
    711 
    712  // Create E-F and send F over A to node 1.
    713  PortRef E, F;
    714  EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F));
    715  EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", F));
    716 
    717  WaitForIdle();
    718 
    719  ScopedMessage message;
    720  ASSERT_TRUE(node1.ReadMessage(B, &message));
    721  ASSERT_EQ(1u, message->num_ports());
    722 
    723  EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &F));
    724 
    725  // Send F over C to node 2 and then simulate node 2 loss from node 1 after
    726  // node 0 received the ObserveProxy event. Node 1 needs to clean up the
    727  // closed proxy while the node 0 to node 2 connection is still intact.
    728  node0.BlockOnEvent(Event::Type::kObserveProxy);
    729 
    730  EXPECT_EQ(OK, node1.SendStringMessageWithPort(C, ".", F));
    731  WaitForIdle();
    732 
    733  // Simulate node 1 and 2 disconnecting.
    734  EXPECT_EQ(OK, node1.node().LostConnectionToNode(node2.name()));
    735 
    736  // Let node2 continue processing events and wait for everyone to go idle.
    737  node0.Unblock();
    738  WaitForIdle();
    739 
    740  // Port F should be gone.
    741  EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(F.name(), &F));
    742 
    743  EXPECT_EQ(OK, node0.node().ClosePort(A));
    744  EXPECT_EQ(OK, node1.node().ClosePort(B));
    745  EXPECT_EQ(OK, node1.node().ClosePort(C));
    746  EXPECT_EQ(OK, node0.node().ClosePort(E));
    747 
    748  WaitForIdle();
    749 
    750  EXPECT_TRUE(node0.node().CanShutdownCleanly());
    751  EXPECT_TRUE(node1.node().CanShutdownCleanly());
    752 }
    753 
    754 TEST_F(PortsTest, LostConnectionToNodeWithLocalProxy) {
    755  // Tests that a proxy gets cleaned up when its direct peer lives on a lost
    756  // node and it's predecessor lives on the same node.
    757 
    758  TestNode node0(0);
    759  AddNode(&node0);
    760 
    761  TestNode node1(1);
    762  AddNode(&node1);
    763 
    764  PortRef A, B;
    765  CreatePortPair(&node0, &A, &node1, &B);
    766 
    767  PortRef C, D;
    768  EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D));
    769 
    770  // Send D but block node0 on an ObserveProxy event.
    771  node0.BlockOnEvent(Event::Type::kObserveProxy);
    772  EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", D));
    773 
    774  // node0 won't collapse the proxy but node1 will receive the message before
    775  // going idle.
    776  WaitForIdle();
    777 
    778  ScopedMessage message;
    779  ASSERT_TRUE(node1.ReadMessage(B, &message));
    780  ASSERT_EQ(1u, message->num_ports());
    781  PortRef E;
    782  EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &E));
    783 
    784  RemoveNode(&node1);
    785 
    786  node0.Unblock();
    787  WaitForIdle();
    788 
    789  // Port C should have detected peer closure.
    790  PortStatus status{};
    791  EXPECT_EQ(OK, node0.node().GetStatus(C, &status));
    792  EXPECT_TRUE(status.peer_closed);
    793 
    794  EXPECT_EQ(OK, node0.node().ClosePort(A));
    795  EXPECT_EQ(OK, node1.node().ClosePort(B));
    796  EXPECT_EQ(OK, node0.node().ClosePort(C));
    797  EXPECT_EQ(OK, node1.node().ClosePort(E));
    798 
    799  EXPECT_TRUE(node0.node().CanShutdownCleanly());
    800  EXPECT_TRUE(node1.node().CanShutdownCleanly());
    801 }
    802 
    803 TEST_F(PortsTest, GetMessage1) {
    804  TestNode node(0);
    805  AddNode(&node);
    806 
    807  PortRef a0, a1;
    808  EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));
    809 
    810  ScopedMessage message;
    811  EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr));
    812  EXPECT_FALSE(message);
    813 
    814  EXPECT_EQ(OK, node.node().ClosePort(a1));
    815 
    816  WaitForIdle();
    817 
    818  EXPECT_EQ(ERROR_PORT_PEER_CLOSED,
    819            node.node().GetMessage(a0, &message, nullptr));
    820  EXPECT_FALSE(message);
    821 
    822  EXPECT_EQ(OK, node.node().ClosePort(a0));
    823 
    824  WaitForIdle();
    825 
    826  EXPECT_TRUE(node.node().CanShutdownCleanly());
    827 }
    828 
    829 TEST_F(PortsTest, GetMessage2) {
    830  TestNode node(0);
    831  AddNode(&node);
    832 
    833  PortRef a0, a1;
    834  EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));
    835 
    836  EXPECT_EQ(OK, node.SendStringMessage(a1, "1"));
    837 
    838  ScopedMessage message;
    839  EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr));
    840 
    841  ASSERT_TRUE(message);
    842  EXPECT_TRUE(MessageEquals(message, "1"));
    843 
    844  EXPECT_EQ(OK, node.node().ClosePort(a0));
    845  EXPECT_EQ(OK, node.node().ClosePort(a1));
    846 
    847  EXPECT_TRUE(node.node().CanShutdownCleanly());
    848 }
    849 
    850 TEST_F(PortsTest, GetMessage3) {
    851  TestNode node(0);
    852  AddNode(&node);
    853 
    854  PortRef a0, a1;
    855  EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));
    856 
    857  const char* kStrings[] = {"1", "2", "3"};
    858 
    859  for (auto& kString : kStrings) {
    860    EXPECT_EQ(OK, node.SendStringMessage(a1, kString));
    861  }
    862 
    863  ScopedMessage message;
    864  for (auto& kString : kStrings) {
    865    EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr));
    866    ASSERT_TRUE(message);
    867    EXPECT_TRUE(MessageEquals(message, kString));
    868  }
    869 
    870  EXPECT_EQ(OK, node.node().ClosePort(a0));
    871  EXPECT_EQ(OK, node.node().ClosePort(a1));
    872 
    873  EXPECT_TRUE(node.node().CanShutdownCleanly());
    874 }
    875 
    876 TEST_F(PortsTest, Delegation1) {
         // Forwards a1 from node0 to node1 and straight back again, sending "hello"
         // on a0 before the nodes go idle, and checks that the message is readable
         // from the port that a1 finally became (a3).
    877  TestNode node0(0);
    878  AddNode(&node0);
    879 
    880  TestNode node1(1);
    881  AddNode(&node1);
    882 
    883  PortRef x0, x1;
    884  CreatePortPair(&node0, &x0, &node1, &x1);
    885 
    886  // In this test, we send a message to a port that has been moved.
    887 
    888  PortRef a0, a1;
    889  EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
    890  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "a1", a1));
    891  WaitForIdle();
    892 
    893  ScopedMessage message;
    894  ASSERT_TRUE(node1.ReadMessage(x1, &message));
    895  ASSERT_EQ(1u, message->num_ports());
    896  EXPECT_TRUE(MessageEquals(message, "a1"));
    897 
    898  // This is "a1" from the point of view of node1.
    899  PortName a2_name = message->ports()[0];
    900  EXPECT_EQ(OK, node1.SendStringMessageWithPort(x1, "a2", a2_name));
    901  EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello"));
    902 
    903  WaitForIdle();
    904 
    905  ASSERT_TRUE(node0.ReadMessage(x0, &message));
    906  ASSERT_EQ(1u, message->num_ports());
    907  EXPECT_TRUE(MessageEquals(message, "a2"));
    908 
    909  // This is "a2" from the point of view of node0, which just received it.
    910  PortName a3_name = message->ports()[0];
    911 
         // a3 is used immediately below, so a failed lookup must abort the test
         // rather than continue with an unusable PortRef.
    912  PortRef a3;
    913  ASSERT_EQ(OK, node0.node().GetPort(a3_name, &a3));
    914 
    915  ASSERT_TRUE(node0.ReadMessage(a3, &message));
    916  EXPECT_EQ(0u, message->num_ports());
    917  EXPECT_TRUE(MessageEquals(message, "hello"));
    918 
    919  EXPECT_EQ(OK, node0.node().ClosePort(a0));
    920  EXPECT_EQ(OK, node0.node().ClosePort(a3));
    921 
    922  EXPECT_EQ(OK, node0.node().ClosePort(x0));
    923  EXPECT_EQ(OK, node1.node().ClosePort(x1));
    924 
    925  EXPECT_TRUE(node0.node().CanShutdownCleanly());
    926  EXPECT_TRUE(node1.node().CanShutdownCleanly());
    927 }
    928 
    929 TEST_F(PortsTest, Delegation2) {
         // Repeats a two-step delegation 100 times: D is sent across to node1, then
         // F is sent towards D, and finally "hello" is written to E (F's peer). The
         // test passes if node1 ends up with "hello" among its saved messages and
         // both nodes can shut down cleanly afterwards.
    930  TestNode node0(0);
    931  AddNode(&node0);
    932 
    933  TestNode node1(1);
    934  AddNode(&node1);
    935 
    936  for (int i = 0; i < 100; ++i) {
    937    // Set up pipe A<->B between node0 and node1.
    938    PortRef A, B;
    939    CreatePortPair(&node0, &A, &node1, &B);
    940 
    941    PortRef C, D;
    942    EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D));
    943 
    944    PortRef E, F;
    945    EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F));
    946 
           // Messages arriving at node1 are collected and fetched further down with
           // GetSavedMessage().
    947    node1.set_save_messages(true);
    948 
    949    // Pass D over A to B.
    950    EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, "1", D));
    951 
    952    // Pass F over C to D.
    953    EXPECT_EQ(OK, node0.SendStringMessageWithPort(C, "1", F));
    954 
    955    // This message should find its way to node1.
    956    EXPECT_EQ(OK, node0.SendStringMessage(E, "hello"));
    957 
    958    WaitForIdle();
    959 
    960    EXPECT_EQ(OK, node0.node().ClosePort(C));
    961    EXPECT_EQ(OK, node0.node().ClosePort(E));
    962 
    963    EXPECT_EQ(OK, node0.node().ClosePort(A));
    964    EXPECT_EQ(OK, node1.node().ClosePort(B));
    965 
    966    bool got_hello = false;
    967    ScopedMessage message;
           // Walk the saved messages until "hello" shows up. ClosePortsInEvent is
           // defined outside this view; presumably it closes the ports attached to
           // each saved event so that none are left open -- confirm.
    968    while (node1.GetSavedMessage(&message)) {
    969      node1.ClosePortsInEvent(message.get());
    970      if (MessageEquals(message, "hello")) {
    971        got_hello = true;
    972        break;
    973      }
    974    }
    975 
    976    EXPECT_TRUE(got_hello);
    977 
    978    WaitForIdle();  // Because closing ports may have generated tasks.
    979  }
    980 
    981  EXPECT_TRUE(node0.node().CanShutdownCleanly());
    982  EXPECT_TRUE(node1.node().CanShutdownCleanly());
    983 }
    984 
    985 TEST_F(PortsTest, SendUninitialized) {
         // Sending on a port that was created but never initialized is expected to
         // fail with ERROR_PORT_STATE_UNEXPECTED; the port can still be closed.
    986  TestNode local(0);
    987  AddNode(&local);
    988 
    989  PortRef unbound;
    990  EXPECT_EQ(OK, local.node().CreateUninitializedPort(&unbound));
    991  EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED,
                   local.SendStringMessage(unbound, "oops"));
    992  EXPECT_EQ(OK, local.node().ClosePort(unbound));
    993  EXPECT_TRUE(local.node().CanShutdownCleanly());
    994 }
    995 
    996 TEST_F(PortsTest, SendFailure) {
         // Exercises the two rejected sends: a port carried over itself, and a port
         // carried over its own peer.
    997  TestNode local(0);
    998  AddNode(&local);
    999 
   1000  local.set_save_messages(true);
   1001 
   1002  PortRef port, peer;
   1003  EXPECT_EQ(OK, local.node().CreatePortPair(&port, &peer));
   1004 
   1005  // Attaching |port| to a message sent on |port| must be refused.
   1006 
   1007  EXPECT_EQ(ERROR_PORT_CANNOT_SEND_SELF,
   1008            local.SendStringMessageWithPort(port, "oops", port));
   1009 
   1010  // Attaching |peer| to a message sent on |port| must be refused as well.
   1011 
   1012  EXPECT_EQ(ERROR_PORT_CANNOT_SEND_PEER,
   1013            local.SendStringMessageWithPort(port, "nope", peer));
   1014 
   1015  // After the failed send, |peer| is no longer known to the node.
   1016  EXPECT_EQ(ERROR_PORT_UNKNOWN, local.node().GetPort(peer.name(), &peer));
   1017 
   1018  WaitForIdle();
   1019 
   1020  // Neither attempt may have produced a saved message.
   1021  ScopedMessage leftover;
   1022  EXPECT_FALSE(local.GetSavedMessage(&leftover));
   1023 
   1024  EXPECT_EQ(OK, local.node().ClosePort(port));
   1025 
   1026  WaitForIdle();
   1027 
   1028  EXPECT_TRUE(local.node().CanShutdownCleanly());
   1029 }
   1030 
   1031 TEST_F(PortsTest, DontLeakUnreceivedPorts) {
         // Sends a port inside a message that is never read, closes every port the
         // test still holds, and expects the node to be able to shut down cleanly.
   1032  TestNode local(0);
   1033  AddNode(&local);
   1034 
   1035  PortRef carrier, carrier_peer, kept, attached;
   1036  EXPECT_EQ(OK, local.node().CreatePortPair(&carrier, &carrier_peer));
   1037  EXPECT_EQ(OK, local.node().CreatePortPair(&kept, &attached));
   1038 
         // The message (and the port it carries) is left unread on |carrier_peer|.
   1039  EXPECT_EQ(OK, local.SendStringMessageWithPort(carrier, "foo", attached));
   1040 
   1041  EXPECT_EQ(OK, local.node().ClosePort(kept));
   1042  EXPECT_EQ(OK, local.node().ClosePort(carrier));
   1043  EXPECT_EQ(OK, local.node().ClosePort(carrier_peer));
   1044 
   1045  WaitForIdle();
   1046 
   1047  EXPECT_TRUE(local.node().CanShutdownCleanly());
   1048 }
   1049 
   1050 TEST_F(PortsTest, AllowShutdownWithLocalPortsOpen) {
         // Checks the ALLOW_LOCAL_PORTS shutdown policy: with only local ports open
         // the node may shut down under that policy, but not under the default one
         // until every port has been closed.
   1051  TestNode node(0);
   1052  AddNode(&node);
   1053 
   1054  PortRef A, B, C, D;
   1055  EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
   1056  EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D));
   1057 
   1058  EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D));
   1059 
         // |message| is dereferenced on the next line, so a failed read has to stop
         // the test here instead of merely being recorded.
   1060  ScopedMessage message;
   1061  ASSERT_TRUE(node.ReadMessage(B, &message));
   1062  ASSERT_EQ(1u, message->num_ports());
   1063  EXPECT_TRUE(MessageEquals(message, "foo"));
   1064  PortRef E;
   1065  ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));
   1066 
   1067  EXPECT_TRUE(
   1068      node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
   1069 
   1070  WaitForIdle();
   1071 
   1072  EXPECT_TRUE(
   1073      node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
   1074  EXPECT_FALSE(node.node().CanShutdownCleanly());
   1075 
   1076  EXPECT_EQ(OK, node.node().ClosePort(A));
   1077  EXPECT_EQ(OK, node.node().ClosePort(B));
   1078  EXPECT_EQ(OK, node.node().ClosePort(C));
   1079  EXPECT_EQ(OK, node.node().ClosePort(E));
   1080 
   1081  WaitForIdle();
   1082 
   1083  EXPECT_TRUE(node.node().CanShutdownCleanly());
   1084 }
   1085 
   1086 TEST_F(PortsTest, ProxyCollapse1) {
         // Sends one end of the A-B pair over X-Y three times in a row on a single
         // node (B -> C -> D -> E), then closes all remaining ports and expects a
         // clean shutdown once the node is idle.
   1087  TestNode node(0);
   1088  AddNode(&node);
   1089 
   1090  PortRef A, B;
   1091  EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
   1092 
   1093  PortRef X, Y;
   1094  EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
   1095 
   1096  ScopedMessage message;
   1097 
   1098  // Send B and receive it as C.
   1099  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
   1100  ASSERT_TRUE(node.ReadMessage(Y, &message));
   1101  ASSERT_EQ(1u, message->num_ports());
   1102  PortRef C;
   1103  ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));
   1104 
   1105  // Send C and receive it as D.
   1106  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C));
   1107  ASSERT_TRUE(node.ReadMessage(Y, &message));
   1108  ASSERT_EQ(1u, message->num_ports());
   1109  PortRef D;
   1110  ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D));
   1111 
   1112  // Send D and receive it as E.
   1113  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", D));
   1114  ASSERT_TRUE(node.ReadMessage(Y, &message));
   1115  ASSERT_EQ(1u, message->num_ports());
   1116  PortRef E;
   1117  ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));
   1118 
         // Only A and E (the current ends of the original pair) plus the X-Y
         // transport are closed explicitly; B, C and D were handed off by the sends.
   1119  EXPECT_EQ(OK, node.node().ClosePort(X));
   1120  EXPECT_EQ(OK, node.node().ClosePort(Y));
   1121 
   1122  EXPECT_EQ(OK, node.node().ClosePort(A));
   1123  EXPECT_EQ(OK, node.node().ClosePort(E));
   1124 
   1125  // The node should not idle until all proxies are collapsed.
   1126  WaitForIdle();
   1127 
   1128  EXPECT_TRUE(node.node().CanShutdownCleanly());
   1129 }
   1130 
   1131 TEST_F(PortsTest, ProxyCollapse2) {
         // Sends both ends of the A-B pair over X-Y without ever reading them, then
         // closes X and Y and expects the node to reach a cleanly-shut-down state
         // once idle.
   1132  TestNode node(0);
   1133  AddNode(&node);
   1134 
   1135  PortRef A, B;
   1136  EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
   1137 
   1138  PortRef X, Y;
   1139  EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
   1140 
   1143  // Send B and A to create proxies in each direction.
   1144  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
   1145  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A));
   1146 
   1147  EXPECT_EQ(OK, node.node().ClosePort(X));
   1148  EXPECT_EQ(OK, node.node().ClosePort(Y));
   1149 
   1150  // At this point we have a scenario with:
   1151  //
   1152  // D -> [B] -> C -> [A]
   1153  //
   1154  // Ensure that the proxies can collapse. The sent ports will be closed
   1155  // eventually as a result of Y's closure.
   1156 
   1157  WaitForIdle();
   1158 
   1159  EXPECT_TRUE(node.node().CanShutdownCleanly());
   1160 }
   1161 
   1162 TEST_F(PortsTest, SendWithClosedPeer) {
   1163  // This tests that if a port is sent when its peer is already known to be
   1164  // closed, the newly created port will be aware of that peer closure, and the
   1165  // proxy will eventually collapse.
   1166 
   1167  TestNode node(0);
   1168  AddNode(&node);
   1169 
   1170  // Send a message from A to B, then close A.
   1171  PortRef A, B;
   1172  EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
   1173  EXPECT_EQ(OK, node.SendStringMessage(A, "hey"));
   1174  EXPECT_EQ(OK, node.node().ClosePort(A));
   1175 
   1176  // Now send B over X-Y as new port C.
   1177  PortRef X, Y;
   1178  EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
   1179  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
   1180  ScopedMessage message;
   1181  ASSERT_TRUE(node.ReadMessage(Y, &message));
   1182  ASSERT_EQ(1u, message->num_ports());
   1183  PortRef C;
   1184  ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));
   1185 
   1186  EXPECT_EQ(OK, node.node().ClosePort(X));
   1187  EXPECT_EQ(OK, node.node().ClosePort(Y));
   1188 
   1189  WaitForIdle();
   1190 
   1191  // C should have received the message originally sent to B, and it should also
   1192  // be aware of A's closure.
   1193 
   1194  ASSERT_TRUE(node.ReadMessage(C, &message));
   1195  EXPECT_TRUE(MessageEquals(message, "hey"));
   1196 
   1197  PortStatus status{};
   1198  EXPECT_EQ(OK, node.node().GetStatus(C, &status));
   1199  EXPECT_FALSE(status.receiving_messages);
   1200  EXPECT_FALSE(status.has_messages);
   1201  EXPECT_TRUE(status.peer_closed);
   1202 
         // Check the result like every other close in this file, so a failed close
         // is reported directly instead of only through the shutdown check below.
   1203  EXPECT_EQ(OK, node.node().ClosePort(C));
   1204 
   1205  WaitForIdle();
   1206 
   1207  EXPECT_TRUE(node.node().CanShutdownCleanly());
   1208 }
   1209 
   1210 TEST_F(PortsTest, SendWithClosedPeerSent) {
   1211  // This tests that if a port is closed while some number of proxies are still
   1212  // routing messages (directly or indirectly) to it, that the peer port is
   1213  // eventually notified of the closure, and the dead-end proxies will
   1214  // eventually be removed.
   1215 
   1216  TestNode node(0);
   1217  AddNode(&node);
   1218 
         // X-Y is the transport used to move ports; A-B is the pair under test.
   1219  PortRef X, Y;
   1220  EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
   1221 
   1222  PortRef A, B;
   1223  EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
   1224 
   1225  ScopedMessage message;
   1226 
   1227  // Send A as new port C.
   1228  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A));
   1229 
   1230  ASSERT_TRUE(node.ReadMessage(Y, &message));
   1231  ASSERT_EQ(1u, message->num_ports());
   1232  PortRef C;
   1233  ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));
   1234 
   1235  // Send C as new port D.
   1236  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C));
   1237 
   1238  ASSERT_TRUE(node.ReadMessage(Y, &message));
   1239  ASSERT_EQ(1u, message->num_ports());
   1240  PortRef D;
   1241  ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D));
   1242 
   1243  // Send a message to B through D, then close D.
   1244  EXPECT_EQ(OK, node.SendStringMessage(D, "hey"));
   1245  EXPECT_EQ(OK, node.node().ClosePort(D));
   1246 
   1247  // Now send B as new port E.
   1248 
         // Note that X is closed before the message carrying B is read from Y.
   1249  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
   1250  EXPECT_EQ(OK, node.node().ClosePort(X));
   1251 
   1252  ASSERT_TRUE(node.ReadMessage(Y, &message));
   1253  ASSERT_EQ(1u, message->num_ports());
   1254  PortRef E;
   1255  ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));
   1256 
   1257  EXPECT_EQ(OK, node.node().ClosePort(Y));
   1258 
   1259  WaitForIdle();
   1260 
   1261  // E should receive the message originally sent to B, and it should also be
   1262  // aware of D's closure.
   1263 
   1264  ASSERT_TRUE(node.ReadMessage(E, &message));
   1265  EXPECT_TRUE(MessageEquals(message, "hey"));
   1266 
         // With "hey" consumed and the peer closed, E should report nothing left to
         // receive.
   1267  PortStatus status{};
   1268  EXPECT_EQ(OK, node.node().GetStatus(E, &status));
   1269  EXPECT_FALSE(status.receiving_messages);
   1270  EXPECT_FALSE(status.has_messages);
   1271  EXPECT_TRUE(status.peer_closed);
   1272 
   1273  EXPECT_EQ(OK, node.node().ClosePort(E));
   1274 
   1275  WaitForIdle();
   1276 
   1277  EXPECT_TRUE(node.node().CanShutdownCleanly());
   1278 }
   1279 
   1280 TEST_F(PortsTest, MergePorts) {
         // Merges B (on one node) with C (on another) and checks that a message
         // written to A beforehand comes out at D.
   1281  TestNode left(0);
   1282  AddNode(&left);
   1283 
   1284  TestNode right(1);
   1285  AddNode(&right);
   1286 
   1287  // Two unrelated pairs: A-B lives on |left|, C-D lives on |right|.
   1288  PortRef A, B;
         PortRef C, D;
   1289  EXPECT_EQ(OK, left.node().CreatePortPair(&A, &B));
   1290  EXPECT_EQ(OK, right.node().CreatePortPair(&C, &D));
   1291 
   1292  // Queue a message on A before the merge happens.
   1293  EXPECT_EQ(OK, left.SendStringMessage(A, "hey"));
   1294 
   1295  // Ask for B to be merged with C on the other node.
   1296  right.AllowPortMerge(C);
   1297  EXPECT_EQ(OK, left.node().MergePorts(B, right.name(), C.name()));
   1298 
   1299  WaitForIdle();
   1300 
   1301  // Once idle, no proxies should remain on either node.
   1302  EXPECT_TRUE(
   1303      left.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
   1304  EXPECT_TRUE(
   1305      right.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
   1306 
   1307  // The message written to A must now be readable from D.
   1308  ScopedMessage delivered;
   1309  ASSERT_TRUE(right.ReadMessage(D, &delivered));
   1310  EXPECT_TRUE(MessageEquals(delivered, "hey"));
   1311 
   1312  EXPECT_EQ(OK, left.node().ClosePort(A));
   1313  EXPECT_EQ(OK, right.node().ClosePort(D));
   1314 
   1315  // Nothing should be left open on either side.
   1316  EXPECT_TRUE(left.node().CanShutdownCleanly());
   1317  EXPECT_TRUE(right.node().CanShutdownCleanly());
   1318 }
   1319 
   1320 TEST_F(PortsTest, MergePortWithClosedPeer1) {
   1321  // This tests that the right thing happens when initiating a merge on a port
   1322  // whose peer has already been closed.
         // Here the closed peer is A, on the node that initiates the merge; the
         // message A sent before closing must still arrive at D.
   1323 
   1324  TestNode node0(0);
   1325  AddNode(&node0);
   1326 
   1327  TestNode node1(1);
   1328  AddNode(&node1);
   1329 
   1330  // Setup two independent port pairs, A-B on node0 and C-D on node1.
   1331  PortRef A, B, C, D;
   1332  EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
   1333  EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
   1334 
   1335  // Write a message on A.
   1336  EXPECT_EQ(OK, node0.SendStringMessage(A, "hey"));
   1337 
   1338  // Close A.
   1339  EXPECT_EQ(OK, node0.node().ClosePort(A));
   1340 
   1341  // Initiate a merge between B and C.
         // AllowPortMerge is a TestNode helper defined outside this view;
         // presumably it lets node1 accept a merge request targeting C -- confirm.
   1342  node1.AllowPortMerge(C);
   1343  EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
   1344 
   1345  WaitForIdle();
   1346 
   1347  // Expect all proxies to be gone once idle. node0 should have no ports since
   1348  // A was explicitly closed.
   1349  EXPECT_TRUE(node0.node().CanShutdownCleanly());
   1350  EXPECT_TRUE(
   1351      node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
   1352 
   1353  // Expect D to have received the message sent on A.
   1354  ScopedMessage message;
   1355  ASSERT_TRUE(node1.ReadMessage(D, &message));
   1356  EXPECT_TRUE(MessageEquals(message, "hey"));
   1357 
   1358  EXPECT_EQ(OK, node1.node().ClosePort(D));
   1359 
   1360  // No more ports should be open.
   1361  EXPECT_TRUE(node0.node().CanShutdownCleanly());
   1362  EXPECT_TRUE(node1.node().CanShutdownCleanly());
   1363 }
   1364 
   1365 TEST_F(PortsTest, MergePortWithClosedPeer2) {
   1366  // This tests that the right thing happens when merging into a port whose peer
   1367  // has already been closed.
         // Here the closed peer is D, on the node that receives the merge; the
         // message D sent before closing must still arrive at A.
   1368 
   1369  TestNode node0(0);
   1370  AddNode(&node0);
   1371 
   1372  TestNode node1(1);
   1373  AddNode(&node1);
   1374 
   1375  // Setup two independent port pairs, A-B on node0 and C-D on node1.
   1376  PortRef A, B, C, D;
   1377  EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
   1378  EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
   1379 
   1380  // Write a message on D and close it.
   1381  EXPECT_EQ(OK, node1.SendStringMessage(D, "hey"));
   1382  EXPECT_EQ(OK, node1.node().ClosePort(D));
   1383 
   1384  // Initiate a merge between B and C.
         // AllowPortMerge is a TestNode helper defined outside this view;
         // presumably it lets node1 accept a merge request targeting C -- confirm.
   1385  node1.AllowPortMerge(C);
   1386  EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
   1387 
   1388  WaitForIdle();
   1389 
   1390  // Expect all proxies to be gone once idle. node1 should have no ports since
   1391  // D was explicitly closed.
   1392  EXPECT_TRUE(
   1393      node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
   1394  EXPECT_TRUE(node1.node().CanShutdownCleanly());
   1395 
   1396  // Expect A to have received the message sent on D.
   1397  ScopedMessage message;
   1398  ASSERT_TRUE(node0.ReadMessage(A, &message));
   1399  EXPECT_TRUE(MessageEquals(message, "hey"));
   1400 
   1401  EXPECT_EQ(OK, node0.node().ClosePort(A));
   1402 
   1403  // No more ports should be open.
   1404  EXPECT_TRUE(node0.node().CanShutdownCleanly());
   1405  EXPECT_TRUE(node1.node().CanShutdownCleanly());
   1406 }
   1407 
   1408 TEST_F(PortsTest, MergePortsWithClosedPeers) {
   1409  // Merging two ports whose peers are both already closed must not leave any
   1410  // port behind on either node.
   1411 
   1412  TestNode left(0);
   1413  AddNode(&left);
   1414 
   1415  TestNode right(1);
   1416  AddNode(&right);
   1417 
   1418  // Two unrelated pairs: A-B lives on |left|, C-D lives on |right|.
   1419  PortRef A, B;
         PortRef C, D;
   1420  EXPECT_EQ(OK, left.node().CreatePortPair(&A, &B));
   1421  EXPECT_EQ(OK, right.node().CreatePortPair(&C, &D));
   1422 
   1423  // Close the outer ends, A and D, and let that settle first.
   1424  EXPECT_EQ(OK, left.node().ClosePort(A));
   1425  EXPECT_EQ(OK, right.node().ClosePort(D));
   1426 
   1427  WaitForIdle();
   1428 
   1429  // Now merge the two remaining ports, B and C.
   1430  right.AllowPortMerge(C);
   1431  EXPECT_EQ(OK, left.node().MergePorts(B, right.name(), C.name()));
   1432 
   1433  WaitForIdle();
   1434 
   1435  // Both nodes should be completely empty.
   1436  EXPECT_TRUE(left.node().CanShutdownCleanly());
   1437  EXPECT_TRUE(right.node().CanShutdownCleanly());
   1438 }
   1439 
   1440 TEST_F(PortsTest, MergePortsWithMovedPeers) {
   1441  // This tests that ports can be merged successfully even if their peers are
   1442  // moved around.
         // A is first moved (becoming E) on node0; B and C are then merged, and
         // messages written on E and D must cross over to the opposite end.
   1443 
   1444  TestNode node0(0);
   1445  AddNode(&node0);
   1446 
   1447  TestNode node1(1);
   1448  AddNode(&node1);
   1449 
   1450  // Setup two independent port pairs, A-B on node0 and C-D on node1.
   1451  PortRef A, B, C, D;
   1452  EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
   1453  EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
   1454 
   1455  // Set up another pair X-Y for moving ports on node0.
   1456  PortRef X, Y;
   1457  EXPECT_EQ(OK, node0.node().CreatePortPair(&X, &Y));
   1458 
   1459  ScopedMessage message;
   1460 
   1461  // Move A to new port E.
   1462  EXPECT_EQ(OK, node0.SendStringMessageWithPort(X, "foo", A));
   1463  ASSERT_TRUE(node0.ReadMessage(Y, &message));
   1464  ASSERT_EQ(1u, message->num_ports());
   1465  PortRef E;
   1466  ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &E));
   1467 
         // The X-Y transport is no longer needed once E has been obtained.
   1468  EXPECT_EQ(OK, node0.node().ClosePort(X));
   1469  EXPECT_EQ(OK, node0.node().ClosePort(Y));
   1470 
   1471  // Write messages on E and D.
   1472  EXPECT_EQ(OK, node0.SendStringMessage(E, "hey"));
   1473  EXPECT_EQ(OK, node1.SendStringMessage(D, "hi"));
   1474 
   1475  // Initiate a merge between B and C.
   1476  node1.AllowPortMerge(C);
   1477  EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
   1478 
   1479  WaitForIdle();
   1480 
   1481  // Expect to receive D's message on E and E's message on D.
   1482  ASSERT_TRUE(node0.ReadMessage(E, &message));
   1483  EXPECT_TRUE(MessageEquals(message, "hi"));
   1484  ASSERT_TRUE(node1.ReadMessage(D, &message));
   1485  EXPECT_TRUE(MessageEquals(message, "hey"));
   1486 
   1487  // Close E and D.
   1488  EXPECT_EQ(OK, node0.node().ClosePort(E));
   1489  EXPECT_EQ(OK, node1.node().ClosePort(D));
   1490 
   1491  WaitForIdle();
   1492 
   1493  // Expect everything to have gone away.
   1494  EXPECT_TRUE(node0.node().CanShutdownCleanly());
   1495  EXPECT_TRUE(node1.node().CanShutdownCleanly());
   1496 }
   1497 
   1498 TEST_F(PortsTest, MergePortsFailsGracefully) {
   1499  // This tests that the system remains in a well-defined state if something
   1500  // goes wrong during port merge.
   1501 
   1502  TestNode node0(0);
   1503  AddNode(&node0);
   1504 
   1505  TestNode node1(1);
   1506  AddNode(&node1);
   1507 
   1508  // Setup two independent port pairs, A-B on node0 and C-D on node1.
   1509  PortRef A, B, C, D;
   1510  EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
   1511  EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
   1512 
         // Hand-build an X-Y pair spanning the two nodes: each port is created
         // uninitialized and then initialized with the other node's name and the
         // other port's name.
   1513  ScopedMessage message;
   1514  PortRef X, Y;
   1515  EXPECT_EQ(OK, node0.node().CreateUninitializedPort(&X));
   1516  EXPECT_EQ(OK, node1.node().CreateUninitializedPort(&Y));
   1517  EXPECT_EQ(OK, node0.node().InitializePort(X, node1.name(), Y.name(),
   1518                                            node1.name(), Y.name()));
   1519  EXPECT_EQ(OK, node1.node().InitializePort(Y, node0.name(), X.name(),
   1520                                            node0.name(), X.name()));
   1521 
   1522  // Block the merge from proceeding until we can do something stupid with port
   1523  // C. This avoids the test logic racing with async merge logic.
   1524  node1.BlockOnEvent(Event::Type::kMergePort);
   1525 
   1526  // Initiate the merge between B and C.
   1527  node1.AllowPortMerge(C);
   1528  EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
   1529 
   1530  // Move C to a new port E. This is not a sane use of Node's public API but
   1531  // is still hypothetically possible. It allows us to force a merge failure
   1532  // because C will be in an invalid state by the time the merge is processed.
   1533  // As a result, B should be closed.
   1534  EXPECT_EQ(OK, node1.SendStringMessageWithPort(Y, "foo", C));
   1535 
         // Resume node1 only now, after C has already been sent away.
   1536  node1.Unblock();
   1537 
   1538  WaitForIdle();
   1539 
         // The message carrying C (now E) arrives on node0 via X.
   1540  ASSERT_TRUE(node0.ReadMessage(X, &message));
   1541  ASSERT_EQ(1u, message->num_ports());
   1542  PortRef E;
   1543  ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &E));
   1544 
   1545  EXPECT_EQ(OK, node0.node().ClosePort(X));
   1546  EXPECT_EQ(OK, node1.node().ClosePort(Y));
   1547 
   1548  WaitForIdle();
   1549 
   1550  // C goes away as a result of normal proxy removal. B should have been closed
   1551  // cleanly by the failed MergePorts.
   1552  EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(C.name(), &C));
   1553  EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.node().GetPort(B.name(), &B));
   1554 
   1555  // Close A, D, and E.
   1556  EXPECT_EQ(OK, node0.node().ClosePort(A));
   1557  EXPECT_EQ(OK, node1.node().ClosePort(D));
   1558  EXPECT_EQ(OK, node0.node().ClosePort(E));
   1559 
   1560  WaitForIdle();
   1561 
   1562  // Expect everything to have gone away.
   1563  EXPECT_TRUE(node0.node().CanShutdownCleanly());
   1564  EXPECT_TRUE(node1.node().CanShutdownCleanly());
   1565 }
   1566 
   1567 TEST_F(PortsTest, RemotePeerStatus) {
         // Tracks PortStatus::peer_remote for a local pair (a-b) and a cross-node
         // pair (x0-x1) while one end of each is transferred to the other node and
         // then back again.
   1568  TestNode node0(0);
   1569  AddNode(&node0);
   1570 
   1571  TestNode node1(1);
   1572  AddNode(&node1);
   1573 
   1574  // Create a local port pair. Neither port should appear to have a remote peer.
   1575  PortRef a, b;
   1576  PortStatus status{};
         // Every later step uses a and b, so the creation result is checked like
         // the other CreatePortPair calls in this file.
   1577  EXPECT_EQ(OK, node0.node().CreatePortPair(&a, &b));
   1578  ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
   1579  EXPECT_FALSE(status.peer_remote);
   1580  ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
   1581  EXPECT_FALSE(status.peer_remote);
   1582 
   1583  // Create a port pair spanning the two nodes. Both spanning ports should
   1584  // immediately appear to have a remote peer.
   1585  PortRef x0, x1;
   1586  CreatePortPair(&node0, &x0, &node1, &x1);
   1587 
   1588  ASSERT_EQ(OK, node0.node().GetStatus(x0, &status));
   1589  EXPECT_TRUE(status.peer_remote);
   1590  ASSERT_EQ(OK, node1.node().GetStatus(x1, &status));
   1591  EXPECT_TRUE(status.peer_remote);
   1592 
   1593  PortRef x2, x3;
   1594  CreatePortPair(&node0, &x2, &node1, &x3);
   1595 
   1596  // Transfer |b| to |node1| and |x1| to |node0|. i.e., make the local peers
   1597  // remote and the remote peers local.
   1598  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x2, "foo", b));
   1599  EXPECT_EQ(OK, node1.SendStringMessageWithPort(x3, "bar", x1));
   1600  WaitForIdle();
   1601 
   1602  ScopedMessage message;
   1603  ASSERT_TRUE(node0.ReadMessage(x2, &message));
   1604  ASSERT_EQ(1u, message->num_ports());
   1605  ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &x1));
   1606 
   1607  ASSERT_TRUE(node1.ReadMessage(x3, &message));
   1608  ASSERT_EQ(1u, message->num_ports());
   1609  ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &b));
   1610 
   1611  // Now x0-x1 should be local to node0 and a-b should span the nodes.
   1612  ASSERT_EQ(OK, node0.node().GetStatus(x0, &status));
   1613  EXPECT_FALSE(status.peer_remote);
   1614  ASSERT_EQ(OK, node0.node().GetStatus(x1, &status));
   1615  EXPECT_FALSE(status.peer_remote);
   1616  ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
   1617  EXPECT_TRUE(status.peer_remote);
   1618  ASSERT_EQ(OK, node1.node().GetStatus(b, &status));
   1619  EXPECT_TRUE(status.peer_remote);
   1620 
   1621  // And swap them back one more time.
   1622  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x2, "foo", x1));
   1623  EXPECT_EQ(OK, node1.SendStringMessageWithPort(x3, "bar", b));
   1624  WaitForIdle();
   1625 
   1626  ASSERT_TRUE(node0.ReadMessage(x2, &message));
   1627  ASSERT_EQ(1u, message->num_ports());
   1628  ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &b));
   1629 
   1630  ASSERT_TRUE(node1.ReadMessage(x3, &message));
   1631  ASSERT_EQ(1u, message->num_ports());
   1632  ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &x1));
   1633 
         // Back to the starting layout: x0-x1 spans the nodes, a-b is local.
   1634  ASSERT_EQ(OK, node0.node().GetStatus(x0, &status));
   1635  EXPECT_TRUE(status.peer_remote);
   1636  ASSERT_EQ(OK, node1.node().GetStatus(x1, &status));
   1637  EXPECT_TRUE(status.peer_remote);
   1638  ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
   1639  EXPECT_FALSE(status.peer_remote);
   1640  ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
   1641  EXPECT_FALSE(status.peer_remote);
   1642 
   1643  EXPECT_EQ(OK, node0.node().ClosePort(x0));
   1644  EXPECT_EQ(OK, node1.node().ClosePort(x1));
   1645  EXPECT_EQ(OK, node0.node().ClosePort(x2));
   1646  EXPECT_EQ(OK, node1.node().ClosePort(x3));
   1647  EXPECT_EQ(OK, node0.node().ClosePort(a));
   1648  EXPECT_EQ(OK, node0.node().ClosePort(b));
   1649 
   1650  EXPECT_TRUE(node0.node().CanShutdownCleanly());
   1651  EXPECT_TRUE(node1.node().CanShutdownCleanly());
   1652 }
   1653 
   1654 TEST_F(PortsTest, RemotePeerStatusAfterLocalPortMerge) {
         // After merging local port b with cross-node port c on node0, the two
         // remaining ends (a on node0, d on node1) should each report a remote peer.
   1655  TestNode node0(0);
   1656  AddNode(&node0);
   1657 
   1658  TestNode node1(1);
   1659  AddNode(&node1);
   1660 
   1661  // Set up a-b on node0 and c-d spanning node0-node1.
   1662  PortRef a, b, c, d;
         // The creation result is checked like the other CreatePortPair calls in
         // this file, since everything below depends on a and b existing.
   1663  EXPECT_EQ(OK, node0.node().CreatePortPair(&a, &b));
   1664  CreatePortPair(&node0, &c, &node1, &d);
   1665 
   1666  PortStatus status{};
   1667  ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
   1668  EXPECT_FALSE(status.peer_remote);
   1669  ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
   1670  EXPECT_FALSE(status.peer_remote);
   1671  ASSERT_EQ(OK, node0.node().GetStatus(c, &status));
   1672  EXPECT_TRUE(status.peer_remote);
   1673  ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
   1674  EXPECT_TRUE(status.peer_remote);
   1675 
   1676  EXPECT_EQ(OK, node0.node().MergeLocalPorts(b, c));
   1677  WaitForIdle();
   1678 
   1679  ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
   1680  EXPECT_TRUE(status.peer_remote);
   1681  ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
   1682  EXPECT_TRUE(status.peer_remote);
   1683 
   1684  EXPECT_EQ(OK, node0.node().ClosePort(a));
   1685  EXPECT_EQ(OK, node1.node().ClosePort(d));
   1686  EXPECT_TRUE(node0.node().CanShutdownCleanly());
   1687  EXPECT_TRUE(node1.node().CanShutdownCleanly());
   1688 }
   1689 
   1690 TEST_F(PortsTest, RemotePeerStatusAfterRemotePortMerge) {
         // After merging b (node0) with c (node1), the two remaining ends (a on
         // node0, d on node1) should each report a remote peer.
   1691  TestNode node0(0);
   1692  AddNode(&node0);
   1693 
   1694  TestNode node1(1);
   1695  AddNode(&node1);
   1696 
   1697  // Set up a-b on node0 and c-d on node1.
   1698  PortRef a, b, c, d;
         // Both creation results are checked like the other CreatePortPair calls in
         // this file, since everything below depends on all four ports existing.
   1699  EXPECT_EQ(OK, node0.node().CreatePortPair(&a, &b));
   1700  EXPECT_EQ(OK, node1.node().CreatePortPair(&c, &d));
   1701 
   1702  PortStatus status{};
   1703  ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
   1704  EXPECT_FALSE(status.peer_remote);
   1705  ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
   1706  EXPECT_FALSE(status.peer_remote);
   1707  ASSERT_EQ(OK, node1.node().GetStatus(c, &status));
   1708  EXPECT_FALSE(status.peer_remote);
   1709  ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
   1710  EXPECT_FALSE(status.peer_remote);
   1711 
   1712  node1.AllowPortMerge(c);
   1713  EXPECT_EQ(OK, node0.node().MergePorts(b, node1.name(), c.name()));
   1714  WaitForIdle();
   1715 
   1716  ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
   1717  EXPECT_TRUE(status.peer_remote);
   1718  ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
   1719  EXPECT_TRUE(status.peer_remote);
   1720 
   1721  EXPECT_EQ(OK, node0.node().ClosePort(a));
   1722  EXPECT_EQ(OK, node1.node().ClosePort(d));
   1723  EXPECT_TRUE(node0.node().CanShutdownCleanly());
   1724  EXPECT_TRUE(node1.node().CanShutdownCleanly());
   1725 }
   1726 
   1727 TEST_F(PortsTest, RetransmitUserMessageEvents) {
   1728  // Ensures that user message events can be retransmitted properly.
         // The same event object is bounced between a and b several times; after
         // each send the moved-from handle must be empty and the payload unchanged.
   1729  TestNode node0(0);
   1730  AddNode(&node0);
   1731 
   1732  PortRef a, b;
         // The creation result is checked like the other CreatePortPair calls in
         // this file, since every step below depends on a and b existing.
   1733  EXPECT_EQ(OK, node0.node().CreatePortPair(&a, &b));
   1734 
   1735  // Ping.
   1736  const char* kMessage = "hey";
   1737  ScopedMessage message;
   1738  EXPECT_EQ(OK, node0.SendStringMessage(a, kMessage));
   1739  ASSERT_TRUE(node0.ReadMessage(b, &message));
   1740  EXPECT_TRUE(MessageEquals(message, kMessage));
   1741 
   1742  // Pong.
   1743  EXPECT_EQ(OK, node0.node().SendUserMessage(b, std::move(message)));
   1744  EXPECT_FALSE(message);
   1745  ASSERT_TRUE(node0.ReadMessage(a, &message));
   1746  EXPECT_TRUE(MessageEquals(message, kMessage));
   1747 
   1748  // Ping again.
   1749  EXPECT_EQ(OK, node0.node().SendUserMessage(a, std::move(message)));
   1750  EXPECT_FALSE(message);
   1751  ASSERT_TRUE(node0.ReadMessage(b, &message));
   1752  EXPECT_TRUE(MessageEquals(message, kMessage));
   1753 
   1754  // Pong again!
   1755  EXPECT_EQ(OK, node0.node().SendUserMessage(b, std::move(message)));
   1756  EXPECT_FALSE(message);
   1757  ASSERT_TRUE(node0.ReadMessage(a, &message));
   1758  EXPECT_TRUE(MessageEquals(message, kMessage));
   1759 
   1760  EXPECT_EQ(OK, node0.node().ClosePort(a));
   1761  EXPECT_EQ(OK, node0.node().ClosePort(b));
   1762 }
   1763 
         // Exercises Node::SetAcknowledgeRequestInterval() by tracking the
         // unacknowledged message count reported for the sending port a0 while its
         // peer a1 is read from locally, transferred to a second node, read from
         // there, and finally closed.
   1764 TEST_F(PortsTest, SetAcknowledgeRequestInterval) {
   1765  TestNode node0(0);
   1766  AddNode(&node0);
   1767 
   1768  PortRef a0, a1;
   1769  EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
   1770  EXPECT_EQ(0u, node0.GetUnacknowledgedMessageCount(a0));
   1771 
   1772  // Send a batch of messages.
         // No acknowledge interval has been set on a0 yet, and the count is
         // expected to stay at 15 even after the peer has read 5 of them.
   1773  EXPECT_EQ(OK, node0.SendMultipleMessages(a0, 15));
   1774  EXPECT_EQ(15u, node0.GetUnacknowledgedMessageCount(a0));
   1775  EXPECT_TRUE(node0.ReadMultipleMessages(a1, 5));
   1776  WaitForIdle();
   1777  EXPECT_EQ(15u, node0.GetUnacknowledgedMessageCount(a0));
   1778 
   1779  // Set to acknowledge every read message, and validate that already-read
   1780  // messages are acknowledged.
   1781  EXPECT_EQ(OK, node0.node().SetAcknowledgeRequestInterval(a0, 1));
   1782  WaitForIdle();
   1783  EXPECT_EQ(10u, node0.GetUnacknowledgedMessageCount(a0));
   1784 
   1785  // Read another third of the messages (5 of the 15) from the other end.
   1786  EXPECT_TRUE(node0.ReadMultipleMessages(a1, 5));
   1787  WaitForIdle();
   1788 
   1789  EXPECT_EQ(5u, node0.GetUnacknowledgedMessageCount(a0));
   1790 
   1791  TestNode node1(1);
   1792  AddNode(&node1);
   1793 
   1794  // Transfer a1 across to node1.
         // The port travels attached to a message sent over a separate x0/x1 pair
         // that spans node0 and node1.
   1795  PortRef x0, x1;
   1796  CreatePortPair(&node0, &x0, &node1, &x1);
   1797  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a1));
   1798  WaitForIdle();
   1799 
   1800  ScopedMessage message;
   1801  ASSERT_TRUE(node1.ReadMessage(x1, &message));
   1802  ASSERT_EQ(1u, message->num_ports());
         // Rebind a1 to the port that arrived on node1; from here on a1 is looked
         // up and used through node1.
   1803  ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &a1));
   1804 
   1805  // Read the last third of the messages from the transferred port, and
   1806  // validate that the unacknowledged message count updates correctly.
   1807  EXPECT_TRUE(node1.ReadMultipleMessages(a1, 5));
   1808  WaitForIdle();
   1809  EXPECT_EQ(0u, node0.GetUnacknowledgedMessageCount(a0));
   1810 
   1811  // Set the interval back to 0 and validate that acknowledgements don't go on
         // indefinitely: once all 10 new messages have been read on node1, a0 must
         // still report a non-zero unacknowledged count.
   1812  EXPECT_EQ(OK, node0.node().SetAcknowledgeRequestInterval(a0, 0));
   1813  EXPECT_EQ(OK, node0.SendMultipleMessages(a0, 10));
   1814  WaitForIdle();
   1815  EXPECT_TRUE(node1.ReadMultipleMessages(a1, 10));
   1816  WaitForIdle();
   1817  EXPECT_NE(0u, node0.GetUnacknowledgedMessageCount(a0));
   1818 
   1819  // Close the far port and validate that the closure updates the unacknowledged
   1820  // count.
   1821  EXPECT_EQ(OK, node1.node().ClosePort(a1));
   1822  WaitForIdle();
   1823  EXPECT_EQ(0u, node0.GetUnacknowledgedMessageCount(a0));
   1824 
         // Close the remaining ports on the nodes that own them.
   1825  EXPECT_EQ(OK, node0.node().ClosePort(a0));
   1826  EXPECT_EQ(OK, node0.node().ClosePort(x0));
   1827  EXPECT_EQ(OK, node1.node().ClosePort(x1));
   1828 }
   1829 
   1830 }  // namespace test
   1831 }  // namespace ports
   1832 }  // namespace core
   1833 }  // namespace mojo